Skip to content

Commit

Permalink
Merge pull request #2445 from maqi/improve_upgradability
Browse files Browse the repository at this point in the history
Improve upgradability
  • Loading branch information
maqi authored Nov 22, 2024
2 parents 3e7ed69 + 41bad91 commit 199dc78
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 235 deletions.
3 changes: 0 additions & 3 deletions sn_networking/src/event/request_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ impl SwarmDriver {
self.record_metrics(Marker::FlaggedAsBadNode {
flagged_by: &detected_by,
});

// TODO: shall we terminate self after received such notifications
// from the majority close_group nodes around us?
} else {
error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
}
Expand Down
28 changes: 0 additions & 28 deletions sn_networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,8 +424,6 @@ impl Network {
self.send_req_ignore_reply(request, *peer_id);
}

filter_out_bad_nodes(&mut all_costs, record_address);

get_fees_from_store_cost_responses(all_costs)
}

Expand Down Expand Up @@ -1189,32 +1187,6 @@ fn get_fees_from_store_cost_responses(
Ok((payee_id, payee.1, payee.2))
}

/// Drops quote candidates that the other quoting peers have collectively flagged as bad.
///
/// Every quote carries a serialized list of peers its issuer considers bad. The lists
/// are tallied into per-peer vote counts, and any candidate in `all_costs` whose count
/// reaches `close_group_majority()` is removed. A quote whose list fails to decode is
/// logged (tagged with `record_address`) and contributes no votes.
fn filter_out_bad_nodes(
    all_costs: &mut Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>,
    record_address: NetworkAddress,
) {
    let mut votes: BTreeMap<NetworkAddress, usize> = BTreeMap::new();

    for (peer_addr, _, quote) in all_costs.iter() {
        let decoded: Result<Vec<NetworkAddress>, _> = rmp_serde::from_slice(&quote.bad_nodes);
        match decoded {
            Ok(reported) => {
                for accused in reported {
                    *votes.entry(accused).or_insert(0) += 1;
                }
            }
            Err(err) => {
                error!("For record {record_address:?}, failed to recover bad_nodes from quote of {peer_addr:?} with error {err:?}");
            }
        }
    }

    // A candidate that nobody voted against simply has zero votes.
    all_costs.retain(|(peer_addr, _, _)| {
        let received = votes.get(peer_addr).copied().unwrap_or(0);
        received < close_group_majority()
    });
}

