Source code for ewoksdata.data.nexus
import os
from contextlib import contextmanager
from typing import Union, Iterator, Optional, Tuple
import h5py
from silx.io import h5py_utils
from .url import DataUrl
from .url import as_dataurl
[docs]
def ensure_nxclass(group: h5py.Group) -> None:
    """Give ``group`` an ``NX_class`` attribute when it does not have one yet.

    A group whose ``NX_class`` attribute is already set to a non-empty value
    is left untouched. Otherwise the class is derived from the number of
    non-empty components in the group's HDF5 name:

    * no component (the root group): ``"NXroot"``
    * one component: ``"NXentry"``
    * two or more components: ``"NXcollection"``

    :param group: group to annotate in place.
    """
    attrs = group.attrs
    if attrs.get("NX_class"):
        return

    # Depth of the group below the root; empty components produced by
    # leading or repeated "/" are not counted.
    depth = sum(1 for part in group.name.split("/") if part)

    if depth == 0:
        nx_class = "NXroot"
    elif depth == 1:
        nx_class = "NXentry"
    else:
        nx_class = "NXcollection"
    attrs["NX_class"] = nx_class
[docs]
def select_default_plot(nxdata: h5py.Group) -> None:
    """Set the ``default`` attribute on every ancestor so it leads to ``nxdata``.

    Starting at the direct parent of ``nxdata`` and moving up one level per
    non-empty component of ``nxdata.name``, each ancestor gets a ``default``
    attribute holding the name of its child on the path down to ``nxdata``.

    :param nxdata: group that the chain of ``default`` attributes should
        end at.
    """
    ancestor = nxdata.parent
    # Walk the path from the deepest name upwards; empty components (such as
    # the one before the leading "/") do not correspond to a level.
    for child_name in reversed(nxdata.name.split("/")):
        if child_name:
            ancestor.attrs["default"] = child_name
            ancestor = ancestor.parent
[docs]
def create_url(url: str, **open_options) -> DataUrl:
    """Make sure the HDF5 group referenced by ``url`` exists and return its URL.

    The directory of the file is created when missing, the file is opened
    (mode ``"a"`` unless ``open_options`` provides another ``mode``) and the
    group is opened or created through ``_create_h5group``.

    :param url: location of the group, converted with ``as_dataurl``.
    :param open_options: extra keyword arguments forwarded to
        ``h5py_utils.open_item``.
    :returns: URL built from the file name and the name of the resulting
        group.
    """
    url = as_dataurl(url)
    filename = url.file_path()

    folder = os.path.dirname(filename)
    if folder:
        os.makedirs(folder, exist_ok=True)

    if "mode" not in open_options:
        open_options["mode"] = "a"

    with h5py_utils.open_item(filename, "/", **open_options) as root:
        group, _ = _create_h5group(root, url.data_path())
        # Read the group name while the file is still open.
        return as_dataurl(f"{filename}::{group.name}")
[docs]
def get_nxentry(h5item: Union[h5py.Dataset, h5py.Group]):
    """Return the top-level group of the file that ``h5item`` lives under.

    The top-level group is looked up in ``h5item.file`` using the first
    non-empty component of ``h5item.name``.

    :param h5item: dataset or group located below a top-level group.
    :raises ValueError: when the name of ``h5item`` has no non-empty
        component (for example the root group).
    """
    for part in h5item.name.split("/"):
        if part:
            return h5item.file[part]
    raise ValueError("HDF5 item must be part of an NXentry")
[docs]
@contextmanager
def create_nexus_group(
    url: Union[str, DataUrl],
    retry_timeout=None,
    retry_period=None,
    default_levels: Optional[Tuple[str]] = None,
    **open_options,
) -> Iterator[Tuple[h5py.Group, bool]]:
    """Context manager that opens or creates the group referenced by ``url``.

    The directory of the file is created when missing and the file is opened
    with mode ``"a"`` unless ``open_options`` provides another ``mode``. The
    file stays open while the body of the ``with`` statement runs.

    :param url: location of the group; an empty data path means ``"/"``.
    :param retry_timeout: forwarded to ``h5py_utils.open_item``.
    :param retry_period: forwarded to ``h5py_utils.open_item``.
    :param default_levels: forwarded to ``_create_h5group``.
    :param open_options: extra keyword arguments forwarded to
        ``h5py_utils.open_item``.
    :yields: (h5group, already_existed)
    """
    url = as_dataurl(url)
    filename = url.file_path()
    itemname = url.data_path() or "/"

    folder = os.path.dirname(filename)
    if folder:
        os.makedirs(folder, exist_ok=True)

    if "mode" not in open_options:
        open_options["mode"] = "a"

    with h5py_utils.open_item(
        filename,
        "/",
        retry_timeout=retry_timeout,
        retry_period=retry_period,
        **open_options,
    ) as root:
        result = _create_h5group(root, itemname, default_levels=default_levels)
        yield result
def _create_h5group(
    parent, data_path: Optional[str], default_levels: Optional[Tuple[str]] = None
) -> Tuple[h5py.Group, bool]:
    """Open the group at ``data_path``, creating every missing level.

    When ``data_path`` has fewer levels than ``default_levels``, the missing
    deeper levels are taken from ``default_levels`` at the same depth. Without
    ``default_levels`` a single default level named ``"results"`` is used.
    ``ensure_nxclass`` is applied to ``parent`` and to each level that is
    visited.

    :param parent: HDF5 group the lookup starts from; the first level visited
        is always ``"/"``.
    :param data_path: ``"/"``-separated group path; ``None`` or an empty
        string means no explicit level.
    :param default_levels: group names used to pad a ``data_path`` that is
        too short.
    :returns: (h5group, already_existed) where ``already_existed`` is
        ``False`` as soon as at least one level had to be created.
    """
    # Index 0 is a placeholder for the root so that list positions line up
    # with the depth of each level.
    groups = [""] + [s for s in (data_path or "").split("/") if s]
    if default_levels:
        default_levels = [""] + list(default_levels)
    else:
        default_levels = ["", "results"]
    if len(groups) < len(default_levels):
        groups += default_levels[len(groups) :]
    groups[0] = "/"

    create = False
    ensure_nxclass(parent)
    for group in groups:
        if group in parent:
            parent = parent[group]
        else:
            parent = parent.create_group(group)
            create = True
        ensure_nxclass(parent)
    return parent, not create