diff --git a/src/main.py b/src/main.py index ee2e456..0c98b12 100644 --- a/src/main.py +++ b/src/main.py @@ -5,6 +5,7 @@ import logging import os from pathlib import Path +import redis.asyncio as redis import sys import time import typing @@ -99,17 +100,13 @@ def detector_load(conf: dict, instrument: str) -> list[int]: return active_detectors async def fan_out_msg( - producer, + redis_client, fan_out_topic, data ): - await producer.start() + await redis_client.start() logging.info(f"sending msg {data}") - await producer.send_and_wait(fan_out_topic, data) - # TODO Review if flush needed. - await producer.flush() - await producer.stop() - + await redis_client.xadd(fan_out_topic, data) async def main() -> None: @@ -349,12 +346,9 @@ async def main() -> None: fan_out_topic = fan_out_comcamsim_topic in_process_requests_gauge = lsstcomcamsim_in_process_requests_gauge case "LSSTComCam": - logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}" - " as the prompt service for this is not yet deployed.") fan_out_message_list = ( next_visit_message_updated.add_detectors( dataclasses.asdict(next_visit_message_updated), - # Just use ComCam active detector config. lsstcomcam_active_detectors, fan_out_delivery_time, ) @@ -429,24 +423,17 @@ async def main() -> None: ) try: - # https://aiokafka.readthedocs.io/en/stable/producer.html - producer = AIOKafkaProducer( - bootstrap_servers=prompt_processing_kafka_cluster, - value_serializer=fan_out_serializer, - security_protocol=fan_out_security_protocol, - sasl_mechanism=fan_out_sasl_mechanism, - sasl_plain_username=fan_out_sasl_username, - sasl_plain_password=fan_out_sasl_password - ) - await producer.start() - logging.info ("started kafka producer") + redis_host = "10.108.11.86" + redis_client = redis.Redis(host=redis_host) + logging.info(f"Redis Ping successful: {await client.ping()}") + await client.aclose() for fan_out_message in fan_out_message_list: task = asyncio.create_task( fan_out_msg( - producer, + redis_client, fan_out_topic, fan_out_message )