"""
=================================
SignalManager (``signalmanager``)
=================================
A SignalManager instance handles the runtime signal propagation between
widgets in a scheme workflow.
"""
import os
import logging
import warnings
import enum
import functools
from collections import defaultdict
from operator import attrgetter
from functools import partial, reduce
from itertools import chain
import typing
from typing import (
Any, Optional, List, NamedTuple, Set, Dict, Callable,
Sequence, Union, DefaultDict, Type
)
from AnyQt.QtCore import QObject, QTimer, QSettings, QEvent
from AnyQt.QtCore import pyqtSignal, pyqtSlot as Slot
from . import LinkEvent
from ..utils import unique, mapping_get, group_by_all
from ..registry import OutputSignal, InputSignal
from .scheme import Scheme, SchemeNode, SchemeLink
from ..utils.graph import traverse_bf, strongly_connected_components
if typing.TYPE_CHECKING:
    # Generic type variables, only defined for static type checkers.
    K = typing.TypeVar("K")
    V = typing.TypeVar("V")

#: Module level logger.
log = logging.getLogger(__name__)
class Signal(
    NamedTuple(
        "Signal", [
            ("link", SchemeLink),
            ("value", Any),
            ("id", Any),
            ("index", int),
        ]
    )
):
    """
    A single value travelling along a link between two nodes.

    Attributes
    ----------
    link : SchemeLink
        The link carrying the signal.
    value : Any
        The payload.
    id : Any
        .. deprecated:: 0.1.19
    index : int
        Position of `link` among the sink node's input links at the time
        the signal was enqueued, or -1 when not applicable.

    See also
    --------
    InputSignal.flags, OutputSignal.flags
    """
    def __new__(cls, link: SchemeLink, value: Any, id: Any = None,
                index: int = -1):
        # The override only exists to give `id` and `index` defaults.
        return super().__new__(
            cls, link=link, value=value, id=id, index=index
        )

    @property
    def channel(self) -> InputSignal:
        """The sink channel of the carrying link (`link.sink_channel`)."""
        carrier = self.link
        return carrier.sink_channel

    # Concrete subtypes; attached as class attributes right after their
    # definitions (Signal.New / Signal.Update / Signal.Close).
    New: 'Type[New]'
    Update: 'Type[Update]'
    Close: 'Type[Close]'
# Signal subtypes. `send` uses New for the first value on a channel and
# Update for subsequent ones; Close is scheduled when an input link is
# removed.
class New(Signal):
    pass


class Update(Signal):
    pass


class Close(Signal):
    pass


# Make the subtypes reachable through the base class.
Signal.New, Signal.Update, Signal.Close = New, Update, Close

#: Predicate returning the ``enabled`` attribute of its argument.
is_enabled = attrgetter("enabled")
class _LazyValueType:
"""
LazyValue is an abstract type for wrapper for lazy evaluation of signals.
LazyValue is intended for situations in which computation of outputs is
reasonably fast, but we won't to compute it only if the output is connected
to some input, in order to save memory.
Assume the widget has a method `commit` that outputs the sum of two objects,
`self.a` and `self.b`. The output signal is names `signal_name` and its
type is `SomeType`.
```
def commit(self):
self.send(self.Outputs.signal_name, self.a + self.b)
```
To use lazy values, we modify the method as follows.
```
def commit(self):
def f():
return self.a + self.b
self.send(self.Outputs.signal_name, LazySignal[SomeType](f))
```
The lazy function receives no arguments, so `commit` will often prepare
some data accessible through closure or default arguments. After calling
the function, LazyValue will release the reference to function, which in
turn releases references to any data from closure or arguments.
LazyValue is a singleton, used in similar way as generic classes from
typing. "Indexing" returns an instance of (internal) class `LazyValue_`.
Indexing is cached; `LazyValue[SomeType]` always returns the same object.
LazySignal[SomeType] (that is: LazyValue_) has a constructor that expects
the following arguments.
- A function that computes the actual value. This function must expect
no arguments, but will usually get data (for instance `self`, in the
above example) from closure.
- An optional function that can be called to interrupt the computation.
This function is called when the signal is deleted.
- Optional extra arguments that are stored as LazyValue's attributes.
These are not accessible by the above function and are primarily
intended to be used in output summaries.
Properties:
- `is_cached()`, which returns `True` if the value is already computed.
Functions for output summaries can use this to show more information
if the value is available, and avoid computing it when not.
Methods:
- `get_value()` returns the actual value by calling the function, if
the value has not been computed yet, or providing the cached value.
- `type()` returns the type of the lazy signal (e.g. `SomeType`, in
above case.
"""
class LazyValueMeta(type):
def __repr__(cls):
"""
Pretty-prints the LazyValue[SomeType] as "LazyValue[SomeType]"
instead of generic `LazyValue_`.
"""
return f"LazyValue[{cls.type().__name__}]"
@classmethod
def is_lazy(cls, value):
"""
Tells whether the given value is lazy.
```
>>> def f():
... return 12
...
>>> lazy = LazyValue[int](f)
>>> eager = f()
>>> LazyValue.is_lazy(lazy)
True
>>> LazyValue.is_lazy(eager)
False
```
"""
return isinstance(type(value), cls.LazyValueMeta)
@classmethod
@functools.lru_cache(maxsize=None)
def __getitem__(cls, type_):
# This is cached, so that it always returns the same class for the
# same type.
# >>> t1 = LazyValue[int]
# >>> t2 = LazyValue[int]
# >>> t1 is t2
# True
class LazyValue_(metaclass=cls.LazyValueMeta):
__type = type_
def __init__(self, func: Callable, interrupt=None, **extra_attrs):
self.__func = func
self.__cached = None
self.interrupt = interrupt
self.__dict__.update(extra_attrs)
def __del__(self):
if self.interrupt is not None:
self.interrupt()
@property
def is_cached(self):
return self.__func is None
@classmethod
def type(cls):
return cls.__type
def get_value(self):
if self.__func is not None:
self.__cached = self.__func()
# This frees any references to closure and arguments
self.__func = None
return self.__cached
return LazyValue_
LazyValue = _LazyValueType()
class _OutputState:
"""Output state for a single node/channel"""
__slots__ = ('flags', 'outputs')
#: Flag indicating the output on the channel is invalidated.
Invalidated = 1
def __init__(self):
self.outputs = defaultdict()
self.flags = 0
def __repr__(self):
return "State(flags={}, outputs={!r})".format(
self.flags, dict(self.outputs)
)
__str__ = __repr__
class _LinkExtra:
"""Extra data tracked for a SchemeLink"""
__slots__ = ("flags",)
DidScheduleNew = 1
def __init__(self, flags=0):
self.flags = flags
class SignalManager(QObject):
    """
    SignalManager handles the runtime signal propagation for a :class:`.Scheme`
    instance.

    Node outputs are recorded with `send`; the resulting signals are queued
    and delivered one node at a time by `process_node`, which hands them to
    `send_to_node` (to be implemented by subclasses).

    Note
    ----
    If a scheme instance is passed as a parent to the constructor it is also
    set as the workflow model.
    """
    class State(enum.IntEnum):
        """
        SignalManager state flags.

        .. seealso:: :func:`SignalManager.state()`
        """
        #: The manager is running, i.e. it propagates signals
        Running = 0
        #: The manager is stopped. It does not track node output changes,
        #: and does not deliver signals to dependent nodes
        Stopped = 1
        #: The manager is paused. It still tracks node output changes, but
        #: does not deliver new signals to dependent nodes. The pending signals
        #: will be delivered once it enters Running state again
        Paused = 2

    # Aliases of the `State` members, reachable directly on the class.
    #: The manager is running, i.e. it propagates signals
    Running = State.Running
    #: The manager is stopped. It does not track node output changes,
    #: and does not deliver signals to dependent nodes
    Stopped = State.Stopped
    #: The manager is paused. It still tracks node output changes, but
    #: does not deliver new signals to dependent nodes. The pending signals
    #: will be delivered once it enters Running state again
    Paused = State.Paused
    # Legacy value kept for back-compatibility; no method of this class
    # sets it, but `_can_process` still treats it as a non-running state.
    Error = 3

    class RuntimeState(enum.IntEnum):
        """
        SignalManager runtime state.

        See Also
        --------
        SignalManager.runtime_state
        """
        #: Idle: no node is currently receiving its inputs (signals may
        #: still be queued)
        Waiting = 0
        #: Pending signals are being delivered to a node (set for the
        #: duration of `process_node`)
        Processing = 1

    # Aliases of the `RuntimeState` members.
    Waiting = RuntimeState.Waiting
    Processing = RuntimeState.Processing

    #: Emitted when the state of the signal manager changes.
    stateChanged = pyqtSignal(int)

    #: Emitted when signals are added to the queue.
    updatesPending = pyqtSignal()

    #: Emitted right before a `SchemeNode` instance has its inputs updated.
    processingStarted = pyqtSignal([], [SchemeNode])

    #: Emitted right after a `SchemeNode` instance has had its inputs updated.
    processingFinished = pyqtSignal([], [SchemeNode])

    #: Emitted when `SignalManager`'s runtime state changes.
    runtimeStateChanged = pyqtSignal(int)

    #: Emitted when the execution finishes (there are no more nodes that
    #: need to run). Note: the nodes can activate again due to user
    #: interaction or other scheduled events, i.e. finished is not a definitive
    #: state. Use at your own discretion.
    finished = pyqtSignal()

    #: Emitted when starting initial execution and when resuming after already
    #: emitting `finished`.
    started = pyqtSignal()
def __init__(self, parent=None, *, max_running=None, **kwargs):
    # type: (Optional[QObject], Optional[int], Any) -> None
    """
    Parameters
    ----------
    parent : Optional[QObject]
        The Qt parent object. If it is a :class:`Scheme` it also becomes
        the managed workflow.
    max_running : Optional[int]
        Maximum number of concurrently active nodes; `None` defers to the
        environment/settings lookup in `max_active`.
    **kwargs
        Forwarded to the `QObject` constructor.
    """
    super().__init__(parent, **kwargs)
    # The managed workflow and the queue of not yet delivered signals.
    self.__workflow = None  # type: Optional[Scheme]
    self.__input_queue = []  # type: List[Signal]
    # Current outputs of every node, per output channel.
    self.__node_outputs = {}  # type: Dict[SchemeNode, DefaultDict[OutputSignal, _OutputState]]
    # Extra per-link bookkeeping (created on first access).
    self.__link_extra = defaultdict(_LinkExtra)  # type: DefaultDict[SchemeLink, _LinkExtra]
    self.__state = SignalManager.Running
    self.__runtime_state = SignalManager.Waiting
    self.__max_running = max_running
    self.__has_finished = True
    # Single-shot timer used by `_update` to coalesce update requests.
    timer = QTimer(self, interval=100, singleShot=True)
    timer.timeout.connect(self.__process_next)
    self.__update_timer = timer
    if isinstance(parent, Scheme):
        self.set_workflow(parent)
def _can_process(self):  # type: () -> bool
    """
    May the manager enter the main processing loop, i.e. is it neither
    stopped nor in the legacy `Error` state?
    """
    excluded = (SignalManager.Error, SignalManager.Stopped)
    return self.__state not in excluded
def workflow(self):
    # type: () -> Optional[Scheme]
    """
    Return the managed :class:`Scheme` instance (None if none is set).
    """
    current = self.__workflow
    return current

