Source code for ewoksid22.rebin

import os
import logging
from ewokscore import Task
import pyopencl
from multianalyzer.app.rebin import rebin_file
from silx.io.h5py_utils import retry, File, safe_top_level_names

from . import dirutils
from .roitools import unscramble_roi_collection

HDF5_PLUGIN_PATH = os.environ.get("HDF5_PLUGIN_PATH")
if HDF5_PLUGIN_PATH:
    del os.environ["HDF5_PLUGIN_PATH"]


logger = logging.getLogger(__name__)


[docs]@retry(retry_period=0.5, retry_timeout=10) def read_data(filename, unscramble_method=None, entries=None, exclude_entries=None): res = {} with File(filename, mode="r") as fh: if entries: entries = set(entries) else: entries = {k for k, v in fh.items() if v.attrs.get("NX_class") == "NXentry"} if exclude_entries: entries -= set(exclude_entries) try: entries = sorted(entries, key=lambda x: float(x)) except TypeError: entries = sorted(entries) entries = [entry for entry in entries if entry.endswith(".1")] print(f"Reading and unscrambling {len(entries)} entries: {entries}") for entry in entries: title = entry + "/title" if title not in fh: print(f"skip scan {entry}: no 'title'") continue end_time = entry + "/end_time" if end_time not in fh: print(f"skip scan {entry}: no 'end_time'") continue title = fh[title][()] try: title = title.decode() except Exception: pass if not title.startswith("fscan") and not title.startswith("f2scan"): print(f"skip scan {entry}: {title}") continue sort_data = {} non_sort_data = {} try: entry_grp = fh[entry] sort_data["x"] = entry_grp[ "instrument/eiger_roi_collection/selection/x" ][()] sort_data["y"] = entry_grp[ "instrument/eiger_roi_collection/selection/y" ][()] sort_data["roicol"] = entry_grp["measurement/eiger_roi_collection"][()] non_sort_data["arm"] = entry_grp["measurement/tth"][()] non_sort_data["mon"] = entry_grp["measurement/mon"][()] non_sort_data["tha"] = entry_grp["instrument/positioners/manom"][()] non_sort_data["thd"] = entry_grp["instrument/positioners/mantth"][()] except KeyError as e: logger.warning(str(e)) continue if unscramble_method == "xy": keys = "x", "y" elif unscramble_method == "yx": keys = "y", "x" else: keys = None if keys: sort_data = unscramble_roi_collection(sort_data, keys) # Check the sorting # with File("/users/opid22/test.h5", "w") as fh: # fh["x"] = sort_data["x"] # fh["y"] = sort_data["y"] res[entry] = {**sort_data, **non_sort_data} for name, val in res[entry].items(): try: print(f"{name} shape = {val.shape}") except TypeError: pass return res
[docs]class ID22Rebin( Task, input_names=["filename", "parsfile"], optional_input_names=[ "entries", "debug", "wavelength", "energy", "step", "range", "phi", "iter", "startp", "endp", "pixel", "width", "delta2theta", "device", "outdirs", "primary_outdir", "outprefix", "unscramble_method", "retry_timeout", ], output_names=["outfile", "entries"], ):
[docs] def run(self): outdirs = self.get_input_value("outdirs", dict()) primary_outdir = self.get_input_value("primary_outdir", None) outdirs = dirutils.prepare_outdirs(outdirs, primary_outdir) options = { "pars": self.inputs.parsfile, "debug": self.get_input_value("debug", False), "wavelength": self.get_input_value("wavelength", None), "energy": self.get_input_value("energy", None), "step": self.get_input_value("step", None), "range": self.get_input_value("range", [float("nan"), float("nan")]), "phi": self.get_input_value("phi", None), "iter": self.get_input_value("iter", None), "startp": self.get_input_value("startp", None), "endp": self.get_input_value("endp", None), "pixel": self.get_input_value("pixel", None), "width": self.get_input_value("width", None), "delta2theta": self.get_input_value("delta2theta", 0.0), "device": self.get_input_value("device", None), } sdelta2theta = str(options["delta2theta"]).replace(".", "") outfilename = ( os.path.splitext(os.path.basename(self.inputs.filename))[0] + f"_w{sdelta2theta}.h5" ) if not self.missing_inputs.outprefix: outfilename = self.inputs.outprefix + "_" + outfilename outfile = dirutils.primary_file(outfilename, outdirs) options["output"] = outfile if options["device"] is not None: options["device"] = str(options["device"]) cl_platforms = pyopencl.get_platforms() for i, p in enumerate(cl_platforms): for j, d in enumerate(p.get_devices()): print(f"{i},{j}: {d}") filename = self.inputs.filename entries = self.get_input_value("entries", None) unscramble_method = self.get_input_value("unscramble_method", "xy") if True: print(f"Unscrambling of ROI collection: {unscramble_method}") retry_timeout = self.get_input_value("retry_timeout", 10) if os.path.exists(outfile): exclude_entries = safe_top_level_names( outfile, retry_timeout=retry_timeout ) else: exclude_entries = None options["hdf5_data"] = read_data( filename, unscramble_method=unscramble_method, entries=entries, exclude_entries=exclude_entries, retry_timeout=retry_timeout, ) else: # Let multianalyzer read the data options["filename"] = filename options["entries"] = entries entries = rebin_file(**options) assert entries is not None, "upgrade multianalyzer" dirutils.copy_file(outfilename, outdirs) self.outputs.outfile = outfile self.outputs.entries = entries