Skip to content

Commit

Permalink
feat: network density sampling
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 22, 2024
1 parent 3e7ed69 commit 77227ed
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 2 deletions.
13 changes: 12 additions & 1 deletion sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
use libp2p::{
kad::{
store::{Error as StoreError, RecordStore},
Quorum, Record, RecordKey,
KBucketDistance as Distance, Quorum, Record, RecordKey,
},
Multiaddr, PeerId,
};
Expand Down Expand Up @@ -136,6 +136,10 @@ pub enum LocalSwarmCmd {
TriggerIntervalReplication,
/// Triggers irrelevant record cleanup
TriggerIrrelevantRecordCleanup,
/// Add a network density sample
AddNetworkDensitySample {
distance: Distance,
},
}

/// Commands to send to the Swarm
Expand Down Expand Up @@ -287,6 +291,9 @@ impl Debug for LocalSwarmCmd {
LocalSwarmCmd::TriggerIrrelevantRecordCleanup => {
write!(f, "LocalSwarmCmd::TriggerUnrelevantRecordCleanup")
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
write!(f, "LocalSwarmCmd::AddNetworkDensitySample({distance:?})")
}
}
}
}
Expand Down Expand Up @@ -868,6 +875,10 @@ impl SwarmDriver {
.store_mut()
.cleanup_irrelevant_records();
}
LocalSwarmCmd::AddNetworkDensitySample { distance } => {
cmd_string = "AddNetworkDensitySample";
self.network_density_samples.add(distance);
}
}

