From 2554937c6ab1122a1aaea56af243ae2bbace5807 Mon Sep 17 00:00:00 2001
From: qima
Date: Fri, 15 Nov 2024 19:52:15 +0800
Subject: [PATCH] chore: remove bad_node group consensus

---
 sn_networking/src/event/request_response.rs |   3 -
 sn_networking/src/lib.rs                     |  28 ----
 sn_node/src/node.rs                          | 151 +-------------------
 3 files changed, 2 insertions(+), 180 deletions(-)

diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs
index 5a8999703f..a028d34129 100644
--- a/sn_networking/src/event/request_response.rs
+++ b/sn_networking/src/event/request_response.rs
@@ -100,9 +100,6 @@ impl SwarmDriver {
                                 self.record_metrics(Marker::FlaggedAsBadNode {
                                     flagged_by: &detected_by,
                                 });
-
-                                // TODO: shall we terminate self after received such notifications
-                                // from the majority close_group nodes around us?
                             } else {
                                 error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
                             }
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index b7118d18a3..74ea3cbd46 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -424,8 +424,6 @@ impl Network {
             self.send_req_ignore_reply(request, *peer_id);
         }
 
-        filter_out_bad_nodes(&mut all_costs, record_address);
-
         get_fees_from_store_cost_responses(all_costs)
     }
 
@@ -1189,32 +1187,6 @@ fn get_fees_from_store_cost_responses(
     Ok((payee_id, payee.1, payee.2))
 }
 
