Skip to content

Commit

Permalink
Add fan out delivery time to track message arrival
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Nov 21, 2024
1 parent fa58f9b commit 69f7390
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ def add_detectors(
self,
message: dict,
active_detectors: list,
fan_out_delivery_time: str
) -> list[dict[str, str]]:
"""Adds and duplicates next visit messages for fanout.
Expand All @@ -55,6 +56,8 @@ def add_detectors(
The next visit message.
active_detectors: `list`
The active detectors for an instrument.
fan_out_delivery_Time: `str`
The time that fan out sends the message in system time.
Yields
------
message_list : `list`
Expand Down Expand Up @@ -103,6 +106,7 @@ async def fan_out_msg(
await producer.start()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, data)
# TODO Review if flush needed.
await producer.flush()
await producer.stop()

Expand Down Expand Up @@ -269,6 +273,8 @@ async def main() -> None:
"it's not an observation.")
continue

fan_out_delivery_time = time.time()

'''
# Temporary disable so we can see older messages for testing.
Expand Down Expand Up @@ -323,6 +329,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
latiss_active_detectors,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_latiss_topic
Expand All @@ -334,6 +341,7 @@ async def main() -> None:
dataclasses.asdict(next_visit_message_updated),
# Just use ComCam active detector config.
lsstcomcam_active_detectors,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_comcamsim_topic
Expand All @@ -356,6 +364,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
hsc_active_detectors,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_hsc_topic
Expand All @@ -366,6 +375,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
hsc_active_detectors_59134,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_hsc_topic
Expand All @@ -376,6 +386,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
hsc_active_detectors_59142,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_hsc_topic
Expand All @@ -386,6 +397,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
hsc_active_detectors_59150,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_hsc_topic
Expand All @@ -396,6 +408,7 @@ async def main() -> None:
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
hsc_active_detectors_59160,
fan_out_delivery_time,
)
)
fan_out_topic = fan_out_hsc_topic
Expand Down

0 comments on commit 69f7390

Please sign in to comment.