Source code for nxtomomill.converter.hdf5.acquisition.standardacquisition

# coding: utf-8

"""
module to define a standard tomography acquisition (made by bliss)
"""

from __future__ import annotations

import pint
from datetime import datetime

import h5py
from silx.io.url import DataUrl
from silx.io.utils import h5py_read_dataset

from nxtomo.utils.transformation import DetXFlipTransformation, DetYFlipTransformation
from nxtomo.nxobject.nxsource import SourceType
from nxtomo.nxobject.nxdetector import ImageKey
from nxtomo.nxobject.utils.concatenate import (
    concatenate_pint_quantities as _concatenate_pint_quantities,
)

from nxtomomill.models.h52nx._acquisitionstep import AcquisitionStep
from nxtomomill.utils.utils import str_datetime_to_numpy_datetime64
from nxtomomill.utils.io import deprecated
from nxtomomill.utils.hdf5 import get_dataset_unit
from nxtomomill.utils._os import abspath

from .baseacquisition import BaseAcquisition, EntryReader
from .utils import (
    deduce_machine_current,
    get_bliss_scan_type,
    get_nx_detectors,
    guess_nx_detector,
)

try:
    import hdf5plugin  # noqa F401
except ImportError:
    pass
import fnmatch
import logging
import os

import numpy

from nxtomomill.converter.hdf5.acquisition.blisstomoconfig import (
    TomoConfig as BlissTomoConfig,
)
from nxtomomill.io.config import TomoHDF5Config
from ._bliss_activeconfig import BlissActiveConfig
from nxtomo.application.nxtomo import NXtomo

_ureg = pint.get_application_registry()
_logger = logging.getLogger(__name__)


__all__ = ["StandardAcquisition"]


