Source code for ewoksjob.events.readers.redis
import os
import json
import socket
from typing import Iterable
import redis
from .base import EwoksEventReader, EventType
[docs]class RedisEwoksEventReader(EwoksEventReader):
def __init__(self, url: str, **_):
client_name = f"ewoks:reader:{socket.gethostname()}:{os.getpid()}"
self._proxy = redis.Redis.from_url(url, client_name=client_name)
super().__init__()
[docs] def wait_events(self, **kwargs) -> Iterable[EventType]:
yield from self.poll_events(**kwargs)
[docs] def get_events(self, job_id=None, **filters) -> Iterable[EventType]:
is_equal_filter, post_filter = self.split_filter(**filters)
if job_id:
pattern = f"ewoks:{job_id}:*"
else:
pattern = "ewoks:*"
keys = sorted(
self._proxy.scan_iter(pattern), key=lambda x: int(x.decode().split(":")[-1])
)
for key in keys:
event = self._proxy.hgetall(key)
event = {k.decode(): json.loads(v) for k, v in event.items()}
if not self.event_passes_filter(event, **post_filter) or any(
event[k] != v for k, v in is_equal_filter.items()
):
continue
yield event