Skip to content

Commit

Permalink
feat(node): use sampled network density for replicate candidates
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 26, 2024
1 parent 169f0bd commit 1c21f8c
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 72 deletions.
90 changes: 70 additions & 20 deletions sn_networking/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use crate::{
event::TerminateNodeReason,
log_markers::Marker,
multiaddr_pop_p2p, GetRecordCfg, GetRecordError, MsgResponder, NetworkEvent, CLOSE_GROUP_SIZE,
REPLICATION_PEERS_COUNT,
};
use libp2p::{
kad::{
Expand Down Expand Up @@ -64,6 +63,12 @@ pub enum LocalSwarmCmd {
GetKBuckets {
sender: oneshot::Sender<BTreeMap<u32, Vec<PeerId>>>,
},
/// Returns the replicate candidates in range.
/// In case the range is too narrow, returns at lease CLOSE_GROUP_SIZE peers.
GetReplicateCandidates {
data_addr: NetworkAddress,
sender: oneshot::Sender<Vec<PeerId>>,
},
// Returns up to K_VALUE peers from all the k-buckets from the local Routing Table.
// And our PeerId as well.
GetClosestKLocalPeers {
Expand Down Expand Up @@ -220,7 +225,9 @@ impl Debug for LocalSwarmCmd {
PrettyPrintRecordKey::from(key)
)
}

LocalSwarmCmd::GetReplicateCandidates { .. } => {
write!(f, "LocalSwarmCmd::GetReplicateCandidates")
}
LocalSwarmCmd::GetClosestKLocalPeers { .. } => {
write!(f, "LocalSwarmCmd::GetClosestKLocalPeers")
}
Expand Down Expand Up @@ -709,7 +716,7 @@ impl SwarmDriver {
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance_bucket()
.get_farthest_replication_distance()
{
self.replication_fetcher
.set_replication_distance_range(distance);
Expand Down Expand Up @@ -809,7 +816,10 @@ impl SwarmDriver {
cmd_string = "GetClosestKLocalPeers";
let _ = sender.send(self.get_closest_k_value_local_peers());
}

LocalSwarmCmd::GetReplicateCandidates { data_addr, sender } => {
cmd_string = "GetReplicateCandidates";
let _ = sender.send(self.get_replicate_candidates(&data_addr));
}
LocalSwarmCmd::GetSwarmLocalState(sender) => {
cmd_string = "GetSwarmLocalState";
let current_state = SwarmLocalState {
Expand Down Expand Up @@ -1006,22 +1016,8 @@ impl SwarmDriver {
// Store the current time as the last replication time
self.last_replication = Some(Instant::now());

// get closest peers from buckets, sorted by increasing distance to us
let our_peer_id = self.self_peer_id.into();
let closest_k_peers = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&our_peer_id)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage());

// Only grab the closest nodes within the REPLICATE_RANGE
let mut replicate_targets = closest_k_peers
.into_iter()
// add some leeway to allow for divergent knowledge
.take(REPLICATION_PEERS_COUNT)
.collect::<Vec<_>>();
let self_addr = NetworkAddress::from_peer(self.self_peer_id);
let mut replicate_targets = self.get_replicate_candidates(&self_addr);

let now = Instant::now();
self.replication_targets
Expand Down Expand Up @@ -1066,4 +1062,58 @@ impl SwarmDriver {

Ok(())
}

// Replies with in-range replicate candidates
// Fall back to CLOSE_GROUP_SIZE peers if range is too narrow.
// Note that:
// * For general replication, replicate candidates shall be the closest to self
// * For replicate fresh records, the replicate candidates shall be the closest to data
pub(crate) fn get_replicate_candidates(&mut self, target: &NetworkAddress) -> Vec<PeerId> {
// get closest peers from buckets, sorted by increasing distance to the target
let kbucket_key = target.as_kbucket_key();
let closest_k_peers: Vec<PeerId> = self
.swarm
.behaviour_mut()
.kademlia
.get_closest_local_peers(&kbucket_key)
// Map KBucketKey<PeerId> to PeerId.
.map(|key| key.into_preimage())
.collect();

if let Some(responsible_range) = self
.swarm
.behaviour_mut()
.kademlia
.store_mut()
.get_farthest_replication_distance()
{
let peers_in_range = get_peers_in_range(&closest_k_peers, target, responsible_range);

if peers_in_range.len() >= CLOSE_GROUP_SIZE {
return peers_in_range;
}
}

// In case the range is too narrow, fall back to at least CLOSE_GROUP_SIZE peers.
closest_k_peers
.iter()
.take(CLOSE_GROUP_SIZE)
.cloned()
.collect()
}
}

/// Returns the nodes that within the defined distance.
fn get_peers_in_range(peers: &[PeerId], address: &NetworkAddress, range: Distance) -> Vec<PeerId> {
peers
.iter()
.filter_map(|peer_id| {
let distance = address.distance(&NetworkAddress::from_peer(*peer_id));
if distance <= range {
Some(*peer_id)
} else {
None
}
})
.collect()
}
14 changes: 10 additions & 4 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ use {
/// The type of quote for a selected payee.
pub type PayeeQuote = (PeerId, RewardsAddress, PaymentQuote);

/// The count of peers that will be considered as close to a record target,
/// that a replication of the record shall be sent/accepted to/by the peer.
pub const REPLICATION_PEERS_COUNT: usize = CLOSE_GROUP_SIZE + 2;

/// Majority of a given group (i.e. > 1/2).
#[inline]
pub const fn close_group_majority() -> usize {
Expand Down Expand Up @@ -269,6 +265,16 @@ impl Network {
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}

/// Returns the replicate candidates in range.
pub async fn get_replicate_candidates(&self, data_addr: NetworkAddress) -> Result<Vec<PeerId>> {
let (sender, receiver) = oneshot::channel();
self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender });

receiver
.await
.map_err(|_e| NetworkError::InternalMsgChannelDropped)
}

/// Get the Chunk existence proof from the close nodes to the provided chunk address.
/// This is to be used by client only to verify the success of the upload.
pub async fn verify_chunk_existence(
Expand Down
24 changes: 9 additions & 15 deletions sn_networking/src/record_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use sn_protocol::{
};
use std::{
borrow::Cow,
collections::{BTreeMap, HashMap, HashSet},
collections::{BTreeMap, HashMap},
fs,
path::{Path, PathBuf},
time::SystemTime,
Expand Down Expand Up @@ -727,7 +727,6 @@ impl NodeRecordStore {
/// Calculate the cost to store data for our current store state
pub(crate) fn store_cost(&self, key: &Key) -> (AttoTokens, QuotingMetrics) {
let records_stored = self.records.len();
let record_keys_as_hashset: HashSet<&Key> = self.records.keys().collect();

let live_time = if let Ok(elapsed) = self.timestamp.elapsed() {
elapsed.as_secs()
Expand All @@ -743,8 +742,7 @@ impl NodeRecordStore {
};

if let Some(distance_range) = self.responsible_distance_range {
let relevant_records =
self.get_records_within_distance_range(record_keys_as_hashset, distance_range);
let relevant_records = self.get_records_within_distance_range(distance_range);

quoting_metrics.close_records_stored = relevant_records;
} else {
Expand All @@ -770,11 +768,7 @@ impl NodeRecordStore {
}

/// Calculate how many records are stored within a distance range
pub fn get_records_within_distance_range(
&self,
_records: HashSet<&Key>,
range: Distance,
) -> usize {
pub fn get_records_within_distance_range(&self, range: Distance) -> usize {
let within_range = self
.records_by_distance
.range(..range)
Expand Down Expand Up @@ -1609,7 +1603,7 @@ mod tests {
}

#[tokio::test]
async fn get_records_within_bucket_range() -> eyre::Result<()> {
async fn get_records_within_range() -> eyre::Result<()> {
let max_records = 50;

let temp_dir = std::env::temp_dir();
Expand Down Expand Up @@ -1654,7 +1648,6 @@ mod tests {
publisher: None,
expires: None,
};
// The new entry is closer, it shall replace the existing one
assert!(store.put_verified(record, RecordType::Chunk).is_ok());
// We must also mark the record as stored (which would be triggered after the async write in nodes
// via NetworkEvent::CompletedWrite)
Expand All @@ -1671,7 +1664,7 @@ mod tests {
// get a record halfway through the list
let halfway_record_address = NetworkAddress::from_record_key(
stored_records
.get((stored_records.len() / 2) - 1)
.get(max_records / 2)
.wrap_err("Could not parse record store key")?,
);
// get the distance to this record from our local key
Expand All @@ -1680,13 +1673,14 @@ mod tests {
// must be plus one bucket from the halfway record
store.set_responsible_distance_range(distance);

let record_keys = store.records.keys().collect();
let records_in_range = store.get_records_within_distance_range(distance);

// check that the number of records returned is larger than half our records
// (ie, that we cover _at least_ all the records within our distance range)
assert!(
store.get_records_within_distance_range(record_keys, distance)
>= stored_records.len() / 2
records_in_range >= max_records / 2,
"Not enough records in range {records_in_range}/{}",
max_records / 2
);

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion sn_networking/src/record_store_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ impl UnifiedRecordStore {
}
}

pub(crate) fn get_farthest_replication_distance_bucket(&self) -> Option<Distance> {
pub(crate) fn get_farthest_replication_distance(&self) -> Option<Distance> {
match self {
Self::Client(_store) => {
warn!("Calling get_distance_range at Client. This should not happen");
Expand Down
36 changes: 10 additions & 26 deletions sn_node/src/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use libp2p::{
kad::{Quorum, Record, RecordKey},
PeerId,
};
use sn_networking::{sort_peers_by_address, GetRecordCfg, Network, REPLICATION_PEERS_COUNT};
use sn_networking::{GetRecordCfg, Network};
use sn_protocol::{
messages::{Cmd, Query, QueryResponse, Request, Response},
storage::RecordType,
Expand Down Expand Up @@ -146,46 +146,30 @@ impl Node {

debug!("Start replication of fresh record {pretty_key:?} from store");

// Already contains self_peer_id
let mut closest_k_peers = match network.get_closest_k_value_local_peers().await {
Ok(peers) => peers,
Err(err) => {
error!("Replicating fresh record {pretty_key:?} get_closest_local_peers errored: {err:?}");
return;
}
};

// remove ourself from these calculations
closest_k_peers.retain(|peer_id| peer_id != &network.peer_id());

let data_addr = NetworkAddress::from_record_key(&paid_key);

let sorted_based_on_addr = match sort_peers_by_address(
&closest_k_peers,
&data_addr,
REPLICATION_PEERS_COUNT,
) {
Ok(result) => result,
let replicate_candidates = match network
.get_replicate_candidates(data_addr.clone())
.await
{
Ok(peers) => peers,
Err(err) => {
error!(
"When replicating fresh record {pretty_key:?}, having error when sort {err:?}"
);
error!("Replicating fresh record {pretty_key:?} get_replicate_candidates errored: {err:?}");
return;
}
};

let our_peer_id = network.peer_id();
let our_address = NetworkAddress::from_peer(our_peer_id);
let keys = vec![(data_addr.clone(), record_type.clone())];
let keys = vec![(data_addr, record_type.clone())];

for peer_id in sorted_based_on_addr {
for peer_id in replicate_candidates {
debug!("Replicating fresh record {pretty_key:?} to {peer_id:?}");
let request = Request::Cmd(Cmd::Replicate {
holder: our_address.clone(),
keys: keys.clone(),
});

network.send_req_ignore_reply(request, *peer_id);
network.send_req_ignore_reply(request, peer_id);
}
debug!(
"Completed replicate fresh record {pretty_key:?} on store, in {:?}",
Expand Down
25 changes: 19 additions & 6 deletions sn_node/tests/verify_data_location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const VERIFICATION_ATTEMPTS: usize = 5;

/// Length of time to wait before re-verifying the data location
const REVERIFICATION_DELAY: Duration =
Duration::from_secs(sn_node::PERIODIC_REPLICATION_INTERVAL_MAX_S);
Duration::from_secs(sn_node::PERIODIC_REPLICATION_INTERVAL_MAX_S / 2);

// Default number of churns that should be performed. After each churn, we
// wait for VERIFICATION_DELAY time before verifying the data location.
Expand Down Expand Up @@ -301,14 +301,27 @@ async fn verify_location(all_peers: &Vec<PeerId>, node_rpc_addresses: &[SocketAd
}
}

if !failed.is_empty() {
println!("Verification failed after {VERIFICATION_ATTEMPTS} times");
error!("Verification failed after {VERIFICATION_ATTEMPTS} times");
Err(eyre!("Verification failed for: {failed:?}"))
} else {
// Replication only pick peer candidates closing to self.
// With responsible_range switched to `distance`, this makes some `edge` peers could
// be skipped for some `edge` records that it supposed to kept, but not picked as candidate.
// This will be a more noticable behaviour with small sized network, which could have sparsed
// and uneven distribution more likely, with the `network density sampling scheme`.
// Hence, allowing a small `glitch` for this test setup only.
if failed.is_empty() {
println!("All the Records have been verified!");
info!("All the Records have been verified!");
Ok(())
} else {
let just_missed_one = failed.values().all(|failed_peers| failed_peers.len() <= 1);
if just_missed_one {
println!("Still have one failed peer after {VERIFICATION_ATTEMPTS} times");
info!("Still have one failed peer after {VERIFICATION_ATTEMPTS} times");
Ok(())
} else {
println!("Verification failed after {VERIFICATION_ATTEMPTS} times");
error!("Verification failed after {VERIFICATION_ATTEMPTS} times");
Err(eyre!("Verification failed for: {failed:?}"))
}
}
}

Expand Down

0 comments on commit 1c21f8c

Please sign in to comment.