Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-39022: Add group name presence server. #54

Merged
merged 6 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build-manually.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ on:
rubinenvVersion:
description: 'rubin-env version'
required: true
default: '7.0.1'
default: '8.0.0'
obsLsstVersion:
description: 'Science Pipelines release'
required: true
default: 'w_2023_41'
default: 'w_2024_12'


env:
Expand Down
14 changes: 13 additions & 1 deletion .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ env:
ENQUEUE_NAME: embargo-butler-enqueue
INGEST_NAME: embargo-butler-ingest
IDLE_NAME: embargo-butler-idle
PRESENCE_NAME: embargo-butler-presence

jobs:
push:
Expand All @@ -20,7 +21,7 @@ jobs:
contents: read
steps:
- name: Checkout
uses: actions/checkout@v3
uses: actions/checkout@v4

- name: Build enqueue image
run: |
Expand All @@ -43,6 +44,13 @@ jobs:
--tag $IDLE_NAME \
--label "runnumber=${GITHUB_RUN_ID}"

- name: Build presence image
run: |
docker build . \
-f Dockerfile.presence \
--tag $PRESENCE_NAME \
--label "runnumber=${GITHUB_RUN_ID}"

- name: Log in to GitHub Container Registry
run: echo "${{ secrets.GITHUB_TOKEN }}" | docker login ghcr.io -u $ --password-stdin

Expand All @@ -51,6 +59,7 @@ jobs:
ENQUEUE_ID=ghcr.io/${{ github.repository_owner }}/$ENQUEUE_NAME
INGEST_ID=ghcr.io/${{ github.repository_owner }}/$INGEST_NAME
IDLE_ID=ghcr.io/${{ github.repository_owner }}/$IDLE_NAME
PRESENCE_ID=ghcr.io/${{ github.repository_owner }}/$PRESENCE_NAME

if [[ "${{ github.ref }}" == "refs/pull/"* ]]; then
VERSION=$(echo "${{ github.head_ref }}" | sed -e 's|.*/||')
Expand All @@ -62,10 +71,13 @@ jobs:
echo ENQUEUE_ID=$ENQUEUE_ID
echo INGEST_ID=$INGEST_ID
echo IDLE_ID=$IDLE_ID
echo PRESENCE_ID=$PRESENCE_ID
echo VERSION=$VERSION
docker tag $ENQUEUE_NAME $ENQUEUE_ID:$VERSION
docker push $ENQUEUE_ID:$VERSION
docker tag $INGEST_NAME $INGEST_ID:$VERSION
docker push $INGEST_ID:$VERSION
docker tag $IDLE_NAME $IDLE_ID:$VERSION
docker push $IDLE_ID:$VERSION
docker tag $PRESENCE_NAME $PRESENCE_ID:$VERSION
docker push $PRESENCE_ID:$VERSION
32 changes: 32 additions & 0 deletions Dockerfile.presence
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Dockerfile for presence service.

FROM python:3.11
# --no-cache-dir keeps pip's download cache out of the image layer.
RUN pip install --no-cache-dir redis gunicorn flask
WORKDIR /presence
COPY src/presence.py src/utils.py /presence/
# Environment variables that must be set:
#   REDIS_HOST REDIS_PASSWORD
# Optional:
#   DELETE_SEEN  (if present, with any value, a key is deleted from Redis
#                 as soon as it has been returned to a client)
# gunicorn serves the Flask "app" object from presence.py on port 8000
# with two worker processes.
ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "presence:app" ]
46 changes: 46 additions & 0 deletions src/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"""
Service to ingest images or LFA objects into per-bucket Butler repos.
"""
import json
import os
import socket
import time

import astropy.io.fits
import requests
from lsst.daf.butler import Butler
from lsst.obs.base import DefineVisitsTask, RawIngestTask
Expand All @@ -50,6 +52,7 @@
butler_repo = os.environ["BUTLER_REPO"]
webhook_uri = os.environ.get("WEBHOOK_URI", None)
is_lfa = "rubinobs-lfa-" in bucket
group_lifetime = int(os.environ.get("GROUP_LIFETIME", 86400))

worker_name = socket.gethostname()
worker_queue = f"WORKER:{bucket}:{worker_name}"
Expand Down Expand Up @@ -134,6 +137,44 @@ def on_metadata_failure(dataset, exc):
pipe.execute()


