Source code for ewoksppf.tests.test_ppf_workflow24

from ewoksutils.import_utils import qualname
from ewoksppf import execute_graph


def myfunc(name=None, succeeded=tuple(), raise_on_names=tuple(), **_):
    """Record ``name`` as succeeded, or fail if it is listed in ``raise_on_names``.

    :param name: label of the calling task
    :param succeeded: labels accumulated by the tasks that ran before this one
    :param raise_on_names: labels for which this call must fail
    :param _: any other keyword arguments are accepted and ignored
    :returns: ``{"succeeded": ...}`` holding ``succeeded`` extended with ``name``
    :raises RuntimeError: when ``name`` is contained in ``raise_on_names``
    """
    must_fail = name in raise_on_names
    if must_fail:
        raise RuntimeError(f"raise on name: {name}")
    # Augmented assignment: rebinds for tuples (the defaults are never mutated).
    succeeded += (name,)
    return dict(succeeded=succeeded)
def task1(**kw):
    """Call ``myfunc`` under the name ``"task1"``, forwarding all keyword arguments."""
    outcome = myfunc(name="task1", **kw)
    return outcome
def task2(**kw):
    """Call ``myfunc`` under the name ``"task2"``, forwarding all keyword arguments."""
    outcome = myfunc(name="task2", **kw)
    return outcome
def task3(**kw):
    """Call ``myfunc`` under the name ``"task3"``, forwarding all keyword arguments."""
    outcome = myfunc(name="task3", **kw)
    return outcome
def subtask1(**kw):
    """Call ``myfunc`` under the name ``"subtask1"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subtask1", **kw)
    return outcome
def subtask2(**kw):
    """Call ``myfunc`` under the name ``"subtask2"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subtask2", **kw)
    return outcome
def subtask3(**kw):
    """Call ``myfunc`` under the name ``"subtask3"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subtask3", **kw)
    return outcome
def subsubtask1(**kw):
    """Call ``myfunc`` under the name ``"subsubtask1"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subsubtask1", **kw)
    return outcome
