Skip to content

Commit

Permalink
I split the CalibrationInterpolator
Browse files Browse the repository at this point in the history
  • Loading branch information
Christoph Toennis committed Jul 31, 2024
1 parent 8680178 commit db1c983
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 36 deletions.
10 changes: 8 additions & 2 deletions src/ctapipe/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@

from .datawriter import DATA_MODEL_VERSION, DataWriter

from .interpolation import Interpolator, PointingInterpolator, CalibrationInterpolator
from .interpolation import (
Interpolator,
PointingInterpolator,
GainInterpolator,
PedestalInterpolator,
)

__all__ = [
"HDF5TableWriter",
Expand All @@ -39,5 +44,6 @@
"get_hdf5_datalevels",
"Interpolator",
"PointingInterpolator",
"CalibrationInterpolator",
"PedestalInterpolator",
"GainInterpolator",
]
97 changes: 74 additions & 23 deletions src/ctapipe/io/interpolation.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ class StepFunction:

"""
Step function Interpolator for the gain and pedestals
Interpolates data so that for each time the value from the closest previous
point given.
Parameters
----------
Expand Down Expand Up @@ -80,7 +82,7 @@ class Interpolator(Component, metaclass=ABCMeta):
default_value=False,
).tag(config=True)

table_location = None
telescope_data_group = None
required_columns = set()
expected_units = {}

Expand Down Expand Up @@ -152,7 +154,7 @@ def _check_interpolators(self, tel_id):
def _read_parameter_table(self, tel_id):
input_table = read_table(
self.h5file,
f"{self.table_location}/tel_{tel_id:03d}",
f"{self.telescope_data_group}/tel_{tel_id:03d}",
)
self.add_table(tel_id, input_table)

Expand All @@ -162,7 +164,7 @@ class PointingInterpolator(Interpolator):
Interpolator for pointing and pointing correction data
"""

table_location = "/dl0/monitoring/telescope/pointing"
telescope_data_group = "/dl0/monitoring/telescope/pointing"
required_columns = {"time", "azimuth", "altitude"}
expected_units = {"azimuth": u.rad, "altitude": u.rad}

Expand Down Expand Up @@ -228,12 +230,12 @@ def add_table(self, tel_id, input_table):
self._interpolators[tel_id]["alt"] = interp1d(mjd, alt, **self.interp_options)


class CalibrationInterpolator(Interpolator):
class GainInterpolator(Interpolator):
"""
Interpolator for calibration data
Interpolator for relative gain data
"""

table_location = "dl1/calibration" # TBD
telescope_data_group = "dl1/calibration/gain" # TBD

def __call__(self, tel_id, time):
"""
Expand All @@ -248,16 +250,16 @@ def __call__(self, tel_id, time):
Returns
-------
cal : array [float]
interpolated calibration quantity
gain : array [float]
interpolated relative gain
"""

self._check_interpolators(tel_id)

cal = self._interpolators[tel_id][self.par_name](time)
return cal
gain = self._interpolators[tel_id]["gain"](time)
return gain

def add_table(self, tel_id, input_table, par_name="pedestal"):
def add_table(self, tel_id, input_table):
"""
Add a table to this interpolator
Expand All @@ -267,25 +269,74 @@ def add_table(self, tel_id, input_table, par_name="pedestal"):
Telescope id
input_table : astropy.table.Table
Table of pointing values, expected columns
are ``time`` as ``Time`` column. The calibration
parameter column is given through the variable ``par_name``
par_name : str
Name of the parameter that is to be interpolated
``pedestal`` is used for pedestals, ``gain`` for gain
can also be the name of statistical parameters to
interpolate the content of StatisticsContainers
are ``time`` as ``Time`` column and "gain"
for the relative gain data
"""

self.required_columns = {"time", "gain"}

self._check_tables(input_table)

input_table.sort("time")
time = input_table["time"]
gain = input_table["gain"]
self._interpolators[tel_id] = {}
self._interpolators[tel_id]["gain"] = StepFunction(
time, gain, **self.interp_options
)


class PedestalInterpolator(Interpolator):
"""
Interpolator for Pedestal data
"""

telescope_data_group = "dl1/calibration/pedestal" # TBD

def __call__(self, tel_id, time):
"""
Interpolate pedestal or gain for a given time and tel_id.
self.par_name = par_name
Parameters
----------
tel_id : int
telescope id
time : astropy.time.Time
time for which to interpolate the calibration data
Returns
-------
pedestal : array [float]
interpolated pedestal values
"""

self._check_interpolators(tel_id)

pedestal = self._interpolators[tel_id]["pedestal"](time)
return pedestal