#: Alias of `workflow`.
scheme = workflow
def set_workflow(self, workflow):
    # type: (Scheme) -> None
    """
    Set the workflow model.

    Detaches from the previous workflow (if any), forgetting its tracked
    node outputs and queued signals. It then attaches to `workflow` and
    schedules the current contents of all of its links.

    Parameters
    ----------
    workflow : Scheme
    """
    previous = self.__workflow
    if workflow is previous:
        return

    if previous is not None:
        for node in previous.nodes:
            node.state_changed.disconnect(self._update)
            node.removeEventFilter(self)
        for link in previous.links:
            self.__on_link_removed(link)
        previous.node_added.disconnect(self.__on_node_added)
        previous.node_removed.disconnect(self.__on_node_removed)
        previous.link_added.disconnect(self.__on_link_added)
        previous.link_removed.disconnect(self.__on_link_removed)
        previous.removeEventFilter(self)
        self.__node_outputs = {}
        self.__input_queue = []

    self.__workflow = workflow
    if workflow is None:
        return

    workflow.node_added.connect(self.__on_node_added)
    workflow.node_removed.connect(self.__on_node_removed)
    workflow.link_added.connect(self.__on_link_added)
    workflow.link_removed.connect(self.__on_link_removed)
    for node in workflow.nodes:
        self.__node_outputs[node] = defaultdict(_OutputState)
        node.state_changed.connect(self._update)
        node.installEventFilter(self)
    for link in workflow.links:
        self.__on_link_added(link)
    workflow.installEventFilter(self)
def has_pending(self):  # type: () -> bool
    """
    Are there queued signals still waiting to be delivered?
    """
    return len(self.__input_queue) > 0
def start(self):  # type: () -> None
    """
    Start the update loop (no-op if already running).

    Note
    ----
    Signals are delivered only once control returns to the Qt event loop.
    """
    if self.__state == SignalManager.Running:
        return
    self.__state = SignalManager.Running
    self.stateChanged.emit(SignalManager.Running)
    self._update()
def stop(self):  # type: () -> None
    """
    Stop the update loop.

    Note
    ----
    A `process_queues` call already in progress still delivers its current
    pending signals, but no new pass starts until `start()` is called
    again.
    """
    if self.__state == SignalManager.Stopped:
        return
    self.__state = SignalManager.Stopped
    self.stateChanged.emit(SignalManager.Stopped)
    self.__update_timer.stop()
def pause(self):  # type: () -> None
    """
    Pause signal delivery (see `resume` and `step`).
    """
    if self.__state == SignalManager.Paused:
        return
    self.__state = SignalManager.Paused
    self.stateChanged.emit(SignalManager.Paused)
    self.__update_timer.stop()
def resume(self):
    # type: () -> None
    """
    Resume signal delivery after `pause`; does nothing in any other state.
    """
    if self.__state != SignalManager.Paused:
        return
    self.__state = SignalManager.Running
    self.stateChanged.emit(self.__state)
    self._update()
def step(self):
    # type: () -> None
    """
    While `Paused`, deliver the pending signals of a single node;
    in any other state do nothing.
    """
    if self.__state != SignalManager.Paused:
        return
    self.process_queued()
def state(self):
    # type: () -> State
    """
    Return the current state.

    Return
    ------
    state : SignalManager.State
    """
    current = self.__state
    return current
def _set_runtime_state(self, state):
    # type: (Union[RuntimeState, int]) -> None
    """
    Set the runtime state, emitting `runtimeStateChanged` when it changes.

    Should only be called by `SignalManager` implementations.
    """
    new_state = SignalManager.RuntimeState(state)
    if new_state == self.__runtime_state:
        return
    self.__runtime_state = new_state
    self.runtimeStateChanged.emit(new_state)
def runtime_state(self):
    # type: () -> RuntimeState
    """
    Return the runtime state: `SignalManager.Waiting` or
    `SignalManager.Processing`.
    """
    current = self.__runtime_state
    return current
def __on_node_removed(self, node):
    # type: (SchemeNode) -> None
    """
    Forget a node that was removed from the workflow.

    Its queued input signals are dropped so `process_node` never sees
    stale references. Output signals are NOT dropped: in particular the
    final 'None' is still delivered to the sink nodes after the source
    node has left the scheme.
    """
    log.info("Removing pending signals for '%s'.", node.title)
    self.remove_pending_signals(node)
    self.__node_outputs.pop(node)
    node.state_changed.disconnect(self._update)
    node.removeEventFilter(self)
def __on_node_added(self, node):
    # type: (SchemeNode) -> None
    """
    Start tracking a node that was added to the workflow.
    """
    outputs = defaultdict(_OutputState)
    self.__node_outputs[node] = outputs
    # Any change of the node's state triggers an update pass.
    node.state_changed.connect(self._update)
    node.installEventFilter(self)
def __on_link_added(self, link):
    # type: (SchemeLink) -> None
    """
    Push the current source values over a newly added link.

    `Signal.New` is scheduled even for a disabled link, but then with a
    `None` value (changed behaviour from <0.1.19, where nothing was sent
    on disabled links); input consistency must be kept without exposing
    the current value.
    """
    link.set_runtime_state(SchemeLink.Empty)
    source_state = self.__node_outputs[link.source_node][link.source_channel]
    link.set_runtime_state_flag(
        SchemeLink.Invalidated,
        bool(source_state.flags & _OutputState.Invalidated),
    )
    current = self.signals_on_link(link)
    if link.is_enabled():
        signals = [Signal.New(*sig) for sig in current]  # type: List[Signal]
    else:
        signals = [Signal.New(sig.link, None, sig.id, sig.index)
                   for sig in current]
    log.info("Scheduling signal data update for '%s'.", link)
    self._schedule(signals)
    link.enabled_changed.connect(self.__on_link_enabled_changed)
def __on_link_removed(self, link):
    # type: (SchemeLink) -> None
    """
    Stop watching a removed link and drop its extra bookkeeping.
    """
    link.enabled_changed.disconnect(self.__on_link_enabled_changed)
    if link in self.__link_extra:
        del self.__link_extra[link]