self.log_handling(cmd_string.to_string(), start.elapsed());
Expand Down
12 changes: 11 additions & 1 deletion sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::{
error::{NetworkError, Result},
event::{NetworkEvent, NodeEvent},
external_address::ExternalAddressManager,
fifo_register::FifoRegister,
log_markers::Marker,
multiaddr_pop_p2p,
network_discovery::NetworkDiscovery,
Expand Down Expand Up @@ -730,6 +731,7 @@ impl NetworkBuilder {
replication_targets: Default::default(),
last_replication: None,
last_connection_pruning_time: Instant::now(),
network_density_samples: FifoRegister::new(100),
};

let network = Network::new(
Expand Down Expand Up @@ -835,6 +837,8 @@ pub struct SwarmDriver {
pub(crate) last_replication: Option<Instant>,
/// When the last pruning of outdated connections was undertaken.
pub(crate) last_connection_pruning_time: Instant,
/// FIFO cache for the network density samples
pub(crate) network_density_samples: FifoRegister,
}

impl SwarmDriver {
Expand Down Expand Up @@ -919,7 +923,13 @@ impl SwarmDriver {
let closest_k_peers = self.get_closest_k_value_local_peers();

if let Some(distance) = self.get_responsbile_range_estimate(&closest_k_peers) {
info!("Set responsible range to {distance}");
let network_density = self.network_density_samples.get_median();
let ilog2 = if let Some(distance) = network_density {
distance.ilog2()
} else {
None
};
info!("Set responsible range to {distance}, current sampled network density is {ilog2:?}({network_density:?})");
// set any new distance to farthest record in the store
self.swarm.behaviour_mut().kademlia.store_mut().set_distance_range(distance);
// the distance range within the replication_fetcher shall be in sync as well
Expand Down
62 changes: 62 additions & 0 deletions sn_networking/src/fifo_register.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright 2024 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use libp2p::kad::KBucketDistance as Distance;
use std::collections::VecDeque;

/// Bounded first-in-first-out register of [`Distance`] samples with a cached median.
///
/// At most `max_length` samples are retained; adding to a full register evicts
/// the oldest sample first.
#[derive(Debug)]
pub(crate) struct FifoRegister {
    /// Retained samples, oldest at the front.
    queue: VecDeque<Distance>,
    /// Upper bound on the number of retained samples.
    max_length: usize,
    /// Median of `queue` as of the last computation. `None` means the samples
    /// changed since then (or nothing has been computed yet), so the value
    /// must be recomputed on the next read.
    cached_median: Option<Distance>,
}

impl FifoRegister {
    /// Creates an empty register that retains at most `max_length` samples.
    ///
    /// A `max_length` of zero yields a register that discards every sample.
    pub(crate) fn new(max_length: usize) -> Self {
        FifoRegister {
            queue: VecDeque::with_capacity(max_length),
            max_length,
            cached_median: None,
        }
    }

    /// Appends `entry`, evicting the oldest sample(s) so that the register
    /// never holds more than `max_length` entries.
    pub(crate) fn add(&mut self, entry: Distance) {
        if self.max_length == 0 {
            // A zero-capacity register keeps nothing; without this guard the
            // queue would grow without bound.
            return;
        }

        // `>=` (not `==`) so the bound is restored even if it was ever exceeded.
        while self.queue.len() >= self.max_length {
            self.queue.pop_front();
        }
        self.queue.push_back(entry);

        // The data changed, so any previously computed median is stale.
        self.cached_median = None;
    }

    /// Returns the median of the retained samples, or `None` when the register
    /// is empty.
    ///
    /// For an even number of samples the upper of the two middle values is
    /// returned (the element at index `len / 2` in sorted order). The result is
    /// cached until the next call to [`FifoRegister::add`].
    pub(crate) fn get_median(&mut self) -> Option<Distance> {
        if self.cached_median.is_none() && !self.queue.is_empty() {
            // Work on a copy: selecting in place would scramble the FIFO order.
            let mut samples: Vec<Distance> = self.queue.iter().copied().collect();
            let mid = samples.len() / 2;
            // Only one order statistic is needed, so a full sort is unnecessary.
            let (_, median, _) = samples.select_nth_unstable(mid);
            self.cached_median = Some(*median);
        }

        self.cached_median
    }
}
5 changes: 5 additions & 0 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ mod driver;
mod error;
mod event;
mod external_address;
mod fifo_register;
mod log_markers;
#[cfg(feature = "open-metrics")]
mod metrics;
Expand Down Expand Up @@ -1007,6 +1008,10 @@ impl Network {
self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup)
}

/// Forwards `distance` as a `LocalSwarmCmd::AddNetworkDensitySample` command
/// via `send_local_swarm_cmd`.
pub fn add_network_density_sample(&self, distance: KBucketDistance) {
    let sample_cmd = LocalSwarmCmd::AddNetworkDensitySample { distance };
    self.send_local_swarm_cmd(sample_cmd)
}

/// Helper to send NetworkSwarmCmd
fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) {
send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd);
Expand Down
33 changes: 33 additions & 0 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);
/// Interval to clean up irrelevant records
const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600);

/// Interval at which to carry out network density sampling
const NETWORK_DENSITY_SAMPLING_INTERVAL: Duration = Duration::from_secs(113);

/// Helper to build and run a Node
pub struct NodeBuilder {
identity_keypair: Keypair,
Expand Down Expand Up @@ -277,6 +280,10 @@ impl Node {
tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL);
let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately

let mut network_density_sampling_interval =
tokio::time::interval(NETWORK_DENSITY_SAMPLING_INTERVAL);
let _ = network_density_sampling_interval.tick().await; // first tick completes immediately

loop {
let peers_connected = &peers_connected;

Expand Down Expand Up @@ -341,6 +348,16 @@ impl Node {
Self::trigger_irrelevant_record_cleanup(network);
});
}
_ = network_density_sampling_interval.tick() => {
let start = Instant::now();
debug!("Periodic network density sampling triggered");
let network = self.network().clone();

let _handle = spawn(async move {
Self::network_density_sampling(network).await;
trace!("Periodic network density sampling took {:?}", start.elapsed());
});
}
}
}
});
Expand Down Expand Up @@ -712,6 +729,22 @@ impl Node {
Response::Query(resp)
}

/// Runs one round of network density sampling.
///
/// Performs ten closest-peer lookups, each against a freshly generated random
/// `PeerId`. For every lookup that succeeds with at least `CLOSE_GROUP_SIZE`
/// peers, the distance from the target to the peer at index
/// `CLOSE_GROUP_SIZE - 1` is handed to `Network::add_network_density_sample`.
/// Lookups that fail or return fewer peers are skipped silently.
async fn network_density_sampling(network: Network) {
    for _ in 0..10 {
        let target = NetworkAddress::from_peer(PeerId::random());

        // The lookup result is expected to be sorted and to hold only
        // CLOSE_GROUP_SIZE entries (per the original author's note).
        let closest = match network.node_get_closest_peers(&target).await {
            Ok(found) if found.len() >= CLOSE_GROUP_SIZE => found,
            _ => continue,
        };

        // Distance from the target to the last peer of the close group.
        let farthest = NetworkAddress::from_peer(closest[CLOSE_GROUP_SIZE - 1]);
        network.add_network_density_sample(target.distance(&farthest));
    }
}

async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
if let Ok(kbuckets) = network.get_kbuckets().await {
let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
Expand Down

0 comments on commit 77227ed

Please sign in to comment.