Source code for ewoksorange.bindings.taskexecutor_queue

from typing import Iterable
from queue import Queue

from AnyQt.QtCore import QObject
from AnyQt.QtCore import pyqtSignal as Signal

from .taskexecutor import ThreadedTaskExecutor


[docs]class TaskExecutorQueue(QObject, Queue): """Processing Queue with a First In, First Out behavior""" sigComputationStarted = Signal() """Signal emitted when a computation is started""" sigComputationEnded = Signal() """Signal emitted when a computation is ended""" def __init__(self, ewokstaskclass): super().__init__() self._task_executor = _ThreadedTaskExecutor(ewokstaskclass=ewokstaskclass) self._task_executor.finished.connect(self._process_ended) self._available = True """Simple thread to know if we can do some processing and avoid to mix thinks with QSignals and different threads """ @property def is_available(self) -> bool: return self._available
[docs] def add(self, **kwargs): """Add a task `ewokstaskclass` execution request""" super().put(kwargs) if self.is_available: self._process_next()
def _process_next(self): if Queue.empty(self): return self._available = False self._task_executor.create_task(**Queue.get(self)) if self._task_executor.has_task: self.sigComputationStarted.emit() self._task_executor.start() else: self._task_executor.finished.emit() def _process_ended(self): self._process_ended_direct(self.sender()) def _process_ended_direct(self, task_executor: "_ThreadedTaskExecutor"): for callback in task_executor.callbacks: callback() self.sigComputationEnded.emit() self._available = True if self.is_available: self._process_next()
[docs] def stop(self): self._task_executor.finished.disconnect(self._process_ended) while not self.empty(): self.get() self._task_executor.stop(wait=True) self._task_executor = None
@property def current_task(self): return self._task_executor.current_task
class _ThreadedTaskExecutor(ThreadedTaskExecutor): """Processing thread with some information on callbacks to be executed""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__callbacks = tuple() def create_task( self, _callbacks: Iterable = tuple(), _log_missing_inputs: bool = False, **kwargs ): kwargs["log_missing_inputs"] = _log_missing_inputs super().create_task(**kwargs) self.__callbacks = _callbacks @property def callbacks(self): """Methods to be executed by the thread once the computation is done""" return self.__callbacks