Source code for ewoksfluo.tasks.fit.execute

from queue import Queue
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from concurrent.futures import ProcessPoolExecutor
from contextlib import ExitStack
import multiprocessing

from PyMca5.PyMcaIO.ConfigDict import ConfigDict

from ...xrffit import perform_batch_fit
from ...xrffit import outputbuffer_context
from ...xrffit import queue_outputbuffer_context

from ...xrffit.handlers import NexusOutputHandler
from ...xrffit.handlers import consume_handler_queue
from ...xrffit.handlers import stop_queue


def fit_single(
    scan_uri: str,
    xrf_spectra_uri_template: str,
    output_uri_template: str,
    detector_name: str,
    config: Union[str, ConfigDict],
    energy: Optional[float] = None,
    quantification: Optional[dict] = None,
    energy_multiplier: Optional[float] = None,
    fast_fitting: bool = False,
    diagnostics: bool = False,
    figuresofmerit: Optional[bool] = None,
) -> str:
    """Fitting of one scan with one detector.

    Returns the URI of the fit results.
    """
    if figuresofmerit is None:
        figuresofmerit = diagnostics
    output_uri = output_uri_template.format(detector_name)
    with outputbuffer_context(
        output_uri,
        diagnostics=diagnostics,
        figuresofmerit=figuresofmerit,
    ) as output_buffer:
        if output_buffer.already_existed:
            print(f"{output_buffer.fit_results_uri} already exists")
        else:
            xrf_spectra_uri = (
                f"{scan_uri}/{xrf_spectra_uri_template.format(detector_name)}"
            )
            perform_batch_fit(
                xrf_spectra_uris=[xrf_spectra_uri],
                cfg=config,
                output_buffer=output_buffer,
                energy=energy,
                energy_multiplier=energy_multiplier,
                quantification=quantification,
                fast=fast_fitting,
            )
    return output_buffer.fit_results_uri
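
# A hypothetical usage sketch of ``fit_single`` (all URIs, the detector name and the
# PyMca configuration path below are placeholders, not values from this module):
#
#     fit_results_uri = fit_single(
#         scan_uri="/data/experiment/sample.h5::/1.1",
#         xrf_spectra_uri_template="instrument/{}/data",
#         output_uri_template="/data/experiment/fit_{}.h5::/results",
#         detector_name="mca0",
#         config="/data/experiment/pymca.cfg",
#         energy=12.0,
#         fast_fitting=True,
#     )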
def fit_multi(
    scan_uris: Sequence[str],
    xrf_spectra_uri_template: str,
    output_uri_template: str,
    detector_names: Sequence[str],
    configs: Sequence[Union[str, ConfigDict]],
    energies: Optional[Sequence[Optional[float]]] = None,
    quantification: Optional[dict] = None,
    energy_multiplier: Optional[float] = None,
    fast_fitting: bool = False,
    diagnostics: bool = False,
    figuresofmerit: Optional[bool] = None,
) -> List[str]:
    """Parallelized fitting of multiple scans with multiple detectors.

    Returns the URIs of the fit results (one per detector).
    """
    nscans = len(scan_uris)
    if energies:
        if len(energies) != nscans:
            raise ValueError(f"Requires {nscans} energies, one for each scan")
    else:
        energies = [None] * nscans

    ndetectors = len(detector_names)
    if len(configs) != ndetectors:
        raise ValueError(
            f"Requires {ndetectors} pymca configurations, one for each detector"
        )

    if figuresofmerit is None:
        figuresofmerit = diagnostics

    if diagnostics:
        default_group = "fit"
    else:
        default_group = "parameters"

    fit_results_uris: List[str] = []
    with ExitStack() as stack:
        ctx = multiprocessing.Manager()
        manager = stack.enter_context(ctx)
        queue = manager.Queue()

        ctx = ProcessPoolExecutor()
        executor = stack.enter_context(ctx)

        arguments: List[Tuple[tuple, dict, dict]] = list()
        output_handlers: Dict[int, NexusOutputHandler] = dict()
        queue_sendids: Set[int] = set()

        output_uris = [
            output_uri_template.format(detector_name)
            for detector_name in detector_names
        ]
        if len(set(output_uris)) != len(output_uris):
            raise ValueError(
                "Add a place-holder '{}' for the detector name in the output URI template"
            )

        xrf_spectra_uris = [
            [
                f"{scan_uri}/{xrf_spectra_uri_template.format(detector_name)}"
                for scan_uri in scan_uris
            ]
            for detector_name in detector_names
        ]
        if len(set(xrf_spectra_uris[0])) != len(xrf_spectra_uris[0]):
            raise ValueError(
                "Add a place-holder '{}' for the detector name in the XRF spectra URI template"
            )

        for destinationid, (output_uri, detector_spectra_uris, config) in enumerate(
            zip(output_uris, xrf_spectra_uris, configs)
        ):
            ctx = NexusOutputHandler(output_uri, default_group=default_group)
            output_handler = stack.enter_context(ctx)
            fit_results_uris.append(output_handler.fit_results_uri)
            if output_handler.already_existed:
                print(f"{output_handler.fit_results_uri} already exists")
                continue
            output_handlers[destinationid] = output_handler

            for i_scan, (xrf_spectra_uri, energy) in enumerate(
                zip(detector_spectra_uris, energies)
            ):
                queue_sendid = destinationid * nscans + i_scan
                queue_sendids.add(queue_sendid)
                buffer_args = (
                    queue,
                    queue_sendid,
                    destinationid,
                    nscans,
                    i_scan,
                )
                buffer_kwargs = {
                    "diagnostics": diagnostics,
                    "figuresofmerit": figuresofmerit,
                }
                fit_kwargs = {
                    "xrf_spectra_uris": [xrf_spectra_uri],
                    "cfg": config,
                    "energy": energy,
                    "energy_multiplier": energy_multiplier,
                    "quantification": quantification,
                    "fast": fast_fitting,
                }
                arguments.append((buffer_args, buffer_kwargs, fit_kwargs))

        if not arguments:
            return fit_results_uris

        # Sub-processes fit and send the results to the queue
        arguments = list(zip(*arguments))
        results = executor.map(_fit_main, *arguments, chunksize=1)

        # Main process receives results from the queue and saves them in HDF5
        consume_handler_queue(output_handlers, queue, queue_sendids)

        # Re-raise exceptions if any
        for _ in results:
            pass

    return fit_results_uris
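
# A hypothetical usage sketch of ``fit_multi`` with two scans and two detectors
# (all URIs, detector names and configuration paths below are placeholders):
#
#     fit_results_uris = fit_multi(
#         scan_uris=[
#             "/data/experiment/sample.h5::/1.1",
#             "/data/experiment/sample.h5::/2.1",
#         ],
#         xrf_spectra_uri_template="instrument/{}/data",
#         output_uri_template="/data/experiment/fit_{}.h5::/results",
#         detector_names=["mca0", "mca1"],
#         configs=["/data/experiment/mca0.cfg", "/data/experiment/mca1.cfg"],
#         energies=[12.0, 12.0],
#         fast_fitting=True,
#         diagnostics=True,
#     )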
def _fit_main(
    buffer_args: Tuple[Queue, int, int, Optional[int], int],
    buffer_kwargs: dict,
    fit_kwargs: dict,
) -> None:
    """Entry point of one worker process: fit one scan with one detector,
    send the results to the queue and always signal that this sender is done.
    """
    queue, sendid, destinationid, nscans, scan_index = buffer_args
    if nscans == 1:
        nscans = None
        scan_index = None
    try:
        with queue_outputbuffer_context(
            queue,
            sendid,
            destinationid,
            nscans=nscans,
            scan_index=scan_index,
            **buffer_kwargs,
        ) as output_buffer:
            perform_batch_fit(output_buffer=output_buffer, **fit_kwargs)
    finally:
        stop_queue(queue, sendid)
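
# For reference, the producer/consumer pattern used by ``fit_multi`` and ``_fit_main``
# in miniature, using only the standard library. The worker function and payloads are
# invented for illustration and are not part of ewoksfluo:
#
#     import multiprocessing
#     from concurrent.futures import ProcessPoolExecutor
#
#     def _square(queue, sendid, value):
#         try:
#             queue.put((sendid, value * value))   # worker sends its result
#         finally:
#             queue.put((sendid, None))            # sentinel, comparable to stop_queue()
#
#     def main():
#         with multiprocessing.Manager() as manager, ProcessPoolExecutor() as executor:
#             queue = manager.Queue()
#             sendids = [0, 1, 2]
#             results = executor.map(_square, [queue] * 3, sendids, [1, 2, 3])
#             pending = set(sendids)
#             while pending:                       # main process drains the queue,
#                 sendid, payload = queue.get()    # comparable to consume_handler_queue()
#                 if payload is None:
#                     pending.discard(sendid)
#                 else:
#                     print(f"sender {sendid}: {payload}")
#             for _ in results:                    # re-raise worker exceptions, if any
#                 pass
#
#     if __name__ == "__main__":
#         main()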