Source code for ewoksdata.data.hdf5.dataset_writer

import time
from typing import Optional, List

import h5py
import numpy
from numpy.typing import ArrayLike

from .types import StrictPositiveIntegral
from .config import guess_dataset_config


class _DatasetWriterBase:
    def __init__(
        self,
        parent: h5py.Group,
        name: str,
        attrs: Optional[dict] = None,
        flush_period: Optional[float] = None,
    ) -> None:
        self._parent = parent
        self._name = name
        self._attrs = attrs
        self._dataset_name = f"{parent.name}/{name}"
        self._dataset = None
        self._flush_period = flush_period
        self._last_flush = None

    @property
    def dataset_name(self) -> str:
        return self._dataset_name

    def __enter__(self) -> "_DatasetWriterBase":
        return self

    def __exit__(self, *args) -> None:
        self.flush_buffer()

    @property
    def dataset(self) -> Optional[h5py.Dataset]:
        return self._dataset

    def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
        raise NotImplementedError

    def flush_buffer(self, align: bool = False) -> bool:
        raise NotImplementedError

    def _flush_time_expired(self) -> bool:
        if self._flush_period is None:
            return False
        if self._last_flush is None:
            self._last_flush = time.time()
            return False
        return (time.time() - self._last_flush) >= self._flush_period


class DatasetWriter(_DatasetWriterBase):
    """Write array data points sequentially along the first dimension of an
    HDF5 dataset. Points are buffered in memory and flushed to the dataset
    in chunk-aligned slices."""

    def __init__(
        self,
        parent: h5py.Group,
        name: str,
        npoints: Optional[StrictPositiveIntegral] = None,
        attrs: Optional[dict] = None,
        flush_period: Optional[float] = None,
    ) -> None:
        super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
        self._npoints = npoints
        self._buffer: List[ArrayLike] = list()
        self._npoints_added: int = 0
        self._chunked: bool = False
        self._nchunk: int = 0
        self._nflushed: int = 0

    def _create_dataset(self, first_data_point: numpy.ndarray) -> h5py.Dataset:
        scan_shape = (self._npoints,)
        detector_shape = first_data_point.shape
        dtype = first_data_point.dtype
        if self._npoints is None:
            # Unknown number of points: create a resizable dataset
            max_shape = scan_shape + detector_shape
            shape = (1,) + first_data_point.shape
        else:
            max_shape = None
            shape = scan_shape + first_data_point.shape
        options = guess_dataset_config(
            scan_shape, detector_shape, dtype=dtype, max_shape=max_shape
        )
        options["shape"] = shape
        options["dtype"] = dtype
        options["fillvalue"] = numpy.nan  # converts to 0 for integers
        if max_shape:
            options["maxshape"] = max_shape
        if options["chunks"]:
            self._chunked = True
            self._nchunk = options["chunks"][0]
        dset = self._parent.create_dataset(self._name, **options)
        if self._attrs:
            dset.attrs.update(self._attrs)
        return dset

    @property
    def npoints_added(self) -> int:
        return self._npoints_added

    def add_point(self, data: ArrayLike) -> bool:
        if self._dataset is None:
            self._dataset = self._create_dataset(data)
        self._buffer.append(data)
        self._npoints_added += 1
        return self.flush_buffer(align=True)

    def add_points(self, data: ArrayLike) -> bool:
        if self._dataset is None:
            self._dataset = self._create_dataset(data[0])
        self._buffer.extend(data)
        self._npoints_added += len(data)
        return self.flush_buffer(align=True)

    def flush_buffer(self, align: bool = False) -> bool:
        # Determine how many points to flush
        nbuffer = len(self._buffer)
        if self._flush_time_expired():
            nflush = nbuffer
        elif align and self._chunked:
            n = nbuffer + (self._nflushed % self._nchunk)
            nflush = n // self._nchunk * self._nchunk
            nflush = min(nflush, nbuffer)
        else:
            nflush = nbuffer
        if nflush == 0:
            return False

        # Enlarge the dataset when needed
        nalloc = self._dataset.shape[0]
        istart = self._nflushed
        nflushed = istart + nflush
        if self._chunked and nflushed > nalloc:
            self._dataset.resize(nflushed, axis=0)

        # Move data from memory to HDF5
        self._dataset[istart : istart + nflush] = self._buffer[:nflush]
        self._buffer = self._buffer[nflush:]
        self._nflushed = nflushed
        self._last_flush = time.time()
        return True
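
# Usage sketch (illustrative, not part of the original module): stream scan
# frames into a single dataset. The file name, dataset name and shapes below
# are hypothetical. Points are buffered and written in chunk-aligned slices;
# an optional flush_period (seconds) forces time-based flushes, and leaving
# the "with" block flushes whatever remains in the buffer.
def _example_dataset_writer(filename: str = "/tmp/demo.h5") -> None:
    with h5py.File(filename, "w") as h5file:
        with DatasetWriter(h5file, "frames", npoints=10, flush_period=1.0) as writer:
            for _ in range(10):
                writer.add_point(numpy.random.random((8, 8)))
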
class StackDatasetWriter(_DatasetWriterBase):
    """Write array data points sequentially along the second dimension of an
    HDF5 dataset, with one point buffer per index along the first (stack)
    dimension. Buffers are flushed in chunk-aligned slices."""

    def __init__(
        self,
        parent: h5py.Group,
        name: str,
        npoints: Optional[StrictPositiveIntegral] = None,
        nstack: Optional[StrictPositiveIntegral] = None,
        attrs: Optional[dict] = None,
        flush_period: Optional[float] = None,
    ) -> None:
        super().__init__(parent, name, attrs=attrs, flush_period=flush_period)
        self._npoints = npoints
        self._nstack = nstack
        self._buffers: List[List[ArrayLike]] = list()
        self._chunked: bool = False
        self._nchunk: ArrayLike = numpy.zeros(2, dtype=int)
        self._nflushed: ArrayLike = numpy.array(list(), dtype=int)

    def _create_dataset(
        self, first_data_point: numpy.ndarray, stack_index: int
    ) -> h5py.Dataset:
        scan_shape = (self._nstack, self._npoints)
        detector_shape = first_data_point.shape
        dtype = first_data_point.dtype
        if self._npoints is None or self._nstack is None:
            # Unknown stack or scan size: create a resizable dataset
            max_shape = scan_shape + detector_shape
            shape = (stack_index + 1, 1) + first_data_point.shape
        else:
            max_shape = None
            shape = scan_shape + first_data_point.shape
        options = guess_dataset_config(
            scan_shape, detector_shape, dtype=dtype, max_shape=max_shape
        )
        options["shape"] = shape
        options["dtype"] = dtype
        options["fillvalue"] = numpy.nan  # converts to 0 for integers
        if max_shape:
            options["maxshape"] = max_shape
        if options["chunks"]:
            self._chunked = True
            self._nchunk = numpy.array(options["chunks"][:2], dtype=int)
        dset = self._parent.create_dataset(self._name, **options)
        if self._attrs:
            dset.attrs.update(self._attrs)
        return dset

    def _get_buffer(self, stack_index: int) -> List[ArrayLike]:
        # Grow the list of buffers (and flush counters) up to stack_index
        for _ in range(max(stack_index - len(self._buffers) + 1, 0)):
            self._buffers.append(list())
            self._nflushed = numpy.append(self._nflushed, 0)
        return self._buffers[stack_index]

    def add_point(self, data: ArrayLike, stack_index: int) -> bool:
        if self._dataset is None:
            self._dataset = self._create_dataset(data, stack_index)
        buffer = self._get_buffer(stack_index)
        buffer.append(data)
        return self.flush_buffer(align=True)

    def add_points(self, data: ArrayLike, stack_index: int) -> bool:
        if self._dataset is None:
            self._dataset = self._create_dataset(data[0], stack_index)
        buffer = self._get_buffer(stack_index)
        buffer.extend(data)
        return self.flush_buffer(align=True)

    def flush_buffer(self, align: bool = False) -> bool:
        # Determine how many points to flush for each buffer in the stack
        nbuffer = numpy.array([len(buffer) for buffer in self._buffers])
        nchunk_dim0, nchunk_dim1 = self._nchunk[:2]
        if self._flush_time_expired():
            nflush = nbuffer
        elif align and self._chunked:
            n = nbuffer + (self._nflushed % nchunk_dim1)
            nflush = n // nchunk_dim1 * nchunk_dim1
            nflush = numpy.minimum(nflush, nbuffer)
            for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
                nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0] = min(
                    nflush[i0_chunk0 : i0_chunk0 + nchunk_dim0]
                )
        else:
            nflush = nbuffer
        if not any(nflush):
            return False

        # Enlarge the dataset when needed
        nalloc = self._dataset.shape[:2]
        istart = self._nflushed
        nflushed = istart + nflush
        nalloc_new = numpy.array([len(nbuffer), max(nflushed)])
        if self._chunked and any(nalloc_new > nalloc):
            for axis, n in enumerate(nalloc_new):
                self._dataset.resize(n, axis=axis)

        # Move data from memory to HDF5
        if nchunk_dim0 == 0:
            nchunk_dim0 = len(nbuffer)
        for i0_chunk0 in range(0, len(nbuffer), nchunk_dim0):
            idx_dim0 = slice(i0_chunk0, i0_chunk0 + nchunk_dim0)
            nflush_dim1 = nflush[idx_dim0]
            istart0_dim1 = istart[idx_dim0]
            buffers = self._buffers[idx_dim0]
            if all(nflush_dim1 == nflush_dim1[0]) and all(
                istart0_dim1 == istart0_dim1[0]
            ):
                # All buffers in this chunk group flush the same slice:
                # write them in a single HDF5 assignment
                data = [buffer[: nflush_dim1[0]] for buffer in buffers]
                idx_dim1 = slice(istart0_dim1[0], istart0_dim1[0] + nflush_dim1[0])
                self._dataset[idx_dim0, idx_dim1] = data
            else:
                for buffer, i_dim0, istart_dim1, n_dim1 in zip(
                    buffers,
                    range(i0_chunk0, i0_chunk0 + nchunk_dim0),
                    istart0_dim1,
                    nflush_dim1,
                ):
                    self._dataset[i_dim0, istart_dim1 : istart_dim1 + n_dim1, ...] = (
                        buffer[:n_dim1]
                    )
        self._buffers = [
            buffer[n_dim1:] for buffer, n_dim1 in zip(self._buffers, nflush)
        ]
        self._nflushed = nflushed
        self._last_flush = time.time()
        return True
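
# Usage sketch (illustrative, not part of the original module): write several
# sequences of frames interleaved into one stacked dataset. The names and
# shapes are hypothetical. Each stack_index gets its own buffer, and flushes
# are aligned to HDF5 chunk boundaries along both the stack and point
# dimensions.
def _example_stack_dataset_writer(filename: str = "/tmp/demo_stack.h5") -> None:
    with h5py.File(filename, "w") as h5file:
        with StackDatasetWriter(h5file, "frames", npoints=10, nstack=3) as writer:
            for _point in range(10):
                for stack_index in range(3):
                    writer.add_point(numpy.random.random((8, 8)), stack_index)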