Getting started

Installation

Install on the client side

pip install ewoksjob[redis,sql]

Install on the worker side

pip install ewoksjob[worker,redis,sql]

The redis and sql options are needed when using either for messaging or data transfer.

Configuration

Both the client and the worker(s) need to know where the celery messages and results are stored

# SQLite backend
broker_url = f"sqla+sqlite:///path/to/celery.db"
result_backend = f"db+sqlite:///path/to/celery_results.db"

# Redis backend
broker_url = "redis://localhost:10003/3"
result_backend = "redis://localhost:10003/4"

# RabbitMQ backend
broker_url = f"pyamqp://guest@localhost//"
result_backend = "rpc://"
result_persistent = True

Other backends for results are supported (mongo, memcached). Other configurations as available, like the serialization of results (json by default)

result_serializer = "pickle"
accept_content = ["application/json", "application/x-python-serialize"]
result_expires = 600
task_remote_tracebacks = True
enable_utc = False

The Celery documentation describes the different parameters available.

The configuration can be declared in a

  • Python module:
    • myproject.config

  • Python file:
    • /tmp/ewoks/config.py

  • Yaml file:
    • /tmp/ewoks/config.yml

  • Beacon yaml file:
    • beacon:///ewoks/config.yml

    • beacon://id22:25000/ewoks/config.yml

The configuration URI can be provided as an environment variable

export EWOKS_CONFIG_URI=myproject.config

In case of a Beacon URL that has /ewoks/config.yml

export EWOKS_CONFIG_URI=beacon://hostname:25000/ewoks/config.yml

it is enough to specify the BEACON_HOST environment variable

export BEACON_HOST=hostname:25000

On the worker side, the configuration URI can also be provided as a CLI argument

ewoksjob --config=myproject.config worker

Worker side

Launch a worker which serves the ewoks application

ewoksjob worker

Client side

Prepare for sending/receiving ewoks events

# Redis backend
events_url = "redis://localhost:10003/2"
handlers = [
    {
        "class": "ewoksjob.events.handlers.RedisEwoksEventHandler",
        "arguments": [{"name": "url", "value": events_url}],
    }
]

# SQLite backend (does not support task monitoring or cancelling)
events_url = f"file://{os.path.join(..., 'ewoks_events.db')}"
handlers = [
    {
        "class": "ewoksjob.events.handlers.Sqlite3EwoksEventHandler",
        "arguments": [{"name": "uri", "value": events_url}],
    }
]

reader = instantiate_reader(events_url)

Test workflow

workflow = {
    "graph": {"id": "mygraph"},
    "nodes": [
        {"id": "task1", "task_type": "method", "task_identifier": "numpy.add"},
        {"id": "task2", "task_type": "method", "task_identifier": "numpy.add"},
    ],
    "links": [
        {
            "source": "task1",
            "target": "task2",
            "data_mapping": [{"source_output": "return_value", "target_input": 0}],
        }
    ],
}

Job arguments are the same as the arguments of ewoks.execute_graph

varinfo = {"root_uri": ..., "scheme": "nexus"}
inputs = [
    {"id": "task1", "name": 0, "value": 1},
    {"id": "task1", "name": 1, "value": 2},
    {"id": "task2", "name": 1, "value": 3},
]
execinfo = {"handlers": handlers}
args = (workflow,)
kwargs = {
    "engine": None,
    "execinfo": execinfo,
    "inputs": inputs,
    "varinfo": varinfo,
    "outputs": [{"all": False}],
}

Execute workflow and get results

future = submit(args=args, kwargs=kwargs)
job_id = future.task_id
# events could be received in the mean time (see below)
workflow_results = future.get(timeout=3, interval=0.1)
assert workflow_results == {"return_value": 6}

Get intermediate results from ewoks events

results_during_execution = list(reader.get_events(job_id=job_id))
assert len(results_during_execution) == 8  # start/stop for job, workflow and node

# Get start event of node "task1"
result_event = list(
    reader.get_events_with_variables(job_id=job_id, node_id="task1", type="start")
)
assert len(result_event) == 1
result_event = result_event[0]

# Get access to all output variables of "task1"
results = result_event["outputs"]
assert results["return_value"].value == 3

Install brokers

Redis can be installed using the system package manager or conda

apt install redis-server    # system package
conda install redis-server  # conda package
redis-server

RabbitMQ can be installed using the system package manager or conda

apt install rabbitmq-server    # system package
conda install rabbitmq-server  # conda package
rabbitmq-server