diff --git a/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs b/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs index d87c0e6..82c2253 100644 --- a/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs +++ b/affinidi-messaging-mediator/src/messages/protocols/message_pickup.rs @@ -30,131 +30,53 @@ pub(crate) async fn status_request( ) -> Result { let _span = span!(tracing::Level::DEBUG, "status_request",); async move { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - - if let Some(expires) = msg.expires_time { - if expires <= now { - debug!( - "Message expired at ({}) now({}) seconds_ago({})", - expires, - now, - now - expires - ); - return Err(MediatorError::MessageExpired( - session.session_id.clone(), - expires.to_string(), - now.to_string(), - )); - } - } - - // Ensure to: exists and is valid - let to = if let Some(to) = &msg.to { - if let Some(first) = to.first() { - first.to_owned() + _validate_msg(msg, state, session).unwrap(); + // Get or create the thread id for the response + let thid = if let Some(thid) = &msg.thid { + thid.to_owned() } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing valid 'to' field, expect at least one address in array.".into(), - )); - } - } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing 'to' field".into(), - )); - }; - debug!("To: {}", to); - - // Must be addressed to ATM - if to != state.config.mediator_did { - debug!( - "to: ({}) doesn't match ATM DID ({})", - to, state.config.mediator_did - ); - return Err(MediatorError::RequestDataError(session.session_id.clone(), - format!("message to: ({}) didn't match ATM DID ({}). Status Request messages must be addressed directly to ATM!", - to, state.config.mediator_did))); - } - - // Message can not be anonymous - if msg.from.is_none() { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message Pickup 3.0 Status-Request can not be anonymous as it is needed from to validate permissions".into(), - )); - }; - - // Check for extra-header `return_route` - if let Some(header) = msg.extra_headers.get("return_route") { - if header.as_str() != Some("all") { - debug!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ), - )); - } - } else { - debug!("return_route: extra-header does not exist!"); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "return_route: extra-header does not exist! It should!".into(), - )); - } + msg.id.clone() + }; + debug!("thid = ({})", thid); - // Get or create the thread id for the response - let thid = if let Some(thid) = &msg.thid { - thid.to_owned() - } else { - msg.id.clone() - }; - debug!("thid = ({})", thid); - - // Pull recipient_did from message body - let recipient_did: String = if let Ok(body) = - serde_json::from_value::(msg.body.to_owned()) - { - if let Some(recipient_did) = body.recipient_did { - if recipient_did != session.did { - debug!( - "recipient_did: ({}) doesn't match session.did!", - recipient_did - ); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!( + // Pull recipient_did from message body + let recipient_did: String = if let Ok(body) = + serde_json::from_value::(msg.body.to_owned()) + { + if let Some(recipient_did) = body.recipient_did { + if recipient_did != session.did { + debug!( "recipient_did: ({}) doesn't match session.did!", recipient_did - ), - )); + ); + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + format!( + "recipient_did: ({}) doesn't match session.did!", + recipient_did + ), + )); + } else { + digest(recipient_did) + } } else { - digest(recipient_did) + session.did_hash.clone() } } else { session.did_hash.clone() - } - } else { - session.did_hash.clone() - }; - debug!("Body: recipient_did: {}", recipient_did); + }; + debug!("Body: recipient_did: {}", recipient_did); - info!( - "MessagePickup Status-Request received from: ({}) recipient_did_hash({:?})", - msg.from.clone().unwrap_or_else(|| "ANONYMOUS".to_string()), - recipient_did - ); + info!( + "MessagePickup Status-Request received from: ({}) recipient_did_hash({:?})", + msg.from.clone().unwrap_or_else(|| "ANONYMOUS".to_string()), + recipient_did + ); - generate_status_reply(state, session, &recipient_did, &thid, false, None).await -}.instrument(_span).await + generate_status_reply(state, session, &recipient_did, &thid, false, None).await + } + .instrument(_span) + .await } /// Creates the reply to a valid StatusRequest message @@ -296,86 +218,7 @@ pub(crate) async fn toggle_live_delivery( ) -> Result { let _span = span!(tracing::Level::DEBUG, "toggle_live_delivery",); async move { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - - if let Some(expires) = msg.expires_time { - if expires <= now { - debug!( - "Message expired at ({}) now({}) seconds_ago({})", - expires, - now, - now - expires - ); - return Err(MediatorError::MessageExpired( - session.session_id.clone(), - expires.to_string(), - now.to_string(), - )); - } - } - - // Ensure to: exists and is valid - let to = if let Some(to) = &msg.to { - if let Some(first) = to.first() { - first.to_owned() - } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing valid 'to' field, expect at least one address in array.".into(), - )); - } - } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing 'to' field".into(), - )); - }; - debug!("To: {}", to); - - // Must be addressed to ATM - if to != state.config.mediator_did { - debug!( - "to: ({}) doesn't match ATM DID ({})", - to, state.config.mediator_did - ); - return Err(MediatorError::RequestDataError(session.session_id.clone(), - format!("message to: ({}) didn't match ATM DID ({}). Live Delivery messages must be addressed directly to ATM!", - to, state.config.mediator_did))); - } - - // Message can not be anonymous - if msg.from.is_none() { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message Pickup 3.0 Status-Request can not be anonymous as it is needed from to validate permissions".into(), - )); - }; - - // Check for extra-header `return_route` - if let Some(header) = msg.extra_headers.get("return_route") { - if header.as_str() != Some("all") { - debug!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ), - )); - } - } else { - debug!("return_route: extra-header does not exist!"); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "return_route: extra-header does not exist! It should!".into(), - )); - } + _validate_msg(msg, state, session).unwrap(); // Get or create the thread id for the response let thid = if let Some(thid) = &msg.thid { @@ -409,24 +252,53 @@ pub(crate) async fn toggle_live_delivery( if live_delivery { // Enable Live delivery if let Some(stream_task) = &state.streaming_task { - stream_task.channel.send(StreamingUpdate { did_hash: session.did_hash.clone(), state: StreamingUpdateState::Start}).await.map_err(|e| { - error!("Error sending start message to streaming task: {:?}", e); - MediatorError::InternalError(session.session_id.clone(), "Error sending start message to streaming task".into()) - })?; + stream_task + .channel + .send(StreamingUpdate { + did_hash: session.did_hash.clone(), + state: StreamingUpdateState::Start, + }) + .await + .map_err(|e| { + error!("Error sending start message to streaming task: {:?}", e); + MediatorError::InternalError( + session.session_id.clone(), + "Error sending start message to streaming task".into(), + ) + })?; } - } else { // Disable live delivery if let Some(stream_task) = &state.streaming_task { - stream_task.channel.send(StreamingUpdate { did_hash: session.did_hash.clone(), state: StreamingUpdateState::Stop}).await.map_err(|e| { - error!("Error sending stop message to streaming task: {:?}", e); - MediatorError::InternalError(session.session_id.clone(), "Error sending stop message to streaming task".into()) - })?; + stream_task + .channel + .send(StreamingUpdate { + did_hash: session.did_hash.clone(), + state: StreamingUpdateState::Stop, + }) + .await + .map_err(|e| { + error!("Error sending stop message to streaming task: {:?}", e); + MediatorError::InternalError( + session.session_id.clone(), + "Error sending stop message to streaming task".into(), + ) + })?; } } - generate_status_reply(state, session, &session.did_hash, &thid, true, Some(live_delivery)).await - }.instrument(_span).await + generate_status_reply( + state, + session, + &session.did_hash, + &thid, + true, + Some(live_delivery), + ) + .await + } + .instrument(_span) + .await } /// Process a Delivery Request message and generates a response @@ -437,6 +309,172 @@ pub(crate) async fn delivery_request( ) -> Result { let _span = span!(tracing::Level::DEBUG, "delivery_request",); async move { + _validate_msg(msg, state, session).unwrap(); + // Get or create the thread id for the response + let thid = if let Some(thid) = &msg.thid { + thid.to_owned() + } else { + msg.id.clone() + }; + debug!("thid = ({})", thid); + + // Pull recipient_did and limit from message body + let (recipient_did, limit): (String, usize) = + match serde_json::from_value::(msg.body.to_owned()) { + Ok(body) => (body.recipient_did, body.limit), + Err(e) => { + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + format!("delivery-request body isn't valid. Reason: {}", e), + )) + } + }; + + let recipient_did_hash = digest(recipient_did.clone()); + + debug!( + "Body: recipient_did: {}, limit: {}", + recipient_did_hash, limit + ); + + info!( + "MessagePickup Delivery-Request received from: ({}) recipient_did({}) limit({})", + msg.from.clone().unwrap_or_else(|| "ANONYMOUS".to_string()), + recipient_did_hash, + limit + ); + + // All the parsing is done, lets attempt to retrieve messages + let messages = state + .database + .fetch_messages( + &session.session_id, + &recipient_did_hash, + &FetchOptions { + limit, + ..Default::default() + }, + ) + .await?; + debug!("msgs fetched: {}", messages.success.len()); + + if !messages.success.is_empty() { + let response_msg = Message::build( + Uuid::new_v4().into(), + "https://didcomm.org/messagepickup/3.0/delivery".to_string(), + json!({"recipient_did": recipient_did}), + ) + .thid(thid.clone()); + + let mut attachments: Vec = Vec::new(); + + for element in messages.success { + if let Some(msg) = element.msg { + let attachment = + Attachment::base64(BASE64_URL_SAFE_NO_PAD.encode(msg)).id(element.msg_id); + + attachments.push(attachment.finalize()) + } + } + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + + let response_msg = response_msg + .attachments(attachments) + .to(session.did.clone()) + .from(state.config.mediator_did.clone()) + .created_time(now) + .expires_time(now + 300) + .finalize(); + + debug!("delivery message =\n{:?}", response_msg); + + Ok(ProcessMessageResponse { + store_message: false, + force_live_delivery: false, + message: Some(response_msg), + }) + } else { + generate_status_reply(state, session, &recipient_did_hash, &thid, false, None).await + } + } + .instrument(_span) + .await +} + +// Process a Messages Received message and generates a response +pub(crate) async fn messages_received( + msg: &Message, + state: &SharedData, + session: &Session, +) -> Result { + let _span = span!(tracing::Level::DEBUG, "messages_received",); + async move { + _validate_msg(msg, state, session).unwrap(); + // Get or create the thread id for the response + let thid = if let Some(thid) = &msg.thid { + thid.to_owned() + } else { + msg.id.clone() + }; + debug!("thid = ({})", thid); + + // Pull recipient_did and limit from message body + let message_id_list: Vec = + match serde_json::from_value::(msg.body.to_owned()) { + Ok(body) => body.message_id_list, + Err(e) => { + return Err(MediatorError::RequestDataError( + session.session_id.clone(), + format!("messages-received body isn't valid. Reason: {}", e), + )) + } + }; + + debug!("Messages Id list: {:?}", message_id_list); + + for msg_id in &message_id_list { + debug!("getting message with id: {}", msg_id); + match state.database.get_message(&session.did_hash, msg_id).await { + Ok(msg) => { + debug!("Got message: {:?}", msg); + debug!("Deleting message: {}", msg_id); + match state + .database + .delete_message(&session.session_id, &session.did_hash, msg_id) + .await + { + Ok(_) => { + info!("Deleted message: {}", msg_id); + } + Err(err) => { + info!("Error deleting message: {:?}", err); + } + } + } + Err(err) => { + info!("Error getting message: {:?}", err); + } + } + } + + Ok( + generate_status_reply(state, session, &session.did_hash, &thid, false, None) + .await + .unwrap(), + ) + } + .instrument(_span) + .await +} + +fn _validate_msg( + msg: &Message, + state: &SharedData, + session: &Session, +) -> Result<(), MediatorError> { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() @@ -459,7 +497,7 @@ pub(crate) async fn delivery_request( } // Ensure to: exists and is valid - let to = if let Some(to) = &msg.to { + let to: String = if let Some(to) = &msg.to { if let Some(first) = to.first() { first.to_owned() } else { @@ -474,7 +512,6 @@ pub(crate) async fn delivery_request( "Message missing 'to' field".into(), )); }; - debug!("To: {}", to); // Must be addressed to ATM if to != state.config.mediator_did { @@ -483,16 +520,16 @@ pub(crate) async fn delivery_request( to, state.config.mediator_did ); return Err(MediatorError::RequestDataError(session.session_id.clone(), - format!("message to: ({}) didn't match ATM DID ({}). Status Request messages must be addressed directly to ATM!", - to, state.config.mediator_did))); + format!("message to: ({}) didn't match ATM DID ({}). messages-received messages must be addressed directly to ATM!", + to, state.config.mediator_did))); } // Message can not be anonymous if msg.from.is_none() { return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message Pickup 3.0 Status-Request can not be anonymous as it is needed from to validate permissions".into(), - )); + session.session_id.clone(), + "Message Pickup 3.0 messages-received can not be anonymous as it is needed from to validate permissions".into(), + )); }; // Check for extra-header `return_route` @@ -518,217 +555,5 @@ pub(crate) async fn delivery_request( )); } - // Get or create the thread id for the response - let thid = if let Some(thid) = &msg.thid { - thid.to_owned() - } else { - msg.id.clone() - }; - debug!("thid = ({})", thid); - - // Pull recipient_did and limit from message body - let (recipient_did, limit): (String, usize) = match - serde_json::from_value::(msg.body.to_owned()) - { - Ok(body) => - (body.recipient_did, body.limit), - Err(e) => - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!("delivery-request body isn't valid. Reason: {}", e), - )) - }; - - let recipient_did_hash = digest(recipient_did.clone()); - - debug!("Body: recipient_did: {}, limit: {}", recipient_did_hash, limit); - - info!( - "MessagePickup Delivery-Request received from: ({}) recipient_did({}) limit({})", - msg.from.clone().unwrap_or_else(|| "ANONYMOUS".to_string()), - recipient_did_hash, limit - ); - - // All the parsing is done, lets attempt to retrieve messages - let messages = state.database.fetch_messages(&session.session_id, &recipient_did_hash, &FetchOptions { limit, ..Default::default() }).await?; - debug!("msgs fetched: {}", messages.success.len()); - - if !messages.success.is_empty() { - let response_msg = Message::build(Uuid::new_v4().into(), "https://didcomm.org/messagepickup/3.0/delivery".to_string(), json!({"recipient_did": recipient_did})).thid(thid.clone()); - - let mut attachments: Vec = Vec::new(); - - for element in messages.success { - if let Some(msg) = element.msg { - let attachment = Attachment::base64(BASE64_URL_SAFE_NO_PAD.encode(msg)).id(element.msg_id); - - attachments.push(attachment.finalize()) - } - } - - let response_msg = response_msg.attachments(attachments).to(session.did.clone()) - .from(state.config.mediator_did.clone()) - .created_time(now) - .expires_time(now + 300) - .finalize(); - - debug!("delivery message =\n{:?}", response_msg); - - Ok(ProcessMessageResponse { - store_message: false, - force_live_delivery: false, - message: Some(response_msg), - }) - } else { - generate_status_reply(state, session, &recipient_did_hash, &thid, false, None).await - } -}.instrument(_span).await -} - -// Process a Messages Received message and generates a response -pub(crate) async fn messages_received( - msg: &Message, - state: &SharedData, - session: &Session, -) -> Result { - let _span = span!(tracing::Level::DEBUG, "messages_received",); - async move { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - - if let Some(expires) = msg.expires_time { - if expires <= now { - debug!( - "Message expired at ({}) now({}) seconds_ago({})", - expires, - now, - now - expires - ); - return Err(MediatorError::MessageExpired( - session.session_id.clone(), - expires.to_string(), - now.to_string(), - )); - } - } - - // Ensure to: exists and is valid - let to = if let Some(to) = &msg.to { - if let Some(first) = to.first() { - first.to_owned() - } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing valid 'to' field, expect at least one address in array." - .into(), - )); - } - } else { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message missing 'to' field".into(), - )); - }; - debug!("To: {}", to); - - // Must be addressed to ATM - if to != state.config.mediator_did { - debug!( - "to: ({}) doesn't match ATM DID ({})", - to, state.config.mediator_did - ); - return Err(MediatorError::RequestDataError(session.session_id.clone(), - format!("message to: ({}) didn't match ATM DID ({}). messages-received messages must be addressed directly to ATM!", - to, state.config.mediator_did))); - } - - // Message can not be anonymous - if msg.from.is_none() { - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "Message Pickup 3.0 messages-received can not be anonymous as it is needed from to validate permissions".into(), - )); - }; - - // Check for extra-header `return_route` - if let Some(header) = msg.extra_headers.get("return_route") { - if header.as_str() != Some("all") { - debug!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!( - "return_route: extra-header exists. Expected (all) but received ({})", - header - ), - )); - } - } else { - debug!("return_route: extra-header does not exist!"); - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - "return_route: extra-header does not exist! It should!".into(), - )); - } - - // Get or create the thread id for the response - let thid = if let Some(thid) = &msg.thid { - thid.to_owned() - } else { - msg.id.clone() - }; - debug!("thid = ({})", thid); - - // Pull recipient_did and limit from message body - let message_id_list: Vec = match - serde_json::from_value::(msg.body.to_owned()) - { - Ok(body) => - body.message_id_list, - Err(e) => - return Err(MediatorError::RequestDataError( - session.session_id.clone(), - format!("messages-received body isn't valid. Reason: {}", e), - )) - - }; - - debug!("Messages Id list: {:?}", message_id_list); - - for msg_id in &message_id_list { - debug!("getting message with id: {}", msg_id); - match state.database.get_message(&session.did_hash, msg_id).await { - Ok(msg) => { - debug!("Got message: {:?}", msg); - debug!("Deleting message: {}", msg_id); - match state - .database - .delete_message(&session.session_id, &session.did_hash, msg_id) - .await - { - Ok(_) => { - info!("Deleted message: {}", msg_id); - } - Err(err) => { - info!("Error deleting message: {:?}", err); - - } - } - } - Err(err) => { - info!("Error getting message: {:?}", err); - - } - } - } - - Ok(generate_status_reply(state, session, &session.did_hash, &thid, false, None).await.unwrap()) - -} - .instrument(_span) - .await + Ok(()) }