from numbers import Integral, Real
import json
from datetime import datetime
from typing import Any, Dict, Iterator, Union, Optional
import sqlite3
from .datetime_utils import fromisoformat
def ensure_table_query(table: str, field_sql_types: Dict[str, str]) -> str:
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement for ``table``.

    :param table: table name, inserted into the statement verbatim.
    :param field_sql_types: maps column names to SQL type names; the
        columns are declared in the iteration order of this mapping.
    :returns: the SQL statement. When ``field_sql_types`` is empty the
        statement carries no column list at all.
    """
    statement = f"CREATE TABLE IF NOT EXISTS {table}"
    if field_sql_types:
        column_defs = ", ".join(
            f"{name} {sql_type}" for name, sql_type in field_sql_types.items()
        )
        statement = f"{statement} ({column_defs})"
    return statement
def insert_query(table: str, nfields: int):
    """Build an ``INSERT INTO`` statement with ``nfields`` ``?`` placeholders.

    :param table: table name, inserted into the statement verbatim.
    :param nfields: number of values (placeholders) per inserted row.
    :returns: the SQL statement, e.g. ``INSERT INTO t VALUES(?,?)``.
    """
    placeholders = ",".join("?" * nfields)
    return f"INSERT INTO {table} VALUES({placeholders})"
def python_to_sql_type(value: Any) -> str:
    """Return the SQL type name used to store a Python value.

    Integers and booleans map to ``INTEGER``, other real numbers to
    ``REAL``, strings and datetimes to ``TEXT`` and every other value
    (including ``None``) to ``BLOB``.

    :param value: an example value; only its type is inspected.
    :returns: one of ``"INTEGER"``, ``"REAL"``, ``"TEXT"`` or ``"BLOB"``.
    """
    # Order matters: integers are also real numbers, so they are tested first.
    type_table = (
        ((Integral, bool), "INTEGER"),
        (Real, "REAL"),
        ((str, datetime), "TEXT"),
    )
    for python_types, sql_type in type_table:
        if isinstance(value, python_types):
            return sql_type
    return "BLOB"
def python_to_sql_types(field_types: Optional[Dict]) -> dict:
    """Translate a mapping of example values into a mapping of SQL types.

    :param field_types: maps field names to example values; each value is
        passed to ``python_to_sql_type``. May be ``None`` or empty.
    :returns: a new dict mapping each field name to its SQL type name,
        empty when ``field_types`` is ``None`` or empty.
    """
    sql_types = dict()
    for name, example in (field_types or {}).items():
        sql_types[name] = python_to_sql_type(example)
    return sql_types
def serialize(value: Any, sql_type: Optional[str] = None):
    """Convert a Python value into the form in which it is stored in SQL.

    Numbers, booleans and strings are returned unchanged, datetimes are
    returned as their ISO 8601 string and every other value (including
    ``None``) is JSON-encoded and returned as bytes.

    :param value: the Python value to convert.
    :param sql_type: optional expected SQL type name. When given and
        ``value`` is not ``None``, the SQL type derived from ``value`` by
        ``python_to_sql_type`` must be equal to it.
    :returns: the value to hand over to SQL.
    :raises TypeError: when ``value`` does not have SQL type ``sql_type``.
    """
    needs_check = not (value is None or sql_type is None)
    if needs_check and python_to_sql_type(value) != sql_type:
        raise TypeError(f"value {value} does not have SQL type {sql_type}")

    if isinstance(value, (Integral, Real, bool, str)):
        return value
    if isinstance(value, datetime):
        return value.isoformat()
    return json.dumps(value).encode()
def _select_serialize(value: Any, sql_type: Optional[str] = None):
    """Serialize a value for direct inclusion in the text of a SQL query.

    The value is first converted by ``serialize``. A string result is
    turned into a SQL string literal: it is wrapped in single quotes and
    every single quote inside it is doubled, which is how SQL escapes a
    quote within a string literal. Any other result is returned as
    ``serialize`` produced it.

    :param value: the Python value to convert.
    :param sql_type: optional expected SQL type name, passed on to
        ``serialize``.
    :returns: a quoted SQL string literal, or the unchanged non-string
        result of ``serialize``.
    :raises TypeError: propagated from ``serialize`` on a type mismatch.
    """
    sql_value = serialize(value, sql_type)
    if isinstance(sql_value, str):
        # Without doubling, a value such as O'Brien would end the literal
        # early and corrupt (or inject into) the surrounding statement.
        escaped = sql_value.replace("'", "''")
        return f"'{escaped}'"
    return sql_value
def deserialize(sql_value, field_type: Any = None):
    """Convert a value read from SQL back into a Python value.

    :param sql_value: the value as returned by the database. Bytes are
        decoded to text before anything else.
    :param field_type: optional example value of the field; only its type
        is inspected. A bool yields ``bool(sql_value)``, a number or
        string yields the value unchanged and a datetime yields the
        result of ``fromisoformat``. For any other ``field_type``
        (including ``None``) the value is treated as JSON where possible.
    :returns: ``None`` when ``sql_value`` is ``None`` or the JSON text
        ``null``, otherwise the converted value.
    :raises ValueError: when a bytes payload is not valid JSON and
        ``field_type`` does not select another conversion.
    """
    was_blob = isinstance(sql_value, bytes)
    if was_blob:
        sql_value = sql_value.decode()
    if sql_value is None or sql_value == "null":
        return None

    if isinstance(field_type, bool):
        # bool is tested before the numeric types because it is an Integral.
        return bool(sql_value)
    if isinstance(field_type, (Integral, Real, str)):
        return sql_value
    if isinstance(field_type, datetime):
        return fromisoformat(sql_value)

    if not isinstance(sql_value, str):
        # json.loads only accepts text; numbers coming from INTEGER or REAL
        # columns are already proper Python values.
        return sql_value
    if was_blob:
        # A bytes payload that is not valid JSON is an error, so stay strict.
        return json.loads(sql_value)
    try:
        return json.loads(sql_value)
    except json.JSONDecodeError:
        # Plain text of a field with unknown type: hand it back as it is.
        return sql_value
def select(
    conn,
    table: str,
    field_types: Optional[Dict] = None,
    sql_types: Optional[Dict] = None,
    starttime: Optional[Union[str, datetime]] = None,
    endtime: Optional[Union[str, datetime]] = None,
    **is_equal_filter,
) -> Iterator[dict]:
    """Yield the rows of ``table`` that satisfy all filters, one dict per row.

    All filter values are passed to the database as bound parameters and
    never pasted into the SQL text. The table name and the filter field
    names cannot be bound and are inserted verbatim, so they must come
    from a trusted source.

    :param conn: database connection providing ``cursor()`` and
        ``commit()``.
    :param table: name of the table to read. A table that does not exist
        yields no rows.
    :param field_types: maps field names to example values. Used to
        deserialize the selected values and, when ``sql_types`` is not
        given, to derive the SQL types of the filter values.
    :param sql_types: maps field names to SQL type names, used to validate
        the values of ``is_equal_filter``.
    :param starttime: keep rows whose ``time`` column is greater than or
        equal to this time (datetime or ISO 8601 string).
    :param endtime: keep rows whose ``time`` column is less than or equal
        to this time (datetime or ISO 8601 string).
    :param is_equal_filter: ``field=value`` pairs; a row is kept only when
        every named field is equal to the serialized value.
    :returns: an iterator of dicts mapping column names to deserialized
        values.
    :raises TypeError: when a filter value does not have the SQL type
        given for its field.
    :raises sqlite3.OperationalError: when the query fails for any reason
        other than a missing table.
    """
    conditions = []
    parameters = []

    if is_equal_filter:
        if sql_types is None:
            sql_types = python_to_sql_types(field_types)
        for name, value in is_equal_filter.items():
            sql_value = serialize(value, sql_types.get(name))
            # sqlite3 binds int and float but not arbitrary number types, so
            # reduce every number to the matching builtin before binding.
            if isinstance(sql_value, Integral):
                sql_value = int(sql_value)
            elif isinstance(sql_value, Real):
                sql_value = float(sql_value)
            conditions.append(f"{name} = ?")
            parameters.append(sql_value)

    if starttime:
        if isinstance(starttime, str):
            starttime = fromisoformat(starttime)
        conditions.append("time >= ?")
        parameters.append(starttime.isoformat())
    if endtime:
        if isinstance(endtime, str):
            endtime = fromisoformat(endtime)
        conditions.append("time <= ?")
        parameters.append(endtime.isoformat())

    query = f"SELECT * FROM {table}"
    if conditions:
        search_condition = " AND ".join(conditions)
        query = f"{query} WHERE {search_condition}"

    cursor = conn.cursor()
    try:
        cursor.execute(query, parameters)
    except sqlite3.OperationalError as e:
        if "no such table" in str(e):
            # Nothing has been stored yet: an empty result, not an error.
            return
        # Any other failure (for example an unknown column) is a real error
        # and must not be mistaken for an empty result.
        raise
    rows = cursor.fetchall()
    conn.commit()
    if cursor.description is None:
        return

    fields = [col[0] for col in cursor.description]
    if field_types is None:
        field_types = dict()
    for values in rows:
        yield {k: deserialize(v, field_types.get(k)) for k, v in zip(fields, values)}