from __future__ import annotations
import os
import h5py
import numpy
import logging
from tqdm import tqdm
from silx.io.utils import open as open_hdf5
from silx.io.utils import h5py_read_dataset
from silx.io.url import DataUrl
from nxtomomill.utils.utils import str_datetime_to_numpy_datetime64
from nxtomomill.models.h52nx.FrameGroup import (
FrameGroup,
filter_acqui_frame_type,
)
from nxtomomill.models.h52nx._acquisitionstep import AcquisitionStep
from nxtomomill.io.config import TomoHDF5Config
from nxtomomill.models.utils import _get_title_dataset
from nxtomomill.utils.hdf5 import EntryReader
from nxtomomill.converter.hdf5.acquisition.multitomo import MultiTomoAcquisition
from nxtomomill.converter.hdf5.acquisition.backandforth import BackAndForthAcquisition
from .acquisition.utils.bliss_scan_type import get_bliss_scan_type
from .acquisition.baseacquisition import BaseAcquisition
from .acquisition.standardacquisition import StandardAcquisition
from .acquisition.zseriesacquisition import (
ZSeriesBaseAcquisition,
is_z_series_sequence,
is_z_series_frm_translation_z,
)
try:
    # Optional dependency imported only for its side effects (hence 'noqa F401': the name
    # itself is never used). Presumably it makes extra HDF5 compression filters available
    # to h5py - confirm against the hdf5plugin documentation.
    import hdf5plugin  # noqa F401
except ImportError:
    # hdf5plugin is not installed: continue without it
    pass
# module-level logger used by the acquisition constructors below
_logger = logging.getLogger(__name__)
def is_multitomo_sequence(
    entry: h5py.Group, configuration: TomoHDF5Config, fallback_on_title: bool = True
) -> bool:
    """
    Tell whether ``entry`` must be considered as the "initialization" scan of a multi-tomo sequence.

    The category is read from the 'technique/scan_category' dataset when it exists.
    Otherwise, and only when ``fallback_on_title`` is True, the (legacy) scan title is used.
    The entry starts a multi-tomo sequence when that category starts with one of
    ``configuration.multitomo_init_titles``.

    :param entry: bliss scan to inspect
    :param configuration: configuration providing 'multitomo_init_titles'
    :param fallback_on_title: use the scan title when 'technique/scan_category' is missing
    :return: True if the entry is a multi-tomo initialization scan
    """
    category_dataset = entry.get("technique/scan_category", None)
    if category_dataset is None and not fallback_on_title:
        # no category and not allowed to look at the title
        return False

    if category_dataset is not None:
        category = h5py_read_dataset(category_dataset)
    else:
        # legacy: deduce the category from the scan title
        category = _get_title_dataset(
            entry=entry, title_paths=BaseAcquisition.TITLE_PATHS
        )

    if category is None:
        return False
    return any(
        category.startswith(init_title)
        for init_title in configuration.multitomo_init_titles
    )
def is_back_and_forth_sequence(
    entry: h5py.Group, configuration: TomoHDF5Config, fallback_on_title: bool = True
) -> bool:
    """
    Tell whether ``entry`` must be considered as the "initialization" scan of a back-and-forth sequence.

    'technique/scan_category' is looked at first; if that dataset is missing and
    ``fallback_on_title`` is True, the (legacy) scan title is used instead.
    The result is True when the category begins with any of
    ``configuration.back_and_forth_init_titles``.

    :param entry: bliss scan to inspect
    :param configuration: configuration providing 'back_and_forth_init_titles'
    :param fallback_on_title: use the scan title when 'technique/scan_category' is missing
    :return: True if the entry is a back-and-forth initialization scan
    """
    scan_category_node = entry.get("technique/scan_category", None)
    if scan_category_node is not None:
        name = h5py_read_dataset(scan_category_node)
    elif fallback_on_title:
        name = _get_title_dataset(entry=entry, title_paths=BaseAcquisition.TITLE_PATHS)
    else:
        return False

    if name is None:
        return False
    prefixes = configuration.back_and_forth_init_titles
    return any(name.startswith(prefix) for prefix in prefixes)
class _AcquisitionConstructorBase:
"""
Base class of an acquisition constructor.
The role of an acquisition constructor is to create instances of nxtomomill 'BaseAcquisition'.
Then associate each bliss scan to his role (dark, flat, projs) and his (nxtomomill) acquisition.
"""
def __init__(
self,
configuration: TomoHDF5Config,
progress: tqdm | None,
detector_sel_callback,
):
"""Constructor that build nxtomomill acquisition sequence from a set of entry"""
self.configuration = configuration
self.progress = progress
self.detector_sel_callback = detector_sel_callback
self._acquisitions = []
self._current_acquisition = None
"""Pointer to the `BaseAcquisition` instance to be populated (we expect to add darks, flat and projections to it when iterating on bliss scans)"""
self._start_index = 0
"""Counter on the index of the NXtomo to be created. This allow us to know from which nxtomo index a new `BaseAcquisition` must start from"""
self._need_to_split_to_several_nxtomo = False
"""
Flag required by multitomo and back-and-forth sequences.
We need to wait until all bliss scans are registered to the nxtomomill Acquisition to update 'start_index'.
It true at the start of a new the sequence we increase the start index by (nb_loop * nb_tomo) == expected of NXtomo generated by the previous sequence
"""
def clear_acquisitions(self):
self._acquisitions.clear()
def build_sequence(self) -> tuple[BaseAcquisition]:
raise NotImplementedError("Base class")
@staticmethod
def sort_fct(node_name: str, h5d: h5py.File):
"""
sort the scan according to the 'start_time parameter'. If fails keep the original order.
If a node has the 'is_rearranged' attribute then skip sort and keep the original sequence.
"""
#
node_link_to_treat = h5d.get(node_name, getlink=True)
note_to_treat = h5d.get(node_name)
is_rearranged = note_to_treat is not None and note_to_treat.attrs.get(
"is_rearranged", False
)
# in some case the user might want to keep the order of the original sequence.
# in this case we expect some preprocessing to be done and which has tag the node with the 'is_rearranged' attribute
if is_rearranged:
return False
else:
node = h5d.get(node_name)
if node is not None:
# node can be None in the case of a broken link
start_time = node.get("start_time", None)
else:
_logger.warning(f"Broken link at {node_name}")
start_time = None
if start_time is not None:
start_time = h5py_read_dataset(start_time)
return str_datetime_to_numpy_datetime64(start_time)
elif isinstance(node_link_to_treat, (h5py.ExternalLink, h5py.SoftLink)):
return float(node_link_to_treat.path.split("/")[-1])
else:
# we expect to have node names like (1.1, 2.1...)
return float(node_name)
def build_progress(self, n: int) -> tqdm | None:
"""create tqdm progress bar"""
if self.progress is not None:
progress_read = tqdm(desc="read sequences")
progress_read.total = n
return progress_read
def _update_start_index_for_acquisitions_splitting_nxtomos(
self, url: DataUrl
) -> None:
"""
Util called when processing `BaseAcquisition` instance splitting the NXtomo generated to update the '_start_index'
This is the case for multi-tomo and back-and-forth tomo
"""
nb_loop = self._current_acquisition.get_nb_loop(url)
nb_tomo = self._current_acquisition.get_nb_tomo(url)
if nb_loop is not None and nb_tomo is not None:
self._start_index += int(nb_loop) * int(nb_tomo)
self._need_to_split_to_several_nxtomo = False
def _update_start_index_for_zseries(self, url: DataUrl) -> None:
"""
Util called when processing `ZSeriesBaseAcquisition` instance splitting the NXtomo generated to update the '_start_index'
"""
with EntryReader(url) as entry:
z = self._current_acquisition.get_z(entry)
if z not in self._acquisitions:
self._start_index += 1
class _AcquisitionConstructorFromUrls(_AcquisitionConstructorBase):
    """
    Create (nxtomomill) acquisitions from a set of urls and roles (dark, flat, proj) provided by the user
    """

    def build_sequence(self) -> tuple[BaseAcquisition, ...]:
        """
        Build acquisition instances from the frame groups of 'configuration.data_scans'.

        If the first frame group is not an initialization one, an initialization frame group
        without url is prepended and 'configuration.data_scans' is updated accordingly.

        :return: the created acquisitions (an empty tuple when no frame group is defined)
        :raises ValueError: if the configuration requests the use of titles
        """
        self.clear_acquisitions()
        # when building from urls `tomo_n` has no meaning
        if self.configuration.check_tomo_n is None:
            self.configuration.check_tomo_n = False
        if self.configuration.is_using_titles:
            raise ValueError("Configuration specify that titles should be used")
        assert self.configuration.output_file is not None, "output_file requested"
        data_scans = self.configuration.data_scans

        # step 0: frame groups without an explicit 'copy' use the default copy policy
        for frame_grp in data_scans:
            if frame_grp.copy is None:
                frame_grp.copy = self.configuration.default_data_copy

        # step 1: if there is no init FrameGroup create an empty one because this is requested
        if len(data_scans) == 0:
            # nothing to convert: return an empty tuple (previously None) to match the annotation
            return ()
        elif data_scans[0].frame_type is not AcquisitionStep.INITIALIZATION:
            data_scans = [
                FrameGroup(frame_type=AcquisitionStep.INITIALIZATION, url=None),
            ]
            data_scans.extend(self.configuration.data_scans)
            self.configuration.data_scans = tuple(data_scans)

        # step 2: treat FrameGroups
        progress_read = self.build_progress(len(data_scans))
        for frame_grp in data_scans:
            self.treat_bliss_scan(frame_grp, progress_read=progress_read)
        return tuple(self._acquisitions)

    def treat_bliss_scan(self, frame_grp: FrameGroup, progress_read: tqdm | None):
        """
        Treat a bliss scan from an instance of `FrameGroup`.

        This instance contains information provided by the user from the configuration file (url, frame_type and copy).
        An initialization frame group starts a new acquisition; any other frame group is registered
        to the current acquisition.

        :param frame_grp: Group to treat with (url, frame type and copy)
        :param progress_read: tqdm progress bar
        :raises RuntimeError: if a non-initialization frame group comes before any initialization one
        """
        if progress_read is not None:
            progress_read.update()

        if frame_grp.frame_type is AcquisitionStep.INITIALIZATION:
            # re-init the constructor
            if self._need_to_split_to_several_nxtomo is True:
                # the previous multi-tomo / back-and-forth acquisition never got its NXtomo count
                _logger.warning(
                    f"Fail to retrieve expected number of nxtomo for {self._current_acquisition}"
                )
                self._need_to_split_to_several_nxtomo = False
            self._current_acquisition = self._get_current_acquisition(
                frame_grp=frame_grp
            )
            if isinstance(
                self._current_acquisition,
                (MultiTomoAcquisition, BackAndForthAcquisition),
            ):
                # the number of NXtomo is only known once a projection is registered
                self._need_to_split_to_several_nxtomo = True
            elif isinstance(self._current_acquisition, StandardAcquisition):
                self._start_index += self._current_acquisition.get_expected_nx_tomo()
                self._need_to_split_to_several_nxtomo = False
            elif isinstance(self._current_acquisition, ZSeriesBaseAcquisition):
                # the index is increased while registering the z-series scans
                self._need_to_split_to_several_nxtomo = False
            self._acquisitions.append(self._current_acquisition)
        else:
            if self._current_acquisition is None:
                raise RuntimeError(
                    "processing error. Was not able to find acquisition initialization scan. Is it a bliss-tomo acquisition ?"
                )
            self._current_acquisition.register_step(
                url=frame_grp.url,
                entry_type=frame_grp.frame_type,
                copy_frames=frame_grp.copy_data,
            )
            # in case of z we increase the index according to whether the z is already registered or not
            if isinstance(self._current_acquisition, ZSeriesBaseAcquisition):
                self._update_start_index_for_zseries(url=frame_grp.url)
            if (
                self._need_to_split_to_several_nxtomo
                and frame_grp.frame_type is AcquisitionStep.PROJECTION
            ):
                self._update_start_index_for_acquisitions_splitting_nxtomos(
                    url=frame_grp.url
                )

    def _get_current_acquisition(self, frame_grp: FrameGroup):
        """
        Deduce the acquisition type from the (initialization) frame group and instantiate it.

        The z-series check uses the urls of the projection frame groups belonging to this
        initialization; the multi-tomo and back-and-forth checks need the initialization entry
        and are skipped when the frame group has no url.

        :param frame_grp: initialization frame group
        :return: a new acquisition starting at the current '_start_index'
        """
        acqui_projs_fg = filter_acqui_frame_type(
            init=frame_grp,
            sequences=self.configuration.data_scans,
            frame_type=AcquisitionStep.PROJECTION,
        )
        acqui_projs_urls = tuple(acqui_proj.url for acqui_proj in acqui_projs_fg)
        h5f = None
        try:
            # open the initialization url
            if frame_grp.url is not None:
                h5f = h5py.File(frame_grp.url.file_path(), mode="r")
                init_entry = h5f.get(frame_grp.url.data_path())
            else:
                init_entry = None

            if is_z_series_frm_translation_z(acqui_projs_urls, self.configuration):
                acquisition_class = ZSeriesBaseAcquisition
            elif (init_entry is not None) and is_multitomo_sequence(
                init_entry, self.configuration
            ):
                acquisition_class = MultiTomoAcquisition
            elif (init_entry is not None) and is_back_and_forth_sequence(
                init_entry, self.configuration
            ):
                acquisition_class = BackAndForthAcquisition
            else:
                acquisition_class = StandardAcquisition

            return acquisition_class(
                root_url=frame_grp.url,
                configuration=self.configuration,
                detector_sel_callback=self.detector_sel_callback,
                start_index=self._start_index,
            )
        finally:
            # drop the reference to the entry before closing the file
            init_entry = None
            if h5f is not None:
                h5f.close()
class _AcquisitionConstructorFromTitles(_AcquisitionConstructorBase):
    """
    Determine the bliss scan role (dark, flat, proj, init) from titles.

    Create and populate instances of BaseAcquisition according to those roles.
    """

    def build_sequence(self) -> tuple[BaseAcquisition, ...]:
        """
        Build acquisition instances from the bliss scans of 'configuration.input_file'.

        Scans are first ordered with `sort_fct`, then treated one by one.

        :return: the created acquisitions, as a tuple (so a later 'clear_acquisitions' does not
            empty the caller's result)
        :raises ValueError: if the scans cannot be ordered (typically not all of them have a 'start_time' dataset)
        """
        self.clear_acquisitions()
        if self.configuration.check_tomo_n is None:
            self.configuration.check_tomo_n = True
        with open_hdf5(self.configuration.input_file) as h5d:
            groups = list(h5d.keys())
            try:
                groups.sort(key=lambda name: self.sort_fct(name, h5d=h5d))
            except TypeError as e:
                # Comparing keys of different kinds (e.g. numpy.datetime64 with float) raises
                # numpy's '_UFuncNoLoopError', a TypeError subclass. Catch the public base class
                # instead of relying on the private 'numpy.core._exceptions' module.
                raise ValueError(
                    "Fail to order according to 'start_time'. Probably not all scans have a 'start_time' dataset"
                ) from e

            # step 1: pre processing: group scan together
            progress_read = self.build_progress(len(groups))
            for group_name in groups:
                self.treat_bliss_scan(
                    group_name=group_name, h5d=h5d, progress_read=progress_read
                )
        return tuple(self._acquisitions)

    def treat_bliss_scan(self, group_name: str, h5d, progress_read: tqdm | None):
        """
        Treat a bliss scan given by its name in ``h5d``.

        Either create a new 'current acquisition' (initialization scan) or register the scan to
        the current one (except if the scan is asked to be ignored by the user).

        :param group_name: name of the group (bliss scan) to treat
        :param h5d: h5py.File (open) containing the group.
        :param progress_read: optional progress to provide feedback
        """
        _logger.debug(f"parse {group_name}")
        if progress_read is not None:
            progress_read.update()
        try:
            entry = h5d[group_name]
        except KeyError:
            # case the key doesn't exist. Usual use case is that a bliss scan has been canceled
            _logger.warning(
                f"Unable to open {group_name} from {h5d.name}. Did the scan was canceled ? (Most likely). Skip this entry"
            )
            return

        url = self._get_entry_url(group_name=group_name, entry=entry, h5d=h5d)
        entry_type = get_bliss_scan_type(url=url, configuration=self.configuration)

        if entry_type is AcquisitionStep.INITIALIZATION:
            self._treat_init_scan(group_name=group_name, entry=entry, url=url)
        elif self._current_acquisition is not None and not self._ignore_sub_entry(url):
            # continue "standard" tomo dataset handling
            self._current_acquisition.register_step(
                url=url,
                entry_type=entry_type,
                copy_frames=self.configuration.default_data_copy,
            )
            # in case of z we increase the index according to whether the z is already registered or not
            if isinstance(self._current_acquisition, ZSeriesBaseAcquisition):
                self._update_start_index_for_zseries(url=url)
            if (
                self._need_to_split_to_several_nxtomo
                and entry_type is AcquisitionStep.PROJECTION
            ):
                self._update_start_index_for_acquisitions_splitting_nxtomos(url=url)
        else:
            _logger.info(f"ignore entry {entry}")

    def _get_entry_url(self, group_name: str, entry, h5d) -> DataUrl:
        """
        Return the url of the bliss scan ``group_name``.

        External links (this is the case of proposal files) are resolved to the file and path
        they point to; a relative link file path is resolved against the directory of
        'configuration.input_file'.
        """
        link = h5d.get(group_name, getlink=True)
        if isinstance(link, h5py.ExternalLink):
            file_path = link.filename
            data_path = link.path
            if not os.path.isabs(file_path):
                file_path = os.path.abspath(
                    os.path.join(
                        os.path.dirname(self.configuration.input_file),
                        file_path,
                    )
                )
        else:
            file_path = self.configuration.input_file
            data_path = entry.name
        return DataUrl(
            file_path=file_path,
            data_path=data_path,
            scheme="silx",
            data_slice=None,
        )

    def _treat_init_scan(self, group_name: str, entry, url: DataUrl) -> None:
        """
        Start a new current acquisition from an initialization scan.

        If the entry is to be ignored, the current acquisition is reset to None so that the
        following scans of this entry are not registered to any acquisition.
        Errors raised while creating the acquisition are propagated unless the entry is ignored.
        """
        if self._need_to_split_to_several_nxtomo is True:
            _logger.warning(
                f"Fail to retrieve expected number of nxtomo for {self._current_acquisition}"
            )
            # reset the flag (as done when building from urls) so the new acquisition
            # does not inherit it
            self._need_to_split_to_several_nxtomo = False
        try:
            self._create_acquisition(entry=entry, url=url)
        except Exception:
            if not self._ignore_entry(group_name):
                raise
            # Do not keep the previous acquisition as current: the scans of this
            # ignored entry would otherwise be registered into it.
            self._current_acquisition = None
            self._need_to_split_to_several_nxtomo = False
            return
        if self._ignore_entry(group_name):
            self._current_acquisition = None
            self._need_to_split_to_several_nxtomo = False
            return
        self._acquisitions.append(self._current_acquisition)

    def _create_acquisition(self, entry, url: DataUrl) -> None:
        """
        Instantiate the acquisition type matching the initialization ``entry`` as current acquisition.

        Standard and z-series acquisitions increase '_start_index' by their expected number of
        NXtomo right away. Multi-tomo and back-and-forth acquisitions only know it once a
        projection is registered, so '_need_to_split_to_several_nxtomo' is set instead.
        """
        if is_z_series_sequence(entry=entry, configuration=self.configuration):
            acquisition_class = ZSeriesBaseAcquisition
        elif is_multitomo_sequence(entry=entry, configuration=self.configuration):
            acquisition_class = MultiTomoAcquisition
        elif is_back_and_forth_sequence(entry=entry, configuration=self.configuration):
            acquisition_class = BackAndForthAcquisition
        else:
            acquisition_class = StandardAcquisition

        self._current_acquisition = acquisition_class(
            root_url=url,
            configuration=self.configuration,
            detector_sel_callback=self.detector_sel_callback,
            start_index=self._start_index,
        )
        if acquisition_class in (MultiTomoAcquisition, BackAndForthAcquisition):
            self._need_to_split_to_several_nxtomo = True
        else:
            self._start_index += self._current_acquisition.get_expected_nx_tomo()

    def _ignore_entry(self, group_name) -> bool:
        """
        Check whether an entry must be ignored.

        :return: True if 'configuration.entries' is not empty and does not contain ``group_name``
        """
        if len(self.configuration.entries) == 0:
            return False
        if not group_name.startswith("/"):
            group_name = "/" + group_name
        for entry in self.configuration.entries:
            if group_name == entry.data_path():
                return False
        return True

    def _ignore_sub_entry(self, sub_entry_url: DataUrl | None) -> bool:
        """
        Check whether a (non-initialization) scan is listed in 'configuration.sub_entries_to_ignore'.

        :return: True if the provided sub_entry should be ignored
        :raises TypeError: if ``sub_entry_url`` is neither None nor a DataUrl
        """
        if sub_entry_url is None:
            return False
        if not isinstance(sub_entry_url, DataUrl):
            raise TypeError(
                f"sub_entry_url is expected to be a DataUrl not {type(sub_entry_url)}"
            )
        if self.configuration.sub_entries_to_ignore is None:
            return False
        sub_entry_fp = sub_entry_url.file_path()
        sub_entry_dp = sub_entry_url.data_path()
        for entry in self.configuration.sub_entries_to_ignore:
            assert isinstance(entry, DataUrl)
            if entry.file_path() == sub_entry_fp and entry.data_path() == sub_entry_dp:
                return True
        return False