From b344dfa0cda6b0a8b2839d155623b770b4c4844f Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Sun, 10 Mar 2024 23:19:48 -0700 Subject: [PATCH 1/6] Add LFA processing. --- Dockerfile.enqueue | 4 +- Dockerfile.ingest | 2 +- src/enqueue.py | 42 ++++++------ src/exposure_info.py | 67 ------------------ src/info.py | 151 +++++++++++++++++++++++++++++++++++++++++ src/ingest.py | 67 +++++++++--------- src/rucio_interface.py | 2 +- 7 files changed, 215 insertions(+), 120 deletions(-) delete mode 100644 src/exposure_info.py create mode 100644 src/info.py diff --git a/Dockerfile.enqueue b/Dockerfile.enqueue index 1ce3469..b678b04 100644 --- a/Dockerfile.enqueue +++ b/Dockerfile.enqueue @@ -24,7 +24,9 @@ FROM python:3.11 RUN pip install redis gunicorn flask WORKDIR /enqueue -COPY src/enqueue.py src/exposure_info.py src/utils.py /enqueue/ +COPY src/enqueue.py src/info.py src/utils.py /enqueue/ # environment variables that must be set: # REDIS_HOST REDIS_PASSWORD NOTIFICATION_SECRET +# optional: +# DATASET_REGEXP ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "enqueue:app" ] diff --git a/Dockerfile.ingest b/Dockerfile.ingest index 859f912..212eb5c 100644 --- a/Dockerfile.ingest +++ b/Dockerfile.ingest @@ -28,7 +28,7 @@ ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2023_41} USER lsst RUN source loadLSST.bash && mamba install redis-py rucio-clients RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst -COPY src/ingest.py src/exposure_info.py src/utils.py src/rucio_interface.py ./ingest/ +COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/ # Environment variables that must be set: # REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO # For Rucio (all must be set if RUCIO_RSE is set): diff --git a/src/enqueue.py b/src/enqueue.py index 12a9ac2..c92f43a 100644 --- a/src/enqueue.py +++ b/src/enqueue.py @@ -29,7 +29,7 @@ import redis from flask import Flask, request -from exposure_info import ExposureInfo +from info import ExposureInfo, Info from utils import setup_logging, setup_redis FILE_RETENTION: float = 7 * 24 * 60 * 60 @@ -38,12 +38,14 @@ logger = setup_logging(__name__) r = setup_redis() notification_secret = os.environ["NOTIFICATION_SECRET"] +extensions = set(os.environ.get("DATASET_EXTENSIONS", "fits").split(",")) def enqueue_objects(objects): - """Enqueue FITS objects onto per-bucket queues. + """Enqueue objects onto per-bucket queues. - Compute the `ExposureInfo` for each FITS object and return the list. + Compute the `Info` for each object with a selected extension and return + the list. Parameters ---------- @@ -51,17 +53,18 @@ def enqueue_objects(objects): Returns ------- - info_list: `list` [`ExposureInfo`] + info_list: `list` [`Info`] """ info_list = [] # Use a pipeline for efficiency. with r.pipeline() as pipe: for o in objects: - if o.endswith(".fits"): - e = ExposureInfo(o) - pipe.lpush(f"QUEUE:{e.bucket}", o) - logger.info("Enqueued %s to %s", o, e.bucket) - info_list.append(e) + extension = o.rsplit(".", maxsplit=1)[-1] + if extension in extensions: + info = Info.from_path(o) + pipe.lpush(f"QUEUE:{info.bucket}", o) + logger.info("Enqueued %s to %s", o, info.bucket) + info_list.append(info) pipe.execute() return info_list @@ -71,18 +74,19 @@ def update_stats(info_list): Parameters ---------- - info_list: `list` [`ExposureInfo`] + info_list: `list` [`Info`] """ + max_seqnum = {} with r.pipeline() as pipe: - max_seqnum = {} - for e in info_list: - pipe.hincrby(f"REC:{e.bucket}", e.obs_day, 1) - bucket_instrument = f"{e.bucket}:{e.instrument}" - pipe.hincrby(f"RECINSTR:{bucket_instrument}", e.obs_day, 1) - pipe.hset(f"FILE:{e.path}", "recv_time", str(time.time())) - pipe.expire(f"FILE:{e.path}", FILE_RETENTION) - seqnum_key = f"MAXSEQ:{bucket_instrument}:{e.obs_day}" - max_seqnum[seqnum_key] = max(int(e.seq_num), max_seqnum.get(seqnum_key, 0)) + for info in info_list: + pipe.hincrby(f"REC:{info.bucket}", info.obs_day, 1) + bucket_instrument = f"{info.bucket}:{info.instrument}" + pipe.hincrby(f"RECINSTR:{bucket_instrument}", info.obs_day, 1) + pipe.hset(f"FILE:{info.path}", "recv_time", str(time.time())) + pipe.expire(f"FILE:{info.path}", FILE_RETENTION) + if isinstance(info, ExposureInfo): + seqnum_key = f"MAXSEQ:{bucket_instrument}:{info.obs_day}" + max_seqnum[seqnum_key] = max(int(info.seq_num), max_seqnum.get(seqnum_key, 0)) pipe.execute() for seqnum_key in max_seqnum: diff --git a/src/exposure_info.py b/src/exposure_info.py deleted file mode 100644 index f4ce41e..0000000 --- a/src/exposure_info.py +++ /dev/null @@ -1,67 +0,0 @@ -# This file is part of embargo_butler. -# -# Developed for the LSST Data Management System. -# This product includes software developed by the LSST Project -# (http://www.lsst.org). -# See the COPYRIGHT file at the top-level directory of this distribution -# for details of code ownership. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program. If not, see . - -""" -ExposureInfo class used to extract information from notification messages. -""" -import logging -from dataclasses import dataclass - -__all__ = ("ExposureInfo",) - -logger = logging.getLogger(__name__) - - -@dataclass -class ExposureInfo: - path: str - bucket: str - instrument: str - filename: str - exp_id: str - instrument_code: str - controller: str - obs_day: str - seq_num: str - - def __init__(self, path): - try: - if path.startswith("s3://"): - path = path[len("s3://") :] - self.path = path - ( - self.bucket, - self.instrument, - self.obs_day, - self.exp_id, - self.filename, - ) = path.split("/") - ( - self.instrument_code, - self.controller, - obs_day, - self.seq_num, - ) = self.exp_id.split("_") - if obs_day != self.obs_day: - logger.warn("Mismatched observation dates: %s", path) - except Exception: - logger.exception("Unable to parse: %s", path) - raise diff --git a/src/info.py b/src/info.py new file mode 100644 index 0000000..83d57ff --- /dev/null +++ b/src/info.py @@ -0,0 +1,151 @@ +# This file is part of embargo_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import logging +from dataclasses import dataclass +from typing import Self + +__all__ = ("Info", "ExposureInfo", "LfaInfo") + +logger = logging.getLogger(__name__) + + +@dataclass +class Info: + """Base class for information extracted from notification messages.""" + + path: str + """Path component of the S3 URL. + """ + + bucket: str + """Bucket component of the S3 URL. + """ + + instrument: str + """Instrument name. + """ + + filename: str + """Filename component of the S3 URL. + """ + + obs_day: str + """Observation day (in timezone UTC-12). + """ + + @staticmethod + def from_path(url: str) -> Self: + """Create an `Info` of the proper subclass from an S3 URL. + + Parameters + ---------- + url: `str` + S3 URL (including bucket). + + Returns + ------- + info: `Info` + Either an `LfaInfo` or an `ExposureInfo` as appropriate. + """ + + if url.startswith("s3://"): + url = url[len("s3://") :] + if "rubinobs-lfa-" in url: + return LfaInfo(url) + else: + return ExposureInfo(url) + + +@dataclass +class ExposureInfo(Info): + """Class used to extract exposure information from notification messages. + + Parameters + ---------- + path: `str` + Path portion of S3 URL (after bucket). + """ + + exp_id: str + """Exposure ID. + """ + + instrument_code: str + """Instrument code (two characters). + """ + + controller: str + """Controller code (one character). + """ + + seq_num: str + """Sequence number within an observation day. + """ + + def __init__(self, path): + try: + self.path = path + ( + self.bucket, + self.instrument, + self.obs_day, + self.exp_id, + self.filename, + ) = path.split("/") + ( + self.instrument_code, + self.controller, + obs_day, + self.seq_num, + ) = self.exp_id.split("_") + if obs_day != self.obs_day: + logger.warn("Mismatched observation dates: %s", path) + except Exception: + logger.exception("Unable to parse: %s", path) + raise + + +@dataclass +class LfaInfo(Info): + """Class used to extract LFA information from notification messages. + + Parameters + ---------- + path: `str` + Path portion of S3 URL (after bucket). + """ + + def __init__(self, path): + try: + self.path = path + components = path.split("/") + if len(components) == 7: + self.bucket, csc, generator, year, month, day, self.filename = components + self.instrument = f"{csc}/{generator}" + elif len(components) == 6: + self.bucket, self.instrument, year, month, day, self.filename = components + else: + raise ValueError(f"Unrecognized number of components: {len(components)}") + self.obs_day = f"{year}{month}{day}" + except Exception: + logger.exception("Unable to parse: %s", path) + raise diff --git a/src/ingest.py b/src/ingest.py index 1e475d7..2a7d710 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -20,7 +20,7 @@ # along with this program. If not, see . """ -Service to ingest images into per-bucket Butler repos. +Service to ingest images or LFA objects into per-bucket Butler repos. """ import os import socket @@ -31,7 +31,7 @@ from lsst.obs.base import DefineVisitsTask, RawIngestTask from lsst.resources import ResourcePath -from exposure_info import ExposureInfo +from info import Info from rucio_interface import RucioInterface from utils import setup_logging, setup_redis @@ -44,19 +44,23 @@ logger = setup_logging(__name__) r = setup_redis() bucket = os.environ["BUCKET"] +if bucket.startswith("rubin:"): + os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1" redis_queue = f"QUEUE:{bucket}" butler_repo = os.environ["BUTLER_REPO"] webhook_uri = os.environ.get("WEBHOOK_URI", None) +is_lfa = "rubinobs-lfa-" in bucket worker_name = socket.gethostname() worker_queue = f"WORKER:{bucket}:{worker_name}" -rucio_rse = os.environ.get("RUCIO_RSE", None) -if rucio_rse: - dtn_url = os.environ["RUCIO_DTN"] - if not dtn_url.endswith("/"): - dtn_url += "/" - rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"]) +if not is_lfa: + rucio_rse = os.environ.get("RUCIO_RSE", None) + if rucio_rse: + dtn_url = os.environ["RUCIO_DTN"] + if not dtn_url.endswith("/"): + dtn_url += "/" + rucio_interface = RucioInterface(rucio_rse, dtn_url, bucket, os.environ["RUCIO_SCOPE"]) def on_success(datasets): @@ -71,15 +75,15 @@ def on_success(datasets): """ for dataset in datasets: logger.info("Ingested %s", dataset) - e = ExposureInfo(dataset.path.geturl()) - logger.debug("Exposure %s", e) + info = Info.from_path(dataset.path.geturl()) + logger.debug("%s", info) with r.pipeline() as pipe: - pipe.lrem(worker_queue, 0, e.path) - pipe.hset(f"FILE:{e.path}", "ingest_time", str(time.time())) - pipe.hincrby(f"INGEST:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1) + pipe.lrem(worker_queue, 0, info.path) + pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time())) + pipe.hincrby(f"INGEST:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) pipe.execute() if webhook_uri: - resp = requests.post(webhook_uri, json=e.__dict__, timeout=0.5) + resp = requests.post(webhook_uri, json=info.__dict__, timeout=0.5) logger.info("Webhook response: %s", resp) @@ -96,15 +100,15 @@ def on_ingest_failure(dataset, exc): Exception raised by the ingest failure. """ logger.error("Failed to ingest %s: %s", dataset, exc) - e = ExposureInfo(dataset.files[0].filename.geturl()) - logger.debug("Exposure %s", e) + info = Info.from_path(dataset.files[0].filename.geturl()) + logger.debug("%s", info) with r.pipeline() as pipe: - pipe.hincrby(f"FAIL:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1) - pipe.hset(f"FILE:{e.path}", "ing_fail_exc", str(exc)) - pipe.hincrby(f"FILE:{e.path}", "ing_fail_count", 1) + pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) + pipe.hset(f"FILE:{info.path}", "ing_fail_exc", str(exc)) + pipe.hincrby(f"FILE:{info.path}", "ing_fail_count", 1) pipe.execute() - if int(r.hget(f"FILE:{e.path}", "ing_fail_count")) >= MAX_FAILURES: - r.lrem(worker_queue, 0, e.path) + if int(r.hget(f"FILE:{info.path}", "ing_fail_count")) >= MAX_FAILURES: + r.lrem(worker_queue, 0, info.path) def on_metadata_failure(dataset, exc): @@ -121,12 +125,12 @@ def on_metadata_failure(dataset, exc): """ logger.error("Failed to translate metadata for %s: %s", dataset, exc) - e = ExposureInfo(dataset.geturl()) - logger.debug("Exposure %s", e) + info = Info.from_path(dataset.geturl()) + logger.debug("%s", info) with r.pipeline() as pipe: - pipe.hincrby(f"FAIL:{e.bucket}:{e.instrument}", f"{e.obs_day}", 1) - pipe.hset(f"FILE:{e.path}", "md_fail_exc", str(exc)) - pipe.lrem(worker_queue, 0, e.path) + pipe.hincrby(f"FAIL:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) + pipe.hset(f"FILE:{info.path}", "md_fail_exc", str(exc)) + pipe.lrem(worker_queue, 0, info.path) pipe.execute() @@ -144,9 +148,10 @@ def main(): on_metadata_failure=on_metadata_failure, ) - define_visits_config = DefineVisitsTask.ConfigClass() - define_visits_config.groupExposures = "one-to-one" - visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler) + if not is_lfa: + define_visits_config = DefineVisitsTask.ConfigClass() + define_visits_config.groupExposures = "one-to-one" + visit_definer = DefineVisitsTask(config=define_visits_config, butler=butler) logger.info("Waiting on %s", worker_queue) while True: @@ -171,14 +176,14 @@ def main(): logger.exception("Error while ingesting %s", resources) # Define visits if we ingested anything - if refs: + if not is_lfa and refs: try: ids = [ref.dataId for ref in refs] visit_definer.run(ids) logger.info("Defined visits for %s", ids) except Exception: logger.exception("Error while defining visits for %s", refs) - if rucio_rse: + if not is_lfa and rucio_rse: # Register with Rucio if we ingested anything try: rucio_interface.register(resources) diff --git a/src/rucio_interface.py b/src/rucio_interface.py index fab27ab..a3e6b15 100644 --- a/src/rucio_interface.py +++ b/src/rucio_interface.py @@ -26,10 +26,10 @@ import time import zlib +import rucio.common.exception from lsst.resources import ResourcePath from rucio.client.didclient import DIDClient from rucio.client.replicaclient import ReplicaClient -import rucio.common.exception __all__ = ["RucioInterface"] From 041ee0d4433e8fb5768964c78b97d8d4f3e66823 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 12 Mar 2024 12:40:55 -0700 Subject: [PATCH 2/6] Change to RE from extension list. --- src/enqueue.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/enqueue.py b/src/enqueue.py index c92f43a..a44e6c8 100644 --- a/src/enqueue.py +++ b/src/enqueue.py @@ -23,6 +23,7 @@ Enqueue service to post notifications to per-bucket queues. """ import os +import re import time import urllib.parse @@ -38,7 +39,7 @@ logger = setup_logging(__name__) r = setup_redis() notification_secret = os.environ["NOTIFICATION_SECRET"] -extensions = set(os.environ.get("DATASET_EXTENSIONS", "fits").split(",")) +regexp = re.compile(os.environ.get("DATASET_REGEXP", r"fits$")) def enqueue_objects(objects): @@ -59,8 +60,7 @@ def enqueue_objects(objects): # Use a pipeline for efficiency. with r.pipeline() as pipe: for o in objects: - extension = o.rsplit(".", maxsplit=1)[-1] - if extension in extensions: + if regexp.search(o): info = Info.from_path(o) pipe.lpush(f"QUEUE:{info.bucket}", o) logger.info("Enqueued %s to %s", o, info.bucket) From af7ef6c531d63f9940c7d3009c8412a6e3e7ee35 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 27 Mar 2024 14:25:03 -0700 Subject: [PATCH 3/6] Update Dockerfile to get all obs packages. --- Dockerfile.ingest | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Dockerfile.ingest b/Dockerfile.ingest index 212eb5c..3a5d299 100644 --- a/Dockerfile.ingest +++ b/Dockerfile.ingest @@ -24,10 +24,10 @@ ARG RUBINENV_VERSION=7.0.1 FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION -ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2023_41} +ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_12} USER lsst RUN source loadLSST.bash && mamba install redis-py rucio-clients -RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst +RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/ # Environment variables that must be set: # REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO @@ -35,4 +35,4 @@ COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/ # RUCIO_RSE RUCIO_DTN RUCIO_SCOPE RUCIO_CONFIG # Optional: # WEBHOOK_URI -ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ingest/ingest.py" ] +ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup lsst_obs; python ingest/ingest.py" ] From 5ad40649d2ae1eb1ebfb0c5d2ddc783efb25e231 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 27 Mar 2024 17:02:48 -0700 Subject: [PATCH 4/6] Bump rubin-env version. --- Dockerfile.ingest | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile.ingest b/Dockerfile.ingest index 3a5d299..289177d 100644 --- a/Dockerfile.ingest +++ b/Dockerfile.ingest @@ -21,7 +21,7 @@ # Dockerfile for ingest service. -ARG RUBINENV_VERSION=7.0.1 +ARG RUBINENV_VERSION=8.0.0 FROM lsstsqre/newinstall:${RUBINENV_VERSION} ARG OBS_LSST_VERSION ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_12} From 78f8761be2dd21c66fee37dcf10787318b089b71 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 28 Mar 2024 12:48:13 -0700 Subject: [PATCH 5/6] Remove FITS check. --- src/ingest.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/src/ingest.py b/src/ingest.py index 2a7d710..08a4896 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -158,13 +158,7 @@ def main(): # Process any entries on the worker queue. if r.llen(worker_queue) > 0: blobs = r.lrange(worker_queue, 0, -1) - resources = [] - for b in blobs: - if b.endswith(b".fits"): - # Should always be the case - resources.append(ResourcePath(f"s3://{b.decode()}")) - else: - r.lrem(worker_queue, 0, b) + resources = [ResourcePath(f"s3://{b.decode()}") for b in blobs] # Ingest if we have resources if resources: From cf8669c75d02f2c7483fdeab3faf3b13d11282f4 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Fri, 29 Mar 2024 13:50:50 -0700 Subject: [PATCH 6/6] Fix handling of GenericCamera movies. --- src/info.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/info.py b/src/info.py index 83d57ff..1867611 100644 --- a/src/info.py +++ b/src/info.py @@ -138,7 +138,10 @@ def __init__(self, path): try: self.path = path components = path.split("/") - if len(components) == 7: + if len(components) == 8: + self.bucket, csc, generator, year, month, day, directory, self.filename = components + self.instrument = f"{csc}/{generator}" + elif len(components) == 7: self.bucket, csc, generator, year, month, day, self.filename = components self.instrument = f"{csc}/{generator}" elif len(components) == 6: