Source code for nxtomomill.tests.utils.bliss

# coding: utf-8

from __future__ import annotations

import datetime
import os

import numpy
import h5py
from tomoscan.io import HDF5File


class MockBlissAcquisition:
    """
    Mock of a bliss acquisition: create, under `output_dir`, one folder and
    one bliss-like HDF5 file per sample.

    :param n_sample: number of samples to create (one sub-folder of
                     `output_dir` and one sample file per sample)
    :param n_sequence: number of sequence to create
    :param n_scan_per_sequence: number of scans (projection series) per sequence
    :param n_darks: if > 0 a dark scan is added to each sequence
    :param n_flats: if > 0 flat scan(s) are added to each sequence
    :param output_dir: root folder of the acquisition. Created if missing.
                       Will contain one folder per sample; `proposal_file`
                       points to a file path inside this folder.
    :param with_nx_detector_attr: if True the detector group gets the
                                  `NX_class` attribute set to "NXdetector"
    :param detector_name: name of the detector group
    :param str acqui_type: acquisition type. Can be "basic", "pcotomo",
                           "z-series-v1" or "z-series-v3"
    :param Iterable z_values: if acqui_type is a z-series then users should
                              provide the series of values for z (one per stage)
    :param nb_loop: number of pcotomo loop for v1 of bliss pcotomo
    :param nb_tomo: number of tomo per loop for v1 of bliss pcotomo
    :param nb_turns: number of turns for v2 of bliss pcotomo ( <=> nb NXtomo to generate)
    :param with_rotation_motor_info: if True write the 'technique/scan/motor'
                                     dataset of each scan
    :param frame_data_type: numpy data type of the generated frames
    :param file_name_prefix: bliss file prefix name
    :param file_name_z_fill: optional z fill for the file name index. If None
                             then file index will not be 'z filled'
    :param create_tomo_config: if True create the 'tomoconfig' group under
                               'technique' which contains metadata describing
                               the acquisition (which dataset to read for
                               rotation, translation ...)
    :param ebs_tomo_version: if provided, stored as the `tomo_version`
                             attribute of the 'technique' group of each scan
    :param z_series_v_3_options: options for "z-series-v3". Dict with keys
                                 "dark_at_start", "flat_at_start",
                                 "dark_at_end" and "flat_at_end"
    :raises ValueError: if nb_loop / nb_tomo are given for a non-pcotomo
                        acquisition, if none of nb_turns or (nb_loop, nb_tomo)
                        is given for a pcotomo acquisition, or if z_values is
                        missing for a z-series
    :raises NotImplementedError: if `acqui_type` is not handled
    """

    def __init__(
        self,
        n_sample,
        n_sequence,
        n_scan_per_sequence,
        n_darks,
        n_flats,
        output_dir,
        with_nx_detector_attr=True,
        detector_name="pcolinux",
        acqui_type="basic",
        z_values=None,
        nb_loop=None,
        nb_tomo=None,
        nb_turns=None,
        with_rotation_motor_info=True,
        frame_data_type=numpy.uint16,
        file_name_prefix="sample",
        file_name_z_fill=None,
        create_tomo_config: bool = True,
        ebs_tomo_version: str | None = None,
        z_series_v_3_options=None,
    ):
        self._n_darks = n_darks
        self._n_flats = n_flats
        self._n_scan_per_sequence = n_scan_per_sequence
        self.__folder = output_dir
        os.makedirs(output_dir, exist_ok=True)
        self.__proposal_file = os.path.join(self.__folder, "ihproposal_file.h5")

        # strict comparison: `x in ("pcotomo")` would be a substring test on a
        # plain string and accept values such as "tomo" or ""
        if acqui_type != "pcotomo":
            if nb_loop is not None or nb_tomo is not None:
                raise ValueError(
                    "nb_loop and nb_tomo are only handled by acqui_type: `pcotomo`"
                )
        elif nb_turns is None and (nb_loop is None or nb_tomo is None):
            raise ValueError(
                "nb_turns should be provided or nb_loop and nb_tomo must be provided"
            )

        # keyword arguments shared by all the sample mocks
        common_kwargs = {
            "n_sequence": n_sequence,
            "n_scan_per_sequence": n_scan_per_sequence,
            "n_darks": n_darks,
            "n_flats": n_flats,
            "with_nx_detector_attr": with_nx_detector_attr,
            "detector_name": detector_name,
            "with_rotation_motor_info": with_rotation_motor_info,
            "frame_data_type": frame_data_type,
            "create_tomo_config": create_tomo_config,
            "ebs_tomo_version": ebs_tomo_version,
        }

        # create sample
        self.__samples = []
        for sample_i in range(n_sample):
            if file_name_z_fill is None:
                dir_name = f"{file_name_prefix}_{sample_i}"
            else:
                dir_name = f"{file_name_prefix}_{str(sample_i).zfill(file_name_z_fill)}"
            sample_dir = os.path.join(self.path, dir_name)
            os.mkdir(sample_dir)
            sample_file = os.path.join(sample_dir, dir_name + ".h5")

            if acqui_type == "basic":
                acqui_tomo = _BlissBasicTomo(
                    sample_dir=sample_dir,
                    sample_file=sample_file,
                    **common_kwargs,
                )
            elif acqui_type == "pcotomo":
                acqui_tomo = _BlissPCOTomo(
                    sample_dir=sample_dir,
                    sample_file=sample_file,
                    nb_loop=nb_loop,
                    nb_tomo=nb_tomo,
                    nb_turns=nb_turns,
                    **common_kwargs,
                )
            elif acqui_type in ("z-series-v1", "z-series-v3"):
                if z_values is None:
                    raise ValueError("for z-series z_values should be provided")
                acqui_tomo = _BlissZseriesTomo(
                    sample_dir=sample_dir,
                    sample_file=sample_file,
                    z_values=z_values,
                    # "z-series-v1" -> "v1", "z-series-v3" -> "v3"
                    z_series_version=acqui_type.split("-")[-1],
                    z_series_v_3_options=z_series_v_3_options,
                    **common_kwargs,
                )
            else:
                raise NotImplementedError(
                    f"acquisition type {acqui_type!r} is not handled"
                )
            self.__samples.append(acqui_tomo)

    @property
    def samples(self):
        """List of the sample mocks created (one per sample)."""
        return self.__samples

    @property
    def proposal_file(self):
        """Path of the proposal file."""
        # for now a simple file
        return self.__proposal_file

    @property
    def path(self):
        """Root folder of the acquisition (`output_dir`)."""
        return self.__folder
