diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs
index 5a8999703f..a028d34129 100644
--- a/sn_networking/src/event/request_response.rs
+++ b/sn_networking/src/event/request_response.rs
@@ -100,9 +100,6 @@ impl SwarmDriver {
                     self.record_metrics(Marker::FlaggedAsBadNode {
                         flagged_by: &detected_by,
                     });
-
-                    // TODO: shall we terminate self after received such notifications
-                    //       from the majority close_group nodes around us?
                 } else {
                     error!("Received a bad_peer notification from {detected_by:?}, targeting {bad_peer:?}, which is not us.");
                 }
diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs
index b7118d18a3..74ea3cbd46 100644
--- a/sn_networking/src/lib.rs
+++ b/sn_networking/src/lib.rs
@@ -424,8 +424,6 @@ impl Network {
             self.send_req_ignore_reply(request, *peer_id);
         }
 
-        filter_out_bad_nodes(&mut all_costs, record_address);
-
         get_fees_from_store_cost_responses(all_costs)
     }
 
@@ -1189,32 +1187,6 @@ fn get_fees_from_store_cost_responses(
     Ok((payee_id, payee.1, payee.2))
 }
 
-/// According to the bad_nodes list collected via quotes,
-/// candidate that received majority votes from others shall be ignored.
-fn filter_out_bad_nodes(
-    all_costs: &mut Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>,
-    record_address: NetworkAddress,
-) {
-    let mut bad_node_votes: BTreeMap<NetworkAddress, usize> = BTreeMap::new();
-    for (peer_addr, _reward_addr, quote) in all_costs.iter() {
-        let bad_nodes: Vec<NetworkAddress> = match rmp_serde::from_slice(&quote.bad_nodes) {
-            Ok(bad_nodes) => bad_nodes,
-            Err(err) => {
-                error!("For record {record_address:?}, failed to recover bad_nodes from quote of {peer_addr:?} with error {err:?}");
-                continue;
-            }
-        };
-        for bad_node in bad_nodes {
-            let entry = bad_node_votes.entry(bad_node).or_default();
-            *entry += 1;
-        }
-    }
-    all_costs.retain(|(peer_addr, _, _)| {
-        let entry = bad_node_votes.entry(peer_addr.clone()).or_default();
-        *entry < close_group_majority()
-    });
-}
-
 /// Get the value of the provided Quorum
 pub fn get_quorum_value(quorum: &Quorum) -> usize {
     match quorum {
diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs
index bff4266b6b..22ec7e9336 100644
--- a/sn_node/src/node.rs
+++ b/sn_node/src/node.rs
@@ -19,8 +19,7 @@ use sn_evm::{AttoTokens, RewardsAddress};
 #[cfg(feature = "open-metrics")]
 use sn_networking::MetricsRegistries;
 use sn_networking::{
-    close_group_majority, Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue,
-    SwarmDriver,
+    Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver,
 };
 use sn_protocol::{
     error::Error as ProtocolError,
@@ -36,10 +35,7 @@ use std::{
     },
     time::Duration,
 };
-use tokio::{
-    sync::mpsc::Receiver,
-    task::{spawn, JoinHandle},
-};
+use tokio::{sync::mpsc::Receiver, task::spawn};
 
 use sn_evm::EvmNetwork;
 
@@ -47,10 +43,6 @@ use sn_evm::EvmNetwork;
 /// This is the max time it should take. Minimum interval at any node will be half this
 pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180;
 
-/// Interval to trigger bad node detection.
-/// This is the max time it should take. Minimum interval at any node will be half this
-const PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S: u64 = 600;
-
 /// Max number of attempts that chunk proof verification will be carried out against certain target,
 /// before classifying peer as a bad peer.
 const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3;
@@ -256,19 +248,6 @@ impl Node {
             let mut replication_interval = tokio::time::interval(replication_interval_time);
             let _ = replication_interval.tick().await; // first tick completes immediately
 
-            // use a random timeout to ensure not sync when transmit messages.
-            let bad_nodes_check_interval: u64 = rng.gen_range(
-                PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S / 2
-                    ..PERIODIC_BAD_NODE_DETECTION_INTERVAL_MAX_S,
-            );
-            let bad_nodes_check_time = Duration::from_secs(bad_nodes_check_interval);
-            debug!("BadNodesCheck interval set to {bad_nodes_check_time:?}");
-
-            let mut bad_nodes_check_interval = tokio::time::interval(bad_nodes_check_time);
-            let _ = bad_nodes_check_interval.tick().await; // first tick completes immediately
-
-            let mut rolling_index = 0;
-
             let mut uptime_metrics_update_interval =
                 tokio::time::interval(UPTIME_METRICS_UPDATE_INTERVAL);
             let _ = uptime_metrics_update_interval.tick().await; // first tick completes immediately
@@ -310,24 +289,6 @@ impl Node {
                             trace!("Periodic replication took {:?}", start.elapsed());
                         });
                     }
-                    // runs every bad_nodes_check_time time
-                    _ = bad_nodes_check_interval.tick() => {
-                        let start = Instant::now();
-                        debug!("Periodic bad_nodes check triggered");
-                        let network = self.network().clone();
-                        self.record_metrics(Marker::IntervalBadNodesCheckTriggered);
-
-                        let _handle = spawn(async move {
-                            Self::try_bad_nodes_check(network, rolling_index).await;
-                            trace!("Periodic bad_nodes check took {:?}", start.elapsed());
-                        });
-
-                        if rolling_index == 511 {
-                            rolling_index = 0;
-                        } else {
-                            rolling_index += 1;
-                        }
-                    }
                     _ = uptime_metrics_update_interval.tick() => {
                         #[cfg(feature = "open-metrics")]
                         if let Some(metrics_recorder) = self.metrics_recorder() {
@@ -524,58 +485,6 @@ impl Node {
         );
     }
 
-    // Query close_group peers to the target to verifify whether the target is bad_node
-    // Returns true when it is a bad_node, otherwise false
-    async fn close_nodes_shunning_peer(network: &Network, peer_id: PeerId) -> bool {
-        // using `client` to exclude self
-        let closest_peers = match network
-            .client_get_all_close_peers_in_range_or_close_group(&NetworkAddress::from_peer(peer_id))
-            .await
-        {
-            Ok(peers) => peers,
-            Err(err) => {
-                error!("Failed to finding closest_peers to {peer_id:?} client_get_closest_peers errored: {err:?}");
-                return false;
-            }
-        };
-
-        // Query the peer status from the close_group to the peer,
-        // raise alert as long as getting alerts from majority(3) of the close_group.
-        let req = Request::Query(Query::CheckNodeInProblem(NetworkAddress::from_peer(
-            peer_id,
-        )));
-        let mut handles = Vec::new();
-        for peer in closest_peers {
-            let req_copy = req.clone();
-            let network_copy = network.clone();
-            let handle: JoinHandle<bool> = spawn(async move {
-                debug!("getting node_status of {peer_id:?} from {peer:?}");
-                if let Ok(resp) = network_copy.send_request(req_copy, peer).await {
-                    match resp {
-                        Response::Query(QueryResponse::CheckNodeInProblem {
-                            is_in_trouble,
-                            ..
-                        }) => is_in_trouble,
-                        other => {
-                            error!("Cannot get node status of {peer_id:?} from node {peer:?}, with response {other:?}");
-                            false
-                        }
-                    }
-                } else {
-                    false
-                }
-            });
-            handles.push(handle);
-        }
-        let results: Vec<_> = futures::future::join_all(handles).await;
-
-        results
-            .iter()
-            .filter(|r| *r.as_ref().unwrap_or(&false))
-            .count()
-            >= close_group_majority()
-    }
-
     // Handle the response that was not awaited at the call site
     fn handle_response(&self, response: Response) -> Result<()> {
         match response {
@@ -711,62 +620,6 @@ impl Node {
         };
         Response::Query(resp)
     }
-
-    async fn try_bad_nodes_check(network: Network, rolling_index: usize) {
-        if let Ok(kbuckets) = network.get_kbuckets().await {
-            let total_peers: usize = kbuckets.values().map(|peers| peers.len()).sum();
-            if total_peers > 100 {
-                // The `rolling_index` is rotating among 0-511,
-                // meanwhile the returned `kbuckets` only holding non-empty buckets.
-                // Hence using the `remainder` calculate to achieve a rolling check.
-                // A further `remainder of 2` is used to allow `upper or lower part`
-                // index within a bucket, to further reduce the concurrent queries.
-                let mut bucket_index = (rolling_index / 2) % kbuckets.len();
-                let part_index = rolling_index % 2;
-
-                for (distance, peers) in kbuckets.iter() {
-                    if bucket_index == 0 {
-                        let peers_to_query = if peers.len() > 10 {
-                            let split_index = peers.len() / 2;
-                            let (left, right) = peers.split_at(split_index);
-                            if part_index == 0 {
-                                left
-                            } else {
-                                right
-                            }
-                        } else {
-                            peers
-                        };
-
-                        debug!(
-                            "Undertake bad_nodes check against bucket {distance} having {} peers, {} candidates to be queried",
-                            peers.len(), peers_to_query.len()
-                        );
-                        for peer_id in peers_to_query {
-                            let peer_id_clone = *peer_id;
-                            let network_clone = network.clone();
-                            let _handle = spawn(async move {
-                                let is_bad =
-                                    Self::close_nodes_shunning_peer(&network_clone, peer_id_clone)
-                                        .await;
-                                if is_bad {
-                                    network_clone.record_node_issues(
-                                        peer_id_clone,
-                                        NodeIssue::CloseNodesShunning,
-                                    );
-                                }
-                            });
-                        }
-                        break;
-                    } else {
-                        bucket_index = bucket_index.saturating_sub(1);
-                    }
-                }
-            } else {
-                debug!("Skip bad_nodes check as not having too many nodes in RT");
-            }
-        }
-    }
 }
 
 async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool {
diff --git a/sn_transfers/src/wallet/data_payments.rs b/sn_transfers/src/wallet/data_payments.rs
index b200ff4c97..7ff31f065a 100644
--- a/sn_transfers/src/wallet/data_payments.rs
+++ b/sn_transfers/src/wallet/data_payments.rs
@@ -15,6 +15,7 @@ use xor_name::XorName;
 /// The time in seconds that a quote is valid for
 pub const QUOTE_EXPIRATION_SECS: u64 = 3600;
 
+#[allow(dead_code)]
 /// The margin allowed for live_time
 const LIVE_TIME_MARGIN: u64 = 10;
 
@@ -186,13 +187,19 @@ impl PaymentQuote {
         true
     }
 
-    /// Returns true) if the quote has not yet expired
+    /// Returns true if the quote has not yet expired
     pub fn has_expired(&self) -> bool {
         let now = std::time::SystemTime::now();
         let dur_s = match now.duration_since(self.timestamp) {
             Ok(dur) => dur.as_secs(),
-            Err(_) => return true,
+            Err(err) => {
+                info!(
+                    "Can't deduce elapsed time from {:?} with error {err:?}",
+                    self.timestamp
+                );
+                return true;
+            }
         };
         dur_s > QUOTE_EXPIRATION_SECS
     }
 
@@ -217,60 +224,62 @@ impl PaymentQuote {
 
     /// Check against a new quote, verify whether it is a valid one from self perspective.
     /// Returns `true` to flag the `other` quote is valid, from self perspective.
-    pub fn historical_verify(&self, other: &Self) -> bool {
-        // There is a chance that an old quote got used later than a new quote
-        let self_is_newer = self.is_newer_than(other);
-        let (old_quote, new_quote) = if self_is_newer {
-            (other, self)
-        } else {
-            (self, other)
-        };
-
-        if new_quote.quoting_metrics.live_time < old_quote.quoting_metrics.live_time {
-            info!("Claimed live_time out of sequence");
-            return false;
-        }
-
-        let old_elapsed = if let Ok(elapsed) = old_quote.timestamp.elapsed() {
-            elapsed
-        } else {
-            info!("timestamp failure");
-            return false;
-        };
-        let new_elapsed = if let Ok(elapsed) = new_quote.timestamp.elapsed() {
-            elapsed
-        } else {
-            info!("timestamp failure");
-            return false;
-        };
-
-        let time_diff = old_elapsed.as_secs().saturating_sub(new_elapsed.as_secs());
-        let live_time_diff =
-            new_quote.quoting_metrics.live_time - old_quote.quoting_metrics.live_time;
-        // In theory, these two shall match, give it a LIVE_TIME_MARGIN to avoid system glitch
-        if live_time_diff > time_diff + LIVE_TIME_MARGIN {
-            info!("claimed live_time out of sync with the timestamp");
-            return false;
-        }
-
-        // There could be pruning to be undertaken, also the close range keeps changing as well.
-        // Hence `close_records_stored` could be growing or shrinking.
-        // Currently not to carry out check on it, just logging to observe the trend.
-        debug!(
-            "The new quote has {} close records stored, meanwhile old one has {}.",
-            new_quote.quoting_metrics.close_records_stored,
-            old_quote.quoting_metrics.close_records_stored
-        );
-
-        // TODO: Double check if this applies, as this will prevent a node restart with same ID
-        if new_quote.quoting_metrics.received_payment_count
-            < old_quote.quoting_metrics.received_payment_count
-        {
-            info!("claimed received_payment_count out of sequence");
-            return false;
-        }
-
+    pub fn historical_verify(&self, _other: &Self) -> bool {
+        // TODO: Shall be refactored once new quote filtering scheme deployed
         true
+        // // There is a chance that an old quote got used later than a new quote
+        // let self_is_newer = self.is_newer_than(other);
+        // let (old_quote, new_quote) = if self_is_newer {
+        //     (other, self)
+        // } else {
+        //     (self, other)
+        // };
+
+        // if new_quote.quoting_metrics.live_time < old_quote.quoting_metrics.live_time {
+        //     info!("Claimed live_time out of sequence");
+        //     return false;
+        // }
+
+        // let old_elapsed = if let Ok(elapsed) = old_quote.timestamp.elapsed() {
+        //     elapsed
+        // } else {
+        //     info!("timestamp failure");
+        //     return false;
+        // };
+        // let new_elapsed = if let Ok(elapsed) = new_quote.timestamp.elapsed() {
+        //     elapsed
+        // } else {
+        //     info!("timestamp failure");
+        //     return false;
+        // };
+
+        // let time_diff = old_elapsed.as_secs().saturating_sub(new_elapsed.as_secs());
+        // let live_time_diff =
+        //     new_quote.quoting_metrics.live_time - old_quote.quoting_metrics.live_time;
+        // // In theory, these two shall match, give it a LIVE_TIME_MARGIN to avoid system glitch
+        // if live_time_diff > time_diff + LIVE_TIME_MARGIN {
+        //     info!("claimed live_time out of sync with the timestamp");
+        //     return false;
+        // }
+
+        // // There could be pruning to be undertaken, also the close range keeps changing as well.
+        // // Hence `close_records_stored` could be growing or shrinking.
+        // // Currently not to carry out check on it, just logging to observe the trend.
+        // debug!(
+        //     "The new quote has {} close records stored, meanwhile old one has {}.",
+        //     new_quote.quoting_metrics.close_records_stored,
+        //     old_quote.quoting_metrics.close_records_stored
+        // );
+
+        // // TODO: Double check if this applies, as this will prevent a node restart with same ID
+        // if new_quote.quoting_metrics.received_payment_count
+        //     < old_quote.quoting_metrics.received_payment_count
+        // {
+        //     info!("claimed received_payment_count out of sequence");
+        //     return false;
+        // }
+
+        // true
     }
 }
 
@@ -332,6 +341,7 @@ mod tests {
         assert!(!quote.check_is_signed_by_claimed_peer(false_peer));
     }
 
+    #[ignore = "Shall be refactored once new quote filtering scheme deployed"]
     #[test]
     fn test_historical_verify() {
         let mut old_quote = PaymentQuote::zero();