Source code for ewoksjob.tests.utils
import os
import time
import logging
from types import ModuleType
logger = logging.getLogger(__name__)
[docs]def has_redis() -> bool:
try:
import redis # noqa F401
except ImportError:
return False
with os.popen("redis-server --version") as output:
return bool(output.read())
[docs]def get_result(future, **kwargs):
if hasattr(future, "get"):
return future.get(**kwargs)
else:
return future.result(**kwargs)
[docs]def wait_not_finished(mod: ModuleType, expected_task_ids: set, timeout=3):
"""Wait until all running task ID's are `expected_task_ids`"""
if mod.__name__.endswith("celery") and not has_redis():
time.sleep(0.1)
logger.warning("memory and sqlite do not support task monitoring")
return
t0 = time.time()
while True:
task_ids = set(mod.get_not_finished_task_ids())
if task_ids == expected_task_ids:
return
dt = time.time() - t0
if dt > timeout:
assert task_ids == expected_task_ids, task_ids
time.sleep(0.2)