/// Get the value of the provided Quorum
pub fn get_quorum_value(quorum: &Quorum) -> usize {
match quorum {
Expand Down
151 changes: 2 additions & 149 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ use sn_evm::{AttoTokens, RewardsAddress};
#[cfg(feature = "open-metrics")]
use sn_networking::MetricsRegistries;
use sn_networking::{
close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
SwarmDriver,
Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
};
use sn_protocol::{
error::Error as ProtocolError,
Expand All @@ -36,21 +35,14 @@ use std::{
},
time::Duration,
};
use tokio::{
sync::mpsc::Receiver,
task::{spawn, JoinHandle},
};
use tokio::{sync::mpsc::Receiver, task::spawn};

use sn_evm::EvmNetwork;

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this
pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;

/// Interval to trigger bad node detection.
/// This is the max time it should take. Minimum interval at any node will be half this
const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;

/// Max number of attempts that chunk proof verification will be carried out against certain target,
/// before classifying peer as a bad peer.
const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3;
Expand Down Expand Up @@ -256,19 +248,6 @@ impl Node {
let mut replication_interval = tokio::time::interval(replication_interval_time);
let _ = replication_interval.tick().await; // first tick completes immediately

// use a random timeout to ensure not sync when transmit messages.
let bad_nodes_check_interval: u64 = rng.gen_range(
PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2
..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
);
let bad_nodes_check_time = Duration::from_secs(bad_nodes_check_interval);
debug!("BadNodesCheck interval set to {bad_nodes_check_time:?}");

let mut bad_nodes_check_interval = tokio::time::interval(bad_nodes_check_time);
let _ = bad_nodes_check_interval.tick().await; // first tick completes immediately

let mut rolling_index = 0;

let mut uptime_metrics_update_interval =
tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
Expand Down Expand Up @@ -310,24 +289,6 @@ impl Node {
trace!("Periodic replication took {:?}", start.elapsed());
});
}
// runs every bad_nodes_check_time time
_ = bad_nodes_check_interval.tick() => {
let start = Instant::now();
debug!("Periodic bad_nodes check triggered");
let network = self.network().clone();
self.record_metrics(Marker::IntervalBadNodesCheckTriggered);

let _handle = spawn(async move {
Self::try_bad_nodes_check(network, rolling_index).await;
trace!("Periodic bad_nodes check took {:?}", start.elapsed());
});

if rolling_index == 511 {
rolling_index = 0;
} else {
rolling_index += 1;
}
}
_ = uptime_metrics_update_interval.tick() => {
#[cfg(feature = "open-metrics")]
if let Some(metrics_recorder) = self.metrics_recorder() {
Expand Down Expand Up @@ -524,58 +485,6 @@ impl Node {
);
}

// Query close_group peers to the target to verifify whether the target is bad_node
// Returns true when it is a bad_node, otherwise false
async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
// using `client` to exclude self
let closest_peers = match network
.client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
.await
{
Ok(peers) => peers,
Err(err) => {
error!("Failed to finding closest_peers to {peer_id:?} client_get_closest_peers errored: {err:?}");
return false;
}
};

// Query the peer status from the close_group to the peer,
// raise alert as long as getting alerts from majority(3) of the close_group.
let req = Request::Query(Query::CheckNodeInProblem(NetworkAddress::from_peer(
peer_id,
)));
let mut handles = Vec::new();
for peer in closest_peers {
let req_copy = req.clone();
let network_copy = network.clone();
let handle: JoinHandle<bool> = spawn(async move {
debug!("getting node_status of {peer_id:?} from {peer:?}");
if let Ok(resp) = network_copy.send_request(req_copy, peer).await {
match resp {
Response::Query(QueryResponse::CheckNodeInProblem {
is_in_trouble,
..
}) => is_in_trouble,
other => {
error!("Cannot get node status of {peer_id:?} from node {peer:?}, with response {other:?}");
false
}
}
} else {
false
}
});
handles.push(handle);
}
let results: Vec<_> = futures::future::join_all(handles).await;

results
.iter()
.filter(|r| *r.as_ref().unwrap_or(&false))
.count()
>= close_group_majority()
}

// Handle the response that was not awaited at the call site
fn handle_response(&self, response: Response) -> Result<()> {
match response {
Expand Down Expand Up @@ -711,62 +620,6 @@ impl Node {
};
Response::Query(resp)
}

/// Runs one round of the rolling bad-node check against a single k-bucket.
///
/// Does nothing when the k-buckets cannot be fetched, and skips the round (with a
/// debug log) unless the buckets hold more than 100 peers in total. Otherwise
/// `rolling_index` selects one non-empty bucket and, for buckets with more than 10
/// peers, either its lower or upper half. Every selected peer is checked in its own
/// spawned task and recorded as `NodeIssue::CloseNodesShunning` when its close group
/// reports it as bad.
async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
    let Ok(kbuckets) = network.get_kbuckets().await else {
        return;
    };

    let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
    if total_peers <= 100 {
        debug!("Skip bad_nodes check as not having too many nodes in RT");
        return;
    }

    // `rolling_index` rotates over a fixed range while `kbuckets` only holds the
    // non-empty buckets, so the bucket is picked by remainder. The lowest bit of the
    // index picks which half of a large bucket is queried, reducing concurrent queries.
    let bucket_index = (rolling_index / 2) % kbuckets.len();
    let use_upper_half = rolling_index % 2 != 0;

    if let Some((distance, peers)) = kbuckets.iter().nth(bucket_index) {
        let peers_to_query = if peers.len() > 10 {
            let (lower, upper) = peers.split_at(peers.len() / 2);
            if use_upper_half {
                upper
            } else {
                lower
            }
        } else {
            peers
        };

        debug!(
            "Undertake bad_nodes check against bucket {distance} having {} peers, {} candidates to be queried",
            peers.len(), peers_to_query.len()
        );

        for &candidate in peers_to_query {
            let net = network.clone();
            // Fire and forget: the task records the issue itself when needed.
            let _handle = spawn(async move {
                if Self::close_nodes_shunning_peer(&net, candidate).await {
                    net.record_node_issues(candidate, NodeIssue::CloseNodesShunning);
                }
            });
        }
    }
}
}

async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {
Expand Down
Loading

0 comments on commit 199dc78

Please sign in to comment.