class _BlissSample:
    """
    Simple mock of a bliss sample. For now we only create the hierarchy of files.

    Base class: sub-classes must implement `add_sequence` and
    `get_main_entry_title`. `add_sequence` is called `n_sequence` times from
    the constructor.

    :param sample_dir: folder of the sample
    :param sample_file: path of the HDF5 file the entries are written to
    :param n_sequence: number of sequence to create
    :param n_scan_per_sequence: number of projection scans per sequence
    :param n_darks: sub-classes add dark scan(s) only if > 0
    :param n_flats: sub-classes add flat scan(s) only if > 0
    :param detector_name: name of the detector group
    :param with_nx_detector_attr: if True set NX_class="NXdetector" on the
                                  detector group
    :param with_rotation_motor_info: if True write 'technique/scan/motor'
    :param frame_data_type: numpy data type of the generated frames
    :param create_tomo_config: if True write the 'technique/tomoconfig' group
                               in the sequence initialization entry
    :param ebs_tomo_version: if not None, stored as `tomo_version` attribute
                             of the 'technique' group of each scan
    """

    def __init__(
        self,
        sample_dir,
        sample_file,
        n_sequence,
        n_scan_per_sequence,
        n_darks,
        n_flats,
        detector_name,
        with_nx_detector_attr=True,
        with_rotation_motor_info=True,
        frame_data_type=numpy.uint16,
        create_tomo_config: bool = True,
        ebs_tomo_version: str | None = None,
    ):
        self._with_nx_detector_attr = with_nx_detector_attr
        self._sample_dir = sample_dir
        self._sample_file = sample_file
        self._n_sequence = n_sequence
        self._n_scan_per_seq = n_scan_per_sequence
        self._n_darks = n_darks
        self._n_flats = n_flats
        self._scan_folders = []
        # next free entry index; entries are named "<index>.1"
        self._index = 1
        self._detector_name = detector_name
        self._det_width = 64
        self._det_height = 64
        self._tomo_n = 10
        self._energy = 19.0
        self._sample_detector_distance = 100.0  # in mm
        self._source_sample_distance = 52000.0  # in mm
        self._pixel_size = (0.0065, 0.0066)
        self._with_rotation_motor_info = with_rotation_motor_info
        self._frame_data_type = frame_data_type
        self._create_tomo_config = create_tomo_config
        self._ebs_tomo_version = ebs_tomo_version
        for _ in range(n_sequence):
            self.add_sequence()

    @property
    def frame_data_type(self):
        return self._frame_data_type

    def get_next_free_index(self):
        """Return the next free entry index and reserve it."""
        idx = self._index
        self._index += 1
        return idx

    @property
    def current_scan_index(self) -> int:
        """Index returned by the last call to `get_next_free_index`."""
        return self._index - 1

    def get_main_entry_title(self):
        """Title of the sequence initialization entry."""
        raise NotImplementedError("Base class")

    @staticmethod
    def get_title(scan_type):
        """
        Return the entry title for a scan type.

        :param scan_type: one of "dark", "flat", "projection"
        :raises ValueError: for any other scan type
        """
        if scan_type == "dark":
            return "dark images"
        elif scan_type == "flat":
            return "reference images 1"
        elif scan_type == "projection":
            return "projections 1 - 2000"
        else:
            raise ValueError("Not implemented")

    def create_entry_and_technique(self, seq_ini_index):
        """
        Write the sequence 'initialization' entry "<seq_ini_index>.1" with its
        title, technique metadata and start / end time.
        """
        # add sequence init information
        with HDF5File(self.sample_file, mode="a") as h5f:
            seq_node = h5f.require_group(str(seq_ini_index) + ".1")
            seq_node.attrs["NX_class"] = "NXentry"
            seq_node["title"] = self.get_main_entry_title()
            seq_node.require_group("instrument/positioners")
            # write energy
            seq_node["technique/scan/energy"] = self._energy
            seq_node["technique/scan/tomo_n"] = self._tomo_n * self._n_scan_per_seq
            seq_node["technique/scan/sample_detector_distance"] = (
                self._sample_detector_distance
            )
            seq_node["technique/scan/sample_detector_distance"].attrs["units"] = "mm"
            seq_node["technique/scan/source_sample_distance"] = (
                self._source_sample_distance
            )
            seq_node["technique/scan/source_sample_distance"].attrs["units"] = "mm"
            seq_node["technique/detector/pixel_size"] = numpy.asarray(self._pixel_size)
            seq_node["start_time"] = str(datetime.datetime.now())
            seq_node["end_time"] = str(
                datetime.datetime.now() + datetime.timedelta(minutes=10)
            )
            if self._create_tomo_config:
                self._add_tomo_config(seq_node)

    @staticmethod
    def get_next_group_name(seq_ini_index, scan_idx):
        """Return the entry name of a scan index (`seq_ini_index` is unused)."""
        return str(scan_idx) + ".1"

    def add_scan(
        self,
        scan_type,
        seq_ini_index,
        z_value,
        skip_title=False,
        nb_loop=None,
        nb_tomo=None,
        nb_turns=1,
    ):
        """
        Reserve the next entry index and write a scan entry (frames,
        positioners, ...) to the sample file.

        The scan contains `tomo_n * nb_nxtomo` frames, where nb_nxtomo is
        `nb_turns`, or `nb_loop * nb_tomo` when `nb_turns` is None.

        :param scan_type: one of "dark", "flat", "projection"
        :param seq_ini_index: index of the sequence initialization entry
        :param z_value: value written (once per frame) to the "sz" positioner
        :param skip_title: if True the entry title is not written
        :param nb_loop: number of loop in pcotomo use case. Else must be None
        :param nb_tomo: number of tomography done in pcotomo 'per iteration'
                        use case. Else must be None
        :param nb_turns: number of turns. Must be None if nb_loop and nb_tomo
                         are provided
        :raises ValueError: if nb_turns is provided together with nb_loop or
                            nb_tomo, or if nothing usable is provided
        """
        # validate first: nothing is reserved nor written on invalid input
        if nb_turns is not None:
            if nb_tomo is not None or nb_loop is not None:
                raise ValueError(
                    "nb_tomo and nb_loop should be provided or nb_turns. Not both"
                )
            nb_nxtomo = nb_turns
        elif nb_loop is not None and nb_tomo is not None:
            nb_nxtomo = nb_loop * nb_tomo
        else:
            raise ValueError(
                "nb_tomo and nb_loop should be provided or nb_turns. None provided"
            )

        scan_idx = self.get_next_free_index()
        scan_name = str(scan_idx).zfill(4)
        scan_path = os.path.join(self.path, scan_name)
        self._scan_folders.append(_BlissScan(folder=scan_path, scan_type=scan_type))

        # register the scan information
        with HDF5File(self.sample_file, mode="a") as h5f:
            seq_node = h5f.require_group(str(scan_idx) + ".1")
            if "start_time" not in seq_node:
                seq_node["start_time"] = str(datetime.datetime.now())
            # write title
            title = self.get_title(scan_type=scan_type)
            if not skip_title:
                seq_node["title"] = title
            # write data
            data = (
                numpy.random.random(
                    self._det_height * self._det_width * self._tomo_n * nb_nxtomo
                )
                * 256
            )
            n_frames = self._tomo_n * nb_nxtomo
            data = data.reshape(n_frames, self._det_height, self._det_width)
            data = data.astype(self.frame_data_type)
            det_path_1 = "/".join(("instrument", self._detector_name))
            det_grp = seq_node.require_group(det_path_1)
            det_grp["data"] = data
            if self._with_nx_detector_attr:
                det_grp.attrs["NX_class"] = "NXdetector"
            acq_grp = det_grp.require_group("acq_parameters")
            acq_grp["acq_expo_time"] = 4
            det_path_2 = "/".join(("technique", "scan", self._detector_name))
            seq_node[det_path_2] = data
            seq_node.attrs["NX_class"] = "NXentry"
            # write rotation angle value and translations
            instrument_group = seq_node.require_group("instrument")
            positioners_grp = instrument_group.require_group("positioners")
            positioners_grp["hrsrot"] = numpy.linspace(
                start=0.0, stop=360, num=n_frames
            )
            positioners_grp["sx"] = numpy.array(numpy.random.random(size=n_frames))
            positioners_grp["sy"] = numpy.random.random(size=n_frames)
            positioners_grp["sz"] = numpy.asarray([z_value] * n_frames)
            positioners_grp["yrot"] = numpy.random.random(size=n_frames)
            if self._with_rotation_motor_info:
                scan_node = seq_node.require_group("technique/scan")
                scan_node["motor"] = ("rotation", "hrsrot", "srot")
            if self._ebs_tomo_version is not None:
                technique_group = seq_node.require_group("technique")
                technique_group.attrs["tomo_version"] = self._ebs_tomo_version

    def _add_tomo_config(self, group: h5py.Group):
        """
        Write the 'technique/tomoconfig' group: names of the datasets to read
        for rotation, detector and translations.
        """
        technique_group = group.require_group("technique")
        tomo_config_group = technique_group.require_group("tomoconfig")
        tomo_config_group["rotation"] = ["hrsrot"]
        tomo_config_group["detector"] = [
            self._detector_name,
        ]
        tomo_config_group["sample_x"] = ["sx"]
        tomo_config_group["sample_y"] = ["sy"]
        tomo_config_group["translation_z"] = ["sz"]
        tomo_config_group["translation_y"] = ["yrot"]

    def add_sequence(self):
        """Add a sequence to the bliss file"""
        raise NotImplementedError("Base class")

    @property
    def path(self):
        return self._sample_dir

    @property
    def sample_directory(self):
        return self._sample_dir

    @property
    def sample_file(self):
        return self._sample_file

    def scans_folders(self):
        """Return the list of `_BlissScan` registered by `add_scan`."""
        return self._scan_folders

    @property
    def n_darks(self):
        return self._n_darks

    @property
    def with_rotation_motor_info(self):
        return self._with_rotation_motor_info


