Source code for est.core.process.larch.xftf

"""wrapper to the larch xftf process"""

from est.core.types import Spectrum, XASObject
from est.core.process.process import Process
from larch.xafs.xafsft import xftf
from larch.symboltable import Group
import logging

_logger = logging.getLogger(__name__)


[docs]def process_spectr_xftf( spectrum, configuration, overwrite=True, callbacks=None, output=None, output_dict=None, ): """ :param spectrum: spectrum to process :type: :class:`.Spectrum` :param configuration: configuration of the pymca normalization :type: dict :param overwrite: False if we want to return a new Spectrum instance :type: bool :param callback: callback to execute. :param output: list to store the result, needed for pool processing :type: multiprocessing.manager.list :param output_dict: key is input spectrum, value is index in the output list. :type: dict :return: processed spectrum :rtype: tuple (configuration, spectrum) """ _logger.debug("start xftf on spectrum (%s, %s)" % (spectrum.x, spectrum.y)) assert isinstance(spectrum, Spectrum) if spectrum.k is None or spectrum.chi is None: _logger.error( "k and/or chi is/are not specified, unable to compute " "xftf. Maybe you need to run autobk process before ?" ) return None, None # if kmax is not provided take default value kmax = configuration.get("kmax", None) if kmax is None: configuration["kmax"] = max(spectrum.k) * 0.9 kmin = configuration.get("kmin", None) if kmin is None: configuration["kmin"] = min(spectrum.k) opts = {} for opt_name in ( "kmin", "kmax", "kweight", "dk", "dk2", "with_phase", "window", "rmax_out", "nfft", "kstep", ): if opt_name in configuration: opts[opt_name] = configuration[opt_name] if opt_name == "kweight": opts["kw"] = configuration[opt_name] if not overwrite: spectrum = Spectrum.from_dict(spectrum.to_dict()) res_group = Group() xftf(k=spectrum.k, chi=spectrum.chi, group=res_group, **opts) spectrum.r = res_group.r spectrum.chir = res_group.chir spectrum.chir_mag = res_group.chir_mag spectrum.chir_re = res_group.chir_re spectrum.chir_im = res_group.chir_im with_phase = opts.get("with_phase", False) if with_phase: spectrum.chir_pha = res_group.chir_pha else: spectrum.chir_pha = None # handle chi(x) * k**k_weight plot with r max if spectrum.k is not None and spectrum.chi is not None: if "kweight" in opts: kweight = opts["kweight"] else: kweight = 0 spectrum.chi_weighted_k = spectrum.chi * (spectrum.k**kweight) spectrum.larch_dict["xftf_k_weight"] = kweight spectrum.larch_dict["xftf_k_min"] = configuration["kmin"] spectrum.larch_dict["xftf_k_max"] = configuration["kmax"] if callbacks: for callback in callbacks: callback() return configuration, spectrum
[docs]def larch_xftf(xas_obj, **optional_inputs): """ :param xas_obj: object containing the configuration and spectra to process :type: Union[XASObject, dict] :return: spectra dict :rtype: XASObject """ process = Larch_xftf(inputs={"xas_obj": xas_obj, **optional_inputs}) process.run() return process.get_output_value("xas_obj", None)
[docs]class Larch_xftf( Process, name="xftf", input_names=["xas_obj"], optional_input_names=["xftf_config"], output_names=["xas_obj"], ):
[docs] def run(self): xas_obj = self.inputs.xas_obj _xas_obj = self.getXasObject(xas_obj=xas_obj) self._advancement.reset(max_=_xas_obj.n_spectrum) self._advancement.startProcess() self._pool_process(xas_obj=_xas_obj) self._advancement.endProcess() self.outputs.xas_obj = _xas_obj
def _pool_process(self, xas_obj): assert isinstance(xas_obj, XASObject) xftf_config = self.get_input_value("xftf_config", dict()) n_s = len(xas_obj.spectra.data.flat) for i_s, spectrum in enumerate(xas_obj.spectra): process_spectr_xftf( spectrum=spectrum, configuration=xftf_config, callbacks=self.callbacks, overwrite=True, ) self.progress = i_s / n_s * 100.0
[docs] def definition(self): return "xftf calculation"
[docs] def program_version(self): import larch.version return larch.version.version_data()["larch"]
[docs] @staticmethod def program_name(): return "larch_xftf"