[docs]class StandardAcquisition(BaseAcquisition): """ Class to collect information from a bliss - hdf scan (see https://bliss.gitlab-pages.esrf.fr/fscan). Once all data is collected a set of NXtomo will be created. Then NXtomo instances will be saved to disk. :param root_url: url of the acquisition. Can be None if this is the initialization entry :param configuration: configuration to use to collect raw data and generate outputs :param detector_sel_callback: possible callback to retrieve missing information """
[docs] def __init__( self, root_url: DataUrl | None, configuration: TomoHDF5Config, detector_sel_callback, start_index, parent=None, ): super().__init__( root_url=root_url, configuration=configuration, detector_sel_callback=detector_sel_callback, start_index=start_index, ) self._parent = parent # possible parent. Like for z series self._nx_tomos = [NXtomo()] self._image_key_control = None self._rotation_angle: pint.Quantity | None = None self._sample_x: pint.Quantity | None = None self._sample_y: pint.Quantity | None = None self._translation_y: pint.Quantity | None = None self._translation_z: pint.Quantity | None = None self._lr_flipped: bool | None = None self._ud_flipped: bool | None = None self._unique_detector_names = list() # register names self._active_config_tomo_detector = None """Detector registered by the bliss active config. Used to read data_axes (detector flip)""" self._virtual_sources = None self._acq_expo_time: pint.Quantity | None = None self._copied_dataset = {} "register dataset copied. Key if the original location as" "DataUrl.path. Value is the DataUrl it has been moved to" self._known_machine_current_am: None | dict = None # store all registered machine current self._frames_timestamp = None
# try to deduce time stamp of each frame def parent_root_url(self) -> DataUrl | None: if self._parent is not None: return self._parent.root_url else: return None
[docs] def get_expected_nx_tomo(self): return 1
@property def image_key_control(self): return self._image_key_control @property def rotation_angle(self): return self._rotation_angle @property def translation_y(self): return self._translation_y @property def translation_z(self): return self._translation_z @property def lr_flipped(self): return self._lr_flipped @deprecated( replacement="lr_flipped", reason="renamed", since_version="2.0", ) @property def x_flipped(self): return self.lr_flipped @property def ud_flipped(self): return self._ud_flipped @deprecated( replacement="ud_flipped", reason="renamed", since_version="2.0", ) @property def y_flipped(self): return self.ud_flipped @property def n_frames(self): return self._n_frames @property def n_frames_actual_bliss_scan(self): return self._n_frames_actual_bliss_scan @property def dim_1(self): return self._dim_1 @property def dim_2(self): return self._dim_2 @property def data_type(self): return self._data_type @property def expo_time(self): return self._acq_expo_time @property def known_machine_current(self) -> dict | None: """ Return the dict of all known machine currents. Key is the time stamp, value is the machine current """ return self._known_machine_current_am @property def sample_x(self): """Return the '_sample_x' attribute. In **ESRF coordinate**""" return self._sample_x @property def sample_y(self): """Return the '_sample_y' attribute. In **ESRF coordinate**""" return self._sample_y
[docs] def register_step( self, url: DataUrl, entry_type: AcquisitionStep | None = None, copy_frames=False ) -> None: """ :param url: entry to be registered and contained in the acquisition :param entry_type: type of the entry if know. Overwise will be 'evaluated' """ if entry_type is None: entry_type = get_bliss_scan_type(url=url, configuration=self.configuration) assert ( entry_type is not AcquisitionStep.INITIALIZATION ), "Initialization are root node of a new sequence and not a scan of a sequence" if entry_type is None: _logger.warning(f"{url} not recognized, skip it") else: self._registered_entries[url.path()] = entry_type self._copy_frames[url.path()] = copy_frames self._entries_o_path[url.path()] = url.data_path()
# path from the original file. Haven't found another way to get it ?! def _get_valid_camera_names(self, instrument_grp: h5py.Group): # 1: try to get detector from nx property detectors = get_nx_detectors(instrument_grp) detectors = [grp.name.split("/")[-1] for grp in detectors] def filter_detectors(det_grps): if len(det_grps) > 0: _logger.info(f"{len(det_grps)} detector found from NX_class attribute") if len(det_grps) > 1: # if an option: pick the first one once orderered # else ask user if self._detector_sel_callback is None: sel_det = det_grps[0] _logger.warning( f"several detector found. Only one is managed for now. Will pick {sel_det}" ) else: sel_det = self._detector_sel_callback(det_grps) if sel_det is None: _logger.warning("no detector given, avoid conversion") det_grps = (sel_det,) return det_grps return None detectors = filter_detectors(det_grps=detectors) if detectors is not None: return detectors # 2: get nx detector from shape... detectors = guess_nx_detector(instrument_grp) detectors = [grp.name.split("/")[-1] for grp in detectors] return filter_detectors(det_grps=detectors)
[docs] @staticmethod def concatenate_pint_quantities(quantities: tuple[pint.Quantity | None]): """ concatenation dedicated to acquisition. quantities items can be None or a quantity """ return _concatenate_pint_quantities( tuple( filter( lambda a: a is not None and len(a) > 0, quantities, ) ) )
def __get_data_from_camera( self, data_dataset: h5py.Dataset, data_name, frame_type, entry, entry_path, camera_dataset_url, ): if data_dataset.ndim == 2: shape = (1, data_dataset.shape[0], data_dataset.shape[1]) elif data_dataset.ndim != 3: err = f"dataset {data_name} is expected to be 3D when {data_dataset.ndim}D found." if data_dataset.ndim == 1: err = "\n".join( [ err, "This might be a bliss-EDF dataset. Those are not handled by nxtomomill", ] ) _logger.error(err) return 0 else: shape = data_dataset.shape n_frame = shape[0] self._n_frames += n_frame self._n_frames_actual_bliss_scan = n_frame if self.dim_1 is None: self._dim_2 = shape[1] self._dim_1 = shape[2] else: if self._dim_1 != shape[2] or self._dim_2 != shape[1]: raise ValueError("Inconsistency in detector shapes") if self._data_type is None: self._data_type = data_dataset.dtype elif self._data_type != data_dataset.dtype: raise ValueError("detector frames have incoherent " "data types") # update image_key and image_key_control # Note: for now there is no image_key on the master file # should be added later. image_key_control = frame_type.to_image_key_control() self._image_key_control.extend([image_key_control.value] * n_frame) data_dataset_path = data_dataset.name.replace(entry.name, entry_path, 1) # replace data_dataset name by the original entry_path. # this is a workaround to use the dataset path on the # "treated file". Because .name if the name on the 'target' # file of the virtual dataset v_source = h5py.VirtualSource( camera_dataset_url.file_path(), data_dataset_path, data_dataset.shape, dtype=self._data_type, ) self._virtual_sources.append(v_source) self._virtual_sources_len.append(n_frame) return n_frame def _treate_valid_camera( self, detector_node, entry, frame_type, input_file_path, entry_path, entry_url ) -> bool: """ treate a dataset considered as a 'camera' dataset. """ if "data_cast" in detector_node: _logger.warning( f"!!! looks like this data has been cast. Take cast data for {detector_node}!!!" ) data_dataset = detector_node["data_cast"] data_name = "/".join((detector_node.name, "data_cast")) elif "data" in detector_node: data_dataset = detector_node["data"] data_name = "/".join((detector_node.name, "data")) else: raise KeyError(f"Unable to find camera dataset in {detector_node.name}") camera_dataset_url = DataUrl( file_path=entry_url.file_path(), data_path=data_name, scheme="silx" ) n_frame = self.__get_data_from_camera( data_dataset, data_name=data_name, frame_type=frame_type, entry=entry, entry_path=entry_path, camera_dataset_url=camera_dataset_url, ) # save information if this url must be embed / copy or not. Will be used later at nxtomo side self._copy_frames[camera_dataset_url.path()] = self._copy_frames[ entry_url.path() ] soft_lr_flip, soft_ud_flip = self._get_soft_flip_frame() # handle None flip values soft_lr_flip = soft_lr_flip or False soft_ud_flip = soft_ud_flip or False # get the final flip value (soft + mechanical) lr_flipped = soft_lr_flip ^ self.configuration.mechanical_lr_flip ud_flipped = soft_ud_flip ^ self.configuration.mechanical_ud_flip if self._lr_flipped is None and self._ud_flipped is None: # if this is the first scan then define frame flips self._lr_flipped, self._ud_flipped = bool(lr_flipped), bool(ud_flipped) elif lr_flipped != self._lr_flipped or ud_flipped != self._ud_flipped: # else check it is consistent with already existing flips raise ValueError( f"Found different detector flips inside the same sequence on {entry}. Unable to handle it." ) # store rotation rots = self._get_rotation_angle(root_node=entry, n_frame=n_frame) # handle rotation_is_clockwise. It None or False consider it is counter-clockwise # NXtomo coordinate system uses MCstas that uses counter-clockwise rotation direction. # https://manual.nexusformat.org/design.html#the-nexus-coordinate-system if self.configuration.rotation_is_clockwise is True: rots *= -1.0 self._rotation_angle = StandardAcquisition.concatenate_pint_quantities( ( self._rotation_angle, rots, ), ) self._sample_x = StandardAcquisition.concatenate_pint_quantities( ( self._sample_x, self._get_sample_x(root_node=entry, n_frame=n_frame), ), ) self._sample_y = StandardAcquisition.concatenate_pint_quantities( ( self._sample_y, self._get_sample_y(root_node=entry, n_frame=n_frame), ), ) self._translation_y = StandardAcquisition.concatenate_pint_quantities( ( self._translation_y, self._get_translation_y(root_node=entry, n_frame=n_frame), ), ) self._translation_z = StandardAcquisition.concatenate_pint_quantities( ( self._translation_z, self._get_translation_z(root_node=entry, n_frame=n_frame), ), ) # store acquisition time self._acq_expo_time = StandardAcquisition.concatenate_pint_quantities( ( self._acq_expo_time, self._get_expo_time( root_node=entry, detector_node=detector_node, n_frame=n_frame, ), ) ) self._current_scan_n_frame = n_frame def camera_is_valid(self, det_name): assert isinstance(det_name, str) if len(self.configuration.valid_camera_names) == 0: return True for vcm in self.configuration.valid_camera_names: if fnmatch.fnmatch(det_name, vcm): return True return False def _preprocess_registered_entry(self, entry_url, type_): with EntryReader(entry_url) as entry: entry_path = self._entries_o_path[entry_url.path()] input_file_path = entry_url.file_path() input_file_path = abspath(os.path.relpath(input_file_path, os.getcwd())) input_file_path = abspath(input_file_path) if type_ is AcquisitionStep.INITIALIZATION: raise RuntimeError( "no initialization should be registered." "There should be only one per acquisition." ) if "instrument" not in entry: _logger.error( f"no instrument group found in {entry.name}, unable to retrieve frames" ) return instrument_grp = entry["instrument"] # if we don't get a valid camera (not provided by the user or not found on the bliss tomo metadata) if len(self.configuration.valid_camera_names) == 0: # if we need to guess detector name(s) # ignore in case we read information from bliss config det_grps = self._get_valid_camera_names(instrument_grp) # update valid camera names self.configuration.valid_camera_names = det_grps has_frames = False for key, _ in instrument_grp.items(): if ( "NX_class" in instrument_grp[key].attrs and instrument_grp[key].attrs["NX_class"] == "NXdetector" ): _logger.debug(f"Found one detector at {key} for {entry.name}.") if not self.camera_is_valid(key): _logger.debug(f"ignore {key}, not a `valid` camera name") continue else: detector_node = instrument_grp[key] if key not in self._unique_detector_names: self._unique_detector_names.append(key) try: self._treate_valid_camera( detector_node, entry=entry, frame_type=type_, input_file_path=input_file_path, entry_path=entry_path, entry_url=entry_url, ) except KeyError as e: if self.configuration.raises_error: raise e has_frames = False else: has_frames = True # try to get some other metadata # handle frame time stamp start_time = self._get_start_time(entry) if start_time is not None: start_time = datetime.fromisoformat(start_time) end_time = self._get_end_time(entry) if end_time is not None: end_time = datetime.fromisoformat(end_time) if has_frames: self._register_frame_timestamp(entry, start_time, end_time) # handle machine current. Can retrieve some current even on bliss scan entry not containing directly frames self._register_machine_current(entry, start_time, end_time) def _register_machine_current(self, entry: h5py.Group, start_time, end_time): """Update machine electric current for provided entry (bliss scan""" machine_currents: pint.Quantity | None = self._get_machine_current( root_node=entry ) # electric current will be saved as Ampere if machine_currents is not None and len(machine_currents) > 0: machine_currents = machine_currents.to(_ureg.ampere).magnitude new_know_machine_currents_am = {} if start_time is None or end_time is None: if start_time != end_time: _logger.warning( f"Unable to find {'start_time' if start_time is None else 'end_time'}. Will pick the first available machine_current for the frame" ) t_time = start_time or end_time # if at least one can find out new_know_machine_currents_am[ str_datetime_to_numpy_datetime64(t_time) ] = machine_currents[0] else: _logger.error( "Unable to find start_time and end_time. Will not register any machine current" ) elif len(machine_currents) == 1: # if we have only one value, consider the machine current is constant during this time # might be improved later if we can know if current is determine at the # beginning or the end. But should have no impact # as the time slot is short new_know_machine_currents_am[ str_datetime_to_numpy_datetime64(start_time) ] = machine_currents[0] else: # linspace from datetime within ms precision. # see https://stackoverflow.com/questions/37964100/creating-numpy-linspace-out-of-datetime timestamps = numpy.linspace( start=str_datetime_to_numpy_datetime64(start_time).astype( numpy.float128 ), stop=str_datetime_to_numpy_datetime64(end_time).astype( numpy.float128 ), num=len(machine_currents), endpoint=True, dtype="<M8[ms]", ) for timestamp, machine_current_item in zip( timestamps, machine_currents ): new_know_machine_currents_am[timestamp.astype(numpy.datetime64)] = ( machine_current_item ) # filter nan values filtered_new_know_machine_currents = { key: value for key, value in new_know_machine_currents_am.items() if not numpy.isnan(value) } if len(filtered_new_know_machine_currents) != len( new_know_machine_currents_am ): _logger.warning(f"Found current == nan in {entry}") # update known_machine_current self._known_machine_current_am.update(filtered_new_know_machine_currents) def _register_frame_timestamp(self, entry: h5py.Group, start_time, end_time): """ update frame time stamp for the provided entry (bliss scan) """ if start_time is None or end_time is None: if start_time != end_time: t_time = str_datetime_to_numpy_datetime64(start_time or end_time) message = f"Unable to find start_time and / or end_time. Takes {t_time} as frame time stamp for {entry} " self._frames_timestamp.extend( [t_time] * self._n_frames_actual_bliss_scan ) _logger.warning(message) else: message = f"Unable to find start_time and end_time. Can't deduce frames time stamp for {entry}" _logger.error(message) else: frames_times_stamps_as_f8 = numpy.linspace( start=str_datetime_to_numpy_datetime64(start_time).astype( numpy.float128 ), stop=str_datetime_to_numpy_datetime64(end_time).astype(numpy.float128), num=self._n_frames_actual_bliss_scan, endpoint=True, dtype="<M8[ms]", ) frames_times_stamps_as_f8 = [ timestamp.astype("<M8[ms]") for timestamp in frames_times_stamps_as_f8 ] self._frames_timestamp.extend(frames_times_stamps_as_f8) def _preprocess_registered_entries(self): """parse all frames of the different steps and retrieve data, image_key...""" self._n_frames = 0 self._n_frames_actual_bliss_scan = 0 # number of frame contains in X.1 self._dim_1 = None self._dim_2 = None self._data_type = None self._translation_y = None self._translation_z = None self._sample_x = None self._sample_y = None self._image_key_control = [] self._rotation_angle = None self._known_machine_current_am = {} self._frames_timestamp = [] self._virtual_sources = [] self._instrument_name = None self._virtual_sources_len = [] self._diode = [] self._acq_expo_time: pint.Quantity | None = None self._diode_unit = None self._copied_dataset = {} self._lr_flipped = None self._ud_flipped = None # if rotation motor is not defined try to deduce it from root_url/technique/scan/motor if self.configuration.rotation_angle_keys is None: rotation_motor = self._read_rotation_motor_name() if rotation_motor is not None: self.configuration.rotation_angle_keys = (rotation_motor,) else: self.configuration.rotation_angle_keys = tuple() # list of data virtual source for the virtual dataset for entry_url, type_ in self._registered_entries.items(): url = DataUrl(path=entry_url) self._n_frames_actual_bliss_scan = 0 self._preprocess_registered_entry(url, type_) if len(self._diode) == 0: self._diode = None if self._diode is not None: self._diode = numpy.asarray(self._diode) self._diode = self._diode / self._diode.mean() def _get_diode(self, root_node, n_frame) -> tuple: values, unit = self._get_node_values_for_frame_array( node=root_node["measurement"], n_frame=n_frame, keys=self.configuration.diode_keys, info_retrieve="diode", expected_unit=_ureg.volt, ) return values, unit def _generic_path_getter(self, paths: tuple, message, level="warning", entry=None): """ :param level: level can be logging.level values : "warning", "error", "info" :param H5group entry: user can provide directly an entry to be used as an open h5Group """ if not isinstance(paths, tuple): raise TypeError url = self.parent_root_url() or self.root_url if url is not None: self._check_has_metadata(url) def process(h5_group): for path in paths: if h5_group is not None and path in h5_group: return h5py_read_dataset(h5_group[path]) if message is not None: getattr(_logger, level)(message) if entry is None: if url is None: return None with EntryReader(url) as h5_group: return process(h5_group) else: return process(entry) def _get_source_name(self): """ """ return self._generic_path_getter( paths=self._SOURCE_NAME, message="Unable to find source name", level="info" ) def _get_source_type(self): """ """ return self._generic_path_getter( paths=self._SOURCE_TYPE, message="Unable to find source type", level="info" ) def _get_title(self): """return acquisition title""" return self._generic_path_getter( paths=self.TITLE_PATHS, message="Unable to find title" ) def _get_instrument_name(self): """:return instrument instrument name (aka beamline name)""" name = self._generic_path_getter( paths=self._INSTRUMENT_NAME_PATH, message="Unable to find instrument name", level="info", ) # on some path / old hdf5 the name is prefixed by "ESRF:". clean those if name is not None and name.startswith("ESRF:"): name = name.replace("ESRF:", "") return name def _get_dataset_name(self): """return name of the acquisition""" return self._generic_path_getter( paths=self._DATASET_NAME_PATH, message="No name describing the acquisition has been " "found, Name dataset will be skip", ) def _get_sample_name(self): """return sample name""" return self._generic_path_getter( paths=self._SAMPLE_NAME_PATH, message="No sample name has been " "found, Sample name dataset will be skip", ) def _get_grp_size(self): """return the nb_scans composing the zseries if is part of a group of sequence""" return self._generic_path_getter(paths=self._GRP_SIZE_PATH, message=None) def _get_tomo_n(self): return self._generic_path_getter( paths=self._TOMO_N_PATH, message="unable to find information regarding tomo_n", ) def _get_start_time(self, entry=None): return self._generic_path_getter( paths=self._START_TIME_PATH, message="Unable to find start time", level="info", entry=entry, ) def _get_end_time(self, entry=None): return self._generic_path_getter( paths=self._END_TIME_PATH, message="Unable to find end time", level="info", entry=entry, ) @staticmethod def _cast_flip_as_str_to_bool(soft_lr_flip: str | bool, soft_ud_flip: str | bool): if soft_lr_flip == "y": soft_lr_flip = False elif soft_lr_flip == "-y": soft_lr_flip = True elif isinstance(soft_lr_flip, (str, numpy.str_)): _logger.error(f"Unexpected string value: '{soft_lr_flip}'") soft_lr_flip = False # handle flip given as a str axis if soft_ud_flip == "z": soft_ud_flip = True elif soft_ud_flip == "-z": soft_ud_flip = False elif isinstance(soft_ud_flip, (str, numpy.str_)): _logger.error(f"Unexpected string value: '{soft_ud_flip}'") soft_ud_flip = False return soft_lr_flip, soft_ud_flip def _get_soft_flip_frame(self): """ Retrieve registered frame flip (know by bliss-tomo) """ url = self.parent_root_url() or self.root_url if url is None: return None, None self._check_has_metadata(url) with EntryReader(url) as entry: for flip_path in self._FRAME_FLIP_PATHS: if len(self._unique_detector_names) > 0: key = flip_path.format( detector_name=self._unique_detector_names[0], active_config_tomo_detector=self._active_config_tomo_detector, ) else: key = flip_path if key in entry: soft_lr_flip, soft_ud_flip = h5py_read_dataset(entry[key]) if key.endswith("data_axes"): # bliss_tomo contains the information as (x, y) (soft_lr_flip, soft_ud_flip ) # when data_axes contains it as (y, x) (soft_ud_flip, soft_lr_flip) soft_lr_flip, soft_ud_flip = soft_ud_flip, soft_lr_flip soft_lr_flip, soft_ud_flip = self._cast_flip_as_str_to_bool( soft_lr_flip=soft_lr_flip, soft_ud_flip=soft_ud_flip ) return soft_lr_flip, soft_ud_flip else: return None, None def _get_propagation_distance(self) -> pint.Quantity: url = self.parent_root_url() or self.root_url if url is None: return None self._check_has_metadata(url) with EntryReader(url) as entry: for prog_dst_path in self._PROPAGATION_DISTANCE_PATHS: if prog_dst_path in entry: dataset = entry[prog_dst_path] prog_dst = h5py_read_dataset(dataset) unit = get_dataset_unit( dataset, default=_ureg.mm, from_dataset=f"{dataset.name} (reading propagation distance)", ) return prog_dst * unit else: return None def _get_energy(self, ask_if_0, input_callback) -> pint.Quantity | None: """Try to read the energy from root url. If fails and if a input_callback given then execute this fallback """ url = self.parent_root_url() or self.root_url if url is None: return None self._check_has_metadata() with EntryReader(url) as entry: if self._ENERGY_PATH in entry: dataset = entry[self._ENERGY_PATH] energy = h5py_read_dataset(dataset) unit = get_dataset_unit( dataset, default=_ureg.keV, from_dataset=f"{dataset.name} (from reading energy)", ) if energy == 0 and ask_if_0: desc = ( "Energy has not been registered. Please enter " "incoming beam energy (in kev):" ) if input_callback is None: en = input(desc) else: en = input_callback("energy", desc) if energy is not None: energy = float(en) return energy * unit else: mess = f"unable to find energy for entry {entry}." if self.raise_error_if_issue: raise ValueError(mess) else: mess += " Default value will be set (19kev)" _logger.warning(mess) return 19.0 * _ureg.keV def _get_sample_detector_distance(self) -> pint.Quantity | None: """return tuple(distance, unit)""" url = self.parent_root_url() or self.root_url if url is None: return None self._check_has_metadata(url) with EntryReader(url) as entry: for key in self.configuration.sample_detector_distance_keys: if key in entry: dataset = entry[key] distance = h5py_read_dataset(dataset) unit = get_dataset_unit( dataset, default=_ureg.mm, from_dataset=f"{dataset.name} (from reading sample-detector distance)", ) # convert to meter return distance * unit mess = f"unable to find distance for entry {entry}." if self.raise_error_if_issue: raise ValueError(mess) else: mess += "Default value will be set (1m)" _logger.warning(mess) return 1.0 * _ureg.meter def _get_source_sample_distance(self) -> pint.Quantity | None: """return tuple(distance, unit)""" url = self.parent_root_url() or self.root_url if url is None: return None self._check_has_metadata(url) with EntryReader(url) as entry: for key in self.configuration.source_sample_distance_keys: if key in entry: dataset = entry[key] distance = h5py_read_dataset(dataset) unit = get_dataset_unit( dataset, default=_ureg.mm, from_dataset=f"{dataset.name} (from reading source-sample distance)", ) # convert to meter return distance * unit mess = f"unable to find distance for entry {entry}." if self.raise_error_if_issue: raise ValueError(mess) else: mess += "Default value will be set (1m)" _logger.warning(mess) return 1.0 * _ureg.meter def _get_sample_pixel_size(self, axis) -> pint.Quantity | None: """read **sample** pixel size from predefined set of path""" return self._get_pixel_size( axis=axis, x_pixel_size_paths=self.configuration.sample_x_pixel_size_keys, y_pixel_size_paths=self.configuration.sample_y_pixel_size_keys, ) def _get_detector_pixel_size(self, axis) -> pint.Quantity | None: """read **detector** pixel size from predefined set of path""" return self._get_pixel_size( axis=axis, x_pixel_size_paths=self.configuration.detector_x_pixel_size_keys, y_pixel_size_paths=self.configuration.detector_y_pixel_size_keys, ) def _get_pixel_size(self, axis, x_pixel_size_paths, y_pixel_size_paths): url = self.parent_root_url() or self.root_url if url is None: return None if axis not in ("x", "y"): raise ValueError self._check_has_metadata() if axis == "x": keys = x_pixel_size_paths elif axis == "y": keys = y_pixel_size_paths else: raise ValueError(f"axis {axis} is invalid") # solve according to detector name if len(self._unique_detector_names) > 1: _logger.warning( f"More than one detector found. Will pick the first one found ({self._unique_detector_names[0]})" ) if len(self._unique_detector_names) > 0: keys = [ key.format(detector_name=self._unique_detector_names[0]) for key in keys ] self._unique_detector_names with EntryReader(url) as entry: for key in keys: if key in entry: dataset = entry[key] dataset_value = h5py_read_dataset(dataset) # if the pixel size is provided as x, y if isinstance(dataset_value, numpy.ndarray): if len(dataset_value) > 1 and axis == "y": size_ = dataset_value[1] else: size_ = dataset_value[0] # if this is a single value else: size_ = dataset_value unit = get_dataset_unit( dataset, default=_ureg.micrometer, from_dataset=f"{dataset.name} (from reading pixel size)", ) # convert to meter return size_ * unit mess = f"unable to find {axis} sample pixel size for entry {entry}" if self.raise_error_if_issue: raise ValueError(mess) else: mess += "Value will be set to default (10-6m)" _logger.warning(mess) return 10e-6 * _ureg.meter def _get_field_of_view(self) -> str: if self.configuration.field_of_view is not None: return self.configuration.field_of_view.value url = self.parent_root_url() or self.root_url if url is None: return None with EntryReader(url) as entry: if self._FOV_PATH in entry: return h5py_read_dataset(entry[self._FOV_PATH]) else: # FOV is optional: don't raise an error _logger.warning( f"unable to find information regarding field of view for entry {entry}. set it to default value (Full)" ) return "Full" def _update_configuration_from_tomo_config(self): """ force some values from EBS tomo 'tomoconfig' group to make sure correct dataset are read """ if self.configuration.ignore_bliss_tomo_config: return url = self.parent_root_url() or self.root_url if url is None: # case of entries are made manually and user do not provide an init node. return with EntryReader(url) as entry: technique_grp = entry.get("technique", None) if technique_grp is None: _logger.warning( f"Unable to find a technique group in {entry}. Unable to reach EBStomo metadata" ) return bliss_tomo_version = technique_grp.attrs.get("tomo_version", None) # read bliss metadata # bliss tomo config try: bliss_metadata = BlissTomoConfig.from_technique_group( technique_group=technique_grp ) except KeyError: if bliss_tomo_version is not None: _logger.warning( f"Unable to find bliss 'tomo_config' when expected (tomo_version={bliss_tomo_version}). Fallback to conversion based on list of paths to check" ) else: # check if some metadata are missing metadata_values = { "detector": bliss_metadata.tomo_detector, "sample_x": bliss_metadata.sample_x, "sample_y": bliss_metadata.sample_y, "translation_z": bliss_metadata.translation_z, "rotation": bliss_metadata.rotation, "rotation_is_clockwise": bliss_metadata.rotation_is_clockwise, } missing_metadata = list( [k for k, v in metadata_values.items() if v is None] ) _logger.info(f"read tomo config from bliss. Get {metadata_values}") if len(missing_metadata) > 0: _logger.warning( f"couldn't find {missing_metadata} in bliss 'technique/tomoconfig' dataset" ) if bliss_metadata.tomo_detector is not None: self.configuration.valid_camera_names = bliss_metadata.tomo_detector if bliss_metadata.sample_x is not None: self.configuration.sample_x_keys = bliss_metadata.sample_x if bliss_metadata.sample_y is not None: self.configuration.sample_y_keys = bliss_metadata.sample_y if bliss_metadata.translation_y is not None: self.configuration.translation_y_keys = bliss_metadata.translation_y if bliss_metadata.translation_z is not None: self.configuration.translation_z_keys = bliss_metadata.translation_z if bliss_metadata.rotation is not None: self.configuration.rotation_angle_keys = bliss_metadata.rotation # if the 'rotation_is_clockwise' has not been defined by the user and if we can access the # metadata from blisstomo take it. if ( bliss_metadata.rotation_is_clockwise is not None and self.configuration.rotation_is_clockwise is None ): self.configuration.rotation_is_clockwise = ( bliss_metadata.rotation_is_clockwise ) # bliss active tomo config if "active_tomo_config" in technique_grp: dataset_name = technique_grp.get("active_tomo_config", None) # note: today saved as a tuple of str (and contains a single element) active_tomo_config_dataset_name = h5py_read_dataset( dataset_name, decode_ascii=True )[0] active_tomo_config = BlissActiveConfig.from_hdf5_group( entry[f"instrument/{active_tomo_config_dataset_name}"] ) if active_tomo_config.tomo_detector: # tomo_detector is given as a tuple of 1 element self._active_config_tomo_detector = ( active_tomo_config.tomo_detector[0] ) else: _logger.warning( "Unable to extract 'active_config_tomo_detector' from the bliss active config." ) self._active_config_tomo_detector = None else: _logger.warning("Cannot find 'active_tomo_config'") self._active_config_tomo_detector = None def to_NXtomos(self, request_input, input_callback, check_tomo_n=True) -> tuple: self._update_configuration_from_tomo_config() self._preprocess_registered_entries() nx_tomo = NXtomo() # 1. root level information # start and end time nx_tomo.start_time = self._get_start_time() nx_tomo.end_time = self._get_end_time() # title nx_tomo.title = self._get_dataset_name() # group size nx_tomo.group_size = self._get_grp_size() # 2. define beam try: energy: pint.Quantity = self._get_user_settable_parameter( param_key="energy_kev", fallback_fct=self._get_energy, input_callback=input_callback, ask_if_0=request_input, ) except TypeError as e: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get energy. Error is %s", e, exc_info=e) energy = None if energy is not None: # TODO: better management of energy ? might be energy.beam or energy.instrument.beam ? nx_tomo.energy = energy # 3. define instrument nx_tomo.instrument.name = self._get_instrument_name() nx_tomo.instrument.detector.data = self._virtual_sources nx_tomo.instrument.detector.image_key_control = self.image_key_control nx_tomo.instrument.detector.count_time = self._acq_expo_time nx_tomo.instrument.detector.roi = self.get_detector_roi() if self.image_key_control is None: _logger.warning( "No image key defined. Unable to determine the number of frame and the type. This is a mandatory field" ) else: n_frames = len(self.image_key_control) nx_tomo.instrument.detector.sequence_number = numpy.linspace( start=0, stop=n_frames, num=n_frames, dtype=numpy.uint32, endpoint=False ) # sample - detector distance try: sample_detector_distance = self._get_user_settable_parameter( param_key="detector_sample_distance_m", fallback_fct=self._get_sample_detector_distance, ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get sample/detector distance") sample_detector_distance = None if sample_detector_distance is not None: nx_tomo.instrument.detector.distance = sample_detector_distance # source - sample detector distance try: source_sample_distance: pint.Quantity = self._get_user_settable_parameter( param_key="source_sample_distance_m", fallback_fct=self._get_source_sample_distance, ) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error("Fail to get source/sample distance") source_sample_distance = None if source_sample_distance is not None: # source_sample_distance is positive when the NXtomo application states it should be negative. So # let's make sure this will be negative source_sample_distance = -numpy.abs(source_sample_distance) nx_tomo.instrument.source.distance = source_sample_distance # handle detector & sample ; x & y pixel size for (nx_obj, attr), params in { (nx_tomo.sample, "x_pixel_size"): { "param_key": "x_sample_pixel_size_m", "fallback_fct": self._get_sample_pixel_size, "axis": "x", }, (nx_tomo.sample, "y_pixel_size"): { "param_key": "y_sample_pixel_size_m", "fallback_fct": self._get_sample_pixel_size, "axis": "y", }, (nx_tomo.instrument.detector, "x_pixel_size"): { "param_key": "x_detector_pixel_size_m", "fallback_fct": self._get_detector_pixel_size, "axis": "x", }, (nx_tomo.instrument.detector, "y_pixel_size"): { "param_key": "y_detector_pixel_size_m", "fallback_fct": self._get_detector_pixel_size, "axis": "y", }, }.items(): try: pixel_size = self._get_user_settable_parameter(**params) except TypeError: # if the path lead to unexpected dataset (group instead of a dataset or a broken folder) _logger.error(f"Fail to get {attr} of {nx_obj}") pixel_size = None else: nx_obj.__setattr__(attr, pixel_size) # flips nx_tomo.instrument.detector.transformations.add_transformation( DetYFlipTransformation(flip=self.lr_flipped) ) nx_tomo.instrument.detector.transformations.add_transformation( DetXFlipTransformation(flip=self.ud_flipped) ) # fov fov = self._get_field_of_view() if fov is not None: nx_tomo.instrument.detector.field_of_view = fov # x_rotation_axis_pixel_position # TODO Missing calibration mechanism from Bliss if self.translation_y is None or len(self.translation_y) < 1: _logger.warning("Unable to find translation_y") elif nx_tomo.sample.x_pixel_size is not None: x_sample_pixel_size = nx_tomo.sample.x_pixel_size translation_y_m = ( self.translation_y[0].to(_ureg.meter).magnitude ) # This is very fragile, no idea how to make it better x_sample_pixel_size = x_sample_pixel_size.to(_ureg.meter).magnitude nx_tomo.instrument.detector.x_rotation_axis_pixel_position = ( translation_y_m / x_sample_pixel_size ) # define tomo_n nx_tomo.instrument.detector.tomo_n = self._get_tomo_n() # 4. define nx source source_name = self._get_source_name() nx_tomo.instrument.source.name = source_name source_type = self._get_source_type() if source_type is not None: if "synchrotron" in source_type.lower(): source_type = SourceType.SYNCHROTRON_X_RAY_SOURCE.value # drop a warning if the source type is invalid if source_type not in SourceType.values(): _logger.warning( f"Source type ({source_type}) is not a 'standard value'" ) nx_tomo.instrument.source.type = source_type # 5. define sample # warning: the NXtomo XYZ reference (see https://manual.nexusformat.org/_static/NeXusManual.pdf - p 26) # # # Y axis # ^ X axis # | / # x-ray | / # --------> ------> Z axis # # is not the same as the ESRF XYZ (see https://tomo.gitlab-pages.esrf.fr/bliss-tomo/master/modelization_sample_stage.html) # # Z axis # ^ Y axis # | / # x-ray |/ # --------> ------> X axis # # nx_tomo.sample.name = self._get_sample_name() assert isinstance(self.rotation_angle, (pint.Quantity, type(None))) nx_tomo.sample.rotation_angle = self.rotation_angle assert isinstance(self.sample_x, (pint.Quantity, type(None))) nx_tomo.sample.z_translation = self.sample_x assert isinstance(self.translation_z, (pint.Quantity, type(None))) nx_tomo.sample.y_translation = self.translation_z assert isinstance(self.sample_y, (pint.Quantity, type(None))) nx_tomo.sample.x_translation = self.sample_y z1 = nx_tomo.instrument.source.distance z2 = nx_tomo.instrument.detector.distance propagation_distance: pint.Quantity | None = self._get_propagation_distance() if propagation_distance is not None: _logger.debug("set propagation distance from bliss metadata") nx_tomo.sample.propagation_distance = propagation_distance elif z1 is not None and z2 is not None: _logger.debug( "compute propagation distance from sample-source and sample-detector distances" ) nx_tomo.sample.propagation_distance = (-z1 * z2) / (-z1 + z2) else: distances = { "sample-detector distance": z1, "source-sample distance": z2, } _logger.warning( "Unable to define propagation distance. %s missing", [key for key, value in distances.items() if value is not None], ) # 6. define control if ( self.configuration.create_control_data and self.known_machine_current not in (None, dict()) ): nx_tomo.control.data = ( deduce_machine_current( timestamps=self._frames_timestamp, known_machine_current=self._known_machine_current_am, ) * _ureg.ampere ) types = set() if nx_tomo.control.data is not None: for d in nx_tomo.control.data: types.add(type(d)) if check_tomo_n: self.check_tomo_n() return (nx_tomo,) def check_tomo_n(self): # check scan is complete tomo_n = self._get_tomo_n() if self.configuration.check_tomo_n and tomo_n is not None: image_key_control = numpy.asarray(self._image_key_control) proj_found = len( image_key_control[image_key_control == ImageKey.PROJECTION.value] ) if proj_found < tomo_n: mess = f"Incomplete scan. Expect {tomo_n} projection but only {proj_found} found" if self.configuration.raises_error is True: raise ValueError(mess) else: _logger.error(mess) def _check_has_metadata(self, url: DataUrl | None = None): url = url or self.root_url if url is None: raise ValueError( "no initialization entry specify, unable to" "retrieve energy" ) def _get_user_settable_parameter( self, param_key, fallback_fct, *fallback_args, **fallback_kwargs, ) -> pint.Quantity | None: """ return value, unit :param fallback_fct: callback function to retrieve the value. Must return a quantity """ value = getattr(self.configuration, param_key, None) if value is not None: units = { "energy_kev": _ureg.keV, "x_sample_pixel_size_m": _ureg.meter, "y_sample_pixel_size_m": _ureg.meter, "x_detector_pixel_size_m": _ureg.meter, "y_detector_pixel_size_m": _ureg.meter, "detector_sample_distance_m": _ureg.meter, "source_sample_distance_m": _ureg.meter, } if param_key not in units: raise ValueError(f"key {param_key} is not handled") unit = units[param_key] value = value * unit else: value = fallback_fct(*fallback_args, **fallback_kwargs) assert value is None or isinstance( value, pint.Quantity ), "fallback must return a pint.Quantity" return value