-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathservice_bus_base.py
105 lines (84 loc) · 4.56 KB
/
service_bus_base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import json
import os
import config
import custom_log
from azure.servicebus import ServiceBusClient, management, ServiceBusSubQueue, ServiceBusReceiveMode
import service_bus_custom_encoder
class ServiceBusBase:
def __init__(self, ctx):
self.ctx = ctx
#self.CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
self.SERVICE_BUS_QUEUE_NAME = None
self.SERVICE_BUS_TOPIC_NAME = None
self.SERVICE_BUS_SUBSCRIPTION_NAME = None
self.custom_log_obj = custom_log.CustomLog(ctx.obj['VERBOSE'], ctx.obj['LOG_PATH'])
ServiceBusBase.init_conf(self)
ServiceBusBase.init_variables(self)
self.service_bus_client = ServiceBusClient.from_connection_string(conn_str=self.CONNECTION_STR, logging_enable=True)
self.servicebus_mgmt_client = management.ServiceBusAdministrationClient.from_connection_string(conn_str=self.CONNECTION_STR)
self.DEAD_LETTER = ServiceBusSubQueue.DEAD_LETTER
self.ServiceBusReceiveMode = ServiceBusReceiveMode
def init_conf(self):
config_obj = config.Config()
config_obj.init()
try:
self.CONNECTION_STR = config_obj.config_export['DEFAULT']['ServiceBusConnectionStr']
self.ctx.obj['CONNECTION_STR'] = self.CONNECTION_STR
except Exception:
self.custom_log_obj.log_info("'ServiceBusConnectionStr' not defined in servicebus.conf")
try:
self.SERVICE_BUS_QUEUE_NAME = config_obj.config_export['DEFAULT']['QueueName']
self.ctx.obj['QUEUE_NAME'] = self.SERVICE_BUS_QUEUE_NAME
except Exception:
self.custom_log_obj.log_info("'QueueName' not defined in servicebus.conf")
try:
self.SERVICE_BUS_TOPIC_NAME = config_obj.config_export['DEFAULT']['TopicName']
self.ctx.obj['TOPIC_NAME'] = self.SERVICE_BUS_TOPIC_NAME
except Exception:
self.custom_log_obj.log_info("'TopicName' not defined in servicebus.conf")
try:
self.SERVICE_BUS_SUBSCRIPTION_NAME = config_obj.config_export['DEFAULT']['SubscriptionName']
self.ctx.obj['SUBSCRIPTION_NAME'] = self.SERVICE_BUS_SUBSCRIPTION_NAME
except Exception:
self.custom_log_obj.log_info("'SubscriptionName' not defined in servicebus.conf")
def init_variables(self):
try:
self.CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
self.ctx.obj['CONNECTION_STR'] = self.CONNECTION_STR
except KeyError:
self.custom_log_obj.log_info("Environment variable 'SERVICE_BUS_CONNECTION_STR' not defined")
try:
self.SERVICE_BUS_QUEUE_NAME = os.environ['SERVICE_BUS_QUEUE_NAME']
self.ctx.obj['QUEUE_NAME'] = self.SERVICE_BUS_QUEUE_NAME
except KeyError:
self.custom_log_obj.log_info("Environment variable 'SERVICE_BUS_QUEUE_NAME' not defined")
try:
self.SERVICE_BUS_TOPIC_NAME = os.environ['SERVICE_BUS_TOPIC_NAME']
self.ctx.obj['TOPIC_NAME'] = self.SERVICE_BUS_TOPIC_NAME
except KeyError:
self.custom_log_obj.log_info("Environment variable 'SERVICE_BUS_TOPIC_NAME' not defined")
try:
self.SERVICE_BUS_SUBSCRIPTION_NAME = os.environ['SERVICE_BUS_SUBSCRIPTION_NAME']
self.ctx.obj['SUBSCRIPTION_NAME'] = self.SERVICE_BUS_SUBSCRIPTION_NAME
except KeyError:
self.custom_log_obj.log_info("Environment variable 'SERVICE_BUS_SUBSCRIPTION_NAME' not defined")
@staticmethod
def dump(obj):
for attr in dir(obj):
if hasattr(obj, attr):
print("obj.%s = %s" % (attr, getattr(obj, attr)))
def log_message_pretty(self, msg):
self.custom_log_obj.log_info("%s %s" % ('Message encoded size: ', msg.message.get_message_encoded_size(),))
json_str = json.dumps(msg, cls=service_bus_custom_encoder.ServiceBusCustomEncoder)
json_obj = json.loads(json_str)
json_obj["application_properties"] = json.loads(json_obj["application_properties"].replace("b'", "'").replace("'", '"').replace("None", '""'))
try:
json_obj["message"] = json.loads(json_obj["message"])
except Exception:
pass
json_str = json.dumps(json_obj, indent=4)
self.custom_log_obj.log_info(json_str)
def log_message(self, msg):
self.custom_log_obj.log_info("%s %s" % ('Message encoded size: ', msg.message.get_message_encoded_size(),))
json_str = json.dumps(msg, cls=service_bus_custom_encoder.ServiceBusCustomEncoder)
self.custom_log_obj.log_info(json_str.replace("\\", ""))