Source code for ewoksserver.app.backends.binary_backend

import base64
import mimetypes
from urllib import request
import logging
from pathlib import Path
from typing import Iterator, Union


ResourceIdentifierType = str
ResourceUrlType = Path
ResourceContentType = dict

_logger = logging.getLogger(__name__)


[docs] def root_url(root_url: Union[str, Path, None], category: str) -> ResourceUrlType: if not root_url: root_url = Path(".") elif isinstance(root_url, str): root_url = Path(root_url) return root_url / category
[docs] def resource_identifiers(root: ResourceUrlType) -> Iterator[ResourceIdentifierType]: for url in _resource_urls(root): yield _url_to_identifier(url)
[docs] def resources(root: ResourceUrlType) -> Iterator[ResourceContentType]: for url in _resource_urls(root): yield _load_url(url)
[docs] def resource_exists(root: ResourceUrlType, identifier: ResourceIdentifierType) -> bool: return _identifier_to_url(root, identifier).exists()
def _resource_urls(root: ResourceUrlType) -> Iterator[ResourceUrlType]: for url in root.iterdir(): if _is_resource(url): yield url def _is_resource(url: ResourceUrlType) -> bool: return ( url.is_file() and not url.name.startswith(".") and not url.name.endswith(".py") )
[docs] def save_resource( root: ResourceUrlType, identifier: ResourceIdentifierType, resource: ResourceContentType, ): url = _identifier_to_url(root, identifier) _save_url(url, resource)
[docs] def load_resource( root: ResourceUrlType, identifier: ResourceIdentifierType ) -> ResourceContentType: url = _identifier_to_url(root, identifier) return _load_url(url)
[docs] def delete_resource(root: ResourceUrlType, identifier: ResourceIdentifierType) -> None: url = _identifier_to_url(root, identifier) _delete_url(url)
def _delete_url(url: ResourceUrlType) -> ResourceContentType: _logger.debug("Delete file '%s'", url) url.unlink() def _identifier_to_url(root: ResourceUrlType, identifier: ResourceIdentifierType): return root / identifier def _url_to_identifier(url: ResourceUrlType) -> ResourceIdentifierType: return url.name def _load_url(url: ResourceUrlType) -> ResourceContentType: mimetype, encoding = mimetypes.guess_type(url, strict=True) if mimetype is None: raise ValueError(f"Cannot derive mime type from '{url}'") try: with open(url, "rb") as f: data = f.read() except FileNotFoundError: _logger.error(f"'{url}' not found") raise if not encoding: encoding = "base64" data = base64.b64encode(data).decode() return {"data_url": f"data:{mimetype};{encoding},{data}"} def _save_url(url: ResourceUrlType, resource: ResourceContentType) -> None: _logger.debug("Save file '%s'", url) url.parent.mkdir(parents=True, exist_ok=True) with request.urlopen(resource["data_url"]) as f: data = f.read() with open(url, "wb") as f: f.write(data)