From 77227edb36092db9590b4ec0a1b93797d6e5658e Mon Sep 17 00:00:00 2001 From: qima Date: Sat, 23 Nov 2024 00:28:56 +0800 Subject: [PATCH] feat: network density sampling --- sn_networking/src/cmd.rs | 13 ++++++- sn_networking/src/driver.rs | 12 +++++- sn_networking/src/fifo_register.rs | 62 ++++++++++++++++++++++++++++++ sn_networking/src/lib.rs | 5 +++ sn_node/src/node.rs | 33 ++++++++++++++++ 5 files changed, 123 insertions(+), 2 deletions(-) create mode 100644 sn_networking/src/fifo_register.rs diff --git a/sn_networking/src/cmd.rs b/sn_networking/src/cmd.rs index 48372d8d17..1f36a81988 100644 --- a/sn_networking/src/cmd.rs +++ b/sn_networking/src/cmd.rs @@ -17,7 +17,7 @@ use crate::{ use libp2p::{ kad::{ store::{Error as StoreError, RecordStore}, - Quorum, Record, RecordKey, + KBucketDistance as Distance, Quorum, Record, RecordKey, }, Multiaddr, PeerId, }; @@ -136,6 +136,10 @@ pub enum LocalSwarmCmd { TriggerIntervalReplication, /// Triggers unrelevant record cleanup TriggerIrrelevantRecordCleanup, + /// Add a network density sample + AddNetworkDensitySample { + distance: Distance, + }, } /// Commands to send to the Swarm @@ -287,6 +291,9 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::TriggerIrrelevantRecordCleanup => { write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup") } + LocalSwarmCmd::AddNetworkDensitySample { distance } => { + write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})") + } } } } @@ -868,6 +875,10 @@ impl SwarmDriver { .store_mut() .cleanup_irrelevant_records(); } + LocalSwarmCmd::AddNetworkDensitySample { distance } => { + cmd_string = "AddNetworkDensitySample"; + self.network_density_samples.add(distance); + } } self.log_handling(cmd_string.to_string(), start.elapsed()); diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 43a5525ccf..92b0a2e21a 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -13,6 +13,7 @@ use crate::{ error::{NetworkError, Result}, event::{NetworkEvent, NodeEvent}, external_address::ExternalAddressManager, + fifo_register::FifoRegister, log_markers::Marker, multiaddr_pop_p2p, network_discovery::NetworkDiscovery, @@ -730,6 +731,7 @@ impl NetworkBuilder { replication_targets: Default::default(), last_replication: None, last_connection_pruning_time: Instant::now(), + network_density_samples: FifoRegister::new(100), }; let network = Network::new( @@ -835,6 +837,8 @@ pub struct SwarmDriver { pub(crate) last_replication: Option, /// when was the last outdated connection prunning undertaken. pub(crate) last_connection_pruning_time: Instant, + /// FIFO cache for the network density samples + pub(crate) network_density_samples: FifoRegister, } impl SwarmDriver { @@ -919,7 +923,13 @@ impl SwarmDriver { let closest_k_peers = self.get_closest_k_value_local_peers(); if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) { - info!("Set responsible range to {distance}"); + let network_density = self.network_density_samples.get_median(); + let ilog2 = if let Some(distance) = network_density { + distance.ilog2() + } else { + None + }; + info!("Set responsible range to {distance}, current sampled network density is {ilog2:?}({network_density:?})"); // set any new distance to farthest record in the store self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance); // the distance range within the replication_fetcher shall be in sync as well diff --git a/sn_networking/src/fifo_register.rs b/sn_networking/src/fifo_register.rs new file mode 100644 index 0000000000..c8ab96ba8c --- /dev/null +++ b/sn_networking/src/fifo_register.rs @@ -0,0 +1,62 @@ +// Copyright 2024 MaidSafe.net limited. +// +// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. +// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed +// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. Please review the Licences for the specific language governing +// permissions and limitations relating to use of the SAFE Network Software. + +use libp2p::kad::KBucketDistance as Distance; +use std::collections::VecDeque; + +pub(crate) struct FifoRegister { + queue: VecDeque, + max_length: usize, + cached_median: Option, // Cache for the median result + is_dirty: bool, // Flag indicating if cache is valid +} + +impl FifoRegister { + // Creates a new FifoRegister with a specified maximum length + pub(crate) fn new(max_length: usize) -> Self { + FifoRegister { + queue: VecDeque::with_capacity(max_length), + max_length, + cached_median: None, + is_dirty: true, + } + } + + // Adds an entry to the register, removing excess elements if over max_length + pub(crate) fn add(&mut self, entry: Distance) { + if self.queue.len() == self.max_length { + self.queue.pop_front(); // Remove the oldest element to maintain length + } + self.queue.push_back(entry); + + // Mark the cache as invalid since the data has changed + self.is_dirty = true; + } + + // Returns the median of the maximum values of the entries + pub(crate) fn get_median(&mut self) -> Option { + if self.queue.is_empty() { + return None; // No median if the queue is empty + } + + if !self.is_dirty { + return self.cached_median; // Return cached result if it's valid + } + + let mut max_values: Vec = self.queue.iter().copied().collect(); + + max_values.sort_unstable(); + + let len = max_values.len(); + // Cache the result and mark the cache as valid + self.cached_median = Some(max_values[len / 2]); + self.is_dirty = false; + + self.cached_median + } +} diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index b7118d18a3..55f7afb959 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -16,6 +16,7 @@ mod driver; mod error; mod event; mod external_address; +mod fifo_register; mod log_markers; #[cfg(feature = "open-metrics")] mod metrics; @@ -1007,6 +1008,10 @@ impl Network { self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup) } + pub fn add_network_density_sample(&self, distance: KBucketDistance) { + self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance }) + } + /// Helper to send NetworkSwarmCmd fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) { send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd); diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index bff4266b6b..e81496876b 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -64,6 +64,9 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); /// Interval to clean up unrelevant records const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); +/// Interval to carryout network density sampling +const NETWORK_DENSITY_SAMPLING_INTERVAL: Duration = Duration::from_secs(113); + /// Helper to build and run a Node pub struct NodeBuilder { identity_keypair: Keypair, @@ -277,6 +280,10 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + let mut network_density_sampling_interval = + tokio::time::interval(NETWORK_DENSITY_SAMPLING_INTERVAL); + let _ = network_density_sampling_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -341,6 +348,16 @@ impl Node { Self::trigger_irrelevant_record_cleanup(network); }); } + _ = network_density_sampling_interval.tick() => { + let start = Instant::now(); + debug!("Periodic network density sampling triggered"); + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::network_density_sampling(network).await; + trace!("Periodic network density sampling took {:?}", start.elapsed()); + }); + } } } }); @@ -712,6 +729,22 @@ impl Node { Response::Query(resp) } + async fn network_density_sampling(network: Network) { + for _ in 0..10 { + let target = NetworkAddress::from_peer(PeerId::random()); + // Result is sorted and only return CLOSE_GROUP_SIZE entries + let peers = network.node_get_closest_peers(&target).await; + if let Ok(peers) = peers { + if peers.len() >= CLOSE_GROUP_SIZE { + // Calculate the distance to the farthest. + let distance = + target.distance(&NetworkAddress::from_peer(peers[CLOSE_GROUP_SIZE - 1])); + network.add_network_density_sample(distance); + } + } + } + } + async fn try_bad_nodes_check(network: Network, rolling_index: usize) { if let Ok(kbuckets) = network.get_kbuckets().await { let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();