Source code for ewoksutils.logging_utils.sqlite3

import time
import sqlite3
from typing import Any, Dict, List, Optional, Sequence

from ewoksutils import sqlite3_utils
from .connection import ConnectionHandler


Sqlite3RecordType = List[Any]


[docs]class Sqlite3Handler(ConnectionHandler): def __init__(self, uri: str, table: str, field_types: Dict): self._uri = uri self._field_sql_types = sqlite3_utils.python_to_sql_types(field_types) self._ensure_table_query = sqlite3_utils.ensure_table_query( table, self._field_sql_types ) self._insert_row_query = sqlite3_utils.insert_query( table, len(self._field_sql_types) ) super().__init__() def _connect(self, timeout=1) -> None: try: conn = sqlite3.connect( self._uri, timeout=timeout, uri=True, check_same_thread=False ) self._sql_query(self._ensure_table_query, conn=conn) except (OSError, TimeoutError): self._connection = None else: self._connection = conn def _disconnect(self) -> None: self._connection.close() def _send_serialized_record(self, values: Optional[Sqlite3RecordType]): if values: self._sql_query(self._insert_row_query, values) def _serialize_record(self, record) -> Optional[Sqlite3RecordType]: lst = list() for field, sql_type in self._field_sql_types.items(): value = getattr(record, field, None) lst.append(sqlite3_utils.serialize(value, sql_type)) return lst def _sql_query( self, sql: str, parameters: Sequence = tuple(), conn=None, timeout=1 ) -> None: if conn is None: conn = self._connection exception = None t0 = time.time() while True: try: conn.execute(sql, parameters) break except sqlite3.OperationalError as e: exception = e t1 = time.time() if timeout is not None and (t1 - t0) > timeout: raise TimeoutError("cannot execute SQL query") from exception while True: try: conn.commit() break except sqlite3.OperationalError as e: exception = e t1 = time.time() if timeout is not None and (t1 - t0) > timeout: raise TimeoutError("cannot commit SQL query") from exception