def record_groups(resources: list[ResourcePath]) -> None:
    """Record the group ids from received FITS files in Redis.

    For each resource that exists, a key of the form
    ``GROUP:{instrument}:{groupid}:{snap_number}:{detector}`` is set to the
    resource's string form and given an expiration of ``group_lifetime``
    seconds.  All commands are queued on one pipeline and sent together.

    Failures are logged and the affected resource is skipped; they never
    propagate to the caller.

    Parameters
    ----------
    resources: `list` [`ResourcePath`]
        The resources to record group ids from.
    """
    with r.pipeline() as pipe:
        for res in resources:
            if not res.exists():
                continue
            header = _read_group_header(res)
            if header is None:
                # The read failure has already been logged; do not also
                # log a spurious missing-keyword error for an empty header.
                continue
            try:
                instrument = header["INSTRUME"]
                groupid = header["GROUPID"]
                # CURINDEX is one-based in the header; keys use a
                # zero-based snap number.
                snap_number = int(header["CURINDEX"]) - 1
                detector = header["RAFTBAY"] + "_" + header["CCDSLOT"]
                key = f"GROUP:{instrument}:{groupid}:{snap_number}:{detector}"
                pipe.set(key, str(res))
                pipe.expire(key, group_lifetime)
            except Exception:
                logger.exception("Error reading group for %s", res)
        pipe.execute()


def _read_group_header(res):
    """Return the header mapping for ``res``, or `None` if unreadable.

    The JSON sidecar (same path with a ``json`` extension) is tried first.
    If it cannot be opened or parsed for any reason, the header of the
    first HDU of the FITS file itself is read instead.  If that also
    fails, the error is logged and `None` is returned.
    """
    try:
        with res.updatedExtension("json").open("rb") as f:
            return json.load(f)
    except Exception:
        # Sidecar missing or unreadable: fall back to the FITS file.
        try:
            # Close the HDUList deterministically; the header object
            # remains usable after the file is closed.
            with res.open("rb") as f, astropy.io.fits.open(f) as hdus:
                return hdus[0].header
        except Exception:
            logger.exception("Error reading header for %s", res)
            return None


def main():
"""Ingest FITS files from a Redis queue."""
logger.info("Initializing Butler from %s", butler_repo)
Expand Down Expand Up @@ -162,12 +203,17 @@ def main():

# Ingest if we have resources
if resources:

if not is_lfa:
record_groups(resources)

logger.info("Ingesting %s", resources)
refs = None
try:
refs = ingester.run(resources)
except Exception:
logger.exception("Error while ingesting %s", resources)
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed with this commit?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually a separate fix that prevents visit definition (and Rucio registration) from trying to run if an exception occurred in ingest. I'll split it out into a separate commit.


# Define visits if we ingested anything
if not is_lfa and refs:
Expand Down
78 changes: 78 additions & 0 deletions src/presence.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""
Presence service to translate group names to image URIs.
"""
import os
import re

from flask import Flask

from utils import setup_logging, setup_redis

# Module-level logger, configured by the helper imported from utils.
logger = setup_logging(__name__)
# Redis client used by the request handler below.
# NOTE(review): connection settings presumably come from the environment
# inside setup_redis -- confirm in utils.py.
r = setup_redis()

# Delete image key when seen, before it expires, to save space.
# Enabled whenever DELETE_SEEN is present in the environment, whatever its
# value (an empty string or "0" still enables it).
delete_seen = os.environ.get("DELETE_SEEN") is not None

# Flask application object on which the routes in this module are registered.
app = Flask(__name__)


@app.get("/presence/<instrument>/<group_name>/<int:snap_index>/<detector_name>")
def presence(instrument: str, group_name: str, snap_index: int, detector_name: str) -> dict | tuple:
    """Return the presence and URI of an image matching the parameters.

    Looks up the Redis key
    ``GROUP:{instrument}:{group_name}:{snap_index}:{detector_name}``.
    If ``delete_seen`` is enabled, a key that is found is deleted after
    being read.

    Parameters
    ----------
    instrument: `str`
        Name of the instrument taking the image.
    group_name: `str`
        Name of the group (from the GROUPID FITS header).
    snap_index: `int`
        Number of the snap (zero-based).
    detector_name: `str`
        Name of the detector ("RNN_SNN").

    Returns
    -------
    json: `dict` or `tuple`
        JSON with "error", "present", "uri", and/or "message" keys.
        Error responses are ``(dict, status)`` tuples: status 400 for an
        unknown instrument or malformed detector name, 500 for any
        unexpected failure.
    """

    try:
        if instrument not in ("LATISS", "LSSTComCam", "LSSTComCamSim", "LSSTCam", "LSST-TS8"):
            return ({"error": True, "message": f"Unknown instrument {instrument}"}, 400)
        # fullmatch so that trailing characters after "RNN_SNN" are
        # rejected; re.match would only anchor the start of the string.
        if not re.fullmatch(r"R\d\d_S\d\d", detector_name):
            return ({"error": True, "message": f"Unrecognized detector name {detector_name}"}, 400)
        key = f"GROUP:{instrument}:{group_name}:{snap_index}:{detector_name}"
        result = r.get(key)
        if not result:
            logger.debug("No key %s", key)
            return {"error": False, "present": False}
        logger.info("Found key %s", key)
        if delete_seen:
            r.delete(key)
        return {"error": False, "present": True, "uri": result.decode()}
    except Exception as e:
        # Keep the traceback in the service log; the client only gets the
        # exception message.
        logger.exception("Error handling presence request")
        return ({"error": True, "message": str(e)}, 500)
Loading