-
Notifications
You must be signed in to change notification settings - Fork 1
/
lua-scripts.go
167 lines (124 loc) · 4.87 KB
/
lua-scripts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package redisOrderedQueue
// scriptDeleteMessage is the Lua source of a Redis script that removes one
// message from a sorted-set message queue.
//
// Inputs:
//
//	KEYS[1] - group set key (bound to a local, but never used by the script)
//	KEYS[2] - sorted set that holds the messages
//	ARGV[1] - group id (bound to a local, but never used by the script)
//	ARGV[2] - message data, i.e. the sorted-set member to remove
//
// Returns a one-element array containing the ZREM result (the number of
// members removed).
//
// NOTE(review): only the message queue is modified here; the group's score in
// the group set is not adjusted by this script. scriptUnlockGroup rewrites
// that score from ZCARD, which presumably re-synchronises it - confirm that
// the temporary drift is acceptable.
var scriptDeleteMessage = `
local groupSetKey = KEYS[1];
local messageQueueKey = KEYS[2];
local groupId = ARGV[1];
local messageData = ARGV[2];
local removedMsgsCount = redis.call('ZREM', messageQueueKey, messageData);
return { removedMsgsCount };
`;
// scriptAddGroupAndMessageToQueue is the Lua source of a Redis script that
// enqueues a message for a group and, when needed, schedules the group for
// processing.
//
// Inputs:
//
//	KEYS[1] - stream of groups awaiting processing
//	KEYS[2] - sorted set of groups, scored by message count
//	KEYS[3] - sorted set holding the group's messages, scored by priority
//	ARGV[1] - group id
//	ARGV[2] - message priority (used as the ZADD score)
//	ARGV[3] - message data (used as the ZADD member)
//
// Steps:
//
//  1. ZADD the message into the priority queue.
//  2. Only if ZADD reports a newly added member: ZINCRBY the group's score in
//     the group set by the number of added members.
//  3. Only if the resulting score is exactly 1: XADD an entry with the field
//     'group' set to the group id onto the group stream.
//
// Returns { addedMsgCount, newScoreOfGroup, messageData }. newScoreOfGroup is
// 0 when nothing was added, because the ZINCRBY step is skipped in that case.
var scriptAddGroupAndMessageToQueue = `
-- Processing group
local groupStreamKey = KEYS[1];
local groupSetKey = KEYS[2];
-- Message data
local priorityMessageQueueKey = KEYS[3];
-- Processing group
local groupId = ARGV[1];
-- Message data
local messagePriority = ARGV[2];
local messageData = ARGV[3];
-- Push message data into priority queue
local addedMsgCount = redis.call('ZADD', priorityMessageQueueKey, messagePriority, messageData);
local newScoreOfGroup = 0;
if (addedMsgCount > 0) then
-- Add to unlocked group set
newScoreOfGroup = tonumber(redis.call('ZINCRBY', groupSetKey, addedMsgCount, groupId));
if (newScoreOfGroup == 1) then
-- Add to stream of groups for processing
redis.call('XADD', groupStreamKey, '*', 'group', groupId);
end
end
return { addedMsgCount, newScoreOfGroup, messageData };
`;
// scriptClaimTimedOutGroup is the Lua source of a Redis script that lets a
// consumer take over a pending group-stream entry whose idle time has reached
// a timeout.
//
// Inputs:
//
//	KEYS[1] - stream of groups awaiting processing
//	ARGV[1] - consumer group id
//	ARGV[2] - id of the consumer that is claiming
//	ARGV[3] - timeout in milliseconds
//
// Steps:
//
//  1. XPENDING with range '-'..'+' and count 1 fetches a single pending
//     entry; only that one entry is examined per invocation.
//  2. If its idle time is >= the timeout, XCLAIM it for the calling consumer
//     (min-idle-time argument is 0, so the claim itself is unconditional).
//  3. If the entry previously belonged to a different consumer and that
//     consumer has no pending entries left, remove that consumer with
//     XGROUP DELCONSUMER.
//
// Returns { msg_id, group_id } on a successful claim, where group_id is the
// second element of the claimed entry's field/value list (the value written
// for the 'group' field by the scripts in this file). Returns an empty array
// when nothing is pending, the entry has not timed out, or XCLAIM returned no
// entries.
var scriptClaimTimedOutGroup = `
-- Processing group
local groupStreamKey = KEYS[1];
-- Consumer group & consumer
local consumerGroupId = ARGV[1];
local consumerId = ARGV[2];
-- Timeout milliseconds
local timeoutMs = tonumber(ARGV[3]);
local pending_msgs = redis.call('XPENDING', groupStreamKey, consumerGroupId, '-', '+', '1');
if (#(pending_msgs) > 0) then
-- Claim message by Id for this consumer
local pending_msg_id = pending_msgs[1][1];
local pending_msg_consumer_id = pending_msgs[1][2];
local pending_msg_idle_ms = tonumber(pending_msgs[1][3]);
if (pending_msg_idle_ms >= timeoutMs) then
local claimed_msgs = redis.call('XCLAIM', groupStreamKey, consumerGroupId, consumerId, 0, pending_msg_id);
if (#(claimed_msgs) > 0) then
local msg_id = claimed_msgs[1][1];
local msg_content = claimed_msgs[1][2];
-- Check if we are snatching messages from a different consumer ID
if (consumerId ~= pending_msg_consumer_id) then
-- Check remaining pending msgs for this consumer, if no more msgs are pending, we can delete the consumer
local pending_msgs_same_consumer = redis.call('XPENDING', groupStreamKey, consumerGroupId, '-', '+', '1', pending_msg_consumer_id);
if (#(pending_msgs_same_consumer) == 0) then
-- Delete consumer which has timed out
redis.call('XGROUP', 'DELCONSUMER', groupStreamKey, consumerGroupId, pending_msg_consumer_id)
end
end
local group_id = msg_content[2];
return { msg_id, group_id };
end
end
end
return {};
`;
// scriptUnlockGroup is the Lua source of a Redis script that releases a group
// after a consumer has finished with its stream entry, and re-queues the group
// if it still has messages.
//
// Inputs:
//
//	KEYS[1] - stream of groups awaiting processing
//	KEYS[2] - sorted set of groups, scored by message count
//	KEYS[3] - sorted set holding the group's messages
//	ARGV[1] - consumer group id
//	ARGV[2] - consumer id
//	ARGV[3] - id of the stream entry being released
//	ARGV[4] - group id
//
// Steps:
//
//  1. XACK and XDEL the stream entry.
//  2. If the consumer has no pending entries left, remove it with
//     XGROUP DELCONSUMER.
//  3. Read the number of remaining messages with ZCARD. If it is 0, ZREM the
//     group from the group set. Otherwise overwrite the group's score with
//     that count and XADD the group onto the stream again.
//
// Returns { 1, 0 } when the group was removed because it had no messages, or
// { 0, numMsgs } when the group was re-added to the stream.
//
// NOTE(review): the in-script comment on the DELCONSUMER call says the
// consumer "has timed out", but the only condition checked is that the
// consumer has no pending entries.
var scriptUnlockGroup = `
-- Processing group
local groupStreamKey = KEYS[1];
local groupSetKey = KEYS[2];
-- Message data
local priorityMessageQueueKey = KEYS[3];
-- Consumer group & consumer
local consumerGroupId = ARGV[1];
local consumerId = ARGV[2];
local messageId = ARGV[3];
local groupId = ARGV[4];
redis.call('XACK', groupStreamKey, consumerGroupId, messageId);
redis.call('XDEL', groupStreamKey, messageId);
-- Check remaining pending msgs for this consumer, if no more msgs are pending, we can delete the consumer
local pending_msgs_same_consumer = redis.call('XPENDING', groupStreamKey, consumerGroupId, '-', '+', '1', consumerId);
if (#(pending_msgs_same_consumer) == 0) then
-- Delete consumer which has timed out
redis.call('XGROUP', 'DELCONSUMER', groupStreamKey, consumerGroupId, consumerId)
end
-- Check remaining messages in group queue, if there are no more messages remainint we can delete group set key
local numMsgs = tonumber(redis.call('ZCARD', priorityMessageQueueKey));
if (numMsgs == 0) then
-- No msgs, remove group set key
redis.call('ZREM', groupSetKey, groupId);
return { 1, numMsgs };
else
-- Has msgs, update group set score to reflect current number of messages
redis.call('ZADD', groupSetKey, 'CH', numMsgs, groupId);
-- Has msgs, re-add group to the tail of the stream
redis.call('XADD', groupStreamKey, '*', 'group', groupId);
return { 0, numMsgs };
end
`;
// scriptGetMetrics is the Lua source of a Redis script that gathers queue
// metrics in a single call.
//
// Inputs:
//
//	KEYS[1] - stream of groups awaiting processing
//	KEYS[2] - sorted set of groups, scored by message count
//	ARGV[1] - consumer group id
//	ARGV[2] - limit for the "top groups" listing
//
// Returns a five-element array:
//
//  1. XLEN of the group stream
//  2. ZCARD of the group set
//  3. number of entries returned by XINFO CONSUMERS for the consumer group
//  4. sum of all scores in the group set (computed by reading the whole set
//     with ZRANGE 0 -1 WITHSCORES)
//  5. flat member/score list from ZREVRANGE ... WITHSCORES, highest scores
//     first
//
// NOTE(review): ZREVRANGE treats its stop index as inclusive, so the range
// 0..topMessageGroupsLimit yields up to limit + 1 groups. Confirm whether the
// caller compensates for this or whether the stop index should be limit - 1.
var scriptGetMetrics = `
-- Processing group
local groupStreamKey = KEYS[1];
local groupSetKey = KEYS[2];
-- Message data
local consumerGroupId = ARGV[1];
-- Processing group
local topMessageGroupsLimit = tonumber(ARGV[2]);
local sum=0
local z=redis.call('ZRANGE', groupSetKey, 0, -1, 'WITHSCORES')
for i=2, #z, 2 do
sum=sum+z[i]
end
local topGroups = redis.call('ZREVRANGE', groupSetKey, 0, topMessageGroupsLimit, 'WITHSCORES')
local consumers = redis.call('XINFO', 'CONSUMERS', groupStreamKey, consumerGroupId)
return {
redis.call('XLEN', groupStreamKey),
redis.call('ZCARD', groupSetKey),
#(consumers),
sum,
topGroups
}
`;