Skip to content

Commit

Permalink
Merge branch 'tickets/DM-43180'
Browse files Browse the repository at this point in the history
  • Loading branch information
kfindeisen committed Jun 17, 2024
2 parents 593f8df + bf44a1b commit 38c0266
Showing 1 changed file with 24 additions and 8 deletions.
32 changes: 24 additions & 8 deletions src/main.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,28 @@
import asyncio
import dataclasses
import datetime
import json
import logging
import os
from pathlib import Path
import sys
import asyncio
import httpx
import yaml
import time
import typing
import dataclasses

from aiokafka import AIOKafkaConsumer # type:ignore
from cloudevents.conversion import to_structured
from cloudevents.http import CloudEvent
from dataclasses import dataclass
from pathlib import Path
from kafkit.registry.httpx import RegistryApi
import httpx
from kafkit.registry import Deserializer
from kafkit.registry.httpx import RegistryApi
from prometheus_client import start_http_server, Summary # type:ignore
from prometheus_client import Gauge
import yaml

REQUEST_TIME = Summary("request_processing_seconds", "Time spent processing request")


@dataclass
@dataclasses.dataclass
class NextVisitModel:
"Next Visit Message"
salIndex: int
Expand Down Expand Up @@ -160,6 +162,7 @@ async def main() -> None:
group_id = os.environ["CONSUMER_GROUP"]
topic = os.environ["NEXT_VISIT_TOPIC"]
offset = os.environ["OFFSET"]
expire = float(os.environ["MESSAGE_EXPIRATION"])
kafka_schema_registry_url = os.environ["KAFKA_SCHEMA_REGISTRY_URL"]
latiss_knative_serving_url = os.environ["LATISS_KNATIVE_SERVING_URL"]
lsstcomcam_knative_serving_url = os.environ["LSSTCOMCAM_KNATIVE_SERVING_URL"]
Expand Down Expand Up @@ -273,6 +276,19 @@ async def main() -> None:
"it's not an observation.")
continue

# efdStamp is visit publication, in seconds since 1970-01-01 UTC
if next_visit_message_initial["message"]["private_efdStamp"]:
published = next_visit_message_initial["message"]["private_efdStamp"]
age = time.time() - published
if age > expire:
logging.warning("Message published at %s is %s old, ignoring.",
time.ctime(published),
datetime.timedelta(seconds=age)
)
continue
else:
logging.warning("Message does not have private_efdStamp, can't determine age.")

next_visit_message_updated = NextVisitModel(
salIndex=next_visit_message_initial["message"]["salIndex"],
scriptSalIndex=next_visit_message_initial["message"][
Expand Down

0 comments on commit 38c0266

Please sign in to comment.