Wait for exposures for some ingests.
ktlim committed Dec 4, 2024
1 parent ae7c7f5 commit 47ad19c
Showing 3 changed files with 72 additions and 15 deletions.
9 changes: 7 additions & 2 deletions src/enqueue.py
@@ -65,8 +65,13 @@ def enqueue_objects(objects):
     for o in objects:
         if regexp.search(o):
            info = Info.from_path(o)
-            pipe.lpush(f"QUEUE:{info.bucket}", o)
-            logger.info("Enqueued %s to %s", o, info.bucket)
+            if info.needs_exposure() and not r.hexists("EXPSEEN", info.exp_id):
+                # Exposure not ingested yet: park the file on a wait queue
+                # until on_success() in src/ingest.py releases it.
+                pipe.lpush(f"EXPWAIT:{info.exp_id}", o)
+                logger.info("Wait for exposure %s: %s", info.exp_id, o)
+            else:
+                pipe.lpush(f"QUEUE:{info.bucket}", o)
+                logger.info("Enqueued %s to %s", o, info.bucket)
             info_list.append(info)
     pipe.execute()
     return info_list
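For orientation, a minimal sketch (not part of the commit) of the routing rule this hunk introduces. It assumes a redis-py client and the Info helpers from src/info.py; the function name `route`, its status-string return, and the standalone client are illustrative only:

    import redis

    from info import Info

    r = redis.Redis()

    def route(path: str) -> str:
        # Mirrors enqueue_objects(): guider files whose exposure has not been
        # ingested yet go to a per-exposure wait queue; everything else goes
        # to the normal per-bucket queue.
        info = Info.from_path(path)
        if info.needs_exposure() and not r.hexists("EXPSEEN", info.exp_id):
            r.lpush(f"EXPWAIT:{info.exp_id}", path)
            return "waiting"
        r.lpush(f"QUEUE:{info.bucket}", path)
        return "queued"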
13 changes: 13 additions & 0 deletions src/info.py
@@ -20,6 +20,7 @@
 # along with this program. If not, see <http://www.gnu.org/licenses/>.

 import logging
+import re
 from dataclasses import dataclass
 from typing import Self

@@ -74,6 +75,12 @@ def from_path(url: str) -> Self:
         else:
             return ExposureInfo(url)

+    def needs_exposure(self) -> bool:
+        return False
+
+    def is_raw(self) -> bool:
+        return False


 @dataclass
 class ExposureInfo(Info):
@@ -123,6 +130,12 @@ def __init__(self, path):
             logger.exception("Unable to parse: %s", path)
             raise

+    def needs_exposure(self) -> bool:
+        # Guider files must wait until their exposure's raw has been ingested.
+        return self.filename.endswith("_guider.fits")
+
+    def is_raw(self) -> bool:
+        # Raw science filenames end in a digit, e.g. "..._0042.fits".
+        return bool(re.search(r"\d\.fits", self.filename))


 @dataclass
 class LfaInfo(Info):
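A stripped-down, self-contained sketch of the dispatch these additions rely on; the real classes carry more fields and parse full paths, and the filenames here are made up:

    import re
    from dataclasses import dataclass

    @dataclass
    class Info:
        filename: str

        def needs_exposure(self) -> bool:
            return False          # base default: nothing to wait for

        def is_raw(self) -> bool:
            return False

    @dataclass
    class ExposureInfo(Info):
        def needs_exposure(self) -> bool:
            return self.filename.endswith("_guider.fits")

        def is_raw(self) -> bool:
            return bool(re.search(r"\d\.fits", self.filename))

    assert ExposureInfo("exp_guider.fits").needs_exposure()
    assert ExposureInfo("exp_0042.fits").is_raw()
    assert not ExposureInfo("exp_guider.fits").is_raw()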
65 changes: 52 additions & 13 deletions src/ingest.py
@@ -31,6 +31,7 @@
+from collections.abc import Callable
+
 import requests
-from lsst.daf.butler import Butler
+from lsst.daf.butler import Butler, DatasetRef
 from lsst.obs.base import DefineVisitsTask, RawIngestTask
+from lsst.obs.lsst import ingest_guider
 from lsst.resources import ResourcePath

 from info import Info
@@ -82,9 +83,17 @@ def on_success(datasets):
         logger.debug("%s", info)
         with r.pipeline() as pipe:
             pipe.lrem(worker_queue, 0, info.path)
+            if info.is_raw():
+                # Mark this exposure as seen; pipeline commands are buffered,
+                # so hsetnx has no usable result here and the 48 h TTL is
+                # simply refreshed on every raw ingest.
+                pipe.hsetnx("EXPSEEN", info.exp_id, str(time.time()))
+                pipe.hexpire("EXPSEEN", 48 * 3600, info.exp_id)
             pipe.hset(f"FILE:{info.path}", "ingest_time", str(time.time()))
             pipe.hincrby(f"INGEST:{info.bucket}:{info.instrument}", f"{info.obs_day}", 1)
             pipe.execute()
+        if info.is_raw():
+            # Release any guider files that were waiting on this exposure.
+            wait_queue = f"EXPWAIT:{info.exp_id}"
+            if r.llen(wait_queue) > 0:
+                while r.lmove(wait_queue, worker_queue, "RIGHT", "LEFT"):
+                    pass
         if webhook_uri:
             resp = requests.post(webhook_uri, json=info.__dict__, timeout=0.5)
             logger.info("Webhook response: %s", resp)
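The release handshake in isolation, as a runnable sketch against a local Redis (6.2+ for LMOVE); the key names come from this commit, while the exposure id, path, and worker queue name are invented:

    import redis

    r = redis.Redis()
    exp_id = "MC_O_20241204_000042"    # invented exposure id
    worker_queue = "WORKER:demo"       # invented worker queue name

    # A guider file parked earlier by enqueue_objects():
    r.lpush(f"EXPWAIT:{exp_id}", "s3://bucket/MC_O_20241204_000042_guider.fits")

    # Once the raw for exp_id is ingested, on_success() drains the wait queue
    # into the worker's own queue; RIGHT -> LEFT preserves FIFO order.
    while r.lmove(f"EXPWAIT:{exp_id}", worker_queue, "RIGHT", "LEFT"):
        pass

    assert r.llen(f"EXPWAIT:{exp_id}") == 0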
@@ -186,6 +195,35 @@ def record_groups(resources: list[ResourcePath]) -> None:
         pipe.execute()

+def ingest_batch(
+    resources: list[ResourcePath], ingest_func: Callable[[list[ResourcePath]], list[DatasetRef]]
+) -> list[DatasetRef]:
+    """Ingest resources as one batch, falling back to one-by-one on error."""
+    refs = []
+    try:
+        refs.extend(ingest_func(resources))
+    except Exception:
+        logger.exception("Error while ingesting %s, retrying one by one", resources)
+        for resource in resources:
+            try:
+                refs.extend(ingest_func([resource]))
+            except Exception:
+                logger.exception("Error while ingesting %s", resource)
+                info = Info.from_path(resource.geturl())
+                r.lrem(worker_queue, 0, info.path)
+    return refs
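A usage sketch for the new helper with a stand-in ingest function; `fake_ingest` and the S3 paths are hypothetical:

    from lsst.resources import ResourcePath

    def fake_ingest(resources: list[ResourcePath]) -> list[str]:
        # Stand-in for ingester.run or guider_ingest: one "ref" per resource.
        return [f"ref:{res.basename()}" for res in resources]

    paths = [
        ResourcePath("s3://bucket/MC_O_20241204_000042.fits"),
        ResourcePath("s3://bucket/MC_O_20241204_000042_guider.fits"),
    ]
    refs = ingest_batch(paths, fake_ingest)  # per-file retry only on error
    assert len(refs) == 2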


+def guider_ingest(resources: list[ResourcePath]) -> list[DatasetRef]:
+    # Uses the module-level `butler`; main() must make its Butler visible at
+    # module scope (e.g. via `global butler`) before the first guider ingest.
+    return ingest_guider(
+        butler,
+        resources,
+        transfer="direct",
+        on_success=on_success,
+        on_ingest_failure=on_ingest_failure,
+        on_metadata_failure=on_metadata_failure,
+    )


 def main():
     """Ingest FITS files from a Redis queue."""
     logger.info("Initializing Butler from %s", butler_repo)
@@ -219,19 +257,20 @@ def main():
         record_groups(resources)

         logger.info("Ingesting %s", resources)
-        refs = None
-        try:
-            refs = ingester.run(resources)
-        except Exception:
-            logger.exception("Error while ingesting %s, retrying one by one", resources)
-            refs = []
-            for resource in resources:
-                try:
-                    refs.extend(ingester.run([resource]))
-                except Exception:
-                    logger.exception("Error while ingesting %s", resource)
-                    info = Info.from_path(resource.geturl())
-                    r.lrem(worker_queue, 0, info.path)
+        # Guider files use a dedicated ingest path; split them out first.
+        normal_resources = []
+        guider_resources = []
+        for resource in resources:
+            if resource.basename().endswith("_guider.fits"):
+                guider_resources.append(resource)
+            else:
+                normal_resources.append(resource)
+
+        refs = []
+        if guider_resources:
+            refs.extend(ingest_batch(guider_resources, guider_ingest))
+        if normal_resources:
+            refs.extend(ingest_batch(normal_resources, ingester.run))

         # Define visits if we ingested anything
         if not is_lfa and refs:
