This repository has been archived by the owner on Jun 2, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
TRConsumer.py
82 lines (65 loc) · 2.74 KB
/
TRConsumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from typing import Union
from dotenv import load_dotenv, find_dotenv
import os
import json
import logging
import pika
from loguru import logger
from router.messages import RouterResponse, RouterCommand
from router.train_router import TrainRouter
from train_lib.clients import Consumer
from train_lib.clients.rabbitmq import LOG_FORMAT
# Standard-library logger for this module. It is used for error/info records,
# while loguru's `logger` (imported above) is used for debug output.
LOGGER = logging.getLogger(__name__)
class TRConsumer(Consumer):
    """
    Message consumer that hands every received message to a ``TrainRouter``.

    A message body is decoded from JSON, turned into a ``RouterCommand``,
    processed by the router, and the resulting ``RouterResponse`` is published
    to an exchange via a short-lived blocking connection.
    """

    def __init__(self, ampq_url: str, queue: str = "", routing_key: Union[str, None] = None):
        """
        Initialise the base consumer and create the router used for processing.

        :param ampq_url: broker connection URL; stored on the instance because
            ``publish_events_for_train`` opens its own connection with it.
        :param queue: queue name forwarded to the base ``Consumer``.
        :param routing_key: routing key forwarded to the base ``Consumer`` and
            mirrored on ``self.ROUTING_KEY``.
        """
        super().__init__(ampq_url, queue, routing_key=routing_key)
        self.ampq_url = ampq_url
        self.router = TrainRouter()
        # Set auto reconnect to true
        self.auto_reconnect = True
        # Configure routing key
        self.ROUTING_KEY = routing_key

    def run(self):
        """Start consuming by delegating to the base ``Consumer.run``."""
        super().run()

    def on_message(self, _unused_channel, basic_deliver, properties, body):
        """
        Decode a delivered message, route it, then call the base handler.

        A body that cannot be decoded as JSON is logged and passed to the base
        handler exactly once without being routed.

        :param _unused_channel: channel object, forwarded unchanged to the base handler.
        :param basic_deliver: delivery metadata, forwarded unchanged.
        :param properties: message properties, forwarded unchanged.
        :param body: raw message payload expected to contain JSON.
        :return: None
        """
        try:
            message = json.loads(body)
        except (json.JSONDecodeError, UnicodeDecodeError):
            LOGGER.error("Malformed json input")
            # NOTE(review): the base handler presumably acknowledges the
            # delivery - confirm against train_lib's Consumer.
            super().on_message(_unused_channel, basic_deliver, properties, body)
            # Stop here: there is no decoded message to process, and the base
            # handler must not be invoked a second time for this delivery.
            return

        self.process_message(message)
        super().on_message(_unused_channel, basic_deliver, properties, body)

    def process_message(self, msg: Union[dict, str, bytes]):
        """
        Build a router command from the received message, let the train router
        process it and publish the router's response.

        :param msg: the message, either already decoded or as a raw JSON
            string / bytes object which is decoded here.
        :return: None
        """
        # parse message and command
        if isinstance(msg, (str, bytes)):
            msg = json.loads(msg)
        logger.debug(f"Received Message: {msg}")
        command = RouterCommand.from_message(msg)

        # perform requested action
        response = self.router.process_command(command)

        # publish response
        self.publish_events_for_train(response)

    def publish_events_for_train(self, response: RouterResponse, exchange: str = "pht",
                                 exchange_type: str = "topic", routing_key: str = "ui.tr.event"):
        """
        Publish the queue message of a router response to an exchange.

        A new blocking connection is opened for every call and is closed again
        even when declaring the exchange or publishing fails.

        :param response: router response; its ``make_queue_message()`` result is the body sent.
        :param exchange: name of the exchange to declare (durable) and publish to.
        :param exchange_type: type used when declaring the exchange.
        :param routing_key: routing key used for the published message.
        :return: None
        """
        connection = pika.BlockingConnection(pika.URLParameters(self.ampq_url))
        try:
            channel = connection.channel()
            channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
            json_message = response.make_queue_message()
            channel.basic_publish(exchange=exchange, routing_key=routing_key, body=json_message)
            LOGGER.info(" [x] Sent %r", json_message)
        finally:
            # Closing an already-closed connection raises and would mask the
            # original error, so only close while the connection is still open.
            if connection.is_open:
                connection.close()
def main():
    """
    Entry point: load variables from a .env file, configure logging, and run a
    ``TRConsumer`` built from the ``AMQP_URL`` environment variable.
    """
    # Load the .env file first so that AMQP_URL may be supplied through it.
    load_dotenv(find_dotenv())
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

    amqp_url = os.getenv("AMQP_URL")
    consumer = TRConsumer(amqp_url, "", routing_key="tr")
    consumer.run()
# Start the consumer only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()