diff --git a/docs/api-reference/monitoring/calculator.rst b/docs/api-reference/monitoring/calculator.rst new file mode 100644 index 00000000000..93a1c1ec861 --- /dev/null +++ b/docs/api-reference/monitoring/calculator.rst @@ -0,0 +1,11 @@ +.. _calibration_calculator: + +********************** +Calibration Calculator +********************** + + +Reference/API +============= + +.. automodapi:: ctapipe.monitoring.calculator diff --git a/docs/api-reference/monitoring/index.rst b/docs/api-reference/monitoring/index.rst index c38bf7bb3a9..e34072bd6ce 100644 --- a/docs/api-reference/monitoring/index.rst +++ b/docs/api-reference/monitoring/index.rst @@ -10,7 +10,7 @@ Monitoring data are time-series used to monitor the status or quality of hardwar This module provides some code to help to generate monitoring data from processed event data, particularly for the purposes of calibration and data quality assessment. -Currently, only code related to :ref:`stats_aggregator` and :ref:`outlier_detector` is implemented here. +Code related to :ref:`stats_aggregator`, :ref:`calibration_calculator`, and :ref:`outlier_detector` is implemented here. Submodules @@ -20,6 +20,7 @@ Submodules :maxdepth: 1 aggregator + calculator interpolation outlier diff --git a/docs/changes/2609.features.rst b/docs/changes/2609.features.rst new file mode 100644 index 00000000000..fac55b285f6 --- /dev/null +++ b/docs/changes/2609.features.rst @@ -0,0 +1 @@ +Add calibration calculators which aggregates statistics, detects outliers, handles faulty data chunks. diff --git a/src/ctapipe/calib/camera/calibrator.py b/src/ctapipe/calib/camera/calibrator.py index baf3d2f1057..853ba3f7da8 100644 --- a/src/ctapipe/calib/camera/calibrator.py +++ b/src/ctapipe/calib/camera/calibrator.py @@ -2,6 +2,7 @@ Definition of the `CameraCalibrator` class, providing all steps needed to apply calibration and image extraction, as well as supporting algorithms. """ + from functools import cache import astropy.units as u diff --git a/src/ctapipe/monitoring/aggregator.py b/src/ctapipe/monitoring/aggregator.py index ee8b5566c7c..607395a46b8 100644 --- a/src/ctapipe/monitoring/aggregator.py +++ b/src/ctapipe/monitoring/aggregator.py @@ -57,7 +57,7 @@ def __call__( and call the relevant function of the particular aggregator to compute aggregated statistic values. The chunks are generated in a way that ensures they do not overflow the bounds of the table. - If ``chunk_shift`` is None, chunks will not overlap, but the last chunk is ensured to be - of size `chunk_size`, even if it means the last two chunks will overlap. + of size ``chunk_size``, even if it means the last two chunks will overlap. - If ``chunk_shift`` is provided, it will determine the number of samples to shift between the start of consecutive chunks resulting in an overlap of chunks. Chunks that overflows the bounds of the table are not considered. @@ -65,10 +65,10 @@ def __call__( Parameters ---------- table : astropy.table.Table - table with images of shape (n_images, n_channels, n_pix) - and timestamps of shape (n_images, ) + table with images of shape (n_images, n_channels, n_pixels), event IDs and + timestamps of shape (n_images, ) masked_pixels_of_sample : ndarray, optional - boolean array of masked pixels of shape (n_pix, ) that are not available for processing + boolean array of masked pixels of shape (n_channels, n_pixels) that are not available for processing chunk_shift : int, optional number of samples to shift between the start of consecutive chunks col_name : string diff --git a/src/ctapipe/monitoring/calculator.py b/src/ctapipe/monitoring/calculator.py new file mode 100644 index 00000000000..c642ef3a9d9 --- /dev/null +++ b/src/ctapipe/monitoring/calculator.py @@ -0,0 +1,337 @@ +""" +Definition of the ``PixelStatisticsCalculator`` class, providing all steps needed to +calculate the montoring data for the camera calibration. +""" + +import numpy as np +from astropy.table import Table, vstack + +from ctapipe.core import TelescopeComponent +from ctapipe.core.traits import ( + ComponentName, + Dict, + Float, + Int, + List, + TelescopeParameter, + TraitError, +) +from ctapipe.monitoring.aggregator import StatisticsAggregator +from ctapipe.monitoring.outlier import OutlierDetector + +__all__ = [ + "PixelStatisticsCalculator", +] + + +class PixelStatisticsCalculator(TelescopeComponent): + """ + Component to calculate statistics from calibration events. + + The ``PixelStatisticsCalculator`` is responsible for calculating various statistics from + calibration events, such as pedestal and flat-field data. It aggregates statistics, + detects outliers, and handles faulty data periods. + This class holds two functions to conduct two different passes over the data with and without + overlapping aggregation chunks. The first pass is conducted with non-overlapping chunks, + while overlapping chunks can be set by the ``chunk_shift`` parameter for the second pass. + The second pass over the data is only conducted in regions of trouble with a high fraction + of faulty pixels exceeding the threshold ``faulty_pixels_fraction``. + """ + + stats_aggregator_type = TelescopeParameter( + trait=ComponentName( + StatisticsAggregator, default_value="SigmaClippingAggregator" + ), + default_value="SigmaClippingAggregator", + help="Name of the StatisticsAggregator subclass to be used.", + ).tag(config=True) + + outlier_detector_list = List( + trait=Dict(), + default_value=None, + allow_none=True, + help=( + "List of dicts containing the name of the OutlierDetector subclass to be used, " + "the aggregated statistic value to which the detector should be applied, " + "and the configuration of the specific detector. " + "E.g. ``[{'apply_to': 'std', 'name': 'RangeOutlierDetector', 'config': {'validity_range': [2.0, 8.0]}}]``." + ), + ).tag(config=True) + + chunk_shift = Int( + default_value=None, + allow_none=True, + help=( + "Number of samples to shift the aggregation chunk for the calculation " + "of the statistical values. Only used in the second_pass(), since the " + "first_pass() is conducted with non-overlapping chunks (chunk_shift=None)." + ), + ).tag(config=True) + + faulty_pixels_fraction = Float( + default_value=0.1, + allow_none=True, + help="Minimum fraction of faulty camera pixels to identify regions of trouble.", + ).tag(config=True) + + def __init__( + self, + subarray, + config=None, + parent=None, + **kwargs, + ): + """ + Parameters + ---------- + subarray: ctapipe.instrument.SubarrayDescription + Description of the subarray. Provides information about the + camera which are useful in calibration. Also required for + configuring the TelescopeParameter traitlets. + config: traitlets.loader.Config + Configuration specified by config file or cmdline arguments. + Used to set traitlet values. + This is mutually exclusive with passing a ``parent``. + parent: ctapipe.core.Component or ctapipe.core.Tool + Parent of this component in the configuration hierarchy, + this is mutually exclusive with passing ``config`` + """ + super().__init__(subarray=subarray, config=config, parent=parent, **kwargs) + + # Initialize the instances of StatisticsAggregator + self.stats_aggregators = {} + for _, _, name in self.stats_aggregator_type: + self.stats_aggregators[name] = StatisticsAggregator.from_name( + name, subarray=self.subarray, parent=self + ) + + # Initialize the instances of OutlierDetector from the configuration + self.outlier_detectors, self.apply_to_list = [], [] + if self.outlier_detector_list is not None: + for d, outlier_detector in enumerate(self.outlier_detector_list): + # Check if all required keys are present + missing_keys = { + "apply_to", + "name", + "config", + } - outlier_detector.keys() + if missing_keys: + raise TraitError( + f"Entry '{d}' in the ``outlier_detector_list`` trait" + f"is missing required key(s): {', '.join(missing_keys)}" + ) + self.apply_to_list.append(outlier_detector["apply_to"]) + self.outlier_detectors.append( + OutlierDetector.from_name( + outlier_detector["name"], + subarray=self.subarray, + parent=self, + **outlier_detector["config"], + ) + ) + + def first_pass( + self, + table, + tel_id, + masked_pixels_of_sample=None, + col_name="image", + ) -> Table: + """ + Calculate the monitoring data for a given set of events with non-overlapping aggregation chunks. + + This method performs the first pass over the provided data table to calculate + various statistics for calibration purposes. The statistics are aggregated with + non-overlapping chunks (``chunk_shift`` set to None), and faulty pixels are detected + using a list of outlier detectors. + + + Parameters + ---------- + table : astropy.table.Table + DL1-like table with images of shape (n_images, n_channels, n_pixels), event IDs and + timestamps of shape (n_images, ) + tel_id : int + Telescope ID for which the calibration is being performed + masked_pixels_of_sample : ndarray, optional + Boolean array of masked pixels of shape (n_channels, n_pixels) that are not available for processing + col_name : str + Column name in the table from which the statistics will be aggregated + + Returns + ------- + astropy.table.Table + Table containing the aggregated statistics, their outlier masks, and the validity of the chunks + """ + # Get the aggregator + aggregator = self.stats_aggregators[self.stats_aggregator_type.tel[tel_id]] + # Pass through the whole provided dl1 table + aggregated_stats = aggregator( + table=table, + masked_pixels_of_sample=masked_pixels_of_sample, + col_name=col_name, + chunk_shift=None, + ) + # Detect faulty pixels with multiple instances of ``OutlierDetector`` + # and append the outlier masks to the aggregated statistics + self._find_and_append_outliers(aggregated_stats) + # Get valid chunks and add them to the aggregated statistics + aggregated_stats["is_valid"] = self._get_valid_chunks( + aggregated_stats["outlier_mask"] + ) + return aggregated_stats + + def second_pass( + self, + table, + valid_chunks, + tel_id, + masked_pixels_of_sample=None, + col_name="image", + ) -> Table: + """ + Conduct a second pass over the data to refine the statistics in regions with a high percentage of faulty pixels. + + This method performs a second pass over the data with a refined shift of the chunk in regions where a high percentage + of faulty pixels were detected during the first pass. Note: Multiple first passes of different calibration events are + performed which may lead to different identification of faulty chunks in rare cases. Therefore a joined list of faulty + chunks is recommended to be passed to the second pass(es) if those different passes use the same ``chunk_size``. + + Parameters + ---------- + table : astropy.table.Table + DL1-like table with images of shape (n_images, n_channels, n_pixels), event IDs and timestamps of shape (n_images, ). + valid_chunks : ndarray + Boolean array indicating the validity of each chunk from the first pass. + Note: This boolean array can be a ``logical_and`` from multiple first passes of different calibration events. + tel_id : int + Telescope ID for which the calibration is being performed. + masked_pixels_of_sample : ndarray, optional + Boolean array of masked pixels of shape (n_channels, n_pixels) that are not available for processing. + col_name : str + Column name in the table from which the statistics will be aggregated. + + Returns + ------- + astropy.table.Table + Table containing the aggregated statistics after the second pass, their outlier masks, and the validity of the chunks. + """ + # Check if the chunk_shift is set for the second pass + if self.chunk_shift is None: + raise ValueError( + "chunk_shift must be set if second pass over the data is requested" + ) + # Check if at least one chunk is faulty + if np.all(valid_chunks): + raise ValueError( + "All chunks are valid. The second pass over the data is redundant." + ) + # Get the aggregator + aggregator = self.stats_aggregators[self.stats_aggregator_type.tel[tel_id]] + # Conduct a second pass over the data + aggregated_stats_secondpass = [] + faulty_chunks_indices = np.flatnonzero(~valid_chunks) + for index in faulty_chunks_indices: + # Log information of the faulty chunks + self.log.info( + "Faulty chunk detected in the first pass at index '%s'.", index + ) + # Calculate the start of the slice depending on whether the previous chunk was faulty or not + slice_start = ( + aggregator.chunk_size * index + if index - 1 in faulty_chunks_indices + else aggregator.chunk_size * (index - 1) + ) + # Set the start of the slice to the first element of the dl1 table if out of bound + # and add one ``chunk_shift``. + slice_start = max(0, slice_start) + self.chunk_shift + # Set the end of the slice to the last element of the dl1 table if out of bound + # and subtract one ``chunk_shift``. + slice_end = min(len(table) - 1, aggregator.chunk_size * (index + 2)) - ( + self.chunk_shift - 1 + ) + # Slice the dl1 table according to the previously calculated start and end. + table_sliced = table[slice_start:slice_end] + # Run the stats aggregator on the sliced dl1 table with a chunk_shift + # to sample the period of trouble (carflashes etc.) as effectively as possible. + # Checking for the length of the sliced table to be greater than the ``chunk_size`` + # since it can be smaller if the last two chunks are faulty. Note: The two last chunks + # can be overlapping during the first pass, so we simply ignore them if there are faulty. + if len(table_sliced) > aggregator.chunk_size: + aggregated_stats_secondpass.append( + aggregator( + table=table_sliced, + masked_pixels_of_sample=masked_pixels_of_sample, + col_name=col_name, + chunk_shift=self.chunk_shift, + ) + ) + # Stack the aggregated statistics of each faulty chunk + aggregated_stats_secondpass = vstack(aggregated_stats_secondpass) + # Detect faulty pixels with multiple instances of OutlierDetector of the second pass + # and append the outlier mask to the aggregated statistics + self._find_and_append_outliers(aggregated_stats_secondpass) + aggregated_stats_secondpass["is_valid"] = self._get_valid_chunks( + aggregated_stats_secondpass["outlier_mask"] + ) + return aggregated_stats_secondpass + + def _find_and_append_outliers(self, aggregated_stats): + """ + Find outliers and append the masks in the aggregated statistics. + + This method detects outliers in the aggregated statistics using the + outlier detectors defined in the configuration. Table containing the + aggregated statistics will be appended with the outlier masks for each + detector and a combined outlier mask. + + Parameters + ---------- + aggregated_stats : astropy.table.Table + Table containing the aggregated statistics. + + """ + outlier_mask = np.zeros_like(aggregated_stats["mean"], dtype=bool) + for d, (column_name, outlier_detector) in enumerate( + zip(self.apply_to_list, self.outlier_detectors) + ): + aggregated_stats[f"outlier_mask_detector_{d}"] = outlier_detector( + aggregated_stats[column_name] + ) + outlier_mask = np.logical_or( + outlier_mask, + aggregated_stats[f"outlier_mask_detector_{d}"], + ) + aggregated_stats["outlier_mask"] = outlier_mask + + def _get_valid_chunks(self, outlier_mask): + """ + Identify valid chunks based on the outlier mask. + + This method processes the outlier mask to determine which chunks of data + are considered valid or faulty. A chunk is marked as faulty if the fraction + of outlier pixels exceeds a predefined threshold ``faulty_pixels_fraction``. + + Parameters + ---------- + outlier_mask : numpy.ndarray + Boolean array indicating outlier pixels. The shape of the array should + match the shape of the aggregated statistics. + + Returns + ------- + numpy.ndarray + Boolean array where each element indicates whether the corresponding + chunk is valid (True) or faulty (False). + """ + # Check if the camera has two gain channels + if outlier_mask.shape[1] == 2: + # Combine the outlier mask of both gain channels + outlier_mask = np.logical_or.reduce(outlier_mask, axis=1) + # Calculate the fraction of faulty pixels over the camera + faulty_pixels = ( + np.count_nonzero(outlier_mask, axis=-1) / np.shape(outlier_mask)[-1] + ) + # Check for valid chunks if the threshold is not exceeded + valid_chunks = faulty_pixels < self.faulty_pixels_fraction + return valid_chunks diff --git a/src/ctapipe/monitoring/outlier.py b/src/ctapipe/monitoring/outlier.py index 4ac387bbcd3..98a0e338c85 100644 --- a/src/ctapipe/monitoring/outlier.py +++ b/src/ctapipe/monitoring/outlier.py @@ -35,12 +35,12 @@ def __call__(self, column) -> bool: ---------- column : astropy.table.Column column with chunk-wise aggregated statistic values (mean, median, or std) - of shape (n_entries, n_channels, n_pix) + of shape (n_entries, n_channels, n_pixels) Returns ------- boolean mask - mask of outliers of shape (n_entries, n_channels, n_pix) + mask of outliers of shape (n_entries, n_channels, n_pixels) """ pass diff --git a/src/ctapipe/monitoring/tests/test_calculator.py b/src/ctapipe/monitoring/tests/test_calculator.py new file mode 100644 index 00000000000..af00da4b2e8 --- /dev/null +++ b/src/ctapipe/monitoring/tests/test_calculator.py @@ -0,0 +1,159 @@ +""" +Tests for CalibrationCalculator and related functions +""" + +import numpy as np +from astropy.table import Table, vstack +from astropy.time import Time +from traitlets.config.loader import Config + +from ctapipe.monitoring.calculator import PixelStatisticsCalculator + + +def test_statistics_calculator(example_subarray): + """test basic functionality of the PixelStatisticsCalculator""" + + # Create dummy data for testing + n_images = 5050 + times = Time( + np.linspace(60117.911, 60117.9258, num=n_images), scale="tai", format="mjd" + ) + event_ids = np.linspace(35, 725000, num=n_images, dtype=int) + rng = np.random.default_rng(0) + charge_data = rng.normal(77.0, 10.0, size=(n_images, 2, 1855)) + # Create tables + charge_table = Table( + [times, event_ids, charge_data], + names=("time_mono", "event_id", "image"), + ) + # Create configuration + chunk_size = 1000 + chunk_shift = 500 + config = Config( + { + "PixelStatisticsCalculator": { + "stats_aggregator_type": [ + ("id", 1, "SigmaClippingAggregator"), + ], + "chunk_shift": chunk_shift, + "faulty_pixels_fraction": 0.09, + }, + "SigmaClippingAggregator": { + "chunk_size": chunk_size, + }, + } + ) + # Initialize the calculator from config + calculator = PixelStatisticsCalculator(subarray=example_subarray, config=config) + # Compute the statistical values + stats = calculator.first_pass(table=charge_table, tel_id=1) + # Set all chunks as faulty to aggregate the statistic values with a "global" chunk shift + valid_chunks = np.zeros_like(stats["is_valid"].data, dtype=bool) + # Run the second pass over the data + stats_chunk_shift = calculator.second_pass( + table=charge_table, valid_chunks=valid_chunks, tel_id=1 + ) + # Stack the statistic values from the first and second pass + stats_stacked = vstack([stats, stats_chunk_shift]) + # Sort the stacked aggregated statistic values by starting time + stats_stacked.sort(["time_start"]) + # Check if the calculated statistical values are reasonable + # for a camera with two gain channels + np.testing.assert_allclose(stats[0]["mean"], 77.0, atol=2.5) + np.testing.assert_allclose(stats[1]["median"], 77.0, atol=2.5) + np.testing.assert_allclose(stats[0]["std"], 10.0, atol=2.5) + np.testing.assert_allclose(stats_chunk_shift[0]["mean"], 77.0, atol=2.5) + np.testing.assert_allclose(stats_chunk_shift[1]["median"], 77.0, atol=2.5) + np.testing.assert_allclose(stats_chunk_shift[0]["std"], 10.0, atol=2.5) + # Check if overlapping chunks of the second pass were aggregated + assert stats_chunk_shift is not None + # Check if the number of aggregated chunks is correct + # In the first pass, the number of chunks is equal to the + # number of images divided by the chunk size plus one + # overlapping chunk at the end. + expected_len_firstpass = n_images // chunk_size + 1 + assert len(stats) == expected_len_firstpass + # In the second pass, the number of chunks is equal to the + # number of images divided by the chunk shift minus the + # number of chunks in the first pass, since we set all + # chunks to be faulty. + expected_len_secondpass = (n_images // chunk_shift) - expected_len_firstpass + assert len(stats_chunk_shift) == expected_len_secondpass + # The total number of aggregated chunks is the sum of the + # number of chunks in the first and second pass. + assert len(stats_stacked) == expected_len_firstpass + expected_len_secondpass + + +def test_outlier_detector(example_subarray): + """test the chunk shift option and the boundary case for the last chunk""" + + # Create dummy data for testing + times = Time( + np.linspace(60117.911, 60117.9258, num=5500), scale="tai", format="mjd" + ) + event_ids = np.linspace(35, 725000, num=5500, dtype=int) + rng = np.random.default_rng(0) + ped_data = rng.normal(2.0, 5.0, size=(5500, 2, 1855)) + # Create table + ped_table = Table( + [times, event_ids, ped_data], + names=("time_mono", "event_id", "image"), + ) + # Create configuration + config = Config( + { + "PixelStatisticsCalculator": { + "stats_aggregator_type": [ + ("id", 1, "SigmaClippingAggregator"), + ], + "outlier_detector_list": [ + { + "apply_to": "mean", + "name": "MedianOutlierDetector", + "config": { + "median_range_factors": [-3.0, 3.0], + }, + }, + { + "apply_to": "median", + "name": "StdOutlierDetector", + "config": { + "std_range_factors": [-2.0, 2.0], + }, + }, + { + "apply_to": "std", + "name": "RangeOutlierDetector", + "config": { + "validity_range": [2.0, 8.0], + }, + }, + ], + "chunk_shift": 500, + "faulty_pixels_fraction": 0.09, + }, + "SigmaClippingAggregator": { + "chunk_size": 1000, + }, + } + ) + # Initialize the calculator from config + calculator = PixelStatisticsCalculator(subarray=example_subarray, config=config) + # Run the first pass over the data + stats_first_pass = calculator.first_pass(table=ped_table, tel_id=1) + # Run the second pass over the data + stats_second_pass = calculator.second_pass( + table=ped_table, valid_chunks=stats_first_pass["is_valid"].data, tel_id=1 + ) + # Stack the statistic values from the first and second pass + stats_stacked = vstack([stats_first_pass, stats_second_pass]) + # Sort the stacked aggregated statistic values by starting time + stats_stacked.sort(["time_start"]) + # Check if overlapping chunks of the second pass were aggregated + assert stats_second_pass is not None + assert len(stats_stacked) > len(stats_second_pass) + # Check if the calculated statistical values are reasonable + # for a camera with two gain channels + np.testing.assert_allclose(stats_stacked[0]["mean"], 2.0, atol=2.5) + np.testing.assert_allclose(stats_stacked[1]["median"], 2.0, atol=2.5) + np.testing.assert_allclose(stats_stacked[0]["std"], 5.0, atol=2.5)