Source code for ewoksutils.sqlite3_utils

from numbers import Integral, Real
import json
from datetime import datetime
from typing import Any, Dict, Iterator, Union, Optional
import sqlite3
from .datetime_utils import fromisoformat


def ensure_table_query(table: str, field_sql_types: Dict[str, str]) -> str:
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement for ``table``.

    :param table: table name, inserted into the statement verbatim.
    :param field_sql_types: maps column names to SQL type names. Columns are
        declared in the iteration order of the mapping.
    :returns: the SQL statement. When ``field_sql_types`` is empty, the
        statement is returned without any column list.
    """
    statement = f"CREATE TABLE IF NOT EXISTS {table}"
    if field_sql_types:
        definitions = ", ".join(
            f"{name} {sql_type}" for name, sql_type in field_sql_types.items()
        )
        statement = f"{statement} ({definitions})"
    return statement
def insert_query(table: str, nfields: int):
    """Build a positional ``INSERT`` statement for ``table``.

    :param table: table name, inserted into the statement verbatim.
    :param nfields: number of ``?`` placeholders to emit, one per column.
    :returns: ``INSERT INTO <table> VALUES(?,?,...)`` with ``nfields``
        comma-separated placeholders (none when ``nfields`` is zero).
    """
    placeholders = ",".join("?" for _ in range(nfields))
    return f"INSERT INTO {table} VALUES({placeholders})"
def python_to_sql_type(value: Any) -> str:
    """Return the SQL type name used to store ``value``.

    The value is inspected with ``isinstance``, so it must be an instance:
    a class object such as ``int`` matches none of the rules and maps to
    ``"BLOB"``, as does ``None``.

    :param value: the Python value to classify.
    :returns: ``"INTEGER"`` for integral numbers and booleans, ``"REAL"``
        for other real numbers, ``"TEXT"`` for strings and datetimes,
        ``"BLOB"`` for everything else.
    """
    # Order matters: integral numbers are also real numbers, so the
    # INTEGER rule has to be tried before the REAL rule.
    rules = (
        ((Integral, bool), "INTEGER"),
        (Real, "REAL"),
        ((str, datetime), "TEXT"),
    )
    for python_types, sql_type in rules:
        if isinstance(value, python_types):
            return sql_type
    return "BLOB"
def python_to_sql_types(field_types: Optional[Dict]) -> dict:
    """Map every entry of ``field_types`` to its SQL type name.

    :param field_types: maps field names to the objects that are handed to
        ``python_to_sql_type``; ``None`` or an empty mapping is accepted.
    :returns: a new dict with the same keys and SQL type names as values;
        empty when ``field_types`` is ``None`` or empty.
    """
    sql_types = dict()
    for name, value in (field_types or {}).items():
        sql_types[name] = python_to_sql_type(value)
    return sql_types
def serialize(value: Any, sql_type: Optional[str] = None):
    """Convert a Python value into the form that is stored in the database.

    :param value: the value to convert.
    :param sql_type: when given (and ``value`` is not ``None``), the SQL type
        name that ``python_to_sql_type(value)`` is required to return.
    :returns: numbers, booleans and strings unchanged; datetimes as their
        ISO 8601 string; anything else (including ``None``) as UTF-8 encoded
        JSON bytes.
    :raises TypeError: when ``sql_type`` is given and does not match the SQL
        type of ``value``.
    """
    must_check = value is not None and sql_type is not None
    if must_check and sql_type != python_to_sql_type(value):
        raise TypeError(f"value {value} does not have SQL type {sql_type}")

    if isinstance(value, (Integral, Real, bool, str)):
        return value
    if isinstance(value, datetime):
        return value.isoformat()
    # Fallback for everything else: stored as a JSON document in a BLOB.
    encoded = json.dumps(value)
    return encoded.encode()
def _select_serialize(value: Any, sql_type: Optional[str] = None):
    """Serialize ``value`` for direct embedding in the text of a SQL query.

    :param value: the value to compare against in a ``WHERE`` clause.
    :param sql_type: forwarded to ``serialize`` for type checking.
    :returns: strings (including serialized datetimes) as a single-quoted
        SQL string literal; any other serialized value unchanged.
    :raises TypeError: propagated from ``serialize`` on a type mismatch.
    """
    sql_value = serialize(value, sql_type)
    if isinstance(sql_value, str):
        # A single quote inside a SQL string literal must be doubled,
        # otherwise the literal ends early and the query is malformed
        # (or, with hostile input, injectable).
        escaped = sql_value.replace("'", "''")
        return f"'{escaped}'"
    # NOTE(review): bytes (JSON blobs) are returned as-is; formatting them
    # into query text does not produce a valid SQL literal. Bound parameters
    # are the safe way to compare against such values.
    return sql_value
def deserialize(sql_value, field_type: Optional[str] = None):
    """Convert a value read from the database back into a Python value.

    ``field_type`` is tested with ``isinstance``, i.e. it is treated as an
    instance and not as a class. Any string (matching the annotation)
    therefore selects the "return unchanged" rule.

    NOTE(review): the annotation says ``Optional[str]`` while the checks
    compare against ``bool``, numeric types and ``datetime`` instances;
    confirm with the callers what ``field_type`` is expected to hold.

    :param sql_value: the raw value from the database; bytes are decoded to
        text first.
    :param field_type: selects the conversion, see above.
    :returns: ``None`` for SQL NULL or the text ``"null"``; ``bool(sql_value)``
        when ``field_type`` is a bool; ``sql_value`` unchanged when it is a
        number or a string; the result of ``fromisoformat`` when it is a
        datetime; otherwise the JSON-decoded value.
    """
    if isinstance(sql_value, bytes):
        sql_value = sql_value.decode()

    if sql_value == "null" or sql_value is None:
        return None
    # bool must be tested before the numeric types: bool is an Integral.
    if isinstance(field_type, bool):
        return bool(sql_value)
    if isinstance(field_type, (Integral, Real, str)):
        return sql_value
    if isinstance(field_type, datetime):
        return fromisoformat(sql_value)
    return json.loads(sql_value)
def select(
    conn,
    table: str,
    field_types: Optional[Dict] = None,
    sql_types: Optional[Dict] = None,
    starttime: Optional[Union[str, datetime]] = None,
    endtime: Optional[Union[str, datetime]] = None,
    **is_equal_filter,
) -> Iterator[dict]:
    """Yield the rows of ``table`` that match the given filters as dicts.

    All filter values are passed to the database as bound parameters, so
    strings containing quotes, JSON blobs and ``None`` are compared safely.
    The table name and the filter column names are still inserted into the
    query text verbatim and must be trusted identifiers.

    :param conn: database connection providing ``cursor()`` and ``commit()``.
    :param table: name of the table to read. A missing table yields nothing.
    :param field_types: maps column names to the ``field_type`` argument of
        ``deserialize``; also used to derive ``sql_types`` when those are
        not given.
    :param sql_types: maps column names to SQL type names, used to type-check
        the filter values.
    :param starttime: keep rows whose ``time`` column is at or after this
        moment (datetime or ISO 8601 string).
    :param endtime: keep rows whose ``time`` column is at or before this
        moment (datetime or ISO 8601 string).
    :param is_equal_filter: ``column=value`` pairs that must all match.
    :returns: iterator over one dict per row, keyed by column name.
    :raises TypeError: when a filter value does not match its SQL type.
    :raises sqlite3.OperationalError: for any query failure other than a
        missing table.
    """
    conditions = []
    parameters = []

    if is_equal_filter:
        if sql_types is None:
            sql_types = python_to_sql_types(field_types)
        for name, value in is_equal_filter.items():
            conditions.append(f"{name} = ?")
            parameters.append(serialize(value, sql_types.get(name)))

    if starttime:
        if isinstance(starttime, str):
            starttime = fromisoformat(starttime)
        conditions.append("time >= ?")
        parameters.append(starttime.isoformat())

    if endtime:
        if isinstance(endtime, str):
            endtime = fromisoformat(endtime)
        conditions.append("time <= ?")
        parameters.append(endtime.isoformat())

    query = f"SELECT * FROM {table}"
    if conditions:
        search_condition = " AND ".join(conditions)
        query = f"{query} WHERE {search_condition}"

    cursor = conn.cursor()
    try:
        cursor.execute(query, parameters)
    except sqlite3.OperationalError as e:
        # A table that does not exist yet simply has no rows.
        if "no such table" in str(e):
            return
        # Anything else (unknown column, locked database, ...) is a real
        # failure and must not be mistaken for an empty result.
        raise

    rows = cursor.fetchall()
    conn.commit()
    if cursor.description is None:
        return

    fields = [col[0] for col in cursor.description]
    if field_types is None:
        field_types = dict()
    for values in rows:
        yield {k: deserialize(v, field_types.get(k)) for k, v in zip(fields, values)}