split __call__ function into two functions for the first and second pass
The second pass also has an argument to pass the list of faulty/valid chunks, which can be a logical_and of multiple first passes.
TjarkMiener committed Aug 26, 2024
1 parent 4ffd39e commit c404a4a
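
A minimal usage sketch of the new two-pass workflow (added here for illustration, not part of the commit): it assumes a configured StatisticsCalculator (with chunk_shift set for the second pass) and two pre-existing DL1-like tables, ped_table and ff_table, for a telescope tel_id; these variable names are placeholders.

import numpy as np

from ctapipe.monitoring.calculator import StatisticsCalculator

# Assumed to be configured elsewhere (subarray, aggregators, outlier detectors, chunk_shift).
calculator = StatisticsCalculator(subarray=subarray)

# First pass over each set of calibration events with non-overlapping chunks.
ped_stats = calculator.first_pass(table=ped_table, tel_id=tel_id)
ff_stats = calculator.first_pass(table=ff_table, tel_id=tel_id)

# Combine the chunk validity of both first passes (requires the same chunk_size).
valid_chunks = np.logical_and(ped_stats["is_valid"], ff_stats["is_valid"])

# Second pass re-aggregates only the regions flagged as faulty, using chunk_shift.
ped_stats_second = calculator.second_pass(table=ped_table, valid_chunks=valid_chunks, tel_id=tel_id)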
Showing 2 changed files with 220 additions and 173 deletions.
326 changes: 176 additions & 150 deletions src/ctapipe/monitoring/calculator.py
@@ -3,14 +3,11 @@
calculate the monitoring data for the camera calibration.
"""

from abc import abstractmethod

import numpy as np
from astropy.table import Table, vstack
from astropy.table import Table

from ctapipe.core import TelescopeComponent
from ctapipe.core.traits import (
Bool,
ComponentName,
Dict,
Float,
@@ -117,66 +114,30 @@ def __init__(
parent=self,
)

@abstractmethod
def __call__(
self, table, tel_id, masked_pixels_of_sample=None, col_name="image"
) -> Table:
"""
Calculate the monitoring data for a given set of events.
This method should be implemented by subclasses to perform the specific
calibration calculations required for different types of calibration.
Parameters
----------
table : astropy.table.Table
DL1-like table with images of shape (n_images, n_channels, n_pix), event IDs and
timestamps of shape (n_images, )
tel_id : int
Telescope ID for which the calibration is being performed
masked_pixels_of_sample : ndarray, optional
Boolean array of masked pixels of shape (n_pix, ) that are not available for processing
col_name : str
Column name in the table from which the statistics will be aggregated
Returns
-------
astropy.table.Table
Table containing the aggregated statistics and their outlier masks
"""


class StatisticsCalculator(CalibrationCalculator):
"""
Component to calculate statistics from calibration events.
This class inherits from CalibrationCalculator and is responsible for
This class inherits from ``CalibrationCalculator`` and is responsible for
calculating various statistics from calibration events, such as pedestal
and flat-field data. It aggregates statistics, detects outliers,
and handles faulty data chunks.
The default option is to conduct only one pass over the data with non-overlapping
chunks, while overlapping chunks can be set by the ``chunk_shift`` parameter.
Two passes over the data, set via the ``second_pass``-flag, can be conducted
with a refined shift of the chunk in regions of trouble with a high percentage
of faulty pixels exceeding the threshold ``faulty_pixels_threshold``.
The ``StatisticsCalculator`` holds two functions to conduct two different passes
over the data with and without overlapping chunks. The first pass is conducted
with non-overlapping chunks, while overlapping chunks can be set by the ``chunk_shift``
parameter in the second pass. The second pass over the data is only conducted
in regions of trouble with a high percentage of faulty pixels exceeding
the threshold ``faulty_pixels_threshold``.
"""

chunk_shift = Int(
default_value=None,
allow_none=True,
help=(
"Number of samples to shift the aggregation chunk for the "
"calculation of the statistical values. If second_pass is set, "
"the first pass is conducted without overlapping chunks (chunk_shift=None) "
"and the second pass with a refined shift of the chunk in regions of trouble."
),
).tag(config=True)

second_pass = Bool(
default_value=False,
help=(
"Set whether to conduct a second pass over the data "
"with a refined shift of the chunk in regions of trouble."
"Number of samples to shift the aggregation chunk for the calculation "
"of the statistical values. Only used in the second_pass(), since the "
"first_pass() is conducted without overlapping chunks (chunk_shift=None)."
),
).tag(config=True)

@@ -185,42 +146,52 @@ class StatisticsCalculator(CalibrationCalculator):
allow_none=True,
help=(
"Threshold in percentage of faulty pixels over the camera "
"to conduct second pass with a refined shift of the chunk "
"in regions of trouble."
"to identify regions of trouble."
),
).tag(config=True)

def __call__(
def first_pass(
self,
table,
tel_id,
masked_pixels_of_sample=None,
col_name="image",
) -> Table:
# Check if the chunk_shift is set for second pass mode
if self.second_pass and self.chunk_shift is None:
raise ValueError(
"chunk_shift must be set if second pass over the data is selected"
)
"""
Calculate the monitoring data for a given set of events with non-overlapping aggregation chunks.
This method performs the first pass over the provided data table to calculate
various statistics for calibration purposes. The statistics are aggregated with
non-overlapping chunks (``chunk_shift`` set to None), and faulty pixels are detected
using a list of outlier detectors.
Parameters
----------
table : astropy.table.Table
DL1-like table with images of shape (n_images, n_channels, n_pix), event IDs and
timestamps of shape (n_images, )
tel_id : int
Telescope ID for which the calibration is being performed
masked_pixels_of_sample : ndarray, optional
Boolean array of masked pixels of shape (n_pix, ) that are not available for processing
col_name : str
Column name in the table from which the statistics will be aggregated
Returns
-------
astropy.table.Table
Table containing the aggregated statistics, their outlier masks, and the validity of the chunks
"""
# Get the aggregator
aggregator = self.stats_aggregator[self.stats_aggregator_type.tel[tel_id]]
# Pass through the whole provided dl1 table
if self.second_pass:
aggregated_stats = aggregator(
table=table,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=None,
)
else:
aggregated_stats = aggregator(
table=table,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)

aggregated_stats = aggregator(
table=table,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=None,
)
# Detect faulty pixels with multiple instances of ``OutlierDetector``
outlier_mask = np.zeros_like(aggregated_stats["mean"], dtype=bool)
for aggregated_val, outlier_detector in self.outlier_detectors.items():
@@ -230,88 +201,143 @@ def __call__(
)
# Add the outlier mask to the aggregated statistics
aggregated_stats["outlier_mask"] = outlier_mask
# Get valid chunks and add them to the aggregated statistics
aggregated_stats["is_valid"] = self._get_valid_chunks(outlier_mask)
return aggregated_stats

def second_pass(
self,
table,
valid_chunks,
tel_id,
masked_pixels_of_sample=None,
col_name="image",
) -> Table:
"""
Conduct a second pass over the data to refine the statistics in regions with a high percentage of faulty pixels.
This method performs a second pass over the data with a refined shift of the chunk in regions where a high percentage
of faulty pixels was detected during the first pass. Note: Multiple first passes over different calibration events are
performed, which may in rare cases lead to different identifications of faulty chunks. Therefore, a combined list of faulty
chunks is recommended to be passed to the second pass(es) if those different passes use the same ``chunk_size``.
Parameters
----------
table : astropy.table.Table
DL1-like table with images of shape (n_images, n_channels, n_pix), event IDs and timestamps of shape (n_images, ).
valid_chunks : ndarray
Boolean array indicating the validity of each chunk from the first pass.
Note: This boolean array can be a ``logical_and`` from multiple first passes of different calibration events.
tel_id : int
Telescope ID for which the calibration is being performed.
masked_pixels_of_sample : ndarray, optional
Boolean array of masked pixels of shape (n_pix, ) that are not available for processing.
col_name : str
Column name in the table from which the statistics will be aggregated.
Returns
-------
astropy.table.Table
Table containing the aggregated statistics after the second pass, their outlier masks, and the validity of the chunks.
"""
# Check if the chunk_shift is set for the second pass
if self.chunk_shift is None:
raise ValueError(
"chunk_shift must be set if second pass over the data is requested"
)
# Get the aggregator
aggregator = self.stats_aggregator[self.stats_aggregator_type.tel[tel_id]]
# Conduct a second pass over the data
if self.second_pass:
# Check if the camera has two gain channels
if outlier_mask.shape[1] == 2:
# Combine the outlier mask of both gain channels
outlier_mask = np.logical_or(
outlier_mask[:, 0, :],
outlier_mask[:, 1, :],
aggregated_stats_secondpass = None
if np.all(valid_chunks):
self.log.info(
"No faulty chunks detected in the first pass. The second pass with a finer chunk shift is not executed."
)
else:
chunk_size = aggregator.chunk_size
faulty_chunks_indices = np.where(~valid_chunks)[0]
for index in faulty_chunks_indices:
# Log information of the faulty chunks
self.log.warning(
f"Faulty chunk detected in the first pass at index '{index}'."
)
# Calculate the fraction of faulty pixels over the camera
faulty_pixels_percentage = (
np.count_nonzero(outlier_mask, axis=-1) / np.shape(outlier_mask)[-1]
) * 100.0

# Check for faulty chunks if the threshold is exceeded
faulty_chunks = faulty_pixels_percentage > self.faulty_pixels_threshold
if np.any(faulty_chunks):
chunk_size = aggregated_stats["n_events"][0]
faulty_chunks_indices = np.where(faulty_chunks)[0]
for index in faulty_chunks_indices:
# Log information of the faulty chunks
self.log.warning(
f"Faulty chunk ({int(faulty_pixels_percentage[index])}% of the camera unavailable) detected in the first pass: time_start={aggregated_stats['time_start'][index]}; time_end={aggregated_stats['time_end'][index]}"
)
# Calculate the start of the slice depending on whether the previous chunk was faulty or not
slice_start = (
chunk_size * index
if index - 1 in faulty_chunks_indices
else chunk_size * (index - 1)
# Calculate the start of the slice depending on whether the previous chunk was faulty or not
slice_start = (
chunk_size * index
if index - 1 in faulty_chunks_indices
else chunk_size * (index - 1)
)
# Set the start of the slice to the first element of the dl1 table if out of bound
# and add one ``chunk_shift``.
slice_start = max(0, slice_start) + self.chunk_shift
# Set the end of the slice to the last element of the dl1 table if out of bound
# and subtract one ``chunk_shift``.
slice_end = min(len(table) - 1, chunk_size * (index + 2)) - (
self.chunk_shift - 1
)
# Slice the dl1 table according to the previously calculated start and end.
table_sliced = table[slice_start:slice_end]
# Run the stats aggregator on the sliced dl1 table with a chunk_shift
# to sample the period of trouble (carflashes etc.) as effectively as possible.
# Checking for the length of the sliced table to be greater than the chunk_size
# since it can be smaller if the last two chunks are faulty.
if len(table_sliced) > aggregator.chunk_size:
aggregated_stats_secondpass = aggregator(
table=table_sliced,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)
# Set the start of the slice to the first element of the dl1 table if out of bound
# and add one ``chunk_shift``.
slice_start = max(0, slice_start) + self.chunk_shift
# Set the end of the slice to the last element of the dl1 table if out of bound
# and subtract one ``chunk_shift``.
slice_end = min(len(table) - 1, chunk_size * (index + 2)) - (
self.chunk_shift - 1
# Detect faulty pixels with multiple instances of OutlierDetector of the second pass
outlier_mask_secondpass = np.zeros_like(
aggregated_stats_secondpass["mean"], dtype=bool
)
for (
aggregated_val,
outlier_detector,
) in self.outlier_detectors.items():
outlier_mask_secondpass = np.logical_or(
outlier_mask_secondpass,
outlier_detector(aggregated_stats_secondpass[aggregated_val]),
)
# Slice the dl1 table according to the previously calculated start and end.
table_sliced = table[slice_start:slice_end]
# Add the outlier mask to the aggregated statistics
aggregated_stats_secondpass["outlier_mask"] = outlier_mask_secondpass
aggregated_stats_secondpass["is_valid"] = self._get_valid_chunks(
outlier_mask_secondpass
)
return aggregated_stats_secondpass

def _get_valid_chunks(self, outlier_mask):
"""
Identify valid chunks based on the outlier mask.
# Run the stats aggregator on the sliced dl1 table with a chunk_shift
# to sample the period of trouble (carflashes etc.) as effectively as possible.
# Checking for the length of the sliced table to be greater than the chunk_size
# since it can be smaller if the last two chunks are faulty.
if len(table_sliced) > aggregator.chunk_size:
aggregated_stats_secondpass = aggregator(
table=table_sliced,
masked_pixels_of_sample=masked_pixels_of_sample,
col_name=col_name,
chunk_shift=self.chunk_shift,
)
This method processes the outlier mask to determine which chunks of data
are considered valid or faulty. A chunk is marked as faulty if the percentage
of outlier pixels exceeds a predefined threshold ``faulty_pixels_threshold``.
# Detect faulty pixels with multiple instances of OutlierDetector of the second pass
outlier_mask_secondpass = np.zeros_like(
aggregated_stats_secondpass["mean"], dtype=bool
)
for (
aggregated_val,
outlier_detector,
) in self.outlier_detectors.items():
outlier_mask_secondpass = np.logical_or(
outlier_mask_secondpass,
outlier_detector(
aggregated_stats_secondpass[aggregated_val]
),
)
# Add the outlier mask to the aggregated statistics
aggregated_stats_secondpass[
"outlier_mask"
] = outlier_mask_secondpass
Parameters
----------
outlier_mask : numpy.ndarray
Boolean array indicating outlier pixels. The shape of the array should
match the shape of the aggregated statistics.
# Stack the aggregated statistics of the second pass to the first pass
aggregated_stats = vstack(
[aggregated_stats, aggregated_stats_secondpass]
)
# Sort the aggregated statistics based on the starting time
aggregated_stats.sort(["time_start"])
else:
self.log.info(
"No faulty chunks detected in the first pass. The second pass with a finer chunk shift is not executed."
)
# Return the aggregated statistics and their outlier masks
return aggregated_stats
Returns
-------
numpy.ndarray
Boolean array where each element indicates whether the corresponding
chunk is valid (True) or faulty (False).
"""
# Check if the camera has two gain channels
if outlier_mask.shape[1] == 2:
# Combine the outlier mask of both gain channels
outlier_mask = np.logical_or(
outlier_mask[:, 0, :],
outlier_mask[:, 1, :],
)
# Calculate the fraction of faulty pixels over the camera
faulty_pixels_percentage = (
np.count_nonzero(outlier_mask, axis=-1) / np.shape(outlier_mask)[-1]
) * 100.0
# Check for valid chunks if the threshold is not exceeded
valid_chunks = faulty_pixels_percentage < self.faulty_pixels_threshold
return valid_chunks
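
For reference, a standalone sketch (editor-added, with assumed numbers) of the slice boundaries that second_pass computes around a faulty chunk; chunk_size, chunk_shift, and the table length are made-up values, and the arithmetic mirrors the code above.

# Assumed configuration: 2500-sample chunks, a 100-sample refined shift,
# a DL1 table of 10000 events, and chunk index 2 flagged as faulty.
chunk_size = 2500
chunk_shift = 100
n_events = 10000
faulty_chunks_indices = [2]

for index in faulty_chunks_indices:
    # Start one chunk earlier unless the previous chunk is faulty as well.
    slice_start = (
        chunk_size * index
        if index - 1 in faulty_chunks_indices
        else chunk_size * (index - 1)
    )
    slice_start = max(0, slice_start) + chunk_shift
    # End one chunk after the faulty one, clipped to the table length.
    slice_end = min(n_events - 1, chunk_size * (index + 2)) - (chunk_shift - 1)
    print(index, slice_start, slice_end)  # prints: 2 2600 9900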