def eventFilter(self, recv: QObject, event: QEvent) -> bool:
    """
    On `LinkEvent.InputLinkRemoved`, schedule a `Signal.Close` for every
    id present on the removed link. The event is always forwarded to the
    base implementation, whose result is returned.
    """
    if event.type() == LinkEvent.InputLinkRemoved:
        link_event = typing.cast(LinkEvent, event)
        link = link_event.link()
        log.info("Scheduling close signal (%s).", link)
        closing: List[Signal] = [
            Signal.Close(link, None, key, link_event.pos())
            for key in self.link_contents(link)
        ]
        self._schedule(closing)
    return super().eventFilter(recv, event)
def __on_link_enabled_changed(self, enabled):
    """
    Slot for `SchemeLink.enabled_changed`: when a link becomes enabled,
    reschedule the values currently present on it.
    """
    if not enabled:
        return
    link = self.sender()
    log.info("Link %s enabled. Scheduling signal data update.", link)
    self._update_link(link)
def signals_on_link(self, link):
    # type: (SchemeLink) -> List[Signal]
    """
    Return :class:`Signal` instances for the values currently present on
    `link` (an empty list when no workflow is set).
    """
    workflow = self.__workflow
    if workflow is None:
        return []
    contents = self.link_contents(link)
    sink_links = workflow.find_links(sink_node=link.sink_node)
    position = sink_links.index(link)
    return [Signal(link, value, key, index=position)
            for key, value in contents.items()]
def link_contents(self, link):
    # type: (SchemeLink) -> Dict[Any, Any]
    """
    Return the contents on `link` as a mapping of signal id -> value.

    For a tracked source node this is its live output mapping for the
    link's source channel. Once the source node has been removed, its
    outputs are forgotten, but its final 'None' deliveries may still sit
    in the input queue; those are collected from there instead.
    """
    per_channel = self.__node_outputs.get(link.source_node)
    if per_channel is not None:
        return per_channel[link.source_channel].outputs
    return {sig.id: sig.value
            for sig in self.__input_queue if sig.link is link}
def send(self, node, channel, value, *args, **kwargs):
    # type: (SchemeNode, OutputSignal, Any, Any, Any) -> None
    """
    Send the `value` on the output `channel` from `node`.

    Record the value as the channel's current output and schedule its
    delivery over all links leaving `node` via `channel`.

    Parameters
    ----------
    node : SchemeNode
        The originating node.
    channel : OutputSignal
        The node's output on which the value is sent.
    value : Any
        The value to send.
    id : Any
        Signal id (passed positionally or as a keyword).

        .. deprecated:: 0.1.19

    Raises
    ------
    RuntimeError
        If no workflow is set, or if the channel already holds a value
        under a different id.
    """
    if self.__workflow is None:
        raise RuntimeError("'send' called with no workflow!.")

    # parse deprecated id parameter from *args, **kwargs.
    # `_id_` takes exactly one argument named `id`; calling it raises
    # TypeError unless exactly that was passed, in which case the id is
    # treated as absent (None) and no warning is issued.
    def _id_(id):
        return id

    try:
        id = _id_(*args, **kwargs)
    except TypeError:
        id = None
    else:
        warnings.warn(
            "`id` parameter is deprecated and will be removed in v0.2",
            FutureWarning, stacklevel=2
        )

    log.debug("%r sending %r (id: %r) on channel %r",
              node.title, type(value), id, channel.name)

    scheme = self.__workflow

    state = self.__node_outputs[node][channel]
    # Only one id per output channel is allowed.
    if state.outputs and id not in state.outputs:
        raise RuntimeError(
            "Sending multiple values on the same output channel via "
            "different ids is no longer supported."
        )

    # The first value for an id goes out as New, later ones as Update.
    sigtype: Type[Signal]
    if id in state.outputs:
        sigtype = Signal.Update
    else:
        sigtype = Signal.New

    state.outputs[id] = value
    assert len(state.outputs) == 1

    # clear invalidated flag
    if state.flags & _OutputState.Invalidated:
        log.debug("%r clear invalidated flag on channel %r",
                  node.title, channel.name)
        state.flags &= ~_OutputState.Invalidated

    links = scheme.find_links(source_node=node, source_channel=channel)
    signals = []
    for link in links:
        extra = self.__link_extra[link]
        # Position of this link among the sink node's input links.
        links_in = scheme.find_links(sink_node=link.sink_node)
        index = links_in.index(link)
        if not link.is_enabled() and not extra.flags & _LinkExtra.DidScheduleNew:
            # Send Signal.New with None value. Proper update will be done
            # when/if the link is re-enabled.
            signal = Signal.New(link, None, id, index=index)
        elif link.is_enabled():
            signal = sigtype(link, value, id, index=index)
        else:
            # Disabled link that already received its New signal: skip.
            continue
        signals.append(signal)
        link.set_runtime_state_flag(SchemeLink.Invalidated, False)

    self._schedule(signals)
def invalidate(self, node, channel):
    # type: (SchemeNode, OutputSignal) -> None
    """
    Invalidate the `channel` on `node`.

    The channel is treated as changed but unavailable until a new value
    is sent via `send`; while it stays so, dependent nodes are not
    updated. Every link leaving `node` via `channel` gets the
    `SchemeLink.Invalidated` flag until a new value is sent.

    Parameters
    ----------
    node: SchemeNode
        The originating node.
    channel: OutputSignal
        The channel to invalidate.

    .. versionadded:: 0.1.8
    """
    log.debug("%r invalidating channel %r", node.title, channel.name)
    output_state = self.__node_outputs[node][channel]
    output_state.flags |= _OutputState.Invalidated
    workflow = self.__workflow
    if workflow is None:
        return
    outgoing = workflow.find_links(source_node=node, source_channel=channel)
    for link in outgoing:
        link.set_runtime_state(link.runtime_state() | link.Invalidated)
def purge_link(self, link):
    # type: (SchemeLink) -> None
    """
    Purge the link, i.e. schedule a `None` value for every id currently
    present on it.

    .. deprecated:: 0.1.19
    """
    warnings.warn(
        "`purge_link` is deprecated.", DeprecationWarning, stacklevel=2
    )
    present_ids = list(self.link_contents(link))
    self._schedule([Signal(link, None, key) for key in present_ids])
