From 52ecd8a385316176f180f2584d336a9ab973da84 Mon Sep 17 00:00:00 2001 From: TjarkMiener Date: Mon, 26 Aug 2024 10:02:30 +0200 Subject: [PATCH] add unit tests for calculators --- src/ctapipe/monitoring/aggregator.py | 4 +- src/ctapipe/monitoring/calculator.py | 46 ++++---- .../monitoring/tests/test_calculator.py | 106 ++++++++++++++++++ 3 files changed, 134 insertions(+), 22 deletions(-) create mode 100644 src/ctapipe/monitoring/tests/test_calculator.py diff --git a/src/ctapipe/monitoring/aggregator.py b/src/ctapipe/monitoring/aggregator.py index 5f2e5933bbf..862f0758041 100644 --- a/src/ctapipe/monitoring/aggregator.py +++ b/src/ctapipe/monitoring/aggregator.py @@ -59,8 +59,8 @@ def __call__( Parameters ---------- table : astropy.table.Table - table with images of shape (n_images, n_channels, n_pix) - and timestamps of shape (n_images, ) + table with images of shape (n_images, n_channels, n_pix), event IDs and + timestamps of shape (n_images, ) masked_pixels_of_sample : ndarray, optional boolean array of masked pixels of shape (n_pix, ) that are not available for processing chunk_shift : int, optional diff --git a/src/ctapipe/monitoring/calculator.py b/src/ctapipe/monitoring/calculator.py index d3462aafb6a..750e44cb75f 100644 --- a/src/ctapipe/monitoring/calculator.py +++ b/src/ctapipe/monitoring/calculator.py @@ -53,7 +53,7 @@ class CalibrationCalculator(TelescopeComponent): ).tag(config=True) outlier_detector_list = List( - trait=Dict, + trait=Dict(), default_value=None, allow_none=True, help=( @@ -118,7 +118,9 @@ def __init__( ) @abstractmethod - def __call__(self, table, masked_pixels_of_sample, tel_id, col_name) -> Table: + def __call__( + self, table, tel_id, masked_pixels_of_sample=None, col_name="image" + ) -> Table: """ Calculate the monitoring data for a given set of events. @@ -128,15 +130,15 @@ def __call__(self, table, masked_pixels_of_sample, tel_id, col_name) -> Table: Parameters ---------- table : astropy.table.Table - DL1-like table with images of shape (n_images, n_channels, n_pix) - and timestamps of shape (n_images, ) - masked_pixels_of_sample : ndarray, optional - Boolean array of masked pixels of shape (n_pix, ) that are not available for processing + DL1-like table with images of shape (n_images, n_channels, n_pix), event IDs and + timestamps of shape (n_images, ) tel_id : int Telescope ID for which the calibration is being performed + masked_pixels_of_sample : ndarray, optional + Boolean array of masked pixels of shape (n_pix, ) that are not available for processing col_name : str Column name in the table from which the statistics will be aggregated - + Returns ------- astropy.table.Table @@ -191,8 +193,8 @@ class StatisticsCalculator(CalibrationCalculator): def __call__( self, table, - masked_pixels_of_sample, tel_id, + masked_pixels_of_sample=None, col_name="image", ) -> Table: @@ -207,20 +209,21 @@ def __call__( # Pass through the whole provided dl1 table if self.second_pass: aggregated_stats = aggregator( - table=table[tel_id], + table=table, masked_pixels_of_sample=masked_pixels_of_sample, col_name=col_name, chunk_shift=None, ) else: aggregated_stats = aggregator( - table=table[tel_id], + table=table, masked_pixels_of_sample=masked_pixels_of_sample, col_name=col_name, chunk_shift=self.chunk_shift, ) + # Detect faulty pixels with mutiple instances of ``OutlierDetector`` - outlier_mask = np.zeros_like(aggregated_stats[0]["mean"], dtype=bool) + outlier_mask = np.zeros_like(aggregated_stats["mean"], dtype=bool) for aggregated_val, outlier_detector in self.outlier_detectors.items(): outlier_mask = np.logical_or( outlier_mask, @@ -251,7 +254,7 @@ def __call__( for index in faulty_chunks_indices: # Log information of the faulty chunks self.log.warning( - f"Faulty chunk ({int(faulty_pixels_percentage[index]*100.0)}% of the camera unavailable) detected in the first pass: time_start={aggregated_stats['time_start'][index]}; time_end={aggregated_stats['time_end'][index]}" + f"Faulty chunk ({int(faulty_pixels_percentage[index])}% of the camera unavailable) detected in the first pass: time_start={aggregated_stats['time_start'][index]}; time_end={aggregated_stats['time_end'][index]}" ) # Calculate the start of the slice depending on whether the previous chunk was faulty or not slice_start = ( @@ -272,16 +275,19 @@ def __call__( # Run the stats aggregator on the sliced dl1 table with a chunk_shift # to sample the period of trouble (carflashes etc.) as effectively as possible. - aggregated_stats_secondpass = aggregator( - table=table_sliced, - masked_pixels_of_sample=masked_pixels_of_sample, - col_name=col_name, - chunk_shift=self.chunk_shift, - ) + # Checking for the length of the sliced table to be greater than he chunk_size + # since it can be smaller if the last two chunks are faulty. + if len(table_sliced) > aggregator.chunk_size: + aggregated_stats_secondpass = aggregator( + table=table_sliced, + masked_pixels_of_sample=masked_pixels_of_sample, + col_name=col_name, + chunk_shift=self.chunk_shift, + ) # Detect faulty pixels with mutiple instances of OutlierDetector of the second pass outlier_mask_secondpass = np.zeros_like( - aggregated_stats_secondpass[0]["mean"], dtype=bool + aggregated_stats_secondpass["mean"], dtype=bool ) for ( aggregated_val, @@ -309,4 +315,4 @@ def __call__( "No faulty chunks detected in the first pass. The second pass with a finer chunk shift is not executed." ) # Return the aggregated statistics and their outlier masks - return aggregated_stats \ No newline at end of file + return aggregated_stats diff --git a/src/ctapipe/monitoring/tests/test_calculator.py b/src/ctapipe/monitoring/tests/test_calculator.py new file mode 100644 index 00000000000..2878e6d4a0f --- /dev/null +++ b/src/ctapipe/monitoring/tests/test_calculator.py @@ -0,0 +1,106 @@ +""" +Tests for CalibrationCalculator and related functions +""" + +import numpy as np +from astropy.table import Table +from astropy.time import Time +from traitlets.config.loader import Config + +from ctapipe.monitoring.aggregator import PlainAggregator +from ctapipe.monitoring.calculator import CalibrationCalculator, StatisticsCalculator + + +def test_onepass_calculator(example_subarray): + """test basic 'one pass' functionality of the StatisticsCalculator""" + + # Create dummy data for testing + times = Time( + np.linspace(60117.911, 60117.9258, num=5000), scale="tai", format="mjd" + ) + event_ids = np.linspace(35, 725000, num=5000, dtype=int) + rng = np.random.default_rng(0) + charge_data = rng.normal(77.0, 10.0, size=(5000, 2, 1855)) + # Create tables + charge_table = Table( + [times, event_ids, charge_data], + names=("time_mono", "event_id", "image"), + ) + # Initialize the aggregators and calculators + chunk_size = 500 + aggregator = PlainAggregator(subarray=example_subarray, chunk_size=chunk_size) + calculator = CalibrationCalculator.from_name( + name="StatisticsCalculator", + subarray=example_subarray, + stats_aggregator=aggregator, + ) + calculator_chunk_shift = StatisticsCalculator( + subarray=example_subarray, stats_aggregator=aggregator, chunk_shift=250 + ) + # Compute the statistical values + stats = calculator(table=charge_table, tel_id=1) + stats_chunk_shift = calculator_chunk_shift(table=charge_table, tel_id=1) + + # Check if the calculated statistical values are reasonable + # for a camera with two gain channels + np.testing.assert_allclose(stats[0]["mean"], 77.0, atol=2.5) + np.testing.assert_allclose(stats[1]["median"], 77.0, atol=2.5) + np.testing.assert_allclose(stats[0]["std"], 10.0, atol=2.5) + # Check if three chunks are used for the computation of aggregated statistic values as the last chunk overflows + assert len(stats) * 2 == len(stats_chunk_shift) + 1 + +def test_secondpass_calculator(example_subarray): + """test the chunk shift option and the boundary case for the last chunk""" + + # Create dummy data for testing + times = Time( + np.linspace(60117.911, 60117.9258, num=5500), scale="tai", format="mjd" + ) + event_ids = np.linspace(35, 725000, num=5500, dtype=int) + rng = np.random.default_rng(0) + ped_data = rng.normal(2.0, 5.0, size=(5500, 2, 1855)) + # Create table + ped_table = Table( + [times, event_ids, ped_data], + names=("time_mono", "event_id", "image"), + ) + # Create configuration + config = Config( + { + "StatisticsCalculator": { + "stats_aggregator_type": [ + ("id", 1, "SigmaClippingAggregator"), + ], + "outlier_detector_list": [ + { + "apply_to": "mean", + "name": "StdOutlierDetector", + "validity_range": [-2.0, 2.0], + }, + { + "apply_to": "median", + "name": "StdOutlierDetector", + "validity_range": [-3.0, 3.0], + }, + { + "apply_to": "std", + "name": "RangeOutlierDetector", + "validity_range": [2.0, 8.0], + }, + ], + "chunk_shift": 100, + "second_pass": True, + "faulty_pixels_threshold": 1.0, + }, + "SigmaClippingAggregator": { + "chunk_size": 500, + }, + } + ) + # Initialize the calculator from config + calculator = StatisticsCalculator(subarray=example_subarray, config=config) + # Compute aggregated statistic values + stats = calculator(ped_table, 1, col_name="image") + # Check if the second pass was activated + assert len(stats) > 20 +