-/// According to the bad_nodes list collected via quotes,
-/// candidate that received majority votes from others shall be ignored.
-fn filter_out_bad_nodes(
-    all_costs: &mut Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>,
-    record_address: NetworkAddress,
-) {
-    let mut bad_node_votes: BTreeMap<NetworkAddress, usize> = BTreeMap::new();
-    for (peer_addr, _reward_addr, quote) in all_costs.iter() {
-        let bad_nodes: Vec<NetworkAddress> = match rmp_serde::from_slice(&quote.bad_nodes) {
-            Ok(bad_nodes) => bad_nodes,
-            Err(err) => {
-                error!("For record {record_address:?}, failed to recover bad_nodes from quote of {peer_addr:?} with error {err:?}");
-                continue;
-            }
-        };
-        for bad_node in bad_nodes {
-            let entry = bad_node_votes.entry(bad_node).or_default();
-            *entry += 1;
-        }
-    }
-    all_costs.retain(|(peer_addr, _, _)| {
-        let entry = bad_node_votes.entry(peer_addr.clone()).or_default();
-        *entry < close_group_majority()
-    });
-}
-
 /// Get the value of the provided Quorum
 pub fn get_quorum_value(quorum: &Quorum) -> usize {
     match quorum {
diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs
index bff4266b6b..22ec7e9336 100644
--- a/sn_node/src/node.rs
+++ b/sn_node/src/node.rs
@@ -19,8 +19,7 @@ use sn_evm::{AttoTokens, RewardsAddress};
 #[cfg(feature = "open-metrics")]
 use sn_networking::MetricsRegistries;
 use sn_networking::{
-    close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
-    SwarmDriver,
+    Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
 };
 use sn_protocol::{
     error::Error as ProtocolError,
@@ -36,10 +35,7 @@ use std::{
     },
     time::Duration,
 };
-use tokio::{
-    sync::mpsc::Receiver,
-    task::{spawn, JoinHandle},
-};
+use tokio::{sync::mpsc::Receiver, task::spawn};
 
 use sn_evm::EvmNetwork;
 
@@ -47,10 +43,6 @@
 /// This is the max time it should take. Minimum interval at any node will be half this
 pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
 
-/// Interval to trigger bad node detection.
-/// This is the max time it should take. Minimum interval at any node will be half this
-const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;
-
 /// Max number of attempts that chunk proof verification will be carried out against certain target,
 /// before classifying peer as a bad peer.
 const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3;
@@ -256,19 +248,6 @@ impl Node {
             let mut replication_interval = tokio::time::interval(replication_interval_time);
             let _ = replication_interval.tick().await; // first tick completes immediately
 
-            // use a random timeout to ensure not sync when transmit messages.
-            let bad_nodes_check_interval: u64 = rng.gen_range(
-                PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2
-                    ..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
-            );
-            let bad_nodes_check_time = Duration::from_secs(bad_nodes_check_interval);
-            debug!("BadNodesCheck interval set to {bad_nodes_check_time:?}");
-
-            let mut bad_nodes_check_interval = tokio::time::interval(bad_nodes_check_time);
-            let _ = bad_nodes_check_interval.tick().await; // first tick completes immediately
-
-            let mut rolling_index = 0;
-
             let mut uptime_metrics_update_interval =
                 tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
             let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
@@ -310,24 +289,6 @@ impl Node {
                             trace!("Periodic replication took {:?}", start.elapsed());
                         });
                     }
-                    // runs every bad_nodes_check_time time
-                    _ = bad_nodes_check_interval.tick() => {
-                        let start = Instant::now();
-                        debug!("Periodic bad_nodes check triggered");
-                        let network = self.network().clone();
-                        self.record_metrics(Marker::IntervalBadNodesCheckTriggered);
-
-                        let _handle = spawn(async move {
-                            Self::try_bad_nodes_check(network, rolling_index).await;
-                            trace!("Periodic bad_nodes check took {:?}", start.elapsed());
-                        });
-
-                        if rolling_index == 511 {
-                            rolling_index = 0;
-                        } else {
-                            rolling_index += 1;
-                        }
-                    }
                     _ = uptime_metrics_update_interval.tick() => {
                         #[cfg(feature = "open-metrics")]
                         if let Some(metrics_recorder) = self.metrics_recorder() {
@@ -524,58 +485,6 @@ impl Node {
         );
     }
 
-    // Query close_group peers to the target to verifify whether the target is bad_node
-    // Returns true when it is a bad_node, otherwise false
-    async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
-        // using `client` to exclude self
-        let closest_peers = match network
-            .client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
-            .await
-        {
-            Ok(peers) => peers,
-            Err(err) => {
-                error!("Failed to finding closest_peers to {peer_id:?} client_get_closest_peers errored: {err:?}");
-                return false;
-            }
-        };
-
-        // Query the peer status from the close_group to the peer,
-        // raise alert as long as getting alerts from majority(3) of the close_group.
-        let req = Request::Query(Query::CheckNodeInProblem(NetworkAddress::from_peer(
-            peer_id,
-        )));
-        let mut handles = Vec::new();
-        for peer in closest_peers {
-            let req_copy = req.clone();
-            let network_copy = network.clone();
-            let handle: JoinHandle<bool> = spawn(async move {
-                debug!("getting node_status of {peer_id:?} from {peer:?}");
-                if let Ok(resp) = network_copy.send_request(req_copy, peer).await {
-                    match resp {
-                        Response::Query(QueryResponse::CheckNodeInProblem {
-                            is_in_trouble,
-                            ..
-                        }) => is_in_trouble,
-                        other => {
-                            error!("Cannot get node status of {peer_id:?} from node {peer:?}, with response {other:?}");
-                            false
-                        }
-                    }
-                } else {
-                    false
-                }
-            });
-            handles.push(handle);
-        }
-        let results: Vec<_> = futures::future::join_all(handles).await;
-
-        results
-            .iter()
-            .filter(|r| *r.as_ref().unwrap_or(&false))
-            .count()
-            >= close_group_majority()
-    }
-
     // Handle the response that was not awaited at the call site
     fn handle_response(&self, response: Response) -> Result<()> {
         match response {
@@ -711,62 +620,6 @@ impl Node {
         };
         Response::Query(resp)
     }
-
-    async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
-        if let Ok(kbuckets) = network.get_kbuckets().await {
-            let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
-            if total_peers > 100 {
-                // The `rolling_index` is rotating among 0-511,
-                // meanwhile the returned `kbuckets` only holding non-empty buckets.
-                // Hence using the `remainder` calculate to achieve a rolling check.
-                // A further `remainder of 2` is used to allow `upper or lower part`
-                // index within a bucket, to further reduce the concurrent queries.
-                let mut bucket_index = (rolling_index / 2) % kbuckets.len();
-                let part_index = rolling_index % 2;
-
-                for (distance, peers) in kbuckets.iter() {
-                    if bucket_index == 0 {
-                        let peers_to_query = if peers.len() > 10 {
-                            let split_index = peers.len() / 2;
-                            let (left, right) = peers.split_at(split_index);
-                            if part_index == 0 {
-                                left
-                            } else {
-                                right
-                            }
-                        } else {
-                            peers
-                        };
-
-                        debug!(
-                            "Undertake bad_nodes check against bucket {distance} having {} peers, {} candidates to be queried",
-                            peers.len(), peers_to_query.len()
-                        );
-                        for peer_id in peers_to_query {
-                            let peer_id_clone = *peer_id;
-                            let network_clone = network.clone();
-                            let _handle = spawn(async move {
-                                let is_bad =
-                                    Self::close_nodes_shunning_peer(&network_clone, peer_id_clone)
-                                        .await;
-                                if is_bad {
-                                    network_clone.record_node_issues(
-                                        peer_id_clone,
-                                        NodeIssue::CloseNodesShunning,
-                                    );
-                                }
-                            });
-                        }
-                        break;
-                    } else {
-                        bucket_index = bucket_index.saturating_sub(1);
-                    }
-                }
-            } else {
-                debug!("Skip bad_nodes check as not having too many nodes in RT");
-            }
-        }
-    }
 }
 
 async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {