Source code for ewoksid22.topas_extract

Written by Ola G. Grendal @ ID22 ESRF

import os
import sys
import numpy as np
import h5py
import bisect
from concurrent.futures import ProcessPoolExecutor as Pool

X_LOW = 0
X_HIGH = 511
ROIS = [i for i in range(NUM_CHANNELS)]
ROWS = [i for i in range(X_LOW, X_HIGH + 1)]

# TODO check for sorted/non-unique tth. !!Chop data fixed when data is not unique!!

[docs]def minmax_tth(min_usr_tth, max_usr_tth, tth_min, tth_max): # Takes in the user set min/max tth, and min/max of scans, and return min/max tth for data to to exported to xye-files print(f"tth min - max is: {tth_min} - {tth_max}") print( f"Widest range possible for MA06 with all detectors: {tth_min+12} - {tth_max-12}" ) print(f"Your selection for MA06 is: {min_usr_tth} - {max_usr_tth}") if min_usr_tth is None and max_usr_tth is None: print( f"Your MA06 range is set to the widest possible: {tth_min+12}-{tth_max-12}" ) return tth_min + 12, tth_max - 12 elif min_usr_tth is None and max_usr_tth is not None: print("Your MA06 range is {tth_min+12} - {min(max_usr_tth, tth_max)}") return tth_min + 12, min(max_usr_tth, tth_max) elif min_usr_tth is not None and max_usr_tth is None: print(f"Your MA06 range is: {max(min_usr_tth, tth_min)} - {tth_max-12}") return max(min_usr_tth, tth_min), tth_max - 12 else: print( f"Your MA06 range is: {max(min_usr_tth, tth_min)} - {min(max_usr_tth, tth_max)}" ) return max(min_usr_tth, tth_min), min(max_usr_tth, tth_max)
[docs]def set_savepath(savepath): # Returns a string with the path to where to save the output images if savepath is None: return str("./") else: os.makedirs(savepath, exist_ok=True) return savepath
[docs]def set_filename(scan, in_file, out_file): # Returns a list of filename-strings if out_file is None: file_name_base = str(in_file.split("/")[-1].split(".")[0]) else: file_name_base = out_file out_file_name_base = file_name_base + "_s{}_".format(scan) return out_file_name_base
[docs]def read_h5(scan, in_file): # Read h5-file and returns dict with info print(f"Reading data from scan {scan} of {in_file}...") res = {} with h5py.File(in_file, "r") as h: res["roicol"] = h["/{}.1/measurement/eiger_roi_collection".format(scan)][()] res["tth"] = h["/{}.1/measurement/tth".format(scan)][()] res["mon"] = h["/{}.1/measurement/mon".format(scan)][()] res["xs"] = h["/{}.1/instrument/eiger_roi_collection/selection/x".format(scan)][ () ] # FIX res["ys"] = h["/{}.1/instrument/eiger_roi_collection/selection/y".format(scan)][ () ] # FIX return res
[docs]def get_tth_indexes(tth_minMA06, tth_maxMA06, tth): # Find the tth subranges for the detectors assuming a nominal offset of 2-deg, and the corresponding indexes sub_ranges_tth = {} sub_ranges_idx = {} for i in range(NUM_CHANNELS): tth_low = tth_minMA06 + (6 - i) * 2 tth_high = tth_maxMA06 + (6 - i) * 2 idx_low = bisect.bisect_left(tth, tth_low) idx_high = bisect.bisect_left(tth, tth_high) sub_ranges_tth[f"MA{i:02d}"] = [tth_low, tth_high] sub_ranges_idx[f"MA{i:02d}"] = [idx_low, idx_high] # print(sub_ranges_tth, sub_ranges_idx) return sub_ranges_idx
[docs]def set_scans(scans, ex_scans): if isinstance(scans, int): all_scans = [scans] elif len(scans) == 1: all_scans = scans elif len(scans) == 2: all_scans = [x for x in range(scans[0], scans[1] + 1)] if ex_scans is not None: all_scans = [x for x in all_scans if x not in ex_scans] elif len(scans) >= 3: all_scans = [x for x in scans] if ex_scans is not None: all_scans = [x for x in all_scans if x not in ex_scans] else: all_scans = tuple() assert all_scans, "no scans to process" print("Converting the following scan(s): ", end="") print(*all_scans, sep=", ") return all_scans
# def check_tth(tth): # is_sorted = np.all(tth[:-1] <= tth[1:]) # if is_sorted is False: # print('!!tth is not a sorted array!!') # tth_set = set(tth) # if len(tth_set) == len(tth): # all_unique = True # chop = None # else: # all_unique = False # chop = len(tth) - len(tth_set) # print('!!tth does not only contain unique values!!') # return is_sorted, all_unique, chop
[docs]def run( in_file, out_file, scans=(1,), savepath=None, ex_scans=None, min_usr_tth=None, max_usr_tth=None, full_tth=False, ): # Saves data from roi_collection to .xye-files save_data_path = set_savepath(savepath) all_scans = set_scans(scans, ex_scans) fmt = ("%3.8f", "%i", "%i") for scan in all_scans: data = read_h5(scan, in_file) out_filename = set_filename(scan, in_file, out_file) # is_sorted, all_unique, chop = check_tth(data['tth']) # if all_unique is False: # data['tth'] = data['tth'][chop:] # data['mon'] = data['mon'][chop:] # data['roicol'] = data['roicol'][chop:] sorted_xs = sorted(set(data["xs"])) nfiles = len(data["xs"]) print(f"Extract scan {scan} from file {in_file} ...") if full_tth: with Pool() as pool: for i in range(nfiles): channel = sorted_xs.index(data["xs"][i]) suffix = "MA{}_c{}.xye".format(channel, data["ys"][i]) filenamei = os.path.join(save_data_path, out_filename + suffix) datai = np.c_[data["tth"], data["roicol"][:, i], data["mon"]] pool.submit(np.savetxt, filenamei, datai, fmt=fmt) else: tth_min = round(data["tth"][0]) tth_max = round(data["tth"][-1]) tth_minMA06, tth_maxMA06 = minmax_tth( min_usr_tth, max_usr_tth, tth_min, tth_max ) ranges_idx = get_tth_indexes(tth_minMA06, tth_maxMA06, data["tth"]) with Pool() as pool: for i in range(nfiles): channel = sorted_xs.index(data["xs"][i]) low_idx = ranges_idx[f"MA{channel:02d}"][0] high_idx = ranges_idx[f"MA{channel:02d}"][1] suffix = "MA{}_c{}.xye".format(channel, data["ys"][i]) filenamei = os.path.join(save_data_path, out_filename + suffix) datai = np.c_[ data["tth"][low_idx:high_idx], data["roicol"][low_idx:high_idx, i], data["mon"][low_idx:high_idx], ] pool.submit(np.savetxt, filenamei, datai, fmt=("%3.8f", "%i", "%i")) print(f"Saved {nfiles} xye files")
[docs]def main(argv=None): import argparse from datetime import datetime if argv is None: argv = sys.argv parser = argparse.ArgumentParser( description="Script for exporting xye-files for TOPAS from roi collection @ ID22, ESRF" ) parser.add_argument( "-in", "--in_file", type=str, required=True, help="Filepath to h5-file including filename", ) parser.add_argument( "-s", "--scans", type=int, default=1, nargs="+", help="Scan number(s) of scan(s) to convert. Default is scan 1.1. Ex: 2 or 1 10 or 2 3 5", ) parser.add_argument( "-es", "--ex_scans", type=int, default=None, nargs="+", help="Scan number(s) of scan(s) to exclude. Default is none. Ex: 2 or 2 3 5", ) parser.add_argument( "-l_tth", "--tth_min", type=float, default=None, help="Lowest tth to include (MA6). Default is lowest possible (scan_start + 12).", ) parser.add_argument( "-h_tth", "--tth_max", type=float, default=None, help="Highest tth to include (MA6). Default is highest possible (scan_end - 12).", ) parser.add_argument( "-full_tth", "--full_tth", type=int, default=0, help="Set 1 to use all data/full tth-range. Default is 0.", ) parser.add_argument( "-o", "--savepath", type=str, default=None, help="Full path to where you want to save your data. By default data is saved in the current directory: ./", ) parser.add_argument( "-f", "--out_file", type=str, default=None, help="Filename. Ex: LaB6_35keV --> LaB6_35keV_sx_MAy_z.xye. If not given filename of h5-file is used --> h5filename_sx_MAy_z.xye.", ) # parser.add_argument('-is', '--includescan', type=int, nargs='+', default=None, help='If used alone list of the only scans to be included. If used with -sts and/or -ens it will include scans outside the given range. Ex: 2 3 5 10') args = parser.parse_args(argv[1:]) time_start_TOT = run( args.in_file, args.out_file, scans=args.scans, savepath=args.savepath, ex_scans=args.ex_scans, min_usr_tth=args.tth_min, max_usr_tth=args.tth_max, full_tth=bool(args.full_tth), ) print("Time total: {}".format( - time_start_TOT))
if __name__ == "__main__": sys.exit(main())