def subsubtask2(**kw):
    """Call ``myfunc`` under the name ``"subsubtask2"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subsubtask2", **kw)
    return outcome
def subsubtask3(**kw):
    """Call ``myfunc`` under the name ``"subsubtask3"``, forwarding all keyword arguments."""
    outcome = myfunc(name="subsubtask3", **kw)
    return outcome
def subsub_handler(**kw):
    """Call ``myfunc`` under the name ``"subsub_handler"``, forwarding all keyword arguments.

    Used as the ``default_error_node`` of the graph built by ``subsubmodel``.
    """
    outcome = myfunc(name="subsub_handler", **kw)
    return outcome
def subsubmodel():
    """Build the innermost graph: subsubtask1 -> subsubtask2 -> subsubtask3.

    The graph exposes ``subsubtask1`` as input node ``"in"`` and ``subsubtask3``
    as output node ``"out"``. The ``subsub_handler`` node is flagged as the
    ``default_error_node`` and has no explicit links.

    :returns: dict with the keys ``"graph"``, ``"nodes"`` and ``"links"``
    """
    chain = (
        ("subsubtask1", subsubtask1),
        ("subsubtask2", subsubtask2),
        ("subsubtask3", subsubtask3),
    )
    method_nodes = [
        {
            "id": node_id,
            "task_type": "ppfmethod",
            "task_identifier": qualname(method),
        }
        for node_id, method in chain
    ]
    error_node = {
        "id": "subsub_handler",
        "task_type": "ppfmethod",
        "task_identifier": qualname(subsub_handler),
        "default_error_node": True,
    }

    # Link every node of the chain to its successor.
    node_ids = [node_id for node_id, _ in chain]
    links = [
        {"source": upstream, "target": downstream, "map_all_data": True}
        for upstream, downstream in zip(node_ids, node_ids[1:])
    ]

    graph = {
        "id": "subsubmodel",
        "input_nodes": [{"id": "in", "node": "subsubtask1"}],
        "output_nodes": [{"id": "out", "node": "subsubtask3"}],
    }
    return {"graph": graph, "nodes": method_nodes + [error_node], "links": links}
def submodel():
    """Build the intermediate graph: subtask1 -> subtask2 -> subtask3.

    The graph exposes ``subtask1`` as input node ``"in"`` and ``subtask3`` as
    output node ``"out"``. Its ``default_error_node`` is ``sub_handler``, a
    node of type ``"graph"`` that wraps the graph returned by ``subsubmodel``.

    :returns: dict with the keys ``"graph"``, ``"nodes"`` and ``"links"``
    """
    chain = (
        ("subtask1", subtask1),
        ("subtask2", subtask2),
        ("subtask3", subtask3),
    )
    method_nodes = [
        {
            "id": node_id,
            "task_type": "ppfmethod",
            "task_identifier": qualname(method),
        }
        for node_id, method in chain
    ]
    error_node = {
        "id": "sub_handler",
        "task_type": "graph",
        "task_identifier": subsubmodel(),
        "default_error_node": True,
    }

    # Link every node of the chain to its successor.
    node_ids = [node_id for node_id, _ in chain]
    links = [
        {"source": upstream, "target": downstream, "map_all_data": True}
        for upstream, downstream in zip(node_ids, node_ids[1:])
    ]

    graph = {
        "id": "submodel",
        "input_nodes": [{"id": "in", "node": "subtask1"}],
        "output_nodes": [{"id": "out", "node": "subtask3"}],
    }
    return {"graph": graph, "nodes": method_nodes + [error_node], "links": links}
def workflow():
    """Build the top-level graph: task1 -> task2 -> task3.

    Its ``default_error_node`` is ``top_handler``, a node of type ``"graph"``
    that wraps the graph returned by ``submodel``.

    :returns: dict with the keys ``"graph"``, ``"nodes"`` and ``"links"``
    """
    chain = (
        ("task1", task1),
        ("task2", task2),
        ("task3", task3),
    )
    method_nodes = [
        {
            "id": node_id,
            "task_type": "ppfmethod",
            "task_identifier": qualname(method),
        }
        for node_id, method in chain
    ]
    error_node = {
        "id": "top_handler",
        "task_type": "graph",
        "task_identifier": submodel(),
        "default_error_node": True,
    }

    # Link every node of the chain to its successor.
    node_ids = [node_id for node_id, _ in chain]
    links = [
        {"source": upstream, "target": downstream, "map_all_data": True}
        for upstream, downstream in zip(node_ids, node_ids[1:])
    ]

    return {
        "graph": {"id": "workflow"},
        "nodes": method_nodes + [error_node],
        "links": links,
    }
def test_ppf_workflow24(ppf_log_config):
    """Check nested default error handlers (graph, sub-graph, sub-sub-graph).

    Each scenario lists the task names that must raise, the ``succeeded``
    tuple expected in the result, and the expected workflow error message
    (``None`` when no error is expected).
    """
    scenarios = [
        # Nothing fails: only the main chain runs.
        (
            tuple(),
            ("task1", "task2", "task3"),
            None,
        ),
        # task3 fails: the submodel chain is expected to run next.
        (
            ("task3",),
            ("task1", "task2", "subtask1", "subtask2", "subtask3"),
            "Task 'task3' failed",
        ),
        # subtask2 fails as well: the subsubmodel chain is expected to run next.
        (
            ("task3", "subtask2"),
            (
                "task1",
                "task2",
                "subtask1",
                "subsubtask1",
                "subsubtask2",
                "subsubtask3",
            ),
            "Task 'task3' failed",
        ),
        # subtask3 and subsubtask1 fail: subsub_handler is expected to run last.
        (
            ("task3", "subtask3", "subsubtask1"),
            ("task1", "task2", "subtask1", "subtask2", "subsub_handler"),
            "Task 'task3' failed",
        ),
    ]

    for failing_names, expected_succeeded, expected_error in scenarios:
        inputs = [
            {"name": "succeeded", "value": tuple()},
            {"name": "raise_on_names", "value": failing_names},
        ]
        result = execute_graph(workflow(), inputs=inputs, raise_on_error=False)
        ppfdict = result["_ppfdict"]

        assert ppfdict["succeeded"] == expected_succeeded
        if expected_error is None:
            assert "WorkflowExceptionInstance" not in ppfdict
        else:
            errorMessage = str(ppfdict["WorkflowExceptionInstance"])
            assert errorMessage == expected_error