def _schedule(self, signals):
    # type: (List[Signal]) -> None
    """
    Enqueue `signals` for delivery and refresh runtime flags.

    Links that received a `Signal.New` are marked as such. Each affected
    link becomes Active (some non-None value present) or Empty, plus
    Pending. Each sink node is flagged Pending. Finally an update pass is
    requested.
    """
    self.__input_queue.extend(signals)

    for sig in signals:
        if not isinstance(sig, Signal.New):
            continue
        self.__link_extra[sig.link].flags |= _LinkExtra.DidScheduleNew

    touched_links = {sig.link for sig in signals}
    for link in touched_links:
        values = self.link_contents(link).values()
        has_data = any(value is not None for value in values)
        base = SchemeLink.Active if has_data else SchemeLink.Empty
        link.set_runtime_state(base | SchemeLink.Pending)

    sink_nodes = {sig.link.sink_node for sig in signals}  # type: Set[SchemeNode]
    for node in sink_nodes:
        node.set_state_flags(SchemeNode.Pending, True)

    if signals:
        self.updatesPending.emit()
    self._update()
def _update_link(self, link):
    # type: (SchemeLink) -> None
    """
    Schedule a `Signal.Update` for every value currently on `link`.
    """
    updates = [Signal.Update(*sig) for sig in self.signals_on_link(link)]
    self._schedule(updates)
def process_queued(self, max_nodes=None):
    # type: (Any) -> None
    """
    Deliver all scheduled signals to the first eligible pending node.

    Parameters
    ----------
    max_nodes : Any
        Deprecated; values other than None or 1 trigger a FutureWarning.

    Raises
    ------
    RuntimeError
        When called re-entrantly or while the manager cannot process
        (see `_can_process`).
    """
    uses_deprecated_arg = not (max_nodes is None or max_nodes == 1)
    if uses_deprecated_arg:
        warnings.warn(
            "`max_nodes` is deprecated and will be removed in the future",
            FutureWarning, stacklevel=2)

    if self.__runtime_state == SignalManager.Processing:
        raise RuntimeError("Cannot re-enter 'process_queued'")
    if not self._can_process():
        raise RuntimeError("Can't process in state %i" % self.__state)

    self.process_next()
def process_next(self):
    # type: () -> bool
    """
    Deliver queued signals to the first eligible node on the update front.

    Unlike the timer driven update pass, this ignores the `max_active`
    limit.

    Returns
    -------
    processed : bool
        True if a node was updated, False if no node was eligible.
    """
    processed = self.__process_next_helper(use_max_active=False)
    return processed
def process_node(self, node):
    # type: (SchemeNode) -> None
    """
    Deliver all pending input signals to `node`.

    The node's signals are taken off the queue and passed through
    `compress_signals`. Values on dynamic links are then checked with
    `can_enable_dynamic`; a link that fails is disabled and its value
    replaced with None. The result goes to `send_to_node` while the
    runtime state is `Processing`.
    """
    assert self.__runtime_state != SignalManager.Processing

    incoming = self.pending_input_signals(node)
    self.remove_pending_signals(node)
    incoming = self.compress_signals(incoming)

    log.debug("Processing %r, sending %i signals.",
              node.title, len(incoming))
    # The signals are about to be delivered: clear the links' Pending flag.
    for link in {sig.link for sig in incoming}:
        link.set_runtime_state(link.runtime_state() & ~SchemeLink.Pending)

    delivered = []  # type: List[Signal]
    for sig in incoming:
        link = sig.link
        if link.is_dynamic():
            # A dynamic link stays enabled only while its value type
            # checks; otherwise the input is cleared by sending None.
            accepted = can_enable_dynamic(link, sig.value)
            link.set_dynamic_enabled(accepted)
            if not accepted:
                sig = sig._replace(value=None)
        delivered.append(sig)

    assert not ({sig.link for sig in self.__input_queue}
                & {sig.link for sig in delivered})

    self._set_runtime_state(SignalManager.Processing)
    self.processingStarted.emit()
    self.processingStarted[SchemeNode].emit(node)
    try:
        self.send_to_node(node, delivered)
    finally:
        node.set_state_flags(SchemeNode.Pending, False)
        self.processingFinished.emit()
        self.processingFinished[SchemeNode].emit(node)
        self._set_runtime_state(SignalManager.Waiting)
def compress_signals(self, signals):
    # type: (List[Signal]) -> List[Signal]
    """
    Hook for compressing the signals about to be delivered to a node.

    Subclasses may merge or drop values according to their execution
    semantics; the module level `compress_signals` function implements one
    such policy. `signals` arrives in enqueue order. The base
    implementation returns it unchanged.

    Parameters
    ----------
    signals : List[Signal]

    Return
    ------
    signals : List[Signal]
    """
    unchanged = signals
    return unchanged
def send_to_node(self, node, signals):
    # type: (SchemeNode, List[Signal]) -> None
    """
    Abstract; must be reimplemented in a subclass.

    Notify `node` (or whatever object it represents) that it has new
    inputs, as described by the `signals` list.

    Parameters
    ----------
    node : SchemeNode
    signals : List[Signal]

    Raises
    ------
    NotImplementedError
        Always, in the base class.
    """
    raise NotImplementedError
def is_pending(self, node):
    # type: (SchemeNode) -> bool
    """
    Is `node` scheduled for processing, i.e. does it have incoming
    signals in the queue?

    Parameters
    ----------
    node : SchemeNode

    Returns
    -------
    pending : bool
    """
    return any(node == sig.link.sink_node for sig in self.__input_queue)
def pending_nodes(self):
    # type: () -> List[SchemeNode]
    """
    Return the nodes with queued input signals, each listed once, in the
    order in which they were first enqueued.

    Returns
    -------
    nodes : List[SchemeNode]
    """
    sinks = (sig.link.sink_node for sig in self.__input_queue)
    return list(unique(sinks))