def add_table(self, tel_id, input_table):
"""
Add a table to this interpolator
Parameters
----------
tel_id : int
Telescope id
input_table : astropy.table.Table
Table of pointing values, expected columns
are ``time`` as ``Time`` column and "pedestal"
for the pedestal data
"""

self.required_columns = {"time", par_name}
self.required_columns = {"time", "pedestal"}

self._check_tables(input_table)

input_table.sort("time")
time = input_table["time"]
cal = input_table[par_name]
pedestal = input_table["pedestal"]
self._interpolators[tel_id] = {}
self._interpolators[tel_id][par_name] = StepFunction(
time, cal, **self.interp_options
self._interpolators[tel_id]["pedestal"] = StepFunction(
time, pedestal, **self.interp_options
)
45 changes: 34 additions & 11 deletions src/ctapipe/io/tests/test_interpolator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
from astropy.table import Table
from astropy.time import Time

from ctapipe.io.interpolation import CalibrationInterpolator, PointingInterpolator
from ctapipe.io.interpolation import (
GainInterpolator,
PedestalInterpolator,
PointingInterpolator,
)

t0 = Time("2022-01-01T00:00:00")

Expand Down Expand Up @@ -97,25 +101,37 @@ def test_bounds():
},
)

table_cal = Table(
table_pedestal = Table(
{
"time": np.arange(0.0, 10.1, 2.0),
"pedestal": np.reshape(np.random.normal(4.0, 1.0, 1850 * 6), (6, 1850)),
},
)

table_gain = Table(
{
"time": np.arange(0.0, 10.1, 2.0),
"gain": np.reshape(np.random.normal(1.0, 1.0, 1850 * 6), (6, 1850)),
},
)

interpolator_pointing = PointingInterpolator()
interpolator_cal = CalibrationInterpolator()
interpolator_pedestal = PedestalInterpolator()
interpolator_gain = GainInterpolator()
interpolator_pointing.add_table(1, table_pointing)
interpolator_cal.add_table(1, table_cal, "pedestal")
interpolator_pedestal.add_table(1, table_pedestal)
interpolator_gain.add_table(1, table_gain)

error_message = "below the interpolation range"

with pytest.raises(ValueError, match=error_message):
interpolator_pointing(tel_id=1, time=t0 - 0.1 * u.s)

with pytest.raises(ValueError, match=error_message):
interpolator_cal(tel_id=1, time=-0.1)
interpolator_pedestal(tel_id=1, time=-0.1)

with pytest.raises(ValueError, match=error_message):
interpolator_gain(tel_id=1, time=-0.1)

with pytest.raises(ValueError, match="above the interpolation range"):
interpolator_pointing(tel_id=1, time=t0 + 10.2 * u.s)
Expand All @@ -124,24 +140,31 @@ def test_bounds():
assert u.isclose(alt, 69 * u.deg)
assert u.isclose(az, 1 * u.deg)

pedestal = interpolator_cal(tel_id=1, time=1.0)
assert all(pedestal == table_cal["pedestal"][0])
pedestal = interpolator_pedestal(tel_id=1, time=1.0)
assert all(pedestal == table_pedestal["pedestal"][0])
gain = interpolator_gain(tel_id=1, time=1.0)
assert all(gain == table_gain["gain"][0])
with pytest.raises(KeyError):
interpolator_pointing(tel_id=2, time=t0 + 1 * u.s)
with pytest.raises(KeyError):
interpolator_cal(tel_id=2, time=1.0)
interpolator_pedestal(tel_id=2, time=1.0)
with pytest.raises(KeyError):
interpolator_gain(tel_id=2, time=1.0)

interpolator_pointing = PointingInterpolator(bounds_error=False)
interpolator_cal = CalibrationInterpolator(bounds_error=False)
interpolator_pedestal = PedestalInterpolator(bounds_error=False)
interpolator_gain = GainInterpolator(bounds_error=False)
interpolator_pointing.add_table(1, table_pointing)
interpolator_cal.add_table(1, table_cal, "pedestal")
interpolator_pedestal.add_table(1, table_pedestal)
interpolator_gain.add_table(1, table_gain)

for dt in (-0.1, 10.1) * u.s:
alt, az = interpolator_pointing(tel_id=1, time=t0 + dt)
assert np.isnan(alt.value)
assert np.isnan(az.value)

assert all(np.isnan(interpolator_cal(tel_id=1, time=-0.1)))
assert all(np.isnan(interpolator_pedestal(tel_id=1, time=-0.1)))
assert all(np.isnan(interpolator_gain(tel_id=1, time=-0.1)))

interpolator_pointing = PointingInterpolator(bounds_error=False, extrapolate=True)
interpolator_pointing.add_table(1, table_pointing)
Expand Down

0 comments on commit db1c983

Please sign in to comment.