"""Scan dedicated for bliss format - based on EDF files"""
from __future__ import annotations
import logging
import os
import glob
import pint
import numpy
from tomoscan.identifier import ScanIdentifier
from tomoscan.scanbase import TomoScanBase
from tomoscan.utils import docstring
from dataclasses import dataclass, field
import numpy as np
from numpy.typing import NDArray, DTypeLike
from silx.io.utils import h5py_read_dataset
try:
from tqdm.auto import tqdm
except ImportError:
has_tqdm = False
else:
has_tqdm = True
import h5py
_ureg = pint.get_application_registry()
_logger = logging.getLogger(__name__)
try:
import tifffile # noqa #F401 needed for later possible lazy loading
except ImportError:
has_tifffile = False
else:
has_tifffile = True
__all__ = ["FluoTomoScanBase", "FluoTomoScan3D", "FluoTomoScan2D"]
[docs]@dataclass
class FluoTomoScanBase:
"""Dataset manipulation class."""
scan: str
dataset_basename: str
detectors: tuple = ()
dtype: DTypeLike = numpy.float32
verbose: bool = False
angles: float | None = None
"""rotation angles in degree"""
el_lines: dict[str, list[str]] = field(default_factory=dict)
pixel_size: float | None = None
"""pixel size in meter"""
energy: float | None = None
"""energy in keV"""
detected_folders: list[str] = field(default_factory=list)
def __post_init__(self):
self.detected_folders = self.detect_folders()
self.detected_detectors = tuple(self.detect_detectors())
self.get_metadata_from_h5_file()
_logger.info(f"Detectors: {self.detected_detectors}")
if len(self.detectors) == 0:
self.detectors = self.detected_detectors
for det in self.detectors:
if det not in self.detected_detectors:
raise ValueError(
f"The detector should be in {self.detected_detectors} and is {self.detectors}"
)
self.detect_elements()
if self.angles is None:
self.angles = self.detect_rot_angles()
[docs] def detect_folders(self) -> list[str]:
"""List all folders to process."""
raise NotImplementedError("Base class")
@property
def rot_angles_deg(self) -> NDArray:
if self.angles is None:
raise ValueError("Rotation angles not initialized")
return self.angles * _ureg.deg
@property
def rot_angles_rad(self) -> NDArray:
return self.rot_angles_deg.to(_ureg.rad)
[docs] def detect_rot_angles(self) -> NDArray:
"""Build rotation angles list."""
raise NotImplementedError("Base class")
def _check_ready_to_load_data(self, det):
if not has_tifffile:
raise RuntimeError("tiffile not install. Cannot load data.")
if det not in self.detectors:
raise RuntimeError(
f"The detector {det} is invalid. Valid ones are {self.detectors}"
)
if self.angles is None:
raise RuntimeError("Rotation angles not initialized")
if self.detectors is None:
raise RuntimeError("Detectors not initialized")
def get_metadata_from_h5_file(self):
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to load metadata")
h5_path = os.path.join(self.scan, self.detected_folders[0])
h5_files = glob.glob1(h5_path, "*.h5")
if len(h5_files) > 1:
raise ValueError(
"More than one hdf5 file in scan directory. Expect only ONE to pick pixel size."
)
elif len(h5_files) == 0:
pattern = os.path.join(h5_path, "*.h5")
raise ValueError(
f"Unable to find the hdf5 file in scan directory to pick pixel size. RegExp used is {pattern}"
)
else:
with h5py.File(os.path.join(h5_path, h5_files[0]), "r") as f:
if len(list(f.keys())) != 1:
raise ValueError(
f"H5 file should contain only one entry, found {len(list(f.keys()))}"
)
else:
entry_name = list(f.keys())[0]
self.pixel_size = (
(
h5py_read_dataset(f[entry_name]["FLUO"]["pixelSize"])
* _ureg.micrometer
)
.to(_ureg.meter)
.magnitude
)
self.energy = float(
h5py_read_dataset(
f[entry_name]["instrument"]["monochromator"]["energy"]
)
)
def detect_detectors(self):
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to detect detectors")
proj_1_dir = os.path.join(self.scan, "fluofit", self.detected_folders[0])
detected_detectors = []
file_names = glob.glob1(proj_1_dir, "IMG_*area_density_ngmm2.tif")
for file in file_names:
det = file.split("_")[1]
if det not in detected_detectors:
detected_detectors.append(det)
if "" in detected_detectors:
raise ValueError(
f"Suspicious detector! Detected detectors are {detected_detectors}. Please use --detectors <det1>... where det1 are valid detector name."
)
return detected_detectors
def detect_elements(self):
if len(self.detected_folders) == 0:
raise ValueError("No folder found, unable to detect elements")
proj_1_dir = os.path.join(self.scan, "fluofit", self.detected_folders[0])
detector = self.detectors[0]
file_names = glob.glob1(proj_1_dir, f"IMG_{detector}*area_density_ngmm2.tif")
for file in file_names:
el_str = file.split("_")[2]
element, line = el_str.split("-")
try:
if line not in self.el_lines[element]:
self.el_lines[element].append(line)
self.el_lines[element] = sorted(self.el_lines[element])
except KeyError:
self.el_lines[element] = [line]
[docs] def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
"""Main function of class to load data."""
raise NotImplementedError("Base class")
[docs] @staticmethod
def from_identifier(identifier):
"""Return the Dataset from a identifier"""
raise NotImplementedError("Not implemented for fluo-tomo yet.")
[docs] @docstring(TomoScanBase)
def get_identifier(self) -> ScanIdentifier:
raise NotImplementedError("Not implemented for fluo-tomo yet.")
[docs]@dataclass
class FluoTomoScan3D(FluoTomoScanBase):
"""Dataset manipulation class."""
[docs] def detect_folders(self):
fit_folders = glob.glob1(
os.path.join(self.scan, "fluofit"), rf"{self.dataset_basename}_projection*"
)
fit_folders.sort()
proj_indices = [int(f[-3:]) for f in fit_folders]
if len(fit_folders) < proj_indices[-1]:
_logger.debug(
f"The number of projections ({len(fit_folders)}) is lower than the largest projection index {proj_indices[-1]}. Some projections are probably missing."
)
if len(fit_folders) == 0:
raise FileNotFoundError(
"No projection was found in the fluofit folder. The searched for pattern is <scan_dir>/fluofit/<dataset_basename>_projection*'."
)
elif not os.path.isdir(os.path.join(self.scan, fit_folders[0])):
raise FileNotFoundError(
"Found fitted data folders but not the corresponding raw data folder."
)
else:
return fit_folders
[docs] def detect_rot_angles(self) -> NDArray:
tmp_angles = []
for f in self.detected_folders:
prj_dir = os.path.join(self.scan, "fluofit", f)
info_file = os.path.join(prj_dir, "info.txt")
try:
with open(info_file, "r") as f:
info_str = f.read()
tmp_angles.append(float(info_str.split(" ")[2]))
except FileNotFoundError:
_logger.debug(
f"{info_file} doesn't exist, while expected to be present in each projection folder."
)
raise FileNotFoundError(
f"{info_file} doesn't exist, while expected to be present in each projection folder."
)
_logger.info(f"Correctly found all {len(tmp_angles)} rotation angles.")
return numpy.array(tmp_angles, ndmin=1, dtype=numpy.float32)
[docs] def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
self._check_ready_to_load_data(det)
line = self.el_lines[element][line_ind]
data_det = []
description = f"Loading images of {element}-{line} ({det}): "
if has_tqdm:
proj_iterator = tqdm(
range(len(self.detected_folders)),
disable=self.verbose,
desc=description,
)
else:
proj_iterator = range(len(self.detected_folders))
for ii_i in proj_iterator:
proj_dir = os.path.join(
self.scan,
"fluofit",
self.detected_folders[ii_i],
)
img_path = os.path.join(
proj_dir, f"IMG_{det}_{element}-{line}_area_density_ngmm2.tif"
)
if self.verbose:
_logger.info(f"Loading {ii_i + 1} / {len(self.angles)}: {img_path}")
img = tifffile.imread(img_path)
data_det.append(numpy.nan_to_num(numpy.array(img, dtype=self.dtype)))
data = numpy.array(data_det)
return numpy.ascontiguousarray(data)
[docs]@dataclass
class FluoTomoScan2D(FluoTomoScanBase):
"""Dataset manipulation class."""
def get_metadata_from_h5_file(self):
super().get_metadata_from_h5_file()
h5_path = os.path.join(self.scan, self.detected_folders[0])
h5_files = glob.glob1(h5_path, "*.h5")
if len(h5_files) > 1:
raise ValueError(
"More than one hdf5 file in scan directory. Expect only ONE to pick pixel size."
)
elif len(h5_files) == 0:
pattern = os.path.join(h5_path, "*.h5")
raise ValueError(
f"Unable to find the hdf5 file in scan directory to pick pixel size. RegExp used is {pattern}"
)
else:
with h5py.File(os.path.join(h5_path, h5_files[0]), "r") as f:
if len(list(f.keys())) != 1:
raise ValueError(
f"H5 file should contain only one entry, found {len(list(f.keys()))}"
)
else:
entry_name = list(f.keys())[0]
self.scanRange_2 = float(
h5py_read_dataset(f[entry_name]["FLUO"]["scanRange_2"])
)
self.scanDim_2 = int(
h5py_read_dataset(f[entry_name]["FLUO"]["scanDim_2"])
)
[docs] def detect_rot_angles(self) -> NDArray:
nb_projs = self.scanDim_2
angular_coverage = self.scanRange_2
return np.linspace(0, angular_coverage, nb_projs, endpoint=True)
[docs] def detect_folders(self):
fit_folder = os.path.join(self.scan, "fluofit", self.dataset_basename)
if not os.path.isdir(fit_folder):
raise FileNotFoundError(
f"No folder {fit_folder} was found in the fluofit folder. The searched for pattern is <scan_dir>/fluofit/<dataset_basename>'."
)
elif not os.path.isdir(os.path.join(self.scan, self.dataset_basename)):
raise FileNotFoundError(
"Found fitted data folders but not the corresponding raw data folder."
)
else:
return [
self.dataset_basename,
]
[docs] def load_data(self, det: str, element: str, line_ind: int = 0) -> NDArray:
self._check_ready_to_load_data(det)
line = self.el_lines[element][line_ind]
data_det = []
description = f"Loading images of {element}-{line} ({det}): "
if has_tqdm:
slice_iterator = tqdm(
range(len(self.detected_folders)),
disable=self.verbose,
desc=description,
)
else:
slice_iterator = range(len(self.detected_folders))
for ii_i in slice_iterator:
proj_dir = os.path.join(
self.scan,
"fluofit",
self.dataset_basename, # WARNING: dataset_basename is ONE SINGLE sinogram.
)
img_path = os.path.join(
proj_dir, f"IMG_{det}_{element}-{line}_area_density_ngmm2.tif"
)
if self.verbose:
_logger.info(f"Loading {ii_i + 1} / {len(self.angles)}: {img_path}")
img = tifffile.imread(img_path)
data_det.append(numpy.nan_to_num(numpy.array(img, dtype=self.dtype)))
data = numpy.array(data_det)
return numpy.ascontiguousarray(data)