Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-43558: Ingest LFA data. #53

Merged
merged 6 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Dockerfile.enqueue
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
FROM python:3.11
RUN pip install redis gunicorn flask
WORKDIR /enqueue
COPY src/enqueue.py src/exposure_info.py src/utils.py /enqueue/
COPY src/enqueue.py src/info.py src/utils.py /enqueue/
# environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD NOTIFICATION_SECRET
# optional:
# DATASET_REGEXP
ENTRYPOINT [ "gunicorn", "-b", "0.0.0.0:8000", "-w", "2", "enqueue:app" ]
10 changes: 5 additions & 5 deletions Dockerfile.ingest
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,18 @@

# Dockerfile for ingest service.

ARG RUBINENV_VERSION=7.0.1
ARG RUBINENV_VERSION=8.0.0
FROM lsstsqre/newinstall:${RUBINENV_VERSION}
ARG OBS_LSST_VERSION
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2023_41}
ENV OBS_LSST_VERSION=${OBS_LSST_VERSION:-w_2024_12}
USER lsst
RUN source loadLSST.bash && mamba install redis-py rucio-clients
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" obs_lsst
COPY src/ingest.py src/exposure_info.py src/utils.py src/rucio_interface.py ./ingest/
RUN source loadLSST.bash && eups distrib install -t "${OBS_LSST_VERSION}" lsst_obs
COPY src/ingest.py src/info.py src/utils.py src/rucio_interface.py ./ingest/
# Environment variables that must be set:
# REDIS_HOST REDIS_PASSWORD BUCKET BUTLER_REPO
# For Rucio (all must be set if RUCIO_RSE is set):
# RUCIO_RSE RUCIO_DTN RUCIO_SCOPE RUCIO_CONFIG
# Optional:
# WEBHOOK_URI
ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup obs_lsst; python ingest/ingest.py" ]
ENTRYPOINT [ "bash", "-c", "source loadLSST.bash; setup lsst_obs; python ingest/ingest.py" ]
42 changes: 23 additions & 19 deletions src/enqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
Enqueue service to post notifications to per-bucket queues.
"""
import os
import re
import time
import urllib.parse

import redis
from flask import Flask, request

from exposure_info import ExposureInfo
from info import ExposureInfo, Info
from utils import setup_logging, setup_redis

FILE_RETENTION: float = 7 * 24 * 60 * 60
Expand All @@ -38,30 +39,32 @@
logger = setup_logging(__name__)
r = setup_redis()
notification_secret = os.environ["NOTIFICATION_SECRET"]
regexp = re.compile(os.environ.get("DATASET_REGEXP", r"fits$"))


def enqueue_objects(objects):
"""Enqueue FITS objects onto per-bucket queues.
"""Enqueue objects onto per-bucket queues.

Compute the `ExposureInfo` for each FITS object and return the list.
Compute the `Info` for each object with a selected extension and return
the list.

Parameters
----------
objects: `list` [`str`]

Returns
-------
info_list: `list` [`ExposureInfo`]
info_list: `list` [`Info`]
"""
info_list = []
# Use a pipeline for efficiency.
with r.pipeline() as pipe:
for o in objects:
if o.endswith(".fits"):
e = ExposureInfo(o)
pipe.lpush(f"QUEUE:{e.bucket}", o)
logger.info("Enqueued %s to %s", o, e.bucket)
info_list.append(e)
if regexp.search(o):
info = Info.from_path(o)
pipe.lpush(f"QUEUE:{info.bucket}", o)
logger.info("Enqueued %s to %s", o, info.bucket)
info_list.append(info)
pipe.execute()
return info_list

Expand All @@ -71,18 +74,19 @@ def update_stats(info_list):

Parameters
----------
info_list: `list` [`ExposureInfo`]
info_list: `list` [`Info`]
"""
max_seqnum = {}
with r.pipeline() as pipe:
max_seqnum = {}
for e in info_list:
pipe.hincrby(f"REC:{e.bucket}", e.obs_day, 1)
bucket_instrument = f"{e.bucket}:{e.instrument}"
pipe.hincrby(f"RECINSTR:{bucket_instrument}", e.obs_day, 1)
pipe.hset(f"FILE:{e.path}", "recv_time", str(time.time()))
pipe.expire(f"FILE:{e.path}", FILE_RETENTION)
seqnum_key = f"MAXSEQ:{bucket_instrument}:{e.obs_day}"
max_seqnum[seqnum_key] = max(int(e.seq_num), max_seqnum.get(seqnum_key, 0))
for info in info_list:
pipe.hincrby(f"REC:{info.bucket}", info.obs_day, 1)
bucket_instrument = f"{info.bucket}:{info.instrument}"
pipe.hincrby(f"RECINSTR:{bucket_instrument}", info.obs_day, 1)
pipe.hset(f"FILE:{info.path}", "recv_time", str(time.time()))
pipe.expire(f"FILE:{info.path}", FILE_RETENTION)
if isinstance(info, ExposureInfo):
seqnum_key = f"MAXSEQ:{bucket_instrument}:{info.obs_day}"
max_seqnum[seqnum_key] = max(int(info.seq_num), max_seqnum.get(seqnum_key, 0))
pipe.execute()

for seqnum_key in max_seqnum:
Expand Down
67 changes: 0 additions & 67 deletions src/exposure_info.py

This file was deleted.

154 changes: 154 additions & 0 deletions src/info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# This file is part of embargo_butler.
#
# Developed for the LSST Data Management System.
# This product includes software developed by the LSST Project
# (http://www.lsst.org).
# See the COPYRIGHT file at the top-level directory of this distribution
# for details of code ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

import logging
from dataclasses import dataclass
from typing import Self

__all__ = ("Info", "ExposureInfo", "LfaInfo")

logger = logging.getLogger(__name__)


@dataclass
class Info:
    """Base class for information extracted from notification messages.

    Instances are normally obtained through `Info.from_path`, which picks
    the subclass that knows how to parse a given S3 URL.
    """

    path: str
    """Path component of the S3 URL.
    """

    bucket: str
    """Bucket component of the S3 URL.
    """

    instrument: str
    """Instrument name.
    """

    filename: str
    """Filename component of the S3 URL.
    """

    obs_day: str
    """Observation day (in timezone UTC-12).
    """

    @staticmethod
    def from_path(url: str) -> "Info":
        """Create an `Info` of the proper subclass from an S3 URL.

        Parameters
        ----------
        url: `str`
            S3 URL (including bucket).  A leading ``s3://`` scheme is
            optional and is stripped before parsing.

        Returns
        -------
        info: `Info`
            Either an `LfaInfo` or an `ExposureInfo` as appropriate.

        Raises
        ------
        Exception
            Any error raised by the selected subclass constructor when the
            URL does not have the layout it expects.
        """
        # The return annotation is ``"Info"`` rather than ``typing.Self``:
        # ``Self`` is not permitted in static methods, and this factory
        # always dispatches on the URL, never on the calling class.
        url = url.removeprefix("s3://")
        # Only the bucket (first path component) decides which parser is
        # used; a marker appearing later in the object key or filename must
        # not route an exposure to the LFA parser.
        bucket = url.partition("/")[0]
        if "rubinobs-lfa-" in bucket:
            return LfaInfo(url)
        return ExposureInfo(url)


@dataclass
class ExposureInfo(Info):
    """Class used to extract exposure information from notification messages.

    Parameters
    ----------
    path: `str`
        S3 path of the form ``bucket/instrument/obs_day/exp_id/filename``
        (the bucket is the first component), where ``exp_id`` has the form
        ``instrumentcode_controller_obsday_seqnum``.

    Raises
    ------
    ValueError
        If ``path`` or its exposure ID does not split into the expected
        number of components.  Any parsing error is logged with a traceback
        and then re-raised.
    """

    exp_id: str
    """Exposure ID.
    """

    instrument_code: str
    """Instrument code (two characters).
    """

    controller: str
    """Controller code (one character).
    """

    seq_num: str
    """Sequence number within an observation day.
    """

    # An explicitly defined ``__init__`` is not replaced by ``@dataclass``,
    # so instances are built from a single path string rather than from
    # one argument per field.
    def __init__(self, path):
        try:
            self.path = path
            (
                self.bucket,
                self.instrument,
                self.obs_day,
                self.exp_id,
                self.filename,
            ) = path.split("/")
            (
                self.instrument_code,
                self.controller,
                obs_day,
                self.seq_num,
            ) = self.exp_id.split("_")
            # The day embedded in the exposure ID should agree with the
            # directory-level day; a mismatch is reported but not fatal, and
            # the directory-level value is the one kept in ``obs_day``.
            if obs_day != self.obs_day:
                logger.warning("Mismatched observation dates: %s", path)
        except Exception:
            logger.exception("Unable to parse: %s", path)
            raise


@dataclass
class LfaInfo(Info):
"""Class used to extract LFA information from notification messages.

Parameters
----------
path: `str`
Path portion of S3 URL (after bucket).
"""

def __init__(self, path):
try:
self.path = path
components = path.split("/")
if len(components) == 8:
self.bucket, csc, generator, year, month, day, directory, self.filename = components
self.instrument = f"{csc}/{generator}"
elif len(components) == 7:
self.bucket, csc, generator, year, month, day, self.filename = components
self.instrument = f"{csc}/{generator}"
elif len(components) == 6:
self.bucket, self.instrument, year, month, day, self.filename = components
else:
raise ValueError(f"Unrecognized number of components: {len(components)}")
self.obs_day = f"{year}{month}{day}"
except Exception:
logger.exception("Unable to parse: %s", path)
raise
Loading
Loading