Source code for ewokscore.tests.test_sub_graph_serialize

import json
import yaml
import pytest
from ewokscore import execute_graph
from ewokscore.graph import load_graph


[docs] def subsubsubgraph(): return { "graph": { "id": "subsubsubgraph", "input_nodes": [{"id": "in", "node": "task1"}, {"id": "notconnected"}], "output_nodes": [{"id": "out", "node": "task2"}, {"id": "notconnected"}], }, "nodes": [ { "id": "task1", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, { "id": "task2", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, ], "links": [ { "source": "task1", "target": "task2", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, ], }
[docs] def subsubgraph(_subsubsubgraph): return { "graph": { "id": "subsubgraph", "input_nodes": [{"id": "in", "node": "task1"}, {"id": "notconnected"}], "output_nodes": [ {"id": "out", "node": "subsubsubgraph", "sub_node": "out"}, {"id": "notconnected"}, ], }, "nodes": [ { "id": "task1", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, { "id": "task2", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, { "id": "subsubsubgraph", "task_type": "graph", "task_identifier": _subsubsubgraph, }, ], "links": [ { "source": "task1", "target": "task2", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, { "source": "task2", "target": "subsubsubgraph", "sub_target": "in", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, ], }
[docs] def subgraph(_subsubgraph): return { "graph": { "id": "subgraph", "input_nodes": [{"id": "in", "node": "task1"}], "output_nodes": [{"id": "out", "node": "subsubgraph", "sub_node": "out"}], }, "nodes": [ { "id": "task1", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, { "id": "task2", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.add", }, { "id": "subsubgraph", "task_type": "graph", "task_identifier": _subsubgraph, }, ], "links": [ { "source": "task1", "target": "task2", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, { "source": "task2", "target": "subsubgraph", "sub_target": "in", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, ], }
[docs] def graph(_subgraph): return { "graph": {"id": "graph"}, "nodes": [ {"id": "subgraph1", "task_type": "graph", "task_identifier": _subgraph}, {"id": "subgraph2", "task_type": "graph", "task_identifier": _subgraph}, { "id": "append", "task_type": "method", "task_identifier": "ewokscore.tests.examples.tasks.simplemethods.append", }, ], "links": [ { "source": "subgraph1", "sub_source": "out", "target": "subgraph2", "sub_target": "in", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, # Link all nodes from "subgraph1" to "append" { "source": "subgraph1", "sub_source": "task1", "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 0}], }, { "source": "subgraph1", "sub_source": "task2", "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 1}], }, { "source": "subgraph1", "sub_source": ("subsubgraph", "task1"), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 2}], }, { "source": "subgraph1", "sub_source": ("subsubgraph", "task2"), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 3}], }, { "source": "subgraph1", "sub_source": ("subsubgraph", ("subsubsubgraph", "task1")), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 4}], }, { "source": "subgraph1", "sub_source": ("subsubgraph", ("subsubsubgraph", "task2")), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 5}], }, # Link all nodes from "subgraph2" to "append" { "source": "subgraph2", "sub_source": "task1", "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 6}], }, { "source": "subgraph2", "sub_source": "task2", "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 7}], }, { "source": "subgraph2", "sub_source": ("subsubgraph", "task1"), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 8}], }, { "source": "subgraph2", "sub_source": ("subsubgraph", "task2"), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 9}], }, { "source": "subgraph2", "sub_source": ("subsubgraph", ("subsubsubgraph", "task1")), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 10}], }, { "source": "subgraph2", "sub_source": ("subsubgraph", ("subsubsubgraph", "task2")), "target": "append", "data_mapping": [{"source_output": "return_value", "target_input": 11}], }, ], }
[docs] def savegraph(graph, tmpdir, name, representation="json"): if representation == "json": filename = name + ".json" with open(tmpdir / filename, mode="w") as f: json.dump(graph, f, indent=2) elif representation == "yaml": filename = name + ".yml" with open(tmpdir / filename, mode="w") as f: yaml.dump(graph, f) else: raise ValueError(representation) return filename
[docs] def serialized_graph(tmpdir, representation="json") -> str: subname = savegraph( subsubsubgraph(), tmpdir, "subsubsubgraph", representation=representation ) subname = savegraph( subsubgraph(subname), tmpdir, "subsubgraph", representation=representation ) subname = savegraph( subgraph(subname), tmpdir, "subgraph", representation=representation ) return savegraph(graph(subname), tmpdir, "graph", representation=representation)
[docs] def nonserialized_graph() -> dict: return graph(subgraph(subsubgraph(subsubsubgraph())))
[docs] @pytest.mark.parametrize( "representation", (None, "json", "json_dict", "json_string", "yaml") ) def test_sub_graph_serialize(representation, tmpdir): ewoksgraph = load_graph(nonserialized_graph()) if representation == "yaml": destination = str(tmpdir / "file.yml") elif representation == "json": destination = str(tmpdir / "file.json") else: destination = None inmemorydump = ewoksgraph.dump(destination, representation=representation) if destination: source = destination else: source = inmemorydump ewoksgraph2 = load_graph(source, representation=representation) assert ewoksgraph == ewoksgraph2
[docs] @pytest.mark.parametrize("representation", ("json", "yaml")) def test_sub_graph_execute(representation, tmpdir): g = serialized_graph(tmpdir, representation=representation) ewoksgraph = load_graph(g, root_dir=str(tmpdir)) tasks = execute_graph(ewoksgraph, output_tasks=True) assert len(tasks) == 13 task = tasks[("subgraph1", "task1")] assert task.outputs.return_value == 1 task = tasks[("subgraph1", "task2")] assert task.outputs.return_value == 2 task = tasks[("subgraph1", ("subsubgraph", "task1"))] assert task.outputs.return_value == 3 task = tasks[("subgraph1", ("subsubgraph", "task2"))] assert task.outputs.return_value == 4 task = tasks[("subgraph1", ("subsubgraph", ("subsubsubgraph", "task1")))] assert task.outputs.return_value == 5 task = tasks[("subgraph1", ("subsubgraph", ("subsubsubgraph", "task2")))] assert task.outputs.return_value == 6 task = tasks[("subgraph2", "task1")] assert task.outputs.return_value == 7 task = tasks[("subgraph2", "task2")] assert task.outputs.return_value == 8 task = tasks[("subgraph2", ("subsubgraph", "task1"))] assert task.outputs.return_value == 9 task = tasks[("subgraph2", ("subsubgraph", "task2"))] assert task.outputs.return_value == 10 task = tasks[("subgraph2", ("subsubgraph", ("subsubsubgraph", "task1")))] assert task.outputs.return_value == 11 task = tasks[("subgraph2", ("subsubgraph", ("subsubsubgraph", "task2")))] assert task.outputs.return_value == 12 task = tasks["append"] assert task.outputs.return_value == tuple(range(1, 13))