Source code for ewoksfluo.tasks.example_data.scan_data

import os
from typing import Iterator, Tuple, Optional, List, Sequence, Dict

import h5py
import numpy
import imageio
from scipy import ndimage

from . import xrf_spectra
from .monitor import monitor_signal
from .deadtime import apply_dualchannel_signal_processing


[docs]def save_2d_xrf_scans( filename: str, emission_line_groups: List[str], scan_number: int, shape: Tuple[int, int], energy: float = 12, flux: float = 1e7, expo_time: float = 0.1, counting_noise: bool = True, integral_type: bool = True, rois: Sequence = tuple(), ndetectors: int = 1, max_deviation: float = 0, seed: Optional[int] = None, ) -> None: if len(shape) != 2: raise ValueError("Only 2D scans are supported") npoints = shape[0] * shape[1] emission_line_groups = [s.split("-") for s in emission_line_groups] I0_reference = int(flux * expo_time) I0 = (I0_reference * monitor_signal(expo_time, npoints)).reshape(shape, order="F") info = dict() rates = list() for (samz, samy), rate in _iter_2dscan_data( shape, info, nmaps=len(emission_line_groups) + 2, max_deviation=max_deviation, seed=seed, ): rates.append(rate) scatterrates = rates[:2] fluorates = rates[2:] scattergroups = [ xrf_spectra.ScatterLineGroup( "Compton000", (I0 * scatterrates[0]).astype(numpy.uint32) ), xrf_spectra.ScatterLineGroup( "Peak000", (I0 * scatterrates[1]).astype(numpy.uint32) ), ] linegroups = [ xrf_spectra.EmissionLineGroup(element, group, (I0 * rate).astype(numpy.uint32)) for rate, (element, group) in zip(fluorates, emission_line_groups) ] theoretical_spectra, config = xrf_spectra.xrf_spectra( linegroups, scattergroups, energy=energy, flux=flux, elapsed_time=expo_time, ) if integral_type: integral_type = numpy.uint32 else: integral_type = None measured_data = apply_dualchannel_signal_processing( theoretical_spectra, elapsed_time=expo_time, counting_noise=counting_noise, integral_type=integral_type, ) roi_data_theory = dict() roi_data_cor = dict() # I0 and LT corrected for i, roi in enumerate(rois, 1): roi_name = f"roi{i}" idx = Ellipsis, slice(*roi) roi_theory = theoretical_spectra[idx].sum(axis=-1) / I0 * I0_reference roi_meas = measured_data["spectrum"][idx].sum(axis=-1) roi_cor = I0_reference * roi_meas / I0 * expo_time / measured_data["live_time"] roi_data_theory[roi_name] = roi_theory roi_data_cor[roi_name] = roi_cor measured_data[roi_name] = roi_meas with h5py.File(filename, "a") as nxroot: scan_name = f"{scan_number}.1" nxroot.attrs["NX_class"] = "NXroot" nxroot.attrs["creator"] = "ewoksfluo" nxentry = nxroot.require_group(scan_name) nxentry.attrs["NX_class"] = "NXentry" title = f"{info['title']} {expo_time}" if "title" in nxentry: del nxentry["title"] nxentry["title"] = title nxinstrument = nxentry.require_group("instrument") nxinstrument.attrs["NX_class"] = "NXinstrument" measurement = nxentry.require_group("measurement") measurement.attrs["NX_class"] = "NXcollection" for name in ("positioners_start", "positioners"): positioners = nxinstrument.require_group(name) positioners.attrs["NX_class"] = "NXcollection" if "energy" in positioners: del positioners["energy"] positioners["energy"] = energy positioners["energy"].attrs["units"] = "keV" if name == "positioners": continue if "samz" in positioners: del positioners["samz"] positioners["samz"] = samz[0][0] positioners["samz"].attrs["units"] = "um" if "samy" in positioners: del positioners["samy"] positioners["samy"] = samy[0][0] positioners["samy"].attrs["units"] = "um" nxdetector = nxinstrument.require_group("I0") nxdetector.attrs["NX_class"] = "NXdetector" if "data" in nxdetector: del nxdetector["data"] nxdetector["data"] = I0.flatten(order="F") if "I0" not in measurement: measurement["I0"] = h5py.SoftLink(nxdetector["data"].name) positioners = nxinstrument["positioners"] nxpositioner = nxinstrument.require_group("samz") nxpositioner.attrs["NX_class"] = "NXpositioner" if "value" in nxpositioner: del nxpositioner["value"] nxpositioner["value"] = samz.flatten(order="F") nxpositioner["value"].attrs["units"] = "um" if "samz" not in measurement: measurement["samz"] = h5py.SoftLink(nxpositioner["value"].name) if "samz" not in positioners: positioners["samz"] = h5py.SoftLink(nxpositioner["value"].name) nxpositioner = nxinstrument.require_group("samy") nxpositioner.attrs["NX_class"] = "NXpositioner" if "value" in nxpositioner: del nxpositioner["value"] nxpositioner["value"] = samy.flatten(order="F") nxpositioner["value"].attrs["units"] = "um" if "samy" not in measurement: measurement["samy"] = h5py.SoftLink(nxpositioner["value"].name) if "samy" not in positioners: positioners["samy"] = h5py.SoftLink(nxpositioner["value"].name) for i in range(ndetectors): det_name = f"mca{i}" nxdetector = nxinstrument.require_group(det_name) nxdetector.attrs["NX_class"] = "NXdetector" for signal_name, signal_values in measured_data.items(): if signal_name in nxdetector: del nxdetector[signal_name] if signal_name == "spectrum": mca_shape = (shape[0] * shape[1], signal_values.shape[-1]) nxdetector[signal_name] = signal_values.reshape( mca_shape, order="F" ) if "data" not in nxdetector: nxdetector["data"] = h5py.SoftLink("spectrum") meas_name = det_name else: nxdetector[signal_name] = signal_values.flatten(order="F") meas_name = f"{det_name}_{signal_name}" if meas_name not in measurement: measurement[meas_name] = h5py.SoftLink(nxdetector[signal_name].name) nxprocess = nxentry.require_group("theory") nxprocess.attrs["NX_class"] = "NXprocess" nxnote = nxprocess.require_group("configuration") nxnote.attrs["NX_class"] = "NXnote" if "data" in nxnote: del nxnote["data"] if "type" in nxnote: del nxnote["type"] nxnote["type"] = "application/pymca" nxnote["data"] = config.tostring() signals = { f"{g.element}-{g.name}": g.counts / I0 * I0_reference for g in linegroups } signals.update({g.name: g.counts / I0 * I0_reference for g in scattergroups}) _save_nxdata(nxprocess, "elements", signals, positioners) if roi_data_theory: _save_nxdata(nxprocess, "rois", roi_data_theory, positioners) if roi_data_cor: _save_nxdata(nxprocess, "rois_corrected", roi_data_cor, positioners)
def _save_nxdata( parent: h5py.Group, name: str, signals: Dict[str, numpy.ndarray], positioners: h5py.Group, ) -> None: nxdata = parent.require_group(name) nxdata.attrs["NX_class"] = "NXdata" # nxdata.attrs["interpretation"] = "image" names = list(signals.keys()) nxdata.attrs["signal"] = names[0] if len(names) > 1: nxdata.attrs["auxiliary_signals"] = names[1:] for signal_name, signal_values in signals.items(): if signal_name in nxdata: del nxdata[signal_name] nxdata[signal_name] = signal_values.flatten(order="F") nxdata.attrs["axes"] = ["samz", "samy"] # Order: fast to slow if "samz" not in nxdata: nxdata["samz"] = h5py.SoftLink(positioners["samz"].name) if "samy" not in nxdata: nxdata["samy"] = h5py.SoftLink(positioners["samy"].name) def _iter_2dscan_data( shape: Tuple[int, int], info: dict, nmaps: int = 1, max_deviation: float = 0, seed: Optional[int] = None, ) -> Iterator[Tuple[List[numpy.ndarray], numpy.ndarray]]: """Yield random samples of an image scanned in F-order (row is the fast axis). :returns: list of axes positions (F-order matrices, from fast to slow axis), signal (F-order matrix) """ filename = os.path.join(os.path.dirname(__file__), "ihc.png") channels = numpy.transpose(imageio.imread(filename), [2, 0, 1]) _mark_scan_direction(channels) image_shape = channels[0].shape d = abs(max_deviation) pmin = [2 * d, 2 * d] pmax = [n - 1 - 2 * d for n in image_shape] rstate = numpy.random.RandomState(seed=seed) coordinates = _regular_scan_positions(pmin, pmax, shape, rstate, max_deviation) title = f"amesh samz {pmin[0]} {pmax[0]} {shape[0]-1} samy {pmin[1]} {pmax[1]} {shape[1]-1}" info["title"] = title flat_coordinates = [x.flatten(order="F") for x in coordinates] for _ in range(nmaps): fractions = rstate.uniform(low=0, high=1, size=3) fractions /= 255 * fractions.sum() image = sum(fractions[:, numpy.newaxis, numpy.newaxis] * channels) rate = ndimage.map_coordinates( image, flat_coordinates, order=1, cval=0, mode="nearest" ) yield coordinates, rate.reshape(shape, order="F") def _mark_scan_direction(channels: numpy.ndarray) -> None: image_shape = channels[0].shape dstart = image_shape[0] // 10, image_shape[1] // 10 dtick = dstart[0] // 4, dstart[1] // 4 p0 = dstart[0] - dtick[0], dstart[1] - dtick[1] p1 = dstart[0] + dtick[0], dstart[1] + dtick[1] channels[:, p0[0] : p1[0], p0[1] : p1[1]] = 255 dtick = dtick[0] // 2, dtick[1] // 2 dend = image_shape[0] // 2, image_shape[1] // 10 p0 = dstart[0] - dtick[0], dstart[1] - dtick[1] p1 = dend[0] + dtick[0], dend[1] + dtick[1] channels[:, p0[0] : p1[0], p0[1] : p1[1]] = 255 def _regular_scan_positions( pmin: Sequence[float], pmax: Sequence[float], shape: Sequence[int], rstate: numpy.random.RandomState, max_deviation: float = 0, ) -> List[numpy.ndarray]: """Generate motor positions of a regular nD scan. Positions are F-order arrays (fast axis first), ordered from fast to slow motor. The deviation is given as a fraction of the step size. """ positions = [ numpy.linspace(start, stop, num) for start, stop, num in zip(pmin, pmax, shape) ] positions = numpy.meshgrid(*positions, indexing="ij") if not max_deviation: return positions deviations = [ abs(max_deviation if num <= 1 else (stop - start) / (num - 1) * max_deviation) for start, stop, num in zip(pmin, pmax, shape) ] positions = [ values + rstate.uniform(low=-d, high=d, size=shape) for values, d in zip(positions, deviations) ] return positions