Source code for nxtomomill.converter.fluo.fluoconverter
# coding: utf-8
"""
module to convert fluo-tomo files (after PyMCA fit, tif files) to (nexus tomo compliant) .nx
"""
from __future__ import annotations
import logging
import os
from tqdm import tqdm
from tomoscan.esrf.scan.fluoscan import FluoTomoScan
from nxtomo import NXtomo
from nxtomo.nxobject.nxdetector import ImageKey
from nxtomomill import utils
from nxtomomill.io.config.fluoconfig import TomoFluoConfig
_logger = logging.getLogger(__name__)
__all__ = [
"from_fluo_to_nx",
]
[docs]def from_fluo_to_nx(
configuration: TomoFluoConfig, progress: tqdm | None = None
) -> tuple:
"""
Converts an fluo-tomo tiff files to a nexus file.
For now duplicates data.
:param configuration: configuration to use to process the data
:param progress: if provided then will be updated with conversion progress
:return: (nexus_file, entry)
"""
if configuration.input_folder is None:
raise ValueError("input_folder should be provided")
if not os.path.isdir(configuration.input_folder):
raise OSError(f"{configuration.input_folder} is not a valid folder path")
if configuration.output_file is None:
raise ValueError("output_file should be provided")
if configuration.detectors is None:
raise ValueError("Detector names should be provided.")
fileout_h5 = utils.get_file_name(
file_name=configuration.output_file,
extension=configuration.file_extension,
check=True,
)
scan = FluoTomoScan(
scan=configuration.input_folder,
dataset_basename=configuration.dataset_basename,
detectors=configuration.detectors,
)
if progress is not None:
progress.set_description("fluo2nx")
progress.total = len(scan.el_lines)
progress.n = 0
_logger.info(f"Fluo lines preset in dataset are {scan.el_lines}")
entry_list = []
for element, lines in scan.el_lines.items():
if progress is not None:
progress.set_postfix_str(f"elmt - {element}")
line_progress = tqdm(
desc=f"elmt: {element} - line: ", position=1, leave=False
)
line_progress.total = len(lines)
else:
line_progress = None
for i_line, line in enumerate(lines):
if line_progress is not None:
line_progress.set_postfix_str(f"elmt: {element} - line: {line}")
for det in scan.detectors:
elmt_line_data = scan.load_data(det, element=element, line_ind=i_line)
my_nxtomo = NXtomo()
my_nxtomo.instrument.detector.data = elmt_line_data
my_nxtomo.instrument.detector.image_key_control = [
ImageKey.PROJECTION
] * elmt_line_data.shape[0]
my_nxtomo.sample.rotation_angle = scan.rot_angles_deg
my_nxtomo.instrument.detector.x_pixel_size.value = scan.pixel_size
my_nxtomo.instrument.detector.x_pixel_size.unit = "um"
my_nxtomo.instrument.detector.y_pixel_size.value = scan.pixel_size
my_nxtomo.instrument.detector.y_pixel_size.unit = "um"
# define a value to sample-detector and source-sample distance. To be set to the real value in the future
my_nxtomo.instrument.detector.distance.value = 1.0
my_nxtomo.instrument.source.distance.value = 1.0
my_nxtomo.energy = scan.energy
data_path = f"{det}_{element}_{line}"
my_nxtomo.save(
file_path=fileout_h5,
data_path=data_path,
overwrite=configuration.overwrite,
)
entry_list.append((fileout_h5, data_path))
if line_progress is not None:
line_progress.update()
if progress is not None:
progress.update()
return tuple(entry_list)