Source code for ewoksorange.tests.test_task_executor
from AnyQt.QtCore import QObject
from ewoksorange.bindings.taskexecutor import TaskExecutor
from ewoksorange.bindings.taskexecutor import ThreadedTaskExecutor
from ewoksorange.bindings.taskexecutor_queue import TaskExecutorQueue
from ewoksorange.bindings.qtapp import QtEvent
from ewokscore.tests.examples.tasks.sumtask import SumTask
[docs]def test_task_executor():
executor = TaskExecutor(SumTask)
assert not executor.has_task
assert not executor.succeeded
executor.create_task(inputs={"a": 1, "b": 2})
assert executor.has_task
assert not executor.succeeded
executor.execute_task()
assert executor.succeeded
results = {k: v.value for k, v in executor.output_variables.items()}
assert results == {"result": 3}
[docs]def test_threaded_task_executor(qtapp):
finished = QtEvent()
def finished_callback():
finished.set()
executor = ThreadedTaskExecutor(ewokstaskclass=SumTask)
executor.finished.connect(finished_callback)
assert not executor.has_task
assert not executor.succeeded
executor.create_task(inputs={"a": 1, "b": 2})
assert executor.has_task
assert not executor.succeeded
executor.start()
assert finished.wait(timeout=3)
assert executor.succeeded
results = {k: v.value for k, v in executor.output_variables.items()}
assert results == {"result": 3}
executor.finished.disconnect(finished_callback)
[docs]def test_threaded_task_executor_queue(qtapp):
class MyObject(QObject):
def __init__(self):
self.results = None
self.finished = QtEvent()
def finished_callback(self):
# task_executor = self.sender() # Doesn't work for unknown reasons
task_executor = executor._task_executor
self.results = {
k: v.value for k, v in task_executor.output_variables.items()
}
self.finished.set()
obj = MyObject()
executor = TaskExecutorQueue(ewokstaskclass=SumTask)
executor.add(inputs={"a": 1, "b": 2}, _callbacks=(obj.finished_callback,))
assert obj.finished.wait(timeout=3)
assert obj.results == {"result": 3}