From 07ce799a8020000362506ed1b8929dfbccc89b84 Mon Sep 17 00:00:00 2001 From: qima Date: Sat, 16 Nov 2024 01:27:04 +0800 Subject: [PATCH 1/4] chore!: improve ChunkProofVerification BREAKING CHANGE --- sn_networking/src/event/request_response.rs | 112 +------ sn_networking/src/lib.rs | 20 +- sn_node/src/node.rs | 307 ++++++++++++++------ sn_protocol/src/messages/query.rs | 15 +- sn_protocol/src/messages/response.rs | 7 +- 5 files changed, 252 insertions(+), 209 deletions(-) diff --git a/sn_networking/src/event/request_response.rs b/sn_networking/src/event/request_response.rs index a028d34129..7dacaa93e4 100644 --- a/sn_networking/src/event/request_response.rs +++ b/sn_networking/src/event/request_response.rs @@ -7,12 +7,10 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{ - cmd::NetworkSwarmCmd, log_markers::Marker, sort_peers_by_address, MsgResponder, NetworkError, - NetworkEvent, SwarmDriver, CLOSE_GROUP_SIZE, + cmd::NetworkSwarmCmd, log_markers::Marker, MsgResponder, NetworkError, NetworkEvent, + SwarmDriver, }; -use itertools::Itertools; use libp2p::request_response::{self, Message}; -use rand::{rngs::OsRng, thread_rng, Rng}; use sn_protocol::{ messages::{CmdResponse, Request, Response}, storage::RecordType, @@ -207,14 +205,10 @@ impl SwarmDriver { return; } - let more_than_one_key = incoming_keys.len() > 1; - - // On receive a replication_list from a close_group peer, we undertake two tasks: + // On receive a replication_list from a close_group peer, we undertake: // 1, For those keys that we don't have: // fetch them if close enough to us - // 2, For those keys that we have and supposed to be held by the sender as well: - // start chunk_proof check against a randomly selected chunk type record to the sender - // 3, For those spends that we have that differ in the hash, we fetch the other version + // 2, For those spends that we have that differ in the hash, we fetch the other version // and update our local copy. let all_keys = self .swarm @@ -230,103 +224,5 @@ impl SwarmDriver { } else { self.send_event(NetworkEvent::KeysToFetchForReplication(keys_to_fetch)); } - - // Only trigger chunk_proof check based every X% of the time - let mut rng = thread_rng(); - // 5% probability - if more_than_one_key && rng.gen_bool(0.05) { - self.verify_peer_storage(sender.clone()); - - // In additon to verify the sender, we also verify a random close node. - // This is to avoid malicious node escaping the check by never send a replication_list. - // With further reduced probability of 1% (5% * 20%) - if rng.gen_bool(0.2) { - let close_group_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(CLOSE_GROUP_SIZE) - .collect_vec(); - if close_group_peers.len() == CLOSE_GROUP_SIZE { - loop { - let index: usize = OsRng.gen_range(0..close_group_peers.len()); - let candidate = NetworkAddress::from_peer(close_group_peers[index]); - if sender != candidate { - self.verify_peer_storage(candidate); - break; - } - } - } - } - } - } - - /// Check among all chunk type records that we have, select those close to the peer, - /// and randomly pick one as the verification candidate. 
- fn verify_peer_storage(&mut self, peer: NetworkAddress) { - let mut closest_peers = self - .swarm - .behaviour_mut() - .kademlia - .get_closest_local_peers(&self.self_peer_id.into()) - .map(|peer| peer.into_preimage()) - .take(20) - .collect_vec(); - closest_peers.push(self.self_peer_id); - - let target_peer = if let Some(peer_id) = peer.as_peer_id() { - peer_id - } else { - error!("Target {peer:?} is not a valid PeerId"); - return; - }; - - let all_keys = self - .swarm - .behaviour_mut() - .kademlia - .store_mut() - .record_addresses_ref(); - - // Targeted chunk type record shall be expected within the close range from our perspective. - let mut verify_candidates: Vec = all_keys - .values() - .filter_map(|(addr, record_type)| { - if RecordType::Chunk == *record_type { - match sort_peers_by_address(&closest_peers, addr, CLOSE_GROUP_SIZE) { - Ok(close_group) => { - if close_group.contains(&&target_peer) { - Some(addr.clone()) - } else { - None - } - } - Err(err) => { - warn!("Could not get sorted peers for {addr:?} with error {err:?}"); - None - } - } - } else { - None - } - }) - .collect(); - - verify_candidates.sort_by_key(|a| peer.distance(a)); - - // To ensure the candidate must have to be held by the peer, - // we only carry out check when there are already certain amount of chunks uploaded - // AND choose candidate from certain reduced range. - if verify_candidates.len() > 50 { - let index: usize = OsRng.gen_range(0..(verify_candidates.len() / 2)); - self.send_event(NetworkEvent::ChunkProofVerification { - peer_id: target_peer, - key_to_verify: verify_candidates[index].clone(), - }); - } else { - debug!("No valid candidate to be checked against peer {peer:?}"); - } } } diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index 74ea3cbd46..cd0875fa5e 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -269,6 +269,7 @@ impl Network { } /// Get the Chunk existence proof from the close nodes to the provided chunk address. + /// This is to be used by client only to verify the success of the upload. pub async fn verify_chunk_existence( &self, chunk_address: NetworkAddress, @@ -304,6 +305,7 @@ impl Network { let request = Request::Query(Query::GetChunkExistenceProof { key: chunk_address.clone(), nonce, + difficulty: 1, }); let responses = self .send_and_get_responses(&close_nodes, &request, true) @@ -311,14 +313,22 @@ impl Network { let n_verified = responses .into_iter() .filter_map(|(peer, resp)| { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = + if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) = resp { - if expected_proof.verify(&proof) { - debug!("Got a valid ChunkProof from {peer:?}"); - Some(()) + if proofs.is_empty() { + warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty."); + None + } else if let Ok(ref proof) = proofs[0].1 { + if expected_proof.verify(proof) { + debug!("Got a valid ChunkProof from {peer:?}"); + Some(()) + } else { + warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered?"); + None + } } else { - warn!("Failed to verify the ChunkProof from {peer:?}. 
The chunk might have been tampered?"); + warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1); None } } else { diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 22ec7e9336..09103d923a 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -13,17 +13,20 @@ use super::{ use crate::metrics::NodeMetricsRecorder; use crate::RunningNode; use bytes::Bytes; +use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; -use rand::{rngs::StdRng, thread_rng, Rng, SeedableRng}; +use rand::{ + rngs::{OsRng, StdRng}, + thread_rng, Rng, SeedableRng, +}; use sn_evm::{AttoTokens, RewardsAddress}; #[cfg(feature = "open-metrics")] use sn_networking::MetricsRegistries; -use sn_networking::{ - Instant, Network, NetworkBuilder, NetworkError, NetworkEvent, NodeIssue, SwarmDriver, -}; +use sn_networking::{Instant, Network, NetworkBuilder, NetworkEvent, NodeIssue, SwarmDriver}; use sn_protocol::{ error::Error as ProtocolError, - messages::{ChunkProof, CmdResponse, Query, QueryResponse, Request, Response}, + messages::{ChunkProof, CmdResponse, Nonce, Query, QueryResponse, Request, Response}, + storage::RecordType, NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ @@ -43,12 +46,9 @@ use sn_evm::EvmNetwork; /// This is the max time it should take. Minimum interval at any node will be half this pub const PERIODIC_REPLICATION_INTERVAL_MAX_S: u64 = 180; -/// Max number of attempts that chunk proof verification will be carried out against certain target, -/// before classifying peer as a bad peer. -const MAX_CHUNK_PROOF_VERIFY_ATTEMPTS: usize = 3; - -/// Interval between chunk proof verification to be retired against the same target. -const CHUNK_PROOF_VERIFY_RETRY_INTERVAL: Duration = Duration::from_secs(15); +/// Interval to trigger storage challenge. +/// This is the max time it should take. 
Minimum interval at any node will be half this +const STORE_CHALLENGE_INTERVAL_MAX_S: u64 = 7200; /// Interval to update the nodes uptime metric const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); @@ -256,6 +256,17 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately + // use a random neighbour storege challenge ticker to ensure + // neighbour do not carryout challenges at the same time + let storage_challenge_interval: u64 = + rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S); + let storage_challenge_interval_time = Duration::from_secs(storage_challenge_interval); + debug!("Storage challenge interval set to {storage_challenge_interval_time:?}"); + + let mut storage_challenge_interval = + tokio::time::interval(storage_challenge_interval_time); + let _ = storage_challenge_interval.tick().await; // first tick completes immediately + loop { let peers_connected = &peers_connected; @@ -302,6 +313,17 @@ impl Node { Self::trigger_irrelevant_record_cleanup(network); }); } + // runs every storage_challenge_interval time + _ = storage_challenge_interval.tick() => { + let start = Instant::now(); + debug!("Periodic storage challenge triggered"); + let network = self.network().clone(); + + let _handle = spawn(async move { + Self::storage_challenge(network).await; + trace!("Periodic storege challenge took {:?}", start.elapsed()); + }); + } } } }); @@ -452,28 +474,16 @@ impl Node { event_header = "ChunkProofVerification"; let network = self.network().clone(); - debug!("Going to verify chunk {key_to_verify} against peer {peer_id:?}"); + debug!("Going to carry out storage existence check against peer {peer_id:?}"); let _handle = spawn(async move { - // To avoid the peer is in the process of getting the copy via replication, - // repeat the verification for couple of times (in case of error). - // Only report the node as bad when ALL the verification attempts failed. - let mut attempts = 0; - while attempts < MAX_CHUNK_PROOF_VERIFY_ATTEMPTS { - if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { - return; - } - // Replication interval is 22s - 45s. - // Hence some re-try erquired to allow copies to spread out. - tokio::time::sleep(CHUNK_PROOF_VERIFY_RETRY_INTERVAL).await; - attempts += 1; + if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { + return; } - // Now ALL attempts failed, hence report the issue. - // Note this won't immediately trigger the node to be considered as BAD. - // Only the same peer accumulated three same issue - // within 5 mins will be considered as BAD. - // As the chunk_proof_check will be triggered every periodical replication, - // a low performed or cheaty peer will raise multiple issue alerts during it. + info!("Peer {peer_id:?} failed storage existence challenge."); + // TODO: shall challenge failure immediately triggers the node to be removed? + // or to lower connection score once feature introduced. + // If score falls too low, sever connection. 
network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); }); } @@ -584,21 +594,18 @@ impl Node { QueryResponse::GetReplicatedRecord(result) } - Query::GetChunkExistenceProof { key, nonce } => { - debug!("Got GetChunkExistenceProof for chunk {key:?}"); - - let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); - if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { - let proof = ChunkProof::new(&record.value, nonce); - debug!("Chunk proof for {key:?} is {proof:?}"); - result = Ok(proof) - } else { - debug!( - "Could not get ChunkProof for {key:?} as we don't have the record locally." - ); - } + Query::GetChunkExistenceProof { + key, + nonce, + difficulty, + } => { + debug!( + "Got GetChunkExistenceProof targeting chunk {key:?} with {difficulty} answers." + ); - QueryResponse::GetChunkExistenceProof(result) + QueryResponse::GetChunkExistenceProof( + Self::respond_x_closest_chunk_proof(network, key, nonce, difficulty).await, + ) } Query::CheckNodeInProblem(target_address) => { debug!("Got CheckNodeInProblem for peer {target_address:?}"); @@ -620,61 +627,179 @@ impl Node { }; Response::Query(resp) } -} -async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let check_passed = if let Ok(Some(record)) = - network.get_local_record(&key.to_record_key()).await - { - let nonce = thread_rng().gen::(); - let expected_proof = ChunkProof::new(&record.value, nonce); - debug!("To verify peer {peer_id:?}, chunk_proof for {key:?} is {expected_proof:?}"); + async fn respond_x_closest_chunk_proof( + network: &Network, + key: NetworkAddress, + nonce: Nonce, + difficulty: usize, + ) -> Vec<(NetworkAddress, Result)> { + info!("Received StorageChallenge targeting {key:?} with difficulty level of {difficulty}."); + let mut results = vec![]; + if difficulty == 1 { + // Client checking existence of published chunk. + let mut result = Err(ProtocolError::ChunkDoesNotExist(key.clone())); + if let Ok(Some(record)) = network.get_local_record(&key.to_record_key()).await { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + result = Ok(proof) + } else { + debug!("Could not get ChunkProof for {key:?} as we don't have the record locally."); + } - let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), - nonce, - }); - let responses = network - .send_and_get_responses(&[peer_id], &request, true) - .await; - let n_verified = responses - .into_iter() - .filter_map(|(peer, resp)| received_valid_chunk_proof(key, &expected_proof, peer, resp)) - .count(); - - n_verified >= 1 - } else { - error!( - "To verify peer {peer_id:?} Could not get ChunkProof for {key:?} as we don't have the record locally." 
- ); - true - }; + results.push((key.clone(), result)); + } else { + let all_local_records = network.get_all_local_record_addresses().await; + + if let Ok(all_local_records) = all_local_records { + // Only `ChunkRecord`s can be consistantly verified + let mut all_chunk_addrs: Vec<_> = all_local_records + .iter() + .filter_map(|(addr, record_type)| { + if *record_type == RecordType::Chunk { + Some(addr.clone()) + } else { + None + } + }) + .collect(); - if !check_passed { - return false; - } + // Sort by distance and only take first X closest entries + all_chunk_addrs.sort_by_key(|addr| key.distance(addr)); - true -} + // TODO: this shall be deduced from resource usage dynamically + let workload_factor = std::cmp::min(difficulty, CLOSE_GROUP_SIZE); + + for addr in all_chunk_addrs.iter().take(workload_factor) { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await + { + let proof = ChunkProof::new(&record.value, nonce); + debug!("Chunk proof for {key:?} is {proof:?}"); + results.push((addr.clone(), Ok(proof))); + } + } + } + } -fn received_valid_chunk_proof( - key: &NetworkAddress, - expected_proof: &ChunkProof, - peer: PeerId, - resp: Result, -) -> Option<()> { - if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(Ok(proof)))) = resp { - if expected_proof.verify(&proof) { + info!( + "Respond with {} answers to the StorageChallenge targeting {key:?}.", + results.len() + ); + + results + } + + /// Check among all chunk type records that we have, + /// and randomly pick one as the verification candidate. + /// This will challenge all closest peers at once. + async fn storage_challenge(network: Network) { + let closest_peers: Vec = + if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await { + closest_peers + .into_iter() + .take(CLOSE_GROUP_SIZE) + .collect_vec() + } else { + error!("Cannot get local neighbours"); + return; + }; + if closest_peers.len() < CLOSE_GROUP_SIZE { debug!( - "Got a valid ChunkProof of {key:?} from {peer:?}, during peer chunk proof check." + "Not enough neighbours ({}/{}) to carry out storage challenge.", + closest_peers.len(), + CLOSE_GROUP_SIZE ); - Some(()) - } else { - warn!("When verify {peer:?} with ChunkProof of {key:?}, the chunk might have been tampered?"); - None + return; + } + + let verify_candidates: Vec = + if let Ok(all_keys) = network.get_all_local_record_addresses().await { + all_keys + .iter() + .filter_map(|(addr, record_type)| { + if RecordType::Chunk == *record_type { + Some(addr.clone()) + } else { + None + } + }) + .collect() + } else { + error!("Failed to get local record addresses."); + return; + }; + let num_of_targets = verify_candidates.len(); + if num_of_targets < 50 { + debug!("Not enough candidates({num_of_targets}/50) to be checked against neighbours."); + return; + } + + info!("Starting node StorageChallenge against neighbours!"); + + // TODO: launch the challenges parrallely, so that a scoring scheme can be utilized. + for peer_id in closest_peers { + if peer_id == network.peer_id() { + continue; + } + + let index: usize = OsRng.gen_range(0..num_of_targets); + if !chunk_proof_verify_peer(&network, peer_id, &verify_candidates[index]).await { + info!("Peer {peer_id:?} failed storage challenge."); + // TODO: shall the challenge failure immediately triggers the node to be removed? 
+ network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + } + } + + info!("Completed node StorageChallenge against neighbours!"); + } +} + +async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { + let nonce: Nonce = thread_rng().gen::(); + + let request = Request::Query(Query::GetChunkExistenceProof { + key: key.clone(), + nonce, + difficulty: CLOSE_GROUP_SIZE, + }); + + let responses = network + .send_and_get_responses(&[peer_id], &request, true) + .await; + + // TODO: cross check with local knowledge (i.e. the claimed closest shall match locals) + // this also prevent peer falsely give empty or non-existent answers. + + if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = + responses.get(&peer_id) + { + if answers.is_empty() { + info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); + return false; + } + for (addr, proof) in answers { + if let Ok(proof) = proof { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let expected_proof = ChunkProof::new(&record.value, nonce); + // Any wrong answer shall be considered as a failure + if *proof != expected_proof { + return false; + } + } else { + debug!( + "Could not get ChunkProof for {addr:?} as we don't have the record locally." + ); + } + } else { + debug!( + "Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}" + ); + } } } else { - debug!("Did not get a valid response for the ChunkProof from {peer:?}"); - None + info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error."); + return false; } + + true } diff --git a/sn_protocol/src/messages/query.rs b/sn_protocol/src/messages/query.rs index b28f6830fa..dc941e634f 100644 --- a/sn_protocol/src/messages/query.rs +++ b/sn_protocol/src/messages/query.rs @@ -47,6 +47,10 @@ pub enum Query { key: NetworkAddress, /// The random nonce that the node uses to produce the Proof (i.e., hash(record+nonce)) nonce: Nonce, + /// Defines the expected number of answers to the challenge. + /// For client publish verification, use 1 for efficiency. + /// Node shall try their best to fulfill the number, based on their capacity. 
+ difficulty: usize, }, /// Queries close_group peers whether the target peer is a bad_node CheckNodeInProblem(NetworkAddress), @@ -78,8 +82,15 @@ impl std::fmt::Display for Query { Query::GetRegisterRecord { key, requester } => { write!(f, "Query::GetRegisterRecord({requester:?} {key:?})") } - Query::GetChunkExistenceProof { key, nonce } => { - write!(f, "Query::GetChunkExistenceProof({key:?} {nonce:?})") + Query::GetChunkExistenceProof { + key, + nonce, + difficulty, + } => { + write!( + f, + "Query::GetChunkExistenceProof({key:?} {nonce:?} {difficulty})" + ) } Query::CheckNodeInProblem(address) => { write!(f, "Query::CheckNodeInProblem({address:?})") diff --git a/sn_protocol/src/messages/response.rs b/sn_protocol/src/messages/response.rs index 17c986f581..44e9932c23 100644 --- a/sn_protocol/src/messages/response.rs +++ b/sn_protocol/src/messages/response.rs @@ -56,7 +56,7 @@ pub enum QueryResponse { /// Response to [`GetChunkExistenceProof`] /// /// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof - GetChunkExistenceProof(Result), + GetChunkExistenceProof(Vec<(NetworkAddress, Result)>), } // Debug implementation for QueryResponse, to avoid printing Vec @@ -109,8 +109,9 @@ impl Debug for QueryResponse { write!(f, "GetRegisterRecord(Err({err:?}))") } }, - QueryResponse::GetChunkExistenceProof(proof) => { - write!(f, "GetChunkExistenceProof(proof: {proof:?})") + QueryResponse::GetChunkExistenceProof(proofs) => { + let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect(); + write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})") } } } From 107918cc0c90d9b09e6222dc2073dc7940f8a08e Mon Sep 17 00:00:00 2001 From: qima Date: Fri, 22 Nov 2024 00:18:44 +0800 Subject: [PATCH 2/4] feat: implement the initial scoring system --- Cargo.lock | 1 + sn_networking/src/event/mod.rs | 14 --- sn_node/Cargo.toml | 1 + sn_node/src/node.rs | 207 +++++++++++++++++++++++---------- 4 files changed, 146 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ff28dc1c5..e5b4b21ba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8733,6 +8733,7 @@ dependencies = [ "hex 0.4.3", "itertools 0.12.1", "libp2p 0.54.1", + "num-traits", "prometheus-client", "prost 0.9.0", "pyo3", diff --git a/sn_networking/src/event/mod.rs b/sn_networking/src/event/mod.rs index e1d8074d29..67f7c41c0d 100644 --- a/sn_networking/src/event/mod.rs +++ b/sn_networking/src/event/mod.rs @@ -143,11 +143,6 @@ pub enum NetworkEvent { FailedToFetchHolders(BTreeSet), /// Quotes to be verified QuoteVerification { quotes: Vec<(PeerId, PaymentQuote)> }, - /// Carry out chunk proof check against the specified record and peer - ChunkProofVerification { - peer_id: PeerId, - key_to_verify: NetworkAddress, - }, } /// Terminate node for the following reason @@ -206,15 +201,6 @@ impl Debug for NetworkEvent { quotes.len() ) } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify: keys_to_verify, - } => { - write!( - f, - "NetworkEvent::ChunkProofVerification({peer_id:?} {keys_to_verify:?})" - ) - } } } } diff --git a/sn_node/Cargo.toml b/sn_node/Cargo.toml index 2d98a27ef8..980dc84d76 100644 --- a/sn_node/Cargo.toml +++ b/sn_node/Cargo.toml @@ -44,6 +44,7 @@ futures = "~0.3.13" hex = "~0.4.3" itertools = "~0.12.1" libp2p = { version = "0.54.1", features = ["tokio", "dns", "kad", "macros"] } +num-traits = "0.2" prometheus-client = { version = "0.22", optional = true } # watch out updating this, protoc compiler needs to be installed on all build systems # arm builds + musl are 
very problematic diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 09103d923a..29bb5ed0f5 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -15,6 +15,7 @@ use crate::RunningNode; use bytes::Bytes; use itertools::Itertools; use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use num_traits::cast::ToPrimitive; use rand::{ rngs::{OsRng, StdRng}, thread_rng, Rng, SeedableRng, @@ -30,6 +31,7 @@ use sn_protocol::{ NetworkAddress, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, }; use std::{ + collections::HashMap, net::SocketAddr, path::PathBuf, sync::{ @@ -38,7 +40,7 @@ use std::{ }, time::Duration, }; -use tokio::{sync::mpsc::Receiver, task::spawn}; +use tokio::{sync::mpsc::Receiver, task::{spawn, JoinSet}}; use sn_evm::EvmNetwork; @@ -56,6 +58,16 @@ const UPTIME_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); /// Interval to clean up unrelevant records const UNRELEVANT_RECORDS_CLEANUP_INTERVAL: Duration = Duration::from_secs(3600); +/// Highest score to achieve from each metric sub-sector during StorageChallenge. +const HIGHEST_SCORE: usize = 100; + +/// Any nodes bearing a score below this shall be considered as bad. +/// Max is to be 100 * 100 +const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 2000; + +/// in ms, expecting average StorageChallenge complete time to be around 500ms. +const TIME_STEP: usize = 100; + /// Helper to build and run a Node pub struct NodeBuilder { identity_keypair: Keypair, @@ -467,26 +479,6 @@ impl Node { quotes_verification(&network, quotes).await; }); } - NetworkEvent::ChunkProofVerification { - peer_id, - key_to_verify, - } => { - event_header = "ChunkProofVerification"; - let network = self.network().clone(); - - debug!("Going to carry out storage existence check against peer {peer_id:?}"); - - let _handle = spawn(async move { - if chunk_proof_verify_peer(&network, peer_id, &key_to_verify).await { - return; - } - info!("Peer {peer_id:?} failed storage existence challenge."); - // TODO: shall challenge failure immediately triggers the node to be removed? - // or to lower connection score once feature introduced. - // If score falls too low, sever connection. - network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); - }); - } } trace!( @@ -634,7 +626,7 @@ impl Node { nonce: Nonce, difficulty: usize, ) -> Vec<(NetworkAddress, Result)> { - info!("Received StorageChallenge targeting {key:?} with difficulty level of {difficulty}."); + let start = Instant::now(); let mut results = vec![]; if difficulty == 1 { // Client checking existence of published chunk. @@ -682,8 +674,8 @@ impl Node { } info!( - "Respond with {} answers to the StorageChallenge targeting {key:?}.", - results.len() + "Respond with {} answers to the StorageChallenge targeting {key:?} with {difficulty} difficulty, in {:?}", + results.len(), start.elapsed() ); results @@ -693,6 +685,7 @@ impl Node { /// and randomly pick one as the verification candidate. /// This will challenge all closest peers at once. 
async fn storage_challenge(network: Network) { + let start = Instant::now(); let closest_peers: Vec = if let Ok(closest_peers) = network.get_closest_k_value_local_peers().await { closest_peers @@ -712,7 +705,7 @@ impl Node { return; } - let verify_candidates: Vec = + let mut verify_candidates: Vec = if let Ok(all_keys) = network.get_all_local_record_addresses().await { all_keys .iter() @@ -734,72 +727,160 @@ impl Node { return; } - info!("Starting node StorageChallenge against neighbours!"); + let index: usize = OsRng.gen_range(0..num_of_targets); + let target = verify_candidates[index].clone(); + // TODO: workload shall be dynamically deduced from resource usage + let difficulty = CLOSE_GROUP_SIZE; + verify_candidates.sort_by_key(|addr| target.distance(addr)); + let expected_targets = verify_candidates.into_iter().take(difficulty); + let nonce: Nonce = thread_rng().gen::(); + let mut expected_proofs = HashMap::new(); + for addr in expected_targets { + if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { + let expected_proof = ChunkProof::new(&record.value, nonce); + let _ = expected_proofs.insert(addr, expected_proof); + } else { + error!("Local record {addr:?} cann't be loaded from disk."); + } + } + let request = Request::Query(Query::GetChunkExistenceProof { + key: target.clone(), + nonce, + difficulty, + }); - // TODO: launch the challenges parrallely, so that a scoring scheme can be utilized. + let mut tasks = JoinSet::new(); for peer_id in closest_peers { if peer_id == network.peer_id() { continue; } + let network_clone = network.clone(); + let request_clone = request.clone(); + let expected_proofs_clone = expected_proofs.clone(); + let _ = tasks.spawn(async move { + let res = + scoring_peer(network_clone, peer_id, request_clone, expected_proofs_clone) + .await; + (peer_id, res) + }); + } - let index: usize = OsRng.gen_range(0..num_of_targets); - if !chunk_proof_verify_peer(&network, peer_id, &verify_candidates[index]).await { - info!("Peer {peer_id:?} failed storage challenge."); - // TODO: shall the challenge failure immediately triggers the node to be removed? - network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + while let Some(res) = tasks.join_next().await { + match res { + Ok((peer_id, score)) => { + if score < MIN_ACCEPTABLE_HEALTHY_SCORE { + info!("Peer {peer_id:?} failed storage challenge with low score {score}/{MIN_ACCEPTABLE_HEALTHY_SCORE}."); + // TODO: shall the challenge failure immediately triggers the node to be removed? + network.record_node_issues(peer_id, NodeIssue::FailedChunkProofCheck); + } + } + Err(e) => { + info!("StorageChallenge task completed with error {e:?}"); + } } } - info!("Completed node StorageChallenge against neighbours!"); + info!( + "Completed node StorageChallenge against neighbours in {:?}!", + start.elapsed() + ); } } -async fn chunk_proof_verify_peer(network: &Network, peer_id: PeerId, key: &NetworkAddress) -> bool { - let nonce: Nonce = thread_rng().gen::(); - - let request = Request::Query(Query::GetChunkExistenceProof { - key: key.clone(), - nonce, - difficulty: CLOSE_GROUP_SIZE, - }); - +async fn scoring_peer( + network: Network, + peer_id: PeerId, + request: Request, + expected_proofs: HashMap, +) -> usize { + let start = Instant::now(); let responses = network .send_and_get_responses(&[peer_id], &request, true) .await; - // TODO: cross check with local knowledge (i.e. the claimed closest shall match locals) - // this also prevent peer falsely give empty or non-existent answers. 
- if let Some(Ok(Response::Query(QueryResponse::GetChunkExistenceProof(answers)))) = responses.get(&peer_id) { if answers.is_empty() { info!("Peer {peer_id:?} didn't answer the ChunkProofChallenge."); - return false; + return 0; } + let elapsed = start.elapsed(); + + let mut received_proofs = vec![]; for (addr, proof) in answers { if let Ok(proof) = proof { - if let Ok(Some(record)) = network.get_local_record(&addr.to_record_key()).await { - let expected_proof = ChunkProof::new(&record.value, nonce); - // Any wrong answer shall be considered as a failure - if *proof != expected_proof { - return false; - } - } else { - debug!( - "Could not get ChunkProof for {addr:?} as we don't have the record locally." - ); - } - } else { - debug!( - "Could not verify answer of {addr:?} from {peer_id:?} as responded with {proof:?}" - ); + received_proofs.push((addr.clone(), proof.clone())); } } + + let score = mark_peer(elapsed, received_proofs, &expected_proofs); + info!( + "Received {} answers from peer {peer_id:?} after {elapsed:?}, score it as {score}.", + answers.len() + ); + score } else { info!("Peer {peer_id:?} doesn't reply the ChunkProofChallenge, or replied with error."); - return false; + 0 } +} + +// Based on following metrics: +// * the duration +// * is there false answer +// * percentage of correct answers among the expected closest +// The higher the score, the better confidence on the peer +fn mark_peer( + duration: Duration, + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap, +) -> usize { + let duration_score = duration_score_scheme(duration); + let challenge_score = challenge_score_scheme(answers, expected_proofs); + + duration_score * challenge_score +} + +// Less duration shall get higher score +fn duration_score_scheme(duration: Duration) -> usize { + // So far just a simple stepped scheme, capped by HIGHEST_SCORE + let in_ms = if let Some(value) = duration.as_millis().to_usize() { + value + } else { + info!("Cannot get milli seconds from {duration:?}, using a default value of 1000ms."); + 1000 + }; - true + let step = std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP); + HIGHEST_SCORE - step +} + +// Any false answer shall result in 0 score immediately +fn challenge_score_scheme( + answers: Vec<(NetworkAddress, ChunkProof)>, + expected_proofs: &HashMap, +) -> usize { + let mut correct_answers = 0; + for (addr, chunk_proof) in answers { + if let Some(expected_proof) = expected_proofs.get(&addr) { + if expected_proof.verify(&chunk_proof) { + correct_answers += 1; + } else { + info!("Spot a false answer to the challenge regarding {addr:?}"); + // Any false answer shall result in 0 score immediately + return 0; + } + } + } + // TODO: For those answers not among the expected_proofs, + // it could be due to having different knowledge of records to us. 
+ // shall we: + // * set the target being close to us, so that neighbours sharing same knowledge in higher chance + // * fetch from local to testify + // * fetch from network to testify + std::cmp::min( + HIGHEST_SCORE, + HIGHEST_SCORE * correct_answers / expected_proofs.len(), + ) } From 57eb2c321f4d043c9f2044a24a235140b1184a5e Mon Sep 17 00:00:00 2001 From: qima Date: Fri, 22 Nov 2024 19:42:33 +0800 Subject: [PATCH 3/4] chore: storage verification factors tweaking --- sn_networking/src/driver.rs | 2 +- sn_node/src/node.rs | 54 +++++++++++++++++++++++-------------- 2 files changed, 35 insertions(+), 21 deletions(-) diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index 43a5525ccf..2afa0b0701 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -768,7 +768,7 @@ fn check_and_wipe_storage_dir_if_necessary( // * the storage_dir shall be wiped out // * the version file shall be updated if cur_version_str != prev_version_str { - warn!("Trying to wipe out storege dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}"); + warn!("Trying to wipe out storage dir {storage_dir_path:?}, as cur_version {cur_version_str:?} doesn't match prev_version {prev_version_str:?}"); let _ = fs::remove_dir_all(storage_dir_path); let mut file = fs::OpenOptions::new() diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 29bb5ed0f5..4fb6294727 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -40,7 +40,10 @@ use std::{ }, time::Duration, }; -use tokio::{sync::mpsc::Receiver, task::{spawn, JoinSet}}; +use tokio::{ + sync::mpsc::Receiver, + task::{spawn, JoinSet}, +}; use sn_evm::EvmNetwork; @@ -63,10 +66,10 @@ const HIGHEST_SCORE: usize = 100; /// Any nodes bearing a score below this shall be considered as bad. /// Max is to be 100 * 100 -const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 2000; +const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 5000; -/// in ms, expecting average StorageChallenge complete time to be around 500ms. -const TIME_STEP: usize = 100; +/// in ms, expecting average StorageChallenge complete time to be around 250ms. +const TIME_STEP: usize = 20; /// Helper to build and run a Node pub struct NodeBuilder { @@ -268,7 +271,7 @@ impl Node { tokio::time::interval(UNRELEVANT_RECORDS_CLEANUP_INTERVAL); let _ = irrelevant_records_cleanup_interval.tick().await; // first tick completes immediately - // use a random neighbour storege challenge ticker to ensure + // use a random neighbour storage challenge ticker to ensure // neighbour do not carryout challenges at the same time let storage_challenge_interval: u64 = rng.gen_range(STORE_CHALLENGE_INTERVAL_MAX_S / 2..STORE_CHALLENGE_INTERVAL_MAX_S); @@ -333,7 +336,7 @@ impl Node { let _handle = spawn(async move { Self::storage_challenge(network).await; - trace!("Periodic storege challenge took {:?}", start.elapsed()); + trace!("Periodic storage challenge took {:?}", start.elapsed()); }); } } @@ -596,7 +599,8 @@ impl Node { ); QueryResponse::GetChunkExistenceProof( - Self::respond_x_closest_chunk_proof(network, key, nonce, difficulty).await, + Self::respond_x_closest_record_proof(network, key, nonce, difficulty, true) + .await, ) } Query::CheckNodeInProblem(target_address) => { @@ -620,11 +624,14 @@ impl Node { Response::Query(resp) } - async fn respond_x_closest_chunk_proof( + // Nodes only check ChunkProof each other, to avoid `multi-version` issue + // Client check proof against all records, as have to fetch from network anyway. 
+ async fn respond_x_closest_record_proof( network: &Network, key: NetworkAddress, nonce: Nonce, difficulty: usize, + chunk_only: bool, ) -> Vec<(NetworkAddress, Result)> { let start = Instant::now(); let mut results = vec![]; @@ -644,17 +651,20 @@ impl Node { let all_local_records = network.get_all_local_record_addresses().await; if let Ok(all_local_records) = all_local_records { - // Only `ChunkRecord`s can be consistantly verified - let mut all_chunk_addrs: Vec<_> = all_local_records - .iter() - .filter_map(|(addr, record_type)| { - if *record_type == RecordType::Chunk { - Some(addr.clone()) - } else { - None - } - }) - .collect(); + let mut all_chunk_addrs: Vec<_> = if chunk_only { + all_local_records + .iter() + .filter_map(|(addr, record_type)| { + if *record_type == RecordType::Chunk { + Some(addr.clone()) + } else { + None + } + }) + .collect() + } else { + all_local_records.keys().cloned().collect() + }; // Sort by distance and only take first X closest entries all_chunk_addrs.sort_by_key(|addr| key.distance(addr)); @@ -727,7 +737,11 @@ impl Node { return; } - let index: usize = OsRng.gen_range(0..num_of_targets); + // To ensure the neighbours sharing same knowledge as to us, + // The target is choosen to be not far from us. + let self_addr = NetworkAddress::from_peer(network.peer_id()); + verify_candidates.sort_by_key(|addr| self_addr.distance(addr)); + let index: usize = OsRng.gen_range(0..num_of_targets / 2); let target = verify_candidates[index].clone(); // TODO: workload shall be dynamically deduced from resource usage let difficulty = CLOSE_GROUP_SIZE; From 8889ef37465fbff505f29db706c178abbab2a567 Mon Sep 17 00:00:00 2001 From: qima Date: Mon, 25 Nov 2024 23:27:28 +0800 Subject: [PATCH 4/4] feat: allowing client carryout storage check when GetStoreCost --- sn_networking/src/lib.rs | 15 ++++++++++++++- sn_node/src/node.rs | 28 ++++++++++++++++++++++++---- sn_protocol/src/messages/query.rs | 26 +++++++++++++++++++++----- sn_protocol/src/messages/response.rs | 6 +++++- 4 files changed, 64 insertions(+), 11 deletions(-) diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index cd0875fa5e..cd5c513fad 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -380,7 +380,12 @@ impl Network { return Err(NetworkError::NoStoreCostResponses); } - let request = Request::Query(Query::GetStoreCost(record_address.clone())); + // Client shall decide whether to carry out storage verification or not. + let request = Request::Query(Query::GetStoreCost { + key: record_address.clone(), + nonce: None, + difficulty: 0, + }); let responses = self .send_and_get_responses(&close_nodes, &request, true) .await; @@ -398,7 +403,11 @@ impl Network { quote: Ok(quote), payment_address, peer_address, + storage_proofs, }) => { + if !storage_proofs.is_empty() { + debug!("Storage proofing during GetStoreCost to be implemented."); + } // Check the quote itself is valid. 
if quote.cost != AttoTokens::from_u64(calculate_cost_for_records( @@ -416,7 +425,11 @@ impl Network { quote: Err(ProtocolError::RecordExists(_)), payment_address, peer_address, + storage_proofs, }) => { + if !storage_proofs.is_empty() { + debug!("Storage proofing during GetStoreCost to be implemented."); + } all_costs.push((peer_address, payment_address, PaymentQuote::zero())); } _ => { diff --git a/sn_node/src/node.rs b/sn_node/src/node.rs index 4fb6294727..bd4e31c36b 100644 --- a/sn_node/src/node.rs +++ b/sn_node/src/node.rs @@ -514,13 +514,30 @@ impl Node { payment_address: RewardsAddress, ) -> Response { let resp: QueryResponse = match query { - Query::GetStoreCost(address) => { - debug!("Got GetStoreCost request for {address:?}"); - let record_key = address.to_record_key(); + Query::GetStoreCost { + key, + nonce, + difficulty, + } => { + debug!("Got GetStoreCost request for {key:?} with difficulty {difficulty}"); + let record_key = key.to_record_key(); let self_id = network.peer_id(); let store_cost = network.get_local_storecost(record_key.clone()).await; + let storage_proofs = if let Some(nonce) = nonce { + Self::respond_x_closest_record_proof( + network, + key.clone(), + nonce, + difficulty, + false, + ) + .await + } else { + vec![] + }; + match store_cost { Ok((cost, quoting_metrics, bad_nodes)) => { if cost == AttoTokens::zero() { @@ -530,19 +547,21 @@ impl Node { )), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, } } else { QueryResponse::GetStoreCost { quote: Self::create_quote_for_storecost( network, cost, - &address, + &key, "ing_metrics, bad_nodes, &payment_address, ), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, } } } @@ -550,6 +569,7 @@ impl Node { quote: Err(ProtocolError::GetStoreCostFailed), payment_address, peer_address: NetworkAddress::from_peer(self_id), + storage_proofs, }, } } diff --git a/sn_protocol/src/messages/query.rs b/sn_protocol/src/messages/query.rs index dc941e634f..c7e4a56639 100644 --- a/sn_protocol/src/messages/query.rs +++ b/sn_protocol/src/messages/query.rs @@ -18,7 +18,18 @@ use serde::{Deserialize, Serialize}; #[derive(Eq, PartialEq, PartialOrd, Clone, Serialize, Deserialize, Debug)] pub enum Query { /// Retrieve the cost of storing a record at the given address. - GetStoreCost(NetworkAddress), + /// The storage verification is optional to be undertaken + GetStoreCost { + /// The Address of the record to be stored. + key: NetworkAddress, + /// The random nonce that nodes use to produce the Proof (i.e., hash(record+nonce)) + /// Set to None if no need to carry out storage check. + nonce: Option, + /// Defines the expected number of answers to the challenge. + /// Node shall try their best to fulfill the number, based on their capacity. + /// Set to 0 to indicate not carry out any verification. + difficulty: usize, + }, /// Retrieve a specific record from a specific peer. /// /// This should eventually lead to a [`GetReplicatedRecord`] response. @@ -60,10 +71,11 @@ impl Query { /// Used to send a query to the close group of the address. pub fn dst(&self) -> NetworkAddress { match self { - Query::GetStoreCost(address) | Query::CheckNodeInProblem(address) => address.clone(), + Query::CheckNodeInProblem(address) => address.clone(), // Shall not be called for this, as this is a `one-to-one` message, // and the destination shall be decided by the requester already. - Query::GetReplicatedRecord { key, .. } + Query::GetStoreCost { key, .. } + | Query::GetReplicatedRecord { key, .. 
} | Query::GetRegisterRecord { key, .. } | Query::GetChunkExistenceProof { key, .. } => key.clone(), } @@ -73,8 +85,12 @@ impl Query { impl std::fmt::Display for Query { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - Query::GetStoreCost(address) => { - write!(f, "Query::GetStoreCost({address:?})") + Query::GetStoreCost { + key, + nonce, + difficulty, + } => { + write!(f, "Query::GetStoreCost({key:?} {nonce:?} {difficulty})") } Query::GetReplicatedRecord { key, requester } => { write!(f, "Query::GetReplicatedRecord({requester:?} {key:?})") diff --git a/sn_protocol/src/messages/response.rs b/sn_protocol/src/messages/response.rs index 44e9932c23..f29aecc76f 100644 --- a/sn_protocol/src/messages/response.rs +++ b/sn_protocol/src/messages/response.rs @@ -30,6 +30,8 @@ pub enum QueryResponse { payment_address: RewardsAddress, /// Node's Peer Address peer_address: NetworkAddress, + /// Storage proofs based on requested target address and difficulty + storage_proofs: Vec<(NetworkAddress, Result)>, }, CheckNodeInProblem { /// Address of the peer that queried @@ -67,10 +69,12 @@ impl Debug for QueryResponse { quote, payment_address, peer_address, + storage_proofs, } => { write!( f, - "GetStoreCost(quote: {quote:?}, from {peer_address:?} w/ payment_address: {payment_address:?})" + "GetStoreCost(quote: {quote:?}, from {peer_address:?} w/ payment_address: {payment_address:?}, and {} storage proofs)", + storage_proofs.len() ) } QueryResponse::CheckNodeInProblem {
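
The scoring introduced in PATCH 2/4 and retuned in PATCH 3/4 is easiest to sanity-check with concrete numbers. The standalone Rust sketch below is illustrative only and not part of the patch series: it mirrors the arithmetic of mark_peer, duration_score_scheme and challenge_score_scheme using the constants as they stand after PATCH 3/4, and substitutes plain correct/wrong counts for the ChunkProof map that the real code verifies.

use std::time::Duration;

// Constants as defined in sn_node/src/node.rs after PATCH 3/4.
const HIGHEST_SCORE: usize = 100;
const MIN_ACCEPTABLE_HEALTHY_SCORE: usize = 5000;
const TIME_STEP: usize = 20; // ms per step of the duration penalty

// Duration sub-score: every TIME_STEP ms removes one point, bottoming out at 0,
// so any reply slower than 2 s contributes nothing. (The patch converts via
// num_traits::ToPrimitive with a 1000 ms fallback; a plain cast suffices here.)
fn duration_score(duration: Duration) -> usize {
    let in_ms = duration.as_millis() as usize;
    HIGHEST_SCORE - std::cmp::min(HIGHEST_SCORE, in_ms / TIME_STEP)
}

// Challenge sub-score, simplified to counts: one provably false proof zeroes it,
// otherwise it is the percentage of expected proofs answered correctly.
fn challenge_score(correct: usize, wrong: usize, expected: usize) -> usize {
    if wrong > 0 {
        return 0;
    }
    std::cmp::min(HIGHEST_SCORE, HIGHEST_SCORE * correct / expected)
}

fn main() {
    // 250 ms with 4 of 5 expected proofs correct: (100 - 12) * 80 = 7040 -> healthy.
    let good = duration_score(Duration::from_millis(250)) * challenge_score(4, 0, 5);
    assert_eq!(good, 7040);
    assert!(good >= MIN_ACCEPTABLE_HEALTHY_SCORE);

    // Fully correct but 1.2 s late: (100 - 60) * 100 = 4000 -> recorded as a node issue.
    let slow = duration_score(Duration::from_millis(1200)) * challenge_score(5, 0, 5);
    assert!(slow < MIN_ACCEPTABLE_HEALTHY_SCORE);

    // A single tampered proof zeroes the score regardless of speed.
    assert_eq!(
        duration_score(Duration::from_millis(100)) * challenge_score(4, 1, 5),
        0
    );
    println!("good = {good}, slow = {slow}");
}

Because the two sub-scores are multiplied, the ceiling is 10,000; with MIN_ACCEPTABLE_HEALTHY_SCORE = 5000, a peer that answers every expected proof correctly must still reply within roughly one second, while an instantaneous reply still needs at least half of the expected proofs to pass.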