ewoksjob.events.readers.base.EwoksEventReader

class ewoksjob.events.readers.base.EwoksEventReader

Bases: object

Base class for receiving ewoks events on the client side.

close()

static dereference_data_uris(event)

Parameters:

event (Dict[str, str])

Return type:

None

static event_passes_filter(event, starttime=None, endtime=None)

Parameters:
  • event (Dict[str, str])

  • starttime (Optional[datetime])

  • endtime (Optional[datetime])

Return type:

bool
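
A time-window check of this kind can be sketched as follows. This is not the library's implementation: the helper name `passes_time_window`, the `"time"` key, and the ISO 8601 timestamp format are assumptions made for the example.

```python
from datetime import datetime
from typing import Dict, Optional


def passes_time_window(
    event: Dict[str, str],
    starttime: Optional[datetime] = None,
    endtime: Optional[datetime] = None,
) -> bool:
    """Return True when the event timestamp lies within [starttime, endtime].

    Hypothetical helper: assumes the event stores an ISO 8601 timestamp
    under the key "time". A bound that is None is not checked.
    """
    if starttime is None and endtime is None:
        return True
    timestamp = datetime.fromisoformat(event["time"])
    if starttime is not None and timestamp < starttime:
        return False
    if endtime is not None and timestamp > endtime:
        return False
    return True


# Illustrative event; the field names are assumptions.
event = {"job_id": "job-1", "time": "2024-01-01T12:00:00"}
inside = passes_time_window(
    event, starttime=datetime(2024, 1, 1), endtime=datetime(2024, 1, 2)
)
outside = passes_time_window(event, starttime=datetime(2024, 1, 2))
```

Here `inside` is true because the timestamp falls between both bounds, while `outside` is false because the event precedes the start time.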

get_events(**filters)

Returns all currently available events matching the filter.

Return type:

Iterable[Dict[str, str]]

get_events_with_variables(*args, **kwargs)

get_events with URI dereferencing.

Return type:

Iterable[Dict[str, str]]

get_full_job_events(**filters)

Returns events grouped by “job_id”. When at least one event of a job matches the filter, all events with that “job_id” are returned.

Return type:

Iterable[Tuple[Dict[str, str]]]
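
The grouping behaviour described for get_full_job_events can be illustrated with a small, self-contained sketch. The function `group_full_jobs` and the `"type"` field are hypothetical and only mirror the description; the actual reader queries its own event store.

```python
from typing import Dict, Iterable, List, Tuple

EwoksEvent = Dict[str, str]


def group_full_jobs(
    events: Iterable[EwoksEvent], **filters: str
) -> List[Tuple[EwoksEvent, ...]]:
    """Group events by "job_id" and keep every group with at least one match.

    Hypothetical helper: an event matches when each filter key equals
    the corresponding event value.
    """
    groups: Dict[str, List[EwoksEvent]] = {}
    matched = set()
    for event in events:
        job_id = event["job_id"]
        groups.setdefault(job_id, []).append(event)
        if all(event.get(key) == value for key, value in filters.items()):
            matched.add(job_id)
    # Return complete groups, not only the matching events.
    return [tuple(group) for job_id, group in groups.items() if job_id in matched]


events = [
    {"job_id": "a", "type": "start"},
    {"job_id": "a", "type": "end"},
    {"job_id": "b", "type": "start"},
]
jobs = group_full_jobs(events, type="end")
```

Only job "a" has an event with type "end", so `jobs` holds a single tuple containing both events of that job, including the "start" event that did not match the filter itself.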

get_full_job_events_with_variables(**filters)

get_full_job_events with URI dereferencing.

Return type:

Iterable[Tuple[Dict[str, str]]]

poll_events(timeout=None, stop_event=None, interval=0.1, **filters)

Yield events matching the filter until timeout is reached.

Parameters:

stop_event (Optional[Event])

Return type:

Iterable[Dict[str, str]]
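
A polling loop with a timeout, a stop event, and a sleep interval might look like the sketch below. It is an illustration, not the library's code: the `poll` function and its `fetch` callable are hypothetical, `stop_event` is assumed to be a `threading.Event`, and `fetch` is assumed to return events in a stable, append-only order.

```python
import threading
import time
from typing import Callable, Dict, Iterable, Iterator, Optional

EwoksEvent = Dict[str, str]


def poll(
    fetch: Callable[[], Iterable[EwoksEvent]],
    timeout: Optional[float] = None,
    stop_event: Optional[threading.Event] = None,
    interval: float = 0.1,
) -> Iterator[EwoksEvent]:
    """Yield new events from fetch() until the timeout expires or stop_event is set."""
    start = time.monotonic()
    n_seen = 0
    while True:
        events = list(fetch())
        # Only yield events that a previous fetch did not already return.
        for event in events[n_seen:]:
            yield event
        n_seen = len(events)
        if timeout is not None and time.monotonic() - start > timeout:
            return
        if stop_event is not None and stop_event.is_set():
            return
        time.sleep(interval)


# In-memory stand-in for an event store (assumption for the example).
store = [{"job_id": "a"}]
stop = threading.Event()
collected = []
for event in poll(lambda: store, timeout=0.5, stop_event=stop, interval=0.01):
    collected.append(event)
    if len(collected) == 1:
        store.append({"job_id": "b"})  # a new event arrives while polling
    else:
        stop.set()  # ask the generator to finish
```

The generator picks up the event appended during iteration on its next fetch and ends once `stop` is set, so the caller does not have to wait for the timeout.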

poll_events_with_variables(*args, **kwargs)

poll_events with URI dereferencing.

Return type:

Iterable[Dict[str, str]]

static split_filter(starttime=None, endtime=None, **is_equal_filter)

Splits the filter into two parts, one of which is to be applied on the list of events fetched from the database.

Parameters:
  • starttime (Optional[datetime])

  • endtime (Optional[datetime])

Return type:

Tuple[dict, dict]
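
One way to picture such a split is sketched below. Which keys end up in which dictionary is an assumption made for this example (equality constraints in the first, time bounds in the second), and `split_filter_sketch` is a hypothetical name, not the library's method.

```python
from datetime import datetime
from typing import Optional, Tuple


def split_filter_sketch(
    starttime: Optional[datetime] = None,
    endtime: Optional[datetime] = None,
    **is_equal_filter,
) -> Tuple[dict, dict]:
    """Separate equality constraints from time bounds (illustrative only)."""
    post_filter = {}
    # Assumption: time bounds are checked after the events have been fetched.
    if starttime is not None:
        post_filter["starttime"] = starttime
    if endtime is not None:
        post_filter["endtime"] = endtime
    return dict(is_equal_filter), post_filter


query_filter, post_filter = split_filter_sketch(
    starttime=datetime(2024, 1, 1), job_id="job-1"
)
```

In this sketch `query_filter` contains only the `job_id` constraint and `post_filter` only the start time; a bound left as None is omitted from the result.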

wait_events(timeout=None, stop_event=None, **filters)

Yield events matching the filter until timeout is reached.

Parameters:

stop_event (Optional[Event])

Return type:

Iterable[Dict[str, str]]

wait_events_with_variables(*args, **kwargs)

wait_events with URI dereferencing.

Return type:

Iterable[Dict[str, str]]