Skip to content

Commit

Permalink
fix(FTL-17077): message pickup example without ws (#42)
Browse files Browse the repository at this point in the history
* fix: add messages-received to mediator and fix example

* feat: refactor lua script

---------
  • Loading branch information
YoussefAWasfy authored Sep 23, 2024
1 parent 822f794 commit 2c340ed
Show file tree
Hide file tree
Showing 8 changed files with 264 additions and 105 deletions.
107 changes: 54 additions & 53 deletions affinidi-messaging-mediator/src/database/atm-functions.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
local function store_message(keys, args)
-- Do we have the correct number of arguments?
-- from_did_hash can be optional!!!
if #args < 5 then
if #args < 5 then
return redis.error_reply('store_message: not enough arguments')
elseif #args > 6 then
return redis.error_reply('store_message: too many arguments')
Expand All @@ -22,43 +22,44 @@ local function store_message(keys, args)

-- Get current time on server
local time = redis.call('TIME')
redis.log(redis.LOG_WARNING, 'TIME: '..time[1]..':'..time[2])
local time = string.format("%d%03.0f", time[1], time[2]/1000)
redis.log(redis.LOG_WARNING, 'TIME-2: '..time)
redis.log(redis.LOG_WARNING, 'TIME: ' .. time[1] .. ':' .. time[2])
local time = string.format("%d%03.0f", time[1], time[2] / 1000)
redis.log(redis.LOG_WARNING, 'TIME-2: ' .. time)
local bytes = tonumber(args[2])
if bytes == nil then
return redis.error_reply('store_message: invalid bytes')
end

-- Store message
redis.call('SET', 'MSG:'..keys[1], args[1])
redis.call('SET', 'MSG:' .. keys[1], args[1])

-- Set Global Metrics
redis.call('HINCRBY', 'GLOBAL', 'RECEIVED_BYTES', bytes)
redis.call('HINCRBY', 'GLOBAL', 'RECEIVED_COUNT', 1)

-- Create Message Expiry Record
redis.call('RPUSH', 'MSG_EXPIRY', keys[1]..':'..time)
redis.call('RPUSH', 'MSG_EXPIRY', keys[1] .. ':' .. time)

-- Update the receiver records
redis.call('HINCRBY', 'DID:'..args[4], 'RECEIVE_QUEUE_BYTES', bytes)
redis.call('HINCRBY', 'DID:'..args[4], 'RECEIVE_QUEUE_COUNT', 1)
redis.call('HINCRBY', 'DID:' .. args[4], 'RECEIVE_QUEUE_BYTES', bytes)
redis.call('HINCRBY', 'DID:' .. args[4], 'RECEIVE_QUEUE_COUNT', 1)
-- If changing the fields in the future, update the fetch_messages function
local RQ = redis.call('XADD', 'RECEIVE_Q:'..args[4], time..'-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'FROM', args[5])

local RQ = redis.call('XADD', 'RECEIVE_Q:' .. args[4], time .. '-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'FROM',
args[5])

-- Update the sender records
local SQ = nil
if table.getn(args) == 6 then
-- Update the sender records
redis.call('HINCRBY', 'DID:'..args[6], 'SEND_QUEUE_BYTES', bytes)
redis.call('HINCRBY', 'DID:'..args[6], 'SEND_QUEUE_COUNT', 1)
SQ = redis.call('XADD', 'SEND_Q:'..args[6], time..'-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'TO', args[3])
redis.call('HINCRBY', 'DID:' .. args[6], 'SEND_QUEUE_BYTES', bytes)
redis.call('HINCRBY', 'DID:' .. args[6], 'SEND_QUEUE_COUNT', 1)
SQ = redis.call('XADD', 'SEND_Q:' .. args[6], time .. '-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'TO', args[3])
end

-- Update message MetaData
redis.call('HMSET', 'MSG:META:'..keys[1], 'BYTES', bytes, 'TO', args[4], 'TIMESTAMP', time, 'RECEIVE_ID', RQ)
redis.call('HMSET', 'MSG:META:' .. keys[1], 'BYTES', bytes, 'TO', args[4], 'TIMESTAMP', time, 'RECEIVE_ID', RQ)
if SQ ~= nil then
redis.call('HMSET', 'MSG:META:'..keys[1], 'FROM', args[6], 'SEND_ID', SQ)
redis.call('HMSET', 'MSG:META:' .. keys[1], 'FROM', args[6], 'SEND_ID', SQ)
end

return redis.status_reply('OK')
Expand All @@ -69,7 +70,7 @@ end
-- args = [1] did_hash
local function delete_message(keys, args)
-- Correct number of keys?
if #keys ~= 1 then
if #keys ~= 1 then
return redis.error_reply('delete_message: only accepts one key')
end

Expand All @@ -82,7 +83,7 @@ local function delete_message(keys, args)
redis.setresp(3)

-- Retrieve message metadata
local meta = redis.call('HGETALL', 'MSG:META:'..keys[1])
local meta = redis.call('HGETALL', 'MSG:META:' .. keys[1])
if meta.map == nil then
return redis.error_reply('Couldn\'t retrieve metadata')
end
Expand All @@ -94,35 +95,35 @@ local function delete_message(keys, args)

local bytes = meta.map.BYTES
if bytes == nil then
redis.log(redis.LOG_WARNING, 'message ('..keys[1]..') metadata did not contain BYTES field.')
return redis.error_reply('message ('..keys[1]..') metadata did not contain BYTES field.')
redis.log(redis.LOG_WARNING, 'message (' .. keys[1] .. ') metadata did not contain BYTES field.')
return redis.error_reply('message (' .. keys[1] .. ') metadata did not contain BYTES field.')
end

-- Delete message
redis.call('DEL', 'MSG:'..keys[1])
redis.call('DEL', 'MSG:' .. keys[1])

-- Set Global Metrics
redis.call('HINCRBY', 'GLOBAL', 'DELETED_BYTES', bytes)
redis.call('HINCRBY', 'GLOBAL', 'DELETED_COUNT', 1)

-- Remove the receiver records
redis.call('HINCRBY', 'DID:'..meta.map.TO, 'RECEIVE_QUEUE_BYTES', -bytes)
redis.call('HINCRBY', 'DID:'..meta.map.TO, 'RECEIVE_QUEUE_COUNT', -1)
redis.call('XDEL', 'RECEIVE_Q:'..meta.map.TO, meta.map.RECEIVE_ID)
redis.call('HINCRBY', 'DID:' .. meta.map.TO, 'RECEIVE_QUEUE_BYTES', -bytes)
redis.call('HINCRBY', 'DID:' .. meta.map.TO, 'RECEIVE_QUEUE_COUNT', -1)
redis.call('XDEL', 'RECEIVE_Q:' .. meta.map.TO, meta.map.RECEIVE_ID)

-- Remove the sender records
local SQ = nil
if meta.map.SEND_ID ~= nil then
-- Remove the sender records
redis.call('HINCRBY', 'DID:'..meta.map.FROM, 'SEND_QUEUE_BYTES', -bytes)
redis.call('HINCRBY', 'DID:'..meta.map.FROM, 'SEND_QUEUE_COUNT', -1)
SQ = redis.call('XDEL', 'SEND_Q:'..meta.map.FROM, meta.map.SEND_ID)
redis.call('HINCRBY', 'DID:' .. meta.map.FROM, 'SEND_QUEUE_BYTES', -bytes)
redis.call('HINCRBY', 'DID:' .. meta.map.FROM, 'SEND_QUEUE_COUNT', -1)
SQ = redis.call('XDEL', 'SEND_Q:' .. meta.map.FROM, meta.map.SEND_ID)
end

-- Remove the message metadata
redis.call('DEL', 'MSG:META:'..keys[1])
redis.call('DEL', 'MSG:META:' .. keys[1])

return redis.status_reply('OK')
return redis.status_reply('OK')
end

-- fetch_messages
Expand All @@ -131,7 +132,7 @@ end
-- [2] limit
local function fetch_messages(keys, args)
-- Do we have the correct number of arguments?
if #args ~= 2 then
if #args ~= 2 then
return redis.error_reply('fetch_messages: wrong arguments')
end

Expand All @@ -141,21 +142,21 @@ local function fetch_messages(keys, args)
-- Prepend an exclusive start_id if it exists
local start_id = '-'
if args[1] ~= "-" then
start_id = '('..args[1]
start_id = '(' .. args[1]
end

-- Get list of messages from stream
local list = redis.call('XRANGE', 'RECEIVE_Q:'..keys[1], start_id, '+', 'COUNT', args[2])
local list = redis.call('XRANGE', 'RECEIVE_Q:' .. keys[1], start_id, '+', 'COUNT', args[2])

local fetched_messages = {}
-- unpack the XRANGE list
for x, element in ipairs(list) do
-- element[1] = stream_id
-- element[2] = array of Stream Fields
for i, sub_element in ipairs(element) do
if i == 1 then
if i == 1 then
-- This is the stream ID
fetched_messages[x] = {'STREAM_ID', sub_element}
fetched_messages[x] = { 'STREAM_ID', sub_element }
else
-- [1] = MSG_ID
-- [2] = message_id
Expand All @@ -170,13 +171,13 @@ local function fetch_messages(keys, args)

-- fetch the message
table.insert(fetched_messages[x], 'MSG')
local msg = redis.call('GET', 'MSG:'..sub_element[2])
local msg = redis.call('GET', 'MSG:' .. sub_element[2])
table.insert(fetched_messages[x], msg)

-- fetch the message metadata
local meta = redis.call('HGETALL', 'MSG:META:'..sub_element[2])
local meta = redis.call('HGETALL', 'MSG:META:' .. sub_element[2])
for k, v in pairs(meta.map) do
table.insert(fetched_messages[x], 'META_'..k)
table.insert(fetched_messages[x], 'META_' .. k)
table.insert(fetched_messages[x], v)
end
end
Expand All @@ -191,7 +192,7 @@ end
-- returns number of sessions cleaned up
local function clean_start_streaming(keys, args)
-- Correct number of keys?
if #keys ~= 1 then
if #keys ~= 1 then
return redis.error_reply('clean_start_streaming: only accepts one key')
end

Expand All @@ -204,22 +205,22 @@ local function clean_start_streaming(keys, args)
redis.setresp(3)

-- Prepend an exclusive start_id if it exists
local key = 'STREAMING_SESSIONS:'..keys[1]
local key = 'STREAMING_SESSIONS:' .. keys[1]

-- Clean up sessions
local counter = 0
while (true) do
local response = redis.call('SPOP', key, 1)

-- No more items in the set
if next(response.set) == nil then
if next(response.set) == nil then
break
end

local session = nil
for k,v in pairs(response.set) do
for k, v in pairs(response.set) do
session = k
counter = counter+1
counter = counter + 1
end


Expand All @@ -235,7 +236,7 @@ end
-- returns Message Pickup 3.0 Status details
local function get_status_reply(keys, args)
-- Correct number of keys?
if #keys ~= 1 then
if #keys ~= 1 then
return redis.error_reply('get_status_reply: only accepts one key (recipient_did_hash)')
end

Expand All @@ -252,26 +253,26 @@ local function get_status_reply(keys, args)
response.map.recipient_did = keys[1]

-- Set the message count and total bytes
local r = redis.call('HMGET', 'DID:'..keys[1], 'RECEIVE_QUEUE_COUNT', 'RECEIVE_QUEUE_BYTES')
local r = redis.call('HMGET', 'DID:' .. keys[1], 'RECEIVE_QUEUE_COUNT', 'RECEIVE_QUEUE_BYTES')
response.map.message_count = tonumber(r[1])
response.map.total_bytes = tonumber(r[2])

-- Get the oldest and newest message information
local r = redis.pcall('XINFO', 'STREAM', 'RECEIVE_Q:'..keys[1])
if r['err'] == nil then
response.map.oldest_received = r.map['first-entry'][1]
response.map.newest_received = r.map['last-entry'][1]
local r = redis.pcall('XINFO', 'STREAM', 'RECEIVE_Q:' .. keys[1])
if r['err'] == nil and r.map then
response.map.oldest_received = r.map['first-entry'] and r.map['first-entry'][1] or 0
response.map.newest_received = r.map['last-entry'] and r.map['last-entry'][1] or 0
response.map.queue_count = r.map['length']
end

-- Get live streaming status
local r = redis.call("HEXISTS", "GLOBAL_STREAMING", keys[1])
if r == 0 then
if r == 0 then
response.map.live_delivery = false
else
response.map.live_delivery = true
end

return response
end

Expand All @@ -280,4 +281,4 @@ redis.register_function('store_message', store_message)
redis.register_function('delete_message', delete_message)
redis.register_function('fetch_messages', fetch_messages)
redis.register_function('clean_start_streaming', clean_start_streaming)
redis.register_function('get_status_reply', get_status_reply)
redis.register_function('get_status_reply', get_status_reply)
1 change: 1 addition & 0 deletions affinidi-messaging-mediator/src/database/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ impl DatabaseHandler {
let function_load: Result<String, deadpool_redis::redis::RedisError> =
deadpool_redis::redis::cmd("FUNCTION")
.arg("LOAD")
.arg("REPLACE")
.arg(LUA_SCRIPTS)
.query_async(&mut conn)
.await;
Expand Down
10 changes: 6 additions & 4 deletions affinidi-messaging-mediator/src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ impl FromStr for MessageType {
"https://didcomm.org/messagepickup/3.0/delivery-request" => {
Ok(Self::MessagePickupDeliveryRequest)
}
"https://didcomm.org/messagepickup/3.0/messages-received" => {
Ok(Self::MessagePickupMessagesReceived)
}
"https://didcomm.org/routing/2.0/forward" => Ok(Self::ForwardRequest),
_ => Err(MediatorError::ParseError(
"-1".into(),
Expand All @@ -65,10 +68,9 @@ impl MessageType {
Self::MessagePickupDeliveryRequest => {
message_pickup::delivery_request(message, state, session).await
}
Self::MessagePickupMessagesReceived => Err(MediatorError::NotImplemented(
session.session_id.clone(),
"NOT IMPLEMENTED".into(),
)),
Self::MessagePickupMessagesReceived => {
message_pickup::messages_received(message, state, session).await
}
Self::MessagePickupLiveDeliveryChange => {
message_pickup::toggle_live_delivery(message, state, session).await
}
Expand Down
Loading

0 comments on commit 2c340ed

Please sign in to comment.