Skip to content

Commit

Permalink
Add redis stream connection and send message for testing.
Browse files Browse the repository at this point in the history
  • Loading branch information
dspeck1 committed Dec 23, 2024
1 parent 17dc30e commit 1f17b83
Showing 1 changed file with 9 additions and 22 deletions.
31 changes: 9 additions & 22 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import os
from pathlib import Path
import redis.asyncio as redis
import sys
import time
import typing
Expand Down Expand Up @@ -99,17 +100,13 @@ def detector_load(conf: dict, instrument: str) -> list[int]:
return active_detectors

async def fan_out_msg(
producer,
redis_client,
fan_out_topic,
data
):
await producer.start()
await redis_client.start()
logging.info(f"sending msg {data}")
await producer.send_and_wait(fan_out_topic, data)
# TODO Review if flush needed.
await producer.flush()
await producer.stop()

await redis_client.xadd(fan_out_topic, data)

async def main() -> None:

Expand Down Expand Up @@ -349,12 +346,9 @@ async def main() -> None:
fan_out_topic = fan_out_comcamsim_topic
in_process_requests_gauge = lsstcomcamsim_in_process_requests_gauge
case "LSSTComCam":
logging.info(f"Ignore LSSTComCam message {next_visit_message_updated}"
" as the prompt service for this is not yet deployed.")
fan_out_message_list = (
next_visit_message_updated.add_detectors(
dataclasses.asdict(next_visit_message_updated),
# Just use ComCam active detector config.
lsstcomcam_active_detectors,
fan_out_delivery_time,
)
Expand Down Expand Up @@ -429,24 +423,17 @@ async def main() -> None:
)

try:
# https://aiokafka.readthedocs.io/en/stable/producer.html
producer = AIOKafkaProducer(
bootstrap_servers=prompt_processing_kafka_cluster,
value_serializer=fan_out_serializer,
security_protocol=fan_out_security_protocol,
sasl_mechanism=fan_out_sasl_mechanism,
sasl_plain_username=fan_out_sasl_username,
sasl_plain_password=fan_out_sasl_password
)
await producer.start()
logging.info ("started kafka producer")
redis_host = "10.108.11.86"
redis_client = redis.Redis(host=redis_host)
logging.info(f"Redis Ping successful: {await client.ping()}")
await client.aclose()

for fan_out_message in fan_out_message_list:

task = asyncio.create_task(

fan_out_msg(
producer,
redis_client,
fan_out_topic,
fan_out_message
)
Expand Down

0 comments on commit 1f17b83

Please sign in to comment.