class _BlissScan:
    """
    mock of a bliss scan

    :param folder: folder of the scan
    :param scan_type: one of "dark", "flat", "projection"
    """

    def __init__(self, folder, scan_type: str):
        assert scan_type in ("dark", "flat", "projection")
        self.__path = folder

    def path(self):
        """Return the scan folder."""
        return self.__path


class _BlissBasicTomo(_BlissSample):
    """Sample mock for a 'basic' tomography: dark, flat then projections."""

    def get_main_entry_title(self):
        return "tomo:fullturn"

    def add_sequence(self):
        """Add the initialization entry followed by dark / flat / projections."""
        # reserve the index for the 'initialization' sequence. No scan folder
        # will be created for this one.
        seq_ini_index = self.get_next_free_index()
        self.create_entry_and_technique(seq_ini_index=seq_ini_index)

        if self.n_darks > 0:
            self.add_scan(scan_type="dark", seq_ini_index=seq_ini_index, z_value=1)

        if self._n_flats > 0:
            self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)

        for _ in range(self._n_scan_per_seq):
            self.add_scan(
                scan_type="projection", seq_ini_index=seq_ini_index, z_value=1
            )


class _BlissPCOTomo(_BlissSample):
    """
    Sample mock for a pcotomo acquisition.

    pcotomo 'version 1' is described by (`nb_loop`, `nb_tomo`), 'version 2'
    by `nb_turns`. The matching `creator_version` attribute ("1.2.3" for
    version 1, "1.10.0" for version 2) is written at the root of the sample
    file if not already there.

    :raises ValueError: if nb_loop, nb_tomo and nb_turns are all provided
    """

    def __init__(
        self,
        sample_dir,
        sample_file,
        n_sequence,
        n_scan_per_sequence,
        n_darks,
        n_flats,
        detector_name,
        with_nx_detector_attr=True,
        with_rotation_motor_info=True,
        nb_loop=None,
        nb_tomo=None,
        nb_turns=1,
        frame_data_type=numpy.uint16,
        create_tomo_config: bool = True,
        ebs_tomo_version=None,
    ):
        self.nb_loop = nb_loop
        self.nb_tomo = nb_tomo
        self.nb_turns = nb_turns

        # deduce the pcotomo version *before* creating any sequence so that an
        # ambiguous input fails fast, without writing anything to disk
        if nb_loop is not None and nb_tomo is not None:
            if nb_turns is not None:
                raise ValueError(
                    "All of nb_loop, nb_tomo and nb_turns provided. Unable to deduce the pcotomo version"
                )
            pcotomo_version = 1
        elif nb_turns is not None:
            pcotomo_version = 2
        else:
            pcotomo_version = None

        super().__init__(
            sample_dir,
            sample_file,
            n_sequence,
            n_scan_per_sequence,
            n_darks,
            n_flats,
            detector_name,
            with_nx_detector_attr,
            with_rotation_motor_info=with_rotation_motor_info,
            frame_data_type=frame_data_type,
            create_tomo_config=create_tomo_config,
            ebs_tomo_version=ebs_tomo_version,
        )

        if pcotomo_version is not None:
            # write Bliss version in attrs
            with HDF5File(self.sample_file, mode="a") as h5f:
                if "creator_version" not in h5f.attrs:
                    if pcotomo_version == 1:
                        h5f.attrs["creator_version"] = "1.2.3"
                    elif pcotomo_version == 2:
                        h5f.attrs["creator_version"] = "1.10.0"

    def get_main_entry_title(self):
        return "tomo:pcotomo"

    def add_sequence(self):
        """
        Add the initialization entry followed by dark, flat, projections and
        a final flat.
        """
        # reserve the index for the 'initialization' sequence. No scan folder
        # will be created for this one.
        seq_ini_index = self.get_next_free_index()
        self.create_entry_and_technique(seq_ini_index=seq_ini_index)

        # start dark
        if self.n_darks > 0:
            self.add_scan(scan_type="dark", seq_ini_index=seq_ini_index, z_value=1)

        # start flat
        if self._n_flats > 0:
            self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)

        for _ in range(self._n_scan_per_seq):
            self.add_scan(
                scan_type="projection",
                seq_ini_index=seq_ini_index,
                z_value=1,
                nb_loop=self.nb_loop,
                nb_tomo=self.nb_tomo,
                nb_turns=self.nb_turns,
            )

        # end flat
        if self._n_flats > 0:
            self.add_scan(scan_type="flat", seq_ini_index=seq_ini_index, z_value=1)

    def add_scan(
        self,
        scan_type,
        seq_ini_index,
        z_value,
        skip_title=False,
        nb_loop=None,
        nb_tomo=None,
        nb_turns=1,
    ):
        """
        Same as `_BlissSample.add_scan`; for projections also write the
        pcotomo metadata (nb_loop, nb_tomo, nb_turns, tomo_n) under
        'technique/proj'.
        """
        super().add_scan(
            scan_type, seq_ini_index, z_value, skip_title, nb_loop, nb_tomo, nb_turns
        )
        if scan_type == "projection":
            # register pcotomo specific informations (only for projections)
            with HDF5File(self.sample_file, mode="a") as h5f:
                seq_node = h5f.require_group(str(self.current_scan_index) + ".1")
                scan_grp = seq_node.require_group("technique/proj")
                if nb_loop is not None and "nb_loop" not in scan_grp:
                    scan_grp["nb_loop"] = nb_loop
                if nb_tomo is not None and "nb_tomo" not in scan_grp:
                    scan_grp["nb_tomo"] = nb_tomo
                if nb_turns is not None and "nb_turns" not in scan_grp:
                    scan_grp["nb_turns"] = nb_turns
                if "tomo_n" not in scan_grp:
                    scan_grp["tomo_n"] = self._tomo_n


