Getting started#
Installation#
Install on the client side
pip install ewoksjob[redis,sql]
Install on the worker side
pip install ewoksjob[worker,redis,sql]
The redis and sql extras are needed only when Redis or an SQL database is used for messaging or data transfer.
Configuration#
Both the client and the worker(s) need to know where the Celery messages and results are stored
# SQLite backend
broker_url = "sqla+sqlite:///path/to/celery.db"
result_backend = "db+sqlite:///path/to/celery_results.db"
# Redis backend
broker_url = "redis://localhost:10003/3"
result_backend = "redis://localhost:10003/4"
# RabbitMQ backend
broker_url = "pyamqp://guest@localhost//"
result_backend = "rpc://"
result_persistent = True
Other result backends are supported (e.g. MongoDB, Memcached). Other settings are also available, such as the serialization of results (JSON by default)
result_serializer = "pickle"
accept_content = ["application/json", "application/x-python-serialize"]
result_expires = 600
task_remote_tracebacks = True
enable_utc = False
The Celery documentation describes the different parameters available.
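As a minimal sketch, assuming Redis is used for both messaging and results, a configuration module could derive both URLs from a single host and port. The helper `_redis_url` and the variables `_redis_host` and `_redis_port` are illustrative names, not part of ewoksjob or Celery. Only `broker_url` and `result_backend` come from the examples above.

```python
# Hypothetical configuration module (for example myproject/config.py).
# Names starting with an underscore are illustrative helpers only.


def _redis_url(host: str, port: int, db: int) -> str:
    """Build a Redis URL of the form redis://host:port/db."""
    return f"redis://{host}:{port}/{db}"


_redis_host = "localhost"  # assumption: Redis runs on the same machine
_redis_port = 10003  # assumption: same port as in the Redis example above

# Separate database numbers keep messages and results apart.
broker_url = _redis_url(_redis_host, _redis_port, 3)
result_backend = _redis_url(_redis_host, _redis_port, 4)
```

Keeping the host and port in one place avoids the two URLs drifting apart when the Redis server is moved.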
The configuration can be declared in a
- Python module:
myproject.config
- Python file:
/tmp/ewoks/config.py
- Yaml file:
/tmp/ewoks/config.yml
- Beacon yaml file:
beacon:///ewoks/config.yml
beacon://id22:25000/ewoks/config.yml
The configuration URI can be provided as an environment variable
export EWOKS_CONFIG_URI=myproject.config
For a Beacon URL whose path is /ewoks/config.yml, such as
export EWOKS_CONFIG_URI=beacon://hostname:25000/ewoks/config.yml
it is enough to set the BEACON_HOST environment variable instead
export BEACON_HOST=hostname:25000
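The EWOKS_CONFIG_URI variable can also be set from Python instead of the shell. This is a sketch only, assuming the variable is set before ewoksjob reads its configuration. When that happens is not stated here.

```python
import os

# Equivalent of `export EWOKS_CONFIG_URI=myproject.config` for the current
# process and any child process it starts.
# Assumption: this runs before ewoksjob loads its configuration.
os.environ["EWOKS_CONFIG_URI"] = "myproject.config"

print(os.environ["EWOKS_CONFIG_URI"])
```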
On the worker side, the configuration URI can also be provided as a CLI argument
ewoksjob --config=myproject.config worker
Worker side#
Launch a worker which serves the ewoks application
ewoksjob worker
Client side#
Prepare for sending/receiving ewoks events
# Redis backend
events_url = "redis://localhost:10003/2"
handlers = [
    {
        "class": "ewoksjob.events.handlers.RedisEwoksEventHandler",
        "arguments": [{"name": "url", "value": events_url}],
    }
]
# SQLite backend (does not support task monitoring or cancelling)
import os
events_url = f"file://{os.path.join(..., 'ewoks_events.db')}"
handlers = [
    {
        "class": "ewoksjob.events.handlers.Sqlite3EwoksEventHandler",
        "arguments": [{"name": "uri", "value": events_url}],
    }
]
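# Create an event reader for the chosen backend
from ewoksjob.events.readers import instantiate_reader
reader = instantiate_reader(events_url)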
Test workflow
workflow = {
    "graph": {"id": "mygraph"},
    "nodes": [
        {"id": "task1", "task_type": "method", "task_identifier": "numpy.add"},
        {"id": "task2", "task_type": "method", "task_identifier": "numpy.add"},
    ],
    "links": [
        {
            "source": "task1",
            "target": "task2",
            "data_mapping": [{"source_output": "return_value", "target_input": 0}],
        }
    ],
}
Job arguments are the same as the arguments of ewoks.execute_graph
varinfo = {"root_uri": ..., "scheme": "nexus"}
inputs = [
    {"id": "task1", "name": 0, "value": 1},
    {"id": "task1", "name": 1, "value": 2},
    {"id": "task2", "name": 1, "value": 3},
]
execinfo = {"handlers": handlers}
args = (workflow,)
kwargs = {
    "engine": None,
    "execinfo": execinfo,
    "inputs": inputs,
    "varinfo": varinfo,
    "outputs": [{"all": False}],
}
Execute workflow and get results
from ewoksjob.client import submit
future = submit(args=args, kwargs=kwargs)
job_id = future.task_id
# events could be received in the meantime (see below)
workflow_results = future.get(timeout=3, interval=0.1)
assert workflow_results == {"return_value": 6}
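The expected result of 6 follows from the data flow of the test workflow. The sketch below reproduces that arithmetic without ewoks or a worker, using `operator.add` as a stand-in for `numpy.add` on plain integers. The dictionary layout is illustrative and is not the ewoks input format.

```python
from operator import add  # stand-in for numpy.add on plain integers

# Positional inputs supplied by the client, keyed by node id and position.
client_inputs = {"task1": {0: 1, 1: 2}, "task2": {1: 3}}

# task1 receives both of its arguments from the client.
task1_result = add(client_inputs["task1"][0], client_inputs["task1"][1])

# The link maps the "return_value" of task1 to positional input 0 of task2.
# Positional input 1 of task2 comes from the client.
task2_result = add(task1_result, client_inputs["task2"][1])

print(task1_result, task2_result)
```

task2 is the last node of the workflow, so its return value is what the job returns.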
Get intermediate results from ewoks events
results_during_execution = list(reader.get_events(job_id=job_id))
assert len(results_during_execution) == 8  # start/stop for job, workflow and node
# Get start event of node "task1"
result_event = list(
    reader.get_events_with_variables(job_id=job_id, node_id="task1", type="start")
)
assert len(result_event) == 1
result_event = result_event[0]
# Get access to all output variables of "task1"
results = result_event["outputs"]
assert results["return_value"].value == 3
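The count of 8 events checked above can be derived from the workflow size. This sketch assumes that the job, the workflow and every node each emit exactly one start event and one end event and nothing else, as the comment in the example suggests. The function name `expected_event_count` is hypothetical.

```python
def expected_event_count(n_nodes: int) -> int:
    """Number of events when each scope emits one start and one end event."""
    scopes = 1 + 1 + n_nodes  # job + workflow + one scope per node
    return 2 * scopes


# The test workflow has two nodes: task1 and task2.
print(expected_event_count(2))
```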
Install brokers#
Redis can be installed using the system package manager or conda
apt install redis-server # system package
conda install redis-server # conda package
redis-server
RabbitMQ can be installed using the system package manager or conda
apt install rabbitmq-server # system package
conda install rabbitmq-server # conda package
rabbitmq-server