Skip to content

Commit

Permalink
added skeleton for on-the-fly waveform cleaning with digital sum and …
Browse files Browse the repository at this point in the history
…DBSCAN
  • Loading branch information
TjarkMiener committed Oct 18, 2024
1 parent 911d482 commit 43547b2
Showing 1 changed file with 50 additions and 23 deletions.
73 changes: 50 additions & 23 deletions dl1_data_handler/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
"""

__all__ = [
"ProcessType",
"TableQualityQuery",
"DLDataReader",
"DLImageReader",
"get_unmapped_image",
"DLWaveformReader",
"get_unmapped_waveform",
"clean_waveform",
]

from abc import abstractmethod
Expand All @@ -29,12 +31,12 @@
from ctapipe.core import Component, QualityQuery
from ctapipe.core.traits import (
Bool,
Dict,
CInt,
Int,
IntTelescopeParameter,
Set,
List,
Path,
CaselessStrEnum,
Unicode,
TelescopeParameter,
Expand Down Expand Up @@ -972,6 +974,7 @@ def _get_features(self, batch) -> dict:
def get_unmapped_waveform(
r1_event,
settings,
camera_geometry=None,
dl1_cleaning_mask=None,
):
"""
Expand All @@ -990,10 +993,12 @@ def get_unmapped_waveform(
Dictionary containing settings for waveform processing, including:
- ``waveform_scale`` (float): Scale factor for waveform values.
- ``waveform_offset`` (int): Offset value for waveform values.
- ``type`` (str): Type of waveform processing (``calibrated`` or ``cleaned_calibrated``).
- ``cleaning_type`` (str or None): Data level on which the cleaning mask(s) are obtained for cleaning the waveforms.
- ``seq_length`` (int): Length of the waveform sequence to be extracted.
- ``readout_length`` (int): Total length of the readout window.
- ``seq_position`` (str): Position of the sequence within the readout window (``center`` or ``maximum``).
camera_geometry : ctapipe.instrument.CameraGeometry, optional
The geometry of the camera, including pixel positions and camera type. Default is ``None``.
dl1_cleaning_mask : numpy.ndarray, optional
Array containing the DL1 cleaning mask to be applied to the waveform to find the shower maximum
to center the sequence. Default is ``None``.
Expand All @@ -1019,19 +1024,16 @@ def get_unmapped_waveform(
if settings["waveform_offset"] > 0:
waveform -= settings["waveform_offset"]
# Apply the DL1 cleaning mask if selected
if settings["type"] == "cleaned_calibrated":
if settings["cleaning_type"] == "image":
waveform *= dl1_cleaning_mask[:, None]
elif settings["cleaning_type"] == "waveform":
waveform = clean_waveform(waveform, camera_geometry, settings["DBSCAN_params"])
# Retrieve the sequence around the center of the readout window or the shower maximum
if settings["seq_length"] < settings["readout_length"]:
if settings["seq_position"] == "center":
sequence_position = waveform.shape[1] // 2 - 1
elif settings["seq_position"] == "maximum":
if dl1_cleaning_mask is None:
sequence_position = np.argmax(np.sum(waveform, axis=0))
else:
sequence_position = np.argmax(
np.sum(waveform * dl1_cleaning_mask[:, None], axis=0)
)
sequence_position = np.argmax(np.sum(waveform, axis=0))
# Calculate start and stop positions
start = max(0, int(1 + sequence_position - settings["seq_length"] / 2))
stop = min(settings["readout_length"], int(1 + sequence_position + settings["seq_length"] / 2))
Expand All @@ -1044,14 +1046,16 @@ def get_unmapped_waveform(

return waveform

def clean_waveform(waveform, camera_geometry, DBSCAN_config):
pass

class DLWaveformReader(DLDataReader):
"""
A data reader class for handling R1 calibrated waveform data from telescopes.
This class extends the ``DLDataReader`` to specifically handle the reading,
transformation, and mapping of R1 calibrated waveform data. It supports both ``mono``
and ``stereo`` data loading modes and can apply DL1 cleaning masks to the waveforms
and ``stereo`` data loading modes and can perform a cleaning to the waveforms
if specified.
Attributes
Expand All @@ -1061,11 +1065,14 @@ class DLWaveformReader(DLDataReader):
the sequence length is set to the readout length.
sequence_position : str
Position of the sequence within the readout window. Can be ``center`` or ``maximum``.
clean : bool
Indicates whether to apply the DL1 cleaning mask to the calibrated waveforms.
cleaning_type : str or None
Data level on which the cleaning mask(s) are obtained for cleaning the waveforms.
Can be ``image`` or ``waveform``.
DBSCAN_params : dict or None
Dictionary containing the DBSCAN clustering parameters for waveform cleaning.
waveform_settings : dict
Contains settings for waveform processing, including type, sequence length,
readout length, sequence position, scale, and offset.
Contains settings for waveform processing, including cleaning type (with DBSCAN parameters),
sequence length, readout length, sequence position, scale, and offset.
"""

sequence_length = Int(
Expand All @@ -1084,12 +1091,31 @@ class DLWaveformReader(DLDataReader):
),
).tag(config=True)

clean = Bool(
default_value=False,
allow_none=False,
help="Set whether to apply the DL1 cleaning mask to the calibrated waveforms.",
cleaning_type = CaselessStrEnum(
["image", "waveform"],
allow_none=True,
default_value=None,
help=(
"Set whether to apply cleaning of the calibrated waveforms. "
"Two cleaning types are supported obtained from different data levels."
"``image``: apply the DL1 cleaning mask to the calibrated waveforms. "
"``waveform``: perform a digital sum and a DBSCAN clustering on-the-fly. "
),
).tag(config=True)

DBSCAN_params = Dict(
default_value={"eps": 0.5, "min_samples": 5, "metric": 'euclidean'},
allow_none=True,
help=(
"Set the DBSCAN clustering parameters for waveform cleaning. "
"Only required when ``cleaning_type`` is set to ``waveform``. "
"``eps``: The maximum distance between two samples for one to be considered as in the neighborhood of the other."
"``min_samples``: The number of samples in a neighborhood for a point to be considered as a core point."
"``metric``: The metric to use when calculating distance between instances in a feature array."
),
).tag(config=True)


def __init__(
self,
input_url_signal,
Expand Down Expand Up @@ -1120,12 +1146,12 @@ def __init__(
)

# Construct settings dict for the calibrated waveforms
self.waveform_type = "cleaned_calibrated" if self.clean else "calibrated"
self.waveform_settings = {
"type": self.waveform_type,
"cleaning_type": self.cleaning_type,
"seq_length": self.sequence_length,
"readout_length": self.readout_length,
"seq_position": self.sequence_position,
"DBSCAN_params": self.DBSCAN_params,
}

# Check the transform value used for the file compression
Expand Down Expand Up @@ -1176,6 +1202,9 @@ def _get_features(self, batch) -> dict:
batch["tel_id"],
):
filename = list(self.files)[file_idx]
camera_type = self._get_camera_type(
list(self.selected_telescopes.keys())[tel_type_id]
)
with lock:
tel_table = f"tel_{tel_id:03d}"
child = self.files[filename].root.r1.event.telescope._f_get_child(
Expand All @@ -1193,13 +1222,11 @@ def _get_features(self, batch) -> dict:
unmapped_waveform = get_unmapped_waveform(
child[table_idx],
self.waveform_settings,
self.image_mappers[camera_type].geometry,
dl1_cleaning_mask,
)
# Apply the 'ImageMapper' whenever the index matrix is not None.
# Otherwise, return the unmapped image for the 'IndexedConv' package.
camera_type = self._get_camera_type(
list(self.selected_telescopes.keys())[tel_type_id]
)
if self.image_mappers[camera_type].index_matrix is None:
waveforms.append(self.image_mappers[camera_type].map_image(unmapped_waveform))
else:
Expand Down

0 comments on commit 43547b2

Please sign in to comment.