diff --git a/ant-networking/src/cmd.rs b/ant-networking/src/cmd.rs index 8b84dccb84..de66fcdf56 100644 --- a/ant-networking/src/cmd.rs +++ b/ant-networking/src/cmd.rs @@ -58,8 +58,12 @@ pub enum NodeIssue { /// Commands to send to the Swarm pub enum LocalSwarmCmd { - /// Get a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that - /// bucket. + /// Get a list of all peers in local RT, with correspondent Multiaddr info attached as well. + GetPeersWithMultiaddr { + sender: oneshot::Sender)>>, + }, + /// Get a map where each key is the ilog2 distance of that Kbucket + /// and each value is a vector of peers in that bucket. GetKBuckets { sender: oneshot::Sender>>, }, @@ -253,6 +257,9 @@ impl Debug for LocalSwarmCmd { LocalSwarmCmd::GetAllLocalRecordAddresses { .. } => { write!(f, "LocalSwarmCmd::GetAllLocalRecordAddresses") } + LocalSwarmCmd::GetPeersWithMultiaddr { .. } => { + write!(f, "LocalSwarmCmd::GetPeersWithMultiaddr") + } LocalSwarmCmd::GetKBuckets { .. } => { write!(f, "LocalSwarmCmd::GetKBuckets") } @@ -795,6 +802,23 @@ impl SwarmDriver { } let _ = sender.send(ilog2_kbuckets); } + LocalSwarmCmd::GetPeersWithMultiaddr { sender } => { + cmd_string = "GetPeersWithMultiAddr"; + let mut result: Vec<(PeerId, Vec)> = vec![]; + for kbucket in self.swarm.behaviour_mut().kademlia.kbuckets() { + let peers_in_kbucket = kbucket + .iter() + .map(|peer_entry| { + ( + peer_entry.node.key.into_preimage(), + peer_entry.node.value.clone().into_vec(), + ) + }) + .collect::)>>(); + result.extend(peers_in_kbucket); + } + let _ = sender.send(result); + } LocalSwarmCmd::GetCloseGroupLocalPeers { key, sender } => { cmd_string = "GetCloseGroupLocalPeers"; let key = key.as_kbucket_key(); diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index 89f3c5428e..c7dc9928f8 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -242,8 +242,18 @@ impl Network { .await } - /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that - /// bucket. + /// Returns a list of peers in local RT and their correspondent Multiaddr. + /// Does not include self + pub async fn get_local_peers_with_multiaddr(&self) -> Result)>> { + let (sender, receiver) = oneshot::channel(); + self.send_local_swarm_cmd(LocalSwarmCmd::GetPeersWithMultiaddr { sender }); + receiver + .await + .map_err(|_e| NetworkError::InternalMsgChannelDropped) + } + + /// Returns a map where each key is the ilog2 distance of that Kbucket + /// and each value is a vector of peers in that bucket. /// Does not include self pub async fn get_kbuckets(&self) -> Result>> { let (sender, receiver) = oneshot::channel(); diff --git a/ant-node/src/node.rs b/ant-node/src/node.rs index 2f0d47fb0c..c1ea235239 100644 --- a/ant-node/src/node.rs +++ b/ant-node/src/node.rs @@ -24,7 +24,11 @@ use ant_protocol::{ }; use bytes::Bytes; use itertools::Itertools; -use libp2p::{identity::Keypair, Multiaddr, PeerId}; +use libp2p::{ + identity::Keypair, + kad::{KBucketDistance as Distance, U256}, + Multiaddr, PeerId, +}; use num_traits::cast::ToPrimitive; use rand::{ rngs::{OsRng, StdRng}, @@ -674,10 +678,91 @@ impl Node { is_in_trouble, } } + Query::GetClosestPeers { + key, + num_of_peers, + range, + sign_result, + } => { + debug!( + "Got GetClosestPeers targeting {key:?} with {num_of_peers:?} peers or {range:?} range, signature {sign_result} required." + ); + Self::respond_get_closest_peers(network, key, num_of_peers, range, sign_result) + .await + } }; Response::Query(resp) } + async fn respond_get_closest_peers( + network: &Network, + target: NetworkAddress, + num_of_peers: Option, + range: Option<[u8; 32]>, + sign_result: bool, + ) -> QueryResponse { + let local_peers = network.get_local_peers_with_multiaddr().await; + let peers: Vec<(NetworkAddress, Vec)> = if let Ok(local_peers) = local_peers { + Self::calculate_get_closest_peers(local_peers, target.clone(), num_of_peers, range) + } else { + vec![] + }; + + let signature = if sign_result { + let mut bytes = rmp_serde::to_vec(&target).unwrap_or_default(); + bytes.extend_from_slice(&rmp_serde::to_vec(&peers).unwrap_or_default()); + if let Ok(sig) = network.sign(&bytes) { + Some(sig) + } else { + None + } + } else { + None + }; + + QueryResponse::GetClosestPeers { + target, + peers, + signature, + } + } + + fn calculate_get_closest_peers( + peer_addrs: Vec<(PeerId, Vec)>, + target: NetworkAddress, + num_of_peers: Option, + range: Option<[u8; 32]>, + ) -> Vec<(NetworkAddress, Vec)> { + match (num_of_peers, range) { + (_, Some(value)) => { + let distance = Distance(U256::from(value)); + peer_addrs + .iter() + .filter_map(|(peer_id, multi_addrs)| { + let addr = NetworkAddress::from_peer(*peer_id); + if target.distance(&addr) <= distance { + Some((addr, multi_addrs.clone())) + } else { + None + } + }) + .collect() + } + (Some(num_of_peers), _) => { + let mut result: Vec<(NetworkAddress, Vec)> = peer_addrs + .iter() + .map(|(peer_id, multi_addrs)| { + let addr = NetworkAddress::from_peer(*peer_id); + (addr, multi_addrs.clone()) + }) + .collect(); + result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr)); + result.into_iter().take(num_of_peers).collect() + } + (None, None) => vec![], + } + } + // Nodes only check ChunkProof each other, to avoid `multi-version` issue // Client check proof against all records, as have to fetch from network anyway. async fn respond_x_closest_record_proof( @@ -971,3 +1056,104 @@ fn challenge_score_scheme( HIGHEST_SCORE * correct_answers / expected_proofs.len(), ) } + +#[cfg(test)] +mod tests { + use super::*; + use std::str::FromStr; + + #[test] + fn test_no_local_peers() { + let local_peers: Vec<(PeerId, Vec)> = vec![]; + let target = NetworkAddress::from_peer(PeerId::random()); + let num_of_peers = Some(5); + let range = None; + let result = Node::calculate_get_closest_peers(local_peers, target, num_of_peers, range); + + assert_eq!(result, vec![]); + } + + #[test] + fn test_fewer_local_peers_than_num_of_peers() { + let local_peers: Vec<(PeerId, Vec)> = vec![ + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()], + ), + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()], + ), + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()], + ), + ]; + let target = NetworkAddress::from_peer(PeerId::random()); + let num_of_peers = Some(2); + let range = None; + let result = Node::calculate_get_closest_peers( + local_peers.clone(), + target.clone(), + num_of_peers, + range, + ); + + // Result shall be sorted and truncated + let mut expected_result: Vec<(NetworkAddress, Vec)> = local_peers + .iter() + .map(|(peer_id, multi_addrs)| { + let addr = NetworkAddress::from_peer(*peer_id); + (addr, multi_addrs.clone()) + }) + .collect(); + expected_result.sort_by_key(|(addr, _multi_addrs)| target.distance(addr)); + let expected_result: Vec<_> = expected_result.into_iter().take(2).collect(); + + assert_eq!(expected_result, result); + } + + #[test] + fn test_with_range_and_num_of_peers() { + let local_peers: Vec<(PeerId, Vec)> = vec![ + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.1/tcp/8080").unwrap()], + ), + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()], + ), + ( + PeerId::random(), + vec![Multiaddr::from_str("/ip4/192.168.1.2/tcp/8080").unwrap()], + ), + ]; + let target = NetworkAddress::from_peer(PeerId::random()); + let num_of_peers = Some(0); + let range_value = [128; 32]; + let range = Some(range_value); + let result = Node::calculate_get_closest_peers( + local_peers.clone(), + target.clone(), + num_of_peers, + range, + ); + + // Range shall be preferred, i.e. the result peers shall all within the range + let distance = Distance(U256::from(range_value)); + let expected_result: Vec<(NetworkAddress, Vec)> = local_peers + .into_iter() + .filter_map(|(peer_id, multi_addrs)| { + let addr = NetworkAddress::from_peer(peer_id); + if target.distance(&addr) <= distance { + Some((addr, multi_addrs.clone())) + } else { + None + } + }) + .collect(); + + assert_eq!(expected_result, result); + } +} diff --git a/ant-protocol/src/messages/query.rs b/ant-protocol/src/messages/query.rs index c7e4a56639..60392d7651 100644 --- a/ant-protocol/src/messages/query.rs +++ b/ant-protocol/src/messages/query.rs @@ -7,6 +7,7 @@ // permissions and limitations relating to use of the SAFE Network Software. use crate::{messages::Nonce, NetworkAddress}; +use libp2p::kad::{KBucketDistance as Distance, U256}; use serde::{Deserialize, Serialize}; /// Data queries - retrieving data and inspecting their structure. @@ -65,6 +66,18 @@ pub enum Query { }, /// Queries close_group peers whether the target peer is a bad_node CheckNodeInProblem(NetworkAddress), + /// Query the the peers in range to the target address, from the receiver's perspective. + /// In case none of the parameters provided, returns nothing. + /// In case both of the parameters provided, `range` is preferred to be replied. + GetClosestPeers { + key: NetworkAddress, + // Shall be greater than K_VALUE, otherwise can use libp2p function directly + num_of_peers: Option, + // Defines the range that replied peers shall be within + range: Option<[u8; 32]>, + // For future econ usage, + sign_result: bool, + }, } impl Query { @@ -77,7 +90,8 @@ impl Query { Query::GetStoreCost { key, .. } | Query::GetReplicatedRecord { key, .. } | Query::GetRegisterRecord { key, .. } - | Query::GetChunkExistenceProof { key, .. } => key.clone(), + | Query::GetChunkExistenceProof { key, .. } + | Query::GetClosestPeers { key, .. } => key.clone(), } } } @@ -111,6 +125,18 @@ impl std::fmt::Display for Query { Query::CheckNodeInProblem(address) => { write!(f, "Query::CheckNodeInProblem({address:?})") } + Query::GetClosestPeers { + key, + num_of_peers, + range, + sign_result, + } => { + let distance = range.as_ref().map(|value| Distance(U256::from(value))); + write!( + f, + "Query::GetClosestPeers({key:?} {num_of_peers:?} {distance:?} {sign_result})" + ) + } } } } diff --git a/ant-protocol/src/messages/response.rs b/ant-protocol/src/messages/response.rs index 975817de8a..a7f8bf9220 100644 --- a/ant-protocol/src/messages/response.rs +++ b/ant-protocol/src/messages/response.rs @@ -12,6 +12,7 @@ use super::ChunkProof; use ant_evm::{PaymentQuote, RewardsAddress}; use bytes::Bytes; use core::fmt; +use libp2p::Multiaddr; use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -59,6 +60,20 @@ pub enum QueryResponse { /// /// [`GetChunkExistenceProof`]: crate::messages::Query::GetChunkExistenceProof GetChunkExistenceProof(Vec<(NetworkAddress, Result)>), + // ===== GetClosestPeers ===== + // + /// Response to [`GetClosestPeers`] + /// + /// [`GetClosestPeers`]: crate::messages::Query::GetClosestPeers + GetClosestPeers { + // The target address that the original request is about. + target: NetworkAddress, + // `Multiaddr` is required to allow the requester to dial the peer + // Note: the list doesn't contain the node that being queried. + peers: Vec<(NetworkAddress, Vec)>, + // Signature of signing the above (if requested), for future economic model usage. + signature: Option>, + }, } // Debug implementation for QueryResponse, to avoid printing Vec @@ -117,6 +132,13 @@ impl Debug for QueryResponse { let addresses: Vec<_> = proofs.iter().map(|(addr, _)| addr.clone()).collect(); write!(f, "GetChunkExistenceProof(checked chunks: {addresses:?})") } + QueryResponse::GetClosestPeers { target, peers, .. } => { + let addresses: Vec<_> = peers.iter().map(|(addr, _)| addr.clone()).collect(); + write!( + f, + "GetClosestPeers target {target:?} close peers {addresses:?}" + ) + } } } }