-
Notifications
You must be signed in to change notification settings - Fork 1
/
manual_requeue.py
107 lines (78 loc) · 3.35 KB
/
manual_requeue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import boto3
import logging
# Module-wide logger: this is the root logger, with its level set to INFO so
# the progress messages emitted below are not filtered out.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
class DLQ:
    """Requeue messages from an SQS dead-letter queue to its source queue.

    The source queue name is obtained by removing every ``'DLQ-'`` substring
    from the dead-letter queue name.
    """

    # Polling stops once more than this many messages have been handled in a
    # single ``requeue_all`` call, which keeps one run bounded.
    MAX_MESSAGES_PER_RUN = 60

    def __init__(self, queue_name):
        """Create the SQS resource and remember the dead-letter queue name.

        Args:
            queue_name: Name of the dead-letter queue to drain.
        """
        self.sqs = boto3.resource('sqs')
        self.dead_letter_queue_name = queue_name
        # Queue resources keyed by name, so each queue is looked up once
        # instead of once per receive/send/delete call.
        self._queues = {}

    def get_queue(self, queue_name):
        """Return the SQS queue resource for *queue_name* (cached per instance)."""
        if queue_name not in self._queues:
            self._queues[queue_name] = self.sqs.get_queue_by_name(
                QueueName=queue_name
            )
        return self._queues[queue_name]

    def receive_messages_from_dlq(self):
        """Long-poll the dead-letter queue for a batch of messages.

        Requests at most 10 messages and waits up to 20 seconds for them.

        Returns:
            The received messages; empty when none arrived.
        """
        messages = self.get_queue(self.dead_letter_queue_name).receive_messages(
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        logger.info('Message Length: %s', len(messages))
        return messages

    def send_messages_to_source_queue(self, retry_count, message_id, message_body):
        """Send one message to the source queue with an incremented retry count.

        Args:
            retry_count: Retries so far, as an int or a numeric string.
            message_id: Used as the batch entry ``Id``.
            message_body: Body of the message to send.

        Returns:
            True when the send was accepted, False when the batch response
            lists the entry under ``Failed``.

        Raises:
            ValueError: If *retry_count* is not numeric.
        """
        # NOTE(review): if the DLQ name contains no 'DLQ-' the result equals
        # the DLQ name and messages are sent back to the DLQ itself - confirm
        # the naming convention with the queue owners.
        source_queue_name = self.dead_letter_queue_name.replace('DLQ-', '')
        logger.info('Source Queue: %s', source_queue_name)
        # The count may arrive as a string (it is stored as a String message
        # attribute), so coerce before incrementing.
        retry_count = int(retry_count) + 1
        response = self.get_queue(source_queue_name).send_messages(Entries=[
            {
                'Id': message_id,
                'MessageBody': message_body,
                'MessageAttributes': {
                    'retryCount': {
                        'StringValue': str(retry_count),
                        'DataType': 'String',
                    },
                },
            },
        ])
        # Batch calls report per-entry failures in the response instead of
        # raising, so the result has to be inspected explicitly.
        failed = (response or {}).get('Failed')
        if failed:
            logger.error('Failed to send message %s: %s', message_id, failed)
            return False
        return True

    def delete_message_from_dlq(self, message_id, message_receipt_handle):
        """Delete one message from the dead-letter queue.

        Args:
            message_id: Used as the batch entry ``Id``.
            message_receipt_handle: Receipt handle of the received message.

        Returns:
            True when the delete was accepted, False when the batch response
            lists the entry under ``Failed``.
        """
        response = self.get_queue(self.dead_letter_queue_name).delete_messages(
            Entries=[
                {
                    'Id': message_id,
                    'ReceiptHandle': message_receipt_handle,
                },
            ]
        )
        failed = (response or {}).get('Failed')
        if failed:
            logger.error('Failed to delete message %s: %s', message_id, failed)
            return False
        return True

    def requeue_all(self, retry_count):
        """Move messages from the dead-letter queue back to the source queue.

        Polls until the dead-letter queue returns no messages or more than
        ``MAX_MESSAGES_PER_RUN`` messages have been handled. A message is only
        deleted from the dead-letter queue after its send was accepted, so a
        rejected send never loses the message.

        Args:
            retry_count: Retries so far; each requeued message carries this
                value plus one.

        Returns:
            The number of messages sent to the source queue.
        """
        total_handled = 0
        total_moved_job = 0
        while True:
            messages = self.receive_messages_from_dlq()
            if not messages:
                break
            for i, msg in enumerate(messages):
                logger.info('Index:[%s], Message ID/Body: %s / %s', str(i), msg.message_id, str(msg.body))
                logger.info('Index:[%s], Sending message back to source queue...', str(i))
                if not self.send_messages_to_source_queue(retry_count, msg.message_id, msg.body):
                    # Leave the message on the DLQ; it becomes visible again
                    # once its visibility timeout expires.
                    continue
                total_moved_job += 1
                logger.info('Index:[%s], Deleteing message on DLQ...', str(i))
                self.delete_message_from_dlq(msg.message_id, msg.receipt_handle)
            # The cap counts every handled message, including failed sends, so
            # the loop stays bounded even when sends keep failing.
            total_handled += len(messages)
            if total_handled > self.MAX_MESSAGES_PER_RUN:
                break
        logger.info('Total Moved Job: %s', total_moved_job)
        return total_moved_job
def lambda_handler(context, event):
    """Requeue the messages of the dead-letter queue that triggered this call.

    The queue name is taken from the first record's ``eventSourceARN`` and the
    retry count from its ``retryCount`` message attribute (1 when absent or
    not numeric). Nothing is requeued once the retry count exceeds 3.

    Args:
        context: Invocation context; not used.
        event: SQS event dict with at least one entry under ``'Records'``.

    Returns:
        Always 200.
    """
    # The Lambda runtime calls handlers positionally as (event, context), so
    # with this parameter order the event dict arrives in ``context``. Swap
    # when that is detected; keyword callers are unaffected.
    if isinstance(context, dict) and not isinstance(event, dict):
        context, event = event, context

    logger.info('Event Body: \n%s', event)

    # For a local run without a Lambda trigger, substitute a literal queue
    # name here (for example 'dlq-demo-queue').
    dlq_name = event['Records'][0]['eventSourceARN'].split(':')[5]

    try:
        # The attribute value is a string; convert it so the numeric
        # comparison below is valid.
        retry_count = int(
            event['Records'][0]['messageAttributes']['retryCount']['stringValue']
        )
    except (KeyError, IndexError, TypeError, ValueError) as e:
        logger.warning('KeyError: %s, assign retry_count = 1', e)
        retry_count = 1

    if retry_count > 3:
        logger.error('This task is retrying over 3 time. function end.')
        return 200

    DLQ(dlq_name).requeue_all(retry_count)
    return 200
# Script entry point for invoking the handler outside of Lambda. The empty
# strings are placeholders only; they are not a usable SQS event.
if __name__ == "__main__":
    lambda_handler(event='', context='')