Source code for ewoksjob.client.celery.tasks
from celery.execute import send_task
from celery.result import AsyncResult
from ..dummy_workflow import dummy_workflow
__all__ = [
"execute_graph",
"execute_test_graph",
"convert_workflow",
"discover_tasks_from_modules",
"discover_all_tasks",
]
[docs]def execute_graph(**kw) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.execute_graph", **kw)
[docs]def execute_test_graph(
seconds=0, filename=None, args=None, kwargs=None, **kw
) -> AsyncResult:
if args:
raise TypeError("execute_test_graph does not take position arguments")
args = (dummy_workflow(),)
if kwargs is None:
kwargs = dict()
kwargs["inputs"] = [
{"id": "sleep", "name": 0, "value": seconds},
{"id": "result", "name": "filename", "value": filename},
]
return execute_graph(args=args, kwargs=kwargs, **kw)
[docs]def convert_workflow(**kw) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.convert_graph", **kw)
[docs]def discover_tasks_from_modules(**kw) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.discover_tasks_from_modules", **kw)
[docs]def discover_all_tasks(**kw) -> AsyncResult:
return send_task("ewoksjob.apps.ewoks.discover_all_tasks", **kw)