-
Notifications
You must be signed in to change notification settings - Fork 0
/
service_bus.py
207 lines (173 loc) · 7.65 KB
/
service_bus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import json
import click
import colorama
import config
from message_parser import ServiceBusMessageParser
from queue_process import QueueProcess
from topic_process import TopicProcess
@click.group(invoke_without_command=True)
@click.pass_context
@click.option('--verbose', '-v', is_flag=True,
help="Increase output verbosity level")
@click.option('--queue_name', '-qn', required=False, help="Name of the message bus queue")
@click.option('--topic_name', '-tn', required=False, help="Name of the message bus topic")
@click.option('--subscription_name', '-sn', required=False, help="Name of the message bus subscription")
@click.option('--log_path', '-l', required=False, default=None, type=click.Path(exists=True), help="Path to log console outputs")
@click.option('--default_session', '-ds', is_flag=True, help="When a queue session is used, the session name by default is called SESSION_ID")
@click.option('--session', '-s', required=False, help="Session name to be used")
def main(ctx, verbose, queue_name, topic_name, subscription_name, log_path, default_session, session):
group_commands = ['peek_queue', 'purge_queue']
"""
serviceBusPythonExplorer is a command line tool that helps management Azure Service Bus.\n
example: python service_bus.py peek_queue -n [queue_name]
"""
ctx.obj = {} if not ctx.obj else ctx.obj
if ctx.invoked_subcommand is None:
click.echo("Specify one of the commands below")
print(*group_commands, sep='\n')
ctx.obj['VERBOSE'] = verbose
ctx.obj['QUEUE_NAME'] = queue_name
ctx.obj['TOPIC_NAME'] = topic_name
ctx.obj['SUBSCRIPTION_NAME'] = subscription_name
ctx.obj['LOG_PATH'] = log_path
ctx.obj['USE_SESSION'] = False
if default_session or (session is not None and session != ''):
ctx.obj['USE_SESSION'] = True
ctx.obj['SESSION'] = session
set_session_id(ctx)
set_console_color()
if queue_name is not None:
print(queue_name)
if topic_name is not None:
print(topic_name)
if subscription_name is not None:
print(subscription_name)
#config_obj = config.Config()
#config_obj.init()
def set_session_id(ctx):
if ctx.obj['USE_SESSION']:
if ctx.obj['SESSION'] is None or ctx.obj['SESSION'] == '':
ctx.obj['SESSION'] = 'SESSION_ID'
def set_console_color():
config_obj = config.Config()
config_obj.init()
style = None
try:
style = config_obj.config_export['DEFAULT']['Style']
except Exception:
pass
if style == 'LIGHTYELLOW_EX':
colorama.init()
print(colorama.Fore.LIGHTYELLOW_EX)
elif style == 'LIGHTRED_EX':
colorama.init()
print(colorama.Fore.LIGHTRED_EX)
elif style == 'LIGHTMAGENTA_EX':
colorama.init()
print(colorama.Fore.LIGHTMAGENTA_EX)
elif style == 'LIGHTBLUE_EX':
colorama.init()
print(colorama.Fore.LIGHTBLUE_EX)
elif style == 'LIGHTCYAN_EX':
colorama.init()
print(colorama.Fore.LIGHTCYAN_EX)
elif style == 'LIGHTGREEN_EX':
colorama.init()
print(colorama.Fore.LIGHTGREEN_EX)
elif style == 'LIGHTWHITE_EX':
colorama.init()
print(colorama.Fore.LIGHTWHITE_EX)
@main.command('peek_queue')
@click.pass_context
@click.option('--max_message_count', '-mc', required=False, default=5, help="Max message count. Default is 5")
@click.option('--pages', required=False, default=1, help="Number of pages to run. Default is 1")
@click.option('--get_queue_properties', '-qp', is_flag=True, help="Get queue properties")
@click.option('--dead_letter', '-dl', is_flag=True, help="Peek messages from dead letter")
@click.option('--pretty', is_flag=True, help="Pretty json log messages")
def peek_queue(ctx, max_message_count, pages, get_queue_properties, dead_letter, pretty):
"""
: Peek messages from queue.
"""
ctx.obj['MAX_MESSAGE_COUNT'] = max_message_count
ctx.obj['PAGES'] = pages
ctx.obj['GET_QUEUE_PROPERTIES'] = get_queue_properties
ctx.obj['DEAD_LETTER'] = dead_letter
ctx.obj['PRETTY'] = pretty
queue_process_obj = QueueProcess(ctx)
queue_process_obj.spying_message_queue()
@main.command('peek_sub')
@click.pass_context
@click.option('--max_message_count', '-mc', required=False, default=5, help="Max message count. Default is 5")
@click.option('--pages', required=False, default=1, help="Number of pages to run. Default is 1")
@click.option('--get_queue_properties', '-qp', is_flag=True, help="Get queue properties")
@click.option('--dead_letter', '-dl', is_flag=True, help="Peek messages from dead letter")
@click.option('--pretty', is_flag=True, help="Pretty json log messages")
def peek_sub(ctx, max_message_count, pages, get_queue_properties, dead_letter, pretty):
"""
: Peek messages from subscription.
"""
ctx.obj['MAX_MESSAGE_COUNT'] = max_message_count
ctx.obj['PAGES'] = pages
ctx.obj['GET_QUEUE_PROPERTIES'] = get_queue_properties
ctx.obj['DEAD_LETTER'] = dead_letter
ctx.obj['PRETTY'] = pretty
queue_process_obj = TopicProcess(ctx)
queue_process_obj.spying_message()
@main.command('purge_queue')
@click.pass_context
@click.option('--confirm', prompt='Please type the word [confirm]')
@click.option('--dead_letter', '-dl', is_flag=True, help="Purge dead letter messages")
@click.option('--max_message_count', '-mc', required=False, default=50, help="Max message count. Default is 50")
@click.option('--to_dead_letter', is_flag=True, help="Sends messages to dead_letter")
def purge_queue(ctx, confirm, dead_letter, max_message_count, to_dead_letter):
"""
: Purge all messages from queue.
"""
if confirm.lower() == "confirm":
ctx.obj['MAX_MESSAGE_COUNT'] = max_message_count
ctx.obj['DEAD_LETTER'] = dead_letter
ctx.obj['TO_DEAD_LETTER'] = to_dead_letter
queue_process_obj = QueueProcess(ctx)
queue_process_obj.purge()
else:
colorama.init()
print(colorama.Fore.LIGHTRED_EX + 'Invalid word the operation will not proceed')
@main.command('purge_sub')
@click.pass_context
@click.option('--confirm', prompt='Please type the word [confirm]')
@click.option('--dead_letter', '-dl', is_flag=True, help="Purge dead letter messages")
@click.option('--max_message_count', '-mc', required=False, default=50, help="Max message count. Default is 50")
@click.option('--to_dead_letter', is_flag=True, help="Sends messages to dead_letter")
def purge_subscription(ctx, confirm, dead_letter, max_message_count, to_dead_letter):
"""
: Purge all messages from subscription
"""
if confirm.lower() == "confirm":
ctx.obj['MAX_MESSAGE_COUNT'] = max_message_count
ctx.obj['DEAD_LETTER'] = dead_letter
ctx.obj['TO_DEAD_LETTER'] = to_dead_letter
topic_process_obj = TopicProcess(ctx)
topic_process_obj.purge()
else:
colorama.init()
print(colorama.Fore.LIGHTRED_EX + 'Invalid word the operation will not proceed')
@main.command('message_queue')
@click.pass_context
@click.option('--input_file', '-i', type=click.File('rb'), required=True, help="Directory to get message file")
def message_queue(ctx, input_file):
"""
: Send messages to a service bus queue
"""
queue_process_obj = QueueProcess(ctx)
queue_process_obj.message(input_file)
@main.command('message_topic')
@click.pass_context
@click.option('--input_file', '-i', type=click.File('rb'), required=True, help="Directory to get message file")
def message_topic(ctx, input_file):
"""
: Send messages to a service bus topic
"""
topic_process_obj = TopicProcess(ctx)
topic_process_obj.message(input_file)
if __name__ == '__main__':
main()