def pending_input_signals(self, node):
    # type: (SchemeNode) -> List[Signal]
    """
    Return the queued signals whose sink node is `node` (by identity), in
    queue order.
    """
    matching = []
    for sig in self.__input_queue:
        if node is sig.link.sink_node:
            matching.append(sig)
    return matching
def remove_pending_signals(self, node):
    # type: (SchemeNode) -> None
    """
    Remove pending signals for `node`.

    The signals reported by `pending_input_signals` are dropped from the
    input queue, which is updated in place.
    """
    # Filter in one pass, matching by object identity. Calling
    # `list.remove` per signal rescans the queue every time (O(n*k)) and
    # locates the element via tuple equality, which can end up comparing
    # signal payloads (`value.__eq__`).
    drop = {id(signal) for signal in self.pending_input_signals(node)}
    if not drop:
        return
    self.__input_queue[:] = [
        signal for signal in self.__input_queue if id(signal) not in drop
    ]
def __nodes(self):
    # type: () -> Sequence[SchemeNode]
    """All nodes of the workflow, or an empty list without a workflow."""
    workflow = self.__workflow
    if workflow:
        return workflow.nodes
    return []
def blocking_nodes(self):
    # type: () -> List[SchemeNode]
    """
    Return the workflow nodes for which `is_blocking` holds.
    """
    return list(filter(self.is_blocking, self.__nodes()))
def invalidated_nodes(self):
    # type: () -> List[SchemeNode]
    """
    Return the workflow nodes that either have an invalidated output
    channel or are themselves flagged as invalidated.

    .. versionadded:: 0.1.8
    """
    def invalidated(node):
        return (self.has_invalidated_outputs(node)
                or self.is_invalidated(node))

    return [node for node in self.__nodes() if invalidated(node)]
def active_nodes(self):
    # type: () -> List[SchemeNode]
    """
    Return the workflow nodes for which `is_active` holds.

    .. versionadded:: 0.1.8
    """
    return list(filter(self.is_active, self.__nodes()))
def is_blocking(self, node):
    # type: (SchemeNode) -> bool
    """
    Is `node` in a *blocking* state?

    A blocking node is about to produce new outputs. No signals are
    delivered to its dependents until it does, and none are delivered to
    the node itself while it stays in this state.

    The base implementation never reports a node as blocking.

    .. deprecated:: 0.1.8
        Use a combination of `is_invalidated` and `is_ready`.
    """
    blocking = False
    return blocking
def is_ready(self, node: SchemeNode) -> bool:
    """
    Can `node` currently receive inputs?

    Reimplement in a subclass to keep specific nodes out of input updates
    (e.g. still initializing runtime resources, or executing a
    non-interruptable task). Call `post_update_request` whenever such
    implicit state changes.

    The base implementation checks that the node's `SchemeNode.NotReady`
    flag is not set.

    Parameters
    ----------
    node: SchemeNode
    """
    not_ready = node.test_state_flags(SchemeNode.NotReady)
    return not not_ready
def is_invalidated(self, node: SchemeNode) -> bool:
    """
    Does `node` carry the `SchemeNode.Invalidated` state flag?

    Parameters
    ----------
    node : SchemeNode

    Returns
    -------
    state: bool
    """
    flag = SchemeNode.Invalidated
    return node.test_state_flags(flag)
def has_invalidated_outputs(self, node):
    # type: (SchemeNode) -> bool
    """
    Does `node` have any explicitly invalidated output channel?

    Parameters
    ----------
    node: SchemeNode

    Returns
    -------
    state: bool
        Always False for nodes that are not tracked.

    See also
    --------
    invalidate

    .. versionadded:: 0.1.8
    """
    channels = self.__node_outputs.get(node)
    if channels is None:
        return False
    flag = _OutputState.Invalidated
    return any(state.flags & flag for state in channels.values())
def has_invalidated_inputs(self, node):
    # type: (SchemeNode) -> bool
    """
    Does `node` have an immediate ancestor with invalidated outputs?

    Only enabled input links of `node` are considered.

    Parameters
    ----------
    node : SchemeNode

    Returns
    -------
    state: bool

    .. versionadded:: 0.1.8
    """
    workflow = self.__workflow
    if workflow is None:
        return False
    for link in workflow.find_links(sink_node=node):
        if link.is_enabled() and self.has_invalidated_outputs(link.source_node):
            return True
    return False
def is_active(self, node):
    # type: (SchemeNode) -> bool
    """
    Is `node` active, i.e. does its state include `SchemeNode.Running`?

    Parameters
    ----------
    node: SchemeNode

    Returns
    -------
    active: bool
    """
    running = node.state() & SchemeNode.Running
    return bool(running)
def node_update_front(self):
    # type: () -> Sequence[SchemeNode]
    """
    Return a list of nodes on the update front, i.e. nodes scheduled for
    an update that have no ancestor which is either itself scheduled
    for update or is invalidated/in a blocking state.

    Blocking nodes themselves are excluded as well. The result keeps the
    order of `pending_nodes()`.

    Note
    ----
    The node's ancestors are only computed over enabled links.
    """
    if self.__workflow is None:
        return []
    workflow = self.__workflow
    expand = partial(expand_node, workflow)

    components = strongly_connected_components(workflow.nodes, expand)
    # Map each node to the strongly connected component containing it.
    node_scc = {node: scc for scc in components for node in scc}

    def isincycle(node):  # type: (SchemeNode) -> bool
        return len(node_scc[node]) > 1

    # Nodes that depend on `node` (as computed by `dependent_nodes`).
    def dependents(node):  # type: (SchemeNode) -> List[SchemeNode]
        return dependent_nodes(workflow, node)

    # Nodes reported by `is_blocking` (legacy; active/executing a
    # non-interruptable task).
    blocking_nodes = set(self.blocking_nodes())
    # nodes marked as having invalidated outputs (not yet available)
    invalidated_nodes = set(self.invalidated_nodes())

    #: transitive invalidated nodes (including the legacy self.is_blocked
    #: behaviour - blocked nodes are both invalidated and cannot receive
    #: new inputs)
    invalidated_ = reduce(
        set.union,
        map(dependents, invalidated_nodes | blocking_nodes),
        set([]),
    )  # type: Set[SchemeNode]

    pending = self.pending_nodes()
    # All nodes that depend on some pending node.
    pending_ = set()
    for n in pending:
        depend = set(dependents(n))
        if isincycle(n):
            # a pending node in a cycle would have a circular
            # dependency on itself, preventing any progress being made
            # by the workflow execution.
            cc = node_scc[n]
            depend -= set(cc)
        pending_.update(depend)

    def has_invalidated_ancestor(node):  # type: (SchemeNode) -> bool
        return node in invalidated_

    def has_pending_ancestor(node):  # type: (SchemeNode) -> bool
        return node in pending_

    #: nodes that are eligible for update.
    ready = list(filter(
        lambda node: not has_pending_ancestor(node)
                     and not has_invalidated_ancestor(node)
                     and not self.is_blocking(node),
        pending
    ))
    return ready
