Source code for ewoksxrpd.tasks.data_access
import sys
from contextlib import contextmanager
from typing import Iterator, Union
from packaging.specifiers import SpecifierSet
if sys.version_info >= (3, 8):
from importlib.metadata import version
else:
from importlib_metadata import version
import numpy
import h5py
from silx.io.url import DataUrl
from silx.io import h5py_utils
from ewokscore import TaskWithProgress
from ewoksdata.data import nexus
from ewoksdata.data import bliss
from ewoksdata.data.contextiterator import contextiterator
from .utils import data_utils
# The Lima URL template options are only forwarded when the installed
# blissdata does NOT match "<1.1.0"; pre-release versions are taken into
# account when matching (prereleases=True).
_LIMA_TEMPLATE_SUPPORTED = not SpecifierSet("<1.1.0", prereleases=True).contains(
    version("blissdata")
)
class TaskWithDataAccess(
    TaskWithProgress,
    optional_input_names=[
        "retry_timeout",
        "retry_period",
        "lima_url_template",
        "lima_url_template_args",
    ],
    register=False,
):
    """Base task providing HDF5 and Bliss data access helpers.

    The optional inputs ``retry_timeout`` and ``retry_period`` are forwarded
    as keyword arguments to the underlying readers. The optional inputs
    ``lima_url_template`` and ``lima_url_template_args`` are forwarded to the
    Bliss scan helpers only when the installed blissdata supports them.
    """

    def get_retry_options(self):
        """Return the retry inputs as a keyword-argument dictionary.

        Each input is read with ``None`` as its default value.

        :returns: ``{"retry_timeout": ..., "retry_period": ...}``
        """
        return {
            "retry_timeout": self.get_input_value("retry_timeout", None),
            "retry_period": self.get_input_value("retry_period", None),
        }

    def _get_blissdata_options(self):
        """Return the keyword arguments for the Bliss scan helpers.

        When Lima URL templates are supported, the template inputs are
        returned together with the retry options. Otherwise only the retry
        options are returned.

        :raises ValueError: when a Lima template input is set (truthy) but
            the installed blissdata does not support it.
        """
        lima_url_template = self.get_input_value("lima_url_template", None)
        lima_url_template_args = self.get_input_value("lima_url_template_args", None)
        if _LIMA_TEMPLATE_SUPPORTED:
            return {
                "lima_url_template": lima_url_template,
                "lima_url_template_args": lima_url_template_args,
                **self.get_retry_options(),
            }
        # Unsupported blissdata: refuse explicitly rather than silently
        # dropping an option the user asked for.
        if lima_url_template:
            raise ValueError("'lima_url_template' requires blissdata>=1.1.0")
        if lima_url_template_args:
            raise ValueError("'lima_url_template_args' requires blissdata>=1.1.0")
        return self.get_retry_options()

    @contextmanager
    def open_h5item(
        self, url: Union[str, DataUrl], create: bool = False, **openoptions
    ) -> Iterator[Union[h5py.Group, h5py.Dataset, numpy.ndarray]]:
        """Open the HDF5 item addressed by *url* and yield it.

        :param url: location of the item, as a string or a ``DataUrl``.
        :param create: when true, *url* is first passed to
            ``nexus.create_url`` together with the retry options
            (presumably to make sure the target exists -- confirm against
            ewoksdata) and the returned URL is used from then on.
        :param openoptions: extra keyword arguments forwarded to
            ``h5py_utils.open_item``.
        :returns: context manager yielding the opened item, or ``item[idx]``
            when the URL carries a data slice.
        """
        retryoptions = self.get_retry_options()
        if create:
            url = nexus.create_url(url, **retryoptions)
        # Normalise to a DataUrl so that string URLs can be opened as well.
        # The original check was inverted: it only re-wrapped objects that
        # were already DataUrl instances and left strings untouched.
        if not isinstance(url, DataUrl):
            url = DataUrl(url)
        with h5py_utils.open_item(
            url.file_path(), url.data_path(), **retryoptions, **openoptions
        ) as item:
            idx = url.data_slice()
            if idx is None:
                yield item
            else:
                yield item[idx]

    def get_data(self, *args, **kw):
        """Call ``bliss.get_data`` with the task's retry options.

        The retry options replace any same-named keyword arguments given by
        the caller.
        """
        kw.update(self.get_retry_options())
        return bliss.get_data(*args, **kw)

    def get_image(self, *args, **kw):
        """Call ``bliss.get_image`` with the task's retry options.

        The retry options replace any same-named keyword arguments given by
        the caller.
        """
        kw.update(self.get_retry_options())
        return bliss.get_image(*args, **kw)

    @contextiterator
    def iter_bliss_data(self, *args, **kw):
        """Yield the items produced by ``bliss.iter_bliss_scan_data``.

        The blissdata options (retry options and, when supported, the Lima
        template options) replace any same-named keyword arguments given by
        the caller.
        """
        kw.update(self._get_blissdata_options())
        with bliss.iter_bliss_scan_data(*args, **kw) as iterator:
            yield from iterator

    def iter_bliss_data_from_memory(self, *args, **kw):
        """Yield the items of ``bliss.iter_bliss_scan_data_from_memory``.

        The retry options replace any same-named keyword arguments given by
        the caller.
        """
        kw.update(self.get_retry_options())
        yield from bliss.iter_bliss_scan_data_from_memory(*args, **kw)

    def link_bliss_scan(self, *args, **kw):
        """Call ``data_utils.link_bliss_scan`` with the blissdata options.

        The blissdata options replace any same-named keyword arguments given
        by the caller.
        """
        kw.update(self._get_blissdata_options())
        return data_utils.link_bliss_scan(*args, **kw)