Source code for ewoks.cliutils.cli_execute_utils

from . import utils
from ewokscore.graph.serialize import GraphRepresentation

_REPRESENTATIONS = [str(s).split(".")[-1] for s in GraphRepresentation]


[docs]def add_execute_parameters(parser): parser.add_argument( "workflows", type=str, help="Workflow to execute (e.g. JSON filename)", nargs="+", ) parser.add_argument( "--workflow-dir", type=str, default="", dest="root_dir", help="Directory of sub-workflows (current working directory by default)", ) parser.add_argument( "--workflow-module", type=str, default="", dest="root_module", help="Python module of sub-workflows (current working directory by default)", ) parser.add_argument( "--workflow-format", type=str.lower, default="", dest="representation", choices=_REPRESENTATIONS, help="Source format", ) parser.add_argument( "--data-root-uri", type=str, default="", dest="data_root_uri", help="Root for saving task results", ) parser.add_argument( "--data-scheme", type=str, choices=["nexus", "json"], default="nexus", dest="data_scheme", help="Default task result format", ) parser.add_argument( "-p", "--parameter", dest="parameters", action="append", default=[], metavar="[NODE:]NAME=VALUE", help="Input variable for a particular node (or all start nodes when missing)", ) parser.add_argument( "-o", "--option", dest="options", action="append", default=[], metavar="OPTION=VALUE", help="Execution options", ) parser.add_argument( "-j", "--jobid", dest="job_id", type=str, default=None, help="Job id for ewoks events", ) parser.add_argument( "--disable-events", action="store_true", help="Disable ewoks events", ) parser.add_argument( "--sqlite3", dest="sqlite3_uri", type=str, default=None, help="Store ewoks events in an Sqlite3 database", ) parser.add_argument( "--test", action="store_true", help="The 'workflow' argument refers to the name of a test graph", ) parser.add_argument( "--outputs", type=str, choices=["none", "end", "all"], default="none", help="Log outputs (per task or merged values dictionary)", ) parser.add_argument( "--input-node-id", dest="node_attr", type=str, choices=["id", "label", "taskid"], default="id", help="The NODE attribute used when specifying an input parameter with [NODE:]NAME=VALUE", ) parser.add_argument( "--inputs", type=str, choices=["start", "all"], default="start", help="Inputs without a specific node are given to either all start nodes or all nodes", ) parser.add_argument( "--merge-outputs", action="store_true", dest="merge_outputs", help="Merge node outputs", ) parser.add_argument( "--engine", type=str, choices=["none", "dask", "ppf", "orange"], default="none", help="Execution engine to be used", ) parser.add_argument( "--search", action="store_true", help="The 'workflow' argument is a pattern to be search", )
[docs]def apply_execute_parameters(args): args.workflows, args.graphs = utils.parse_workflows(args) execute_options = dict(utils.parse_option(item) for item in args.options) execute_options["inputs"] = [ utils.parse_parameter(input_item, args.node_attr, args.inputs == "all") for input_item in args.parameters ] if args.outputs == "all": execute_options["outputs"] = [{"all": True}] elif args.outputs == "end": execute_options["outputs"] = [{"all": False}] else: execute_options["outputs"] = [] execute_options["merge_outputs"] = args.merge_outputs execute_options["varinfo"] = { "root_uri": args.data_root_uri, "scheme": args.data_scheme, } load_options = dict() execute_options["load_options"] = load_options if args.root_module: load_options["root_module"] = args.root_module if args.root_dir: load_options["root_dir"] = args.root_dir if args.representation: load_options["representation"] = args.representation execinfo = dict() execute_options["execinfo"] = execinfo if args.job_id: execinfo["job_id"] = args.job_id if args.sqlite3_uri: # TODO: asynchronous handling may loose events execinfo["asynchronous"] = False execinfo["handlers"] = [ { "class": "ewokscore.events.handlers.Sqlite3EwoksEventHandler", "arguments": [{"name": "uri", "value": args.sqlite3_uri}], } ] args.execute_options = execute_options