Skip to content

Commit

Permalink
add unit tests for calculators
Browse files Browse the repository at this point in the history
  • Loading branch information
TjarkMiener committed Aug 26, 2024
1 parent 61936e5 commit 52ecd8a
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/ctapipe/monitoring/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ def __call__(
Parameters
----------
table : astropy.table.Table
table with images of shape (n_images, n_channels, n_pix)
and timestamps of shape (n_images, )
table with images of shape (n_images, n_channels, n_pix), event IDs and
timestamps of shape (n_images, )
masked_pixels_of_sample : ndarray, optional
boolean array of masked pixels of shape (n_pix, ) that are not available for processing
chunk_shift : int, optional
Expand Down
46 changes: 26 additions & 20 deletions src/ctapipe/monitoring/calculator.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class CalibrationCalculator(TelescopeComponent):
).tag(config=True)

outlier_detector_list = List(
trait=Dict,
trait=Dict(),
default_value=None,
allow_none=True,
help=(
Expand Down Expand Up @@ -118,7 +118,9 @@ def __init__(
)

@abstractmethod
def __call__(self, table, masked_pixels_of_sample, tel_id, col_name) -> Table:
def __call__(
self, table, tel_id, masked_pixels_of_sample=None, col_name="image"
) -> Table:
"""
Calculate the monitoring data for a given set of events.
Expand All @@ -128,15 +130,15 @@ def __call__(self, table, masked_pixels_of_sample, tel_id, col_name) -> Table:
Parameters
----------
table : astropy.table.Table
DL1-like table with images of shape (n_images, n_channels, n_pix)
and timestamps of shape (n_images, )
masked_pixels_of_sample : ndarray, optional
Boolean array of masked pixels of shape (n_pix, ) that are not available for processing
DL1-like table with images of shape (n_images, n_channels, n_pix), event IDs and
timestamps of shape (n_images, )
tel_id : int
Telescope ID for which the calibration is being performed
masked_pixels_of_sample : ndarray, optional
Boolean array of masked pixels of shape (n_pix, ) that are not available for processing
col_name : str
Column name in the table from which the statistics will be aggregated
Returns
-------
astropy.table.Table
Expand Down Expand Up @@ -191,8 +193,8 @@ class StatisticsCalculator(CalibrationCalculator):
def __call__(
self,
table,
masked_pixels_of_sample,
tel_id,
masked_pixels_of_sample=None,
col_name="image",
) -> Table:

Expand All @@ -207,20 +209,21 @@ def __call__(
# Pass through the whole provided dl1 table
if self.second_pass:
aggregated_stats = aggregator(
table=table[tel_id],
table=table,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=None,
)
else:
aggregated_stats = aggregator(
table=table[tel_id],
table=table,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)

# Detect faulty pixels with mutiple instances of ``OutlierDetector``
outlier_mask = np.zeros_like(aggregated_stats[0]["mean"], dtype=bool)
outlier_mask = np.zeros_like(aggregated_stats["mean"], dtype=bool)
for aggregated_val, outlier_detector in self.outlier_detectors.items():
outlier_mask = np.logical_or(
outlier_mask,
Expand Down Expand Up @@ -251,7 +254,7 @@ def __call__(
for index in faulty_chunks_indices:
# Log information of the faulty chunks
self.log.warning(
f"Faulty chunk ({int(faulty_pixels_percentage[index]*100.0)}% of the camera unavailable) detected in the first pass: time_start={aggregated_stats['time_start'][index]}; time_end={aggregated_stats['time_end'][index]}"
f"Faulty chunk ({int(faulty_pixels_percentage[index])}% of the camera unavailable) detected in the first pass: time_start={aggregated_stats['time_start'][index]}; time_end={aggregated_stats['time_end'][index]}"
)
# Calculate the start of the slice depending on whether the previous chunk was faulty or not
slice_start = (
Expand All @@ -272,16 +275,19 @@ def __call__(

# Run the stats aggregator on the sliced dl1 table with a chunk_shift
# to sample the period of trouble (carflashes etc.) as effectively as possible.
aggregated_stats_secondpass = aggregator(
table=table_sliced,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)
# Checking for the length of the sliced table to be greater than he chunk_size
# since it can be smaller if the last two chunks are faulty.
if len(table_sliced) > aggregator.chunk_size:
aggregated_stats_secondpass = aggregator(
table=table_sliced,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)

# Detect faulty pixels with mutiple instances of OutlierDetector of the second pass
outlier_mask_secondpass = np.zeros_like(
aggregated_stats_secondpass[0]["mean"], dtype=bool
aggregated_stats_secondpass["mean"], dtype=bool
)
for (
aggregated_val,
Expand Down Expand Up @@ -309,4 +315,4 @@ def __call__(
"No faulty chunks detected in the first pass. The second pass with a finer chunk shift is not executed."
)
# Return the aggregated statistics and their outlier masks
return aggregated_stats
return aggregated_stats
106 changes: 106 additions & 0 deletions src/ctapipe/monitoring/tests/test_calculator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""
Tests for CalibrationCalculator and related functions
"""

import numpy as np
from astropy.table import Table
from astropy.time import Time
from traitlets.config.loader import Config

from ctapipe.monitoring.aggregator import PlainAggregator
from ctapipe.monitoring.calculator import CalibrationCalculator, StatisticsCalculator


def test_onepass_calculator(example_subarray):
"""test basic 'one pass' functionality of the StatisticsCalculator"""

# Create dummy data for testing
times = Time(
np.linspace(60117.911, 60117.9258, num=5000), scale="tai", format="mjd"
)
event_ids = np.linspace(35, 725000, num=5000, dtype=int)
rng = np.random.default_rng(0)
charge_data = rng.normal(77.0, 10.0, size=(5000, 2, 1855))
# Create tables
charge_table = Table(
[times, event_ids, charge_data],
names=("time_mono", "event_id", "image"),
)
# Initialize the aggregators and calculators
chunk_size = 500
aggregator = PlainAggregator(subarray=example_subarray, chunk_size=chunk_size)
calculator = CalibrationCalculator.from_name(
name="StatisticsCalculator",
subarray=example_subarray,
stats_aggregator=aggregator,
)
calculator_chunk_shift = StatisticsCalculator(
subarray=example_subarray, stats_aggregator=aggregator, chunk_shift=250
)
# Compute the statistical values
stats = calculator(table=charge_table, tel_id=1)
stats_chunk_shift = calculator_chunk_shift(table=charge_table, tel_id=1)

# Check if the calculated statistical values are reasonable
# for a camera with two gain channels
np.testing.assert_allclose(stats[0]["mean"], 77.0, atol=2.5)
np.testing.assert_allclose(stats[1]["median"], 77.0, atol=2.5)
np.testing.assert_allclose(stats[0]["std"], 10.0, atol=2.5)
# Check if three chunks are used for the computation of aggregated statistic values as the last chunk overflows
assert len(stats) * 2 == len(stats_chunk_shift) + 1

def test_secondpass_calculator(example_subarray):
"""test the chunk shift option and the boundary case for the last chunk"""

# Create dummy data for testing
times = Time(
np.linspace(60117.911, 60117.9258, num=5500), scale="tai", format="mjd"
)
event_ids = np.linspace(35, 725000, num=5500, dtype=int)
rng = np.random.default_rng(0)
ped_data = rng.normal(2.0, 5.0, size=(5500, 2, 1855))
# Create table
ped_table = Table(
[times, event_ids, ped_data],
names=("time_mono", "event_id", "image"),
)
# Create configuration
config = Config(
{
"StatisticsCalculator": {
"stats_aggregator_type": [
("id", 1, "SigmaClippingAggregator"),
],
"outlier_detector_list": [
{
"apply_to": "mean",
"name": "StdOutlierDetector",
"validity_range": [-2.0, 2.0],
},
{
"apply_to": "median",
"name": "StdOutlierDetector",
"validity_range": [-3.0, 3.0],
},
{
"apply_to": "std",
"name": "RangeOutlierDetector",
"validity_range": [2.0, 8.0],
},
],
"chunk_shift": 100,
"second_pass": True,
"faulty_pixels_threshold": 1.0,
},
"SigmaClippingAggregator": {
"chunk_size": 500,
},
}
)
# Initialize the calculator from config
calculator = StatisticsCalculator(subarray=example_subarray, config=config)
# Compute aggregated statistic values
stats = calculator(ped_table, 1, col_name="image")
# Check if the second pass was activated
assert len(stats) > 20

0 comments on commit 52ecd8a

Please sign in to comment.