From 2c340ed5c33bbe91d4f0809e01fda830d3e4e789 Mon Sep 17 00:00:00 2001 From: YoussefAWasfy <112941926+YoussefAWasfy@users.noreply.github.com> Date: Mon, 23 Sep 2024 15:40:27 +0200 Subject: [PATCH] fix(FTL-17077): message pickup example without ws (#42) * fix: add messages-received to mediator and fix example * feat: refactor lua script --------- --- .../src/database/atm-functions.lua | 107 ++++++------ .../src/database/handlers.rs | 1 + .../src/messages/mod.rs | 10 +- .../src/messages/protocols/message_pickup.rs | 152 +++++++++++++++++- affinidi-messaging-sdk/README.md | 5 +- .../examples/message_pickup.rs | 76 +++++---- .../src/protocols/message_pickup.rs | 16 +- .../src/transports/websockets/mod.rs | 2 +- 8 files changed, 264 insertions(+), 105 deletions(-) diff --git a/affinidi-messaging-mediator/src/database/atm-functions.lua b/affinidi-messaging-mediator/src/database/atm-functions.lua index 41010cc..2123087 100644 --- a/affinidi-messaging-mediator/src/database/atm-functions.lua +++ b/affinidi-messaging-mediator/src/database/atm-functions.lua @@ -11,7 +11,7 @@ local function store_message(keys, args) -- Do we have the correct number of arguments? -- from_did_hash can be optional!!! - if #args < 5 then + if #args < 5 then return redis.error_reply('store_message: not enough arguments') elseif #args > 6 then return redis.error_reply('store_message: too many arguments') @@ -22,43 +22,44 @@ local function store_message(keys, args) -- Get current time on server local time = redis.call('TIME') - redis.log(redis.LOG_WARNING, 'TIME: '..time[1]..':'..time[2]) - local time = string.format("%d%03.0f", time[1], time[2]/1000) - redis.log(redis.LOG_WARNING, 'TIME-2: '..time) + redis.log(redis.LOG_WARNING, 'TIME: ' .. time[1] .. ':' .. time[2]) + local time = string.format("%d%03.0f", time[1], time[2] / 1000) + redis.log(redis.LOG_WARNING, 'TIME-2: ' .. time) local bytes = tonumber(args[2]) if bytes == nil then return redis.error_reply('store_message: invalid bytes') end - + -- Store message - redis.call('SET', 'MSG:'..keys[1], args[1]) + redis.call('SET', 'MSG:' .. keys[1], args[1]) -- Set Global Metrics redis.call('HINCRBY', 'GLOBAL', 'RECEIVED_BYTES', bytes) redis.call('HINCRBY', 'GLOBAL', 'RECEIVED_COUNT', 1) -- Create Message Expiry Record - redis.call('RPUSH', 'MSG_EXPIRY', keys[1]..':'..time) + redis.call('RPUSH', 'MSG_EXPIRY', keys[1] .. ':' .. time) -- Update the receiver records - redis.call('HINCRBY', 'DID:'..args[4], 'RECEIVE_QUEUE_BYTES', bytes) - redis.call('HINCRBY', 'DID:'..args[4], 'RECEIVE_QUEUE_COUNT', 1) + redis.call('HINCRBY', 'DID:' .. args[4], 'RECEIVE_QUEUE_BYTES', bytes) + redis.call('HINCRBY', 'DID:' .. args[4], 'RECEIVE_QUEUE_COUNT', 1) -- If changing the fields in the future, update the fetch_messages function - local RQ = redis.call('XADD', 'RECEIVE_Q:'..args[4], time..'-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'FROM', args[5]) - + local RQ = redis.call('XADD', 'RECEIVE_Q:' .. args[4], time .. '-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'FROM', + args[5]) + -- Update the sender records local SQ = nil if table.getn(args) == 6 then -- Update the sender records - redis.call('HINCRBY', 'DID:'..args[6], 'SEND_QUEUE_BYTES', bytes) - redis.call('HINCRBY', 'DID:'..args[6], 'SEND_QUEUE_COUNT', 1) - SQ = redis.call('XADD', 'SEND_Q:'..args[6], time..'-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'TO', args[3]) + redis.call('HINCRBY', 'DID:' .. args[6], 'SEND_QUEUE_BYTES', bytes) + redis.call('HINCRBY', 'DID:' .. args[6], 'SEND_QUEUE_COUNT', 1) + SQ = redis.call('XADD', 'SEND_Q:' .. args[6], time .. '-*', 'MSG_ID', keys[1], 'BYTES', bytes, 'TO', args[3]) end -- Update message MetaData - redis.call('HMSET', 'MSG:META:'..keys[1], 'BYTES', bytes, 'TO', args[4], 'TIMESTAMP', time, 'RECEIVE_ID', RQ) + redis.call('HMSET', 'MSG:META:' .. keys[1], 'BYTES', bytes, 'TO', args[4], 'TIMESTAMP', time, 'RECEIVE_ID', RQ) if SQ ~= nil then - redis.call('HMSET', 'MSG:META:'..keys[1], 'FROM', args[6], 'SEND_ID', SQ) + redis.call('HMSET', 'MSG:META:' .. keys[1], 'FROM', args[6], 'SEND_ID', SQ) end return redis.status_reply('OK') @@ -69,7 +70,7 @@ end -- args = [1] did_hash local function delete_message(keys, args) -- Correct number of keys? - if #keys ~= 1 then + if #keys ~= 1 then return redis.error_reply('delete_message: only accepts one key') end @@ -82,7 +83,7 @@ local function delete_message(keys, args) redis.setresp(3) -- Retrieve message metadata - local meta = redis.call('HGETALL', 'MSG:META:'..keys[1]) + local meta = redis.call('HGETALL', 'MSG:META:' .. keys[1]) if meta.map == nil then return redis.error_reply('Couldn\'t retrieve metadata') end @@ -94,35 +95,35 @@ local function delete_message(keys, args) local bytes = meta.map.BYTES if bytes == nil then - redis.log(redis.LOG_WARNING, 'message ('..keys[1]..') metadata did not contain BYTES field.') - return redis.error_reply('message ('..keys[1]..') metadata did not contain BYTES field.') + redis.log(redis.LOG_WARNING, 'message (' .. keys[1] .. ') metadata did not contain BYTES field.') + return redis.error_reply('message (' .. keys[1] .. ') metadata did not contain BYTES field.') end - + -- Delete message - redis.call('DEL', 'MSG:'..keys[1]) + redis.call('DEL', 'MSG:' .. keys[1]) -- Set Global Metrics redis.call('HINCRBY', 'GLOBAL', 'DELETED_BYTES', bytes) redis.call('HINCRBY', 'GLOBAL', 'DELETED_COUNT', 1) -- Remove the receiver records - redis.call('HINCRBY', 'DID:'..meta.map.TO, 'RECEIVE_QUEUE_BYTES', -bytes) - redis.call('HINCRBY', 'DID:'..meta.map.TO, 'RECEIVE_QUEUE_COUNT', -1) - redis.call('XDEL', 'RECEIVE_Q:'..meta.map.TO, meta.map.RECEIVE_ID) - + redis.call('HINCRBY', 'DID:' .. meta.map.TO, 'RECEIVE_QUEUE_BYTES', -bytes) + redis.call('HINCRBY', 'DID:' .. meta.map.TO, 'RECEIVE_QUEUE_COUNT', -1) + redis.call('XDEL', 'RECEIVE_Q:' .. meta.map.TO, meta.map.RECEIVE_ID) + -- Remove the sender records local SQ = nil if meta.map.SEND_ID ~= nil then -- Remove the sender records - redis.call('HINCRBY', 'DID:'..meta.map.FROM, 'SEND_QUEUE_BYTES', -bytes) - redis.call('HINCRBY', 'DID:'..meta.map.FROM, 'SEND_QUEUE_COUNT', -1) - SQ = redis.call('XDEL', 'SEND_Q:'..meta.map.FROM, meta.map.SEND_ID) + redis.call('HINCRBY', 'DID:' .. meta.map.FROM, 'SEND_QUEUE_BYTES', -bytes) + redis.call('HINCRBY', 'DID:' .. meta.map.FROM, 'SEND_QUEUE_COUNT', -1) + SQ = redis.call('XDEL', 'SEND_Q:' .. meta.map.FROM, meta.map.SEND_ID) end -- Remove the message metadata - redis.call('DEL', 'MSG:META:'..keys[1]) + redis.call('DEL', 'MSG:META:' .. keys[1]) - return redis.status_reply('OK') + return redis.status_reply('OK') end -- fetch_messages @@ -131,7 +132,7 @@ end -- [2] limit local function fetch_messages(keys, args) -- Do we have the correct number of arguments? - if #args ~= 2 then + if #args ~= 2 then return redis.error_reply('fetch_messages: wrong arguments') end @@ -141,11 +142,11 @@ local function fetch_messages(keys, args) -- Prepend an exclusive start_id if it exists local start_id = '-' if args[1] ~= "-" then - start_id = '('..args[1] + start_id = '(' .. args[1] end -- Get list of messages from stream - local list = redis.call('XRANGE', 'RECEIVE_Q:'..keys[1], start_id, '+', 'COUNT', args[2]) + local list = redis.call('XRANGE', 'RECEIVE_Q:' .. keys[1], start_id, '+', 'COUNT', args[2]) local fetched_messages = {} -- unpack the XRANGE list @@ -153,9 +154,9 @@ local function fetch_messages(keys, args) -- element[1] = stream_id -- element[2] = array of Stream Fields for i, sub_element in ipairs(element) do - if i == 1 then + if i == 1 then -- This is the stream ID - fetched_messages[x] = {'STREAM_ID', sub_element} + fetched_messages[x] = { 'STREAM_ID', sub_element } else -- [1] = MSG_ID -- [2] = message_id @@ -170,13 +171,13 @@ local function fetch_messages(keys, args) -- fetch the message table.insert(fetched_messages[x], 'MSG') - local msg = redis.call('GET', 'MSG:'..sub_element[2]) + local msg = redis.call('GET', 'MSG:' .. sub_element[2]) table.insert(fetched_messages[x], msg) -- fetch the message metadata - local meta = redis.call('HGETALL', 'MSG:META:'..sub_element[2]) + local meta = redis.call('HGETALL', 'MSG:META:' .. sub_element[2]) for k, v in pairs(meta.map) do - table.insert(fetched_messages[x], 'META_'..k) + table.insert(fetched_messages[x], 'META_' .. k) table.insert(fetched_messages[x], v) end end @@ -191,7 +192,7 @@ end -- returns number of sessions cleaned up local function clean_start_streaming(keys, args) -- Correct number of keys? - if #keys ~= 1 then + if #keys ~= 1 then return redis.error_reply('clean_start_streaming: only accepts one key') end @@ -204,7 +205,7 @@ local function clean_start_streaming(keys, args) redis.setresp(3) -- Prepend an exclusive start_id if it exists - local key = 'STREAMING_SESSIONS:'..keys[1] + local key = 'STREAMING_SESSIONS:' .. keys[1] -- Clean up sessions local counter = 0 @@ -212,14 +213,14 @@ local function clean_start_streaming(keys, args) local response = redis.call('SPOP', key, 1) -- No more items in the set - if next(response.set) == nil then + if next(response.set) == nil then break end local session = nil - for k,v in pairs(response.set) do + for k, v in pairs(response.set) do session = k - counter = counter+1 + counter = counter + 1 end @@ -235,7 +236,7 @@ end -- returns Message Pickup 3.0 Status details local function get_status_reply(keys, args) -- Correct number of keys? - if #keys ~= 1 then + if #keys ~= 1 then return redis.error_reply('get_status_reply: only accepts one key (recipient_did_hash)') end @@ -252,26 +253,26 @@ local function get_status_reply(keys, args) response.map.recipient_did = keys[1] -- Set the message count and total bytes - local r = redis.call('HMGET', 'DID:'..keys[1], 'RECEIVE_QUEUE_COUNT', 'RECEIVE_QUEUE_BYTES') + local r = redis.call('HMGET', 'DID:' .. keys[1], 'RECEIVE_QUEUE_COUNT', 'RECEIVE_QUEUE_BYTES') response.map.message_count = tonumber(r[1]) response.map.total_bytes = tonumber(r[2]) -- Get the oldest and newest message information - local r = redis.pcall('XINFO', 'STREAM', 'RECEIVE_Q:'..keys[1]) - if r['err'] == nil then - response.map.oldest_received = r.map['first-entry'][1] - response.map.newest_received = r.map['last-entry'][1] + local r = redis.pcall('XINFO', 'STREAM', 'RECEIVE_Q:' .. keys[1]) + if r['err'] == nil and r.map then + response.map.oldest_received = r.map['first-entry'] and r.map['first-entry'][1] or 0 + response.map.newest_received = r.map['last-entry'] and r.map['last-entry'][1] or 0 response.map.queue_count = r.map['length'] end -- Get live streaming status local r = redis.call("HEXISTS", "GLOBAL_STREAMING", keys[1]) - if r == 0 then + if r == 0 then response.map.live_delivery = false else response.map.live_delivery = true end - + return response end @@ -280,4 +281,4 @@ redis.register_function('store_message', store_message) redis.register_function('delete_message', delete_message) redis.register_function('fetch_messages', fetch_messages) redis.register_function('clean_start_streaming', clean_start_streaming) -redis.register_function('get_status_reply', get_status_reply) \ No newline at end of file +redis.register_function('get_status_reply', get_status_reply) diff --git a/affinidi-messaging-mediator/src/database/handlers.rs b/affinidi-messaging-mediator/src/database/handlers.rs index 550dd2c..27a022c 100644 --- a/affinidi-messaging-mediator/src/database/handlers.rs +++ b/affinidi-messaging-mediator/src/database/handlers.rs @@ -88,6 +88,7 @@ impl DatabaseHandler { let function_load: Result = deadpool_redis::redis::cmd("FUNCTION") .arg("LOAD") + .arg("REPLACE") .arg(LUA_SCRIPTS) .query_async(&mut conn) .await; diff --git a/affinidi-messaging-mediator/src/messages/mod.rs b/affinidi-messaging-mediator/src/messages/mod.rs index c9b376f..bb1a232 100644 --- a/affinidi-messaging-mediator/src/messages/mod.rs +++ b/affinidi-messaging-mediator/src/messages/mod.rs @@ -40,6 +40,9 @@ impl FromStr for MessageType { "https://didcomm.org/messagepickup/3.0/delivery-request" => { Ok(Self::MessagePickupDeliveryRequest) } + "https://didcomm.org/messagepickup/3.0/messages-received" => { + Ok(Self::MessagePickupMessagesReceived) + } "https://didcomm.org/routing/2.0/forward" => Ok(Self::ForwardRequest), _ => Err(MediatorError::ParseError( "-1".into(), @@ -65,10 +68,9 @@ impl MessageType { Self::MessagePickupDeliveryRequest => { message_pickup::delivery_request(message, state, session).await } - Self::MessagePickupMessagesReceived => Err(MediatorError::NotImplemented( - session.session_id.clone(), - "NOT IMPLEMENTED".into(), - )), + Self::MessagePickupMessagesReceived => { + message_pickup::messages_received(message, state, session).await + } Self::MessagePickupLiveDeliveryChange => { message_pickup::toggle_live_delivery(message, state, session).await } diff --git a/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs b/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs index 135e689..d87c0e6 100644 --- a/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs +++ b/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs @@ -2,8 +2,8 @@ use affinidi_messaging_didcomm::{Attachment, Message}; use affinidi_messaging_sdk::{ messages::fetch::FetchOptions, protocols::message_pickup::{ - MessagePickupDeliveryRequest, MessagePickupLiveDelivery, MessagePickupStatusReply, - MessagePickupStatusRequest, + MessagePickupDeliveryRequest, MessagePickupLiveDelivery, MessagePickupMessagesReceived, + MessagePickupStatusReply, MessagePickupStatusRequest, }, }; use base64::prelude::*; @@ -584,3 +584,151 @@ pub(crate) async fn delivery_request( } }.instrument(_span).await } + +// Process a Messages Received message and generates a response +pub(crate) async fn messages_received( + msg: &Message, + state: &SharedData, + session: &Session, +) -> Result { + let _span = span!(tracing::Level::DEBUG, "messages_received",); + async move { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + if let Some(expires) = msg.expires_time { + if expires <= now { + debug!( + "Message expired at ({}) now({}) seconds_ago({})", + expires, + now, + now - expires + ); + return Err(MediatorError::MessageExpired( + session.session_id.clone(), + expires.to_string(), + now.to_string(), + )); + } + } + + // Ensure to: exists and is valid + let to = if let Some(to) = &msg.to { + if let Some(first) = to.first() { + first.to_owned() + } else { + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + "Message missing valid 'to' field, expect at least one address in array." + .into(), + )); + } + } else { + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + "Message missing 'to' field".into(), + )); + }; + debug!("To: {}", to); + + // Must be addressed to ATM + if to != state.config.mediator_did { + debug!( + "to: ({}) doesn't match ATM DID ({})", + to, state.config.mediator_did + ); + return Err(MediatorError::RequestDataError(session.session_id.clone(), + format!("message to: ({}) didn't match ATM DID ({}). messages-received messages must be addressed directly to ATM!", + to, state.config.mediator_did))); + } + + // Message can not be anonymous + if msg.from.is_none() { + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + "Message Pickup 3.0 messages-received can not be anonymous as it is needed from to validate permissions".into(), + )); + }; + + // Check for extra-header `return_route` + if let Some(header) = msg.extra_headers.get("return_route") { + if header.as_str() != Some("all") { + debug!( + "return_route: extra-header exists. Expected (all) but received ({})", + header + ); + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + format!( + "return_route: extra-header exists. Expected (all) but received ({})", + header + ), + )); + } + } else { + debug!("return_route: extra-header does not exist!"); + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + "return_route: extra-header does not exist! It should!".into(), + )); + } + + // Get or create the thread id for the response + let thid = if let Some(thid) = &msg.thid { + thid.to_owned() + } else { + msg.id.clone() + }; + debug!("thid = ({})", thid); + + // Pull recipient_did and limit from message body + let message_id_list: Vec = match + serde_json::from_value::(msg.body.to_owned()) + { + Ok(body) => + body.message_id_list, + Err(e) => + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + format!("messages-received body isn't valid. Reason: {}", e), + )) + + }; + + debug!("Messages Id list: {:?}", message_id_list); + + for msg_id in &message_id_list { + debug!("getting message with id: {}", msg_id); + match state.database.get_message(&session.did_hash, msg_id).await { + Ok(msg) => { + debug!("Got message: {:?}", msg); + debug!("Deleting message: {}", msg_id); + match state + .database + .delete_message(&session.session_id, &session.did_hash, msg_id) + .await + { + Ok(_) => { + info!("Deleted message: {}", msg_id); + } + Err(err) => { + info!("Error deleting message: {:?}", err); + + } + } + } + Err(err) => { + info!("Error getting message: {:?}", err); + + } + } + } + + Ok(generate_status_reply(state, session, &session.did_hash, &thid, false, None).await.unwrap()) + +} + .instrument(_span) + .await +} diff --git a/affinidi-messaging-sdk/README.md b/affinidi-messaging-sdk/README.md index 06e29aa..38c2554 100644 --- a/affinidi-messaging-sdk/README.md +++ b/affinidi-messaging-sdk/README.md @@ -16,7 +16,7 @@ Use `` from [affinidi-messaging-mediator - Running affinidi-messag ```bash # enable logging for examples, -export RUST_LOG=none,affinidi_messaging_sdk=debug,ping=debug,demo=debug,send_message_to_me=debug,send_message_to_bob=debug,fetch_message_as_bob=debug +export RUST_LOG=none,affinidi_messaging_sdk=debug,ping=debug,demo=debug,send_message_to_me=debug,send_message_to_bob=debug,fetch_message_as_bob=debug,message_pickup=debug # no "did://" prefix for examples export MEDIATOR_DID= @@ -33,8 +33,7 @@ cargo run --example ping -- \ cargo run --example message_pickup -- \ --network-address $MEDIATOR_ENDPOINT \ - --ssl-certificates $MEDIATOR_TLS_CERTIFICATES \ - --mediator-did $MEDIATOR_DID + --ssl-certificates $MEDIATOR_TLS_CERTIFICATES # send a message to the same recipient as sender cargo run --example send_message_to_me -- \ diff --git a/affinidi-messaging-sdk/examples/message_pickup.rs b/affinidi-messaging-sdk/examples/message_pickup.rs index 9b2d3b3..a8d6bd2 100644 --- a/affinidi-messaging-sdk/examples/message_pickup.rs +++ b/affinidi-messaging-sdk/examples/message_pickup.rs @@ -5,7 +5,7 @@ use affinidi_messaging_sdk::{ }; use clap::Parser; use serde_json::json; -use tracing::{error, info}; +use tracing::info; use tracing_subscriber::filter; #[derive(Parser, Debug)] @@ -15,8 +15,6 @@ struct Args { network_address: String, #[arg(short, long)] ssl_certificates: String, - #[arg(short, long)] - mediator_did: String, } #[tokio::main] @@ -49,14 +47,22 @@ async fn main() -> Result<(), ATMError> { "y": "PpYqybOwMsm64vftt-7gBCQPIUbglMmyy_6rloSSAPk" }); - let atm_did = &args.mediator_did; + let public_config_builder = Config::builder() + .with_atm_api(&args.network_address) + .with_ssl_certificates(&mut vec![args.ssl_certificates.clone().into()]) + .with_websocket_disabled(); + + let mut public_atm = ATM::new(public_config_builder.build()?).await?; + + let atm_did = public_atm.well_known_did().await?; - let mut config = Config::builder().with_my_did(my_did).with_atm_did(atm_did); + let mut config = Config::builder().with_my_did(my_did).with_atm_did(&atm_did); println!("Running with address: {}", &args.network_address); config = config .with_atm_api(&args.network_address) - .with_ssl_certificates(&mut vec![args.ssl_certificates.into()]); + .with_ssl_certificates(&mut vec![args.ssl_certificates.into()]) + .with_websocket_disabled(); // Create a new ATM Client let mut atm = ATM::new(config.build()?).await?; @@ -67,24 +73,9 @@ async fn main() -> Result<(), ATMError> { let protocols = Protocols::new(); - /*let message = r#" - {"protected":"eyJ0eXAiOiJhcHBsaWNhdGlvbi9kaWRjb21tLWVuY3J5cHRlZCtqc29uIiwiYWxnIjoiRUNESC0xUFUrQTI1NktXIiwiZW5jIjoiQTI1NkNCQy1IUzUxMiIsInNraWQiOiJkaWQ6cGVlcjoyLlZ6Nk1rZ1dKZlZtUEVMb3pxNmFDeWNLM0NweEhOOFVwcGhuM1dTdVFrV1k2aXFzakYuRXpRM3NoZmI3dndRYVRKcUZrdDhuUmZvN051OTh0bWVZcGREZldncnFRaXREYXFYUnoja2V5LTIiLCJhcHUiOiJaR2xrT25CbFpYSTZNaTVXZWpaTmEyZFhTbVpXYlZCRlRHOTZjVFpoUTNsalN6TkRjSGhJVGpoVmNIQm9iak5YVTNWUmExZFpObWx4YzJwR0xrVjZVVE56YUdaaU4zWjNVV0ZVU25GR2EzUTRibEptYnpkT2RUazRkRzFsV1hCa1JHWlhaM0p4VVdsMFJHRnhXRko2STJ0bGVTMHkiLCJhcHYiOiI1a05fc2kyd2toMFVQX0ZlNVNLejVLQkJPNkYzMVRneXJBNEZ5Z1hTeExZIiwiZXBrIjp7ImNydiI6InNlY3AyNTZrMSIsImt0eSI6IkVDIiwieCI6IlluMllkX1BVUUZTSVQtaWt1WlFSZ0kyVmgzdFFOVXVLdWtEdWZ1clpHMzQiLCJ5IjoidkpRV1Y3U09jUWNtMkNwUkI1S3ZDMU5YeUxrS1hmdndwdVdlQlZ2RkY0RSJ9fQ","recipients":[{"header":{"kid":"did:peer:2.Vz6MkiToqovww7vYtxm1xNM15u9JzqzUFZ1k7s7MazYJUyAxv.EzQ3shQLqRUza6AMJFbPuMdvFRFWm1wKviQRnQSC1fScovJN4s.SeyJ0IjoiRElEQ29tbU1lc3NhZ2luZyIsInMiOnsidXJpIjoiaHR0cHM6Ly8xMjcuMC4wLjE6NzAzNyIsImEiOlsiZGlkY29tbS92MiJdLCJyIjpbXX19#key-2"},"encrypted_key":"_tK6Tu4uzgmKqJATtvQetmPSYC1dGKnLfDUOc6XLr4LDJHT1ujrAPtm-p2pIUd1Pewrt35CR0hK3-rdK0ffGSLJrP1ntrJ1K"}],"iv":"HP4BfkLKN5X0WTbu-PMV5g","ciphertext":"hQ4TLBC-yjGFx6GZonfzSM5DB8W9rNYeIW0eAPf1vxAkj0zuXkwsLx9tEyTbS1Oc46DDaIuMzFYcbfAyrioUNOUp-43OHGxtVKXues4wTNvMFVbWEkOq27bm18APj59JohT5QAd8Dn-iZ1vPl82MZbwxpymPWS0MQqdGB_OoY1Clm6nE7lDM3plgxbvH0VnUWopCrHHHhKE00esX7Ax5fO8RJLmdSt8DMPrVII3qOASDJIBHc2f5af06o7PytErkoN_d8oOyl2QhoQm7HWe1gWFXZcxxeZaFRqdJDcVrUJ_h3X7S21qxn38TdZ7r5fgGu9as0GvzkHCZbn9OBCjpQ6cUQNdB6NRuUXhjgWFH1vXALAiRn3OLsgiUVJM__Nr3EWLOgOeBLDz76K6LuXfFZApYjErf6doLbjMQ_vlb93r9-HNKy5xNHKZJF0i7qPCkZJUOPxm7DUsYt_zmq3gNs9C6DAug8Dp5ayBPriNUxgWBXpwaJyFB4idyMDs3rNyvsswm_Fz1ImiHT4w0DvCDPvgBbGgEot0wqzKdT-cjNAHaXIF_O-t1pzanrjUFU16VVeuyZ0m0WWB0qRnZWpADpWy_9Vdpjvvh3kw8BO05QwvTzdR82ed1R4JX2HHTZR3GNjvHy-y_b4sXK0yKAFrrSULcwLfJY-8tTLrfIlSssT9LMmBLJKBlhdIiG0gGyw0_Zf_3PocQ8a-MRcqdMFguKPAX49PcMnaSRkQkAPPYTPWp0zv8p_HREY3h8WIYWYhrfD_wAeJSAlaISxH-7RJkdnweauKyZ5sIQ4HZ7oqfw3ARR5IaJxI9N1H7gKuphLe27oYhHqbPb5OJmqlCOFi2MwmFruW9hZMitG9nYkr1GNF4rxfoT7kOfsLoEAkxmNHrBWqSoGdsCOtYCAbjeWNf6mtPoCaTfELf0hgzkOD5UXfgNO6oKoB76Y6YsuUNPbULPkEu20TDz9b2MFGkfyXONHNgA46MErNaWFM24J4-yK2hJ7YaiQ7i4x1ZvUTayoF4Co-1a4IS3Tc4kx1HF11vPzWRLQGgXtAh0LQKaoRhspMPUA2MSwWBu9Pf8cKBrY99NuQ1qVZaz8JmzikIDXEuLh6wPmny1OlvyWC4BvqPVX4-IwQV5c7Y_UiXs4p9_OU0Ioq-eA2SaZQRt1IXAodIZJ0mOf3L5om_ngoazEU9cc3N2BdwlrQ9i0fGtRmxC6nk03HqeFV182ZM8x8NFOIzRx_5lYBuTZVgjzV6vVVYK2LI_1hmGFkm5pr77vLidbSxH_oi7INANDcV9PzLKY5NB-1rdXvSUSkw5kQ2W_50HTTzLtluDD91q4DtJZg9rPI_7YFJUxWhJHOX5-iuitZNu788Cng1iHvKV2ylqIJ8GHp3f_FrIgfe8KFeULiuqQWbDgjp71NWPnk_GyNKWcGrcGx03sRV2U1l3kptkZ_Nv8yjsZygfW2iYM9vm_X-9f2rufPlpL2BmnzrmXpOBGIkk2TSXCYY5TzO65igG3yAE7-hrg02vIwbi7msUCMtiHcY","tag":"Tr-FOMqIRytEJvm1KSLFhalhErUUKZbIOWvJgyAaV2M"} - "#; - let response = atm.unpack(message).await?; - info!("Unpacked: {:?}", response); - */ - // For this example, we are forcing REST API only by closing the websocket - // NOTE: We could have done this when we configured the ATM, but we are doing it here for demonstration purposes - //atm.close_websocket().await?; - - // Enable live streaming - protocols - .message_pickup - .toggle_live_delivery(&mut atm, true) - .await?; - // Send a Message Pickup 3.0 Status Request - error!("Testing live_stream_next()!"); + info!("Testing status-request()!"); + // Status Request -> Status let status = protocols .message_pickup .send_status_request(&mut atm, None, None, None) @@ -92,15 +83,8 @@ async fn main() -> Result<(), ATMError> { info!("Status: {:?}", status); - if let Some((message, _)) = protocols - .message_pickup - .live_stream_next(&mut atm, Duration::from_secs(2)) - .await? - { - info!("Message: {:?}", message); - } - - error!("Testing delivery-request()!"); + info!("Testing delivery-request()!"); + // Delivery Request -> Message Delivery let response = protocols .message_pickup .send_delivery_request(&mut atm, None, None, None, None) @@ -109,10 +93,11 @@ async fn main() -> Result<(), ATMError> { let mut delete_ids: Vec = Vec::new(); for (message, _) in response { - info!("Message: {}", message.id); + info!("[send_delivery_request] Message: {}", message.id); delete_ids.push(message.id.clone()); } - + info!("Testing messages-received()!"); + // Message Received -> Status let response = protocols .message_pickup .send_messages_received(&mut atm, None, None, &delete_ids, None) @@ -121,6 +106,19 @@ async fn main() -> Result<(), ATMError> { info!("Status: after send_messages_received() : {:?}", response); /* TODO: Need to complete this part of the protocol... + Enable live streaming + protocols + .message_pickup + .toggle_live_delivery(&mut atm, false) + .await?; + + if let Some((message, _)) = protocols + .message_pickup + .live_stream_next(&mut atm, Duration::from_secs(2)) + .await? + { + info!("[live_stream_next] Message: {:?}", message); + } tokio::time::sleep(Duration::from_secs(1)).await; error!("Testing live_stream_get()!"); @@ -134,14 +132,14 @@ async fn main() -> Result<(), ATMError> { */ // Disable live streaming - protocols - .message_pickup - .toggle_live_delivery(&mut atm, false) - .await?; + // protocols + // .message_pickup + // .toggle_live_delivery(&mut atm, false) + // .await?; tokio::time::sleep(Duration::from_secs(1)).await; - atm.abort_websocket_task().await?; + // atm.abort_websocket_task().await?; Ok(()) } diff --git a/affinidi-messaging-sdk/src/protocols/message_pickup.rs b/affinidi-messaging-sdk/src/protocols/message_pickup.rs index 8ec9b1d..e110c51 100644 --- a/affinidi-messaging-sdk/src/protocols/message_pickup.rs +++ b/affinidi-messaging-sdk/src/protocols/message_pickup.rs @@ -53,6 +53,11 @@ pub struct MessagePickupDeliveryRequest { pub limit: usize, } +// Reads the body of an incoming Message Pickup 3.0 Messages Received Message +#[derive(Default, Deserialize, Serialize)] +pub struct MessagePickupMessagesReceived { + pub message_id_list: Vec, +} /// Handles the return from a message delivery request /// returns /// - StatusReply : No messages available @@ -515,7 +520,12 @@ impl MessagePickup { }; match atm.unpack(&decoded).await { - Ok((m, u)) => response.push((m, u)), + Ok((mut m, u)) => { + if let Some(attachment_id) = &attachment.id { + m.id = attachment_id.to_string(); + } + response.push((m, u)) + } Err(e) => { warn!("Error unpacking message: ({:?})", e); continue; @@ -562,7 +572,7 @@ impl MessagePickup { let mut msg = Message::build( Uuid::new_v4().into(), - "https://didcomm.org/messagepickup/3.0/delivery-request".to_owned(), + "https://didcomm.org/messagepickup/3.0/messages-received".to_owned(), json!({"message_id_list": list}), ) .header("return_route".into(), Value::String("all".into())); @@ -582,7 +592,7 @@ impl MessagePickup { let msg = msg.created_time(now).expires_time(now + 300).finalize(); let msg_id = msg.id.clone(); - debug!("Delivery-Request message: {:?}", msg); + debug!("messages-received message: {:?}", msg); // Pack the message let (msg, _) = msg diff --git a/affinidi-messaging-sdk/src/transports/websockets/mod.rs b/affinidi-messaging-sdk/src/transports/websockets/mod.rs index 1ac844f..2120acb 100644 --- a/affinidi-messaging-sdk/src/transports/websockets/mod.rs +++ b/affinidi-messaging-sdk/src/transports/websockets/mod.rs @@ -46,7 +46,7 @@ impl<'c> ATM<'c> { ws_recv_stream: None, }; - error!("secrets: {}", atm.secrets_resolver.len()); + debug!("secrets: {}", atm.secrets_resolver.len()); // Create a new channel with a capacity of at most 32. This communicates from SDK to the websocket handler let (tx, mut rx) = mpsc::channel::(32);