diff --git a/src/enqueue.py b/src/enqueue.py index 70abde9..3279ba6 100644 --- a/src/enqueue.py +++ b/src/enqueue.py @@ -65,8 +65,13 @@ def enqueue_objects(objects): for o in objects: if regexp.search(o): info = Info.from_path(o) - pipe.lpush(f"QUEUE:{info.bucket}", o) - logger.info("Enqueued %s to %s", o, info.bucket) + if info.needs_exposure(): + if not r.hexists("EXPSEEN", info.exp_id): + pipe.lpush(f"EXPWAIT:{info.exp_id}", o) + logger.info("Wait for exposure %s: %s", info.exp_id, o) + else: + pipe.lpush(f"QUEUE:{info.bucket}", o) + logger.info("Enqueued %s to %s", o, info.bucket) info_list.append(info) pipe.execute() return info_list diff --git a/src/info.py b/src/info.py index 1867611..1f24bf1 100644 --- a/src/info.py +++ b/src/info.py @@ -20,6 +20,7 @@ # along with this program. If not, see . import logging +import re from dataclasses import dataclass from typing import Self @@ -74,6 +75,12 @@ def from_path(url: str) -> Self: else: return ExposureInfo(url) + def needs_exposure(self) -> bool: + return False + + def is_raw(self) -> bool: + return False + @dataclass class ExposureInfo(Info): @@ -123,6 +130,12 @@ def __init__(self, path): logger.exception("Unable to parse: %s", path) raise + def needs_exposure(self) -> bool: + return self.filename.endswith("_guider.fits") + + def is_raw(self) -> bool: + return bool(re.search(r"\d\.fits", self.filename)) + @dataclass class LfaInfo(Info): diff --git a/src/ingest.py b/src/ingest.py index 912e351..a95a622 100644 --- a/src/ingest.py +++ b/src/ingest.py @@ -26,11 +26,13 @@ import os import socket import time +from typing import Callable import astropy.io.fits import requests -from lsst.daf.butler import Butler +from lsst.daf.butler import Butler, DatasetRef from lsst.obs.base import DefineVisitsTask, RawIngestTask +from lsst.obs.lsst import ingest_guider from lsst.resources import ResourcePath from info import Info @@ -82,9 +84,17 @@ def on_success(datasets): logger.debug("%s", info) with r.pipeline() as pipe: pipe.lrem(worker_queue, 0, info.path) + if info.is_raw(): + if pipe.hsetnx("EXPSEEN", info.exp_id, str(time.time())): + pipe.hexpire("EXPSEEN", 48 * 3600, info.exp_id) pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time())) pipe.hincrby(f"INGEST:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1) pipe.execute() + if info.is_raw(): + wait_queue = f"EXPWAIT:{info.exp_id}" + if r.llen(wait_queue) > 0: + while r.lmove(wait_queue, worker_queue, "RIGHT", "LEFT"): + pass if webhook_uri: resp = requests.post(webhook_uri, json=info.__dict__, timeout=0.5) logger.info("Webhook response: %s", resp) @@ -186,8 +196,39 @@ def record_groups(resources: list[ResourcePath]) -> None: pipe.execute() +def ingest_batch( + resources: list[ResourcePath], ingest_func: Callable[[list[ResourcePath]], list[DatasetRef]] +) -> list[DatasetRef]: + refs = [] + try: + refs.extend(ingest_func(resources)) + except Exception: + logger.exception("Error while ingesting %s, retrying one by one", resources) + for resource in resources: + try: + refs.extend(ingest_func([resource])) + except Exception: + logger.exception("Error while ingesting %s", resource) + info = Info.from_path(resource.geturl()) + r.lrem(worker_queue, 0, info.path) + return refs + + +def guider_ingest(resources: list[ResourcePath]) -> list[DatasetRef]: + global butler + return ingest_guider( + butler, + resources, + transfer="direct", + on_success=on_success, + on_ingest_failure=on_ingest_failure, + on_metadata_failure=on_metadata_failure, + ) + + def main(): """Ingest FITS files from a Redis queue.""" + global butler logger.info("Initializing Butler from %s", butler_repo) butler = Butler(butler_repo, writeable=True) ingest_config = RawIngestTask.ConfigClass() @@ -219,19 +260,20 @@ def main(): record_groups(resources) logger.info("Ingesting %s", resources) - refs = None - try: - refs = ingester.run(resources) - except Exception: - logger.exception("Error while ingesting %s, retrying one by one", resources) - refs = [] - for resource in resources: - try: - refs.extend(ingester.run([resource])) - except Exception: - logger.exception("Error while ingesting %s", resource) - info = Info.from_path(resource.geturl()) - r.lrem(worker_queue, 0, info.path) + + normal_resources = [] + guider_resources = [] + for resource in resources: + if resource.basename().endswith("_guider.fits"): + guider_resources.append(resource) + else: + normal_resources.append(resource) + + refs = [] + if guider_resources: + refs.extend(ingest_batch(guider_resources, guider_ingest)) + if normal_resources: + refs.extend(ingest_batch(normal_resources, ingester.run)) # Define visits if we ingested anything if not is_lfa and refs: