Source code for ewoks.cliutils.utils
import os
import json
from glob import glob
from fnmatch import fnmatch
from typing import Tuple, Any, List
from json.decoder import JSONDecodeError
[docs]def parse_value(value: str) -> Any:
try:
return json.loads(value)
except JSONDecodeError:
return value
_NODE_ATTR_MAP = {"id": "id", "label": "label", "taskid": "task_identifier"}
[docs]def parse_parameter(input_item: str, node_attr: str, all: bool) -> dict:
"""The format of `input_item` is `"[NODE]:name=value"`"""
node_and_name, _, value = input_item.partition("=")
a, sep, b = node_and_name.partition(":")
if sep:
node = a
var_name = b
else:
node = None
var_name = a
var_value = parse_value(value)
if node is None:
return {"all": all, "name": var_name, "value": var_value}
return {
_NODE_ATTR_MAP[node_attr]: node,
"name": var_name,
"value": var_value,
}
[docs]def parse_option(option: str) -> Tuple[str, Any]:
option, _, value = option.partition("=")
return option, parse_value(value)
[docs]def parse_workflows(args) -> Tuple[List[str], List[str]]:
"""
:returns: workflows (possibly expanded due the search), graphs (execute graph arguments)
"""
if args.test:
return _parse_test_workflows(args.workflows, args.search)
return _parse_workflows(args.workflows, args.search)
def _parse_workflows(workflows: List[str], search: bool) -> Tuple[List[str], List[str]]:
"""
:returns: workflows (possibly expanded due the search), graphs (execute graph arguments)
"""
if not search:
return workflows, workflows
parsed_workflows = list()
files = (filename for workflow in workflows for filename in glob(workflow))
for filename in sorted(files, key=os.path.getmtime):
if filename not in parsed_workflows:
parsed_workflows.append(filename)
return parsed_workflows, parsed_workflows
def _parse_test_workflows(
workflows: List[str], search: bool
) -> Tuple[List[str], List[dict]]:
"""
:returns: workflows (possibly expanded due the search), graphs (execute graph arguments)
"""
from ewokscore.tests.examples.graphs import graph_names, get_graph
test_workflows = list(graph_names())
if search:
parsed_workflows = list()
for workflow in workflows:
for test_graph in test_workflows:
if fnmatch(test_graph, workflow) and test_graph not in parsed_workflows:
parsed_workflows.append(test_graph)
else:
for workflow in workflows:
if workflow not in test_workflows:
raise RuntimeError(
f"Test graph '{workflow}' does not exist: {test_workflows}"
)
parsed_workflows = workflows
graphs = [get_graph(workflow)[0] for workflow in parsed_workflows]
return parsed_workflows, graphs
[docs]def parse_destinations(args):
dest_dirname = os.path.dirname(args.destination)
basename = os.path.basename(args.destination)
dest_basename, dest_ext = os.path.splitext(basename)
if not dest_ext:
dest_ext = dest_basename
dest_basename = ""
if not dest_ext.startswith("."):
dest_ext = f".{dest_ext}"
if len(args.workflows) == 1 and dest_basename:
return [os.path.join(dest_dirname, f"{dest_basename}{dest_ext}")]
destinations = list()
for workflow in args.workflows:
basename, _ = os.path.splitext(os.path.basename(workflow))
destination = os.path.join(dest_dirname, f"{basename}{dest_basename}{dest_ext}")
destinations.append(destination)
return destinations