Source code for ewoksxrpd.tasks.sum

import os
import time
from contextlib import ExitStack
from typing import Optional, Iterator, Generator, Tuple, Any

import h5py
from ewokscore import Task
from blissdata.h5api import dynamic_hdf5
from ewoksxrpd.tasks.data_access import TaskWithDataAccess

from .utils.sum_utils import ImageSum, generate_range, save_sum
from .utils.data_utils import hdf5_url

SUM_TYPES = ["per_scan", "all_scans", "both"]


[docs]class SumBlissScanImages( TaskWithDataAccess, input_names=["filename", "scan", "detector_name", "output_filename"], optional_input_names=[ "monitor_name", "subscan", "scan_memory_url", "output_process", "background_step", ], output_names=["output_uri", "monitor"], ): """Sum images of a single camera of a single Bliss scan"""
[docs] def run(self): with ExitStack() as stack: self._run_in_stack(stack)
def _run_in_stack(self, stack: ExitStack): filename: str = self.inputs.filename scan: int = self.inputs.scan subscan = self.get_input_value("subscan", 1) detector_name: str = self.inputs.detector_name output_filename: str = self.inputs.output_filename output_nxentry_url = hdf5_url(output_filename, f"/{scan}.{subscan}") monitor_name: Optional[str] = self.get_input_value("monitor_name", None) output_nxprocess_name: str = self.get_input_value("output_process", "sum") background_step: bool = self.get_input_value("background_step", 1) if self.inputs.scan_memory_url: raise NotImplementedError("data from memory not supported yet") else: lima_iterator_context = self.iter_bliss_data( filename, scan, lima_names=[detector_name], subscan=subscan, ) lima_iterator = stack.enter_context(lima_iterator_context) with dynamic_hdf5.File( filename, lima_names=[detector_name], **self.get_retry_options() ) as root: data_name = f"{scan}.{subscan}/instrument/{detector_name}/data" # getting the shape should block until the end of the scan dataset_shape = root[data_name].shape nb_points_in_scan = dataset_shape[0] if monitor_name: counter_iterator_context = self.iter_bliss_data( filename, scan, counter_names=[monitor_name], lima_names=[], subscan=subscan, ) counter_iterator = stack.enter_context(counter_iterator_context) else: counter_iterator = ( {monitor_name: None} for _ in range(nb_points_in_scan) ) with self.open_h5item(output_nxentry_url, mode="a", create=True) as nxentry: nxprocess = nxentry.create_group(output_nxprocess_name) nxprocess.attrs["NX_class"] = "NXprocess" nxprocess.attrs["default"] = "results" nxdata = nxprocess.create_group("results") nxdata.attrs["NX_class"] = "NXdata" scan_sum = None summed_indices = [] tstart = time.time() for scan_index, ctrdata in iterate_scan_with_skip( counter_iterator, background_step ): limadata = next(lima_iterator) image = limadata[detector_name] if scan_sum is None: scan_sum = ImageSum(image.shape) scan_sum.add_to_sum(image, monitor=ctrdata[monitor_name]) summed_indices.append(scan_index) if scan_sum is None: tend = time.time() raise RuntimeError( f"No scan data yielded within {tend-tstart:03f} from {filename}::/{scan}.{subscan}" ) image_dset, monitor_dset = save_sum( nxdata, name=f"Scan{scan}-Images{summed_indices[0]}-{summed_indices[-1]}", image_sum=scan_sum, ) self.outputs.output_uri = f"{nxdata.file.filename}::{image_dset.name}" if monitor_dset is not None: self.outputs.monitor = scan_sum.summed_monitor else: self.outputs.monitor = None
[docs]def iterate_scan_with_skip( scan_iterator: Iterator[Any], step: int ) -> Generator[Tuple[int, Any], None, None]: """ :param scan_iterator: iterate over all scan points :param step: -1 means no skipping, 0 means skipping the first, N>0 means skipping every N points starting from 0 :yields: the non-skipped scan index and corresponding data """ if step < 0: # no skipping for scan_index, data in enumerate(scan_iterator): yield scan_index, data elif step == 0: # skip the first for scan_index, data in enumerate(scan_iterator): if scan_index == 0: continue yield scan_index, data else: # skip every N points, starting from zero block_size = step + 1 for scan_index, data in enumerate(scan_iterator): if (scan_index % block_size) == 0: continue yield scan_index, data
[docs]class SumImages( Task, input_names=["filename", "detector_name", "output_filename"], optional_input_names=[ "start_scan", "end_scan", "start_image", "end_image", "block_size", "monitor_name", "output_entry", "output_process", "sum_type", ], output_names=["output_uris", "monitor_uris"], ): """Sum images of a single camera from a Bliss scan file For each scan, images are added in blocks of `block_size` images (one block with all images by default). The result contains: * the block sums when sum_type=per_scan or sum_type=both * the sum of the block sums when sum_type=all_scans or sum_type=both """
[docs] def run(self): filename: str = self.inputs.filename detector_name: str = self.inputs.detector_name output_filename: str = self.inputs.output_filename start_scan: int = self.get_input_value("start_scan", 1) end_scan: Optional[int] = self.get_input_value("end_scan", None) start_image: int = self.get_input_value("start_image", 0) end_image: Optional[int] = self.get_input_value("end_image", None) block_size: Optional[int] = self.get_input_value("block_size", None) monitor_name: Optional[str] = self.get_input_value("monitor_name", None) output_entry: str = self.get_input_value("output_entry", "processing") output_process: str = self.get_input_value("output_process", "sum") sum_type: str = self.get_input_value("sum_type", "per_scan") if sum_type not in SUM_TYPES: raise TypeError( f"sum_type must be one of the following values: {SUM_TYPES}. Got {sum_type} instead." ) save_scan_sums = sum_type == "per_scan" or sum_type == "both" save_full_sum = sum_type == "all_scans" or sum_type == "both" same_h5file = os.path.abspath(os.path.normpath(filename)) == os.path.abspath( os.path.normpath(output_filename) ) if same_h5file: mode = "a" else: mode = "r" with h5py.File(filename, mode=mode) as h5infile: nscans = len(h5infile) scan_range = generate_range(start_scan, end_scan, nscans + 1) first_scan_name = list(h5infile.keys())[0] nimages, *detector_shape = h5infile[ f"{first_scan_name}/measurement/{detector_name}" ].shape image_range = list(generate_range(start_image, end_image, nimages)) scan_sum = ImageSum(detector_shape) full_sum = ImageSum(detector_shape) with h5py.File(output_filename, mode="a") as h5outfile: out_entry = h5outfile.require_group(output_entry) out_entry.attrs.setdefault("NX_class", "NXentry") out_entry.attrs["default"] = output_process out_process = out_entry.create_group(output_process) out_process.attrs["NX_class"] = "NXprocess" out_process.attrs["default"] = "results" out_results = out_process.create_group("results") out_results.attrs["NX_class"] = "NXdata" output_uris: list[str] = [] monitor_uris: list[str] = [] for scan_number in scan_range: scan_images = h5infile[ f"{scan_number}.1/measurement/{detector_name}" ] scan_monitor = ( h5infile[f"{scan_number}.1/measurement/{monitor_name}"] if monitor_name else None ) assert isinstance(scan_images, h5py.Dataset) if scan_monitor is not None: assert isinstance(scan_monitor, h5py.Dataset) for image_number in image_range: image = scan_images[image_number] monitor = scan_monitor[image_number] if scan_monitor else None scan_sum.add_to_sum(image, monitor) full_sum.add_to_sum(image, monitor) if save_scan_sums and scan_sum.nb_images == block_size: name = f"Scan{scan_number}-Images{image_number - scan_sum.nb_images + 1}-{image_number}" image_dset, monitor_dset = save_sum( out_results, name=name, image_sum=scan_sum ) output_uris.append(f"{output_filename}::{image_dset.name}") if monitor_dset is not None: monitor_uris.append( f"{output_filename}::{monitor_dset.name}" ) # Move to next sum scan_sum.reset() if save_scan_sums and scan_sum.nb_images > 0: name = f"Scan{scan_number}-Images{image_number - scan_sum.nb_images + 1}-{image_number}" image_dset, monitor_dset = save_sum( out_results, name=name, image_sum=scan_sum ) output_uris.append(f"{output_filename}::{image_dset.name}") if monitor_dset is not None: monitor_uris.append( f"{output_filename}::{monitor_dset.name}" ) if save_full_sum: name = f"Sum of scans {scan_range.start} to {scan_range.stop - scan_range.step}" image_dset, monitor_dset = save_sum( out_results, name=name, image_sum=full_sum ) output_uris.append(f"{output_filename}::{image_dset.name}") if monitor_dset is not None: monitor_uris.append(f"{output_filename}::{monitor_dset.name}") self.outputs.output_uris = output_uris if monitor_uris: self.outputs.monitor_uris = monitor_uris else: self.outputs.monitor_uris = None