diff --git a/Cargo.toml b/Cargo.toml index c118bee..bb339b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,9 +22,9 @@ citadel-internal-service-connector = { path = "./citadel-internal-service-connec citadel-internal-service-macros = { path = "./citadel-internal-service-macros", default-features = false, version = "0.1.0" } # Avarok deps -citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" } -citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol" } -citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol", default-features = false } +citadel_sdk = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } +citadel_types = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } +citadel_logging = { git = "https://github.com/Avarok-Cybersecurity/Citadel-Protocol/" } # Standard deps serde = { version = "1.0.104", features = ["derive"] } @@ -41,4 +41,5 @@ uuid = { version="1.3.3", features = [ anyhow = "1.0.71" async-recursion = { version = "1.0.4" } parking_lot = { version = "0.12.1" } -structopt = { version = "0.3.26" } \ No newline at end of file +structopt = { version = "0.3.26" } +lazy_static = "1.4.0" \ No newline at end of file diff --git a/citadel-internal-service-types/src/lib.rs b/citadel-internal-service-types/src/lib.rs index 6383705..cdef49b 100644 --- a/citadel-internal-service-types/src/lib.rs +++ b/citadel-internal-service-types/src/lib.rs @@ -115,6 +115,7 @@ pub struct DeleteVirtualFileFailure { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct PeerConnectSuccess { pub cid: u64, + pub peer_cid: u64, pub request_id: Option, } @@ -557,7 +558,7 @@ pub struct FileTransferStatusNotification { #[derive(Serialize, Deserialize, Debug, Clone)] pub struct FileTransferTickNotification { pub cid: u64, - pub peer_cid: u64, + pub peer_cid: Option, pub status: ObjectTransferStatus, } diff --git a/citadel-internal-service/Cargo.toml b/citadel-internal-service/Cargo.toml index 850e12e..00b9861 100644 --- a/citadel-internal-service/Cargo.toml +++ b/citadel-internal-service/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] +default = [] vendored = ["citadel_sdk/vendored"] [dependencies] diff --git a/citadel-internal-service/src/kernel/mod.rs b/citadel-internal-service/src/kernel/mod.rs index 53f8247..453f044 100644 --- a/citadel-internal-service/src/kernel/mod.rs +++ b/citadel-internal-service/src/kernel/mod.rs @@ -4,6 +4,7 @@ use citadel_internal_service_connector::util::wrap_tcp_conn; use citadel_internal_service_types::*; use citadel_logging::{error, info, warn}; use citadel_sdk::prefabs::ClientServerRemote; +use citadel_sdk::prelude::remote_specialization::PeerRemote; use citadel_sdk::prelude::VirtualTargetType; use citadel_sdk::prelude::*; use futures::stream::{SplitSink, StreamExt}; @@ -56,7 +57,7 @@ pub struct Connection { #[allow(dead_code)] struct PeerConnection { sink: PeerChannelSendHalf, - remote: SymmetricIdentifierHandle, + remote: PeerRemote, handler_map: HashMap>, associated_tcp_connection: Uuid, } @@ -88,7 +89,7 @@ impl Connection { &mut self, peer_cid: u64, sink: PeerChannelSendHalf, - remote: SymmetricIdentifierHandle, + remote: PeerRemote, ) { self.peers.insert( peer_cid, @@ -333,8 +334,8 @@ fn handle_connection( } } } - Err(_) => { - warn!(target: "citadel", "Bad message from client"); + Err(err) => { + warn!(target: "citadel", "Bad message from client: {err:?}"); } } } @@ -356,7 +357,7 @@ fn handle_connection( fn spawn_tick_updater( object_transfer_handler: ObjectTransferHandler, implicated_cid: u64, - peer_cid: u64, + peer_cid: Option, server_connection_map: &mut HashMap, tcp_connection_map: Arc>>>, ) { @@ -377,7 +378,7 @@ fn spawn_tick_updater( ); match entry.send(message.clone()) { Ok(_res) => { - info!(target: "citadel", "File Transfer Status Tick Sent"); + info!(target: "citadel", "File Transfer Status Tick Sent {status:?}"); } Err(err) => { warn!(target: "citadel", "File Transfer Status Tick Not Sent: {err:?}"); diff --git a/citadel-internal-service/src/kernel/request_handler.rs b/citadel-internal-service/src/kernel/request_handler.rs new file mode 100644 index 0000000..e69de29 diff --git a/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs b/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs index 5907f63..b5bf8fd 100644 --- a/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs +++ b/citadel-internal-service/src/kernel/requests/file/respond_file_transfer.rs @@ -32,7 +32,7 @@ pub async fn handle( spawn_tick_updater( owned_handler, cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); diff --git a/citadel-internal-service/src/kernel/requests/group/request_join.rs b/citadel-internal-service/src/kernel/requests/group/request_join.rs index 02d8a0d..eeeb78b 100644 --- a/citadel-internal-service/src/kernel/requests/group/request_join.rs +++ b/citadel-internal-service/src/kernel/requests/group/request_join.rs @@ -5,7 +5,7 @@ use citadel_internal_service_types::{ InternalServiceResponse, }; use citadel_sdk::prelude::{ - GroupBroadcast, GroupBroadcastCommand, GroupEvent, NodeRequest, NodeResult, + GroupBroadcast, GroupBroadcastCommand, GroupEvent, NodeRequest, NodeResult, TargetLockedRemote, }; use futures::StreamExt; use uuid::Uuid; @@ -38,7 +38,11 @@ pub async fn handle( implicated_cid: cid, command: group_request, }); - match peer_remote.send_callback_subscription(request).await { + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = Err("Group Request Join Failed".to_string()); while let Some(evt) = subscription.next().await { diff --git a/citadel-internal-service/src/kernel/requests/group/respond_request.rs b/citadel-internal-service/src/kernel/requests/group/respond_request.rs index a3151e9..3508be7 100644 --- a/citadel-internal-service/src/kernel/requests/group/respond_request.rs +++ b/citadel-internal-service/src/kernel/requests/group/respond_request.rs @@ -5,7 +5,8 @@ use citadel_internal_service_types::{ InternalServiceResponse, }; use citadel_sdk::prelude::{ - GroupBroadcast, GroupBroadcastCommand, GroupChannelCreated, GroupEvent, NodeRequest, NodeResult, + GroupBroadcast, GroupBroadcastCommand, GroupChannelCreated, GroupEvent, NodeRequest, + NodeResult, TargetLockedRemote, }; use futures::StreamExt; use uuid::Uuid; @@ -52,7 +53,11 @@ pub async fn handle( let peer_remote = peer_connection.remote.clone(); drop(server_connection_map); - match peer_remote.send_callback_subscription(request).await { + match peer_remote + .remote() + .send_callback_subscription(request) + .await + { Ok(mut subscription) => { let mut result = false; if invitation { diff --git a/citadel-internal-service/src/kernel/requests/peer/connect.rs b/citadel-internal-service/src/kernel/requests/peer/connect.rs index 0cb3915..83228a5 100644 --- a/citadel-internal-service/src/kernel/requests/peer/connect.rs +++ b/citadel-internal-service/src/kernel/requests/peer/connect.rs @@ -38,6 +38,7 @@ pub async fn handle( if already_connected { let response = InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, + peer_cid, request_id: Some(request_id), }); @@ -67,11 +68,7 @@ pub async fn handle( .await .get_mut(&cid) .unwrap() - .add_peer_connection( - peer_cid, - sink, - symmetric_identifier_handle_ref.into_owned(), - ); + .add_peer_connection(peer_cid, sink, peer_connect_success.remote); let hm_for_conn = this.tcp_connection_map.clone(); @@ -101,6 +98,7 @@ pub async fn handle( InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, + peer_cid, request_id: Some(request_id), }) } diff --git a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs index 4475f84..bdceb9e 100644 --- a/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs +++ b/citadel-internal-service/src/kernel/responses/object_transfer_handle.rs @@ -41,7 +41,7 @@ pub async fn handle( spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); @@ -74,7 +74,7 @@ pub async fn handle( spawn_tick_updater( object_transfer_handler, implicated_cid, - peer_cid, + Some(peer_cid), &mut server_connection_map, this.tcp_connection_map.clone(), ); diff --git a/citadel-internal-service/tests/common/mod.rs b/citadel-internal-service/tests/common/mod.rs index fbb8700..48dfcd9 100644 --- a/citadel-internal-service/tests/common/mod.rs +++ b/citadel-internal-service/tests/common/mod.rs @@ -2,7 +2,8 @@ use citadel_internal_service::kernel::CitadelWorkspaceService; use citadel_internal_service_connector::util::{InternalServiceConnector, WrappedSink}; use citadel_internal_service_types::{ - InternalServiceRequest, InternalServiceResponse, PeerConnectSuccess, PeerRegisterSuccess, + FileTransferTickNotification, InternalServiceRequest, InternalServiceResponse, + PeerConnectNotification, PeerConnectSuccess, PeerRegisterNotification, PeerRegisterSuccess, }; use citadel_logging::info; use citadel_sdk::prefabs::server::client_connect_listener::ClientConnectListenerKernel; @@ -193,6 +194,7 @@ pub async fn register_and_connect_to_server_then_peers( info!(target: "citadel", "Internal Service Spawning"); let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service); let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) .with_node_type(NodeType::Peer) .with_insecure_skip_cert_verification() .build(internal_service_kernel) @@ -244,157 +246,172 @@ pub async fn register_and_connect_to_server_then_peers( let (ref mut to_service_b, ref mut from_service_b, cid_b) = neighbor; let session_security_settings = SessionSecuritySettingsBuilder::default().build().unwrap(); + register_p2p( + to_service_a, + from_service_a, + *cid_a, + to_service_b, + from_service_b, + *cid_b, + session_security_settings, + ) + .await?; + + connect_p2p( + to_service_a, + from_service_a, + *cid_a, + to_service_b, + from_service_b, + *cid_b, + session_security_settings, + ) + .await?; + } + } + Ok(returned_service_info) +} - // now, both peers are connected and registered to the central server. Now, we - // need to have them peer-register to each other - info!( - target = "citadel", - "Peer {cid_a:?} Sending PeerRegister Request to {cid_b:?}" - ); - to_service_a - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: (*cid_b), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - // Receive Notification of Register Request - let peer_register_notification = from_service_b.recv().await.unwrap(); - assert!(matches!( - peer_register_notification, - InternalServiceResponse::PeerRegisterNotification(..) - )); - - info!( - target = "citadel", - "Peer {cid_b:?} Accepting PeerRegister Request From {cid_a:?}" +pub async fn register_p2p( + to_service_a: &mut UnboundedSender, + from_service_a: &mut UnboundedReceiver, + cid_a: u64, + to_service_b: &mut UnboundedSender, + from_service_b: &mut UnboundedReceiver, + cid_b: u64, + session_security_settings: SessionSecuritySettings, +) -> Result<(), Box> { + // Service A Requests to Register with Service B + to_service_a + .send(InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid_a, + peer_cid: cid_b, + session_security_settings, + connect_after_register: false, + }) + .unwrap(); + + // Service B receives Register Request from Service A + let inbound_response = from_service_b.recv().await.unwrap(); + match inbound_response { + InternalServiceResponse::PeerRegisterNotification(PeerRegisterNotification { + cid, + peer_cid, + peer_username: _, + request_id: _, + }) => { + assert_eq!(cid, cid_b); + assert_eq!(peer_cid, cid_a); + } + _ => { + panic!( + "Peer B didn't get the PeerRegisterNotification, instead got {inbound_response:?}" ); - to_service_b - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: (*cid_a), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - let item = from_service_b.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { - cid, - peer_cid, - peer_username: _, - request_id: _, - }) => { - info!( - target = "citadel", - "Peer {cid_b:?} Received PeerRegisterSuccess Signal" - ); - assert_eq!(cid, *cid_b); - assert_eq!(peer_cid, *cid_a); - } - _ => { - panic!("Didn't get the PeerRegisterSuccess"); - } - } - - let item = from_service_a.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { - cid, - peer_cid, - peer_username: _, - request_id: _, - }) => { - info!( - target = "citadel", - "Peer {cid_a:?} Received PeerRegisterSuccess Signal" - ); - assert_eq!(cid, *cid_a); - assert_eq!(peer_cid, *cid_b); - } - _ => { - panic!("Didn't get the PeerRegisterSuccess"); - } - } + } + } - info!( - target = "citadel", - "Peer {cid_a:?} Sending PeerConnect Request to {cid_b:?}" - ); - to_service_a - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: *cid_b, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - // Receive Notification of Connect Request - let peer_connect_notification = from_service_b.recv().await.unwrap(); - assert!(matches!( - peer_connect_notification, - InternalServiceResponse::PeerConnectNotification(..) - )); + // Service B Sends Register Request to Accept + to_service_b + .send(InternalServiceRequest::PeerRegister { + request_id: Uuid::new_v4(), + cid: cid_b, + peer_cid: cid_a, + session_security_settings, + connect_after_register: false, + }) + .unwrap(); + + // Receive Register Success Responses + let resp = from_service_a.recv().await.unwrap(); + let InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { cid, peer_cid, .. }) = + resp + else { + panic!("Invalid signal") + }; + assert_eq!(cid, cid_a); + assert_eq!(peer_cid, cid_b); + + let resp = from_service_b.recv().await.unwrap(); + let InternalServiceResponse::PeerRegisterSuccess(PeerRegisterSuccess { cid, peer_cid, .. }) = + resp + else { + panic!("Invalid signal") + }; + assert_eq!(cid, cid_b); + assert_eq!(peer_cid, cid_a); - info!( - target = "citadel", - "Peer {cid_b:?} Accepting PeerConnect Request From {cid_a:?}" - ); - to_service_b - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: *cid_a, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - let item = from_service_b.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { - cid, - request_id: _, - }) => { - info!( - target = "citadel", - "Peer {cid_b:?} Received PeerConnectSuccess Signal" - ); - assert_eq!(cid, *cid_b); - } - _ => { - info!(target = "citadel", "{:?}", item); - panic!("Didn't get the PeerConnectSuccess"); - } - } + Ok(()) +} - let item = from_service_a.recv().await.unwrap(); - match item { - InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { - cid, - request_id: _, - }) => { - info!( - target = "citadel", - "Peer {cid_a:?} Received PeerConnectSuccess Signal" - ); - assert_eq!(cid, *cid_a); - } - _ => { - info!(target = "citadel", "{:?}", item); - panic!("Didn't get the PeerConnectSuccess"); - } - } +pub async fn connect_p2p( + to_service_a: &mut UnboundedSender, + from_service_a: &mut UnboundedReceiver, + cid_a: u64, + to_service_b: &mut UnboundedSender, + from_service_b: &mut UnboundedReceiver, + cid_b: u64, + session_security_settings: SessionSecuritySettings, +) -> Result<(), Box> { + // Service A Requests To Connect + to_service_a + .send(InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid_a, + peer_cid: cid_b, + udp_mode: Default::default(), + session_security_settings, + }) + .unwrap(); + + // Service B Receives Connect Request from Service A + let inbound_response = from_service_b.recv().await.unwrap(); + match inbound_response { + InternalServiceResponse::PeerConnectNotification(PeerConnectNotification { + cid, + peer_cid, + session_security_settings: _, + udp_mode: _, + request_id: _, + }) => { + assert_eq!(cid, cid_b); + assert_eq!(peer_cid, cid_a); + } + _ => { + panic!("Peer B didn't get the PeerConnectNotification"); } } - Ok(returned_service_info) + + // Service B Sends Connect Request to Accept + to_service_b + .send(InternalServiceRequest::PeerConnect { + request_id: Uuid::new_v4(), + cid: cid_b, + peer_cid: cid_a, + udp_mode: Default::default(), + session_security_settings, + }) + .unwrap(); + + // Receive Connect Success Responses + let signal = from_service_a.recv().await.unwrap(); + let InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, peer_cid, .. }) = + signal + else { + panic!("Invalid signal") + }; + assert_eq!(cid, cid_a); + assert_eq!(peer_cid, cid_b); + let signal = from_service_b.recv().await.unwrap(); + let InternalServiceResponse::PeerConnectSuccess(PeerConnectSuccess { cid, peer_cid, .. }) = + signal + else { + panic!("Invalid signal") + }; + assert_eq!(cid, cid_b); + assert_eq!(peer_cid, cid_a); + + Ok(()) } pub fn spawn_services(futures_to_spawn: Vec) { @@ -533,6 +550,86 @@ pub fn server_info_file_transfer<'a>( (server, bind_addr) } +pub async fn exhaust_stream_to_file_completion( + cmp_path: PathBuf, + svc: &mut UnboundedReceiver, +) { + // Exhaust the stream for the receiver + let mut path = None; + let mut is_revfs = false; + let cmp_file_name = cmp_path + .file_name() + .unwrap() + .to_os_string() + .into_string() + .unwrap(); + loop { + let tick_response = svc.recv().await.unwrap(); + match tick_response { + InternalServiceResponse::FileTransferTickNotification( + FileTransferTickNotification { + cid: _, + peer_cid: _, + status, + }, + ) => match status { + ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => { + path = Some(file_path); + is_revfs = matches!( + vfm.transfer_type, + TransferType::RemoteEncryptedVirtualFilesystem { .. } + ); + info!(target: "citadel", "File Transfer (Receiving) Beginning"); + assert_eq!(vfm.name, cmp_file_name) + } + ObjectTransferStatus::ReceptionTick(..) => { + info!(target: "citadel", "File Transfer (Receiving) Tick"); + } + ObjectTransferStatus::ReceptionComplete => { + info!(target: "citadel", "File Transfer (Receiving) Completed"); + let cmp_data = tokio::fs::read(cmp_path.clone()).await.unwrap(); + let streamed_data = tokio::fs::read( + path.clone() + .expect("Never received the ReceptionBeginning tick!"), + ) + .await + .unwrap(); + if is_revfs { + // The locally stored contents should NEVER be the same as the plaintext for REVFS + assert_ne!( + cmp_data.as_slice(), + streamed_data.as_slice(), + "Original data and streamed data does not match" + ); + } else { + assert_eq!( + cmp_data.as_slice(), + streamed_data.as_slice(), + "Original data and streamed data does not match" + ); + } + + return; + } + ObjectTransferStatus::TransferComplete => { + info!(target: "citadel", "File Transfer (Sending) Completed"); + return; + } + ObjectTransferStatus::TransferBeginning => { + info!(target: "citadel", "File Transfer (Sending) Beginning"); + } + ObjectTransferStatus::TransferTick(..) => {} + _ => { + panic!("File Send Reception Status Yielded Unexpected Response") + } + }, + unexpected_response => { + citadel_logging::warn!(target: "citadel", "Unexpected signal {unexpected_response:?}") + } + } + } +} + pub async fn test_kv_for_service( to_service: &UnboundedSender, from_service: &mut UnboundedReceiver, diff --git a/citadel-internal-service/tests/file_transfer.rs b/citadel-internal-service/tests/file_transfer.rs index 224b632..5dfcadf 100644 --- a/citadel-internal-service/tests/file_transfer.rs +++ b/citadel-internal-service/tests/file_transfer.rs @@ -3,15 +3,15 @@ mod common; #[cfg(test)] mod tests { use crate::common::{ - register_and_connect_to_server, register_and_connect_to_server_then_peers, - server_info_file_transfer, RegisterAndConnectItems, + exhaust_stream_to_file_completion, register_and_connect_to_server, + register_and_connect_to_server_then_peers, server_info_file_transfer, + RegisterAndConnectItems, }; use citadel_internal_service::kernel::CitadelWorkspaceService; use citadel_internal_service_types::{ DeleteVirtualFileSuccess, DownloadFileFailure, DownloadFileSuccess, - FileTransferRequestNotification, FileTransferStatusNotification, - FileTransferTickNotification, InternalServiceRequest, InternalServiceResponse, - SendFileRequestFailure, SendFileRequestSuccess, + FileTransferRequestNotification, FileTransferStatusNotification, InternalServiceRequest, + InternalServiceResponse, SendFileRequestFailure, SendFileRequestSuccess, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -24,7 +24,6 @@ mod tests { use std::sync::atomic::AtomicBool; use std::sync::Arc; use std::time::Duration; - use tokio::sync::mpsc::UnboundedReceiver; use uuid::Uuid; #[tokio::test] @@ -50,6 +49,7 @@ mod tests { info!(target: "citadel", "sub server spawn"); let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service); let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) .with_node_type(NodeType::Peer) .with_insecure_skip_cert_verification() .build(internal_service_kernel)?; @@ -197,6 +197,7 @@ mod tests { info!(target: "citadel", "sub server spawn"); let internal_service_kernel = CitadelWorkspaceService::new(bind_address_internal_service); let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) .with_node_type(NodeType::Peer) .with_insecure_skip_cert_verification() .build(internal_service_kernel)?; @@ -323,7 +324,7 @@ mod tests { // Push file to REVFS on peer let file_to_send = PathBuf::from("../resources/test.txt"); - let virtual_path = PathBuf::from("/vfs/virtual_test.txt"); + let virtual_path = PathBuf::from("/vfs/test.txt"); let send_file_to_service_b_payload = InternalServiceRequest::SendFile { request_id: Uuid::new_v4(), source: file_to_send.clone(), @@ -365,6 +366,9 @@ mod tests { panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); } + let deserialized_service_a_payload_response = from_service_a.recv().await.unwrap(); + info!(target: "citadel","{deserialized_service_a_payload_response:?}"); + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; @@ -391,6 +395,9 @@ mod tests { } } + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_b).await; + exhaust_stream_to_file_completion(file_to_send.clone(), from_service_a).await; + // Delete file on Peer REVFS let delete_file_command = InternalServiceRequest::DeleteVirtualFile { virtual_directory: virtual_path, @@ -415,80 +422,4 @@ mod tests { Ok(()) } - - async fn exhaust_stream_to_file_completion( - cmp_path: PathBuf, - svc: &mut UnboundedReceiver, - ) { - // Exhaust the stream for the receiver - let mut path = None; - let mut is_revfs = false; - loop { - let tick_response = svc.recv().await.unwrap(); - citadel_logging::trace!(target: "citadel", "RECV signal {tick_response:?}"); - match tick_response { - InternalServiceResponse::FileTransferTickNotification( - FileTransferTickNotification { - cid: _, - peer_cid: _, - status, - }, - ) => match status { - ObjectTransferStatus::ReceptionBeginning(file_path, vfm) => { - path = Some(file_path); - is_revfs = matches!( - vfm.transfer_type, - TransferType::RemoteEncryptedVirtualFilesystem { .. } - ); - info!(target: "citadel", "File Transfer (Receiving) Beginning"); - assert_eq!(vfm.name, "test.txt") - } - ObjectTransferStatus::ReceptionTick(..) => { - info!(target: "citadel", "File Transfer (Receiving) Tick"); - } - ObjectTransferStatus::ReceptionComplete => { - info!(target: "citadel", "File Transfer (Receiving) Completed"); - let cmp_data = tokio::fs::read(cmp_path.clone()).await.unwrap(); - let streamed_data = tokio::fs::read( - path.clone() - .expect("Never received the ReceptionBeginning tick!"), - ) - .await - .unwrap(); - if is_revfs { - // The locally stored contents should NEVER be the same as the plaintext for REVFS - assert_ne!( - cmp_data.as_slice(), - streamed_data.as_slice(), - "Original data and streamed data does not match" - ); - } else { - assert_eq!( - cmp_data.as_slice(), - streamed_data.as_slice(), - "Original data and streamed data does not match" - ); - } - - return; - } - ObjectTransferStatus::TransferComplete => { - info!(target: "citadel", "File Transfer (Sending) Completed"); - return; - } - ObjectTransferStatus::TransferBeginning => { - info!(target: "citadel", "File Transfer (Sending) Beginning"); - } - ObjectTransferStatus::TransferTick(..) => {} - _ => { - panic!("File Send Reception Status Yielded Unexpected Response") - } - }, - - unexpected_response => { - citadel_logging::warn!(target: "citadel", "Unexpected signal {unexpected_response:?}") - } - } - } - } } diff --git a/citadel-internal-service/tests/intra_kernel.rs b/citadel-internal-service/tests/intra_kernel.rs new file mode 100644 index 0000000..aeed211 --- /dev/null +++ b/citadel-internal-service/tests/intra_kernel.rs @@ -0,0 +1,490 @@ +mod common; + +#[cfg(test)] +mod tests { + use crate::common::{ + exhaust_stream_to_file_completion, register_and_connect_to_server, + server_info_skip_cert_verification, RegisterAndConnectItems, + }; + use citadel_internal_service::kernel::CitadelWorkspaceService; + use citadel_internal_service_types::{ + DeleteVirtualFileSuccess, DownloadFileSuccess, FileTransferRequestNotification, + FileTransferStatusNotification, InternalServiceRequest, InternalServiceResponse, + MessageNotification, MessageSendFailure, MessageSendSuccess, SendFileRequestSuccess, + }; + use citadel_sdk::prelude::*; + use std::path::PathBuf; + use uuid::Uuid; + + #[tokio::test] + async fn test_intra_kernel_service_and_peers() -> Result<(), Box> { + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::InMemory) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + Ok(()) + } + + #[tokio::test] + async fn test_intra_kernel_peer_message() -> Result<(), Box> { + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::InMemory) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + let message_request = InternalServiceRequest::Message { + request_id: Uuid::new_v4(), + message: "Test Message From Peer 0.".to_string().into_bytes(), + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + security_level: Default::default(), + }; + peer_0_tx.send(message_request)?; + match peer_0_rx.recv().await.unwrap() { + InternalServiceResponse::MessageSendSuccess(MessageSendSuccess { .. }) => { + citadel_logging::info!(target: "citadel", "Message Successfully Sent from Peer 0 to Peer 1."); + } + InternalServiceResponse::MessageSendFailure(MessageSendFailure { + cid: _, + message, + request_id: _, + }) => { + panic!("Message Sending Failed With Error: {message:?}") + } + _ => { + panic!("Received Unexpected Response When Expecting MessageSend Response.") + } + } + match peer_1_rx.recv().await.unwrap() { + InternalServiceResponse::MessageNotification(MessageNotification { + message, + cid: _, + peer_cid: _, + request_id: _, + }) => { + citadel_logging::info!(target: "citadel", "Message from Peer 0 Successfully Received at Peer 1: {message:?}"); + } + _ => { + panic!("Received Unexpected Response When Expecting MessageSend Response.") + } + } + Ok(()) + } + + #[tokio::test] + async fn test_intra_kernel_send_file() -> Result<(), Box> { + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + + let file_to_send = PathBuf::from("../resources/test.txt"); + + let send_file_to_service_1_payload = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: file_to_send, + cid: peer_0_cid, + transfer_type: TransferType::FileTransfer, + peer_cid: Some(peer_1_cid), + chunk_size: None, + }; + peer_0_tx.send(send_file_to_service_1_payload).unwrap(); + citadel_logging::info!(target:"citadel", "File Transfer Request Sent from {peer_0_cid:?}"); + + citadel_logging::info!(target:"citadel", "File Transfer Request Sent Successfully {peer_0_cid:?}"); + let deserialized_service_1_payload_response = peer_1_rx.recv().await.unwrap(); + if let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { metadata, .. }, + ) = deserialized_service_1_payload_response + { + citadel_logging::info!(target:"citadel", "File Transfer Request {peer_1_cid:?}"); + + let file_transfer_accept = InternalServiceRequest::RespondFileTransfer { + cid: peer_1_cid, + peer_cid: peer_0_cid, + object_id: metadata.object_id as _, + accept: true, + download_location: None, + request_id: Uuid::new_v4(), + }; + peer_1_tx.send(file_transfer_accept).unwrap(); + citadel_logging::info!(target:"citadel", "Accepted File Transfer {peer_1_cid:?}"); + + let file_transfer_accept = peer_1_rx.recv().await.unwrap(); + if let InternalServiceResponse::FileTransferStatusNotification( + FileTransferStatusNotification { + cid: _, + object_id: _, + success, + response, + message: _, + request_id: _, + }, + ) = file_transfer_accept + { + if success && response { + citadel_logging::info!(target:"citadel", "File Transfer Accept Success {peer_1_cid:?}"); + // continue to status ticks + } else { + panic!("Service 1 Accept Response Failure - Success: {success:?} Response {response:?}") + } + } else { + panic!("Unhandled Service 1 response") + } + + // Exhaust the stream for the receiver + exhaust_stream_to_file_completion( + PathBuf::from("../resources/test.txt"), + &mut peer_1_rx, + ) + .await; + // Exhaust the stream for the sender + exhaust_stream_to_file_completion( + PathBuf::from("../resources/test.txt"), + &mut peer_0_rx, + ) + .await; + } else { + panic!("File Transfer P2P Failure"); + }; + + Ok(()) + } + + #[tokio::test] + async fn test_intra_kernel_revfs() -> Result<(), Box> { + crate::common::setup_log(); + + let (server, server_bind_address) = server_info_skip_cert_verification(); + tokio::task::spawn(server); + + let service_addr = "127.0.0.1:55778".parse().unwrap(); + let service = CitadelWorkspaceService::new(service_addr); + + let internal_service = NodeBuilder::default() + .with_backend(BackendType::Filesystem("filesystem".into())) + .with_node_type(NodeType::Peer) + .with_insecure_skip_cert_verification() + .build(service)?; + + tokio::task::spawn(internal_service); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + + // Now with both the server and the IS running, we can test both peers trying to connect, then to each other + // via p2p + let to_spawn = vec![ + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 0".to_string(), + username: "peer.0".to_string(), + password: "secret_0".to_string().into_bytes().to_owned(), + }, + RegisterAndConnectItems { + internal_service_addr: service_addr, + server_addr: server_bind_address, + full_name: "Peer 1".to_string(), + username: "peer.1".to_string(), + password: "secret_1".to_string().into_bytes().to_owned(), + }, + ]; + + let mut returned_service_info = register_and_connect_to_server(to_spawn).await.unwrap(); + let (mut peer_0_tx, mut peer_0_rx, peer_0_cid) = returned_service_info.remove(0); + let (mut peer_1_tx, mut peer_1_rx, peer_1_cid) = returned_service_info.remove(0); + + crate::common::register_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + citadel_logging::info!(target: "citadel", "P2P Register complete"); + crate::common::connect_p2p( + &mut peer_0_tx, + &mut peer_0_rx, + peer_0_cid, + &mut peer_1_tx, + &mut peer_1_rx, + peer_1_cid, + SessionSecuritySettings::default(), + ) + .await?; + + // Push file to REVFS on peer + let file_to_send = PathBuf::from("../resources/test.txt"); + let virtual_path = PathBuf::from("/vfs/test.txt"); + let send_file_peer_1_tx_payload = InternalServiceRequest::SendFile { + request_id: Uuid::new_v4(), + source: file_to_send.clone(), + cid: peer_0_cid, + transfer_type: TransferType::RemoteEncryptedVirtualFilesystem { + virtual_path: virtual_path.clone(), + security_level: Default::default(), + }, + peer_cid: Some(peer_1_cid), + chunk_size: None, + }; + peer_0_tx.send(send_file_peer_1_tx_payload).unwrap(); + let deserialized_service_a_payload_response = peer_0_rx.recv().await.unwrap(); + citadel_logging::info!(target: "citadel","{deserialized_service_a_payload_response:?}"); + + if let InternalServiceResponse::SendFileRequestSuccess(SendFileRequestSuccess { .. }) = + &deserialized_service_a_payload_response + { + citadel_logging::info!(target:"citadel", "File Transfer Request {peer_1_cid}"); + let deserialized_service_a_payload_response = peer_1_rx.recv().await.unwrap(); + if let InternalServiceResponse::FileTransferRequestNotification( + FileTransferRequestNotification { metadata, .. }, + ) = deserialized_service_a_payload_response + { + let file_transfer_accept_payload = InternalServiceRequest::RespondFileTransfer { + cid: peer_1_cid, + peer_cid: peer_0_cid, + object_id: metadata.object_id, + accept: true, + download_location: None, + request_id: Uuid::new_v4(), + }; + peer_1_tx.send(file_transfer_accept_payload).unwrap(); + citadel_logging::info!(target:"citadel", "Accepted File Transfer {peer_1_cid}"); + } else { + panic!("File Transfer P2P Failure"); + } + } else { + panic!("File Transfer Request failed: {deserialized_service_a_payload_response:?}"); + } + + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_1_rx).await; + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_0_rx).await; + + citadel_logging::info!(target: "citadel", "Peer 0 Requesting to Download File"); + + // Download P2P REVFS file - without delete on pull + let download_file_command = InternalServiceRequest::DownloadFile { + virtual_directory: virtual_path.clone(), + security_level: Default::default(), + delete_on_pull: false, + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + request_id: Uuid::new_v4(), + }; + peer_0_tx.send(download_file_command).unwrap(); + citadel_logging::info!(target: "citadel", "Peer 0 Waiting for DownloadFileSuccess Response"); + let download_file_response = peer_0_rx.recv().await.unwrap(); + match download_file_response { + InternalServiceResponse::DownloadFileSuccess(DownloadFileSuccess { + cid: response_cid, + request_id: _, + }) => { + assert_eq!(peer_0_cid, response_cid); + } + _ => { + panic!("Didn't get the REVFS DownloadFileSuccess - instead got {download_file_response:?}"); + } + } + + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_1_rx).await; + exhaust_stream_to_file_completion(file_to_send.clone(), &mut peer_0_rx).await; + + citadel_logging::info!(target: "citadel", "Peer 0 Requesting to Delete File"); + + // Delete file on Peer REVFS + let delete_file_command = InternalServiceRequest::DeleteVirtualFile { + virtual_directory: virtual_path, + cid: peer_0_cid, + peer_cid: Some(peer_1_cid), + request_id: Uuid::new_v4(), + }; + peer_0_tx.send(delete_file_command).unwrap(); + let delete_file_response = peer_0_rx.recv().await.unwrap(); + match delete_file_response { + InternalServiceResponse::DeleteVirtualFileSuccess(DeleteVirtualFileSuccess { + cid: response_cid, + request_id: _, + }) => { + assert_eq!(peer_0_cid, response_cid); + } + _ => { + panic!("Didn't get the REVFS DeleteVirtualFileSuccess - instead got {delete_file_response:?}"); + } + } + citadel_logging::info!(target: "citadel","{delete_file_response:?}"); + + Ok(()) + } +} diff --git a/citadel-internal-service/tests/service.rs b/citadel-internal-service/tests/service.rs index 1fe1f7b..63e28ad 100644 --- a/citadel-internal-service/tests/service.rs +++ b/citadel-internal-service/tests/service.rs @@ -11,7 +11,6 @@ mod tests { use citadel_internal_service_connector::util::InternalServiceConnector; use citadel_internal_service_types::{ InternalServiceRequest, InternalServiceResponse, MessageNotification, MessageSendSuccess, - PeerConnectNotification, PeerRegisterNotification, }; use citadel_logging::info; use citadel_sdk::prelude::*; @@ -502,97 +501,26 @@ mod tests { let (ref mut to_service_a, ref mut from_service_a, cid_a) = item; for neighbor in neighbor_items { let (ref mut to_service_b, ref mut from_service_b, cid_b) = neighbor; - let session_security_settings = - SessionSecuritySettingsBuilder::default().build().unwrap(); - - // Service A Requests to Register with Service B - to_service_a - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: (*cid_b), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - // Service B receives Register Request from Service A - let inbound_response = from_service_b.recv().await.unwrap(); - match inbound_response { - InternalServiceResponse::PeerRegisterNotification( - PeerRegisterNotification { - cid, - peer_cid, - peer_username: _, - request_id: _, - }, - ) => { - assert_eq!(cid, *cid_b); - assert_eq!(peer_cid, *cid_a); - } - _ => { - panic!("Peer B didn't get the PeerRegisterNotification, instead got {inbound_response:?}"); - } - } - - // Service B Sends Register Request to Accept - to_service_b - .send(InternalServiceRequest::PeerRegister { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: (*cid_a), - session_security_settings, - connect_after_register: false, - }) - .unwrap(); - - // Receive Register Success Responses - let _ = from_service_a.recv().await.unwrap(); - let _ = from_service_b.recv().await.unwrap(); - - // Service A Requests To Connect - to_service_a - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_a, - peer_cid: *cid_b, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - // Service B Receives Connect Request from Service A - let inbound_response = from_service_b.recv().await.unwrap(); - match inbound_response { - InternalServiceResponse::PeerConnectNotification(PeerConnectNotification { - cid, - peer_cid, - session_security_settings: _, - udp_mode: _, - request_id: _, - }) => { - assert_eq!(cid, *cid_b); - assert_eq!(peer_cid, *cid_a); - } - _ => { - panic!("Peer B didn't get the PeerConnectNotification"); - } - } - - // Service B Sends Connect Request to Accept - to_service_b - .send(InternalServiceRequest::PeerConnect { - request_id: Uuid::new_v4(), - cid: *cid_b, - peer_cid: *cid_a, - udp_mode: Default::default(), - session_security_settings, - }) - .unwrap(); - - // Receive Connect Success Responses - let _ = from_service_a.recv().await.unwrap(); - let _ = from_service_b.recv().await.unwrap(); + crate::common::register_p2p( + to_service_a, + from_service_a, + *cid_a, + to_service_b, + from_service_b, + *cid_b, + SessionSecuritySettings::default(), + ) + .await?; + crate::common::connect_p2p( + to_service_a, + from_service_a, + *cid_a, + to_service_b, + from_service_b, + *cid_b, + SessionSecuritySettings::default(), + ) + .await?; } } Ok(()) diff --git a/service/Cargo.toml b/service/Cargo.toml index 991ecd7..b335bac 100644 --- a/service/Cargo.toml +++ b/service/Cargo.toml @@ -8,9 +8,10 @@ edition = "2021" [features] localhost-testing = ["citadel_sdk/localhost-testing"] vendored = ["citadel-internal-service/vendored"] +deadlock-detection = ["parking_lot/deadlock_detection", "parking_lot", "lazy_static"] [[bin]] -name = "citadel_service_bin" +name = "internal-service" path = "src/main.rs" [[bin]] @@ -22,4 +23,6 @@ structopt = { workspace = true } tokio = { workspace = true, features = ["macros", "rt"] } citadel-internal-service = { workspace = true } citadel_sdk = { workspace = true } -citadel_logging = { workspace = true } \ No newline at end of file +citadel_logging = { workspace = true } +parking_lot = { workspace = true, optional = true } +lazy_static = { workspace = true, optional = true } \ No newline at end of file diff --git a/service/src/main.rs b/service/src/main.rs index 37ea230..d17c43e 100644 --- a/service/src/main.rs +++ b/service/src/main.rs @@ -26,7 +26,7 @@ async fn main() -> Result<(), Box> { #[derive(Debug, StructOpt)] #[structopt( - name = "citadel-service-bin", + name = "internal-service", about = "Used for running a local service for citadel applications" )] struct Options { @@ -35,3 +35,32 @@ struct Options { #[structopt(short, long)] dangerous: Option, } + +#[cfg(feature = "deadlock-detection")] +lazy_static::lazy_static! { + static ref DEADLOCK_INIT: () = { + let _ = std::thread::spawn(move || { + info!(target: "gadget", "Executing deadlock detector ..."); + use std::thread; + use std::time::Duration; + use parking_lot::deadlock; + use citadel_logging::*; + loop { + std::thread::sleep(Duration::from_secs(5)); + let deadlocks = deadlock::check_deadlock(); + if deadlocks.is_empty() { + continue; + } + + error!(target: "citadel", "{} deadlocks detected", deadlocks.len()); + for (i, threads) in deadlocks.iter().enumerate() { + error!(target: "citadel", "Deadlock #{}", i); + for t in threads { + error!(target: "citadel", "Thread Id {:#?}", t.thread_id()); + error!(target: "citadel", "{:#?}", t.backtrace()); + } + } + } + }); + }; +}