Source code for ewoksorange.bindings.owwidgets
"""
Orange widget base classes to execute Ewoks tasks
"""
import inspect
import logging
import warnings
from contextlib import contextmanager
from typing import Any, Optional, Mapping, List, Callable
from AnyQt import QtWidgets
from ..orange_version import ORANGE_VERSION
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
from oasys.widgets.widget import OWWidget
from orangewidget.widget import WidgetMetaClass
from orangewidget.settings import Setting
OWBaseWidget = OWWidget
summarize = None
PartialSummary = None
has_progress_bar = True
else:
from orangewidget.widget import OWBaseWidget
from orangewidget.settings import Setting
from orangewidget.utils.signals import summarize
from orangewidget.utils.signals import PartialSummary
if ORANGE_VERSION == ORANGE_VERSION.latest_orange:
from Orange.widgets.widget import OWWidget
from Orange.widgets.widget import WidgetMetaClass
has_progress_bar = True
else:
OWWidget = OWBaseWidget
WidgetMetaClass = type(OWBaseWidget)
has_progress_bar = False
from ewokscore.variable import Variable
from ewokscore.variable import value_from_transfer
from ewokscore import missing_data
from .progress import QProgress
from .taskexecutor import TaskExecutor
from .taskexecutor import ThreadedTaskExecutor
from .taskexecutor_queue import TaskExecutorQueue
from . import owsignals
from .events import scheme_ewoks_events
from . import invalid_data
_logger = logging.getLogger(__name__)
__all__ = [
"OWEwoksWidgetNoThread",
"OWEwoksWidgetOneThread",
"OWEwoksWidgetOneThreadPerRun",
"OWEwoksWidgetWithTaskStack",
"ow_build_opts",
]
if summarize is not None:
[docs] @summarize.register(Variable)
def summarize_variable(var: Variable):
if var.is_missing():
dtype = var.value
else:
dtype = type(var.value).__name__
desc = f"ewoks variable ({dtype})"
return PartialSummary(desc, desc)
[docs] @summarize.register(object)
def summarize_object(value: object):
return PartialSummary(str(type(value)), str(type(value)))
[docs]def prepare_OWEwoksWidgetclass(namespace, ewokstaskclass):
"""This needs to be called before signal and setting parsing"""
# Add the Ewoks class as an attribute to the Orange widget class
namespace["ewokstaskclass"] = ewokstaskclass
# Make sure the values above are always the default setting values:
# https://orange3.readthedocs.io/projects/orange-development/en/latest/tutorial-settings.html
# schema_only=False: when a widget is removed, its settings are stored to be used
# as defaults for future instances of this widget.
# schema_only=True: setting defaults should not change. Future instances of this widget
# have the default settings hard-coded in this function.
schema_only = True
# Add the settings as widget class attributes
namespace["_ewoks_default_inputs"] = Setting(dict(), schema_only=schema_only)
namespace["_ewoks_varinfo"] = Setting(dict(), schema_only=schema_only)
namespace["_ewoks_execinfo"] = Setting(dict(), schema_only=schema_only)
# Deprecated:
namespace["default_inputs"] = Setting(dict(), schema_only=schema_only)
# Add missing inputs and outputs as widget class attributes
owsignals.validate_inputs(
namespace,
name_to_ignore=namespace.get("_ewoks_inputs_to_hide_from_orange", tuple()),
)
owsignals.validate_outputs(
namespace,
name_to_ignore=namespace.get("_ewoks_outputs_to_hide_from_orange", tuple()),
)
class _OWEwoksWidgetMetaClass(WidgetMetaClass):
def __new__(metacls, name, bases, attrs, ewokstaskclass=None, **kw):
if ewokstaskclass:
prepare_OWEwoksWidgetclass(attrs, ewokstaskclass)
return super().__new__(metacls, name, bases, attrs, **kw)
# insure compatibility between old orange widget and new
# orangewidget.widget.WidgetMetaClass. This was before split of the two
# projects. Parameter name "openclass" is undefined on the old version
ow_build_opts = dict()
if "openclass" in inspect.signature(WidgetMetaClass).parameters:
ow_build_opts["openclass"] = True
[docs]class OWEwoksBaseWidget(OWWidget, metaclass=_OWEwoksWidgetMetaClass, **ow_build_opts):
"""Base class for boiler plate code to interconnect ewoks and orange3"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__dynamic_inputs = dict()
self.__task_output_changed_callbacks: List[Callable[[], None]] = [
self.task_output_changed
]
self.__post_task_exception: Optional[Exception] = None
def _init_control_area(self) -> None:
"""The control area is used for task inputs."""
layout = self._get_control_layout()
trigger = QtWidgets.QPushButton("Trigger")
execute = QtWidgets.QPushButton("Execute")
layout.addWidget(trigger)
trigger.released.connect(self.execute_ewoks_task)
self._trigger_button = trigger
layout.addWidget(execute)
execute.released.connect(self.execute_ewoks_task_without_propagation)
self._execute_button = execute
def _init_main_area(self):
"""The main area is used to display results."""
self._get_main_layout()
def _get_control_layout(self):
layout = self.controlArea.layout()
# sp = self.controlArea.sizePolicy()
# sp.setVerticalPolicy(QtWidgets.QSizePolicy.Expanding)
# self.controlArea.setSizePolicy(sp)
# print("changed the size policy")
if layout is None:
layout = QtWidgets.QVBoxLayout()
self.controlArea.setLayout(layout)
return layout
def _get_main_layout(self):
if not self.want_main_area:
raise RuntimeError(
f"{type(self).__name__} must have class attribute `want_main_area = True`"
)
layout = self.mainArea.layout()
if layout is None:
layout = QtWidgets.QVBoxLayout()
self.mainArea.setLayout(layout)
return layout
def _get_task_arguments(self):
if self.signalManager is None:
execinfo = None
node_id = None
else:
scheme = self.signalManager.scheme()
node = scheme.node_for_widget(self)
node_id = node.title
if not node_id:
node_id = scheme.nodes.index(node)
execinfo = scheme_ewoks_events(scheme, self._ewoks_execinfo)
return {
"inputs": self.get_task_inputs(),
"varinfo": self._ewoks_varinfo,
"execinfo": execinfo,
"node_id": node_id,
}
def _deprecated_default_inputs(self):
adict = dict(self.default_inputs)
if not adict:
return
self.default_inputs.clear()
adict = {
name: value
for name, value in adict.items()
if not invalid_data.is_invalid_data(value)
and name not in self._ewoks_default_inputs
}
warnings.warn(
".ows file node property 'default_inputs' has been converted to '_ewoks_default_inputs'. Please save the workflow to keep this change.",
DeprecationWarning,
)
self.update_default_inputs(**adict)
[docs] def set_default_input(self, name: str, value: Any) -> None:
if invalid_data.is_invalid_data(value):
_logger.info("ewoks widget: remove default input %r", name)
self._ewoks_default_inputs.pop(name, None)
else:
_logger.info("ewoks widget: set default input %r = %s", name, value)
self._ewoks_default_inputs[name] = value
[docs] def set_dynamic_input(self, name: str, value: Any) -> None:
if invalid_data.is_invalid_data(value):
_logger.info("ewoks widget: remove dynamic input %r", name)
self.__dynamic_inputs.pop(name, None)
else:
_logger.info(
"ewoks widget: set dynamic input %r = %s",
name,
value_from_transfer(value, varinfo=self._ewoks_varinfo),
)
self.__dynamic_inputs[name] = value
def _receive_dynamic_input(self, name: str, value: Any) -> None:
warnings.warn(
"`_receive_dynamic_input` is deprecated in favor of `set_dynamic_input`.",
DeprecationWarning,
)
self.set_dynamic_input(name, value)
[docs] def get_default_input_value(self, name: str, default=None) -> Any:
return self._ewoks_default_inputs.get(name, default)
[docs] def get_dynamic_input_value(self, name: str, default=None) -> Any:
return self.__dynamic_inputs.get(name, default)
[docs] def get_task_output_value(self, name, default=missing_data.MISSING_DATA) -> Any:
adict = self.get_task_outputs()
try:
value = adict[name]
except KeyError:
return default
value = self._extract_value(value)
if missing_data.is_missing_data(value):
return default
return value
[docs] def get_task_input_value(self, name: str, default=missing_data.MISSING_DATA) -> Any:
adict = self.get_task_inputs()
try:
value = adict[name]
except KeyError:
return default
value = self._extract_value(value)
if missing_data.is_missing_data(value):
return default
return value
[docs] def get_default_input_names(self, include_missing: bool = False) -> set:
self._deprecated_default_inputs()
if include_missing:
return self.get_input_names()
else:
return set(self._ewoks_default_inputs)
[docs] def get_dynamic_input_names(self, include_missing: bool = False) -> set:
if include_missing:
return self.get_input_names()
else:
return set(self.__dynamic_inputs)
[docs] def update_default_inputs(self, **inputs) -> None:
for name, value in inputs.items():
self.set_default_input(name, value)
[docs] def update_dynamic_inputs(self, **inputs) -> None:
for name, value in inputs.items():
self.set_dynamic_input(name, value)
[docs] def get_default_input_values(
self, include_missing: bool = False, defaults: Optional[Mapping] = None
) -> dict:
self._deprecated_default_inputs()
if include_missing:
values = {
name: invalid_data.INVALIDATION_DATA for name in self.get_input_names()
}
else:
values = dict()
if defaults:
values.update(defaults)
values.update(self._ewoks_default_inputs)
return {name: invalid_data.as_missing(value) for name, value in values.items()}
[docs] def get_dynamic_input_values(
self, include_missing: bool = False, defaults: Optional[Mapping] = None
) -> dict:
if include_missing:
values = {
name: invalid_data.INVALIDATION_DATA for name in self.get_input_names()
}
else:
values = dict()
if defaults:
values.update(defaults)
values.update(
{k: self._extract_value(v) for k, v in self.__dynamic_inputs.items()}
)
return {name: invalid_data.as_missing(value) for name, value in values.items()}
[docs] def get_task_output_values(self) -> dict:
return {k: self._extract_value(v) for k, v in self.get_task_outputs().items()}
[docs] def get_task_input_values(self) -> dict:
return {k: self._extract_value(v) for k, v in self.get_task_inputs().items()}
def _extract_value(self, data) -> Any:
return value_from_transfer(data, varinfo=self._ewoks_varinfo)
[docs] def get_task_inputs(self) -> dict:
"""Default inputs overwritten by inputs from previous tasks"""
inputs = self.get_default_input_values()
inputs.update(self.__dynamic_inputs)
return inputs
def _get_output_signal(self, ewoksname: str):
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
for signal in self.outputs:
if signal.name == ewoksname:
break
else:
signal = None
else:
signal = getattr(self.Outputs, ewoksname, None)
if signal is None:
raise RuntimeError(f"Output signal '{ewoksname}' does not exist")
return signal
[docs] def trigger_downstream(self) -> None:
_logger.debug("%s: trigger downstream", self)
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
for ewoksname, var in self.get_task_outputs().items():
ewoks_to_orange = owsignals.get_ewoks_to_orange_mapping(
type(self), "outputs"
)
orangename = ewoks_to_orange.get(ewoksname, ewoksname)
if invalid_data.is_invalid_data(var.value):
self.send(
orangename, invalid_data.INVALIDATION_DATA
) # or self.invalidate?
else:
self.send(orangename, var)
else:
for ewoksname, var in self.get_task_outputs().items():
channel = self._get_output_signal(ewoksname)
if invalid_data.is_invalid_data(var.value):
channel.send(
invalid_data.INVALIDATION_DATA
) # or channel.invalidate?
else:
channel.send(var)
def _output_changed(self) -> None:
self.__post_task_execute(self.__task_output_changed_callbacks)
def __post_task_execute(self, callbacks: List[Callable[[], None]]) -> None:
ncallbacks = len(callbacks)
if ncallbacks == 0:
return
try:
callbacks[0]()
except Exception as e:
self.__post_task_exception = e
raise
finally:
if ncallbacks > 1:
self.__post_task_execute(callbacks[1:])
@property
def task_output_changed_callbacks(self) -> list:
return self.__task_output_changed_callbacks
[docs] def clear_downstream(self) -> None:
_logger.debug("%s: clear downstream", self)
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
for ewoksname in self.get_task_outputs():
ewoks_to_orange = owsignals.get_ewoks_to_orange_mapping(
type(self), "outputs"
)
orangename = ewoks_to_orange.get(ewoksname, ewoksname)
self.send(
orangename, invalid_data.INVALIDATION_DATA
) # or self.invalidate?
else:
for name in self.get_task_outputs():
channel = self._get_output_signal(name)
channel.send(invalid_data.INVALIDATION_DATA) # or channel.invalidate?
[docs] def propagate_downstream(self, succeeded: Optional[bool] = None) -> None:
if succeeded is None:
succeeded = self.task_succeeded
if succeeded:
self.__post_task_execute([self.trigger_downstream])
else:
self.__post_task_execute([self.clear_downstream])
[docs] def handleNewSignals(self) -> None:
"""Invoked by the workflow signal propagation manager after all
signals handlers have been called.
"""
self.execute_ewoks_task(log_missing_inputs=False)
[docs] def execute_ewoks_task(self, log_missing_inputs: bool = True) -> None:
_logger.debug("%s: execute ewoks task (with propagation)", self)
self._execute_ewoks_task(propagate=True, log_missing_inputs=log_missing_inputs)
[docs] def execute_ewoks_task_without_propagation(self) -> None:
_logger.debug("%s: execute ewoks task (without propagation)", self)
self._execute_ewoks_task(propagate=False, log_missing_inputs=False)
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
raise NotImplementedError("Base class")
@property
def task_succeeded(self) -> Optional[bool]:
raise NotImplementedError("Base class")
@property
def task_done(self) -> Optional[bool]:
raise NotImplementedError("Base class")
@property
def task_exception(self) -> Optional[Exception]:
raise NotImplementedError("Base class")
@property
def post_task_exception(self) -> Optional[Exception]:
return self.__post_task_exception
[docs]def is_native_widget_class(widget_class):
return is_orange_widget_class(widget_class) and not is_ewoks_widget_class(
widget_class
)
[docs]def is_native_widget(widget_class):
return is_orange_widget(widget_class) and not is_ewoks_widget(widget_class)
[docs]class OWEwoksWidgetNoThread(OWEwoksBaseWidget, **ow_build_opts):
"""Widget which will execute_ewoks_task the ewokscore.Task directly"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__task_executor = TaskExecutor(self.ewokstaskclass)
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
self.__task_executor.create_task(
log_missing_inputs=log_missing_inputs, **self._get_task_arguments()
)
try:
self.__task_executor.execute_task()
except Exception as e:
_logger.error(f"task failed: {e}", exc_info=True)
try:
self.__post_task_exception = None
if propagate:
self.propagate_downstream()
finally:
self._output_changed()
@property
def task_succeeded(self) -> Optional[bool]:
return self.__task_executor.succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__task_executor.done
@property
def task_exception(self) -> Optional[Exception]:
return self.__task_executor.exception
class _OWEwoksThreadedBaseWidget(OWEwoksBaseWidget, **ow_build_opts):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if has_progress_bar:
self.__taskProgress = QProgress()
self.__taskProgress.sigProgressChanged.connect(self.progressBarSet)
else:
self.__taskProgress = None
def onDeleteWidget(self):
if has_progress_bar:
self.__taskProgress.sigProgressChanged.disconnect(self.progressBarSet)
self._cleanup_task_executor()
super().onDeleteWidget()
def _cleanup_task_executor(self):
raise NotImplementedError("Base class")
@contextmanager
def _ewoks_task_start_context(self):
try:
self.__ewoks_task_init()
yield
except Exception:
self.__ewoks_task_finished()
raise
@contextmanager
def _ewoks_task_finished_context(self):
try:
yield
finally:
self.__ewoks_task_finished()
def __ewoks_task_init(self):
if has_progress_bar:
self.progressBarInit()
def __ewoks_task_finished(self):
if has_progress_bar:
self.progressBarFinished()
self._output_changed()
def _get_task_arguments(self):
adict = super()._get_task_arguments()
adict["progress"] = self.__taskProgress
return adict
[docs]class OWEwoksWidgetOneThread(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
All the processing is done on one thread.
If a processing is requested when the thread is already running then
it is refused.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass)
self.__task_executor.finished.connect(self._ewoks_task_finished_callback)
self.__propagate = None
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
if self.__task_executor.isRunning():
_logger.error("A processing is already ongoing")
return
self.__task_executor.create_task(
log_missing_inputs=log_missing_inputs, **self._get_task_arguments()
)
if self.__task_executor.has_task:
with self._ewoks_task_start_context():
self.__propagate = propagate
self.__task_executor.start()
else:
self.__propagate = propagate
self.__task_executor.finished.emit()
@property
def task_succeeded(self) -> Optional[bool]:
return self.__task_executor.succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__task_executor.done
@property
def task_exception(self) -> Optional[Exception]:
return self.__task_executor.exception
def _ewoks_task_finished_callback(self):
with self._ewoks_task_finished_context():
self.__post_task_exception = None
if self.__propagate:
self.propagate_downstream()
def _cleanup_task_executor(self):
self.__task_executor.finished.disconnect(self._ewoks_task_finished_callback)
self.__task_executor.stop()
self.__task_executor = None
[docs]class OWEwoksWidgetOneThreadPerRun(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
Each time a task processing is requested this will create a new thread
to do the processing.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__task_executors = dict()
self.__last_output_variables = dict()
self.__last_task_succeeded = None
self.__last_task_done = None
self.__last_task_exception = None
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass)
task_executor.create_task(
log_missing_inputs=log_missing_inputs, **self._get_task_arguments()
)
with self.__init_task_executor(task_executor, propagate):
if task_executor.has_task:
with self._ewoks_task_start_context():
task_executor.start()
else:
task_executor.finished.emit()
@contextmanager
def __init_task_executor(self, task_executor, propagate: bool):
self.__disconnect_all_task_executors()
task_executor.finished.connect(self._ewoks_task_finished_callback)
self.__add_task_executor(task_executor, propagate)
try:
yield
except Exception:
task_executor.finished.disconnect(self._ewoks_task_finished_callback)
self.__remove_task_executor(task_executor)
raise
def __disconnect_all_task_executors(self):
for task_executor, _ in self.__task_executors:
try:
task_executor.finished.disconnect(self._ewoks_task_finished_callback)
except KeyError:
pass
def _ewoks_task_finished_callback(self):
with self._ewoks_task_finished_context():
task_executor = None
try:
task_executor = self.sender()
self.__last_output_variables = task_executor.output_variables
self.__last_task_succeeded = task_executor.succeeded
self.__last_task_done = task_executor.done
self.__last_task_exception = task_executor.exception
self.__post_task_exception = None
propagate = self.__is_task_executor_propagated(task_executor)
if propagate:
self.propagate_downstream(succeeded=task_executor.succeeded)
finally:
self.__remove_task_executor(task_executor)
def _cleanup_task_executor(self):
self.__disconnect_all_task_executors()
for task_executor, _ in self.__task_executors:
task_executor.quit()
self.__task_executors.clear()
def __add_task_executor(self, task_executor, propagate: bool):
self.__task_executors[id(task_executor)] = task_executor, propagate
def __remove_task_executor(self, task_executor):
self.__task_executors.pop(id(task_executor), None)
def __is_task_executor_propagated(self, task_executor) -> bool:
return self.__task_executors.get(id(task_executor), (None, False))[1]
@property
def task_succeeded(self) -> Optional[bool]:
return self.__last_task_succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__last_task_done
@property
def task_exception(self) -> Optional[Exception]:
return self.__last_task_exception
[docs]class OWEwoksWidgetWithTaskStack(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
Each time a task processing is requested add it to the FIFO stack.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.__task_executor_queue = TaskExecutorQueue(
ewokstaskclass=self.ewokstaskclass
)
self.__last_output_variables = dict()
self.__last_task_succeeded = None
self.__last_task_done = None
self.__last_task_exception = None
@property
def task_executor_queue(self):
return self.__task_executor_queue
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
def callback():
self._ewoks_task_finished_callback(propagate)
with self._ewoks_task_start_context():
self.__task_executor_queue.add(
_callbacks=(callback,),
_log_missing_inputs=log_missing_inputs,
**self._get_task_arguments(),
)
@property
def task_succeeded(self) -> Optional[bool]:
return self.__last_task_succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__last_task_done
@property
def task_exception(self) -> Optional[Exception]:
return self.__last_task_exception
def _cleanup_task_executor(self):
self.__task_executor_queue.stop()
self.__task_executor_queue = None
def _ewoks_task_finished_callback(self, propagate: bool):
with self._ewoks_task_finished_context():
task_executor = self.sender()
self.__last_output_variables = task_executor.output_variables
self.__last_task_succeeded = task_executor.succeeded
self.__last_task_done = task_executor.done
self.__last_task_exception = task_executor.exception
self.__post_task_exception = None
if propagate:
self.propagate_downstream()