diff --git a/.bumpversion.cfg b/.bumpversion.cfg
index 4c1feebb1..eb17813b8 100644
--- a/.bumpversion.cfg
+++ b/.bumpversion.cfg
@@ -1,5 +1,5 @@
 [bumpversion]
-current_version = 3.0.1
+current_version = 3.0.2
 files = straxen/__init__.py
 commit = True
 tag = True
diff --git a/HISTORY.md b/HISTORY.md
index dad348eca..5e2523e4e 100644
--- a/HISTORY.md
+++ b/HISTORY.md
@@ -1,3 +1,12 @@
+3.0.2 / 2025-01-13
+-------------------
+* Collect SOM dtype at one place by @dachengx in https://github.com/XENONnT/straxen/pull/1511
+* Stop support for list of "take" protocol by @dachengx in https://github.com/XENONnT/straxen/pull/1517
+* Add `stage` flag for `RucioRemoteBackend` by @dachengx in https://github.com/XENONnT/straxen/pull/1520
+
+**Full Changelog**: https://github.com/XENONnT/straxen/compare/v3.0.1...v3.0.2
+
+
 3.0.1 / 2024-12-27
 -------------------
 * Fix run_doc for led plugin by @GiovanniVolta in https://github.com/XENONnT/straxen/pull/1462
diff --git a/pyproject.toml b/pyproject.toml
index 1a5faa420..c6378f316 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,7 +1,7 @@
 [tool]
 [tool.poetry]
 name = "straxen"
-version = "3.0.1"
+version = "3.0.2"
 description = "Streaming analysis for XENON"
 readme = "README.md"
 authors = [
diff --git a/straxen/__init__.py b/straxen/__init__.py
index d7ee6278f..bd9d3b8f8 100644
--- a/straxen/__init__.py
+++ b/straxen/__init__.py
@@ -1,5 +1,5 @@
 # mypy: disable-error-code="no-redef"
-__version__ = "3.0.1"
+__version__ = "3.0.2"
 
 from utilix import uconfig
 from .common import *
diff --git a/straxen/analyses/posrec_comparison.py b/straxen/analyses/posrec_comparison.py
index 163b24265..9eddf031b 100644
--- a/straxen/analyses/posrec_comparison.py
+++ b/straxen/analyses/posrec_comparison.py
@@ -64,14 +64,14 @@ def load_corrected_positions(
 
     z_obs = -drift_speed * drift_time
 
-    for algo, v_cmt in zip(posrec_algos, cmt_version):
-        fdc_tmp = (f"fdc_map_{algo}", v_cmt, True)
+    for alg, v_cmt in zip(posrec_algos, cmt_version):
+        fdc_tmp = (f"fdc_map_{alg}", v_cmt, True)
         map_tmp = straxen.get_correction_from_cmt(run_id, fdc_tmp)
         itp_tmp = straxen.InterpolatingMap(straxen.common.get_resource(map_tmp, fmt="binary"))
         itp_tmp.scale_coordinates([1.0, 1.0, -drift_speed])
 
         orig_pos = np.vstack(
-            [events[f"{s2_pre}s2_x_{algo}"], events[f"{s2_pre}s2_y_{algo}"], z_obs]
+            [events[f"{s2_pre}s2_x_{alg}"], events[f"{s2_pre}s2_y_{alg}"], z_obs]
         ).T
         r_obs = np.linalg.norm(orig_pos[:, :2], axis=1)
         delta_r = itp_tmp(orig_pos)
@@ -87,13 +87,13 @@
         invalid = np.abs(z_obs) < np.abs(delta_r)
         z_cor[invalid] = z_obs[invalid]
 
-        result[f"x_{algo}"] = orig_pos[:, 0] * scale
-        result[f"y_{algo}"] = orig_pos[:, 1] * scale
-        result[f"r_{algo}"] = r_cor
-        result[f"r_naive_{algo}"] = r_obs
-        result[f"r_field_distortion_correction_{algo}"] = delta_r
-        result[f"theta_{algo}"] = np.arctan2(orig_pos[:, 1], orig_pos[:, 0])
-        result[f"z_{algo}"] = z_cor
+        result[f"x_{alg}"] = orig_pos[:, 0] * scale
+        result[f"y_{alg}"] = orig_pos[:, 1] * scale
+        result[f"r_{alg}"] = r_cor
+        result[f"r_naive_{alg}"] = r_obs
+        result[f"r_field_distortion_correction_{alg}"] = delta_r
+        result[f"theta_{alg}"] = np.arctan2(orig_pos[:, 1], orig_pos[:, 0])
+        result[f"z_{alg}"] = z_cor
 
     result["z_naive"] = z_obs
     return result
@@ -102,16 +102,16 @@
 def load_dtypes(posrec_algos):
     dtype = []
 
-    for algo in posrec_algos:
+    for alg in posrec_algos:
         for xyzr in "x y z r".split():
             dtype += [
                 (
                     (
                         (
                             f"Interaction {xyzr}-position, field-distortion corrected (cm) - "
-                            f"{algo.upper()} posrec algorithm"
+                            f"{alg.upper()} posrec algorithm"
                         ),
-                        f"{xyzr}_{algo}",
+                        f"{xyzr}_{alg}",
                     ),
                     np.float32,
                 ),
@@ -121,9 +121,9 @@ def load_dtypes(posrec_algos):
             (
                 (
                     "Interaction r-position using observed S2 positions directly (cm) -"
-                    f" {algo.upper()} posrec algorithm"
+                    f" {alg.upper()} posrec algorithm"
                 ),
-                f"r_naive_{algo}",
+                f"r_naive_{alg}",
             ),
             np.float32,
         ),
@@ -131,16 +131,16 @@
             (
                 (
                     "Correction added to r_naive for field distortion (cm) - "
-                    f"{algo.upper()} posrec algorithm"
+                    f"{alg.upper()} posrec algorithm"
                 ),
-                f"r_field_distortion_correction_{algo}",
+                f"r_field_distortion_correction_{alg}",
             ),
             np.float32,
         ),
         (
             (
-                f"Interaction angular position (radians) - {algo.upper()} posrec algorithm",
-                f"theta_{algo}",
+                f"Interaction angular position (radians) - {alg.upper()} posrec algorithm",
+                f"theta_{alg}",
             ),
             np.float32,
         ),
diff --git a/straxen/corrections_services.py b/straxen/corrections_services.py
index 18e83fb6a..c51ab0c9c 100644
--- a/straxen/corrections_services.py
+++ b/straxen/corrections_services.py
@@ -429,7 +429,7 @@ def apply_cmt_version(context: strax.Context, cmt_global_version: str) -> None:
         # this could be either a CMT tuple or a URLConfig
         value = option_info["strax_option"]
 
-        # might need to modify correction name to include position reconstruction algo
+        # might need to modify correction name to include position reconstruction alg
         # this is a bit of a mess, but posrec configs are treated differently in the tuples
         # URL configs should already include the posrec suffix
         # (it's real mess -- we should drop tuple configs)
diff --git a/straxen/plugins/events/event_basics_vanilla.py b/straxen/plugins/events/event_basics_vanilla.py
index c44f457ca..1c74e1a99 100644
--- a/straxen/plugins/events/event_basics_vanilla.py
+++ b/straxen/plugins/events/event_basics_vanilla.py
@@ -173,34 +173,34 @@ def _set_posrec_save(self):
         self.pos_rec_labels = list(set(posrec_names))
         self.pos_rec_labels.sort()
 
-        self.posrec_save = [(xy + algo) for xy in ["x_", "y_"] for algo in self.pos_rec_labels]
+        self.posrec_save = [(xy + alg) for xy in ["x_", "y_"] for alg in self.pos_rec_labels]
 
     def _get_posrec_dtypes(self):
         """Get S2 positions for each of the position reconstruction algorithms."""
         posrec_dtpye = []
 
-        for algo in self.pos_rec_labels:
+        for alg in self.pos_rec_labels:
             # S2 positions
             posrec_dtpye += [
                 (
-                    f"s2_x_{algo}",
+                    f"s2_x_{alg}",
                     np.float32,
-                    f"Main S2 {algo}-reconstructed X position, uncorrected [cm]",
+                    f"Main S2 {alg}-reconstructed X position, uncorrected [cm]",
                 ),
                 (
-                    f"s2_y_{algo}",
+                    f"s2_y_{alg}",
                     np.float32,
-                    f"Main S2 {algo}-reconstructed Y position, uncorrected [cm]",
+                    f"Main S2 {alg}-reconstructed Y position, uncorrected [cm]",
                 ),
                 (
-                    f"alt_s2_x_{algo}",
+                    f"alt_s2_x_{alg}",
                     np.float32,
-                    f"Alternate S2 {algo}-reconstructed X position, uncorrected [cm]",
+                    f"Alternate S2 {alg}-reconstructed X position, uncorrected [cm]",
                 ),
                 (
-                    f"alt_s2_y_{algo}",
+                    f"alt_s2_y_{alg}",
                     np.float32,
-                    f"Alternate S2 {algo}-reconstructed Y position, uncorrected [cm]",
+                    f"Alternate S2 {alg}-reconstructed Y position, uncorrected [cm]",
                 ),
             ]
 
diff --git a/straxen/plugins/events/event_pattern_fit.py b/straxen/plugins/events/event_pattern_fit.py
index cd86b8290..a5315b0fe 100644
--- a/straxen/plugins/events/event_pattern_fit.py
+++ b/straxen/plugins/events/event_pattern_fit.py
@@ -2,8 +2,9 @@
 import straxen
 import numpy as np
 import numba
-from straxen.numbafied_scipy import numba_gammaln, numba_betainc
 from scipy.special import loggamma
+from straxen.numbafied_scipy import numba_gammaln, numba_betainc
+from straxen.plugins.defaults import DEFAULT_POSREC_ALGO
 
 export, __all__ = strax.exporter()
 
@@ -16,6 +17,10 @@ class EventPatternFit(strax.Plugin):
     provides = "event_pattern_fit"
     __version__ = "0.1.3"
 
+    default_reconstruction_algorithm = straxen.URLConfig(
+        default=DEFAULT_POSREC_ALGO, help="default reconstruction algorithm that provides (x,y)"
+    )
+
     # Getting S1 AFT maps
     s1_aft_map = straxen.URLConfig(
         default="itp_map://resource://xedocs://s1_aft_xyz_maps?attr=value&fmt=json&run_id=plugin.run_id&version=ONLINE",
diff --git a/straxen/plugins/raw_records_coin_nv/nveto_recorder.py b/straxen/plugins/raw_records_coin_nv/nveto_recorder.py
index cbd71073e..835e9e517 100644
--- a/straxen/plugins/raw_records_coin_nv/nveto_recorder.py
+++ b/straxen/plugins/raw_records_coin_nv/nveto_recorder.py
@@ -325,7 +325,7 @@ def pulse_in_interval(raw_records, record_links, start_times, end_times):
 
     """
     nrr = len(raw_records)
-    result = np.zeros(nrr, bool)
+    result = np.zeros(nrr, np.bool_)
 
     last_interval_seen = 0
     for ind, rr in enumerate(raw_records):
diff --git a/straxen/scripts/bootstrax.py b/straxen/scripts/bootstrax.py
index de38552da..5b2fd6286 100755
--- a/straxen/scripts/bootstrax.py
+++ b/straxen/scripts/bootstrax.py
@@ -78,13 +78,16 @@
     action="store_true",
     help="Don't allow bootstrax to switch to a different target for special runs",
 )
+parser.add_argument(
+    "--fix_resources",
+    action="store_true",
+    help="Don't let bootstrax change number of cores/max_messages because of failures",
+)
 parser.add_argument(
     "--infer_mode",
     action="store_true",
-    help=(
-        "Determine best number max-messages and cores for each run "
-        "automatically. Overrides --cores and --max_messages"
-    ),
+    help="Determine best number max-messages and cores for each run "
+    "automatically. Overrides --cores and --max_messages",
 )
 parser.add_argument(
     "--delete_live",
@@ -102,10 +105,8 @@
 parser.add_argument(
     "--ignore_checks",
     action="store_true",
-    help=(
-        "Do not use! This disables checks on e.g. the timestamps! Should only "
-        "be used if some run is very valuable but some checks are failing."
-    ),
+    help="Do not use! This disables checks on e.g. the timestamps! Should only "
+    "be used if some run is very valuable but some checks are failing.",
 )
 
 parser.add_argument("--max_messages", type=int, default=10, help="number of max mailbox messages")
@@ -626,6 +627,24 @@ def infer_target(rd: dict) -> dict:
     # also use source field, outcome is the same)
     if "event_info" in targets or "event_info" in post_process:
         targets = list(targets) + ["event_info_double"]
+    elif "ambe" in mode:
+        # rates are very high, to ensure smooth operation let's just do this
+        # based on calibrations of Apr 2023 this is the only safe working solution
+        log.debug("ambe-mode")
+
+        # get the mode from the daq_db
+        # this is a new thing from Nov 2023
+        # it overwrites the mode from the rundb
+        bootstrax_config_coll = daq_db["bootstrax_config"]
+        bootstrax_config = bootstrax_config_coll.find_one({"name": "bootstrax_config"})
+
+        this_eb_ambe_mode = bootstrax_config["ambe_modes"].get(hostname[:3], "default")
+        log.debug(f"Ambe mode for {hostname} is {this_eb_ambe_mode}")
+
+        if this_eb_ambe_mode != "default":
+            log.debug(f"Overwriting targets and post processing for {hostname} from daq_db")
+            targets = bootstrax_config["modes_definitions"][this_eb_ambe_mode]["targets"]
+            post_process = bootstrax_config["modes_definitions"][this_eb_ambe_mode]["post_process"]
 
     targets = strax.to_str_tuple(targets)
     post_process = strax.to_str_tuple(post_process)
@@ -844,6 +863,17 @@ def infer_mode(rd):
         "timeout": [1200, 1200, None, None, None, None, None, None, None, None, None, 2400],
     }
     if data_rate and args.infer_mode:
+
+        # Temporary solution
+        # It is a patch to try to process some ambe data without failing for memory
+        # If we are doing ambe -> consider the maximum rate
+        # so that we have 10 cores and 12 messages for new ebs
+        # and we have 8 cores and 6 messages for old ebs
+        # added by Carlo on 19 April 2023
+        mode = str(rd.get("mode"))
+        if "ambe" in mode:
+            data_rate = 550
+
         df = pd.DataFrame(benchmark)
         if data_rate not in benchmark["mbs"]:
             df.loc[len(df.index)] = [data_rate, None, None, None, None, None]
@@ -857,7 +887,14 @@
         del df, benchmark, result["cores_old"], result["max_messages_old"]
     else:
         result = dict(cores=args.cores, max_messages=args.max_messages, timeout=1000)
+
     n_fails = rd["bootstrax"].get("n_failures", 0)
+    if args.fix_resources:
+        # If we are in a fix resource mode, we should not change the resources
+        # based on the number of failures.
+        n_fails = 0
+        log.debug(f"Fixing resources, ignoring {n_fails} previous failures")
+
     if n_fails:
         # Exponentially lower resources & increase timeout
         result = dict(
diff --git a/straxen/storage/rucio_remote.py b/straxen/storage/rucio_remote.py
index 113b93c79..e5e6113f1 100644
--- a/straxen/storage/rucio_remote.py
+++ b/straxen/storage/rucio_remote.py
@@ -34,7 +34,13 @@ class RucioRemoteFrontend(strax.StorageFrontend):
     path = None
 
     def __init__(
-        self, download_heavy=False, staging_dir="./strax_data", rses_only=tuple(), *args, **kwargs
+        self,
+        download_heavy=False,
+        stage=False,
+        staging_dir="./strax_data",
+        rses_only=tuple(),
+        *args,
+        **kwargs,
     ):
         """
         :param download_heavy: option to allow downloading of heavy data through RucioRemoteBackend
@@ -49,7 +55,9 @@
 
         if HAVE_ADMIX:
             self.backends = [
-                RucioRemoteBackend(staging_dir, download_heavy=download_heavy, rses_only=rses_only),
+                RucioRemoteBackend(
+                    staging_dir, download_heavy=download_heavy, stage=stage, rses_only=rses_only
+                ),
             ]
         else:
             self.log.warning(
@@ -95,7 +103,7 @@ class RucioRemoteBackend(strax.FileSytemBackend):
     # for caching RSE locations
     dset_cache: Dict[str, str] = {}
 
-    def __init__(self, staging_dir, download_heavy=False, rses_only=tuple(), **kwargs):
+    def __init__(self, staging_dir, download_heavy=False, stage=False, rses_only=tuple(), **kwargs):
         """
         :param staging_dir: Path (a string) where to save data. Must be a
             writable location.
@@ -119,6 +127,7 @@ def __init__(self, staging_dir, download_heavy=False, stage=False, rses_only=tuple(), **kwarg
         super().__init__(**kwargs)
         self.staging_dir = staging_dir
         self.download_heavy = download_heavy
+        self.stage = stage
         self.rses_only = strax.to_str_tuple(rses_only)
 
     def _get_rse(self, dset_did, **filters):
@@ -143,7 +152,9 @@ def _get_metadata(self, dset_did, **kwargs):
             self.dset_cache[dset_did] = rse
 
         metadata_did = strax.RUN_METADATA_PATTERN % dset_did
-        downloaded = admix.download(metadata_did, rse=rse, location=self.staging_dir)
+        downloaded = admix.download(
+            metadata_did, rse=rse, location=self.staging_dir, stage=self.stage
+        )
         if len(downloaded) != 1:
             raise ValueError(f"{metadata_did} should be a single file. We found {len(downloaded)}.")
         metadata_path = downloaded[0]
@@ -178,7 +189,9 @@ def _read_chunk(self, dset_did, chunk_info, dtype, compressor):
             rse = self._get_rse(dset_did)
             self.dset_cache[dset_did] = rse
 
-        downloaded = admix.download(chunk_did, rse=rse, location=self.staging_dir)
+        downloaded = admix.download(
+            chunk_did, rse=rse, location=self.staging_dir, stage=self.stage
+        )
         if len(downloaded) != 1:
             raise ValueError(
                 f"{chunk_did} should be a single file. We found {len(downloaded)}."
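
For reference, a minimal sketch of how the `stage` flag added in PR #1520 could be used. As the hunks above show, the flag is stored on `RucioRemoteBackend` and forwarded to every `admix.download(...)` call; the context wiring in the final comment is illustrative and not part of this diff:

```python
# Sketch only, assuming a working admix/Rucio setup.
# stage=True is forwarded as admix.download(..., stage=True), asking Rucio
# to stage files (for instance from tape) before they are fetched.
from straxen.storage.rucio_remote import RucioRemoteFrontend

rucio_frontend = RucioRemoteFrontend(
    download_heavy=True,  # also allow heavy datatypes through the backend
    stage=True,  # new in 3.0.2
    staging_dir="./strax_data",  # downloaded metadata and chunks land here
)

# st is assumed to be an existing strax/straxen context:
# st.storage.append(rucio_frontend)
```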