@Slot()
def __process_next(self):
    """
    Timeout slot of the update timer: run a single processing step and,
    if a node was processed, request another pass.
    """
    if self.__state != SignalManager.Running:
        log.debug("Received 'UpdateRequest' while not in 'Running' state")
        return

    if self.__runtime_state == SignalManager.Processing:
        # Re-entered, e.g. because an input handler called
        # QCoreApplication.processEvents.
        log.warning("Received 'UpdateRequest' while in 'process_queued'. "
                    "An update will be re-scheduled when exiting the "
                    "current update.")
        return

    if not self.__input_queue:
        return  # nothing to deliver

    if self.__has_finished:
        # First work after `finished` was emitted (or at start-up).
        self.__has_finished = False
        self.started.emit()

    processed = self.__process_next_helper(use_max_active=True)
    if processed:
        # Schedule another update (will be a noop if nothing to do).
        self._update()
def __process_next_helper(self, use_max_active=True) -> bool:
    """
    Pick one node from the update front and deliver its pending signals.

    Parameters
    ----------
    use_max_active : bool
        If True, do not start another node while the number of active or
        blocking nodes is at least `max_active()`. An eligible node that
        is already active may still be selected.

    Returns
    -------
    processed : bool
        True if a node was processed.
    """
    eligible = [n for n in self.node_update_front() if self.is_ready(n)]
    if not eligible:
        return False

    max_active = self.max_active()
    nactive = len(set(self.active_nodes()) | set(self.blocking_nodes()))

    log.debug(
        "Process next, queued signals: %i, nactive: %i "
        "(max_active: %i)",
        len(self.__input_queue), nactive, max_active
    )
    # The node lists below are only used for diagnostics. Computing them
    # walks all workflow nodes on every pass, so do it only when debug
    # logging is actually enabled.
    if log.isEnabledFor(logging.DEBUG):
        def titles(nodes):
            return [node.title for node in nodes]

        log.debug("Pending nodes: %s", titles(self.pending_nodes()))
        log.debug("Blocking nodes: %s", titles(self.blocking_nodes()))
        log.debug("Invalidated nodes: %s", titles(self.invalidated_nodes()))
        log.debug("Nodes ready for update: %s", titles(eligible))

    # Prefer a node that is already running (effectively cancelling its
    # executing task, since it is immediately updatable).
    selected_node = next(
        (node for node in eligible if self.is_active(node)), None
    )  # type: Optional[SchemeNode]

    # Return if over committed, except in the case that the selected_node
    # is already active.
    if use_max_active and nactive >= max_active and selected_node is None:
        return False

    if selected_node is None:
        selected_node = eligible[0]

    self.process_node(selected_node)
    self.__maybe_emit_finished()
    return True
def _update(self):  # type: () -> None
    """
    Schedule processing at a later time.

    Requests are coalesced by the single-shot update timer and ignored
    unless the manager is `Running`.
    """
    if self.__state != SignalManager.Running:
        return
    timer = self.__update_timer
    if not timer.isActive():
        timer.start()
def __maybe_emit_finished(self):
    """
    Emit `finished` once no node is active, blocking or pending (and it
    has not already been emitted since the last `started`).
    """
    if self.__has_finished:
        return
    still_busy = any(chain(self.active_nodes(), self.blocking_nodes(),
                           self.pending_nodes()))
    if not still_busy:
        self.__has_finished = True
        self.finished.emit()
def post_update_request(self):
    """
    Request that an update pass be scheduled.

    Call this whenever something changes that may affect which node can
    be updated next, for example:

    * the outputs of a node change (`send` already does this), or
    * a node's eligibility for an input update changes
      (is_ready, is_blocking ...).

    Requests made before the pending pass runs are merged into one.
    """
    self._update()
def set_max_active(self, val: int) -> None:
    """
    Set the maximum number of concurrently active nodes.

    Assigning a different value schedules an update pass; assigning the
    current value does nothing.
    """
    if self.__max_running == val:
        return
    self.__max_running = val
    self._update()
def max_active(self) -> int:
    """
    Return the effective limit on concurrently active nodes.

    The configured value is looked up in order: the value given to
    `set_max_active`, the ``MAX_ACTIVE_NODES`` environment variable, and
    the ``max-active-nodes`` QSettings key (default 1). A negative value
    is taken relative to the CPU count (``os.cpu_count() + value``). The
    returned limit is never less than 1.
    """
    limit = self.__max_running  # type: Optional[int]
    if limit is None:
        limit = mapping_get(os.environ, "MAX_ACTIVE_NODES", int, None)
    if limit is None:
        settings = QSettings()
        settings.beginGroup(__name__)
        limit = settings.value("max-active-nodes", defaultValue=1, type=int)

    if limit >= 0:
        return max(1, limit)
    # Negative limit: relative to the number of CPUs (if known).
    cpus = os.cpu_count()
    if cpus is None:
        return 1
    return max(1, cpus + limit)
