Source code for ewoksutils.tests.test_logging_utils

import gc
import sqlite3
import logging
from logging.handlers import QueueHandler
from queue import Queue

from ewoksutils.sqlite3_utils import select

from ..logging_utils.connection import ConnectionHandler
from ..logging_utils.sqlite3 import Sqlite3Handler
from ..logging_utils.asyncwrapper import AsyncHandlerWrapper
from ..logging_utils.cleanup import cleanup_logger


[docs]def test_cleanup_logger(): logger = logging.getLogger(__name__) q = Queue() h = QueueHandler(q) logger.addHandler(h) logger.setLevel(logging.INFO) h.setLevel(logging.INFO) logger.info("info message 1") assert q.qsize() == 1 cleanup_logger(__name__) assert q.qsize() == 0
[docs]def test_connection_handler(): connected = None destination = list() expected = list() class Connection: def __init__(self): nonlocal connected connected = True def __del__(self): nonlocal connected connected = False class MyHandler(ConnectionHandler): def _connect(self, timeout=1) -> None: self._connection = Connection() def _disconnect(self) -> None: del self._connection self._connection = None def _send_serialized_record(self, srecord): destination.append(srecord) def _serialize_record(self, record): msg = self.format(record) return record.levelno, msg logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) handler = MyHandler() handler.setLevel(logging.INFO) logger.addHandler(handler) # Check lazy connecting assert not connected assert destination == expected logger.debug("debug message") assert not connected assert destination == expected logger.info("info message 1") expected.append((logging.INFO, "info message 1")) assert connected assert destination == expected # Check closing a handler handler.close() assert not connected # Check reconnect logger.info("info message 2") expected.append((logging.INFO, "info message 2")) assert connected assert destination == expected # Check connection closed when no reference to the handler anymore logger.removeHandler(handler) handler = None while gc.collect(): pass assert not connected handler = MyHandler() handler.setLevel(logging.INFO) logger.addHandler(handler) logger.info("info message 3") expected.append((logging.INFO, "info message 3")) assert connected assert destination == expected # Check connection closed when no reference to the logger anymore handler = None logger = None cleanup_logger(__name__) while gc.collect(): pass assert not connected
[docs]def test_sqlite3_handler(tmpdir): logger = logging.getLogger(__name__) uri = str(tmpdir / "test.db") field_types = {"field1": 0, "field2": "", "field3": True, "field4": 0.0} handler = Sqlite3Handler(uri, "mytable", field_types) logger.addHandler(handler) logger.setLevel(logging.INFO) handler.setLevel(logging.INFO) expected = list() logger.info("message1") expected.append({"field1": None, "field2": None, "field3": None, "field4": None}) logger.info("message2", extra={"field2": "2"}) expected.append({"field1": None, "field2": "2", "field3": None, "field4": None}) logger.info( "message2", extra={"field1": 1, "field2": "2", "field3": True, "field4": 1.1} ) expected.append({"field1": 1, "field2": "2", "field3": True, "field4": 1.1}) with sqlite3.connect(uri, uri=True, check_same_thread=False) as conn: rows = list(select(conn, "mytable", field_types=field_types)) assert rows == expected types = {"field1": int, "field2": str, "field3": bool, "field4": float} for row in rows: for name, value in row.items(): if value is not None: vtype = types[name] assert isinstance(value, vtype), name cleanup_logger(__name__)
[docs]def test_async_wrapper(): logger = logging.getLogger(__name__) q = Queue() h = AsyncHandlerWrapper(QueueHandler(q)) logger.addHandler(h) logger.setLevel(logging.INFO) h.setLevel(logging.INFO) logger.info("message") assert q.get(timeout=3).msg == "message" cleanup_logger(__name__)