Source code for ewoksutils.logging_utils.asyncwrapper

import logging
from queue import Queue, Empty
from logging.handlers import QueueHandler, QueueListener
from typing import Any


[docs]class AsyncHandlerWrapper(QueueHandler): """A handler which blocks too long on handling events can be wrapped by this handler which will queue the logging event and redirect it to the original handler in a separate thread. """ def __init__(self, handler: logging.Handler): queue: Queue[Any] = Queue() self._listener = QueueListener(queue, handler, respect_handler_level=True) self._listener.start() super().__init__(queue) @property def wrapped_handler(self) -> logging.Handler: return self._listener.handlers[0]
[docs] def flush(self): """Dequeue and handle records in the current thread""" # Called by logging.shutdown: atexit callback self.acquire() try: while True: try: record = self._listener.dequeue(block=False) except Empty: return self._listener.handle(record) finally: self.release()
[docs] def close(self): """Stop accepting events, process queued events and stop the listener thread.""" # Called by logging.shutdown: atexit callback super().close() # stop accepting events self._listener.stop() # process the queue and stop listening