class _BlissZseriesTomo(_BlissSample):
    """
    Sample mock for a z-series acquisition (one stage per z value).

    * "v1": a single initialization entry per sequence; dark and flat are
      created at each stage.
    * "v3": one initialization entry per stage; dark / flat are only created
      at the first and last stage, according to `z_series_v_3_options`.

    :param z_values: sequence of z values (one per stage)
    :param z_series_version: "v1" or "v3"
    :param z_series_v_3_options: dict with keys "dark_at_start",
                                 "flat_at_start", "dark_at_end" and
                                 "flat_at_end". Required for "v3"
    """

    def __init__(
        self,
        sample_dir,
        sample_file,
        n_sequence,
        n_scan_per_sequence,
        n_darks,
        n_flats,
        detector_name,
        z_values,
        z_series_version: str,
        with_nx_detector_attr=True,
        with_rotation_motor_info=True,
        frame_data_type=numpy.uint16,
        create_tomo_config: bool = True,
        ebs_tomo_version: str = None,
        z_series_v_3_options=None,
    ):
        assert z_series_version in ("v1", "v3")
        # must be set before calling the parent constructor: it calls
        # `add_sequence` which reads them
        self.z_series_version = z_series_version
        self._z_values = z_values
        self._z_series_v_3_options = z_series_v_3_options
        super().__init__(
            sample_dir=sample_dir,
            sample_file=sample_file,
            n_sequence=n_sequence,
            n_scan_per_sequence=n_scan_per_sequence,
            n_darks=n_darks,
            n_flats=n_flats,
            detector_name=detector_name,
            with_nx_detector_attr=with_nx_detector_attr,
            with_rotation_motor_info=with_rotation_motor_info,
            frame_data_type=frame_data_type,
            create_tomo_config=create_tomo_config,
            ebs_tomo_version=ebs_tomo_version,
        )

    def get_main_entry_title(self):
        return "tomo:zseries"

    def _get_v3_option(self, key):
        """
        Return the z-series v3 option `key`.

        :raises ValueError: if no `z_series_v_3_options` was provided
        """
        if self._z_series_v_3_options is None:
            raise ValueError(
                "z_series_v_3_options must be provided for z-series v3"
            )
        return self._z_series_v_3_options[key]

    def create_dark_at_start(self) -> bool:
        if self.z_series_version == "v1":
            return True
        else:
            return self._get_v3_option("dark_at_start")

    def create_flat_at_start(self) -> bool:
        if self.z_series_version == "v1":
            return True
        else:
            return self._get_v3_option("flat_at_start")

    def create_dark_at_end(self) -> bool:
        if self.z_series_version == "v1":
            return True
        else:
            return self._get_v3_option("dark_at_end")

    def create_flat_at_end(self) -> bool:
        if self.z_series_version == "v1":
            return True
        else:
            return self._get_v3_option("flat_at_end")

    def create_intermediary_flat(self) -> bool:
        return self.z_series_version == "v1"

    def create_intermediary_dark(self) -> bool:
        return self.z_series_version == "v1"

    def add_sequence(self):
        """
        Add one stage (optional dark, optional flat, projections) per z value.
        """
        # reserve the index for the 'initialization' sequence. No scan folder
        # will be created for this one.
        if self.z_series_version == "v1":
            seq_ini_index = self.get_next_free_index()
            self.create_entry_and_technique(seq_ini_index=seq_ini_index)

        last_stage = len(self._z_values) - 1
        for stage, z_value in enumerate(self._z_values):
            if self.z_series_version == "v3":
                seq_ini_index = self.get_next_free_index()
                self.create_entry_and_technique(seq_ini_index=seq_ini_index)

            # first / last stage are found from the position in the series,
            # not from the z value: the same z can appear several times
            if stage == 0:
                create_dark = self.create_dark_at_start()
                create_flat = self.create_flat_at_start()
            elif stage == last_stage:
                create_dark = self.create_dark_at_end()
                create_flat = self.create_flat_at_end()
            else:
                create_dark = self.create_intermediary_dark()
                create_flat = self.create_intermediary_flat()

            if create_dark and self.n_darks > 0:
                self.add_scan(
                    scan_type="dark", seq_ini_index=seq_ini_index, z_value=z_value
                )

            if create_flat and self._n_flats > 0:
                self.add_scan(
                    scan_type="flat", seq_ini_index=seq_ini_index, z_value=z_value
                )

            for _ in range(self._n_scan_per_seq):
                self.add_scan(
                    scan_type="projection", seq_ini_index=seq_ini_index, z_value=z_value
                )

    def create_entry_and_technique(self, seq_ini_index):
        """
        Same as `_BlissSample.create_entry_and_technique`; for "v3" also write
        the 'technique/scan_flags' datasets from the v3 options.
        """
        super().create_entry_and_technique(seq_ini_index=seq_ini_index)
        # add sequence init information
        if self.z_series_version == "v3":
            with HDF5File(self.sample_file, mode="a") as h5f:
                seq_node = h5f.require_group(str(seq_ini_index) + ".1")
                seq_node.attrs["NX_class"] = "NXentry"
                # write scan flags
                seq_node["technique/scan_flags/dark_images_at_start"] = (
                    self._get_v3_option("dark_at_start")
                )
                seq_node["technique/scan_flags/dark_images_at_end"] = (
                    self._get_v3_option("dark_at_end")
                )
                seq_node["technique/scan_flags/ref_images_at_start"] = (
                    self._get_v3_option("flat_at_start")
                )
                seq_node["technique/scan_flags/ref_images_at_end"] = (
                    self._get_v3_option("flat_at_end")
                )

    def add_scan(
        self,
        scan_type,
        seq_ini_index,
        z_value,
        skip_title=False,
        nb_loop=None,
        nb_tomo=None,
        nb_turns=1,
    ):
        """Same as `_BlissSample.add_scan`; also write 'sample/name'."""
        super().add_scan(
            scan_type, seq_ini_index, z_value, skip_title, nb_loop, nb_tomo, nb_turns
        )
        with HDF5File(self.sample_file, mode="a") as h5f:
            seq_node = h5f.require_group(str(self.current_scan_index) + ".1")
            seq_node["sample/name"] = "mysample_0000"