diff --git a/common/client-libs/gateway-client/src/bandwidth.rs b/common/client-libs/gateway-client/src/bandwidth.rs index 25e9a44394d..9fd43765bdd 100644 --- a/common/client-libs/gateway-client/src/bandwidth.rs +++ b/common/client-libs/gateway-client/src/bandwidth.rs @@ -87,8 +87,10 @@ impl ClientBandwidth { if remaining < 0 { tracing::warn!("OUT OF BANDWIDTH. remaining: {remaining_bi2}"); - } else { + } else if remaining < 1_000_000 { tracing::info!("remaining bandwidth: {remaining_bi2}"); + } else { + tracing::debug!("remaining bandwidth: {remaining_bi2}"); } self.inner diff --git a/common/client-libs/gateway-client/src/client/mod.rs b/common/client-libs/gateway-client/src/client/mod.rs index 6c1d9fa0b28..6cb7b83f020 100644 --- a/common/client-libs/gateway-client/src/client/mod.rs +++ b/common/client-libs/gateway-client/src/client/mod.rs @@ -139,6 +139,10 @@ impl GatewayClient { self.gateway_identity } + pub fn shared_key(&self) -> Option> { + self.shared_key.clone() + } + pub fn ws_fd(&self) -> Option { match &self.connection { SocketState::Available(conn) => ws_fd(conn.as_ref()), @@ -408,7 +412,7 @@ impl GatewayClient { } Some(_) => { - info!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!"); + debug!("the gateway is using exactly the same (or older) protocol version as we are. We're good to continue!"); Ok(()) } } @@ -992,24 +996,6 @@ impl GatewayClient { } Ok(()) } - - #[deprecated(note = "this method does not deal with upgraded keys for legacy clients")] - pub async fn authenticate_and_start( - &mut self, - ) -> Result - where - C: DkgQueryClient + Send + Sync, - St: CredentialStorage, - ::StorageError: Send + Sync + 'static, - { - let shared_key = self.perform_initial_authentication().await?; - self.claim_initial_bandwidth().await?; - - // this call is NON-blocking - self.start_listening_for_mixnet_messages()?; - - Ok(shared_key) - } } // type alias for an ease of use diff --git a/common/client-libs/gateway-client/src/socket_state.rs b/common/client-libs/gateway-client/src/socket_state.rs index 942f6506148..7bc0e8d1d05 100644 --- a/common/client-libs/gateway-client/src/socket_state.rs +++ b/common/client-libs/gateway-client/src/socket_state.rs @@ -110,6 +110,11 @@ impl PartiallyDelegatedRouter { } }; + if self.stream_return.is_canceled() { + // nothing to do, receiver has been dropped + return; + } + let return_res = match ret { Err(err) => self.stream_return.send(Err(err)), Ok(_) => { diff --git a/common/crypto/src/asymmetric/identity/mod.rs b/common/crypto/src/asymmetric/identity/mod.rs index 4b51aa2f641..a432b0c806d 100644 --- a/common/crypto/src/asymmetric/identity/mod.rs +++ b/common/crypto/src/asymmetric/identity/mod.rs @@ -6,6 +6,7 @@ use ed25519_dalek::{Signer, SigningKey}; pub use ed25519_dalek::{Verifier, PUBLIC_KEY_LENGTH, SECRET_KEY_LENGTH, SIGNATURE_LENGTH}; use nym_pemstore::traits::{PemStorableKey, PemStorableKeyPair}; use std::fmt::{self, Debug, Display, Formatter}; +use std::hash::{Hash, Hasher}; use std::str::FromStr; use thiserror::Error; use zeroize::{Zeroize, ZeroizeOnDrop}; @@ -122,6 +123,14 @@ impl PemStorableKeyPair for KeyPair { #[derive(Copy, Clone, Eq, PartialEq)] pub struct PublicKey(ed25519_dalek::VerifyingKey); +impl Hash for PublicKey { + fn hash(&self, state: &mut H) { + // each public key has unique bytes representation which can be used + // for the hash implementation + self.to_bytes().hash(state) + } +} + impl Display for PublicKey { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { Display::fmt(&self.to_base58_string(), f) diff --git a/nym-api/src/network_monitor/gateways_reader.rs b/nym-api/src/network_monitor/gateways_reader.rs index 01bec7c52a6..bc3eeaa44cb 100644 --- a/nym-api/src/network_monitor/gateways_reader.rs +++ b/nym-api/src/network_monitor/gateways_reader.rs @@ -2,9 +2,8 @@ // SPDX-License-Identifier: GPL-3.0-only use futures::Stream; -use nym_crypto::asymmetric::identity; +use nym_crypto::asymmetric::{ed25519, identity}; use nym_gateway_client::{AcknowledgementReceiver, MixnetMessageReceiver}; -use nym_mixnet_contract_common::IdentityKey; use std::pin::Pin; use std::task::{Context, Poll}; use tokio_stream::StreamMap; @@ -15,8 +14,8 @@ pub(crate) enum GatewayMessages { } pub(crate) struct GatewaysReader { - ack_map: StreamMap, - stream_map: StreamMap, + ack_map: StreamMap, + stream_map: StreamMap, } impl GatewaysReader { @@ -33,19 +32,18 @@ impl GatewaysReader { message_receiver: MixnetMessageReceiver, ack_receiver: AcknowledgementReceiver, ) { - let channel_id = id.to_string(); - self.stream_map.insert(channel_id.clone(), message_receiver); - self.ack_map.insert(channel_id, ack_receiver); + self.stream_map.insert(id, message_receiver); + self.ack_map.insert(id, ack_receiver); } - pub fn remove_receivers(&mut self, id: &str) { - self.stream_map.remove(id); - self.ack_map.remove(id); + pub fn remove_receivers(&mut self, id: ed25519::PublicKey) { + self.stream_map.remove(&id); + self.ack_map.remove(&id); } } impl Stream for GatewaysReader { - type Item = (IdentityKey, GatewayMessages); + type Item = (ed25519::PublicKey, GatewayMessages); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { match Pin::new(&mut self.ack_map).poll_next(cx) { diff --git a/nym-api/src/network_monitor/mod.rs b/nym-api/src/network_monitor/mod.rs index 1df86ef3b81..557b9fdf412 100644 --- a/nym-api/src/network_monitor/mod.rs +++ b/nym-api/src/network_monitor/mod.rs @@ -16,7 +16,8 @@ use crate::node_status_api::NodeStatusCache; use crate::nym_contract_cache::cache::NymContractCache; use crate::storage::NymApiStorage; use crate::support::caching::cache::SharedCache; -use crate::support::{config, nyxd}; +use crate::support::config::Config; +use crate::support::nyxd; use futures::channel::mpsc; use nym_bandwidth_controller::BandwidthController; use nym_credential_storage::persistent_storage::PersistentStorage; @@ -36,7 +37,7 @@ pub(crate) mod test_route; pub(crate) const ROUTE_TESTING_TEST_NONCE: u64 = 0; pub(crate) fn setup<'a>( - config: &'a config::NetworkMonitor, + config: &'a Config, nym_contract_cache: &NymContractCache, described_cache: SharedCache, node_status_cache: NodeStatusCache, @@ -54,7 +55,7 @@ pub(crate) fn setup<'a>( } pub(crate) struct NetworkMonitorBuilder<'a> { - config: &'a config::NetworkMonitor, + config: &'a Config, nyxd_client: nyxd::Client, node_status_storage: NymApiStorage, contract_cache: NymContractCache, @@ -64,7 +65,7 @@ pub(crate) struct NetworkMonitorBuilder<'a> { impl<'a> NetworkMonitorBuilder<'a> { pub(crate) fn new( - config: &'a config::NetworkMonitor, + config: &'a Config, nyxd_client: nyxd::Client, node_status_storage: NymApiStorage, contract_cache: NymContractCache, @@ -81,7 +82,7 @@ impl<'a> NetworkMonitorBuilder<'a> { } } - pub(crate) async fn build( + pub(crate) async fn build( self, ) -> NetworkMonitorRunnables { // TODO: those keys change constant throughout the whole execution of the monitor. @@ -101,7 +102,7 @@ impl<'a> NetworkMonitorBuilder<'a> { self.contract_cache, self.described_cache, self.node_status_cache, - self.config.debug.per_node_test_packets, + self.config.network_monitor.debug.per_node_test_packets, Arc::clone(&ack_key), *identity_keypair.public_key(), *encryption_keypair.public_key(), @@ -110,7 +111,11 @@ impl<'a> NetworkMonitorBuilder<'a> { let bandwidth_controller = { BandwidthController::new( nym_credential_storage::initialise_persistent_storage( - &self.config.storage_paths.credentials_database_path, + &self + .config + .network_monitor + .storage_paths + .credentials_database_path, ) .await, self.nyxd_client.clone(), @@ -118,12 +123,10 @@ impl<'a> NetworkMonitorBuilder<'a> { }; let packet_sender = new_packet_sender( - self.config, + &self.config, gateway_status_update_sender, Arc::clone(&identity_keypair), - self.config.debug.gateway_sending_rate, bandwidth_controller, - self.config.debug.disabled_credentials_mode, ); let received_processor = new_received_processor( @@ -131,14 +134,15 @@ impl<'a> NetworkMonitorBuilder<'a> { Arc::clone(&encryption_keypair), ack_key, ); - let summary_producer = new_summary_producer(self.config.debug.per_node_test_packets); + let summary_producer = + new_summary_producer(self.config.network_monitor.debug.per_node_test_packets); let packet_receiver = new_packet_receiver( gateway_status_update_receiver, received_processor_sender_channel, ); let monitor = Monitor::new( - self.config, + &self.config.network_monitor, packet_preparer, packet_sender, received_processor, @@ -154,12 +158,12 @@ impl<'a> NetworkMonitorBuilder<'a> { } } -pub(crate) struct NetworkMonitorRunnables { +pub(crate) struct NetworkMonitorRunnables { monitor: Monitor, packet_receiver: PacketReceiver, } -impl NetworkMonitorRunnables { +impl NetworkMonitorRunnables { // TODO: note, that is not exactly doing what we want, because when // `ReceivedProcessor` is constructed, it already spawns a future // this needs to be refactored! @@ -194,22 +198,16 @@ fn new_packet_preparer( } fn new_packet_sender( - config: &config::NetworkMonitor, + config: &Config, gateways_status_updater: GatewayClientUpdateSender, local_identity: Arc, - max_sending_rate: usize, bandwidth_controller: BandwidthController, - disabled_credentials_mode: bool, ) -> PacketSender { PacketSender::new( + config, gateways_status_updater, local_identity, - config.debug.gateway_response_timeout, - config.debug.gateway_connection_timeout, - config.debug.max_concurrent_gateway_clients, - max_sending_rate, bandwidth_controller, - disabled_credentials_mode, ) } @@ -236,8 +234,8 @@ fn new_packet_receiver( // TODO: 1) does it still have to have separate builder or could we get rid of it now? // TODO: 2) how do we make it non-async as other 'start' methods? -pub(crate) async fn start( - config: &config::NetworkMonitor, +pub(crate) async fn start( + config: &Config, nym_contract_cache: &NymContractCache, described_cache: SharedCache, node_status_cache: NodeStatusCache, diff --git a/nym-api/src/network_monitor/monitor/gateway_client_handle.rs b/nym-api/src/network_monitor/monitor/gateway_client_handle.rs new file mode 100644 index 00000000000..6e3f157efb9 --- /dev/null +++ b/nym-api/src/network_monitor/monitor/gateway_client_handle.rs @@ -0,0 +1,54 @@ +// Copyright 2021 - Nym Technologies SA +// SPDX-License-Identifier: GPL-3.0-only + +use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; +use crate::support::nyxd; +use nym_credential_storage::persistent_storage::PersistentStorage; +use nym_gateway_client::GatewayClient; +use std::ops::{Deref, DerefMut}; +use tracing::warn; + +pub(crate) struct GatewayClientHandle { + client: GatewayClient, + gateways_status_updater: GatewayClientUpdateSender, +} + +impl GatewayClientHandle { + pub(crate) fn new( + client: GatewayClient, + gateways_status_updater: GatewayClientUpdateSender, + ) -> Self { + GatewayClientHandle { + client, + gateways_status_updater, + } + } +} + +impl Drop for GatewayClientHandle { + fn drop(&mut self) { + if self + .gateways_status_updater + .unbounded_send(GatewayClientUpdate::Disconnect( + self.client.gateway_identity(), + )) + .is_err() + { + warn!("fail to cleanly shutdown gateway connection") + } + } +} + +impl Deref for GatewayClientHandle { + type Target = GatewayClient; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +impl DerefMut for GatewayClientHandle { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.client + } +} diff --git a/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs b/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs deleted file mode 100644 index c791f9a6af8..00000000000 --- a/nym-api/src/network_monitor/monitor/gateway_clients_cache.rs +++ /dev/null @@ -1,102 +0,0 @@ -// Copyright 2021 - Nym Technologies SA -// SPDX-License-Identifier: GPL-3.0-only - -use crate::support::nyxd; -use nym_credential_storage::persistent_storage::PersistentStorage; -use nym_crypto::asymmetric::identity::PUBLIC_KEY_LENGTH; -use nym_gateway_client::GatewayClient; -use std::collections::HashMap; -use std::sync::Arc; -use tokio::sync::{Mutex, MutexGuard, TryLockError}; - -pub(crate) struct GatewayClientHandle(Arc); - -struct GatewayClientHandleInner { - client: Mutex>>, - raw_identity: [u8; PUBLIC_KEY_LENGTH], -} - -pub(crate) struct UnlockedGatewayClientHandle<'a>( - MutexGuard<'a, Option>>, -); - -impl GatewayClientHandle { - pub(crate) fn new(gateway_client: GatewayClient) -> Self { - GatewayClientHandle(Arc::new(GatewayClientHandleInner { - raw_identity: gateway_client.gateway_identity().to_bytes(), - client: Mutex::new(Some(gateway_client)), - })) - } - - pub(crate) fn ptr_eq(&self, other: &Self) -> bool { - Arc::ptr_eq(&self.0, &other.0) - } - - // this could have also been achieved with a normal #[derive(Clone)] but I prefer to be explicit about it, - // because clippy would suggest some potentially confusing 'simplifications' regarding clone - pub(crate) fn clone_data_pointer(&self) -> Self { - GatewayClientHandle(Arc::clone(&self.0)) - } - - pub(crate) fn raw_identity(&self) -> [u8; PUBLIC_KEY_LENGTH] { - self.0.raw_identity - } - - pub(crate) async fn is_invalid(&self) -> bool { - self.0.client.lock().await.is_none() - } - - pub(crate) async fn lock_client(&self) -> UnlockedGatewayClientHandle<'_> { - UnlockedGatewayClientHandle(self.0.client.lock().await) - } - - pub(crate) fn lock_client_unchecked(&self) -> UnlockedGatewayClientHandle<'_> { - UnlockedGatewayClientHandle(self.0.client.try_lock().unwrap()) - } - - pub(crate) fn try_lock_client(&self) -> Result, TryLockError> { - self.0.client.try_lock().map(UnlockedGatewayClientHandle) - } -} - -impl UnlockedGatewayClientHandle<'_> { - pub(crate) fn get_mut_unchecked( - &mut self, - ) -> &mut GatewayClient { - self.0.as_mut().unwrap() - } - - pub(crate) fn inner_mut( - &mut self, - ) -> Option<&mut GatewayClient> { - self.0.as_mut() - } - - pub(crate) fn invalidate(&mut self) { - *self.0 = None - } -} - -pub(crate) type GatewayClientsMap = HashMap<[u8; PUBLIC_KEY_LENGTH], GatewayClientHandle>; - -#[derive(Clone)] -pub(crate) struct ActiveGatewayClients { - // there is no point in using an RwLock here as there will only ever be two readers here and both - // potentially need write access. - // A BiLock would have been slightly better than a normal Mutex since it's optimised for two - // owners, but it's behind `unstable` feature flag in futures and it would be a headache if the API - // changed. - inner: Arc>, -} - -impl ActiveGatewayClients { - pub(crate) fn new() -> Self { - ActiveGatewayClients { - inner: Arc::new(Mutex::new(HashMap::new())), - } - } - - pub(crate) async fn lock(&self) -> MutexGuard<'_, GatewayClientsMap> { - self.inner.lock().await - } -} diff --git a/nym-api/src/network_monitor/monitor/gateways_pinger.rs b/nym-api/src/network_monitor/monitor/gateways_pinger.rs deleted file mode 100644 index ed09c2d0906..00000000000 --- a/nym-api/src/network_monitor/monitor/gateways_pinger.rs +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2021 - Nym Technologies SA -// SPDX-License-Identifier: GPL-3.0-only - -use crate::network_monitor::monitor::gateway_clients_cache::ActiveGatewayClients; -use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; -use nym_crypto::asymmetric::identity; -use nym_crypto::asymmetric::identity::PUBLIC_KEY_LENGTH; -use nym_task::TaskClient; -use std::time::Duration; -use tokio::time::{sleep, Instant}; -use tracing::{debug, info, trace, warn}; - -// TODO: should it perhaps be moved to config along other timeout values? -const PING_TIMEOUT: Duration = Duration::from_secs(3); - -pub(crate) struct GatewayPinger { - gateway_clients: ActiveGatewayClients, - gateways_status_updater: GatewayClientUpdateSender, - pinging_interval: Duration, -} - -impl GatewayPinger { - pub(crate) fn new( - gateway_clients: ActiveGatewayClients, - gateways_status_updater: GatewayClientUpdateSender, - pinging_interval: Duration, - ) -> Self { - GatewayPinger { - gateway_clients, - gateways_status_updater, - pinging_interval, - } - } - - fn notify_connection_failure(&self, raw_gateway_id: [u8; PUBLIC_KEY_LENGTH]) { - // if this unwrap failed it means something extremely weird is going on - // and we got some solar flare bitflip type of corruption - let gateway_key = identity::PublicKey::from_bytes(&raw_gateway_id) - .expect("failed to recover gateways public key from valid bytes"); - - // remove the gateway listener channels - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::Failure(gateway_key)) - .expect("packet receiver seems to have died!"); - } - - async fn ping_and_cleanup_all_gateways(&self) { - info!("Pinging all active gateways"); - - let lock_acquire_start = Instant::now(); - let active_gateway_clients_guard = self.gateway_clients.lock().await; - trace!( - "Acquiring lock took {:?}", - Instant::now().duration_since(lock_acquire_start) - ); - - if active_gateway_clients_guard.is_empty() { - debug!("no gateways to ping"); - return; - } - - // don't keep the guard the entire time - clone all Arcs and drop it - // - // this clippy warning is a false positive as we cannot get rid of the collect by moving - // everything into a single iterator as it would require us to hold the lock the entire time - // and that is exactly what we want to avoid - #[allow(clippy::needless_collect)] - let active_gateway_clients = active_gateway_clients_guard - .iter() - .map(|(_, handle)| handle.clone_data_pointer()) - .collect::>(); - drop(active_gateway_clients_guard); - - let ping_start = Instant::now(); - - let mut clients_to_purge = Vec::new(); - - // since we don't need to wait for response, we can just ping all gateways sequentially - // if it becomes problem later on, we can adjust it. - for client_handle in active_gateway_clients.into_iter() { - trace!( - "Pinging: {}", - identity::PublicKey::from_bytes(&client_handle.raw_identity()) - .unwrap() - .to_base58_string() - ); - // if we fail to obtain the lock it means the client is being currently used to send messages - // and hence we don't need to ping it to keep connection alive - if let Ok(mut unlocked_handle) = client_handle.try_lock_client() { - if let Some(active_client) = unlocked_handle.inner_mut() { - match tokio::time::timeout(PING_TIMEOUT, active_client.send_ping_message()) - .await - { - Err(_timeout) => { - warn!( - "we timed out trying to ping {} - assuming the connection is dead.", - active_client.gateway_identity().to_base58_string(), - ); - clients_to_purge.push(client_handle.raw_identity()); - } - Ok(Err(err)) => { - warn!( - "failed to send ping message to gateway {} - {} - assuming the connection is dead.", - active_client.gateway_identity().to_base58_string(), - err, - ); - clients_to_purge.push(client_handle.raw_identity()); - } - _ => {} - } - } else { - clients_to_purge.push(client_handle.raw_identity()); - } - } - } - - info!( - "Purging {} gateways, acquiring lock", - clients_to_purge.len() - ); - // purge all dead connections - // reacquire the guard - let lock_acquire_start = Instant::now(); - let mut active_gateway_clients_guard = self.gateway_clients.lock().await; - info!( - "Acquiring lock took {:?}", - Instant::now().duration_since(lock_acquire_start) - ); - - for gateway_id in clients_to_purge.into_iter() { - if let Some(removed_handle) = active_gateway_clients_guard.remove(&gateway_id) { - if !removed_handle.is_invalid().await { - info!("Handle is invalid, purging"); - // it was not invalidated by the packet sender meaning it probably was some unbonded node - // that was never cleared - self.notify_connection_failure(gateway_id); - } - info!("Handle is not invalid, not purged") - } - } - - let ping_end = Instant::now(); - let time_taken = ping_end.duration_since(ping_start); - info!("Pinging all active gateways took {:?}", time_taken); - } - - pub(crate) async fn run(&self, mut shutdown: TaskClient) { - while !shutdown.is_shutdown() { - tokio::select! { - _ = sleep(self.pinging_interval) => { - tokio::select! { - biased; - _ = shutdown.recv() => { - trace!("GatewaysPinger: Received shutdown"); - } - _ = self.ping_and_cleanup_all_gateways() => (), - } - } - _ = shutdown.recv() => { - trace!("GatewaysPinger: Received shutdown"); - } - } - } - } -} diff --git a/nym-api/src/network_monitor/monitor/mod.rs b/nym-api/src/network_monitor/monitor/mod.rs index b99ce6a0cec..21f6f30c19f 100644 --- a/nym-api/src/network_monitor/monitor/mod.rs +++ b/nym-api/src/network_monitor/monitor/mod.rs @@ -17,15 +17,14 @@ use std::collections::{HashMap, HashSet}; use tokio::time::{sleep, Duration, Instant}; use tracing::{debug, error, info, trace}; -pub(crate) mod gateway_clients_cache; -pub(crate) mod gateways_pinger; +pub(crate) mod gateway_client_handle; pub(crate) mod preparer; pub(crate) mod processor; pub(crate) mod receiver; pub(crate) mod sender; pub(crate) mod summary_producer; -pub(super) struct Monitor { +pub(super) struct Monitor { test_nonce: u64, packet_preparer: PacketPreparer, packet_sender: PacketSender, @@ -33,7 +32,6 @@ pub(super) struct Monitor { summary_producer: SummaryProducer, node_status_storage: NymApiStorage, run_interval: Duration, - gateway_ping_interval: Duration, packet_delivery_timeout: Duration, /// Number of test packets sent via each "random" route to verify whether they work correctly. @@ -49,7 +47,7 @@ pub(super) struct Monitor { packet_type: PacketType, } -impl Monitor { +impl Monitor { pub(super) fn new( config: &config::NetworkMonitor, packet_preparer: PacketPreparer, @@ -67,7 +65,6 @@ impl Monitor { summary_producer, node_status_storage, run_interval: config.debug.run_interval, - gateway_ping_interval: config.debug.gateway_ping_interval, packet_delivery_timeout: config.debug.packet_delivery_timeout, route_test_packets: config.debug.route_test_packets, test_routes: config.debug.test_routes, @@ -135,12 +132,15 @@ impl Monitor { packets.push(gateway_packets); } - self.received_processor.set_route_test_nonce().await; - self.packet_sender.send_packets(packets).await; + self.received_processor.set_route_test_nonce(); + let gateway_clients = self.packet_sender.send_packets(packets).await; // give the packets some time to traverse the network sleep(self.packet_delivery_timeout).await; + // start all the disconnections in the background + drop(gateway_clients); + let received = self.received_processor.return_received().await; let mut results = self.analyse_received_test_route_packets(&received); @@ -247,12 +247,11 @@ impl Monitor { .flat_map(|packets| packets.packets.iter()) .count(); - self.received_processor - .set_new_test_nonce(self.test_nonce) - .await; + self.received_processor.set_new_test_nonce(self.test_nonce); info!("Sending packets to all gateways..."); - self.packet_sender + let gateway_clients = self + .packet_sender .send_packets(prepared_packets.packets) .await; @@ -264,6 +263,9 @@ impl Monitor { // give the packets some time to traverse the network sleep(self.packet_delivery_timeout).await; + // start all the disconnections in the background + drop(gateway_clients); + let received = self.received_processor.return_received().await; let total_received = received.len(); info!("Test routes: {:#?}", routes); @@ -311,9 +313,6 @@ impl Monitor { .wait_for_validator_cache_initial_values(self.minimum_test_routes) .await; - self.packet_sender - .spawn_gateways_pinger(self.gateway_ping_interval, shutdown.clone()); - let mut run_interval = tokio::time::interval(self.run_interval); while !shutdown.is_shutdown() { tokio::select! { diff --git a/nym-api/src/network_monitor/monitor/processor.rs b/nym-api/src/network_monitor/monitor/processor.rs index 0008bfa901e..7a65115d78d 100644 --- a/nym-api/src/network_monitor/monitor/processor.rs +++ b/nym-api/src/network_monitor/monitor/processor.rs @@ -5,17 +5,19 @@ use crate::network_monitor::gateways_reader::GatewayMessages; use crate::network_monitor::test_packet::{NodeTestMessage, NymApiTestMessageExt}; use crate::network_monitor::ROUTE_TESTING_TEST_NONCE; use futures::channel::mpsc; -use futures::lock::{Mutex, MutexGuard}; -use futures::{SinkExt, StreamExt}; +use futures::lock::Mutex; +use futures::StreamExt; use nym_crypto::asymmetric::encryption; use nym_node_tester_utils::error::NetworkTestingError; use nym_node_tester_utils::processor::TestPacketProcessor; use nym_sphinx::acknowledgements::AckKey; use nym_sphinx::receiver::{MessageReceiver, MessageRecoveryError}; use std::mem; +use std::ops::Deref; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; use thiserror::Error; -use tracing::{debug, error, trace, warn}; +use tracing::{error, trace, warn}; pub(crate) type ReceivedProcessorSender = mpsc::UnboundedSender; pub(crate) type ReceivedProcessorReceiver = mpsc::UnboundedReceiver; @@ -37,51 +39,71 @@ enum ProcessingError { ReceivedOutsideTestRun, } -// we can't use Notify due to possible edge case where both notification are consumed at once -enum LockPermit { - Release, - Free, +#[derive(Clone)] +struct SharedProcessorData { + inner: Arc, } -struct ReceivedProcessorInner { - /// Nonce of the current test run indicating which packets should get rejected. - test_nonce: Option, +impl SharedProcessorData { + async fn reset_run_information(&self) -> Vec { + self.test_nonce.store(u64::MAX, Ordering::SeqCst); + let mut guard = self.received_packets.lock().await; + mem::take(&mut *guard) + } +} - /// Channel for receiving packets/messages from the gateway clients - packets_receiver: ReceivedProcessorReceiver, +impl Deref for SharedProcessorData { + type Target = SharedProcessorDataInner; + fn deref(&self) -> &Self::Target { + &self.inner + } +} - test_processor: TestPacketProcessor, +struct SharedProcessorDataInner { + /// Nonce of the current test run indicating which packets should get rejected. + test_nonce: AtomicU64, /// Vector containing all received (and decrypted) packets in the current test run. // TODO: perhaps a different structure would be better here - received_packets: Vec, + received_packets: Mutex>, } -impl ReceivedProcessorInner { - fn on_received_data(&mut self, raw_message: Vec) -> Result<(), ProcessingError> { +struct ReceiverTask { + shared: SharedProcessorData, + packets_receiver: ReceivedProcessorReceiver, + test_processor: TestPacketProcessor, +} + +impl ReceiverTask +where + R: MessageReceiver, +{ + async fn on_received_data(&mut self, raw_message: Vec) -> Result<(), ProcessingError> { // if the nonce is none it means the packet was received during the 'waiting' for the // next test run - if self.test_nonce.is_none() { + let test_nonce = self.shared.test_nonce.load(Ordering::SeqCst); + if test_nonce == u64::MAX { return Err(ProcessingError::ReceivedOutsideTestRun); } let test_msg = self.test_processor.process_mixnet_message(raw_message)?; - if test_msg.ext.test_nonce != self.test_nonce.unwrap() { + if test_msg.ext.test_nonce != test_nonce { return Err(ProcessingError::NonMatchingNonce { received: test_msg.ext.test_nonce, - expected: self.test_nonce.unwrap(), + expected: test_nonce, }); } - self.received_packets.push(test_msg); + self.shared.received_packets.lock().await.push(test_msg); Ok(()) } fn on_received_ack(&mut self, raw_ack: Vec) -> Result<(), ProcessingError> { // if the nonce is none it means the packet was received during the 'waiting' for the // next test run - if self.test_nonce.is_none() { + let test_nonce = self.shared.test_nonce.load(Ordering::SeqCst); + if test_nonce == u64::MAX { return Err(ProcessingError::ReceivedOutsideTestRun); } @@ -92,11 +114,11 @@ impl ReceivedProcessorInner { Ok(()) } - fn on_received(&mut self, messages: GatewayMessages) { + async fn on_received(&mut self, messages: GatewayMessages) { match messages { GatewayMessages::Data(data_msgs) => { for raw in data_msgs { - if let Err(err) = self.on_received_data(raw) { + if let Err(err) = self.on_received_data(raw).await { warn!(target: "Monitor", "failed to process received gateway message: {err}") } } @@ -110,137 +132,64 @@ impl ReceivedProcessorInner { } } } - - fn finish_run(&mut self) -> Vec { - self.test_nonce = None; - mem::take(&mut self.received_packets) - } } -pub(crate) struct ReceivedProcessor { - permit_changer: Option>, - inner: Arc>>, +pub struct ReceivedProcessor { + shared: SharedProcessorData, + receiver_task: Option>, } -impl ReceivedProcessor { +impl ReceivedProcessor +where + R: MessageReceiver, +{ pub(crate) fn new( packets_receiver: ReceivedProcessorReceiver, client_encryption_keypair: Arc, ack_key: Arc, ) -> Self { - let inner: Arc>> = - Arc::new(Mutex::new(ReceivedProcessorInner { - test_nonce: None, - packets_receiver, - test_processor: TestPacketProcessor::new(client_encryption_keypair, ack_key), - received_packets: Vec::new(), - })); + let shared_data = SharedProcessorData { + inner: Arc::new(SharedProcessorDataInner { + test_nonce: AtomicU64::new(u64::MAX), + received_packets: Default::default(), + }), + }; ReceivedProcessor { - permit_changer: None, - inner, + shared: shared_data.clone(), + receiver_task: Some(ReceiverTask { + shared: shared_data, + packets_receiver, + test_processor: TestPacketProcessor::new(client_encryption_keypair, ack_key), + }), } } - pub(crate) fn start_receiving(&mut self) { - let inner = Arc::clone(&self.inner); - - // TODO: perhaps it should be using 0 size instead? - let (permit_sender, mut permit_receiver) = mpsc::channel(1); - self.permit_changer = Some(permit_sender); + pub(crate) fn start_receiving(&mut self) + where + R: Sync + Send + 'static, + { + let mut receiver_task = self + .receiver_task + .take() + .expect("network monitor has already started the receiver task!"); tokio::spawn(async move { - while let Some(permit) = wait_for_permit(&mut permit_receiver, &inner).await { - receive_or_release_permit(&mut permit_receiver, permit).await; - } - - async fn receive_or_release_permit( - permit_receiver: &mut mpsc::Receiver, - mut inner: MutexGuard<'_, ReceivedProcessorInner>, - ) { - loop { - tokio::select! { - permit_receiver = permit_receiver.next() => match permit_receiver { - Some(LockPermit::Release) => return, - Some(LockPermit::Free) => error!("somehow we got notification that the lock is free to take while we already hold it!"), - None => return, - }, - messages = inner.packets_receiver.next() => match messages { - Some(messages) => inner.on_received(messages), - None => return, - }, - } - } - } - - // // this lint really looks like a false positive because when lifetimes are elided, - // // the compiler can't figure out appropriate lifetime bounds - // #[allow(clippy::needless_lifetimes)] - async fn wait_for_permit<'a: 'b, 'b, P: MessageReceiver>( - permit_receiver: &'b mut mpsc::Receiver, - inner: &'a Mutex>, - ) -> Option>> { - loop { - match permit_receiver.next().await { - // we should only ever get this on the very first run - Some(LockPermit::Release) => debug!( - "somehow got request to drop our lock permit while we do not hold it!" - ), - Some(LockPermit::Free) => return Some(inner.lock().await), - None => return None, - } - } + while let Some(messages) = receiver_task.packets_receiver.next().await { + receiver_task.on_received(messages).await } }); } - pub(super) async fn set_route_test_nonce(&mut self) { - self.set_new_test_nonce(ROUTE_TESTING_TEST_NONCE).await + pub(super) fn set_route_test_nonce(&self) { + self.set_new_test_nonce(ROUTE_TESTING_TEST_NONCE) } - pub(super) async fn set_new_test_nonce(&mut self, test_nonce: u64) { - // ask for the lock back - self.permit_changer - .as_mut() - .expect("ReceivedProcessor hasn't started receiving!") - .send(LockPermit::Release) - .await - .expect("processing task has died!"); - let mut inner = self.inner.lock().await; - - inner.test_nonce = Some(test_nonce); - - // give the permit back - drop(inner); - self.permit_changer - .as_mut() - .expect("ReceivedProcessor hasn't started receiving!") - .send(LockPermit::Free) - .await - .expect("processing task has died!"); + pub(super) fn set_new_test_nonce(&self, test_nonce: u64) { + self.shared.test_nonce.store(test_nonce, Ordering::SeqCst); } - pub(super) async fn return_received(&mut self) -> Vec { - // ask for the lock back - self.permit_changer - .as_mut() - .expect("ReceivedProcessor hasn't started receiving!") - .send(LockPermit::Release) - .await - .expect("processing task has died!"); - let mut inner = self.inner.lock().await; - - let received = inner.finish_run(); - - // give the permit back - drop(inner); - self.permit_changer - .as_mut() - .expect("ReceivedProcessor hasn't started receiving!") - .send(LockPermit::Free) - .await - .expect("processing task has died!"); - - received + pub(super) async fn return_received(&self) -> Vec { + self.shared.reset_run_information().await } } diff --git a/nym-api/src/network_monitor/monitor/receiver.rs b/nym-api/src/network_monitor/monitor/receiver.rs index d6dcbf4b855..91a8c678110 100644 --- a/nym-api/src/network_monitor/monitor/receiver.rs +++ b/nym-api/src/network_monitor/monitor/receiver.rs @@ -14,7 +14,7 @@ pub(crate) type GatewayClientUpdateSender = mpsc::UnboundedSender; pub(crate) enum GatewayClientUpdate { - Failure(identity::PublicKey), + Disconnect(identity::PublicKey), New( identity::PublicKey, (MixnetMessageReceiver, AcknowledgementReceiver), @@ -45,8 +45,8 @@ impl PacketReceiver { self.gateways_reader .add_receivers(id, message_receiver, ack_receiver); } - GatewayClientUpdate::Failure(id) => { - self.gateways_reader.remove_receivers(&id.to_string()); + GatewayClientUpdate::Disconnect(id) => { + self.gateways_reader.remove_receivers(id); } } } diff --git a/nym-api/src/network_monitor/monitor/sender.rs b/nym-api/src/network_monitor/monitor/sender.rs index ac69a0bc45b..bc4d13ef4a1 100644 --- a/nym-api/src/network_monitor/monitor/sender.rs +++ b/nym-api/src/network_monitor/monitor/sender.rs @@ -1,35 +1,34 @@ // Copyright 2021-2023 - Nym Technologies SA // SPDX-License-Identifier: GPL-3.0-only -use crate::network_monitor::monitor::gateway_clients_cache::{ - ActiveGatewayClients, GatewayClientHandle, -}; -use crate::network_monitor::monitor::gateways_pinger::GatewayPinger; +use crate::network_monitor::monitor::gateway_client_handle::GatewayClientHandle; use crate::network_monitor::monitor::receiver::{GatewayClientUpdate, GatewayClientUpdateSender}; +use crate::support::config::Config; use crate::support::nyxd; +use dashmap::DashMap; use futures::channel::mpsc; use futures::stream::{self, FuturesUnordered, StreamExt}; use futures::task::Context; use futures::{Future, Stream}; use nym_bandwidth_controller::BandwidthController; use nym_credential_storage::persistent_storage::PersistentStorage; -use nym_crypto::asymmetric::identity::{self, PUBLIC_KEY_LENGTH}; +use nym_crypto::asymmetric::ed25519; use nym_gateway_client::client::config::GatewayClientConfig; use nym_gateway_client::client::GatewayConfig; use nym_gateway_client::error::GatewayClientError; use nym_gateway_client::{ - AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver, PacketRouter, + AcknowledgementReceiver, GatewayClient, MixnetMessageReceiver, PacketRouter, SharedGatewayKey, }; use nym_sphinx::forwarding::packet::MixPacket; -use nym_task::TaskClient; use pin_project::pin_project; +use sqlx::__rt::timeout; use std::mem; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; use std::time::Duration; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; const TIME_CHUNK_SIZE: Duration = Duration::from_millis(50); @@ -39,7 +38,7 @@ pub(crate) struct GatewayPackets { pub(crate) clients_address: String, /// Public key of the target gateway. - pub(crate) pub_key: identity::PublicKey, + pub(crate) pub_key: ed25519::PublicKey, /// All the packets that are going to get sent to the gateway. pub(crate) packets: Vec, @@ -48,7 +47,7 @@ pub(crate) struct GatewayPackets { impl GatewayPackets { pub(crate) fn new( clients_address: String, - pub_key: identity::PublicKey, + pub_key: ed25519::PublicKey, packets: Vec, ) -> Self { GatewayPackets { @@ -66,7 +65,7 @@ impl GatewayPackets { } } - pub(crate) fn empty(clients_address: String, pub_key: identity::PublicKey) -> Self { + pub(crate) fn empty(clients_address: String, pub_key: ed25519::PublicKey) -> Self { GatewayPackets { clients_address, pub_key, @@ -89,96 +88,63 @@ impl GatewayPackets { // struct consisting of all external data required to construct a fresh gateway client struct FreshGatewayClientData { gateways_status_updater: GatewayClientUpdateSender, - local_identity: Arc, + local_identity: Arc, gateway_response_timeout: Duration, bandwidth_controller: BandwidthController, disabled_credentials_mode: bool, + gateways_key_cache: DashMap>, } impl FreshGatewayClientData { - fn notify_connection_failure( - self: Arc, - raw_gateway_id: [u8; PUBLIC_KEY_LENGTH], - ) { - // if this unwrap failed it means something extremely weird is going on - // and we got some solar flare bitflip type of corruption - let gateway_key = identity::PublicKey::from_bytes(&raw_gateway_id) - .expect("failed to recover gateways public key from valid bytes"); - - // remove the gateway listener channels - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::Failure(gateway_key)) - .expect("packet receiver seems to have died!"); - } - fn notify_new_connection( self: Arc, - gateway_id: identity::PublicKey, - gateway_channels: Option<(MixnetMessageReceiver, AcknowledgementReceiver)>, + gateway_id: ed25519::PublicKey, + gateway_channels: (MixnetMessageReceiver, AcknowledgementReceiver), ) { - self.gateways_status_updater - .unbounded_send(GatewayClientUpdate::New( - gateway_id, - gateway_channels.expect("we created a new client, yet the channels are a None!"), - )) - .expect("packet receiver seems to have died!") + if self + .gateways_status_updater + .unbounded_send(GatewayClientUpdate::New(gateway_id, gateway_channels)) + .is_err() + { + error!("packet receiver seems to have died!") + } } } pub(crate) struct PacketSender { - // TODO: this has a potential long-term issue. If we keep those clients cached between runs, - // malicious gateways could figure out which traffic comes from the network monitor and always - // forward that traffic while dropping the rest. However, at the current stage such sophisticated - // behaviour is unlikely. - active_gateway_clients: ActiveGatewayClients, - fresh_gateway_client_data: Arc, gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, max_concurrent_clients: usize, max_sending_rate: usize, } impl PacketSender { - // at this point I'm not entirely sure how to deal with this warning without - // some considerable refactoring - #[allow(clippy::too_many_arguments)] pub(crate) fn new( + config: &Config, gateways_status_updater: GatewayClientUpdateSender, - local_identity: Arc, - gateway_response_timeout: Duration, - gateway_connection_timeout: Duration, - max_concurrent_clients: usize, - max_sending_rate: usize, + local_identity: Arc, bandwidth_controller: BandwidthController, - disabled_credentials_mode: bool, ) -> Self { PacketSender { - active_gateway_clients: ActiveGatewayClients::new(), fresh_gateway_client_data: Arc::new(FreshGatewayClientData { gateways_status_updater, local_identity, - gateway_response_timeout, + gateway_response_timeout: config.network_monitor.debug.gateway_response_timeout, bandwidth_controller, - disabled_credentials_mode, + disabled_credentials_mode: config.network_monitor.debug.disabled_credentials_mode, + gateways_key_cache: Default::default(), }), - gateway_connection_timeout, - max_concurrent_clients, - max_sending_rate, + gateway_connection_timeout: config.network_monitor.debug.gateway_connection_timeout, + gateway_bandwidth_claim_timeout: config + .network_monitor + .debug + .gateway_bandwidth_claim_timeout, + max_concurrent_clients: config.network_monitor.debug.max_concurrent_gateway_clients, + max_sending_rate: config.network_monitor.debug.gateway_sending_rate, } } - pub(crate) fn spawn_gateways_pinger(&self, pinging_interval: Duration, shutdown: TaskClient) { - let gateway_pinger = GatewayPinger::new( - self.active_gateway_clients.clone(), - self.fresh_gateway_client_data - .gateways_status_updater - .clone(), - pinging_interval, - ); - - tokio::spawn(async move { gateway_pinger.run(shutdown).await }); - } - fn new_gateway_client_handle( config: GatewayConfig, fresh_gateway_client_data: &FreshGatewayClientData, @@ -190,8 +156,6 @@ impl PacketSender { let task_client = nym_task::TaskClient::dummy().named(format!("gateway-{}", config.gateway_identity)); - // TODO: future optimization: if we're remaking client for a gateway to which we used to be connected in the past, - // use old shared keys let (message_sender, message_receiver) = mpsc::unbounded(); // currently we do not care about acks at all, but we must keep the channel alive @@ -204,13 +168,18 @@ impl PacketSender { task_client.fork("packet-router"), ); + let shared_keys = fresh_gateway_client_data + .gateways_key_cache + .get(&config.gateway_identity) + .map(|k| k.value().clone()); + let gateway_client = GatewayClient::new( GatewayClientConfig::new_default() .with_disabled_credentials_mode(fresh_gateway_client_data.disabled_credentials_mode) .with_response_timeout(fresh_gateway_client_data.gateway_response_timeout), config, Arc::clone(&fresh_gateway_client_data.local_identity), - None, + shared_keys, gateway_packet_router, Some(fresh_gateway_client_data.bandwidth_controller.clone()), nym_statistics_common::clients::ClientStatsSender::new(None), @@ -218,7 +187,10 @@ impl PacketSender { ); ( - GatewayClientHandle::new(gateway_client), + GatewayClientHandle::new( + gateway_client, + fresh_gateway_client_data.gateways_status_updater.clone(), + ), (message_receiver, ack_receiver), ) } @@ -228,11 +200,11 @@ impl PacketSender { mut mix_packets: Vec, max_sending_rate: usize, ) -> Result<(), GatewayClientError> { - let gateway_id = client.gateway_identity().to_base58_string(); + let gateway_id = client.gateway_identity(); + info!( - "Got {} packets to send to gateway {}", + "Got {} packets to send to gateway {gateway_id}", mix_packets.len(), - gateway_id ); if mix_packets.len() <= max_sending_rate { @@ -282,47 +254,79 @@ impl PacketSender { Ok(()) } + async fn client_startup( + connection_timeout: Duration, + bandwidth_claim_timeout: Duration, + client: &mut GatewayClientHandle, + ) -> Option> { + let gateway_identity = client.gateway_identity(); + + // 1. attempt to authenticate + let shared_key = + match timeout(connection_timeout, client.perform_initial_authentication()).await { + Err(_timeout) => { + warn!("timed out while trying to authenticate with gateway {gateway_identity}"); + return None; + } + Ok(Err(err)) => { + warn!("failed to authenticate with gateway ({gateway_identity}): {err}"); + return None; + } + Ok(Ok(res)) => res.initial_shared_key, + }; + + // 2. maybe claim bandwidth + match timeout(bandwidth_claim_timeout, client.claim_initial_bandwidth()).await { + Err(_timeout) => { + warn!("timed out while trying to claim initial bandwidth with gateway {gateway_identity}"); + return None; + } + Ok(Err(err)) => { + warn!("failed to claim bandwidth with gateway ({gateway_identity}): {err}"); + return None; + } + Ok(Ok(_)) => (), + } + + // 3. start internal listener + if let Err(err) = client.start_listening_for_mixnet_messages() { + warn!("failed to start message listener for {gateway_identity}: {err}"); + return None; + } + + Some(shared_key) + } + async fn create_new_gateway_client_handle_and_authenticate( config: GatewayConfig, fresh_gateway_client_data: &FreshGatewayClientData, gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, ) -> Option<( GatewayClientHandle, (MixnetMessageReceiver, AcknowledgementReceiver), )> { let gateway_identity = config.gateway_identity; - let (new_client, (message_receiver, ack_receiver)) = + let (mut new_client, (message_receiver, ack_receiver)) = Self::new_gateway_client_handle(config, fresh_gateway_client_data); - // Put this in timeout in case the gateway has incorrectly set their ulimit and our connection - // gets stuck in their TCP queue and just hangs on our end but does not terminate - // (an actual bug we experienced) - // - // Note: locking the client in unchecked manner is fine here as we just created the lock - // and it wasn't shared with anyone, therefore we're the only one holding reference to it - // and hence it's impossible to fail to obtain the permit. - let mut unlocked_client = new_client.lock_client_unchecked(); - - // SAFETY: it's fine to use the deprecated method here as we're creating brand new clients each time, - // and there's no need to deal with any key upgrades - #[allow(deprecated)] - match tokio::time::timeout( + match Self::client_startup( gateway_connection_timeout, - unlocked_client.get_mut_unchecked().authenticate_and_start(), + gateway_bandwidth_claim_timeout, + &mut new_client, ) .await { - Ok(Ok(_)) => { - drop(unlocked_client); + Some(shared_key) => { + fresh_gateway_client_data + .gateways_key_cache + .insert(gateway_identity, shared_key); Some((new_client, (message_receiver, ack_receiver))) } - Ok(Err(err)) => { - warn!("failed to authenticate with new gateway ({gateway_identity}): {err}",); - // we failed to create a client, can't do much here - None - } - Err(_) => { - warn!("timed out while trying to authenticate with new gateway {gateway_identity}",); + None => { + fresh_gateway_client_data + .gateways_key_cache + .remove(&gateway_identity); None } } @@ -345,123 +349,63 @@ impl PacketSender { // than just concurrently? async fn send_gateway_packets( gateway_connection_timeout: Duration, + gateway_bandwidth_claim_timeout: Duration, packets: GatewayPackets, fresh_gateway_client_data: Arc, - client: Option, max_sending_rate: usize, ) -> Option { - let existing_client = client.is_some(); - - // Note that in the worst case scenario we will only wait for a second or two to obtain the lock - // as other possibly entity holding the lock (the gateway pinger) is attempting to send - // the ping messages with a maximum timeout. - let (client, gateway_channels) = if let Some(client) = client { - if client.is_invalid().await { - warn!("Our existing client was invalid - two test runs happened back to back without cleanup"); - return None; - } - (client, None) - } else { - let (client, gateway_channels) = - Self::create_new_gateway_client_handle_and_authenticate( - packets.gateway_config(), - &fresh_gateway_client_data, - gateway_connection_timeout, - ) - .await?; - (client, Some(gateway_channels)) - }; + let (mut client, gateway_channels) = + Self::create_new_gateway_client_handle_and_authenticate( + packets.gateway_config(), + &fresh_gateway_client_data, + gateway_connection_timeout, + gateway_bandwidth_claim_timeout, + ) + .await?; + + let identity = client.gateway_identity(); let estimated_time = Duration::from_secs_f64(packets.packets.len() as f64 / max_sending_rate as f64); // give some leeway let timeout = estimated_time * 3; - let mut guard = client.lock_client().await; - let unwrapped_client = guard.get_mut_unchecked(); - - if let Err(err) = Self::check_remaining_bandwidth(unwrapped_client).await { - warn!( - "Failed to claim additional bandwidth for {} - {err}", - unwrapped_client.gateway_identity().to_base58_string(), - ); - if existing_client { - guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + if let Err(err) = Self::check_remaining_bandwidth(&mut client).await { + warn!("Failed to claim additional bandwidth for {identity}: {err}",); return None; } match tokio::time::timeout( timeout, - Self::attempt_to_send_packets(unwrapped_client, packets.packets, max_sending_rate), + Self::attempt_to_send_packets(&mut client, packets.packets, max_sending_rate), ) .await { Err(_timeout) => { - warn!( - "failed to send packets to {} - we timed out", - packets.pub_key.to_base58_string(), - ); - // if this was a fresh client, there's no need to do anything as it was never - // registered to get read - if existing_client { - guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + warn!("failed to send packets to {identity} - we timed out",); return None; } Ok(Err(err)) => { - warn!( - "failed to send packets to {} - {:?}", - packets.pub_key.to_base58_string(), - err - ); - // if this was a fresh client, there's no need to do anything as it was never - // registered to get read - if existing_client { - guard.invalidate(); - fresh_gateway_client_data.notify_connection_failure(packets.pub_key.to_bytes()); - } + warn!("failed to send packets to {identity}: {err}",); return None; } Ok(Ok(_)) => { - if !existing_client { - fresh_gateway_client_data - .notify_new_connection(packets.pub_key, gateway_channels); - } + fresh_gateway_client_data.notify_new_connection(identity, gateway_channels) } } - drop(guard); Some(client) } - // point of this is to basically insert handles of fresh clients that didn't exist here before - async fn merge_client_handles(&self, handles: Vec) { - let mut guard = self.active_gateway_clients.lock().await; - for handle in handles { - let raw_identity = handle.raw_identity(); - if let Some(existing) = guard.get(&raw_identity) { - if !handle.ptr_eq(existing) { - panic!("Duplicate client detected!") - } - - if handle.is_invalid().await { - guard.remove(&raw_identity); - } - } else { - // client never existed -> just insert it - guard.insert(raw_identity, handle); - } - } - } - - pub(super) async fn send_packets(&mut self, packets: Vec) { + pub(super) async fn send_packets( + &mut self, + packets: Vec, + ) -> Vec { // we know that each of the elements in the packets array will only ever access a single, // unique element from the existing clients let gateway_connection_timeout = self.gateway_connection_timeout; + let gateway_bandwidth_claim_timeout = self.gateway_bandwidth_claim_timeout; let max_concurrent_clients = if self.max_concurrent_clients > 0 { Some(self.max_concurrent_clients) } else { @@ -469,41 +413,22 @@ impl PacketSender { }; let max_sending_rate = self.max_sending_rate; - let guard = self.active_gateway_clients.lock().await; - // this clippy warning is a false positive as we cannot get rid of the collect by moving - // everything into a single iterator as it would require us to hold the lock the entire time - // and that is exactly what we want to avoid - #[allow(clippy::needless_collect)] let stream_data = packets .into_iter() - .map(|packets| { - let existing_client = guard - .get(&packets.pub_key.to_bytes()) - .map(|client| client.clone_data_pointer()); - ( - packets, - Arc::clone(&self.fresh_gateway_client_data), - existing_client, - ) - }) + .map(|packets| (packets, Arc::clone(&self.fresh_gateway_client_data))) .collect::>(); - // drop the guard immediately so that the other task (gateway pinger) would not need to wait until - // we're done sending packets (note: without this drop, we wouldn't be able to ping gateways that - // we're not interacting with right now) - drop(guard); - // can't chain it all nicely together as there's no adapter method defined on Stream directly // for ForEachConcurrentClientUse let used_clients = ForEachConcurrentClientUse::new( stream::iter(stream_data.into_iter()), max_concurrent_clients, - |(packets, fresh_data, client)| async move { + |(packets, fresh_data)| async move { Self::send_gateway_packets( gateway_connection_timeout, + gateway_bandwidth_claim_timeout, packets, fresh_data, - client, max_sending_rate, ) .await @@ -514,7 +439,8 @@ impl PacketSender { .flatten() .collect(); - self.merge_client_handles(used_clients).await; + // we need to keep clients alive until the test finishes so that we could keep receiving + used_clients } } diff --git a/nym-api/src/support/cli/run.rs b/nym-api/src/support/cli/run.rs index 9f3feb0d1f7..ca4c741be64 100644 --- a/nym-api/src/support/cli/run.rs +++ b/nym-api/src/support/cli/run.rs @@ -256,7 +256,7 @@ async fn start_nym_api_tasks_axum(config: &Config) -> anyhow::Result( - &config.network_monitor, + &config, &nym_contract_cache_state, described_nodes_cache.clone(), node_status_cache_state.clone(), diff --git a/nym-api/src/support/config/mod.rs b/nym-api/src/support/config/mod.rs index 5a07c223bee..e93a880a090 100644 --- a/nym-api/src/support/config/mod.rs +++ b/nym-api/src/support/config/mod.rs @@ -38,13 +38,12 @@ const DEFAULT_GATEWAY_SENDING_RATE: usize = 200; const DEFAULT_MAX_CONCURRENT_GATEWAY_CLIENTS: usize = 50; const DEFAULT_PACKET_DELIVERY_TIMEOUT: Duration = Duration::from_secs(20); const DEFAULT_MONITOR_RUN_INTERVAL: Duration = Duration::from_secs(15 * 60); -const DEFAULT_GATEWAY_PING_INTERVAL: Duration = Duration::from_secs(60); // Set this to a high value for now, so that we don't risk sporadic timeouts that might cause // bought bandwidth tokens to not have time to be spent; Once we remove the gateway from the // bandwidth bridging protocol, we can come back to a smaller timeout value const DEFAULT_GATEWAY_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5 * 60); -// This timeout value should be big enough to accommodate an initial bandwidth acquirement -const DEFAULT_GATEWAY_CONNECTION_TIMEOUT: Duration = Duration::from_secs(2 * 60); +const DEFAULT_GATEWAY_CONNECTION_TIMEOUT: Duration = Duration::from_secs(15); +const DEFAULT_GATEWAY_BANDWIDTH_CLAIM_TIMEOUT: Duration = Duration::from_secs(2 * 60); const DEFAULT_TEST_ROUTES: usize = 3; const DEFAULT_MINIMUM_TEST_ROUTES: usize = 1; @@ -323,11 +322,6 @@ pub struct NetworkMonitorDebug { #[serde(with = "humantime_serde")] pub run_interval: Duration, - /// Specifies interval at which we should be sending ping packets to all active gateways - /// in order to keep the websocket connections alive. - #[serde(with = "humantime_serde")] - pub gateway_ping_interval: Duration, - /// Specifies maximum rate (in packets per second) of test packets being sent to gateway pub gateway_sending_rate: usize, @@ -343,6 +337,10 @@ pub struct NetworkMonitorDebug { #[serde(with = "humantime_serde")] pub gateway_connection_timeout: Duration, + /// Maximum allowed time for the gateway bandwidth claim to get resolved + #[serde(with = "humantime_serde")] + pub gateway_bandwidth_claim_timeout: Duration, + /// Specifies the duration the monitor is going to wait after sending all measurement /// packets before declaring nodes unreachable. #[serde(with = "humantime_serde")] @@ -370,11 +368,11 @@ impl Default for NetworkMonitorDebug { min_gateway_reliability: DEFAULT_MIN_GATEWAY_RELIABILITY, disabled_credentials_mode: true, run_interval: DEFAULT_MONITOR_RUN_INTERVAL, - gateway_ping_interval: DEFAULT_GATEWAY_PING_INTERVAL, gateway_sending_rate: DEFAULT_GATEWAY_SENDING_RATE, max_concurrent_gateway_clients: DEFAULT_MAX_CONCURRENT_GATEWAY_CLIENTS, gateway_response_timeout: DEFAULT_GATEWAY_RESPONSE_TIMEOUT, gateway_connection_timeout: DEFAULT_GATEWAY_CONNECTION_TIMEOUT, + gateway_bandwidth_claim_timeout: DEFAULT_GATEWAY_BANDWIDTH_CLAIM_TIMEOUT, packet_delivery_timeout: DEFAULT_PACKET_DELIVERY_TIMEOUT, test_routes: DEFAULT_TEST_ROUTES, minimum_test_routes: DEFAULT_MINIMUM_TEST_ROUTES,