def can_enable_dynamic(link, value):
    # type: (SchemeLink, Any) -> bool
    """
    Return True if the dynamic `link` (:class:`SchemeLink`) can be enabled
    for `value`, i.e. if `value` is an instance of `link.sink_types()`.

    A lazy value is evaluated (via `get_value()`) before the check.
    """
    actual = value.get_value() if LazyValue.is_lazy(value) else value
    return isinstance(actual, link.sink_types())
def compress_signals(signals: List[Signal]) -> List[Signal]:
    """
    Compress a list of signals by dropping 'stale' signals.

    Signals are grouped by ``(link, id)`` and each group is compressed
    with `compress_single`:

    * Multiple consecutive updates are dropped - preserving only the latest,
      except when one of the updates had `None` value in which case the
      `None` update signal is preserved (by historical convention this meant
      a reset of the input for pending nodes). So for instance if a link had:
      `1, 2, None, 3` scheduled updates then the list would be compressed
      to `None, 3`.
    * Updates preceding a Close signal are dropped - only Close is preserved.

    The surviving signals keep their original relative order.

    See Also
    --------
    SignalManager.compress_signals
    """
    # Position of every signal object in the input. Used to restore the
    # input order of the survivors. If the same object appears more than
    # once, its last position wins.
    position = {id(s): i for i, s in enumerate(signals)}

    survivors: List[Signal] = []
    groups = group_by_all(reversed(signals), key=lambda sig: (sig.link, sig.id))
    for _, group_rev in groups:
        # Each group was collected from the reversed queue; restore queue
        # (oldest first) order before compressing.
        survivors.extend(compress_single(list(reversed(group_rev))))

    assert all(id(s) in position for s in survivors), 'Must preserve signal id'
    # Maintain relative order of (surviving) signals. The group order above
    # does not matter: equal sort keys can only belong to the same object.
    return sorted(survivors, key=lambda s: position[id(s)])
def compress_single(signals: List[Signal]) -> List[Signal]:
    """
    Compress the signals queued for a single ``(link, id)`` key.

    Two passes are applied to `signals` (oldest first):

    1. Consecutive updates are merged so that a run of updates collapses
       to its last update, or to a ``None`` update followed by the last
       update, e.g. ``1, 2, None, 3 -> None, 3`` and
       ``None, 1, None -> None``.
    2. Updates directly preceding a `Close` are dropped, e.g.
       ``None, 3, Close -> Close``.

    Signals that are neither updates nor `Close` (e.g. `New`) are kept
    and break runs of updates.

    Parameters
    ----------
    signals : List[Signal]
        Signals for one key, in queue order.

    Returns
    -------
    compressed : List[Signal]
        A subsequence of `signals` (same objects, same relative order).
    """
    def is_none_update(signal: 'Optional[Signal]') -> bool:
        return is_update(signal) and signal is not None and signal.value is None

    def is_update(signal: 'Optional[Signal]') -> bool:
        # A plain `Signal` (not a New/Update/Close subclass) counts as an
        # update.
        return isinstance(signal, Update) or type(signal) is Signal

    def is_close(signal: 'Optional[Signal]') -> bool:
        return isinstance(signal, Close)

    merged: List[Signal] = []
    # 1.) Merge all consecutive updates
    for sig in signals:
        prev = merged[-1] if merged else None
        prev_prev = merged[-2] if len(merged) > 1 else None
        if is_none_update(prev_prev) and is_update(prev) and is_none_update(sig):
            # ..., None, X, None --> ..., None
            merged[-2:] = [sig]
        elif is_none_update(prev_prev) and is_update(prev) and is_update(sig):
            # ..., None, X, Y -> ..., None, Y
            merged[-1] = sig
        elif is_none_update(prev) and is_none_update(sig):
            # ..., None, None -> ..., None
            merged[-1] = sig
        elif is_none_update(prev) and is_update(sig):
            # ..., None, X -> ..., None, X
            merged.append(sig)
        elif is_update(prev) and is_update(sig):
            # ..., X, Y -> ..., Y
            merged[-1] = sig
        else:
            # ..., X -> ..., X
            merged.append(sig)

    # Sanity check. There cannot be more than 2 consecutive updates in the
    # merged queue. Inspect every window of 3; the last window starts at
    # index len - 3, so the range bound is len - 2.
    for i in range(len(merged) - 2):
        assert not all(map(is_update, merged[i: i + 3]))

    out: List[Signal] = []
    # 2.) Drop all Update preceding a Close
    for sig in merged:
        prev = out[-1] if out else None
        prev_prev = out[-2] if len(out) > 1 else None
        if is_update(prev_prev) and is_update(prev) and is_close(sig):
            # ..., Y, X, Close --> ..., Close
            # After pass 1, two consecutive updates must start with None.
            assert is_none_update(prev_prev)
            out[-2:] = [sig]
        elif is_update(prev) and is_close(sig):
            # ..., X, Close -> ..., Close
            out[-1] = sig
        else:
            # ..., X -> ..., X
            out.append(sig)
    return out
def expand_node(workflow, node):
    # type: (Scheme, SchemeNode) -> List[SchemeNode]
    """
    Return the sink nodes of all enabled links that start at `node`.

    Nodes are listed in the order `workflow.find_links` returns the
    links; a sink is repeated if several enabled links lead to it.
    """
    sinks = []
    for link in workflow.find_links(source_node=node):
        if not link.enabled:
            continue
        sinks.append(link.sink_node)
    return sinks
def dependent_nodes(scheme, node):
    # type: (Scheme, SchemeNode) -> List[SchemeNode]
    """
    Return all nodes in `scheme` that depend on `node`, in breadth-first
    order, excluding `node` itself.

    Note
    ----
    Nodes reachable only through disabled links are not included.
    """
    order = list(traverse_bf(node, partial(expand_node, scheme)))
    # The traversal starts at `node`; strip it from the result.
    head, tail = order[0], order[1:]
    assert head is node
    return tail