Source code for ewoksjob.client.celery.utils

import logging
from typing import Dict, List, Optional, Set
from celery import current_app
from celery.result import AsyncResult

__all__ = [
    "get_future",
    "cancel",
    "get_result",
    "get_not_finished_task_ids",
    "get_not_finished_futures",
    "get_workers",
]


logger = logging.getLogger(__name__)


[docs]def get_future(task_id) -> AsyncResult: return AsyncResult(task_id)
[docs]def cancel(task_id): future = get_future(task_id) if future is not None: future.revoke(terminate=True)
[docs]def get_result(task_id, **kwargs): kwargs.setdefault("interval", 0.1) future = AsyncResult(task_id) if future is not None: return future.get(**kwargs)
[docs]def get_not_finished_task_ids(): inspect = current_app.control.inspect() task_ids = list() workers = inspect.active() # running if workers is None: logger.warning("No Celery workers were detected") workers = dict() for tasks in workers.values(): for task in tasks: task_ids.append(task["id"]) workers = inspect.scheduled() # pending if workers is None: workers = dict() for tasks in workers.values(): for task in tasks: task_ids.append(task["id"]) return task_ids
[docs]def get_not_finished_futures() -> List[AsyncResult]: lst = [get_future(task_id) for task_id in get_not_finished_task_ids()] return [future for future in lst if future is not None]
[docs]def get_workers() -> List[str]: queues_dict: Optional[Dict[str, List[dict]]] = ( current_app.control.inspect().active_queues() ) if queues_dict is None: return list() workers: Set[str] = set() for queue_infos in queues_dict.values(): for queue_info in queue_infos: workers.add(queue_info["name"]) return list(workers)