From 3ec208ffc49f9161743d96aa4c90a9fb5dfed9de Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Tue, 28 May 2024 12:59:48 -0700 Subject: [PATCH 1/6] Fix ingest exception handling. --- src/ingest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/ingest.py b/src/ingest.py index 08a4896..96b68fd 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -168,6 +168,7 @@ def main(): refs = ingester.run(resources) except Exception: logger.exception("Error while ingesting %s", resources) + continue # Define visits if we ingested anything if not is_lfa and refs: From 5e6caa81a006d2653e9623354bfac8004363053f Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 27 Mar 2024 13:42:46 -0700 Subject: [PATCH 2/6] Add group presence recording. --- src/ingest.py | 43 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/ingest.py b/src/ingest.py index 96b68fd..e2edd90 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -22,10 +22,12 @@ """ Service to ingest images or LFA objects into per-bucket Butler repos. """ +import json import os import socket import time +import astropy.io.fits import requests from lsst.daf.butler import Butler from lsst.obs.base import DefineVisitsTask, RawIngestTask @@ -50,6 +52,7 @@ butler_repo = os.environ["BUTLER_REPO"] webhook_uri = os.environ.get("WEBHOOK_URI", None) is_lfa = "rubinobs-lfa-" in bucket +group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400)) worker_name = socket.gethostname() worker_queue = f"WORKER:{bucket}:{worker_name}" @@ -134,6 +137,42 @@ def on_metadata_failure(dataset, exc): pipe.execute() +def record_groups(resources: list[ResourcePath]) -> None: + """Record the group ids from received FITS files in Redis. + + Parameters + ---------- + resources: `list` [`ResourcePath`] + The resources to record group ids from. + """ + + global r, group_lifetime, logger + with r.pipeline() as pipe: + for res in resources: + json_file = res.updatedExtension("json") + header = {} + try: + with json_file.open("rb") as f: + header = json.load(f) + except Exception: + try: + with res.open("rb") as f: + header = astropy.io.fits.open(f)[0].header + except Exception: + logger.exception("Error reading group for %s", res) + try: + instrument = header["INSTRUME"] + groupid = header["GROUPID"] + snap_number = int(header["CURINDEX"]) - 1 + detector = header["RAFTBAY"] + "_" + header["CCDSLOT"] + key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}" + pipe.set(key, str(res)) + pipe.expire(key, group_lifetime) + except Exception: + logger.exception("Error reading group for %s", res) + pipe.execute() + + def main(): """Ingest FITS files from a Redis queue.""" logger.info("Initializing Butler from %s", butler_repo) @@ -162,6 +201,10 @@ def main(): # Ingest if we have resources if resources: + + if not is_lfa: + record_groups(resources) + logger.info("Ingesting %s", resources) refs = None try: From 83b818d9b9eea35ffc37aae0cfee72aef5fed1b3 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Wed, 27 Mar 2024 13:42:57 -0700 Subject: [PATCH 3/6] Add presence server. --- src/presence.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) create mode 100644 src/presence.py diff --git a/src/presence.py b/src/presence.py new file mode 100644 index 0000000..77607e0 --- /dev/null +++ b/src/presence.py @@ -0,0 +1,78 @@ +# This file is part of embargo_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +""" +Presence service to translate group names to image URIs. +""" +import os +import re + +from flask import Flask + +from utils import setup_logging, setup_redis + +logger = setup_logging(__name__) +r = setup_redis() + +# Delete image key when seen, before it expires, to save space +delete_seen = os.environ.get("DELETE_SEEN") is not None + +app = Flask(__name__) + + +@app.get("/presence////") +def presence(instrument: str, group_name: str, snap_index: int, detector_name: str) -> dict | tuple: + """Return the presence and URI of an image matching the parameters. + + Parameters + ---------- + instrument: `str` + Name of the instrument taking the image. + group_name: `str` + Name of the group (from the GROUPID FITS header). + snap_index: `int` + Number of the snap (zero-based). + detector_name: `str` + Name of the detector ("RNN_SNN"). + + Returns + ------- + json: `dict` + JSON with "error", "present", "uri", and/or "message" keys. + """ + + try: + if instrument not in ("LATISS", "LSSTComCam", "LSSTComCamSim", "LSSTCam", "LSST-TS8"): + return ({"error": True, "message": f"Unknown instrument {instrument}"}, 400) + if not re.match(r"R\d\d_S\d\d", detector_name): + return ({"error": True, "message": f"Unrecognized detector name {detector_name}"}, 400) + key = f"GROUP:{instrument}:{group_name}:{snap_index}:{detector_name}" + result = r.get(key) + if result: + logger.info(f"Found key {key}") + if delete_seen: + r.delete(key) + return {"error": False, "present": True, "uri": result.decode()} + else: + logger.debug(f"No key {key}") + return {"error": False, "present": False} + except Exception as e: + return ({"error": True, "message": str(e)}, 500) From 98e406a5e29fb2020408ad3a8baa1d0a27ef391a Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Sat, 30 Mar 2024 19:21:09 -0700 Subject: [PATCH 4/6] Add build for presence service. --- .github/workflows/build.yaml | 14 +++++++++++++- Dockerfile.presence | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 1 deletion(-) create mode 100644 Dockerfile.presence diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index aa127bf..32f164f 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -11,6 +11,7 @@ env: ENQUEUE_NAME: embargo-butler-enqueue INGEST_NAME: embargo-butler-ingest IDLE_NAME: embargo-butler-idle + PRESENCE_NAME: embargo-butler-presence jobs: push: @@ -20,7 +21,7 @@ jobs: contents: read steps: - name: Checkout - uses: actions/checkout@v3 + uses: actions/checkout@v4 - name: Build enqueue image run: | @@ -43,6 +44,13 @@ jobs: --tag $IDLE_NAME \ --label "runnumber=${GITHUB_RUN_ID}" + - name: Build presence image + run: | + docker build . \ + -f Dockerfile.presence \ + --tag $PRESENCE_NAME \ + --label "runnumber=${GITHUB_RUN_ID}" + - name: Log in to GitHub Container Registry run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin @@ -51,6 +59,7 @@ jobs: ENQUEUE_ID=ghcr.io/${{ github.repository_owner }}/$ENQUEUE_NAME INGEST_ID=ghcr.io/${{ github.repository_owner }}/$INGEST_NAME IDLE_ID=ghcr.io/${{ github.repository_owner }}/$IDLE_NAME + PRESENCE_ID=ghcr.io/${{ github.repository_owner }}/$PRESENCE_NAME if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||') @@ -62,6 +71,7 @@ jobs: echo ENQUEUE_ID=$ENQUEUE_ID echo INGEST_ID=$INGEST_ID echo IDLE_ID=$IDLE_ID + echo PRESENCE_ID=$PRESENCE_ID echo VERSION=$VERSION docker tag $ENQUEUE_NAME $ENQUEUE_ID:$VERSION docker push $ENQUEUE_ID:$VERSION @@ -69,3 +79,5 @@ jobs: docker push $INGEST_ID:$VERSION docker tag $IDLE_NAME $IDLE_ID:$VERSION docker push $IDLE_ID:$VERSION + docker tag $PRESENCE_NAME $PRESENCE_ID:$VERSION + docker push $PRESENCE_ID:$VERSION diff --git a/Dockerfile.presence b/Dockerfile.presence new file mode 100644 index 0000000..23ff0c7 --- /dev/null +++ b/Dockerfile.presence @@ -0,0 +1,32 @@ +# This file is part of embargo_butler. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (http://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +# Dockerfile for presence service. + +FROM python:3.11 +RUN pip install redis gunicorn flask +WORKDIR /presence +COPY src/presence.py src/utils.py /presence/ +# environment variables that must be set: +# REDIS_HOST REDIS_PASSWORD +# optional: +# DELETE_SEEN +ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "presence:app" ] From d358c8fea44227a586fd7a98c580e7a4c2776e4e Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Sat, 30 Mar 2024 19:21:29 -0700 Subject: [PATCH 5/6] Update default versions for manual ingest build. --- .github/workflows/build-manually.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build-manually.yaml b/.github/workflows/build-manually.yaml index cbad7f3..22381ec 100644 --- a/.github/workflows/build-manually.yaml +++ b/.github/workflows/build-manually.yaml @@ -5,11 +5,11 @@ on: rubinenvVersion: description: 'rubin-env version' required: true - default: '7.0.1' + default: '8.0.0' obsLsstVersion: description: 'Science Pipelines release' required: true - default: 'w_2023_41' + default: 'w_2024_12' env: From 5a66a640dc492d9516005e99cd98ad520dc3f857 Mon Sep 17 00:00:00 2001 From: Kian-Tat Lim Date: Thu, 4 Apr 2024 16:52:11 -0700 Subject: [PATCH 6/6] Protect against JSON arriving without FITS. --- src/ingest.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/ingest.py b/src/ingest.py index e2edd90..42a1d93 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -149,6 +149,8 @@ def record_groups(resources: list[ResourcePath]) -> None: global r, group_lifetime, logger with r.pipeline() as pipe: for res in resources: + if not res.exists(): + continue json_file = res.updatedExtension("json") header = {} try: @@ -159,7 +161,7 @@ def record_groups(resources: list[ResourcePath]) -> None: with res.open("rb") as f: header = astropy.io.fits.open(f)[0].header except Exception: - logger.exception("Error reading group for %s", res) + logger.exception("Error reading header for %s", res) try: instrument = header["INSTRUME"] groupid = header["GROUPID"]