Source code for nxtomomill.converter.hdf5.acquisition.utils.machine_current
import numpy
[docs]def deduce_machine_current(timestamps: tuple, known_machine_current: dict) -> tuple:
"""
:param known_machine_current: keys are timestamp. Value is machine current
:param timestamps: timestamp for which we want to get machine current
"""
if not isinstance(known_machine_current, dict):
raise TypeError("known_machine_current is expected to be a dict")
for elmt in timestamps:
if not isinstance(elmt, numpy.datetime64):
raise TypeError(
f"Elements of timestamps are expected to be {numpy.datetime64} and not {type(elmt)}"
)
if len(known_machine_current) == 0:
raise ValueError("known_machine_current should contain at least one element")
for key, value in known_machine_current.items():
if not isinstance(key, numpy.datetime64):
raise TypeError(
f"known_machine_current keys are expected to be instances of {numpy.datetime64} and not {type(key)}"
)
if not isinstance(value, (float, numpy.number)):
raise TypeError(
"known_machine_current values are expected to be instances of float"
)
# 1. Order **known** machine current by time stamps (key)
known_machine_current = dict(sorted(known_machine_current.items()))
known_timestamps = numpy.fromiter(
known_machine_current.keys(),
dtype="datetime64[ns]",
count=len(known_machine_current),
)
known_machine_current_values = numpy.fromiter(
known_machine_current.values(),
dtype="float64",
count=len(known_machine_current),
)
# 2. Sort the supplied timestamps
timestamps = numpy.array(timestamps, dtype="datetime64[ns]")
timestamp_input_ordering = numpy.argsort(timestamps)
timestamps_sorted = numpy.take_along_axis(
timestamps, indices=timestamp_input_ordering, axis=0
)
# 3. Convert to float for numpy.interp
known_timestamps_float = known_timestamps.astype("float64")
timestamps_float = timestamps_sorted.astype("float64")
# 4. Interpolate the values
interpolated_values = numpy.interp(
timestamps_float, known_timestamps_float, known_machine_current_values
)
# 5. Reorder interpolated values to match the order of original timestamps
ordered_interpolated_values = numpy.zeros_like(interpolated_values)
for i, o_pos in enumerate(timestamp_input_ordering):
ordered_interpolated_values[o_pos] = interpolated_values[i]
return tuple(ordered_interpolated_values)