diff --git a/Cargo.toml b/Cargo.toml index 4143e88..0e1d8e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ citadel_workspace_lib = { path = "./citadel_workspace_lib", default-features = f # standard deps serde = { version = "1.0.104", features = ["derive"] } -citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", branch = "master"} +citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" } tokio = { version = "1.28.1", default-features = false } tokio-util = { version = "0.7.8", default-features = false } bincode2 = { version = "2.0.1", default-features = false } diff --git a/citadel_workspace_service/src/kernel/mod.rs b/citadel_workspace_service/src/kernel/mod.rs index 683fc61..b253cdf 100644 --- a/citadel_workspace_service/src/kernel/mod.rs +++ b/citadel_workspace_service/src/kernel/mod.rs @@ -44,6 +44,7 @@ pub struct Connection { peers: HashMap, associated_tcp_connection: Uuid, c2s_file_transfer_handlers: HashMap>, + groups: HashMap, } #[allow(dead_code)] @@ -54,6 +55,13 @@ struct PeerConnection { associated_tcp_connection: Uuid, } +#[allow(dead_code)] +pub struct GroupConnection { + key: MessageGroupKey, + tx: GroupChannelSendHalf, + cid: u64, +} + impl Connection { fn new( sink: PeerChannelSendHalf, @@ -66,6 +74,7 @@ impl Connection { client_server_remote, associated_tcp_connection, c2s_file_transfer_handlers: HashMap::new(), + groups: HashMap::new(), } } @@ -107,18 +116,13 @@ impl Connection { } } - // fn remove_object_transfer_handler(&mut self, peer_cid: u64, object_id: u32) -> Option> { - // if self.implicated_cid() == peer_cid { - // // C2S - // self.c2s_file_transfer_handlers.remove(&object_id) - // } else { - // // P2P - // if let Some(peer_connection) = self.peers.get_mut(&peer_cid) { - // peer_connection.handler_map.remove(&object_id) - // } - // else{None} - // } - // } + pub fn add_group_channel( + &mut self, + group_key: MessageGroupKey, + group_channel: GroupConnection, + ) { + self.groups.insert(group_key, group_channel); + } fn take_file_transfer_handle( &mut self, @@ -217,6 +221,7 @@ impl NetKernel for CitadelWorkspaceService { } async fn on_node_event_received(&self, message: NodeResult) -> Result<(), NetworkError> { + info!(target: "citadel", "NODE EVENT RECEIVED WITH MESSAGE: {message:?}"); match message { NodeResult::Disconnect(disconnect) => { if let Some(conn) = disconnect.v_conn_type { @@ -343,16 +348,48 @@ impl NetKernel for CitadelWorkspaceService { ); } } - NodeResult::PeerEvent(event) => { - if let PeerSignal::Disconnect { + NodeResult::GroupChannelCreated(group_channel_created) => { + let channel = group_channel_created.channel; + let cid = channel.cid(); + let key = channel.key(); + let (tx, rx) = channel.split(); + + let mut server_connection_map = self.server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + connection.add_group_channel(key, GroupConnection { key, tx, cid }); + + let uuid = connection.associated_tcp_connection; + request_handler::spawn_group_channel_receiver( + key, + cid, + uuid, + rx, + self.tcp_connection_map.clone(), + ); + + send_response_to_tcp_client( + &self.tcp_connection_map, + InternalServiceResponse::GroupChannelCreateSuccess( + GroupChannelCreateSuccess { + cid, + group_key: key, + request_id: None, + }, + ), + connection.associated_tcp_connection, + ) + .await; + } + } + NodeResult::PeerEvent(event) => match event.event { + PeerSignal::Disconnect { peer_conn_type: PeerConnectionType::LocalGroupPeer { implicated_cid, peer_cid, }, disconnect_response: _, - } = event.event - { + } => { if let Some(conn) = self.clear_peer_connection(implicated_cid, peer_cid).await { let response = InternalServiceResponse::Disconnected(Disconnected { cid: implicated_cid, @@ -367,6 +404,31 @@ impl NetKernel for CitadelWorkspaceService { .await; } } + PeerSignal::BroadcastConnected { + implicated_cid, + group_broadcast, + } => { + let mut server_connection_map = self.server_connection_map.lock().await; + handle_group_broadcast( + group_broadcast, + implicated_cid, + &mut server_connection_map, + self.tcp_connection_map.clone(), + ) + .await; + } + _ => {} + }, + + NodeResult::GroupEvent(group_event) => { + let mut server_connection_map = self.server_connection_map.lock().await; + handle_group_broadcast( + group_event.event, + group_event.implicated_cid, + &mut server_connection_map, + self.tcp_connection_map.clone(), + ) + .await; } _ => {} } @@ -478,6 +540,184 @@ fn handle_connection( }); } +async fn handle_group_broadcast( + group_broadcast: GroupBroadcast, + implicated_cid: u64, + server_connection_map: &mut HashMap, + tcp_connection_map: Arc>>>, +) { + if let Some(connection) = server_connection_map.get_mut(&implicated_cid) { + let response = match group_broadcast { + GroupBroadcast::Invitation { + sender: peer_cid, + key: group_key, + } => Some(InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: implicated_cid, + peer_cid, + group_key, + request_id: None, + })), + + GroupBroadcast::RequestJoin { + sender: peer_cid, + key: group_key, + } => connection + .groups + .get_mut(&group_key) + .map(|_group_connection| { + InternalServiceResponse::GroupJoinRequestReceived(GroupJoinRequestReceived { + cid: implicated_cid, + peer_cid, + group_key, + request_id: None, + }) + }), + + GroupBroadcast::AcceptMembership { target: _, key: _ } => None, + + GroupBroadcast::DeclineMembership { target: _, key } => Some( + InternalServiceResponse::GroupRequestDeclined(GroupRequestDeclined { + cid: implicated_cid, + group_key: key, + request_id: None, + }), + ), + + GroupBroadcast::Message { + sender: peer_cid, + key: group_key, + message, + } => connection + .groups + .get_mut(&group_key) + .map(|_group_connection| { + InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: implicated_cid, + peer_cid, + message: message.into_buffer(), + group_key, + request_id: None, + }) + }), + + GroupBroadcast::MessageResponse { + key: group_key, + success, + } => connection + .groups + .get_mut(&group_key) + .map(|_group_connection| { + InternalServiceResponse::GroupMessageResponse(GroupMessageResponse { + cid: implicated_cid, + success, + group_key, + request_id: None, + }) + }), + + GroupBroadcast::MemberStateChanged { + key: group_key, + state, + } => Some(InternalServiceResponse::GroupMemberStateChanged( + GroupMemberStateChanged { + cid: implicated_cid, + group_key, + state, + request_id: None, + }, + )), + + GroupBroadcast::LeaveRoomResponse { + key: group_key, + success, + message, + } => Some(InternalServiceResponse::GroupLeft(GroupLeft { + cid: implicated_cid, + group_key, + success, + message, + request_id: None, + })), + + GroupBroadcast::EndResponse { + key: group_key, + success, + } => Some(InternalServiceResponse::GroupEnded(GroupEnded { + cid: implicated_cid, + group_key, + success, + request_id: None, + })), + + GroupBroadcast::Disconnected { key: group_key } => connection + .groups + .get_mut(&group_key) + .map(|_group_connection| { + InternalServiceResponse::GroupDisconnected(GroupDisconnected { + cid: implicated_cid, + group_key, + request_id: None, + }) + }), + + GroupBroadcast::AddResponse { + key: _group_key, + failed_to_invite_list: _failed_to_invite_list, + } => None, + + GroupBroadcast::AcceptMembershipResponse { key, success } => { + connection.groups.get_mut(&key).map(|_group_connection| { + InternalServiceResponse::GroupMembershipResponse(GroupMembershipResponse { + cid: implicated_cid, + group_key: key, + success, + request_id: None, + }) + }) + } + + GroupBroadcast::KickResponse { + key: _group_key, + success: _success, + } => None, + + GroupBroadcast::ListResponse { + groups: _group_list, + } => None, + + GroupBroadcast::CreateResponse { key: _group_key } => None, + + GroupBroadcast::GroupNonExists { key: _group_key } => None, + + GroupBroadcast::RequestJoinPending { result, key } => Some( + InternalServiceResponse::GroupRequestJoinPending(GroupRequestJoinPending { + cid: implicated_cid, + group_key: key, + result, + request_id: None, + }), + ), + + _ => None, + }; + match response { + Some(internal_service_response) => { + if let Some(connection) = server_connection_map.get_mut(&implicated_cid) { + send_response_to_tcp_client( + &tcp_connection_map, + internal_service_response, + connection.associated_tcp_connection, + ) + .await; + } + } + None => { + todo!() + } + } + } +} + fn spawn_tick_updater( object_transfer_handler: ObjectTransferHandler, implicated_cid: u64, diff --git a/citadel_workspace_service/src/kernel/request_handler.rs b/citadel_workspace_service/src/kernel/request_handler.rs index d26c5f2..c6ad2bb 100644 --- a/citadel_workspace_service/src/kernel/request_handler.rs +++ b/citadel_workspace_service/src/kernel/request_handler.rs @@ -1,22 +1,13 @@ use crate::kernel::{ create_client_server_remote, send_response_to_tcp_client, spawn_tick_updater, Connection, + GroupConnection, }; use async_recursion::async_recursion; +use citadel_logging::tracing::log; use citadel_logging::{error, info}; use citadel_sdk::prefabs::ClientServerRemote; use citadel_sdk::prelude::*; -use citadel_workspace_types::{ - AccountInformation, Accounts, ConnectionFailure, DeleteVirtualFileFailure, - DeleteVirtualFileSuccess, DisconnectFailure, Disconnected, DownloadFileFailure, - DownloadFileSuccess, FileTransferStatus, GetSessions, InternalServiceRequest, - InternalServiceResponse, ListAllPeers, ListAllPeersFailure, ListRegisteredPeers, - ListRegisteredPeersFailure, LocalDBClearAllKVFailure, LocalDBClearAllKVSuccess, - LocalDBDeleteKVFailure, LocalDBDeleteKVSuccess, LocalDBGetAllKVFailure, LocalDBGetAllKVSuccess, - LocalDBGetKVFailure, LocalDBGetKVSuccess, LocalDBSetKVFailure, LocalDBSetKVSuccess, - MessageReceived, MessageSendError, MessageSent, PeerConnectFailure, PeerConnectSuccess, - PeerDisconnectFailure, PeerDisconnectSuccess, PeerRegisterFailure, PeerRegisterSuccess, - PeerSessionInformation, SendFileFailure, SendFileRequestSent, SessionInformation, -}; +use citadel_workspace_types::*; use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; @@ -765,29 +756,6 @@ pub async fn handle_request( }; } - InternalServiceRequest::StartGroup { - initial_users_to_invite, - cid, - session_security_settings, - request_id: _, - } => { - let client_to_server_remote = ClientServerRemote::new( - VirtualTargetType::LocalGroupServer { - implicated_cid: cid, - }, - remote.clone(), - session_security_settings, - ); - match client_to_server_remote - .create_group(initial_users_to_invite) - .await - { - Ok(_group_channel) => {} - - Err(_err) => {} - } - } - InternalServiceRequest::PeerRegister { cid, peer_cid, @@ -1357,6 +1325,766 @@ pub async fn handle_request( } } }, + + InternalServiceRequest::GroupCreate { + cid, + request_id, + initial_users_to_invite, + } => { + let client_to_server_remote = ClientServerRemote::new( + VirtualTargetType::LocalGroupServer { + implicated_cid: cid, + }, + remote.clone(), + Default::default(), + ); + match client_to_server_remote + .create_group(initial_users_to_invite) + .await + { + Ok(group_channel) => { + // Store the group connection in map + let key = group_channel.key(); + let group_cid = group_channel.cid(); + let (tx, rx) = group_channel.split(); + match server_connection_map.lock().await.get_mut(&cid) { + Some(conn) => { + conn.add_group_channel( + key, + GroupConnection { + key, + cid: group_cid, + tx, + }, + ); + + let uuid = conn.associated_tcp_connection; + spawn_group_channel_receiver( + key, + cid, + uuid, + rx, + tcp_connection_map.clone(), + ); + + // Relay success to TCP client + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid, + group_key: key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + + None => { + todo!() + } + } + } + + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupCreateFailure(GroupCreateFailure { + cid, + message: err.into_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } + + InternalServiceRequest::GroupLeave { + cid, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(group_connection) = connection.groups.get_mut(&group_key) { + let group_sender = group_connection.tx.clone(); + drop(server_connection_map); + match group_sender.leave().await { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupLeaveSuccess(GroupLeaveSuccess { + cid, + group_key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupLeaveFailure(GroupLeaveFailure { + cid, + message: err.into_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupLeaveFailure(GroupLeaveFailure { + cid, + message: "Could Not Leave Group - Group Connection not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupLeaveFailure(GroupLeaveFailure { + cid, + message: "Could Not Leave Group - Connection not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupEnd { + cid, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(group_connection) = connection.groups.get_mut(&group_key) { + let group_sender = group_connection.tx.clone(); + drop(server_connection_map); + match group_sender.end().await { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupEndSuccess(GroupEndSuccess { + cid, + group_key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupEndFailure(GroupEndFailure { + cid, + message: err.into_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupEndFailure(GroupEndFailure { + cid, + message: "Could Not Leave Group - Group Connection not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupEndFailure(GroupEndFailure { + cid, + message: "Could Not Leave Group - Connection not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupMessage { + cid, + message, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(group_connection) = connection.groups.get_mut(&group_key) { + let group_sender = group_connection.tx.clone(); + drop(server_connection_map); + match group_sender.send_message(message.into()).await { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupMessageSuccess(GroupMessageSuccess { + cid, + group_key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupMessageFailure(GroupMessageFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupMessageFailure(GroupMessageFailure { + cid, + message: "Could Not Message Group - Group Connection not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupMessageFailure(GroupMessageFailure { + cid, + message: "Could Not Message Group - Connection not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupInvite { + cid, + peer_cid, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(group_connection) = connection.groups.get_mut(&group_key) { + let group_sender = group_connection.tx.clone(); + drop(server_connection_map); + match group_sender.invite(peer_cid).await { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupInviteSuccess(GroupInviteSuccess { + cid, + group_key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupInviteFailure(GroupInviteFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupInviteFailure(GroupInviteFailure { + cid, + message: "Could Not Invite to Group - Group Connection not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupInviteFailure(GroupInviteFailure { + cid, + message: "Could Not Invite to Group - Connection not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupKick { + cid, + peer_cid, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(group_connection) = connection.groups.get_mut(&group_key) { + let group_sender = group_connection.tx.clone(); + drop(server_connection_map); + match group_sender.kick(peer_cid).await { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupKickSuccess(GroupKickSuccess { + cid, + group_key, + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupKickFailure(GroupKickFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupKickFailure(GroupKickFailure { + cid, + message: "Could Not Kick from Group - GroupChannel not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupKickFailure(GroupKickFailure { + cid, + message: "Could Not Kick from Group - GroupChannel not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupListGroupsFor { + cid, + peer_cid, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { + let peer_remote = peer_connection.remote.clone(); + drop(server_connection_map); + let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand { + implicated_cid: cid, + command: GroupBroadcast::ListGroupsFor { cid: peer_cid }, + }); + if let Ok(mut subscription) = + peer_remote.send_callback_subscription(request).await + { + if let Some(evt) = subscription.next().await { + if let NodeResult::GroupEvent(GroupEvent { + implicated_cid: _, + ticket: _, + event: GroupBroadcast::ListResponse { groups }, + }) = evt + { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForSuccess( + GroupListGroupsForSuccess { + cid, + peer_cid, + group_list: Some(groups), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForFailure( + GroupListGroupsForFailure { + cid, + message: "Could Not List Groups - Failed".to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForFailure( + GroupListGroupsForFailure { + cid, + message: "Could Not List Groups - Subscription Error" + .to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForFailure( + GroupListGroupsForFailure { + cid, + message: "Could Not List Groups - Subscription Error" + .to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForFailure( + GroupListGroupsForFailure { + cid, + message: "Could Not List Groups - Peer not found".to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupListGroupsForFailure(GroupListGroupsForFailure { + cid, + message: "Could Not List Groups - Connection not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } + + InternalServiceRequest::GroupRespondRequest { + cid, + peer_cid, + group_key, + response, + request_id, + invitation, + } => { + let group_request = if response { + GroupBroadcast::AcceptMembership { + target: if invitation { cid } else { peer_cid }, + key: group_key, + } + } else { + GroupBroadcast::DeclineMembership { + target: if invitation { cid } else { peer_cid }, + key: group_key, + } + }; + let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand { + implicated_cid: cid, + command: group_request, + }); + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + if let Some(peer_connection) = connection.peers.get_mut(&peer_cid) { + let peer_remote = peer_connection.remote.clone(); + match peer_remote.send_callback_subscription(request).await { + Ok(mut subscription) => { + let mut result = false; + if invitation { + while let Some(evt) = subscription.next().await { + match evt { + // When accepting an invite, we expect a GroupChannelCreated in response + NodeResult::GroupChannelCreated(GroupChannelCreated { + ticket: _, + channel, + }) => { + let key = channel.key(); + let group_cid = channel.cid(); + let (tx, rx) = channel.split(); + connection.add_group_channel( + key, + GroupConnection { + key, + tx, + cid: group_cid, + }, + ); + + let uuid = connection.associated_tcp_connection; + spawn_group_channel_receiver( + key, + cid, + uuid, + rx, + tcp_connection_map.clone(), + ); + + result = true; + break; + } + NodeResult::GroupEvent(GroupEvent { + implicated_cid: _, + ticket: _, + event: + GroupBroadcast::AcceptMembershipResponse { + key: _, + success, + }, + }) => { + result = success; + // if !result { + // break; + // } + break; + } + NodeResult::GroupEvent(GroupEvent { + implicated_cid: _, + ticket: _, + event: + GroupBroadcast::DeclineMembershipResponse { + key: _, + success, + }, + }) => { + result = success; + break; + } + _ => {} + }; + } + drop(server_connection_map); + } else { + // For now we return a Success response - we did, in fact, receive the KernelStreamSubscription + result = true; + } + match result { + true => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid, + group_key, + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + false => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid, + message: "Group Invite Response Failed." + .to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } + } + } + } + + InternalServiceRequest::GroupRequestJoin { + cid, + group_key, + request_id, + } => { + let mut server_connection_map = server_connection_map.lock().await; + if let Some(connection) = server_connection_map.get_mut(&cid) { + let target_cid = group_key.cid; + if let Some(peer_connection) = connection.peers.get_mut(&target_cid) { + let peer_remote = peer_connection.remote.clone(); + drop(server_connection_map); + let group_request = GroupBroadcast::RequestJoin { + sender: cid, + key: group_key, + }; + let request = NodeRequest::GroupBroadcastCommand(GroupBroadcastCommand { + implicated_cid: cid, + command: group_request, + }); + match peer_remote.send_callback_subscription(request).await { + Ok(mut subscription) => { + let mut result = Err("Group Request Join Failed".to_string()); + while let Some(evt) = subscription.next().await { + if let NodeResult::GroupEvent(GroupEvent { + implicated_cid: _, + ticket: _, + event: + GroupBroadcast::RequestJoinPending { + result: signal_result, + key: _key, + }, + }) = evt + { + result = signal_result; + break; + } + } + match result { + Ok(_) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRequestJoinSuccess( + GroupRequestJoinSuccess { + cid, + group_key, + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRequestJoinFailure( + GroupRequestJoinFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } + } + Err(err) => { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRequestJoinFailure( + GroupRequestJoinFailure { + cid, + message: err.to_string(), + request_id: Some(request_id), + }, + ), + uuid, + ) + .await; + } + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRequestJoinFailure(GroupRequestJoinFailure { + cid, + message: "Could not Request to join Group - Peer not found".to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } else { + send_response_to_tcp_client( + tcp_connection_map, + InternalServiceResponse::GroupRequestJoinFailure(GroupRequestJoinFailure { + cid, + message: "Could not Request to join Group - Connection not found" + .to_string(), + request_id: Some(request_id), + }), + uuid, + ) + .await; + } + } } } @@ -1572,3 +2300,111 @@ async fn backend_handler_clear_all( } } } + +pub(crate) fn spawn_group_channel_receiver( + group_key: MessageGroupKey, + implicated_cid: u64, + uuid: Uuid, + mut rx: GroupChannelRecvHalf, + tcp_connection_map: Arc>>>, +) { + // Handler/Receiver for Group Channel Broadcasts that aren't handled in on_node_event_received in Kernel + let group_channel_receiver = async move { + while let Some(inbound_group_broadcast) = rx.next().await { + // Gets UnboundedSender to the TCP client to forward Broadcasts + match tcp_connection_map.lock().await.get(&uuid) { + Some(entry) => { + log::trace!(target:"citadel", "User {implicated_cid:?} Received Group Broadcast: {inbound_group_broadcast:?}"); + let message = match inbound_group_broadcast { + GroupBroadcastPayload::Message { payload, sender } => Some( + InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: implicated_cid, + peer_cid: sender, + message: payload.into_buffer(), + group_key, + request_id: None, + }), + ), + GroupBroadcastPayload::Event { payload } => match payload { + GroupBroadcast::RequestJoin { sender, key: _ } => { + Some(InternalServiceResponse::GroupJoinRequestReceived( + GroupJoinRequestReceived { + cid: implicated_cid, + peer_cid: sender, + group_key, + request_id: None, + }, + )) + } + GroupBroadcast::MemberStateChanged { key: _, state } => { + Some(InternalServiceResponse::GroupMemberStateChanged( + GroupMemberStateChanged { + cid: implicated_cid, + group_key, + state, + request_id: None, + }, + )) + } + GroupBroadcast::EndResponse { key, success } => { + Some(InternalServiceResponse::GroupEnded(GroupEnded { + cid: implicated_cid, + group_key: key, + success, + request_id: None, + })) + } + GroupBroadcast::Disconnected { key } => Some( + InternalServiceResponse::GroupDisconnected(GroupDisconnected { + cid: implicated_cid, + group_key: key, + request_id: None, + }), + ), + GroupBroadcast::MessageResponse { key, success } => { + Some(InternalServiceResponse::GroupMessageResponse( + GroupMessageResponse { + cid: implicated_cid, + group_key: key, + success, + request_id: None, + }, + )) + } + // GroupBroadcast::Create { .. } => {}, + // GroupBroadcast::LeaveRoom { .. } => {}, + // GroupBroadcast::End { .. } => {}, + // GroupBroadcast::Add { .. } => {}, + // GroupBroadcast::AddResponse { .. } => {}, + // GroupBroadcast::AcceptMembership { .. } => {}, + // GroupBroadcast::DeclineMembership { .. } => {}, + // GroupBroadcast::AcceptMembershipResponse { .. } => {}, + // GroupBroadcast::DeclineMembershipResponse { .. } => {}, + // GroupBroadcast::Kick { .. } => {}, + // GroupBroadcast::KickResponse { .. } => {}, + // GroupBroadcast::ListGroupsFor { .. } => {}, + // GroupBroadcast::ListResponse { .. } => {}, + // GroupBroadcast::Invitation { .. } => {}, + // GroupBroadcast::CreateResponse { .. } => {}, + // GroupBroadcast::RequestJoinPending { .. } => {}, + _ => None, + }, + }; + + // Forward Group Broadcast to TCP Client if it was one of the handled broadcasts + if let Some(message) = message { + if let Err(err) = entry.send(message) { + info!(target: "citadel", "Group Channel Forward To TCP Client Failed: {err:?}"); + } + } + } + None => { + info!(target:"citadel","Connection not found when Group Channel Broadcast Received"); + } + } + } + }; + + // Spawns the above Handler for Group Channel Broadcasts not handled in Node Events + tokio::task::spawn(group_channel_receiver); +} diff --git a/citadel_workspace_service/tests/common/mod.rs b/citadel_workspace_service/tests/common/mod.rs index 91d43be..83053b3 100644 --- a/citadel_workspace_service/tests/common/mod.rs +++ b/citadel_workspace_service/tests/common/mod.rs @@ -20,6 +20,7 @@ use std::error::Error; use std::future::Future; use std::net::SocketAddr; use std::path::PathBuf; +use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; @@ -28,320 +29,358 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio_util::codec::{Framed, LengthDelimitedCodec}; use uuid::Uuid; +pub struct RegisterAndConnectItems, R: Into, S: Into> { + pub internal_service_addr: SocketAddr, + pub server_addr: SocketAddr, + pub full_name: T, + pub username: R, + pub password: S, +} + +pub type PeerReturnHandle = ( + UnboundedSender, + UnboundedReceiver, + u64, +); + +pub trait PeerServiceHandles { + fn take_next_service_handle(&mut self) -> PeerReturnHandle; +} + +impl PeerServiceHandles for Vec { + fn take_next_service_handle(&mut self) -> PeerReturnHandle { + self.remove(0) + } +} + +pub fn generic_error(msg: T) -> Box { + Box::new(std::io::Error::new( + std::io::ErrorKind::Other, + msg.to_string(), + )) +} + pub async fn register_and_connect_to_server< T: Into, R: Into, S: Into, >( - internal_service_addr: SocketAddr, - server_addr: SocketAddr, - full_name: T, - username: R, - password: S, + services_to_create: Vec>, ) -> Result< - ( + Vec<( UnboundedSender, UnboundedReceiver, u64, - ), + )>, Box, > { - let conn = TcpStream::connect(internal_service_addr).await?; - info!(target: "citadel", "connected to the TCP stream"); - let framed = wrap_tcp_conn(conn); - info!(target: "citadel", "wrapped tcp connection"); - - let (mut sink, mut stream) = framed.split(); - - let first_packet = stream.next().await.unwrap()?; - info!(target: "citadel", "First packet"); - let greeter_packet: InternalServiceResponse = bincode2::deserialize(&first_packet)?; - - info!(target: "citadel", "Greeter packet {greeter_packet:?}"); - - let username = username.into(); - let full_name = full_name.into(); - let password = password.into(); - let session_security_settings = SessionSecuritySettingsBuilder::default() - // .with_crypto_params(EncryptionAlgorithm::AES_GCM_256 + KemAlgorithm::Kyber + SigAlgorithm::None) - .build() - .unwrap(); - - if let InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted) = - greeter_packet - { - let register_command = InternalServiceRequest::Register { - request_id: Uuid::new_v4(), - server_addr, - full_name, - username: username.clone(), - proposed_password: password.clone(), - session_security_settings, - connect_after_register: false, - }; - send(&mut sink, register_command).await.unwrap(); - - let second_packet = stream.next().await.unwrap().unwrap(); - let response_packet: InternalServiceResponse = - bincode2::deserialize(&second_packet).unwrap(); - if let InternalServiceResponse::RegisterSuccess( - citadel_workspace_types::RegisterSuccess { request_id: _ }, - ) = response_packet + let mut return_results: Vec<( + UnboundedSender, + UnboundedReceiver, + u64, + )> = Vec::new(); + + for item in services_to_create { + let conn = TcpStream::connect(item.internal_service_addr) + .await + .unwrap(); + info!(target: "citadel", "connected to the TCP stream"); + let framed = wrap_tcp_conn(conn); + info!(target: "citadel", "wrapped tcp connection"); + + let (mut sink, mut stream) = framed.split(); + + let first_packet = stream.next().await.unwrap().unwrap(); + info!(target: "citadel", "First packet"); + let greeter_packet: InternalServiceResponse = bincode2::deserialize(&first_packet).unwrap(); + + info!(target: "citadel", "Greeter packet {greeter_packet:?}"); + + let username = item.username.into(); + let full_name = item.full_name.into(); + let password = item.password.into(); + let session_security_settings = SessionSecuritySettingsBuilder::default() + // .with_crypto_params(EncryptionAlgorithm::AES_GCM_256 + KemAlgorithm::Kyber + SigAlgorithm::None) + .build() + .unwrap(); + + if let InternalServiceResponse::ServiceConnectionAccepted(ServiceConnectionAccepted) = + greeter_packet { - // now, connect to the server - let command = InternalServiceRequest::Connect { - username, - password, - connect_mode: Default::default(), - udp_mode: Default::default(), - keep_alive_timeout: None, - session_security_settings, + let register_command = InternalServiceRequest::Register { request_id: Uuid::new_v4(), + server_addr: item.server_addr, + full_name, + username: username.clone(), + proposed_password: password.clone(), + session_security_settings, + connect_after_register: false, }; + send(&mut sink, register_command).await.unwrap(); - send(&mut sink, command).await?; - - let next_packet = stream.next().await.unwrap()?; - let response_packet: InternalServiceResponse = bincode2::deserialize(&next_packet)?; - if let InternalServiceResponse::ConnectSuccess( - citadel_workspace_types::ConnectSuccess { cid, request_id: _ }, + let second_packet = stream.next().await.unwrap().unwrap(); + let response_packet: InternalServiceResponse = + bincode2::deserialize(&second_packet).unwrap(); + if let InternalServiceResponse::RegisterSuccess( + citadel_workspace_types::RegisterSuccess { request_id: _ }, ) = response_packet { - let (to_service, from_service) = tokio::sync::mpsc::unbounded_channel(); - let service_to_test = async move { - // take messages from the service and send them to from_service - while let Some(msg) = stream.next().await { - let msg = msg.unwrap(); - let msg_deserialized: InternalServiceResponse = - bincode2::deserialize(&msg).unwrap(); - info!(target = "citadel", "Service to test {:?}", msg_deserialized); - to_service.send(msg_deserialized).unwrap(); - } - }; - - let (to_service_sender, mut from_test) = tokio::sync::mpsc::unbounded_channel(); - let test_to_service = async move { - while let Some(msg) = from_test.recv().await { - info!(target = "citadel", "Test to service {:?}", msg); - send(&mut sink, msg).await.unwrap(); - } + // now, connect to the server + let command = InternalServiceRequest::Connect { + username, + password, + connect_mode: Default::default(), + udp_mode: Default::default(), + keep_alive_timeout: None, + session_security_settings, + request_id: Uuid::new_v4(), }; - spawn_services(service_to_test, test_to_service); + send(&mut sink, command).await.unwrap(); + + let next_packet = stream.next().await.unwrap().unwrap(); + let response_packet: InternalServiceResponse = + bincode2::deserialize(&next_packet).unwrap(); + if let InternalServiceResponse::ConnectSuccess( + citadel_workspace_types::ConnectSuccess { cid, request_id: _ }, + ) = response_packet + { + let (to_service, from_service) = tokio::sync::mpsc::unbounded_channel(); + let service_to_test = async move { + // take messages from the service and send them to from_service + while let Some(msg) = stream.next().await { + let msg = msg.unwrap(); + let msg_deserialized: InternalServiceResponse = + bincode2::deserialize(&msg).unwrap(); + info!(target = "citadel", "Service to test {:?}", msg_deserialized); + to_service.send(msg_deserialized).unwrap(); + } + }; - Ok((to_service_sender, from_service, cid)) + let (to_service_sender, mut from_test) = tokio::sync::mpsc::unbounded_channel(); + let test_to_service = async move { + while let Some(msg) = from_test.recv().await { + info!(target = "citadel", "Test to service {:?}", msg); + send(&mut sink, msg).await.unwrap(); + } + }; + + let mut internal_services: Vec< + Pin>> + Send + 'static>>, + > = Vec::new(); + internal_services.push(Box::pin(async move { + test_to_service.await; + Ok(()) + })); + internal_services.push(Box::pin(async move { + service_to_test.await; + Ok(()) + })); + spawn_services(internal_services); + return_results.push((to_service_sender, from_service, cid)); + } else { + panic!("Connection to server was not a success"); + } } else { - Err(generic_error("Connection to server was not a success")) + panic!("Registration to server was not a success"); } } else { - Err(generic_error("Registration to server was not a success")) + panic!("Wrong packet type"); } - } else { - Err(generic_error("Wrong packet type")) } + Ok(return_results) } pub async fn register_and_connect_to_server_then_peers( - a_int_svc_addr: SocketAddr, - b_int_svc_addr: SocketAddr, -) -> Result> { - // internal service for peer A - let bind_address_internal_service_a = a_int_svc_addr; - // internal service for peer B - let bind_address_internal_service_b = b_int_svc_addr; - + int_svc_addrs: Vec, +) -> Result, Box> { // TCP client (GUI, CLI) -> internal service -> empty kernel server(s) let (server, server_bind_address) = server_info_skip_cert_verification(); - tokio::task::spawn(server); - info!(target: "citadel", "sub server spawn"); - let internal_service_kernel_a = CitadelWorkspaceService::new(bind_address_internal_service_a); - let internal_service_a = NodeBuilder::default() - .with_node_type(NodeType::Peer) - // .with_backend(BackendType::InMemory) We need a filesystem backend for this test - .with_insecure_skip_cert_verification() - .build(internal_service_kernel_a) - .unwrap(); - - let internal_service_kernel_b = CitadelWorkspaceService::new(bind_address_internal_service_b); - - let internal_service_b = NodeBuilder::default() - .with_node_type(NodeType::Peer) - // .with_backend(BackendType::InMemory) We need a filesystem backend for this test - .with_insecure_skip_cert_verification() - .build(internal_service_kernel_b) - .unwrap(); - - spawn_services(internal_service_a, internal_service_b); - + let mut internal_services: Vec< + Pin>> + Send + 'static>>, + > = Vec::new(); + + for int_svc_addr_iter in int_svc_addrs.clone() { + let bind_address_internal_service = int_svc_addr_iter; + + info!(target: "citadel", "sub server spawn"); + let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service); + let internal_service = NodeBuilder::default() + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(internal_service_kernel) + .unwrap(); + + internal_services.push(Box::pin(async move { + match internal_service.await { + Err(err) => Err(Box::try_from(err).unwrap()), + _ => Ok(()), + } + })); + } + spawn_services(internal_services); // give time for both the server and internal service to run tokio::time::sleep(Duration::from_millis(2000)).await; info!(target: "citadel", "about to connect to internal service"); - let (to_service_a, mut from_service_a, cid_a) = register_and_connect_to_server( - bind_address_internal_service_a, - server_bind_address, - "Peer A", - "peer.a", - "secret_a", - ) - .await - .unwrap(); - let (to_service_b, mut from_service_b, cid_b) = register_and_connect_to_server( - bind_address_internal_service_b, - server_bind_address, - "Peer B", - "peer.b", - "secret_b", - ) - .await - .unwrap(); - let session_security_settings = SessionSecuritySettingsBuilder::default() - // .with_crypto_params(EncryptionAlgorithm::AES_GCM_256 + KemAlgorithm::Kyber + SigAlgorithm::None) - .build() - .unwrap(); - - // now, both peers are connected and registered to the central server. Now, we - // need to have them peer-register to each other - to_service_a - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid_a, - peer_cid: cid_b, - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - to_service_b - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: cid_b, - peer_cid: cid_a, - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - let item = from_service_b.recv().await.unwrap(); - - match item { - InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { - cid, - peer_cid, - peer_username, - request_id: _, - }) => { - assert_eq!(cid, cid_b); - assert_eq!(peer_cid, cid_b); - assert_eq!(peer_username, "peer.a"); - } - _ => { - panic!("Didn't get the PeerRegisterSuccess"); - } - } - let item = from_service_a.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { - cid, - peer_cid, - peer_username, - request_id: _, - }) => { - assert_eq!(cid, cid_a); - assert_eq!(peer_cid, cid_a); - assert_eq!(peer_username, "peer.b"); - } - _ => { - panic!("Didn't get the PeerRegisterSuccess"); - } + let mut to_spawn: Vec>> = Vec::new(); + for (peer_number, int_svc_addr_iter) in int_svc_addrs.clone().iter().enumerate() { + let bind_address_internal_service = *int_svc_addr_iter; + to_spawn.push(RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service, + server_addr: server_bind_address, + full_name: format!("Peer {}", peer_number), + username: format!("peer.{}", peer_number), + password: format!("secret_{}", peer_number).into_bytes().to_owned(), + }); } - to_service_a - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: cid_a, - peer_cid: cid_b, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - to_service_b - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: cid_b, - peer_cid: cid_a, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - let item = from_service_b.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, request_id: _ }) => { - assert_eq!(cid, cid_b); - } - _ => { - info!(target = "citadel", "{:?}", item); - panic!("Didn't get the PeerConnectSuccess"); - } - } + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); - let item = from_service_a.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, request_id: _ }) => { - assert_eq!(cid, cid_a); - Ok(( - to_service_a, - from_service_a, - to_service_b, - from_service_b, - cid_a, - cid_b, - )) - } - _ => { - info!(target = "citadel", "{:?}", item); - panic!("Didn't get the PeerConnectSuccess"); - } - } -} + for service_index in 0..returned_service_info.len() { + let (item, neighbor_items) = { + let (_, second) = returned_service_info.split_at_mut(service_index); + let (element, remainder) = second.split_at_mut(1); + (&mut element[0], remainder) + }; -pub type PeerReturnHandle = ( - UnboundedSender, - UnboundedReceiver, - UnboundedSender, - UnboundedReceiver, - u64, - u64, -); + let (ref mut to_service_a, ref mut from_service_a, cid_a) = item; + for neighbor in neighbor_items { + let (ref mut to_service_b, ref mut from_service_b, cid_b) = neighbor; + let session_security_settings = SessionSecuritySettingsBuilder::default() + // .with_crypto_params(EncryptionAlgorithm::AES_GCM_256 + KemAlgorithm::Kyber + SigAlgorithm::None) + .build() + .unwrap(); + + // now, both peers are connected and registered to the central server. Now, we + // need to have them peer-register to each other + to_service_a + .send(InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: *cid_a, + peer_cid: (*cid_b), + session_security_settings, + connect_after_register: false, + }) + .unwrap(); + + to_service_b + .send(InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: *cid_b, + peer_cid: (*cid_a), + session_security_settings, + connect_after_register: false, + }) + .unwrap(); + + let item = from_service_b.recv().await.unwrap(); + + match item { + InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { + cid, + peer_cid, + peer_username: _, + request_id: _, + }) => { + assert_eq!(cid, *cid_b); + assert_eq!(peer_cid, *cid_b); + //assert_eq!(peer_username, "peer.0"); + } + _ => { + panic!("Didn't get the PeerRegisterSuccess"); + } + } -pub fn generic_error(msg: T) -> Box { - Box::new(std::io::Error::new( - std::io::ErrorKind::Other, - msg.to_string(), - )) -} + let item = from_service_a.recv().await.unwrap(); + match item { + InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { + cid, + peer_cid, + peer_username: _, + request_id: _, + }) => { + assert_eq!(cid, *cid_a); + assert_eq!(peer_cid, *cid_a); + //assert_eq!(peer_username, "peer.b"); + } + _ => { + panic!("Didn't get the PeerRegisterSuccess"); + } + } -pub fn spawn_services(internal_service_a: F1, internal_service_b: F2) -where - F1: Future + Send + 'static, - F2: Future + Send + 'static, - F1::Output: Send + 'static, - F2::Output: Send + 'static, -{ - let internal_services = async move { - tokio::select! { - _res0 = internal_service_a => (), - _res1 = internal_service_b => (), + to_service_a + .send(InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: *cid_a, + peer_cid: *cid_b, + udp_mode: Default::default(), + session_security_settings, + }) + .unwrap(); + + to_service_b + .send(InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: *cid_b, + peer_cid: *cid_a, + udp_mode: Default::default(), + session_security_settings, + }) + .unwrap(); + + let item = from_service_b.recv().await.unwrap(); + match item { + InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { + cid, + request_id: _, + }) => { + assert_eq!(cid, *cid_b); + } + _ => { + info!(target = "citadel", "{:?}", item); + panic!("Didn't get the PeerConnectSuccess"); + } + } + + let item = from_service_a.recv().await.unwrap(); + match item { + InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { + cid, + request_id: _, + }) => { + assert_eq!(cid, *cid_a); + } + _ => { + info!(target = "citadel", "{:?}", item); + panic!("Didn't get the PeerConnectSuccess"); + } + } } + } + Ok(returned_service_info) +} - // citadel_logging::error!(target: "citadel", "Internal service error: vital service ended"); - // std::process::exit(1); +pub fn spawn_services( + futures_to_spawn: Vec< + Pin>> + Send + 'static>>, + >, +) { + let services_to_spawn = async move { + let (returned_future, _, _) = futures::future::select_all(futures_to_spawn).await; + match returned_future { + Ok(_) => { + info!(target: "citadel","Vital Internal Service Ended"); + } + Err(err) => { + citadel_logging::error!(target: "citadel", "Internal service error: {err:?}"); + } + } + //std::process::exit(1); }; - tokio::task::spawn(internal_services); + tokio::task::spawn(services_to_spawn); } pub async fn send( diff --git a/citadel_workspace_service/tests/file_transfer.rs b/citadel_workspace_service/tests/file_transfer.rs index 293620f..8859c51 100644 --- a/citadel_workspace_service/tests/file_transfer.rs +++ b/citadel_workspace_service/tests/file_transfer.rs @@ -4,7 +4,7 @@ mod common; mod tests { use crate::common::{ register_and_connect_to_server, register_and_connect_to_server_then_peers, - server_info_file_transfer, + server_info_file_transfer, RegisterAndConnectItems, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -62,30 +62,33 @@ mod tests { info!(target: "citadel", "about to connect to internal service"); - let (to_service, mut from_service, cid) = register_and_connect_to_server( - bind_address_internal_service, - server_bind_address, - "John Doe", - "john.doe", - "secret", - ) - .await - .unwrap(); - - let cmp_path = PathBuf::from("../resources/test.txt"); - - let file_transfer_command = InternalServiceRequest::SendFile { - request_id: Uuid::new_v4(), - source: cmp_path.clone(), - cid, - transfer_type: TransferType::FileTransfer, - peer_cid: None, - chunk_size: None, - }; - to_service.send(file_transfer_command).unwrap(); - exhaust_stream_to_file_completion(cmp_path, &mut from_service).await; + let to_spawn = vec![RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service, + server_addr: server_bind_address, + full_name: "John Doe", + username: "john.doe", + password: "secret", + }]; + let returned_service_info = register_and_connect_to_server(to_spawn).await; + let mut service_vec = returned_service_info.unwrap(); + if let Some((to_service, from_service, cid)) = service_vec.get_mut(0_usize) { + let cmp_path = PathBuf::from("../resources/test.txt"); + + let file_transfer_command = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: cmp_path.clone(), + cid: *cid, + transfer_type: TransferType::FileTransfer, + peer_cid: None, + chunk_size: None, + }; + to_service.send(file_transfer_command).unwrap(); + exhaust_stream_to_file_completion(cmp_path, from_service).await; - Ok(()) + Ok(()) + } else { + panic!("Service Spawn Error") + } } #[tokio::test] @@ -97,21 +100,24 @@ mod tests { // internal service for peer B let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); - let (to_service_a, mut from_service_a, to_service_b, mut from_service_b, cid_a, cid_b) = - register_and_connect_to_server_then_peers( - bind_address_internal_service_a, - bind_address_internal_service_b, - ) - .await?; + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + ]) + .await?; + + let (peer_one, peer_two) = peer_return_handle_vec.as_mut_slice().split_at_mut(1_usize); + let (to_service_a, from_service_a, cid_a) = peer_one.get_mut(0_usize).unwrap(); + let (to_service_b, from_service_b, cid_b) = peer_two.get_mut(0_usize).unwrap(); let file_to_send = PathBuf::from("../resources/test.txt"); let send_file_to_service_b_payload = InternalServiceRequest::SendFile { request_id: Uuid::new_v4(), source: file_to_send, - cid: cid_a, + cid: *cid_a, transfer_type: TransferType::FileTransfer, - peer_cid: Some(cid_b), + peer_cid: Some(*cid_b), chunk_size: None, }; to_service_a.send(send_file_to_service_b_payload).unwrap(); @@ -126,8 +132,8 @@ mod tests { info!(target:"citadel", "File Transfer Request {cid_b:?}"); let file_transfer_accept = InternalServiceRequest::RespondFileTransfer { - cid: cid_b, - peer_cid: cid_a, + cid: *cid_b, + peer_cid: *cid_a, object_id: metadata.object_id as _, accept: true, download_location: None, @@ -159,13 +165,13 @@ mod tests { // Exhaust the stream for the receiver exhaust_stream_to_file_completion( PathBuf::from("../resources/test.txt"), - &mut from_service_b, + from_service_b, ) .await; // Exhaust the stream for the sender exhaust_stream_to_file_completion( PathBuf::from("../resources/test.txt"), - &mut from_service_a, + from_service_a, ) .await; } else { @@ -202,95 +208,98 @@ mod tests { info!(target: "citadel", "about to connect to internal service"); - let (to_service, mut from_service, cid) = register_and_connect_to_server( - bind_address_internal_service, - server_bind_address, - "John Doe", - "john.doe", - "secret", - ) - .await - .unwrap(); - - // Push file to REVFS - let file_to_send = PathBuf::from("../resources/test.txt"); - let virtual_path = PathBuf::from("/vfs/test.txt"); - let file_transfer_command = InternalServiceRequest::SendFile { - request_id: Uuid::new_v4(), - source: file_to_send.clone(), - cid, - transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { - virtual_path: virtual_path.clone(), - security_level: Default::default(), - }, - peer_cid: None, - chunk_size: None, - }; - to_service.send(file_transfer_command).unwrap(); - let file_transfer_response = from_service.recv().await.unwrap(); - if let InternalServiceResponse::SendFileFailure(SendFileFailure { - cid: _, - message, - request_id: _, - }) = file_transfer_response - { - panic!("Send File Failure: {message:?}") - } - - // Wait for the sender to complete the transfer - exhaust_stream_to_file_completion(file_to_send.clone(), &mut from_service).await; - - // Download/Pull file from REVFS - Don't delete on pull - let file_download_command = InternalServiceRequest::DownloadFile { - virtual_directory: virtual_path.clone(), - security_level: None, - delete_on_pull: false, - cid, - peer_cid: None, - request_id: Uuid::new_v4(), - }; - to_service.send(file_download_command).unwrap(); - let download_file_response = from_service.recv().await.unwrap(); - if let InternalServiceResponse::DownloadFileFailure(DownloadFileFailure { - cid: _, - message, - request_id: _, - }) = download_file_response - { - panic!("Download File Failure: {message:?}") - } - - // Exhaust the download request - exhaust_stream_to_file_completion(file_to_send.clone(), &mut from_service).await; - - // Delete file from REVFS - let file_delete_command = InternalServiceRequest::DeleteVirtualFile { - virtual_directory: virtual_path.clone(), - cid, - peer_cid: None, - request_id: Uuid::new_v4(), - }; - to_service.send(file_delete_command).unwrap(); - info!(target: "citadel","DeleteVirtualFile Request sent to server"); + let to_spawn = vec![RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service, + server_addr: server_bind_address, + full_name: "John Doe", + username: "john.doe", + password: "secret", + }]; + let returned_service_info = register_and_connect_to_server(to_spawn).await; + let mut service_vec = returned_service_info.unwrap(); + if let Some((to_service, from_service, cid)) = service_vec.get_mut(0_usize) { + // Push file to REVFS + let file_to_send = PathBuf::from("../resources/test.txt"); + let virtual_path = PathBuf::from("/vfs/test.txt"); + let file_transfer_command = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: file_to_send.clone(), + cid: *cid, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.clone(), + security_level: Default::default(), + }, + peer_cid: None, + chunk_size: None, + }; + to_service.send(file_transfer_command).unwrap(); + let file_transfer_response = from_service.recv().await.unwrap(); + if let InternalServiceResponse::SendFileFailure(SendFileFailure { + cid: _, + message, + request_id: _, + }) = file_transfer_response + { + panic!("Send File Failure: {message:?}") + } - let file_delete_command = from_service.recv().await.unwrap(); + // Wait for the sender to complete the transfer + exhaust_stream_to_file_completion(file_to_send.clone(), from_service).await; - match file_delete_command { - InternalServiceResponse::DeleteVirtualFileSuccess(DeleteVirtualFileSuccess { - cid: response_cid, + // Download/Pull file from REVFS - Don't delete on pull + let file_download_command = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_path.clone(), + security_level: None, + delete_on_pull: false, + cid: *cid, + peer_cid: None, + request_id: Uuid::new_v4(), + }; + to_service.send(file_download_command).unwrap(); + let download_file_response = from_service.recv().await.unwrap(); + if let InternalServiceResponse::DownloadFileFailure(DownloadFileFailure { + cid: _, + message, request_id: _, - }) => { - assert_eq!(cid, response_cid); - info!(target: "citadel","CID Comparison Yielded Success"); + }) = download_file_response + { + panic!("Download File Failure: {message:?}") } - _ => { - info!(target = "citadel", "{:?}", file_delete_command); - panic!("Didn't get the REVFS DeleteVirtualFileSuccess"); + + // Exhaust the download request + exhaust_stream_to_file_completion(file_to_send.clone(), from_service).await; + + // Delete file from REVFS + let file_delete_command = InternalServiceRequest::DeleteVirtualFile { + virtual_directory: virtual_path.clone(), + cid: *cid, + peer_cid: None, + request_id: Uuid::new_v4(), + }; + to_service.send(file_delete_command).unwrap(); + info!(target: "citadel","DeleteVirtualFile Request sent to server"); + + let file_delete_command = from_service.recv().await.unwrap(); + + match file_delete_command { + InternalServiceResponse::DeleteVirtualFileSuccess(DeleteVirtualFileSuccess { + cid: response_cid, + request_id: _, + }) => { + assert_eq!(*cid, response_cid); + info!(target: "citadel","CID Comparison Yielded Success"); + } + _ => { + info!(target = "citadel", "{:?}", file_delete_command); + panic!("Didn't get the REVFS DeleteVirtualFileSuccess"); + } } - } - info!(target: "citadel","{file_delete_command:?}"); + info!(target: "citadel","{file_delete_command:?}"); - Ok(()) + Ok(()) + } else { + panic!("Service Spawn Error"); + } } #[tokio::test] @@ -301,12 +310,15 @@ mod tests { // internal service for peer B let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); - let (to_service_a, mut from_service_a, to_service_b, mut from_service_b, cid_a, cid_b) = - register_and_connect_to_server_then_peers( - bind_address_internal_service_a, - bind_address_internal_service_b, - ) - .await?; + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + ]) + .await?; + + let (peer_one, peer_two) = peer_return_handle_vec.as_mut_slice().split_at_mut(1_usize); + let (to_service_a, from_service_a, cid_a) = peer_one.get_mut(0_usize).unwrap(); + let (to_service_b, from_service_b, cid_b) = peer_two.get_mut(0_usize).unwrap(); // Push file to REVFS on peer let file_to_send = PathBuf::from("../resources/test.txt"); @@ -314,12 +326,12 @@ mod tests { let send_file_to_service_b_payload = InternalServiceRequest::SendFile { request_id: Uuid::new_v4(), source: file_to_send.clone(), - cid: cid_a, + cid: *cid_a, transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { virtual_path: virtual_path.clone(), security_level: Default::default(), }, - peer_cid: Some(cid_b), + peer_cid: Some(*cid_b), chunk_size: None, }; to_service_a.send(send_file_to_service_b_payload).unwrap(); @@ -337,8 +349,8 @@ mod tests { }) = deserialized_service_a_payload_response { let file_transfer_accept_payload = InternalServiceRequest::RespondFileTransfer { - cid: cid_b, - peer_cid: cid_a, + cid: *cid_b, + peer_cid: *cid_a, object_id: metadata.object_id as _, accept: true, download_location: None, @@ -358,20 +370,20 @@ mod tests { virtual_directory: virtual_path.clone(), security_level: None, delete_on_pull: false, - cid: cid_a, - peer_cid: Some(cid_b), + cid: *cid_a, + peer_cid: Some(*cid_b), request_id: Uuid::new_v4(), }; to_service_a.send(download_file_command).unwrap(); - exhaust_stream_to_file_completion(file_to_send.clone(), &mut from_service_a).await; - exhaust_stream_to_file_completion(file_to_send.clone(), &mut from_service_b).await; + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; // Delete file on Peer REVFS let delete_file_command = InternalServiceRequest::DeleteVirtualFile { virtual_directory: virtual_path, - cid: cid_a, - peer_cid: Some(cid_b), + cid: *cid_a, + peer_cid: Some(*cid_b), request_id: Uuid::new_v4(), }; to_service_a.send(delete_file_command).unwrap(); @@ -381,7 +393,7 @@ mod tests { cid: response_cid, request_id: _, }) => { - assert_eq!(cid_a, response_cid); + assert_eq!(*cid_a, response_cid); } _ => { info!(target = "citadel", "{:?}", delete_file_response); diff --git a/citadel_workspace_service/tests/group_chat.rs b/citadel_workspace_service/tests/group_chat.rs new file mode 100644 index 0000000..19ba332 --- /dev/null +++ b/citadel_workspace_service/tests/group_chat.rs @@ -0,0 +1,1220 @@ +mod common; + +#[cfg(test)] +mod tests { + use crate::common::*; + use bytes::BytesMut; + use citadel_logging::info; + use citadel_sdk::prelude::{MemberState, UserIdentifier}; + use citadel_workspace_types::{ + GroupCreateSuccess, GroupDisconnected, GroupEndSuccess, GroupEnded, GroupInvitation, + GroupInviteSuccess, GroupJoinRequestReceived, GroupKickFailure, GroupKickSuccess, + GroupLeaveSuccess, GroupLeft, GroupListGroupsForSuccess, GroupMemberStateChanged, + GroupMessageReceived, GroupMessageResponse, GroupMessageSuccess, GroupRequestDeclined, + GroupRequestJoinFailure, GroupRequestJoinSuccess, GroupRespondRequestFailure, + GroupRespondRequestSuccess, InternalServiceRequest, InternalServiceResponse, + }; + use std::error::Error; + use std::net::SocketAddr; + use uuid::Uuid; + + #[tokio::test] + async fn test_citadel_workspace_service_group_create() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let mut initial_users_to_invite: Vec = Vec::new(); + initial_users_to_invite.push(UserIdentifier::from(cid_b)); + initial_users_to_invite.push(UserIdentifier::from(cid_c)); + let send_group_create_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: Some(initial_users_to_invite), + }; + to_service_a.send(send_group_create_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","Service A: {deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid: _, + group_key, + request_id: _, + }) = &deserialized_service_a_payload_response + { + let owner_group_key = *group_key; + + // Service B Declines Group Invitation + let service_b_group_create_invite = from_service_b.recv().await.unwrap(); + info!(target: "citadel","Service B: {service_b_group_create_invite:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_b_group_create_invite + { + assert_eq!(*peer_cid, cid_a); + assert_eq!(*group_key, owner_group_key.clone()); + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_b, + peer_cid: *peer_cid, + group_key: *group_key, + response: false, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service B Sending Invite Response"); + to_service_b.send(group_invite_response).unwrap(); + let deserialized_service_b_payload_response = from_service_b.recv().await.unwrap(); + info!(target: "citadel","Service B Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service B: Successfully Declined Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + panic!("Service B Failed Upon Responding to Group Invite: {message:?}"); + } + info!(target: "citadel","{deserialized_service_b_payload_response:?}"); + } else { + panic!("Service B Invitation Not Received"); + } + + // Service C Accepts Group Invitation + let service_c_group_create_invite = from_service_c.recv().await.unwrap(); + info!(target: "citadel","Service C: {service_c_group_create_invite:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_c_group_create_invite + { + assert_eq!(*group_key, owner_group_key.clone()); + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_c, + peer_cid: *peer_cid, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service C Sending Invite Response"); + to_service_c.send(group_invite_response).unwrap(); + let deserialized_service_c_payload_response = from_service_c.recv().await.unwrap(); + info!(target: "citadel","Service C Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service C: Successfully Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + panic!("Service C Failed Upon Responding to Group Invite: {message:?}"); + } + info!(target: "citadel","{deserialized_service_c_payload_response:?}"); + } else { + panic!("Service C Invitation Not Received"); + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } + + #[tokio::test] + async fn test_citadel_workspace_service_group_invite() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let send_group_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: None, + }; + to_service_a.send(send_group_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","Service A: {deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid: _, + group_key, + request_id: _, + }) = &deserialized_service_a_payload_response + { + // Invite Service B and Accept it + let send_group_payload = InternalServiceRequest::GroupInvite { + cid: cid_a, + peer_cid: cid_b, + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_a.send(send_group_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupInviteSuccess(GroupInviteSuccess { .. }) = + &deserialized_service_a_payload_response + { + let service_b_group_inbound = from_service_b.recv().await.unwrap(); + let owner_group_key = *group_key; + info!(target: "citadel","Service B: {service_b_group_inbound:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_b_group_inbound + { + let service_b_group_outbound = InternalServiceRequest::GroupRespondRequest { + cid: cid_b, + peer_cid: *peer_cid, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service B Sending Invite Response"); + to_service_b.send(service_b_group_outbound).unwrap(); + let deserialized_service_b_payload_response = + from_service_b.recv().await.unwrap(); + info!(target: "citadel","Service B Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service B: Successfully Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + panic!("Service B Failed Upon Responding to Group Invite: {message:?}"); + } + info!(target: "citadel","{deserialized_service_b_payload_response:?}"); + } + } else { + panic!("Service A Panicked When looking for Group Invite Response for Service B"); + } + + let _ = from_service_a.recv().await.unwrap(); // Receive unnecessary MemberStateChanged + + // Invite Service C and Decline it + let send_group_payload = InternalServiceRequest::GroupInvite { + cid: cid_a, + peer_cid: cid_c, + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_a.send(send_group_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupInviteSuccess(GroupInviteSuccess { .. }) = + &deserialized_service_a_payload_response + { + let service_c_group_inbound = from_service_c.recv().await.unwrap(); + let owner_group_key = *group_key; + info!(target: "citadel","Service C: {service_c_group_inbound:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_c_group_inbound + { + let service_c_group_outbound = InternalServiceRequest::GroupRespondRequest { + cid: cid_c, + peer_cid: *peer_cid, + group_key: *group_key, + response: false, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service C Sending Invite Response"); + to_service_c.send(service_c_group_outbound).unwrap(); + let deserialized_service_c_payload_response = + from_service_c.recv().await.unwrap(); + info!(target: "citadel","Service C Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service C: Successfully Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + panic!("Service C Failed Upon Responding to Group Invite: {message:?}"); + } + info!(target: "citadel","{deserialized_service_c_payload_response:?}"); + } + } else { + panic!("Service A Panicked When looking for Group Invite Response for Service C"); + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } + + #[tokio::test] + async fn test_citadel_workspace_service_group_request_join() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let send_group_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: None, + }; + to_service_a.send(send_group_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","Service A: {deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { .. }) = + &deserialized_service_a_payload_response + { + // Service B Requests to Join and Service A Accepts + let service_b_group_outbound = InternalServiceRequest::GroupListGroupsFor { + cid: cid_b, + peer_cid: cid_a, + request_id: Uuid::new_v4(), + }; + to_service_b.send(service_b_group_outbound).unwrap(); + info!(target: "citadel","Service B Requesting Groups for Service A"); + let service_b_group_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupListGroupsForSuccess(GroupListGroupsForSuccess { + cid: _, + peer_cid: _, + group_list, + request_id: _, + }) = &service_b_group_inbound + { + info!(target: "citadel", "Service B Got Success Response with groups: {group_list:?}"); + if let Some(&group_to_join) = group_list.clone().unwrap().first() { + info!(target: "citadel","Service B Found Group {group_to_join:?} for Service A"); + let service_b_group_outbound = InternalServiceRequest::GroupRequestJoin { + cid: cid_b, + group_key: group_to_join, + request_id: Uuid::new_v4(), + }; + to_service_b.send(service_b_group_outbound).unwrap(); + info!(target: "citadel","Service B Sending Group Join Request"); + let service_b_group_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupRequestJoinSuccess( + GroupRequestJoinSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &service_b_group_inbound + { + assert_eq!(group_to_join, *group_key); + info!(target: "citadel","Service B Requested To Join Group"); + } else if let InternalServiceResponse::GroupRequestJoinFailure( + GroupRequestJoinFailure { + cid: _, + message, + request_id: _, + }, + ) = &service_b_group_inbound + { + panic!("Service B Group Request Join Failure: {message:?}"); + } + + let service_a_group_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupJoinRequestReceived( + GroupJoinRequestReceived { + cid: _, + peer_cid: _, + group_key, + request_id: _, + }, + ) = &service_a_group_inbound + { + let service_a_group_outbound = + InternalServiceRequest::GroupRespondRequest { + cid: cid_a, + peer_cid: cid_b, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: false, + }; + to_service_a.send(service_a_group_outbound).unwrap(); + let service_a_group_inbound = from_service_a.recv().await.unwrap(); + info!(target: "citadel","Service A Received Response {service_a_group_inbound:?}"); + + info!(target: "citadel","Service A Accepted Join Request"); + + let service_b_group_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupMemberStateChanged( + GroupMemberStateChanged { + cid: _, + group_key: joined_group, + state, + request_id: _, + }, + ) = &service_b_group_inbound + { + match state { + MemberState::EnteredGroup { cids } => { + info!(target: "citadel","Service B {cids:?} Joined Group {joined_group:?}"); + } + _ => { + panic!("Service B Group Join Fatal Error") + } + } + } else { + info!(target: "citadel","Service B Waiting for MemberStateChanged - Received {service_b_group_inbound:?}"); + } + } else { + info!(target: "citadel","Service A Waiting for GroupJoinRequestReceived - Received {service_a_group_inbound:?}"); + } + } + } else { + panic!("Service B List Groups Failure"); + } + + // Service C Requests to Join and Service A Declines + let service_c_group_outbound = InternalServiceRequest::GroupListGroupsFor { + cid: cid_c, + peer_cid: cid_a, + request_id: Uuid::new_v4(), + }; + to_service_c.send(service_c_group_outbound).unwrap(); + info!(target: "citadel","Service C Requesting Groups for Service A"); + let service_c_group_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupListGroupsForSuccess(GroupListGroupsForSuccess { + cid: _, + peer_cid: _, + group_list, + request_id: _, + }) = &service_c_group_inbound + { + info!(target: "citadel", "Service C Got Success Response with groups: {group_list:?}"); + if let Some(&group_to_join) = group_list.clone().unwrap().first() { + info!(target: "citadel","Service C Found Group {group_to_join:?} for Service A: {cid_a:?}"); + let service_c_group_outbound = InternalServiceRequest::GroupRequestJoin { + cid: cid_c, + group_key: group_to_join, + request_id: Uuid::new_v4(), + }; + to_service_c.send(service_c_group_outbound).unwrap(); + info!(target: "citadel","Service C Sending Group Join Request"); + let service_c_group_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupRequestJoinSuccess( + GroupRequestJoinSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &service_c_group_inbound + { + assert_eq!(group_to_join, *group_key); + info!(target: "citadel","Service C Requested To Join Group"); + } else if let InternalServiceResponse::GroupRequestJoinFailure( + GroupRequestJoinFailure { + cid: _, + message, + request_id: _, + }, + ) = &service_c_group_inbound + { + panic!("Service C Group Request Join Failure: {message:?}"); + } + + let service_a_group_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupJoinRequestReceived( + GroupJoinRequestReceived { + cid: _, + peer_cid: _, + group_key, + request_id: _, + }, + ) = &service_a_group_inbound + { + let service_a_group_outbound = + InternalServiceRequest::GroupRespondRequest { + cid: cid_a, + peer_cid: cid_c, + group_key: *group_key, + response: false, + request_id: Uuid::new_v4(), + invitation: false, + }; + to_service_a.send(service_a_group_outbound).unwrap(); + info!(target: "citadel","Service A Declined Join Request"); + let service_c_group_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupRequestDeclined( + GroupRequestDeclined { .. }, + ) = &service_c_group_inbound + { + info!(target: "citadel", "Service C Successfully Received Decline Response for Request Join"); + } else { + panic!("Service C Waiting for Disconnected Response - Received {service_c_group_inbound:?}"); + } + } else { + info!(target: "citadel","Service A Waiting for GroupJoinRequestReceived - Received {service_a_group_inbound:?}"); + } + } else { + panic!("Service C Panicked While Finding Group To Join"); + } + } else { + panic!("Service C List Groups Failure"); + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } + + #[tokio::test] + async fn test_citadel_workspace_service_group_leave_and_end() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let mut initial_users_to_invite: Vec = Vec::new(); + initial_users_to_invite.push(UserIdentifier::from(cid_b)); + initial_users_to_invite.push(UserIdentifier::from(cid_c)); + let send_group_create_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: Some(initial_users_to_invite), + }; + to_service_a.send(send_group_create_payload).unwrap(); + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","Service A: {deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid: _, + group_key, + request_id: _, + }) = &deserialized_service_a_payload_response + { + let owner_group_key = *group_key; + + // Service B Accepts Invitation + let service_b_group_create_invite = from_service_b.recv().await.unwrap(); + info!(target: "citadel","Service B: {service_b_group_create_invite:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_b_group_create_invite + { + assert_eq!(*peer_cid, cid_a); + assert_eq!(*group_key, owner_group_key.clone()); + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_b, + peer_cid: *peer_cid, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service B Sending Invite Response"); + to_service_b.send(group_invite_response).unwrap(); + let deserialized_service_b_payload_response = from_service_b.recv().await.unwrap(); + info!(target: "citadel","Service B Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service B: Successfully Declined Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_b_payload_response + { + panic!("Service B Failed Upon Responding to Group Invite: {message:?}"); + } + } else { + panic!("Service B Invitation Not Received"); + } + + // Service C Accepts Group Invitation + let service_c_group_create_invite = from_service_c.recv().await.unwrap(); + info!(target: "citadel","Service C: {service_c_group_create_invite:?}"); + if let InternalServiceResponse::GroupInvitation(GroupInvitation { + cid: _, + peer_cid, + group_key, + request_id: _, + }) = &service_c_group_create_invite + { + assert_eq!(*group_key, owner_group_key.clone()); + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_c, + peer_cid: *peer_cid, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + info!(target: "citadel","Service C Sending Invite Response"); + to_service_c.send(group_invite_response).unwrap(); + let deserialized_service_c_payload_response = from_service_c.recv().await.unwrap(); + info!(target: "citadel","Service C Response Sent"); + if let InternalServiceResponse::GroupRespondRequestSuccess( + GroupRespondRequestSuccess { + cid: _, + group_key, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + assert_eq!(*group_key, owner_group_key.clone()); + info!(target: "citadel","Service C: Successfully Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure( + GroupRespondRequestFailure { + cid: _, + message, + request_id: _, + }, + ) = &deserialized_service_c_payload_response + { + panic!("Service C Failed Upon Responding to Group Invite: {message:?}"); + } + } else { + panic!("Service C Invitation Not Received"); + } + + // Service C Leaves Group + let service_c_outbound = InternalServiceRequest::GroupLeave { + cid: cid_c, + group_key: owner_group_key, + request_id: Uuid::new_v4(), + }; + info!(target: "citadel","Service C Leaving Group"); + to_service_c.send(service_c_outbound).unwrap(); + let service_c_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupLeaveSuccess(GroupLeaveSuccess { .. }) = + &service_c_inbound + { + info!(target: "citadel","Service C Successfully Requested to Leave Group"); + } else { + panic!("Service C panicked while attempting to leave group"); + } + let service_c_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupLeft(GroupLeft { + cid: _, + group_key: _, + success, + message: _, + request_id: _, + }) = &service_c_inbound + { + assert!(success); + info!(target: "citadel","Service C Successfully Left Group"); + } else { + panic!("Service C Failed to Leave Group"); + } + + // Service A Ends Group + let service_a_outbound = InternalServiceRequest::GroupEnd { + cid: cid_a, + group_key: owner_group_key, + request_id: Uuid::new_v4(), + }; + info!(target: "citadel","Service A Ending Group"); + to_service_a.send(service_a_outbound).unwrap(); + for _ in 0..4 { + // Receive the four MemberStateChanged Responses that are not needed here + let _ = from_service_a.recv().await.unwrap(); + } + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupEndSuccess(GroupEndSuccess { .. }) = + &service_a_inbound + { + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupEnded(GroupEnded { + cid: _, + group_key: ended_group, + success, + request_id: _, + }) = &service_a_inbound + { + assert_eq!(ended_group, group_key); + assert!(success); + info!(target: "citadel","Service A Successfully Ended Group"); + } else { + info!(target: "citadel", "Service A Waiting GroupEndSuccess and Received {service_a_inbound:?}"); + } + } else { + info!(target: "citadel", "Service A Waiting For GroupEnded Confirmation - Received {service_a_inbound:?}"); + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } + + #[tokio::test] + async fn test_citadel_workspace_service_group_kick() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let mut initial_users_to_invite: Vec = Vec::new(); + initial_users_to_invite.push(UserIdentifier::from(cid_b)); + initial_users_to_invite.push(UserIdentifier::from(cid_c)); + let send_group_create_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: Some(initial_users_to_invite), + }; + to_service_a.send(send_group_create_payload).unwrap(); + + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid: _, + group_key, + request_id: _, + }) = &deserialized_service_a_payload_response + { + // Service B Accepts Invitation + let service_b_group_create_invite = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupInvitation(..) = &service_b_group_create_invite { + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_b, + peer_cid: cid_a, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + to_service_b.send(group_invite_response).unwrap(); + let deserialized_service_b_payload_response = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupRespondRequestSuccess(..) = + &deserialized_service_b_payload_response + { + info!(target: "citadel","Service B Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure(..) = + &deserialized_service_b_payload_response + { + panic!("Service B Failed Upon Responding to Group Invite"); + } + } else { + panic!("Service B Invitation Not Received"); + } + + // Service C Accepts Group Invitation + let service_c_group_create_invite = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupInvitation(..) = &service_c_group_create_invite { + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_c, + peer_cid: cid_a, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + to_service_c.send(group_invite_response).unwrap(); + let deserialized_service_c_payload_response = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupRespondRequestSuccess(..) = + &deserialized_service_c_payload_response + { + info!(target: "citadel","Service C Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure(..) = + &deserialized_service_c_payload_response + { + panic!("Service C Failed Upon Responding to Group Invite"); + } + } else { + panic!("Service C Invitation Not Received"); + } + + let _ = from_service_a.recv().await.unwrap(); // Receive unnecessary MemberStateChanged + let _ = from_service_a.recv().await.unwrap(); // responses from Service B and C joining + + // Service A Kicks the other group members + let service_a_outbound = InternalServiceRequest::GroupKick { + cid: cid_a, + peer_cid: cid_b, + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_a.send(service_a_outbound).unwrap(); + + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupKickSuccess(GroupKickSuccess { + cid, + group_key: kick_group, + request_id: _, + }) = &service_a_inbound + { + assert_eq!(*cid, cid_a); + assert_eq!(kick_group, group_key); + info!(target: "citadel", "Service B was successfully kicked from the group {kick_group:?}"); + } else if let InternalServiceResponse::GroupKickFailure(GroupKickFailure { + cid: _, + message, + request_id: _, + }) = &service_a_inbound + { + panic!("Group Kick Error: Service B could not be kicked - {message:?}"); + } else { + panic!("Group Kick Error: Received Unexpected Response {service_a_inbound:?}"); + } + + let service_a_outbound = InternalServiceRequest::GroupKick { + cid: cid_a, + peer_cid: cid_c, + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_a.send(service_a_outbound).unwrap(); + + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupKickSuccess(GroupKickSuccess { + cid, + group_key: kick_group, + request_id: _, + }) = &service_a_inbound + { + assert_eq!(*cid, cid_a); + assert_eq!(kick_group, group_key); + info!(target: "citadel", "Service C was successfully kicked from the group {kick_group:?}"); + } else if let InternalServiceResponse::GroupKickFailure(GroupKickFailure { + cid: _, + message, + request_id: _, + }) = &service_a_inbound + { + panic!("Group Kick Error: Service C could not be kicked - {message:?}"); + } else { + panic!("Group Kick Error: Received Unexpected Response {service_a_inbound:?}"); + } + + // Service B is notified that it was kicked + let _ = from_service_b.recv().await.unwrap(); // MemberStateChanged from Service C Joining + let service_b_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupDisconnected(GroupDisconnected { + cid: _, + group_key: disconnected_group, + request_id: _, + }) = &service_b_inbound + { + assert_eq!(group_key, disconnected_group); + } else if let InternalServiceResponse::GroupLeft(GroupLeft { + cid: _, + group_key: group_left, + success: _, + message: _, + request_id: _, + }) = &service_b_inbound + { + assert_eq!(group_key, group_left); + } else { + panic! {"Service B did not received expected kick notification - instead received {service_b_inbound:?}"}; + } + + // Service C is notified that it was kicked + let _ = from_service_c.recv().await.unwrap(); // MemberStateChanged from Service B getting kicked + let _ = from_service_c.recv().await.unwrap(); // Extra MemberStateChanged Not Needed here + let service_c_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupDisconnected(GroupDisconnected { + cid: _, + group_key: disconnected_group, + request_id: _, + }) = &service_c_inbound + { + assert_eq!(group_key, disconnected_group); + } else if let InternalServiceResponse::GroupLeft(GroupLeft { + cid: _, + group_key: group_left, + success: _, + message: _, + request_id: _, + }) = &service_c_inbound + { + assert_eq!(group_key, group_left); + } else { + panic! {"Service C did not received expected kick notification - instead received {service_c_inbound:?}"}; + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } + + #[tokio::test] + async fn test_citadel_workspace_service_group_message() -> Result<(), Box> { + citadel_logging::setup_log(); + // internal service for peer A + let bind_address_internal_service_a: SocketAddr = "127.0.0.1:55536".parse().unwrap(); + // internal service for peer B + let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); + // internal service for peer C + let bind_address_internal_service_c: SocketAddr = "127.0.0.1:55538".parse().unwrap(); + + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + bind_address_internal_service_c, + ]) + .await?; + + let (to_service_a, mut from_service_a, cid_a) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_b, mut from_service_b, cid_b) = + peer_return_handle_vec.take_next_service_handle(); + let (to_service_c, mut from_service_c, cid_c) = + peer_return_handle_vec.take_next_service_handle(); + + let mut initial_users_to_invite: Vec = Vec::new(); + initial_users_to_invite.push(UserIdentifier::from(cid_b)); + initial_users_to_invite.push(UserIdentifier::from(cid_c)); + let send_group_create_payload = InternalServiceRequest::GroupCreate { + cid: cid_a, + request_id: Uuid::new_v4(), + initial_users_to_invite: Some(initial_users_to_invite), + }; + to_service_a.send(send_group_create_payload).unwrap(); + + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + + if let InternalServiceResponse::GroupCreateSuccess(GroupCreateSuccess { + cid: _, + group_key, + request_id: _, + }) = &deserialized_service_a_payload_response + { + // Service B Accepts Invitation + let service_b_group_create_invite = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupInvitation(..) = &service_b_group_create_invite { + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_b, + peer_cid: cid_a, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + to_service_b.send(group_invite_response).unwrap(); + let deserialized_service_b_payload_response = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupRespondRequestSuccess(..) = + &deserialized_service_b_payload_response + { + info!(target: "citadel","Service B Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure(..) = + &deserialized_service_b_payload_response + { + panic!("Service B Failed Upon Responding to Group Invite"); + } + } else { + panic!("Service B Invitation Not Received"); + } + + // Service C Accepts Group Invitation + let service_c_group_create_invite = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupInvitation(..) = &service_c_group_create_invite { + let group_invite_response = InternalServiceRequest::GroupRespondRequest { + cid: cid_c, + peer_cid: cid_a, + group_key: *group_key, + response: true, + request_id: Uuid::new_v4(), + invitation: true, + }; + to_service_c.send(group_invite_response).unwrap(); + let deserialized_service_c_payload_response = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupRespondRequestSuccess(..) = + &deserialized_service_c_payload_response + { + info!(target: "citadel","Service C Accepted Group Invite"); + } else if let InternalServiceResponse::GroupRespondRequestFailure(..) = + &deserialized_service_c_payload_response + { + panic!("Service C Failed Upon Responding to Group Invite"); + } + } else { + panic!("Service C Invitation Not Received"); + } + + let _ = from_service_a.recv().await.unwrap(); // Receive Unnecessary MemberStateChanged Responses + let _ = from_service_a.recv().await.unwrap(); // from Service B and Service C Joining Group + let _ = from_service_b.recv().await.unwrap(); + + let service_a_message = BytesMut::from("Service A Test Message"); + let service_b_message = BytesMut::from("Service B Test Message"); + + // Service A Sends a Message + let service_a_outbound = InternalServiceRequest::GroupMessage { + cid: cid_a, + message: service_a_message.clone(), + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_a.send(service_a_outbound).unwrap(); + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageSuccess(GroupMessageSuccess { .. }) = + &service_a_inbound + { + info!(target: "citadel","Service A Received GroupMessageSuccess"); + + // All Services Receive Message + let service_b_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: _, + peer_cid: _, + message, + group_key: _, + request_id: _, + }) = &service_b_inbound + { + info!(target: "citadel","Service B received message from Group A"); + assert_eq!(*message, service_a_message.clone()); + } else { + panic!("Service B Did Not Receive Message - instead received {service_b_inbound:?}"); + } + let service_c_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: _, + peer_cid: _, + message, + group_key: _, + request_id: _, + }) = &service_c_inbound + { + info!(target: "citadel","Service C received message from Group A"); + assert_eq!(*message, service_a_message.clone()); + } else { + panic!("Service C Did Not Receive Message - instead received {service_c_inbound:?}"); + } + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageResponse(GroupMessageResponse { + cid: _, + group_key: _, + success, + request_id: _, + }) = &service_a_inbound + { + if *success { + info!(target: "citadel","Service A Successfully received Group Message Response"); + } else { + panic!("Service A Group Message Response was unsuccessful"); + } + } else { + panic!("Service A Did Not Receive Message Response - instead received {service_a_inbound:?}"); + } + } else { + panic!("Service A Did Not Receive GroupMessageSuccess - instead received {service_a_inbound:?}"); + } + + // Service B Sends a Message + let service_b_outbound = InternalServiceRequest::GroupMessage { + cid: cid_b, + message: service_b_message.clone(), + group_key: *group_key, + request_id: Uuid::new_v4(), + }; + to_service_b.send(service_b_outbound).unwrap(); + let service_b_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageSuccess(GroupMessageSuccess { .. }) = + &service_b_inbound + { + info!(target: "citadel","Service B Received GroupMessageSuccess"); + + // All Services Receive Message + let service_a_inbound = from_service_a.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: _, + peer_cid: _, + message, + group_key: _, + request_id: _, + }) = &service_a_inbound + { + info!(target: "citadel","Service A received message from Service B in Group"); + assert_eq!(*message, service_b_message.clone()); + } else { + panic!("Service A Did Not Receive Message - instead received {service_a_inbound:?}"); + } + let service_c_inbound = from_service_c.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageReceived(GroupMessageReceived { + cid: _, + peer_cid: _, + message, + group_key: _, + request_id: _, + }) = &service_c_inbound + { + info!(target: "citadel","Service C received message from Service B in Group"); + assert_eq!(*message, service_b_message.clone()); + } else { + panic!("Service C Did Not Receive Message - instead received {service_c_inbound:?}"); + } + let service_b_inbound = from_service_b.recv().await.unwrap(); + if let InternalServiceResponse::GroupMessageResponse(GroupMessageResponse { + cid: _, + group_key: _, + success, + request_id: _, + }) = &service_b_inbound + { + if *success { + info!(target: "citadel","Service B Successfully received Group Message Response"); + } else { + panic!("Service B Group Message Response was unsuccessful"); + } + } else { + panic!("Service B Did Not Receive Message Response - instead received {service_b_inbound:?}"); + } + } else { + panic!("Service B Did Not Receive GroupMessageSuccess - instead received {service_b_inbound:?}"); + } + } else { + panic! {"Group Creation Error: Service A did not receive success response"}; + } + + Ok(()) + } +} diff --git a/citadel_workspace_service/tests/service.rs b/citadel_workspace_service/tests/service.rs index 6bbb55c..9f5edf1 100644 --- a/citadel_workspace_service/tests/service.rs +++ b/citadel_workspace_service/tests/service.rs @@ -5,7 +5,7 @@ mod tests { use crate::common::{ register_and_connect_to_server, register_and_connect_to_server_then_peers, send, server_info_reactive_skip_cert_verification, server_info_skip_cert_verification, - spawn_services, test_kv_for_service, + spawn_services, test_kv_for_service, RegisterAndConnectItems, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -18,7 +18,9 @@ mod tests { use core::panic; use futures::StreamExt; use std::error::Error; + use std::future::Future; use std::net::SocketAddr; + use std::pin::Pin; use std::time::Duration; use tokio::net::TcpStream; use uuid::Uuid; @@ -50,28 +52,32 @@ mod tests { info!(target: "citadel", "about to connect to internal service"); - let (to_service, mut from_service, cid) = register_and_connect_to_server( - bind_address_internal_service, - server_bind_address, - "John Doe", - "john.doe", - "secret", - ) - .await - .unwrap(); - let disconnect_command = InternalServiceRequest::Disconnect { - cid, - request_id: Uuid::new_v4(), - }; - to_service.send(disconnect_command).unwrap(); - let disconnect_response = from_service.recv().await.unwrap(); + let to_spawn = vec![RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service, + server_addr: server_bind_address, + full_name: "John Doe", + username: "john.doe", + password: "secret", + }]; + let returned_service_info = register_and_connect_to_server(to_spawn).await; + let mut service_vec = returned_service_info.unwrap(); + if let Some((to_service, from_service, cid)) = service_vec.get_mut(0_usize) { + let disconnect_command = InternalServiceRequest::Disconnect { + cid: *cid, + request_id: Uuid::new_v4(), + }; + to_service.send(disconnect_command).unwrap(); + let disconnect_response = from_service.recv().await.unwrap(); - assert!(matches!( - disconnect_response, - InternalServiceResponse::Disconnected { .. } - )); + assert!(matches!( + disconnect_response, + InternalServiceResponse::Disconnected { .. } + )); - Ok(()) + Ok(()) + } else { + panic!("Service Spawn Error") + } } // test @@ -112,63 +118,66 @@ mod tests { info!(target: "citadel", "about to connect to internal service"); - let (to_service, mut from_service, cid) = register_and_connect_to_server( - bind_address_internal_service, - server_bind_address, - "John Doe", - "john.doe", - "secret", - ) - .await - .unwrap(); - - let serialized_message = bincode2::serialize("Message Test").unwrap(); - let message_command = InternalServiceRequest::Message { - message: serialized_message, - cid, - peer_cid: None, - security_level: SecurityLevel::Standard, - request_id: Uuid::new_v4(), - }; - to_service.send(message_command).unwrap(); - let deserialized_message_response = from_service.recv().await.unwrap(); - info!(target: "citadel","{deserialized_message_response:?}"); - - if let InternalServiceResponse::MessageSent(MessageSent { cid, .. }) = - deserialized_message_response - { - info!(target:"citadel", "Message {cid}"); + let to_spawn = vec![RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service, + server_addr: server_bind_address, + full_name: "John Doe", + username: "john.doe", + password: "secret", + }]; + let returned_service_info = register_and_connect_to_server(to_spawn).await; + let mut service_vec = returned_service_info.unwrap(); + if let Some((to_service, from_service, cid)) = service_vec.get_mut(0_usize) { + let serialized_message = bincode2::serialize("Message Test").unwrap(); + let message_command = InternalServiceRequest::Message { + message: serialized_message, + cid: *cid, + peer_cid: None, + security_level: SecurityLevel::Standard, + request_id: Uuid::new_v4(), + }; + to_service.send(message_command).unwrap(); let deserialized_message_response = from_service.recv().await.unwrap(); - if let InternalServiceResponse::MessageReceived(MessageReceived { - message, - cid, - peer_cid: _, - request_id: _, - }) = deserialized_message_response + info!(target: "citadel","{deserialized_message_response:?}"); + + if let InternalServiceResponse::MessageSent(MessageSent { cid, .. }) = + deserialized_message_response { - println!("{message:?}"); - assert_eq!(SecBuffer::from("pong"), message); - info!(target:"citadel", "Message sending success {cid}"); + info!(target:"citadel", "Message {cid}"); + let deserialized_message_response = from_service.recv().await.unwrap(); + if let InternalServiceResponse::MessageReceived(MessageReceived { + message, + cid, + peer_cid: _, + request_id: _, + }) = deserialized_message_response + { + println!("{message:?}"); + assert_eq!(SecBuffer::from("pong"), message); + info!(target:"citadel", "Message sending success {cid}"); + } else { + panic!("Message sending is not right"); + } } else { - panic!("Message sending is not right"); + panic!("Message sending failed"); } - } else { - panic!("Message sending failed"); - } - let disconnect_command = InternalServiceRequest::Disconnect { - cid, - request_id: Uuid::new_v4(), - }; - to_service.send(disconnect_command).unwrap(); - let disconnect_response = from_service.recv().await.unwrap(); + let disconnect_command = InternalServiceRequest::Disconnect { + cid: *cid, + request_id: Uuid::new_v4(), + }; + to_service.send(disconnect_command).unwrap(); + let disconnect_response = from_service.recv().await.unwrap(); - assert!(matches!( - disconnect_response, - InternalServiceResponse::Disconnected { .. } - )); + assert!(matches!( + disconnect_response, + InternalServiceResponse::Disconnected { .. } + )); - Ok(()) + Ok(()) + } else { + panic!("Service Spawn Error") + } } #[tokio::test] @@ -297,10 +306,10 @@ mod tests { #[tokio::test] async fn test_citadel_workspace_service_peer_test() -> Result<(), Box> { citadel_logging::setup_log(); - let _ = register_and_connect_to_server_then_peers( + let _ = register_and_connect_to_server_then_peers(vec![ "127.0.0.1:55526".parse().unwrap(), "127.0.0.1:55527".parse().unwrap(), - ) + ]) .await?; Ok(()) } @@ -309,17 +318,20 @@ mod tests { async fn test_citadel_workspace_service_peer_test_list_peers() -> Result<(), Box> { citadel_logging::setup_log(); - let (to_service_a, mut from_service_a, to_service_b, mut from_service_b, cid_a, cid_b) = - register_and_connect_to_server_then_peers( - "127.0.0.1:55526".parse().unwrap(), - "127.0.0.1:55527".parse().unwrap(), - ) - .await?; + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + "127.0.0.1:55526".parse().unwrap(), + "127.0.0.1:55527".parse().unwrap(), + ]) + .await?; + + let (peer_one, peer_two) = peer_return_handle_vec.as_mut_slice().split_at_mut(1_usize); + let (to_service_a, from_service_a, cid_a) = peer_one.get_mut(0_usize).unwrap(); + let (to_service_b, from_service_b, cid_b) = peer_two.get_mut(0_usize).unwrap(); // Test that service A views the right information - test_list_peers(cid_a, cid_b, &to_service_a, &mut from_service_a).await; + test_list_peers(*cid_a, *cid_b, to_service_a, from_service_a).await; // Test that service B views the right information - test_list_peers(cid_b, cid_a, &to_service_b, &mut from_service_b).await; + test_list_peers(*cid_b, *cid_a, to_service_b, from_service_b).await; Ok(()) } @@ -332,18 +344,21 @@ mod tests { // internal service for peer B let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); - let (to_service_a, mut from_service_a, _to_service_b, mut from_service_b, cid_a, cid_b) = - register_and_connect_to_server_then_peers( - bind_address_internal_service_a, - bind_address_internal_service_b, - ) - .await?; + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + ]) + .await?; + + let (peer_one, peer_two) = peer_return_handle_vec.as_mut_slice().split_at_mut(1_usize); + let (to_service_a, from_service_a, cid_a) = peer_one.get_mut(0_usize).unwrap(); + let (_to_service_b, from_service_b, cid_b) = peer_two.get_mut(0_usize).unwrap(); let service_a_message = Vec::from("Hello World"); let service_a_message_payload = InternalServiceRequest::Message { message: service_a_message.clone(), - cid: cid_a, - peer_cid: Some(cid_b), + cid: *cid_a, + peer_cid: Some(*cid_b), security_level: Default::default(), request_id: Uuid::new_v4(), }; @@ -390,19 +405,38 @@ mod tests { )) .unwrap(); - spawn_services(internal_service, server); + let mut internal_services: Vec< + Pin>> + Send + 'static>>, + > = Vec::new(); + internal_services.push(Box::pin(async move { + match internal_service.await { + Err(err) => Err(Box::try_from(err).unwrap()), + _ => Ok(()), + } + })); + internal_services.push(Box::pin(async move { + match server.await { + Err(err) => Err(Box::try_from(err).unwrap()), + _ => Ok(()), + } + })); + spawn_services(internal_services); tokio::time::sleep(Duration::from_millis(2000)).await; - let (to_service_a, mut from_service_a, cid) = register_and_connect_to_server( - bind_address_internal_service_a, - server_bind_address, - "peer a", - "peer.a", - "password", - ) - .await?; - - test_kv_for_service(&to_service_a, &mut from_service_a, cid, None).await + let to_spawn = vec![RegisterAndConnectItems { + internal_service_addr: bind_address_internal_service_a, + server_addr: server_bind_address, + full_name: "peer a", + username: "peer.a", + password: "password", + }]; + let returned_service_info = register_and_connect_to_server(to_spawn).await; + let mut service_vec = returned_service_info.unwrap(); + if let Some((to_service_a, from_service_a, cid)) = service_vec.get_mut(0_usize) { + test_kv_for_service(to_service_a, from_service_a, *cid, None).await + } else { + panic!("Service Spawn Error") + } } #[tokio::test] @@ -413,15 +447,18 @@ mod tests { // internal service for peer B let bind_address_internal_service_b: SocketAddr = "127.0.0.1:55537".parse().unwrap(); - let (to_service_a, mut from_service_a, to_service_b, mut from_service_b, cid_a, cid_b) = - register_and_connect_to_server_then_peers( - bind_address_internal_service_a, - bind_address_internal_service_b, - ) - .await?; + let mut peer_return_handle_vec = register_and_connect_to_server_then_peers(vec![ + bind_address_internal_service_a, + bind_address_internal_service_b, + ]) + .await?; + + let (peer_one, peer_two) = peer_return_handle_vec.as_mut_slice().split_at_mut(1_usize); + let (to_service_a, from_service_a, cid_a) = peer_one.get_mut(0_usize).unwrap(); + let (to_service_b, from_service_b, cid_b) = peer_two.get_mut(0_usize).unwrap(); - test_kv_for_service(&to_service_a, &mut from_service_a, cid_a, Some(cid_b)).await?; - test_kv_for_service(&to_service_b, &mut from_service_b, cid_b, Some(cid_a)).await?; + test_kv_for_service(to_service_a, from_service_a, *cid_a, Some(*cid_b)).await?; + test_kv_for_service(to_service_b, from_service_b, *cid_b, Some(*cid_a)).await?; Ok(()) } } diff --git a/citadel_workspace_types/src/lib.rs b/citadel_workspace_types/src/lib.rs index e4067ce..7bb8cdb 100644 --- a/citadel_workspace_types/src/lib.rs +++ b/citadel_workspace_types/src/lib.rs @@ -3,7 +3,7 @@ pub use citadel_sdk::prelude::{ ConnectMode, ObjectTransferStatus, SecBuffer, SecurityLevel, SessionSecuritySettings, UdpMode, UserIdentifier, }; -use citadel_sdk::prelude::{TransferType, VirtualObjectMetadata}; +use citadel_sdk::prelude::{MemberState, MessageGroupKey, TransferType, VirtualObjectMetadata}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::net::SocketAddr; @@ -154,6 +154,257 @@ pub struct PeerRegisterFailure { pub request_id: Option, } +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupChannelCreateSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupChannelCreateFailure { + pub cid: u64, + pub group_key: MessageGroupKey, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupBroadcastHandleFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupCreateSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupCreateFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupLeaveSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupLeaveFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupEndSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupEndFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupEnded { + pub cid: u64, + pub group_key: MessageGroupKey, + pub success: bool, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupLeft { + pub cid: u64, + pub group_key: MessageGroupKey, + pub success: bool, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMessageReceived { + pub cid: u64, + pub peer_cid: u64, + pub message: BytesMut, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMessageSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMessageResponse { + pub cid: u64, + pub group_key: MessageGroupKey, + pub success: bool, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMessageFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupInvitation { + pub cid: u64, + pub peer_cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupInviteSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupInviteFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRespondRequestSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRespondRequestFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMembershipResponse { + pub cid: u64, + pub group_key: MessageGroupKey, + pub success: bool, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRequestJoinPending { + pub cid: u64, + pub group_key: MessageGroupKey, + pub result: Result<(), String>, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupDisconnected { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupKickSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupKickFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupListGroupsForSuccess { + pub cid: u64, + pub peer_cid: u64, + pub group_list: Option>, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupListGroupsForFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupListGroupsResponse { + pub cid: u64, + pub group_list: Option>, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupJoinRequestReceived { + pub cid: u64, + pub peer_cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRequestJoinAccepted { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRequestDeclined { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRequestJoinSuccess { + pub cid: u64, + pub group_key: MessageGroupKey, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupRequestJoinFailure { + pub cid: u64, + pub message: String, + pub request_id: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct GroupMemberStateChanged { + pub cid: u64, + pub group_key: MessageGroupKey, + pub state: MemberState, + pub request_id: Option, +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct LocalDBGetKVSuccess { pub cid: u64, @@ -320,6 +571,40 @@ pub enum InternalServiceResponse { PeerDisconnectFailure(PeerDisconnectFailure), PeerRegisterSuccess(PeerRegisterSuccess), PeerRegisterFailure(PeerRegisterFailure), + GroupChannelCreateSuccess(GroupChannelCreateSuccess), + GroupChannelCreateFailure(GroupChannelCreateFailure), + GroupBroadcastHandleFailure(GroupBroadcastHandleFailure), + GroupCreateSuccess(GroupCreateSuccess), + GroupCreateFailure(GroupCreateFailure), + GroupLeaveSuccess(GroupLeaveSuccess), + GroupLeaveFailure(GroupLeaveFailure), + GroupLeft(GroupLeft), + GroupEndSuccess(GroupEndSuccess), + GroupEndFailure(GroupEndFailure), + GroupEnded(GroupEnded), + GroupMessageReceived(GroupMessageReceived), + GroupMessageResponse(GroupMessageResponse), + GroupMessageSuccess(GroupMessageSuccess), + GroupMessageFailure(GroupMessageFailure), + GroupInvitation(GroupInvitation), + GroupInviteSuccess(GroupInviteSuccess), + GroupInviteFailure(GroupInviteFailure), + GroupRespondRequestSuccess(GroupRespondRequestSuccess), + GroupRespondRequestFailure(GroupRespondRequestFailure), + GroupMembershipResponse(GroupMembershipResponse), + GroupRequestJoinPending(GroupRequestJoinPending), + GroupDisconnected(GroupDisconnected), + GroupKickSuccess(GroupKickSuccess), + GroupKickFailure(GroupKickFailure), + GroupListGroupsForSuccess(GroupListGroupsForSuccess), + GroupListGroupsForFailure(GroupListGroupsForFailure), + GroupListGroupsResponse(GroupListGroupsResponse), + GroupJoinRequestReceived(GroupJoinRequestReceived), + GroupRequestJoinAccepted(GroupRequestJoinAccepted), + GroupRequestDeclined(GroupRequestDeclined), + GroupRequestJoinSuccess(GroupRequestJoinSuccess), + GroupMemberStateChanged(GroupMemberStateChanged), + GroupRequestJoinFailure(GroupRequestJoinFailure), LocalDBGetKVSuccess(LocalDBGetKVSuccess), LocalDBGetKVFailure(LocalDBGetKVFailure), LocalDBSetKVSuccess(LocalDBSetKVSuccess), @@ -401,12 +686,6 @@ pub enum InternalServiceRequest { peer_cid: Option, request_id: Uuid, }, - StartGroup { - initial_users_to_invite: Option>, - cid: u64, - session_security_settings: SessionSecuritySettings, - request_id: Uuid, - }, ListAllPeers { request_id: Uuid, cid: u64, @@ -472,6 +751,57 @@ pub enum InternalServiceRequest { // the command will reply with information for all accounts cid: Option, }, + GroupCreate { + cid: u64, + request_id: Uuid, + initial_users_to_invite: Option>, + }, + GroupLeave { + cid: u64, + group_key: MessageGroupKey, + request_id: Uuid, + }, + GroupEnd { + cid: u64, + group_key: MessageGroupKey, + request_id: Uuid, + }, + GroupMessage { + cid: u64, + message: BytesMut, + group_key: MessageGroupKey, + request_id: Uuid, + }, + GroupInvite { + cid: u64, + peer_cid: u64, + group_key: MessageGroupKey, + request_id: Uuid, + }, + GroupRespondRequest { + cid: u64, + peer_cid: u64, + group_key: MessageGroupKey, + response: bool, + request_id: Uuid, + invitation: bool, + }, + GroupKick { + cid: u64, + peer_cid: u64, + group_key: MessageGroupKey, + request_id: Uuid, + }, + GroupListGroupsFor { + cid: u64, + peer_cid: u64, + request_id: Uuid, + }, + GroupRequestJoin { + cid: u64, + group_key: MessageGroupKey, + request_id: Uuid, + }, } #[derive(Serialize, Deserialize, Clone, Debug)]