import os
import json
import logging
from uuid import uuid4
from collections import namedtuple
from typing import IO, Iterator, List, Optional, Tuple, Type, Union, NamedTuple
from ..orange_version import ORANGE_VERSION
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
from oasys.widgets.widget import OWWidget as OWBaseWidget
from orangecanvas.scheme import readwrite
from orangecanvas.scheme import annotations
else:
from orangewidget.widget import OWBaseWidget
from orangecanvas.scheme import readwrite
from orangecanvas.scheme import annotations
from ewoksutils.import_utils import qualname
from ewoksutils.import_utils import import_qualname
from ewokscore import load_graph
from ewokscore.graph import TaskGraph
from ewokscore.inittask import task_executable_info
from ewokscore.task import Task
from ewokscore.node import get_node_label
from ..registration import get_owwidget_descriptions
from .taskwrapper import OWWIDGET_TASKS_GENERATOR
from .owsignals import signal_ewoks_to_orange_name
from .owsignals import signal_orange_to_ewoks_name
from .owwidgets import is_ewoks_widget_class
from . import invalid_data
__all__ = ["ows_to_ewoks", "ewoks_to_ows", "graph_is_supported"]
ReadSchemeType = readwrite._scheme
logger = logging.getLogger(__name__)
[docs]def ows_to_ewoks(
source: Union[str, IO],
preserve_ows_info: Optional[bool] = True,
title_as_node_id: Optional[bool] = False,
**load_graph_options,
) -> TaskGraph:
"""Load an Orange Workflow Scheme from a file or stream and convert it to a `TaskGraph`."""
ows = read_ows(source)
description = ows.description
try:
ewoksinfo = json.loads(description)
description = ewoksinfo["description"]
except Exception:
ewoksinfo = dict()
if not description and isinstance(source, str):
description = (
"Ewoks workflow '%s'" % os.path.splitext(os.path.basename(source))[0]
)
if not description:
description = "Ewoks workflow"
title = ows.title
if not title and isinstance(source, str):
title = os.path.splitext(os.path.basename(source))[0]
if not title:
title = str(uuid4())
nodes = list()
widget_classes = dict()
if title_as_node_id:
id_to_title = {ows_node.id: ows_node.title for ows_node in ows.nodes}
if len(set(id_to_title.values())) != len(id_to_title):
id_to_title = dict()
else:
id_to_title = dict()
for ows_node in ows.nodes:
widget_class, node_attrs, ewokstaskclass = widget_to_task(
ows_node.qualified_name
)
owsinfo = {
"title": ows_node.title,
"name": ows_node.name,
"position": str(ows_node.position),
"version": ows_node.version, # widget version
}
node_attrs["id"] = id_to_title.get(ows_node.id, ows_node.id)
node_attrs["label"] = ows_node.title
if preserve_ows_info:
node_attrs["ows"] = owsinfo
if widget_class is not None:
default_inputs = node_data_to_default_inputs(
ows_node.data, widget_class, ewokstaskclass
)
if default_inputs:
node_attrs["default_inputs"] = default_inputs
widget_classes[ows_node.id] = widget_class
nodes.append(node_attrs)
links = list()
for ows_link in ows.links:
widget_class = widget_classes[ows_link.source_node_id]
if widget_class is None:
source_name = ows_link.source_channel
else:
source_name = signal_orange_to_ewoks_name(
widget_class, "outputs", ows_link.source_channel
)
widget_class = widget_classes[ows_link.sink_node_id]
if widget_class is None:
sink_name = ows_link.sink_channel
else:
sink_name = signal_orange_to_ewoks_name(
widget_class, "inputs", ows_link.sink_channel
)
link = {
"source": id_to_title.get(ows_link.source_node_id, ows_link.source_node_id),
"target": id_to_title.get(ows_link.sink_node_id, ows_link.sink_node_id),
"data_mapping": [{"source_output": source_name, "target_input": sink_name}],
}
links.append(link)
links += ewoksinfo.get("missing_links", list())
graph_attrs = dict()
graph_attrs["id"] = title
graph_attrs["label"] = description
if ows.annotations:
graph_attrs["ows"] = {
"annotations": [
_serialize_annotation(annotation) for annotation in ows.annotations
]
}
graph = {
"graph": graph_attrs,
"links": links,
"nodes": nodes,
}
return load_graph(graph, **load_graph_options)
[docs]def graph_is_supported(graph: TaskGraph) -> bool:
all_explicit_datamapping = all(
link_attrs.get("data_mapping") for link_attrs in graph.graph.edges.values()
)
return (
not graph.is_cyclic
and not graph.has_conditional_links
and all_explicit_datamapping
)
[docs]def ewoks_to_ows(
graph,
destination: Union[str, IO],
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
error_on_duplicates: bool = True,
**load_graph_options,
):
"""Save an ewoks graph as an Orange Workflow Scheme file. The ewoks node id's
are lost because Orange uses node index numbers as id's.
"""
ewoksgraph = load_graph(graph, **load_graph_options)
if ewoksgraph.is_cyclic:
raise RuntimeError("Orange can only handle DAGs")
if ewoksgraph.has_conditional_links:
raise RuntimeError("Orange cannot handle conditional links")
if not all(
link_attrs.get("data_mapping") for link_attrs in ewoksgraph.graph.edges.values()
):
raise RuntimeError("Orange cannot handle links without explicit data mapping")
owsgraph = OwsSchemeWrapper(
ewoksgraph,
varinfo=varinfo,
execinfo=execinfo,
error_on_duplicates=error_on_duplicates,
)
write_ows(owsgraph, destination)
[docs]class OwsNodeWrapper:
"""Only part of the API used by scheme_to_ows_stream"""
_node_desc = namedtuple(
"NodeDescription",
["name", "qualified_name", "version", "project_name"],
)
def __init__(self, orangeid: int, node_attrs: dict):
self.id = str(orangeid)
ows = node_attrs.get("ows", dict())
node_id = node_attrs["id"]
node_label = get_node_label(node_id, node_attrs)
self.title = ows.get("title", node_label)
self.position = ows.get("position", (0.0, 0.0))
default_name = node_attrs["qualified_name"].split(".")[-1]
self.description = self._node_desc(
name=ows.get("name", default_name),
qualified_name=node_attrs["qualified_name"],
project_name=node_attrs["project_name"],
version=ows.get("version", ""), # widget version
)
default_inputs = node_attrs.get("default_inputs", list())
default_inputs = {item["name"]: item["value"] for item in default_inputs}
# Note: OWEwoksBaseWidget must have these settings in the Oasys fork
# otherwise `WidgetsScheme.sync_node_properties` will remove the
# unknown properties
self.properties = {
"_ewoks_default_inputs": default_inputs,
"_ewoks_varinfo": node_attrs.get("varinfo", dict()),
"_ewoks_execinfo": node_attrs.get("execinfo", dict()),
}
def __str__(self):
return self.title
[docs]class OwsSchemeWrapper:
"""Only the part of the scheme API used by scheme_to_ows_stream"""
_link = namedtuple(
"Link",
["source_node", "sink_node", "source_channel", "sink_channel", "enabled"],
)
_link_channel = namedtuple(
"Linkchannel",
["name", "id"],
)
def __init__(
self,
graph,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
error_on_duplicates: bool = True,
):
if isinstance(graph, TaskGraph):
graph = graph.dump()
self.title = graph["graph"].get("id", "")
self._description = graph["graph"].get("label", "")
ows = graph["graph"].get("ows", dict())
self._annotations = [
_deserialize_annotation(annotation)
for annotation in ows.get("annotations", list())
]
self._nodes = dict() # the keys of this dictionary never used
self._widget_classes = dict()
for orangeid, node_attrs in enumerate(graph["nodes"]):
task_type, task_info = task_executable_info(node_attrs["id"], node_attrs)
if task_type != "class":
raise ValueError("Orange workflows only support task type 'class'")
widget_class, node_attrs["project_name"] = task_to_widget(
task_info["task_identifier"], error_on_duplicates=error_on_duplicates
)
node_attrs["qualified_name"] = qualname(widget_class)
if varinfo:
node_attrs["varinfo"] = varinfo
if execinfo:
node_attrs["execinfo"] = execinfo
self._nodes[node_attrs["id"]] = OwsNodeWrapper(orangeid, node_attrs)
self._widget_classes[node_attrs["id"]] = widget_class
self.links = list()
self.missing_links = list()
for link in graph["links"]:
self._convert_link(link)
@property
def nodes(self):
return list(self._nodes.values())
@property
def annotations(self):
return self._annotations
@property
def description(self):
if self.missing_links:
description = {
"description": self._description,
"missing_links": self.missing_links,
}
return json.dumps(description)
else:
return self._description
def _convert_link(self, link):
"""In Orange, a link must transfer data"""
try:
source_node = self._nodes[link["source"]]
sink_node = self._nodes[link["target"]]
source_class = self._widget_classes[link["source"]]
sink_class = self._widget_classes[link["target"]]
data_mapping = link.get("data_mapping", None)
if not data_mapping:
logger.warning(
"link '%s' -> '%s' cannot be created in Orange because it has no data transfer",
source_node,
sink_node,
)
self.missing_links.append(link)
return
for item in data_mapping:
target_name = item["target_input"]
source_name = item["source_output"]
target_name = signal_ewoks_to_orange_name(
sink_class, "inputs", target_name
)
source_name = signal_ewoks_to_orange_name(
source_class, "outputs", source_name
)
sink_channel = self._link_channel(name=target_name, id=sink_node.id)
source_channel = self._link_channel(name=source_name, id=source_node.id)
link2 = self._link(
source_node=source_node,
sink_node=sink_node,
source_channel=source_channel,
sink_channel=sink_channel,
enabled=True,
)
self.links.append(link2)
except Exception as e:
raise RuntimeError(
f"Failed to create link '{link['source']}' -> '{link['target']}'"
) from e
[docs] def window_group_presets(self):
return list()
[docs]def read_ows(source: Union[str, IO]) -> ReadSchemeType:
"""Read an Orange Workflow Scheme from a file or a stream."""
return readwrite.parse_ows_stream(source)
[docs]def write_ows(scheme: OwsSchemeWrapper, destination: Union[str, IO]):
"""Write an Orange Workflow Scheme. The ewoks node id's
are lost because Orange uses node index numbers as id's.
"""
if not isinstance(scheme, OwsSchemeWrapper):
raise TypeError(scheme, type(scheme))
tree = readwrite.scheme_to_etree(scheme, data_format="literal")
for node in tree.getroot().find("nodes"):
del node.attrib["scheme_node_type"]
readwrite.indent(tree.getroot(), 0)
if isinstance(destination, str) and os.path.dirname(destination):
os.makedirs(os.path.dirname(destination), exist_ok=True)
tree.write(destination, encoding="utf-8", xml_declaration=True)
def _serialize_annotation(annotation: readwrite._annotation) -> dict:
data = _serialize_namedtuple(annotation)
data["params"] = _serialize_namedtuple(data["params"])
return data
def _serialize_namedtuple(ntuple: NamedTuple):
return dict(zip(ntuple._fields, ntuple))
def _deserialize_annotation(annotation: dict) -> annotations.BaseSchemeAnnotation:
params = dict(annotation["params"])
if annotation["type"] == "text":
params["rect"] = tuple(params.pop("geometry"))
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
params.pop("content_type", None)
return annotations.SchemeTextAnnotation(**params)
if annotation["type"] == "arrow":
start, end = params.pop("geometry")
start = tuple(start)
end = tuple(end)
return annotations.SchemeArrowAnnotation(start, end, **params)
raise ValueError("cannot deserialize annotation params")