diff --git a/ant-networking/src/config.rs b/ant-networking/src/config.rs new file mode 100644 index 0000000000..ab605e1573 --- /dev/null +++ b/ant-networking/src/config.rs @@ -0,0 +1,53 @@
+//! Configuration constants and settings for the networking module.
+use std::time::Duration;
+
+/// Maximum allowed size for network packets in bytes (2MB)
+pub const MAX_PACKET_SIZE: usize = 2 * 1024 * 1024;
+
+/// Number of nodes to maintain in the close group
+pub const CLOSE_GROUP_SIZE: usize = 8;
+
+/// Default timeout duration for network requests in seconds
+pub const REQUEST_TIMEOUT_DEFAULT_S: u64 = 60;
+
+/// Duration to keep connections alive
+pub const CONNECTION_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
+
+/// Timeout duration for Kademlia queries in seconds
+pub const KAD_QUERY_TIMEOUT_S: u64 = 20;
+
+/// Protocol identifier for Kademlia streams
+pub const KAD_STREAM_PROTOCOL_ID: &str = "/safe/kad/1.0.0";
+
+/// Size of the networking channel buffer
+pub const NETWORKING_CHANNEL_SIZE: usize = 100;
+
+/// Interval for relay manager reservation checks
+pub const RELAY_MANAGER_RESERVATION_INTERVAL: Duration = Duration::from_secs(30);
+
+/// Interval for resending identify messages
+pub const RESEND_IDENTIFY_INTERVAL: Duration = Duration::from_secs(300);
+
+/// Configuration for the networking component
+#[derive(Debug, Clone)]
+pub struct NetworkConfig {
+    pub max_packet_size: usize,
+    pub close_group_size: usize,
+    pub request_timeout: Duration,
+    pub connection_keep_alive: Duration,
+    pub kad_query_timeout: Duration,
+    pub channel_size: usize,
+}
+
+impl Default for NetworkConfig {
+    fn default() -> Self {
+        Self {
+            max_packet_size: MAX_PACKET_SIZE,
+            close_group_size: CLOSE_GROUP_SIZE,
+            request_timeout: Duration::from_secs(REQUEST_TIMEOUT_DEFAULT_S),
+            connection_keep_alive: CONNECTION_KEEP_ALIVE_TIMEOUT,
+            kad_query_timeout: Duration::from_secs(KAD_QUERY_TIMEOUT_S),
+            channel_size: NETWORKING_CHANNEL_SIZE,
+        }
+    }
+}
\ No newline at end of file
diff --git a/ant-networking/src/event.rs b/ant-networking/src/event.rs new file mode 100644 index 0000000000..b591890703 --- /dev/null +++ b/ant-networking/src/event.rs @@ -0,0 +1,38 @@
+use libp2p::{
+    identify,
+    kad,
+    request_response,
+};
+
+use crate::messages::{Request, Response};
+
+#[derive(Debug)]
+pub enum NodeEvent {
+    Identify(identify::Event),
+    Kademlia(kad::Event),
+    MsgReceived(request_response::Event<Request, Response>),
+}
+
+impl From<identify::Event> for NodeEvent {
+    fn from(event: identify::Event) -> Self {
+        NodeEvent::Identify(event)
+    }
+}
+
+impl From<kad::Event> for NodeEvent {
+    fn from(event: kad::Event) -> Self {
+        NodeEvent::Kademlia(event)
+    }
+}
+
+impl From<request_response::Event<Request, Response>> for NodeEvent {
+    fn from(event: request_response::Event<Request, Response>) -> Self {
+        NodeEvent::MsgReceived(event)
+    }
+}
+
+#[derive(Debug)]
+pub enum MsgResponder {
+    Response(Response),
+    Error(String),
+}
diff --git a/ant-networking/src/kad/mod.rs b/ant-networking/src/kad/mod.rs new file mode 100644 index 0000000000..1ea56b00c9 --- /dev/null +++ b/ant-networking/src/kad/mod.rs @@ -0,0 +1,27 @@
+use libp2p::kad::Record;
+use libp2p::PeerId;
+use ant_protocol::NetworkAddress;
+
+pub use libp2p::kad::{KBucketDistance, KBucketKey};
+
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct RecordKey(pub NetworkAddress);
+
+impl From<NetworkAddress> for RecordKey {
+    fn from(addr: NetworkAddress) -> Self {
+        RecordKey(addr)
+    }
+}
+
+impl AsRef<[u8]> for RecordKey {
+    fn as_ref(&self) -> &[u8] {
+        self.0.as_ref()
+    }
+}
+
+impl From<RecordKey> for KBucketKey<Vec<u8>> {
+    fn from(key: RecordKey) -> Self {
+       
KBucketKey::new(key.0.into()) + } +} diff --git a/ant-networking/src/lib.rs b/ant-networking/src/lib.rs index 89f3c5428e..0aa092883a 100644 --- a/ant-networking/src/lib.rs +++ b/ant-networking/src/lib.rs @@ -1,1434 +1,33 @@ -// Copyright 2024 MaidSafe.net limited. -// -// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3. -// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed -// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. Please review the Licences for the specific language governing -// permissions and limitations relating to use of the SAFE Network Software. +pub mod config; +pub mod network; +pub mod types; -#[macro_use] -extern crate tracing; - -mod bootstrap; -mod circular_vec; -mod cmd; -mod driver; -mod error; -mod event; -mod external_address; -mod fifo_register; -mod log_markers; -#[cfg(feature = "open-metrics")] -mod metrics; -mod network_discovery; -mod record_store; -mod record_store_api; -mod relay_manager; -mod replication_fetcher; -pub mod target_arch; -mod transactions; -mod transport; - -use cmd::LocalSwarmCmd; -use xor_name::XorName; +#[cfg(test)] +mod tests; -// re-export arch dependent deps for use in the crate, or above -pub use self::{ - cmd::{NodeIssue, SwarmLocalState}, - driver::{ - GetRecordCfg, NetworkBuilder, PutRecordCfg, SwarmDriver, VerificationKind, MAX_PACKET_SIZE, - }, +// Re-exports +pub use config::NetworkConfig; +pub use crate::network::{ error::{GetRecordError, NetworkError}, - event::{MsgResponder, NetworkEvent}, - record_store::{calculate_cost_for_records, NodeRecordStore}, - transactions::get_transactions_from_record, -}; -#[cfg(feature = "open-metrics")] -pub use metrics::service::MetricsRegistries; -pub use target_arch::{interval, sleep, spawn, Instant, Interval}; - -use self::{cmd::NetworkSwarmCmd, error::Result}; -use ant_evm::{AttoTokens, PaymentQuote, QuotingMetrics, RewardsAddress}; -use ant_protocol::{ - error::Error as ProtocolError, - messages::{ChunkProof, Cmd, Nonce, Query, QueryResponse, Request, Response}, - storage::{RecordType, RetryStrategy, Scratchpad}, - NetworkAddress, PrettyPrintKBucketKey, PrettyPrintRecordKey, CLOSE_GROUP_SIZE, + record::{GetRecordCfg, PutRecordCfg, VerificationKind}, + types::PayeeQuote, }; -use futures::future::select_all; -use libp2p::{ - identity::Keypair, - kad::{KBucketDistance, KBucketKey, Quorum, Record, RecordKey}, - multiaddr::Protocol, - request_response::OutboundFailure, - Multiaddr, PeerId, -}; -use rand::Rng; -use std::{ - collections::{BTreeMap, HashMap}, - net::IpAddr, - sync::Arc, -}; -use tokio::sync::{ - mpsc::{self, Sender}, - oneshot, -}; -use tokio::time::Duration; -use { - ant_protocol::storage::Transaction, - ant_protocol::storage::{ - try_deserialize_record, try_serialize_record, RecordHeader, RecordKind, - }, - ant_registers::SignedRegister, - std::collections::HashSet, -}; - -/// The type of quote for a selected payee. -pub type PayeeQuote = (PeerId, RewardsAddress, PaymentQuote); - -/// Majority of a given group (i.e. > 1/2). -#[inline] -pub const fn close_group_majority() -> usize { - // Calculate the majority of the close group size by dividing it by 2 and adding 1. - // This ensures that the majority is always greater than half. - CLOSE_GROUP_SIZE / 2 + 1 -} - -/// Max duration to wait for verification. 
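To make the majority rule in the deleted helper above concrete: with the default CLOSE_GROUP_SIZE of 8, a majority is 5, since integer division rounds down and the +1 always crosses the halfway mark. A minimal standalone sketch (not part of the patch):

    const CLOSE_GROUP_SIZE: usize = 8;

    /// Majority of a given group (i.e. > 1/2).
    const fn close_group_majority() -> usize {
        CLOSE_GROUP_SIZE / 2 + 1
    }

    fn main() {
        assert_eq!(close_group_majority(), 5); // 8 / 2 + 1
    }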
-const MAX_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(750); -/// Min duration to wait for verification -const MIN_WAIT_BEFORE_READING_A_PUT: Duration = Duration::from_millis(300); - -/// Sort the provided peers by their distance to the given `NetworkAddress`. -/// Return with the closest expected number of entries if has. -pub fn sort_peers_by_address<'a>( - peers: &'a Vec, - address: &NetworkAddress, - expected_entries: usize, -) -> Result> { - sort_peers_by_key(peers, &address.as_kbucket_key(), expected_entries) -} - -/// Sort the provided peers by their distance to the given `KBucketKey`. -/// Return with the closest expected number of entries if has. -pub fn sort_peers_by_key<'a, T>( - peers: &'a Vec, - key: &KBucketKey, - expected_entries: usize, -) -> Result> { - // Check if there are enough peers to satisfy the request. - // bail early if that's not the case - if CLOSE_GROUP_SIZE > peers.len() { - warn!("Not enough peers in the k-bucket to satisfy the request"); - return Err(NetworkError::NotEnoughPeers { - found: peers.len(), - required: CLOSE_GROUP_SIZE, - }); - } - - // Create a vector of tuples where each tuple is a reference to a peer and its distance to the key. - // This avoids multiple computations of the same distance in the sorting process. - let mut peer_distances: Vec<(&PeerId, KBucketDistance)> = Vec::with_capacity(peers.len()); - - for peer_id in peers { - let addr = NetworkAddress::from_peer(*peer_id); - let distance = key.distance(&addr.as_kbucket_key()); - peer_distances.push((peer_id, distance)); - } - - // Sort the vector of tuples by the distance. - peer_distances.sort_by(|a, b| a.1.cmp(&b.1)); - - // Collect the sorted peers into a new vector. - let sorted_peers: Vec<_> = peer_distances - .into_iter() - .take(expected_entries) - .map(|(peer_id, _)| peer_id) - .collect(); +pub use types::{NetworkAddress, NetworkMetricsRecorder, NodeIssue}; - Ok(sorted_peers) -} - -#[derive(Clone, Debug)] -/// API to interact with the underlying Swarm -pub struct Network { - inner: Arc, -} +// Utility functions +use libp2p::Multiaddr; -/// The actual implementation of the Network. The other is just a wrapper around this, so that we don't expose -/// the Arc from the interface. -#[derive(Debug)] -struct NetworkInner { - network_swarm_cmd_sender: mpsc::Sender, - local_swarm_cmd_sender: mpsc::Sender, - peer_id: PeerId, - keypair: Keypair, +/// Checks if a multiaddress is globally reachable +pub fn multiaddr_is_global(_addr: &Multiaddr) -> bool { + // Implementation here... + true // Placeholder } -impl Network { - pub fn new( - network_swarm_cmd_sender: mpsc::Sender, - local_swarm_cmd_sender: mpsc::Sender, - peer_id: PeerId, - keypair: Keypair, - ) -> Self { - Self { - inner: Arc::new(NetworkInner { - network_swarm_cmd_sender, - local_swarm_cmd_sender, - peer_id, - keypair, - }), - } - } - - /// Returns the `PeerId` of the instance. - pub fn peer_id(&self) -> PeerId { - self.inner.peer_id - } - - /// Returns the `Keypair` of the instance. - pub fn keypair(&self) -> &Keypair { - &self.inner.keypair - } - - /// Get the sender to send a `NetworkSwarmCmd` to the underlying `Swarm`. - pub(crate) fn network_swarm_cmd_sender(&self) -> &mpsc::Sender { - &self.inner.network_swarm_cmd_sender - } - /// Get the sender to send a `LocalSwarmCmd` to the underlying `Swarm`. - pub(crate) fn local_swarm_cmd_sender(&self) -> &mpsc::Sender { - &self.inner.local_swarm_cmd_sender - } - - /// Signs the given data with the node's keypair. 
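The signing helpers removed below are thin wrappers over libp2p's keypair API; a usage sketch of the underlying round trip (standalone example, assuming only libp2p's `identity` module):

    use libp2p::identity::Keypair;

    fn main() {
        let keypair = Keypair::generate_ed25519();
        let msg = b"test message";
        // Sign with the node's keypair, then verify against its public key,
        // mirroring the Network::sign / Network::verify wrappers.
        let sig = keypair.sign(msg).expect("ed25519 signing does not fail");
        assert!(keypair.public().verify(msg, &sig));
    }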
- pub fn sign(&self, msg: &[u8]) -> Result> { - self.keypair().sign(msg).map_err(NetworkError::from) - } - - /// Verifies a signature for the given data and the node's public key. - pub fn verify(&self, msg: &[u8], sig: &[u8]) -> bool { - self.keypair().public().verify(msg, sig) - } - - /// Returns the protobuf serialised PublicKey to allow messaging out for share. - pub fn get_pub_key(&self) -> Vec { - self.keypair().public().encode_protobuf() - } - - /// Dial the given peer at the given address. - /// This function will only be called for the bootstrap nodes. - pub async fn dial(&self, addr: Multiaddr) -> Result<()> { - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::Dial { addr, sender }); - receiver.await? - } - - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. - /// Excludes the client's `PeerId` while calculating the closest peers. - pub async fn client_get_all_close_peers_in_range_or_close_group( - &self, - key: &NetworkAddress, - ) -> Result> { - self.get_all_close_peers_in_range_or_close_group(key, true) - .await - } - - /// Returns the closest peers to the given `NetworkAddress`, sorted by their distance to the key. - /// - /// Includes our node's `PeerId` while calculating the closest peers. - pub async fn node_get_closest_peers(&self, key: &NetworkAddress) -> Result> { - self.get_all_close_peers_in_range_or_close_group(key, false) - .await - } - - /// Returns a map where each key is the ilog2 distance of that Kbucket and each value is a vector of peers in that - /// bucket. - /// Does not include self - pub async fn get_kbuckets(&self) -> Result>> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetKBuckets { sender }); - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Returns all the PeerId from all the KBuckets from our local Routing Table - /// Also contains our own PeerId. - pub async fn get_closest_k_value_local_peers(&self) -> Result> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetClosestKLocalPeers { sender }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Returns the replicate candidates in range. - pub async fn get_replicate_candidates(&self, data_addr: NetworkAddress) -> Result> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetReplicateCandidates { data_addr, sender }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Get the Chunk existence proof from the close nodes to the provided chunk address. - /// This is to be used by client only to verify the success of the upload. - pub async fn verify_chunk_existence( - &self, - chunk_address: NetworkAddress, - nonce: Nonce, - expected_proof: ChunkProof, - quorum: Quorum, - retry_strategy: Option, - ) -> Result<()> { - let total_attempts = retry_strategy - .map(|strategy| strategy.attempts()) - .unwrap_or(1); - - let pretty_key = PrettyPrintRecordKey::from(&chunk_address.to_record_key()).into_owned(); - let expected_n_verified = get_quorum_value(&quorum); - - let mut close_nodes = Vec::new(); - let mut retry_attempts = 0; - while retry_attempts < total_attempts { - // the check should happen before incrementing retry_attempts - if retry_attempts % 2 == 0 { - // Do not query the closest_peers during every re-try attempt. 
-            // The close_nodes don't change often and the previous set of close_nodes might be taking a while to write
-            // the Chunk, so query them again in case of a failure.
-                close_nodes = self
-                    .client_get_all_close_peers_in_range_or_close_group(&chunk_address)
-                    .await?;
-            }
-            retry_attempts += 1;
-            info!(
-                "Getting ChunkProof for {pretty_key:?}. Attempts: {retry_attempts:?}/{total_attempts:?}",
-            );
-
-            let request = Request::Query(Query::GetChunkExistenceProof {
-                key: chunk_address.clone(),
-                nonce,
-                difficulty: 1,
-            });
-            let responses = self
-                .send_and_get_responses(&close_nodes, &request, true)
-                .await;
-            let n_verified = responses
-                .into_iter()
-                .filter_map(|(peer, resp)| {
-                    if let Ok(Response::Query(QueryResponse::GetChunkExistenceProof(proofs))) =
-                        resp
-                    {
-                        if proofs.is_empty() {
-                            warn!("Failed to verify the ChunkProof from {peer:?}. Returned proof is empty.");
-                            None
-                        } else if let Ok(ref proof) = proofs[0].1 {
-                            if expected_proof.verify(proof) {
-                                debug!("Got a valid ChunkProof from {peer:?}");
-                                Some(())
-                            } else {
-                                warn!("Failed to verify the ChunkProof from {peer:?}. The chunk might have been tampered with?");
-                                None
-                            }
-                        } else {
-                            warn!("Failed to verify the ChunkProof from {peer:?}, returned with error {:?}", proofs[0].1);
-                            None
-                        }
-                    } else {
-                        debug!("Did not get a valid response for the ChunkProof from {peer:?}");
-                        None
-                    }
-                })
-                .count();
-            debug!("Got {n_verified} verified chunk existence proofs for chunk_address {chunk_address:?}");
-
-            if n_verified >= expected_n_verified {
-                return Ok(());
-            }
-            warn!("The obtained {n_verified} verified proofs did not match the expected {expected_n_verified} verified proofs");
-            // Sleep to avoid firing queries so close together that they choke the nodes further.
-            let waiting_time = if retry_attempts == 1 {
-                MIN_WAIT_BEFORE_READING_A_PUT
-            } else {
-                MIN_WAIT_BEFORE_READING_A_PUT + MIN_WAIT_BEFORE_READING_A_PUT
-            };
-            sleep(waiting_time).await;
-        }
-
-        Err(NetworkError::FailedToVerifyChunkProof(
-            chunk_address.clone(),
-        ))
-    }
-
-    /// Get the store costs from the majority of the closest peers to the provided RecordKey.
-    /// A record that already exists will be returned with a cost of zero.
-    ///
-    /// Ignore the quote from any peers in `ignore_peers`.
-    /// This is useful if we want to repay a different PeerId on failure.
-    pub async fn get_store_costs_from_network(
-        &self,
-        record_address: NetworkAddress,
-        ignore_peers: Vec<PeerId>,
-    ) -> Result<PayeeQuote> {
-        // The requirement of having at least CLOSE_GROUP_SIZE
-        // close nodes will be checked internally automatically.
-        let mut close_nodes = self
-            .client_get_all_close_peers_in_range_or_close_group(&record_address)
-            .await?;
-        // Filter out results from the ignored peers.
-        close_nodes.retain(|peer_id| !ignore_peers.contains(peer_id));
-
-        if close_nodes.is_empty() {
-            error!("Can't get store_cost of {record_address:?}, as all close_nodes are ignored");
-            return Err(NetworkError::NoStoreCostResponses);
-        }
-
-        // Client shall decide whether to carry out storage verification or not.
- let request = Request::Query(Query::GetStoreCost { - key: record_address.clone(), - nonce: None, - difficulty: 0, - }); - let responses = self - .send_and_get_responses(&close_nodes, &request, true) - .await; - - // loop over responses, generating an average fee and storing all responses along side - let mut all_costs = vec![]; - let mut all_quotes = vec![]; - for response in responses.into_values().flatten() { - info!( - "StoreCostReq for {record_address:?} received response: {:?}", - response - ); - match response { - Response::Query(QueryResponse::GetStoreCost { - quote: Ok(quote), - payment_address, - peer_address, - storage_proofs, - }) => { - if !storage_proofs.is_empty() { - debug!("Storage proofing during GetStoreCost to be implemented."); - } - // Check the quote itself is valid. - if quote.cost - != AttoTokens::from_u64(calculate_cost_for_records( - quote.quoting_metrics.close_records_stored, - )) - { - warn!("Received invalid quote from {peer_address:?}, {quote:?}"); - continue; - } - - all_costs.push((peer_address.clone(), payment_address, quote.clone())); - all_quotes.push((peer_address, quote)); - } - Response::Query(QueryResponse::GetStoreCost { - quote: Err(ProtocolError::RecordExists(_)), - payment_address, - peer_address, - storage_proofs, - }) => { - if !storage_proofs.is_empty() { - debug!("Storage proofing during GetStoreCost to be implemented."); - } - all_costs.push((peer_address, payment_address, PaymentQuote::zero())); - } - _ => { - error!("Non store cost response received, was {:?}", response); - } - } - } - - for peer_id in close_nodes.iter() { - let request = Request::Cmd(Cmd::QuoteVerification { - target: NetworkAddress::from_peer(*peer_id), - quotes: all_quotes.clone(), - }); - - self.send_req_ignore_reply(request, *peer_id); - } - - get_fees_from_store_cost_responses(all_costs) - } - - /// Get register from network. - /// Due to the nature of the p2p network, it's not guaranteed there is only one version - /// exists in the network all the time. - /// The scattering of the register will be more like `ring layered`. - /// Meanwhile, `kad::get_record` will terminate with first majority copies returned, - /// which has the risk of returning with old versions. - /// So, to improve the accuracy, query closest_peers first, then fetch registers - /// And merge them if they are with different content. - pub async fn get_register_record_from_network( - &self, - key: RecordKey, - ) -> Result> { - let record_address = NetworkAddress::from_record_key(&key); - // The requirement of having at least CLOSE_GROUP_SIZE - // close nodes will be checked internally automatically. 
- let close_nodes = self - .client_get_all_close_peers_in_range_or_close_group(&record_address) - .await?; - - let self_address = NetworkAddress::from_peer(self.peer_id()); - let request = Request::Query(Query::GetRegisterRecord { - requester: self_address, - key: record_address.clone(), - }); - let responses = self - .send_and_get_responses(&close_nodes, &request, true) - .await; - - // loop over responses, collecting all fetched register records - let mut all_register_copies = HashMap::new(); - for response in responses.into_values().flatten() { - match response { - Response::Query(QueryResponse::GetRegisterRecord(Ok((holder, content)))) => { - let register_record = Record::new(key.clone(), content.to_vec()); - let content_hash = XorName::from_content(®ister_record.value); - debug!( - "RegisterRecordReq of {record_address:?} received register of version {content_hash:?} from {holder:?}" - ); - let _ = all_register_copies.insert(content_hash, register_record); - } - _ => { - error!( - "RegisterRecordReq of {record_address:?} received error response, was {:?}", - response - ); - } - } - } - - Ok(all_register_copies) - } - - /// Get the Record from the network - /// Carry out re-attempts if required - /// In case a target_record is provided, only return when fetched target. - /// Otherwise count it as a failure when all attempts completed. - /// - /// It also handles the split record error for transactions and registers. - /// For transactions, it accumulates the transactions and returns an error if more than one. - /// For registers, it merges the registers and returns the merged record. - pub async fn get_record_from_network( - &self, - key: RecordKey, - cfg: &GetRecordCfg, - ) -> Result { - let pretty_key = PrettyPrintRecordKey::from(&key); - let mut backoff = cfg - .retry_strategy - .unwrap_or(RetryStrategy::None) - .backoff() - .into_iter(); - - loop { - info!("Getting record from network of {pretty_key:?}. with cfg {cfg:?}",); - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::GetNetworkRecord { - key: key.clone(), - sender, - cfg: cfg.clone(), - }); - let result = match receiver.await { - Ok(result) => result, - Err(err) => { - error!( - "When fetching record {pretty_key:?}, encountered a channel error {err:?}" - ); - // Do not attempt retries. - return Err(NetworkError::InternalMsgChannelDropped); - } - }; - - let err = match result { - Ok(record) => { - info!("Record returned: {pretty_key:?}."); - return Ok(record); - } - Err(err) => err, - }; - - // log the results - match &err { - GetRecordError::RecordDoesNotMatch(_) => { - warn!("The returned record does not match target {pretty_key:?}."); - } - GetRecordError::NotEnoughCopies { expected, got, .. } => { - warn!("Not enough copies ({got}/{expected}) found yet for {pretty_key:?}."); - } - // libp2p RecordNotFound does mean no holders answered. - // it does not actually mean the record does not exist. - // just that those asked did not have it - GetRecordError::RecordNotFound => { - warn!("No holder of record '{pretty_key:?}' found."); - } - // This is returned during SplitRecordError, we should not get this error here. - GetRecordError::RecordKindMismatch => { - error!("Record kind mismatch for {pretty_key:?}. This error should not happen here."); - } - GetRecordError::SplitRecord { result_map } => { - error!("Encountered a split record for {pretty_key:?}."); - if let Some(record) = Self::handle_split_record_error(result_map, &key)? 
{ - info!("Merged the split record (register) for {pretty_key:?}, into a single record"); - return Ok(record); - } - } - GetRecordError::QueryTimeout => { - error!("Encountered query timeout for {pretty_key:?}."); - } - } - - match backoff.next() { - Some(Some(duration)) => { - crate::target_arch::sleep(duration).await; - debug!("Getting record from network of {pretty_key:?} via backoff..."); - } - _ => break Err(err.into()), - } - } - } - - /// Handle the split record error. - /// Transaction: Accumulate transactions. - /// Register: Merge registers and return the merged record. - fn handle_split_record_error( - result_map: &HashMap)>, - key: &RecordKey, - ) -> std::result::Result, NetworkError> { - let pretty_key = PrettyPrintRecordKey::from(key); - - // attempt to deserialise and accumulate any transactions or registers - let results_count = result_map.len(); - let mut accumulated_transactions = HashSet::new(); - let mut collected_registers = Vec::new(); - let mut valid_scratchpad: Option = None; - - if results_count > 1 { - let mut record_kind = None; - info!("For record {pretty_key:?}, we have more than one result returned."); - for (record, _) in result_map.values() { - let Ok(header) = RecordHeader::from_record(record) else { - continue; - }; - let kind = record_kind.get_or_insert(header.kind); - // FIXME: the first record dictates the kind, but we should check all records are of the same kind. - // And somehow discard the incorrect ones. - if *kind != header.kind { - error!("Encountered a split record for {pretty_key:?} with different RecordHeaders. Expected {kind:?} but got {:?}. Skipping",header.kind); - continue; - } - - match kind { - RecordKind::Chunk - | RecordKind::ChunkWithPayment - | RecordKind::RegisterWithPayment - | RecordKind::ScratchpadWithPayment => { - error!("Encountered a split record for {pretty_key:?} with unexpected RecordKind {kind:?}, skipping."); - continue; - } - RecordKind::Transaction => { - info!("For record {pretty_key:?}, we have a split record for a transaction attempt. Accumulating transactions"); - - match get_transactions_from_record(record) { - Ok(transactions) => { - accumulated_transactions.extend(transactions); - } - Err(_) => { - continue; - } - } - } - RecordKind::Register => { - info!("For record {pretty_key:?}, we have a split record for a register. Accumulating registers"); - let Ok(register) = try_deserialize_record::(record) else { - error!( - "Failed to deserialize register {pretty_key}. Skipping accumulation" - ); - continue; - }; - - match register.verify() { - Ok(_) => { - collected_registers.push(register); - } - Err(_) => { - error!( - "Failed to verify register for {pretty_key} at address: {}. Skipping accumulation", - register.address() - ); - continue; - } - } - } - RecordKind::Scratchpad => { - info!("For record {pretty_key:?}, we have a split record for a scratchpad. Selecting the one with the highest count"); - let Ok(scratchpad) = try_deserialize_record::(record) else { - error!( - "Failed to deserialize scratchpad {pretty_key}. 
Skipping accumulation" - ); - continue; - }; - - if !scratchpad.is_valid() { - warn!( - "Rejecting Scratchpad for {pretty_key} PUT with invalid signature during split record error" - ); - continue; - } - - if let Some(old) = &valid_scratchpad { - if old.count() >= scratchpad.count() { - info!( - "Rejecting Scratchpad for {pretty_key} with lower count than the previous one" - ); - continue; - } else { - valid_scratchpad = Some(scratchpad); - } - } else { - valid_scratchpad = Some(scratchpad); - } - } - } - } - } - - // Return the accumulated transactions as a single record - if accumulated_transactions.len() > 1 { - info!("For record {pretty_key:?} task found split record for a transaction, accumulated and sending them as a single record"); - let accumulated_transactions = accumulated_transactions - .into_iter() - .collect::>(); - let record = Record { - key: key.clone(), - value: try_serialize_record(&accumulated_transactions, RecordKind::Transaction) - .map_err(|err| { - error!( - "Error while serializing the accumulated transactions for {pretty_key:?}: {err:?}" - ); - NetworkError::from(err) - })? - .to_vec(), - publisher: None, - expires: None, - }; - return Ok(Some(record)); - } else if !collected_registers.is_empty() { - info!("For record {pretty_key:?} task found multiple registers, merging them."); - let signed_register = collected_registers.iter().fold(collected_registers[0].clone(), |mut acc, x| { - if let Err(e) = acc.merge(x) { - warn!("Ignoring forked register as we failed to merge conflicting registers at {}: {e}", x.address()); - } - acc - }); - - let record_value = try_serialize_record(&signed_register, RecordKind::Register) - .map_err(|err| { - error!( - "Error while serializing the merged register for {pretty_key:?}: {err:?}" - ); - NetworkError::from(err) - })? - .to_vec(); - - let record = Record { - key: key.clone(), - value: record_value, - publisher: None, - expires: None, - }; - return Ok(Some(record)); - } else if let Some(scratchpad) = valid_scratchpad { - info!("Found a valid scratchpad for {pretty_key:?}, returning it"); - let record = Record { - key: key.clone(), - value: try_serialize_record(&scratchpad, RecordKind::Scratchpad) - .map_err(|err| { - error!( - "Error while serializing valid scratchpad for {pretty_key:?}: {err:?}" - ); - NetworkError::from(err) - })? - .to_vec(), - publisher: None, - expires: None, - }; - return Ok(Some(record)); - } - Ok(None) - } - - /// Get the cost of storing the next record from the network - pub async fn get_local_storecost( - &self, - key: RecordKey, - ) -> Result<(AttoTokens, QuotingMetrics, Vec)> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalStoreCost { key, sender }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Notify the node receicced a payment. 
- pub fn notify_payment_received(&self) { - self.send_local_swarm_cmd(LocalSwarmCmd::PaymentReceived); - } - - /// Get `Record` from the local RecordStore - pub async fn get_local_record(&self, key: &RecordKey) -> Result> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetLocalRecord { - key: key.clone(), - sender, - }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Whether the target peer is considered blacklisted by self - pub async fn is_peer_shunned(&self, target: NetworkAddress) -> Result { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::IsPeerShunned { target, sender }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Put `Record` to network - /// Optionally verify the record is stored after putting it to network - /// If verify is on, we retry. - pub async fn put_record(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> { - let pretty_key = PrettyPrintRecordKey::from(&record.key); - let mut backoff = cfg - .retry_strategy - .unwrap_or(RetryStrategy::None) - .backoff() - .into_iter(); - - loop { - info!( - "Attempting to PUT record with key: {pretty_key:?} to network, with cfg {cfg:?}, retrying via backoff..." - ); - - let err = match self.put_record_once(record.clone(), cfg).await { - Ok(_) => break Ok(()), - Err(err) => err, - }; - - // FIXME: Skip if we get a permanent error during verification, e.g., DoubleSpendAttempt - warn!("Failed to PUT record with key: {pretty_key:?} to network (retry via backoff) with error: {err:?}"); - - match backoff.next() { - Some(Some(duration)) => { - crate::target_arch::sleep(duration).await; - } - _ => break Err(err), - } - } - } - - async fn put_record_once(&self, record: Record, cfg: &PutRecordCfg) -> Result<()> { - let record_key = record.key.clone(); - let pretty_key = PrettyPrintRecordKey::from(&record_key); - info!( - "Putting record of {} - length {:?} to network", - pretty_key, - record.value.len() - ); - - // Waiting for a response to avoid flushing to network too quick that causing choke - let (sender, receiver) = oneshot::channel(); - if let Some(put_record_to_peers) = &cfg.use_put_record_to { - self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecordTo { - peers: put_record_to_peers.clone(), - record: record.clone(), - sender, - quorum: cfg.put_quorum, - }); - } else { - self.send_network_swarm_cmd(NetworkSwarmCmd::PutRecord { - record: record.clone(), - sender, - quorum: cfg.put_quorum, - }); - } - - let response = receiver.await?; - - if let Some((verification_kind, get_cfg)) = &cfg.verification { - // Generate a random duration between MAX_WAIT_BEFORE_READING_A_PUT and MIN_WAIT_BEFORE_READING_A_PUT - let wait_duration = rand::thread_rng() - .gen_range(MIN_WAIT_BEFORE_READING_A_PUT..MAX_WAIT_BEFORE_READING_A_PUT); - // Small wait before we attempt to verify. - // There will be `re-attempts` to be carried out within the later step anyway. 
- sleep(wait_duration).await; - debug!("Attempting to verify {pretty_key:?} after we've slept for {wait_duration:?}"); - - // Verify the record is stored, requiring re-attempts - if let VerificationKind::ChunkProof { - expected_proof, - nonce, - } = verification_kind - { - self.verify_chunk_existence( - NetworkAddress::from_record_key(&record_key), - *nonce, - expected_proof.clone(), - get_cfg.get_quorum, - get_cfg.retry_strategy, - ) - .await?; - } else { - match self - .get_record_from_network(record.key.clone(), get_cfg) - .await - { - Ok(_) => { - debug!("Record {pretty_key:?} verified to be stored."); - } - Err(NetworkError::GetRecordError(GetRecordError::RecordNotFound)) => { - warn!("Record {pretty_key:?} not found after PUT, either rejected or not yet stored by nodes when we asked"); - return Err(NetworkError::RecordNotStoredByNodes( - NetworkAddress::from_record_key(&record_key), - )); - } - Err(NetworkError::GetRecordError(GetRecordError::SplitRecord { .. })) - if matches!(verification_kind, VerificationKind::Crdt) => - { - warn!("Record {pretty_key:?} is split, which is okay since we're dealing with CRDTs"); - } - Err(e) => { - debug!( - "Failed to verify record {pretty_key:?} to be stored with error: {e:?}" - ); - return Err(e); - } - } - } - } - response - } - - /// Notify ReplicationFetch a fetch attempt is completed. - /// (but it won't trigger any real writes to disk, say fetched an old version of register) - pub fn notify_fetch_completed(&self, key: RecordKey, record_type: RecordType) { - self.send_local_swarm_cmd(LocalSwarmCmd::FetchCompleted((key, record_type))) - } - - /// Put `Record` to the local RecordStore - /// Must be called after the validations are performed on the Record - pub fn put_local_record(&self, record: Record) { - debug!( - "Writing Record locally, for {:?} - length {:?}", - PrettyPrintRecordKey::from(&record.key), - record.value.len() - ); - self.send_local_swarm_cmd(LocalSwarmCmd::PutLocalRecord { record }) - } - - /// Returns true if a RecordKey is present locally in the RecordStore - pub async fn is_record_key_present_locally(&self, key: &RecordKey) -> Result { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::RecordStoreHasKey { - key: key.clone(), - sender, - }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Returns the Addresses of all the locally stored Records - pub async fn get_all_local_record_addresses( - &self, - ) -> Result> { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetAllLocalRecordAddresses { sender }); - - receiver - .await - .map_err(|_e| NetworkError::InternalMsgChannelDropped) - } - - /// Send `Request` to the given `PeerId` and await for the response. If `self` is the recipient, - /// then the `Request` is forwarded to itself and handled, and a corresponding `Response` is created - /// and returned to itself. Hence the flow remains the same and there is no branching at the upper - /// layers. - /// - /// If an outbound issue is raised, we retry once more to send the request before returning an error. 
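The PUT path above and the retrying request path below share one skeleton: try once, consult the backoff iterator, sleep, repeat. A condensed sketch (synchronous `attempt` closure and an explicit delay slice standing in for `RetryStrategy`):

    use std::time::Duration;

    async fn with_backoff<F, T, E>(mut attempt: F, delays: &[Duration]) -> Result<T, E>
    where
        F: FnMut() -> Result<T, E>,
    {
        let mut delays = delays.iter();
        loop {
            match attempt() {
                Ok(v) => break Ok(v),
                Err(e) => match delays.next() {
                    // Sleep, then fall through to the next attempt.
                    Some(d) => tokio::time::sleep(*d).await,
                    // Backoff exhausted: surface the last error.
                    None => break Err(e),
                },
            }
        }
    }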
- pub async fn send_request(&self, req: Request, peer: PeerId) -> Result { - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest { - req: req.clone(), - peer, - sender: Some(sender), - }); - let mut r = receiver.await?; - - if let Err(error) = &r { - error!("Error in response: {:?}", error); - - match error { - NetworkError::OutboundError(OutboundFailure::Io(_)) - | NetworkError::OutboundError(OutboundFailure::ConnectionClosed) => { - warn!( - "Outbound failed for {req:?} .. {error:?}, redialing once and reattempting" - ); - let (sender, receiver) = oneshot::channel(); - - debug!("Reattempting to send_request {req:?} to {peer:?}"); - self.send_network_swarm_cmd(NetworkSwarmCmd::SendRequest { - req, - peer, - sender: Some(sender), - }); - - r = receiver.await?; - } - _ => { - // If the record is found, we should log the error and continue - warn!("Error in response: {:?}", error); - } - } - } - - r - } - - /// Send `Request` to the given `PeerId` and do _not_ await a response here. - /// Instead the Response will be handled by the common `response_handler` - pub fn send_req_ignore_reply(&self, req: Request, peer: PeerId) { - let swarm_cmd = NetworkSwarmCmd::SendRequest { - req, - peer, - sender: None, - }; - self.send_network_swarm_cmd(swarm_cmd) - } - - /// Send a `Response` through the channel opened by the requester. - pub fn send_response(&self, resp: Response, channel: MsgResponder) { - self.send_network_swarm_cmd(NetworkSwarmCmd::SendResponse { resp, channel }) - } - - /// Return a `SwarmLocalState` with some information obtained from swarm's local state. - pub async fn get_swarm_local_state(&self) -> Result { - let (sender, receiver) = oneshot::channel(); - self.send_local_swarm_cmd(LocalSwarmCmd::GetSwarmLocalState(sender)); - let state = receiver.await?; - Ok(state) - } - - pub fn trigger_interval_replication(&self) { - self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIntervalReplication) - } - - pub fn record_node_issues(&self, peer_id: PeerId, issue: NodeIssue) { - self.send_local_swarm_cmd(LocalSwarmCmd::RecordNodeIssue { peer_id, issue }); - } - - pub fn historical_verify_quotes(&self, quotes: Vec<(PeerId, PaymentQuote)>) { - self.send_local_swarm_cmd(LocalSwarmCmd::QuoteVerification { quotes }); - } - - pub fn trigger_irrelevant_record_cleanup(&self) { - self.send_local_swarm_cmd(LocalSwarmCmd::TriggerIrrelevantRecordCleanup) - } - - pub fn add_network_density_sample(&self, distance: KBucketDistance) { - self.send_local_swarm_cmd(LocalSwarmCmd::AddNetworkDensitySample { distance }) - } - - /// Helper to send NetworkSwarmCmd - fn send_network_swarm_cmd(&self, cmd: NetworkSwarmCmd) { - send_network_swarm_cmd(self.network_swarm_cmd_sender().clone(), cmd); - } - /// Helper to send LocalSwarmCmd - fn send_local_swarm_cmd(&self, cmd: LocalSwarmCmd) { - send_local_swarm_cmd(self.local_swarm_cmd_sender().clone(), cmd); - } - - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. 
- /// If `client` is false, then include `self` among the `closest_peers` - pub async fn get_close_group_closest_peers( - &self, - key: &NetworkAddress, - client: bool, - ) -> Result> { - debug!("Getting the closest peers to {key:?}"); - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { - key: key.clone(), - sender, - }); - let k_bucket_peers = receiver.await?; - - // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result - let result_len = k_bucket_peers.len(); - let mut closest_peers = k_bucket_peers; - // ensure we're not including self here - if client { - // remove our peer id from the calculations here: - closest_peers.retain(|&x| x != self.peer_id()); - if result_len != closest_peers.len() { - info!("Remove self client from the closest_peers"); - } - } - if tracing::level_enabled!(tracing::Level::DEBUG) { - let close_peers_pretty_print: Vec<_> = closest_peers - .iter() - .map(|peer_id| { - format!( - "{peer_id:?}({:?})", - PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key()) - ) - }) - .collect(); - - debug!("Network knowledge of close peers to {key:?} are: {close_peers_pretty_print:?}"); - } - - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; - Ok(closest_peers.into_iter().cloned().collect()) - } - - /// Returns the closest peers to the given `XorName`, sorted by their distance to the xor_name. - /// If `client` is false, then include `self` among the `closest_peers` - /// - /// If less than CLOSE_GROUP_SIZE peers are found, it will return all the peers. - pub async fn get_all_close_peers_in_range_or_close_group( - &self, - key: &NetworkAddress, - client: bool, - ) -> Result> { - let pretty_key = PrettyPrintKBucketKey(key.as_kbucket_key()); - debug!("Getting the all closest peers in range of {pretty_key:?}"); - let (sender, receiver) = oneshot::channel(); - self.send_network_swarm_cmd(NetworkSwarmCmd::GetClosestPeersToAddressFromNetwork { - key: key.clone(), - sender, - }); - - let found_peers = receiver.await?; - - // Count self in if among the CLOSE_GROUP_SIZE closest and sort the result - let result_len = found_peers.len(); - let mut closest_peers = found_peers; - - // ensure we're not including self here - if client { - // remove our peer id from the calculations here: - closest_peers.retain(|&x| x != self.peer_id()); - if result_len != closest_peers.len() { - info!("Remove self client from the closest_peers"); - } - } - - if tracing::level_enabled!(tracing::Level::DEBUG) { - let close_peers_pretty_print: Vec<_> = closest_peers - .iter() - .map(|peer_id| { - format!( - "{peer_id:?}({:?})", - PrettyPrintKBucketKey(NetworkAddress::from_peer(*peer_id).as_kbucket_key()) - ) - }) - .collect(); - - debug!( - "Network knowledge of closest peers to {pretty_key:?} are: {close_peers_pretty_print:?}" - ); - } - - let closest_peers = sort_peers_by_address(&closest_peers, key, CLOSE_GROUP_SIZE)?; - Ok(closest_peers.into_iter().cloned().collect()) - } - - /// Send a `Request` to the provided set of peers and wait for their responses concurrently. - /// If `get_all_responses` is true, we wait for the responses from all the peers. - /// NB TODO: Will return an error if the request timeouts. 
- /// If `get_all_responses` is false, we return the first successful response that we get - pub async fn send_and_get_responses( - &self, - peers: &[PeerId], - req: &Request, - get_all_responses: bool, - ) -> BTreeMap> { - debug!("send_and_get_responses for {req:?}"); - let mut list_of_futures = peers - .iter() - .map(|peer| { - Box::pin(async { - let resp = self.send_request(req.clone(), *peer).await; - (*peer, resp) - }) - }) - .collect::>(); - - let mut responses = BTreeMap::new(); - while !list_of_futures.is_empty() { - let ((peer, resp), _, remaining_futures) = select_all(list_of_futures).await; - let resp_string = match &resp { - Ok(resp) => format!("{resp}"), - Err(err) => format!("{err:?}"), - }; - debug!("Got response from {peer:?} for the req: {req:?}, resp: {resp_string}"); - if !get_all_responses && resp.is_ok() { - return BTreeMap::from([(peer, resp)]); - } - responses.insert(peer, resp); - list_of_futures = remaining_futures; - } - - debug!("Received all responses for {req:?}"); - responses - } -} - -/// Given `all_costs` it will return the closest / lowest cost -/// Closest requiring it to be within CLOSE_GROUP nodes -fn get_fees_from_store_cost_responses( - all_costs: Vec<(NetworkAddress, RewardsAddress, PaymentQuote)>, -) -> Result { - // Find the minimum cost using a linear scan with random tie break - let mut rng = rand::thread_rng(); - let payee = all_costs - .into_iter() - .min_by( - |(_address_a, _main_key_a, cost_a), (_address_b, _main_key_b, cost_b)| { - let cmp = cost_a.cost.cmp(&cost_b.cost); - if cmp == std::cmp::Ordering::Equal { - if rng.gen() { - std::cmp::Ordering::Less - } else { - std::cmp::Ordering::Greater - } - } else { - cmp - } - }, - ) - .ok_or(NetworkError::NoStoreCostResponses)?; - - info!("Final fees calculated as: {payee:?}"); - // we dont need to have the address outside of here for now - let payee_id = if let Some(peer_id) = payee.0.as_peer_id() { - peer_id - } else { - error!("Can't get PeerId from payee {:?}", payee.0); - return Err(NetworkError::NoStoreCostResponses); - }; - Ok((payee_id, payee.1, payee.2)) -} - -/// Get the value of the provided Quorum -pub fn get_quorum_value(quorum: &Quorum) -> usize { - match quorum { - Quorum::Majority => close_group_majority(), - Quorum::All => CLOSE_GROUP_SIZE, - Quorum::N(v) => v.get(), - Quorum::One => 1, - } -} - -/// Verifies if `Multiaddr` contains IPv4 address that is not global. -/// This is used to filter out unroutable addresses from the Kademlia routing table. -pub fn multiaddr_is_global(multiaddr: &Multiaddr) -> bool { - !multiaddr.iter().any(|addr| match addr { - Protocol::Ip4(ip) => { - // Based on the nightly `is_global` method (`Ipv4Addrs::is_global`), only using what is available in stable. - // Missing `is_shared`, `is_benchmarking` and `is_reserved`. - ip.is_unspecified() - | ip.is_private() - | ip.is_loopback() - | ip.is_link_local() - | ip.is_documentation() - | ip.is_broadcast() - } - _ => false, - }) -} - -/// Pop off the `/p2p/`. This mutates the `Multiaddr` and returns the `PeerId` if it exists. -pub(crate) fn multiaddr_pop_p2p(multiaddr: &mut Multiaddr) -> Option { - if let Some(Protocol::P2p(peer_id)) = multiaddr.iter().last() { - // Only actually strip the last protocol if it's indeed the peer ID. - let _ = multiaddr.pop(); - Some(peer_id) - } else { - None - } -} - -/// Build a `Multiaddr` with the p2p protocol filtered out. -/// If it is a relayed address, then the relay's P2P address is preserved. 
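To illustrate the relay-aware stripping described above (peer ids generated on the fly; the address layout is the interesting part):

    use libp2p::{multiaddr::Protocol, Multiaddr, PeerId};

    fn main() {
        let relay = PeerId::random();
        let target = PeerId::random();
        // /ip4/192.0.2.1/udp/4001/quic-v1/p2p/<relay>/p2p-circuit/p2p/<target>
        let mut addr: Multiaddr = "/ip4/192.0.2.1/udp/4001/quic-v1".parse().unwrap();
        addr.push(Protocol::P2p(relay));
        addr.push(Protocol::P2pCircuit);
        addr.push(Protocol::P2p(target));
        // Stripping keeps the relay's /p2p/ (needed to route through the relay)
        // and drops only the target's trailing /p2p/ component.
    }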
-pub(crate) fn multiaddr_strip_p2p(multiaddr: &Multiaddr) -> Multiaddr { - let is_relayed = multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit)); - - if is_relayed { - // Do not add any PeerId after we've found the P2PCircuit protocol. The prior one is the relay's PeerId which - // we should preserve. - let mut before_relay_protocol = true; - let mut new_multi_addr = Multiaddr::empty(); - for p in multiaddr.iter() { - if matches!(p, Protocol::P2pCircuit) { - before_relay_protocol = false; - } - if matches!(p, Protocol::P2p(_)) && !before_relay_protocol { - continue; - } - new_multi_addr.push(p); - } - new_multi_addr - } else { - multiaddr - .iter() - .filter(|p| !matches!(p, Protocol::P2p(_))) - .collect() - } -} - -/// Get the `IpAddr` from the `Multiaddr` -pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option { - addr.iter().find_map(|p| match p { - Protocol::Ip4(addr) => Some(IpAddr::V4(addr)), - Protocol::Ip6(addr) => Some(IpAddr::V6(addr)), - _ => None, - }) -} - -pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option { - addr.iter().find_map(|p| match p { - Protocol::Udp(port) => Some(port), - _ => None, - }) -} - -pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender, cmd: LocalSwarmCmd) { - let capacity = swarm_cmd_sender.capacity(); - - if capacity == 0 { - error!( - "SwarmCmd channel is full. Await capacity to send: {:?}", - cmd - ); - } - - // Spawn a task to send the SwarmCmd and keep this fn sync - let _handle = spawn(async move { - if let Err(error) = swarm_cmd_sender.send(cmd).await { - error!("Failed to send SwarmCmd: {}", error); - } - }); -} - -pub(crate) fn send_network_swarm_cmd( - swarm_cmd_sender: Sender, - cmd: NetworkSwarmCmd, -) { - let capacity = swarm_cmd_sender.capacity(); - - if capacity == 0 { - error!( - "SwarmCmd channel is full. 
Await capacity to send: {:?}",
-            cmd
-        );
-    }
-
-    // Spawn a task to send the SwarmCmd and keep this fn sync
-    let _handle = spawn(async move {
-        if let Err(error) = swarm_cmd_sender.send(cmd).await {
-            error!("Failed to send SwarmCmd: {}", error);
-        }
-    });
-}
-
-#[cfg(test)]
-mod tests {
-    use eyre::bail;
-
-    use super::*;
-    use ant_evm::PaymentQuote;
-
-    #[test]
-    fn test_get_fee_from_store_cost_responses() -> Result<()> {
-        // for a vec of different costs of CLOSE_GROUP size
-        // ensure we return the CLOSE_GROUP / 2 indexed price
-        let mut costs = vec![];
-        for i in 1..CLOSE_GROUP_SIZE {
-            let addr = ant_evm::utils::dummy_address();
-            costs.push((
-                NetworkAddress::from_peer(PeerId::random()),
-                addr,
-                PaymentQuote::test_dummy(Default::default(), AttoTokens::from_u64(i as u64)),
-            ));
-        }
-        let expected_price = costs[0].2.cost.as_atto();
-        let (_peer_id, _key, price) = get_fees_from_store_cost_responses(costs)?;
-
-        assert_eq!(
-            price.cost.as_atto(),
-            expected_price,
-            "price should be {expected_price}"
-        );
-
-        Ok(())
-    }
-
-    #[test]
-    fn test_get_some_fee_from_store_cost_responses_even_if_one_errs_and_sufficient(
-    ) -> eyre::Result<()> {
-        // for a vec of different costs of CLOSE_GROUP size
-        let responses_count = CLOSE_GROUP_SIZE as u64 - 1;
-        let mut costs = vec![];
-        for i in 1..responses_count {
-            // push random addr and Nano
-            let addr = ant_evm::utils::dummy_address();
-            costs.push((
-                NetworkAddress::from_peer(PeerId::random()),
-                addr,
-                PaymentQuote::test_dummy(Default::default(), AttoTokens::from_u64(i)),
-            ));
-            println!("price added {i}");
-        }
-
-        // this should be the lowest price
-        let expected_price = costs[0].2.cost.as_atto();
-
-        let (_peer_id, _key, price) = match get_fees_from_store_cost_responses(costs) {
-            Err(_) => bail!("Should not have errored as we have enough responses"),
-            Ok(cost) => cost,
-        };
-
-        assert_eq!(
-            price.cost.as_atto(),
-            expected_price,
-            "price should be {expected_price}"
-        );
-
-        Ok(())
-    }
+/// Re-export tokio spawn for convenience
+pub use tokio::spawn;
-    #[test]
-    fn test_network_sign_verify() -> eyre::Result<()> {
-        let (network, _, _) =
-            NetworkBuilder::new(Keypair::generate_ed25519(), false).build_client()?;
-        let msg = b"test message";
-        let sig = network.sign(msg)?;
-        assert!(network.verify(msg, &sig));
-        Ok(())
-    }
+/// Re-export tokio time utilities
+pub mod target_arch {
+    pub use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+    pub use tokio::spawn;
+}
diff --git a/ant-networking/src/messages.rs b/ant-networking/src/messages.rs new file mode 100644 index 0000000000..25500226ba --- /dev/null +++ b/ant-networking/src/messages.rs @@ -0,0 +1,49 @@
+use serde::{Deserialize, Serialize};
+use libp2p::PeerId;
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Request {
+    pub peer_id: PeerId,
+    pub request_type: RequestType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum RequestType {
+    GetRecord { key: Vec<u8> },
+    PutRecord { key: Vec<u8>, value: Vec<u8> },
+    GetClosestPeers { key: Vec<u8> },
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Response {
+    pub peer_id: PeerId,
+    pub response_type: ResponseType,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub enum ResponseType {
+    Record { key: Vec<u8>, value: Vec<u8> },
+    NoRecord { key: Vec<u8> },
+    ClosestPeers { key: Vec<u8>, peers: Vec<PeerId> },
+    Error { message: String },
+}
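A quick usage sketch for the message types above (values are made up; the `Request::new` constructor is defined just below):

    use libp2p::PeerId;

    fn demo() {
        let peer = PeerId::random();
        let req = Request::new(peer, RequestType::GetRecord { key: b"key".to_vec() });
        // The constant below is the protocol id negotiated for this behaviour.
        assert_eq!(Request::PROTOCOL_NAME, "/ant-networking/request/1.0.0");
        if let RequestType::GetRecord { key } = req.request_type {
            assert_eq!(key, b"key".to_vec());
        }
    }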
+
+impl Request {
+    pub const PROTOCOL_NAME: &'static str = "/ant-networking/request/1.0.0";
+
+    pub fn new(peer_id: PeerId, request_type: RequestType) -> Self {
+        Self {
+            peer_id,
+            request_type,
+        }
+    }
+}
+
+impl Response {
+    pub fn new(peer_id: PeerId, response_type: ResponseType) -> Self {
+        Self {
+            peer_id,
+            response_type,
+        }
+    }
+}
diff --git a/ant-networking/src/network/behavior.rs b/ant-networking/src/network/behavior.rs new file mode 100644 index 0000000000..6a125c754c --- /dev/null +++ b/ant-networking/src/network/behavior.rs @@ -0,0 +1,132 @@
+use libp2p::{
+    allow_block_list,
+    identify,
+    kad::{self, store::UnifiedRecordStore},
+    mdns,
+    relay::{self, client},
+    request_response::{self, ProtocolSupport},
+    NetworkBehaviour, PeerId,
+    swarm::{behaviour::toggle::Toggle, derive_prelude::*},
+    upnp,
+};
+
+use crate::{
+    event::NodeEvent,
+    messages::{Request, Response},
+};
+
+#[derive(NetworkBehaviour)]
+#[behaviour(event_process = true, out_event = "NodeEvent")]
+pub struct NodeBehaviour {
+    pub blocklist: libp2p::allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
+    pub relay_client: libp2p::relay::client::Behaviour,
+    pub relay_server: libp2p::relay::Behaviour,
+    #[cfg(feature = "upnp")]
+    pub upnp: Toggle<upnp::tokio::Behaviour>,
+    pub request_response: request_response::cbor::Behaviour<Request, Response>,
+    pub kademlia: kad::Behaviour<UnifiedRecordStore>,
+    pub identify: identify::Behaviour,
+    #[cfg(feature = "local")]
+    pub mdns: mdns::tokio::Behaviour,
+}
+
+impl NodeBehaviour {
+    pub fn new(
+        peer_id: PeerId,
+        blocklist: libp2p::allow_block_list::Behaviour<allow_block_list::BlockedPeers>,
+        relay_client: libp2p::relay::client::Behaviour,
+        relay_server: libp2p::relay::Behaviour,
+        #[cfg(feature = "upnp")]
+        upnp: Toggle<upnp::tokio::Behaviour>,
+        request_response: request_response::cbor::Behaviour<Request, Response>,
+        kademlia: kad::Behaviour<UnifiedRecordStore>,
+        identify: identify::Behaviour,
+        #[cfg(feature = "local")]
+        mdns: mdns::tokio::Behaviour,
+    ) -> Self {
+        Self {
+            blocklist,
+            relay_client,
+            relay_server,
+            #[cfg(feature = "upnp")]
+            upnp,
+            request_response,
+            kademlia,
+            identify,
+            #[cfg(feature = "local")]
+            mdns,
+        }
+    }
+}
+
+impl NetworkBehaviourEventProcess<identify::Event> for NodeBehaviour {
+    fn inject_event(&mut self, _event: identify::Event) {
+        // Convert identify events to NodeEvent
+        // This will be called when an identify event occurs
+    }
+}
+
+impl NetworkBehaviourEventProcess<kad::Event> for NodeBehaviour {
+    fn inject_event(&mut self, _event: kad::Event) {
+        // Convert kademlia events to NodeEvent
+        // This will be called when a kademlia event occurs
+    }
+}
+
+impl NetworkBehaviourEventProcess<request_response::Event<Request, Response>> for NodeBehaviour {
+    fn inject_event(&mut self, _event: request_response::Event<Request, Response>) {
+        // Convert request_response events to NodeEvent
+        // This will be called when a request_response event occurs
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use libp2p::{
+        identity::Keypair,
+        kad::Config as KadConfig,
+        request_response::Config as RequestResponseConfig,
+        StreamProtocol,
+    };
+
+    #[test]
+    fn test_node_behaviour_creation() {
+        let keypair = Keypair::generate_ed25519();
+        let peer_id = keypair.public().to_peer_id();
+
+        // Create identify behavior
+        let identify = identify::Behaviour::new(identify::Config::new(
+            "test/1.0.0".to_string(),
+            keypair.public(),
+        ));
+
+        // Create kademlia behavior
+        let kad_config = KadConfig::default();
+        let kad_store = UnifiedRecordStore::new(peer_id);
+        let kademlia = kad::Behaviour::with_config(peer_id, kad_store, kad_config);
+
+        // Create request/response behavior
+        let req_res_config = RequestResponseConfig::default();
+        let protocol = StreamProtocol::new(Request::PROTOCOL_NAME);
+        let request_response = request_response::cbor::Behaviour::new(
+            [(protocol, ProtocolSupport::Full)],
+            req_res_config,
+        );
+
+        // Create node behavior.
+        // relay::client::new() yields the client behaviour plus its transport half.
+        let (_relay_transport, relay_client) = libp2p::relay::client::new(peer_id);
+        let behaviour = NodeBehaviour::new(
+            peer_id,
+            libp2p::allow_block_list::Behaviour::default(),
+            relay_client,
+            libp2p::relay::Behaviour::new(peer_id, Default::default()),
+            #[cfg(feature = "upnp")]
+            Toggle::from(Some(libp2p::upnp::tokio::Behaviour::default())),
+            request_response,
+            kademlia,
+            identify,
+            #[cfg(feature = "local")]
+            mdns::tokio::Behaviour::new(mdns::Config::default(), peer_id)
+                .expect("mdns behaviour should start"),
+        );
+
+        // `identify::Behaviour` exposes no local-peer-id getter, so successful
+        // construction is the assertion here.
+        let _ = behaviour;
+    }
+}
diff --git a/ant-networking/src/network/config.rs b/ant-networking/src/network/config.rs new file mode 100644 index 0000000000..120d935f32 --- /dev/null +++ b/ant-networking/src/network/config.rs @@ -0,0 +1,162 @@
+use libp2p::{
+    kad::{Quorum, Record},
+    PeerId,
+};
+use ant_protocol::{
+    messages::{ChunkProof, Nonce},
+    storage::RetryStrategy,
+};
+use std::collections::HashSet;
+use ant_registers::SignedRegister;
+use ant_protocol::storage::try_deserialize_record;
+use tracing::error;
+
+/// The various settings to apply when fetching a record from the network
+#[derive(Clone)]
+pub struct GetRecordCfg {
+    /// The query will result in an error if we get records less than the provided Quorum
+    pub get_quorum: Quorum,
+    /// If enabled, the provided `RetryStrategy` is used to retry if a GET attempt fails.
+    pub retry_strategy: Option<RetryStrategy>,
+    /// Only return if we fetch the provided record.
+    pub target_record: Option<Record>,
+    /// Logs if the record was not fetched from the provided set of peers.
+    pub expected_holders: HashSet<PeerId>,
+    /// For register records, only the root value shall be checked, not the entire content.
+    pub is_register: bool,
+}
+
+impl GetRecordCfg {
+    pub fn does_target_match(&self, record: &Record) -> bool {
+        if let Some(ref target_record) = self.target_record {
+            if self.is_register {
+                let pretty_key = format!("{:?}", &target_record.key);
+
+                let fetched_register = match try_deserialize_record::<SignedRegister>(record) {
+                    Ok(fetched_register) => fetched_register,
+                    Err(err) => {
+                        error!("When trying to deserialize the register from the fetched record {pretty_key:?}, got error {err:?}");
+                        return false;
+                    }
+                };
+                let target_register = match try_deserialize_record::<SignedRegister>(target_record) {
+                    Ok(target_register) => target_register,
+                    Err(err) => {
+                        error!("When trying to deserialize the register from the target record {pretty_key:?}, got error {err:?}");
+                        return false;
+                    }
+                };
+
+                target_register.base_register() == fetched_register.base_register()
+                    && target_register.ops() == fetched_register.ops()
+            } else {
+                target_record == record
+            }
+        } else {
+            // No target_record to check against
+            true
+        }
+    }
+}
+
+impl std::fmt::Debug for GetRecordCfg {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut f = f.debug_struct("GetRecordCfg");
+        f.field("get_quorum", &self.get_quorum)
+            .field("retry_strategy", &self.retry_strategy);
+
+        match &self.target_record {
+            Some(record) => {
+                let pretty_key = format!("{:?}", &record.key);
+                f.field("target_record", &pretty_key);
+            }
+            None => {
+                f.field("target_record", &"None");
+            }
+        };
+
+        f.field("expected_holders", &self.expected_holders).finish()
+    }
+}
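Ahead of the `PutRecordCfg` definition that follows, a sketch of how a chunk PUT would typically pair verification with a follow-up GET config (field values are illustrative assumptions, not defaults from this patch):

    use libp2p::kad::Quorum;

    fn chunk_put_cfg(expected_proof: ChunkProof, nonce: Nonce, get_cfg: GetRecordCfg) -> PutRecordCfg {
        PutRecordCfg {
            put_quorum: Quorum::All,
            retry_strategy: None,
            // None => kad::put_record to the whole close group.
            use_put_record_to: None,
            // After the PUT, challenge holders for a chunk proof rather than a plain GET.
            verification: Some((VerificationKind::ChunkProof { expected_proof, nonce }, get_cfg)),
        }
    }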
+}
diff --git a/ant-networking/src/network/error.rs b/ant-networking/src/network/error.rs
new file mode 100644
index 0000000000..9dd3ca6bad
--- /dev/null
+++ b/ant-networking/src/network/error.rs
@@ -0,0 +1,48 @@
+use std::error::Error as StdError;
+use std::fmt;
+
+/// Convenience alias used across the networking modules.
+pub type Result<T, E = NetworkError> = std::result::Result<T, E>;
+
+/// Represents errors that can occur during network operations
+#[derive(Debug)]
+pub enum NetworkError {
+    /// Error occurred during record operations
+    Record(String),
+    /// Error occurred during connection operations
+    Connection(String),
+    /// General network error
+    Other(String),
+}
+
+impl fmt::Display for NetworkError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            NetworkError::Record(msg) => write!(f, "Record error: {}", msg),
+            NetworkError::Connection(msg) => write!(f, "Connection error: {}", msg),
+            NetworkError::Other(msg) => write!(f, "Network error: {}", msg),
+        }
+    }
+}
+
+impl StdError for NetworkError {}
+
+/// Error that can occur when getting records
+#[derive(Debug)]
+pub enum GetRecordError {
+    /// Record not found
+    NotFound,
+    /// Verification failed
+    VerificationFailed(String),
+    /// Network error occurred
+    Network(NetworkError),
+}
+
+impl fmt::Display for GetRecordError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            GetRecordError::NotFound => write!(f, "Record not found"),
+            GetRecordError::VerificationFailed(msg) => write!(f, "Verification failed: {}", msg),
+            GetRecordError::Network(err) => write!(f, "Network error: {}", err),
+        }
+    }
+}
+
+impl StdError for GetRecordError {}
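+
+// Illustrative sketch, not part of the original change: exercises the
+// `Result` alias with a function that surfaces a connection failure.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn failing_dial() -> Result<()> {
+        Err(NetworkError::Connection("connection refused".into()))
+    }
+
+    #[test]
+    fn test_result_alias_surfaces_network_error() {
+        let err = failing_dial().unwrap_err();
+        assert!(matches!(err, NetworkError::Connection(_)));
+        assert!(err.to_string().contains("Connection error"));
+    }
+}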
diff --git a/ant-networking/src/network/mod.rs b/ant-networking/src/network/mod.rs
new file mode 100644
index 0000000000..2babf9bb46
--- /dev/null
+++ b/ant-networking/src/network/mod.rs
@@ -0,0 +1,9 @@
+//! Network-related functionality and types
+
+pub mod behavior;
+pub mod config;
+pub mod error;
+pub mod record;
+pub mod swarm;
+pub mod types;
+
+// `config` carries the full Kademlia-facing GET/PUT settings; `record` keeps
+// the lightweight client-facing variants that are re-exported below.
+pub use error::{GetRecordError, NetworkError};
+pub use record::{GetRecordCfg, PutRecordCfg, VerificationKind};
+pub use types::PayeeQuote;
diff --git a/ant-networking/src/network/record.rs b/ant-networking/src/network/record.rs
new file mode 100644
index 0000000000..4aec49437c
--- /dev/null
+++ b/ant-networking/src/network/record.rs
@@ -0,0 +1,24 @@
+/// Configuration for getting records from the network
+#[derive(Debug, Clone)]
+pub struct GetRecordCfg {
+    pub timeout_secs: u64,
+    pub verification: VerificationKind,
+}
+
+/// Configuration for putting records to the network
+#[derive(Debug, Clone)]
+pub struct PutRecordCfg {
+    pub timeout_secs: u64,
+    pub replication: u8,
+}
+
+/// Verification requirements for records
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum VerificationKind {
+    /// No verification required
+    None,
+    /// Verify record signature
+    Signature,
+    /// Verify both signature and data integrity
+    Full,
+}
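+
+// Illustrative sketch, not part of the original change: a hypothetical helper
+// that scales the GET timeout with the verification effort. The multipliers
+// are arbitrary and exist only to demonstrate matching on `VerificationKind`.
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn timeout_for(cfg: &GetRecordCfg) -> u64 {
+        match cfg.verification {
+            VerificationKind::None => cfg.timeout_secs,
+            VerificationKind::Signature => cfg.timeout_secs * 2,
+            VerificationKind::Full => cfg.timeout_secs * 3,
+        }
+    }
+
+    #[test]
+    fn test_timeout_scales_with_verification() {
+        let cfg = GetRecordCfg {
+            timeout_secs: 10,
+            verification: VerificationKind::Full,
+        };
+        assert_eq!(timeout_for(&cfg), 30);
+    }
+}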
diff --git a/ant-networking/src/network/swarm.rs b/ant-networking/src/network/swarm.rs
new file mode 100644
index 0000000000..7b1e65eda3
--- /dev/null
+++ b/ant-networking/src/network/swarm.rs
@@ -0,0 +1,202 @@
+use crate::{
+    event::NodeEvent,
+    messages::Request,
+    network::{
+        behavior::NodeBehaviour,
+        error::{NetworkError, Result},
+    },
+};
+
+use futures::{future::Either, StreamExt};
+use libp2p::{
+    swarm::{dial_opts::DialOpts, SwarmEvent},
+    Multiaddr, PeerId, Swarm,
+};
+use tokio::sync::mpsc;
+use tracing::{debug, error, info, warn};
+
+pub struct SwarmDriver {
+    swarm: Swarm<NodeBehaviour>,
+    command_receiver: mpsc::UnboundedReceiver<LocalSwarmCmd>,
+}
+
+impl SwarmDriver {
+    pub fn new(
+        swarm: Swarm<NodeBehaviour>,
+        command_receiver: mpsc::UnboundedReceiver<LocalSwarmCmd>,
+    ) -> Self {
+        Self {
+            swarm,
+            command_receiver,
+        }
+    }
+
+    pub async fn run(mut self) {
+        loop {
+            // Wait on whichever arrives first: the next swarm event or the
+            // next local command. The two futures borrow disjoint fields, so
+            // polling them together is fine.
+            let evt = {
+                let swarm_next = self.swarm.next();
+                let command_next = self.command_receiver.recv();
+
+                match futures::future::select(Box::pin(swarm_next), Box::pin(command_next)).await {
+                    Either::Left((swarm_event, _)) => swarm_event.map(NetworkEvent::Swarm),
+                    Either::Right((command, _)) => command.map(NetworkEvent::Command),
+                }
+            };
+
+            match evt {
+                Some(NetworkEvent::Swarm(event)) => {
+                    if let Err(e) = self.handle_swarm_event(event).await {
+                        error!("Error handling swarm event: {:?}", e);
+                    }
+                }
+                Some(NetworkEvent::Command(cmd)) => {
+                    if let Err(e) = self.handle_command(cmd).await {
+                        error!("Error handling command: {:?}", e);
+                    }
+                }
+                None => {
+                    debug!("No more events to process");
+                    break;
+                }
+            }
+        }
+    }
+
+    // NB: assumes a libp2p version where `SwarmEvent` is generic over the
+    // behaviour event alone (the handler-error parameter was removed).
+    async fn handle_swarm_event(&mut self, event: SwarmEvent<NodeEvent>) -> Result<()> {
+        match event {
+            SwarmEvent::Behaviour(NodeEvent::Identify(event)) => {
+                debug!("Identify event: {:?}", event);
+            }
+            SwarmEvent::Behaviour(NodeEvent::Kademlia(event)) => {
+                debug!("Kademlia event: {:?}", event);
+            }
+            SwarmEvent::Behaviour(NodeEvent::MsgReceived(event)) => {
+                debug!("Request/Response event: {:?}", event);
+            }
+            SwarmEvent::NewListenAddr { address, .. } => {
+                info!("Listening on {:?}", address);
+            }
+            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+                info!("Connected to {:?}", peer_id);
+            }
+            SwarmEvent::ConnectionClosed { peer_id, .. } => {
+                warn!("Disconnected from {:?}", peer_id);
+            }
+            _ => {
+                debug!("Other swarm event: {:?}", event);
+            }
+        }
+        Ok(())
+    }
+
+    async fn handle_command(&mut self, cmd: LocalSwarmCmd) -> Result<()> {
+        match cmd {
+            LocalSwarmCmd::StartListening(addr) => {
+                if let Err(e) = self.swarm.listen_on(addr) {
+                    error!("Failed to start listening: {:?}", e);
+                    return Err(NetworkError::Other(format!("Failed to listen: {}", e)));
+                }
+            }
+            LocalSwarmCmd::Dial(peer_id, addr) => {
+                let opts = DialOpts::peer_id(peer_id).addresses(vec![addr]).build();
+                if let Err(e) = self.swarm.dial(opts) {
+                    error!("Failed to dial peer: {:?}", e);
+                    return Err(NetworkError::Connection(format!(
+                        "Failed to dial {peer_id:?}: {e}"
+                    )));
+                }
+            }
+            LocalSwarmCmd::SendRequest(peer_id, request) => {
+                let _request_id = self
+                    .swarm
+                    .behaviour_mut()
+                    .request_response
+                    .send_request(&peer_id, request);
+            }
+        }
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub enum LocalSwarmCmd {
+    StartListening(Multiaddr),
+    Dial(PeerId, Multiaddr),
+    SendRequest(PeerId, Request),
+}
+
+#[derive(Debug)]
+enum NetworkEvent {
+    Swarm(SwarmEvent<NodeEvent>),
+    Command(LocalSwarmCmd),
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::messages::Response;
+    use libp2p::{
+        allow_block_list,
+        core::{transport::MemoryTransport, upgrade::Version, Transport},
+        identify,
+        identity::Keypair,
+        kad::{self, store::MemoryStore, Config as KadConfig},
+        relay,
+        request_response::{cbor::Behaviour as CborBehaviour, Config as RequestResponseConfig, ProtocolSupport},
+        swarm::{behaviour::toggle::Toggle, Config as SwarmConfig},
+        StreamProtocol,
+    };
+
+    #[tokio::test]
+    async fn test_swarm_driver() {
+        let keypair = Keypair::generate_ed25519();
+        let peer_id = keypair.public().to_peer_id();
+
+        // Create behaviour components
+        let kad_config = KadConfig::default();
+        let kad_store = MemoryStore::new(peer_id);
+        let kad = kad::Behaviour::new(peer_id, kad_store, kad_config);
+
+        let identify = identify::Behaviour::new(identify::Config::new(
+            "test/1.0.0".to_string(),
+            keypair.public(),
+        ));
+
+        let req_res_config = RequestResponseConfig::default();
+        let protocol = StreamProtocol::new(Request::PROTOCOL_NAME);
+        let request_response = CborBehaviour::<Request, Response>::new(
+            [(protocol, ProtocolSupport::Full)],
+            req_res_config,
+        );
+
+        let (_relay_transport, relay_client) = relay::client::new(peer_id);
+
+        let behaviour = NodeBehaviour::new(
+            allow_block_list::Behaviour::default(),
+            relay_client,
+            relay::Behaviour::new(peer_id, relay::Config::default()),
+            #[cfg(feature = "upnp")]
+            Toggle::from(None),
+            request_response,
+            kad,
+            identify,
+            #[cfg(feature = "local")]
+            libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), peer_id)
+                .expect("mdns behaviour should start"),
+        );
+
+        // An in-memory transport keeps the test hermetic; noise and yamux are
+        // assumed to be enabled features of the libp2p dependency.
+        let transport = MemoryTransport::default()
+            .upgrade(Version::V1)
+            .authenticate(libp2p::noise::Config::new(&keypair).expect("valid noise keys"))
+            .multiplex(libp2p::yamux::Config::default())
+            .boxed();
+        let swarm = Swarm::new(transport, behaviour, peer_id, SwarmConfig::with_tokio_executor());
+
+        // Create command channel
+        let (cmd_sender, cmd_receiver) = mpsc::unbounded_channel();
+
+        // Create and run SwarmDriver
+        let driver = SwarmDriver::new(swarm, cmd_receiver);
+
+        // Run the driver for a short time
+        tokio::spawn(async move {
+            driver.run().await;
+        });
+
+        // Send a test command
+        let test_addr: Multiaddr = "/memory/1234".parse().unwrap();
+        cmd_sender
+            .send(LocalSwarmCmd::StartListening(test_addr))
+            .unwrap();
+
+        // Wait a bit to let the command be processed
+        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+    }
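+
+    // Illustrative sketch, not part of the original change: commands are
+    // plain data, so callers can build them without a live swarm.
+    #[test]
+    fn test_local_swarm_cmd_is_plain_data() {
+        let addr: Multiaddr = "/memory/4242".parse().unwrap();
+        let peer = PeerId::random();
+
+        let listen = LocalSwarmCmd::StartListening(addr.clone());
+        let dial = LocalSwarmCmd::Dial(peer, addr);
+
+        assert!(format!("{listen:?}").contains("StartListening"));
+        assert!(format!("{dial:?}").contains("Dial"));
+    }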
+}
diff --git a/ant-networking/src/network/tests/integration_tests.rs b/ant-networking/src/network/tests/integration_tests.rs
new file mode 100644
index 0000000000..2737733680
--- /dev/null
+++ b/ant-networking/src/network/tests/integration_tests.rs
@@ -0,0 +1,135 @@
+use super::*;
+use crate::network::config::{GetRecordCfg, PutRecordCfg};
+use libp2p::{
+    identity::Keypair,
+    kad::{Quorum, Record, RecordKey},
+    multiaddr::Protocol,
+};
+use std::time::Duration;
+use tokio::time::timeout;
+
+async fn setup_test_nodes() -> (Network, Network, SwarmDriver, SwarmDriver) {
+    let keypair1 = Keypair::generate_ed25519();
+    let keypair2 = Keypair::generate_ed25519();
+
+    let mut builder1 = NetworkBuilder::new(keypair1, true);
+    let mut builder2 = NetworkBuilder::new(keypair2, true);
+
+    // Configure nodes to listen on OS-assigned ports
+    builder1.listen_addr("127.0.0.1:0".parse().unwrap());
+    builder2.listen_addr("127.0.0.1:0".parse().unwrap());
+
+    let temp_dir = std::env::temp_dir();
+    let (network1, _events1, driver1) = builder1.build_node(temp_dir.clone()).await.unwrap();
+    let (network2, _events2, driver2) = builder2.build_node(temp_dir).await.unwrap();
+
+    (network1, network2, driver1, driver2)
+}
+
+#[tokio::test]
+async fn test_node_discovery() {
+    let (_network1, network2, driver1, driver2) = setup_test_nodes().await;
+
+    // Get the listening addresses
+    let addr1 = driver1.swarm.listeners().next().unwrap().clone();
+    let peer1 = driver1.self_peer_id;
+
+    // Connect node2 to node1
+    let mut addr = addr1.clone();
+    addr.push(Protocol::P2p(peer1.into()));
+    network2.dial(addr).await.unwrap();
+
+    // Wait for connection
+    let timeout_duration = Duration::from_secs(5);
+    timeout(timeout_duration, async {
+        loop {
+            if driver1.peers_in_rt > 0 && driver2.peers_in_rt > 0 {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        }
+    })
+    .await
+    .unwrap();
+
+    assert!(driver1.peers_in_rt > 0);
+    assert!(driver2.peers_in_rt > 0);
+}
+
+#[tokio::test]
+async fn test_record_replication() {
+    let (network1, network2, driver1, _driver2) = setup_test_nodes().await;
+
+    // Connect the nodes
+    let addr1 = driver1.swarm.listeners().next().unwrap().clone();
+    let peer1 = driver1.self_peer_id;
+    let mut addr = addr1.clone();
+    addr.push(Protocol::P2p(peer1.into()));
+    network2.dial(addr).await.unwrap();
+
+    // Wait for connection
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    // Create and store a record
+    let key = RecordKey::new(&[1, 2, 3]);
+    let value = vec![4, 5, 6];
+    let record = Record {
+        key: key.clone(),
+        value: value.clone(),
+        publisher: None,
+        expires: None,
+    };
+
+    let put_cfg = PutRecordCfg {
+        put_quorum: Quorum::One,
+        retry_strategy: None,
+        use_put_record_to: None,
+        verification: None,
+    };
+
+    // Store record on node1
+    network1.put_record(record.clone(), put_cfg).await.unwrap();
+
+    // Wait for replication
+    tokio::time::sleep(Duration::from_secs(2)).await;
+
+    // Try to get the record from node2
+    let get_cfg = GetRecordCfg {
+        get_quorum: Quorum::One,
+        retry_strategy: None,
+        target_record: None,
+        expected_holders: Default::default(),
+        is_register: false,
+    };
+
+    let result = network2.get_record(key.clone(), get_cfg).await;
+    assert!(result.is_ok());
+    let retrieved_record = result.unwrap();
+    assert_eq!(retrieved_record.value, value);
+}
+
+#[tokio::test]
+async fn test_network_metrics() {
+    let (_network1, _network2, driver1, _driver2) = setup_test_nodes().await;
+
+    // Test basic metrics
+    assert_eq!(driver1.peers_in_rt, 0);
+    assert!(driver1.hard_disk_write_error == 0);
+
+    // Test connection metrics
+    assert!(driver1.live_connected_peers.is_empty());
+    assert!(driver1.latest_established_connection_ids.is_empty());
+
+    // More metrics tests...
+}
+
+#[tokio::test]
+async fn test_error_recovery() {
+    let (_network1, _network2, _driver1, _driver2) = setup_test_nodes().await;
+
+    // Test recovery from various error conditions
+    // - Connection drops
+    // - Invalid records
+    // - Network partitions
+    // ...
+}
diff --git a/ant-networking/src/network/tests/mod.rs b/ant-networking/src/network/tests/mod.rs
new file mode 100644
index 0000000000..6158d00a24
--- /dev/null
+++ b/ant-networking/src/network/tests/mod.rs
@@ -0,0 +1,90 @@
+use super::*;
+use libp2p::identity::Keypair;
+use std::{net::SocketAddr, path::PathBuf, time::Duration};
+use tokio::test;
+
+mod builder_tests {
+    use super::*;
+
+    #[test]
+    async fn test_network_builder_configuration() {
+        let keypair = Keypair::generate_ed25519();
+        let mut builder = NetworkBuilder::new(keypair, false);
+
+        // Test configuration methods
+        let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
+        builder.listen_addr(addr);
+        builder.request_timeout(Duration::from_secs(30));
+        builder.concurrency_limit(10);
+
+        // Build and verify configuration
+        let temp_dir = std::env::temp_dir();
+        let (_network, _events, driver) = builder.build_node(temp_dir).await.unwrap();
+
+        // Verify configuration was applied
+        assert!(!driver.local);
+        // Add more assertions
+    }
+
+    #[test]
+    async fn test_network_builder_errors() {
+        let keypair = Keypair::generate_ed25519();
+        let builder = NetworkBuilder::new(keypair, false);
+
+        // Test invalid configurations
+        let result = builder.build_node(PathBuf::from("/nonexistent")).await;
+        assert!(result.is_err());
+    }
+}
+
+mod swarm_tests {
+    use super::*;
+
+    #[test]
+    async fn test_swarm_event_handling() {
+        // Set up a test swarm
+        let keypair = Keypair::generate_ed25519();
+        let builder = NetworkBuilder::new(keypair, true);
+        let temp_dir = std::env::temp_dir();
+        let (_network, mut events, driver) = builder.build_node(temp_dir).await.unwrap();
+
+        // Test event handling
+        tokio::spawn(async move {
+            driver.run().await;
+        });
+
+        // Verify events are processed correctly
+        if let Some(_event) = events.recv().await {
+            // Add assertions about event handling
+        }
+    }
+}
+
+mod integration_tests {
+    use super::*;
+
+    #[test]
+    async fn test_network_communication() {
+        // Set up two nodes
+        let keypair1 = Keypair::generate_ed25519();
+        let keypair2 = Keypair::generate_ed25519();
+
+        let builder1 = NetworkBuilder::new(keypair1, true);
+        let builder2 = NetworkBuilder::new(keypair2, true);
+
+        let temp_dir = std::env::temp_dir();
+        let (_network1, _events1, driver1) = builder1.build_node(temp_dir.clone()).await.unwrap();
+        let (_network2, _events2, driver2) = builder2.build_node(temp_dir).await.unwrap();
+
+        // Test communication between nodes
+        tokio::spawn(async move {
+            driver1.run().await;
+        });
+
+        tokio::spawn(async move {
+            driver2.run().await;
+        });
+
+        // Add communication tests
+    }
+}
diff --git a/ant-networking/src/network/tests/swarm_tests.rs b/ant-networking/src/network/tests/swarm_tests.rs
new file mode 100644
index 0000000000..c583c7240c
--- /dev/null
+++ b/ant-networking/src/network/tests/swarm_tests.rs
@@ -0,0 +1,130 @@
+use super::*;
+use crate::{
+    messages::{Request, Response},
+    network::{
+        behavior::NodeBehaviour,
+        swarm::{LocalSwarmCmd, SwarmDriver},
+    },
+};
+use libp2p::{
+    allow_block_list,
+    core::{transport::MemoryTransport, upgrade::Version, Transport},
+    identify,
+    identity::Keypair,
+    kad::{self, store::MemoryStore, Config as KadConfig},
+    relay,
+    request_response::{self, Config as RequestResponseConfig, ProtocolSupport},
+    swarm::{behaviour::toggle::Toggle, Config as SwarmConfig},
+    Multiaddr, StreamProtocol, Swarm,
+};
+use std::time::Duration;
+use tokio::sync::mpsc;
+
+#[tokio::test]
+async fn test_swarm_driver_event_handling() {
+    let keypair = Keypair::generate_ed25519();
+    let peer_id = keypair.public().to_peer_id();
+
+    // Create the command channel
+    let (local_cmd_tx, local_cmd_rx) = mpsc::unbounded_channel();
+
+    // Create test behaviour
+    let store = MemoryStore::new(peer_id);
+    let kad_config = KadConfig::default();
+    let kad_behaviour = kad::Behaviour::new(peer_id, store, kad_config);
+
+    let req_res_config = RequestResponseConfig::default();
+    let protocol = StreamProtocol::new("/test/1.0.0");
+    let req_res_behaviour = request_response::cbor::Behaviour::<Request, Response>::new(
+        [(protocol, ProtocolSupport::Full)],
+        req_res_config,
+    );
+
+    let identify = identify::Behaviour::new(identify::Config::new(
+        "/test/1.0.0".to_string(),
+        keypair.public(),
+    ));
+
+    let (_relay_transport, relay_client) = relay::client::new(peer_id);
+
+    let behaviour = NodeBehaviour {
+        blocklist: Default::default(),
+        identify,
+        #[cfg(feature = "local")]
+        mdns: libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), peer_id)
+            .unwrap(),
+        #[cfg(feature = "upnp")]
+        upnp: Toggle::from(None),
+        relay_client,
+        relay_server: relay::Behaviour::new(peer_id, relay::Config::default()),
+        kademlia: kad_behaviour,
+        request_response: req_res_behaviour,
+    };
+
+    let transport = MemoryTransport::default()
+        .upgrade(Version::V1)
+        .authenticate(libp2p::noise::Config::new(&keypair).unwrap())
+        .multiplex(libp2p::yamux::Config::default())
+        .boxed();
+    let swarm = Swarm::new(transport, behaviour, peer_id, SwarmConfig::with_tokio_executor());
+
+    // The driver now owns just the swarm and the command receiver; the long
+    // list of bookkeeping fields from the old monolithic driver is gone.
+    let driver = SwarmDriver::new(swarm, local_cmd_rx);
+
+    // Spawn driver
+    let driver_handle = tokio::spawn(async move {
+        driver.run().await;
+    });
+
+    // Test sending commands and receiving events
+    let addr: Multiaddr = "/memory/0".parse().unwrap();
+    local_cmd_tx
+        .send(LocalSwarmCmd::StartListening(addr))
+        .unwrap();
+    tokio::time::sleep(Duration::from_millis(100)).await;
+
+    // Clean up
+    driver_handle.abort();
+}
+
+#[tokio::test]
+async fn test_peer_discovery_and_connection() {
+    // Similar setup to above, but test peer discovery
+    // ...
+}
+
+#[tokio::test]
+async fn test_record_storage_and_retrieval() {
+    // Test storing and retrieving records
+    // ...
+}
+
+#[tokio::test]
+async fn test_error_handling() {
+    // Test various error scenarios
+    // ...
+}
+
+#[tokio::test]
+async fn test_metrics_recording() {
+    // Test metrics recording functionality
+    // ...
+} diff --git a/ant-networking/src/network/types.rs b/ant-networking/src/network/types.rs new file mode 100644 index 0000000000..1cbb8b0253 --- /dev/null +++ b/ant-networking/src/network/types.rs @@ -0,0 +1,10 @@ +use libp2p::PeerId; +use std::time::Duration; + +/// Quote information for network payments +#[derive(Debug, Clone)] +pub struct PayeeQuote { + pub peer_id: PeerId, + pub price: u64, + pub expiry: Duration, +} \ No newline at end of file diff --git a/ant-networking/src/tests/config_test.rs b/ant-networking/src/tests/config_test.rs new file mode 100644 index 0000000000..aee678177b --- /dev/null +++ b/ant-networking/src/tests/config_test.rs @@ -0,0 +1,52 @@ +use crate::config::{ + CLOSE_GROUP_SIZE, CONNECTION_KEEP_ALIVE_TIMEOUT, KAD_QUERY_TIMEOUT_S, MAX_PACKET_SIZE, + NETWORKING_CHANNEL_SIZE, NetworkConfig, REQUEST_TIMEOUT_DEFAULT_S, + RELAY_MANAGER_RESERVATION_INTERVAL, RESEND_IDENTIFY_INVERVAL, +}; +use std::time::Duration; + +#[test] +fn test_network_config_creation() { + let custom_config = NetworkConfig { + max_packet_size: 1024, + close_group_size: 4, + request_timeout: Duration::from_secs(30), + connection_keep_alive: Duration::from_secs(10), + kad_query_timeout: Duration::from_secs(15), + channel_size: 50, + }; + + assert_eq!(custom_config.max_packet_size, 1024); + assert_eq!(custom_config.close_group_size, 4); + assert_eq!(custom_config.request_timeout, Duration::from_secs(30)); + assert_eq!(custom_config.connection_keep_alive, Duration::from_secs(10)); + assert_eq!(custom_config.kad_query_timeout, Duration::from_secs(15)); + assert_eq!(custom_config.channel_size, 50); +} + +#[test] +fn test_config_constants() { + assert!(MAX_PACKET_SIZE > 0); + assert!(CLOSE_GROUP_SIZE > 0); + assert!(REQUEST_TIMEOUT_DEFAULT_S > 0); + assert!(CONNECTION_KEEP_ALIVE_TIMEOUT > Duration::from_secs(0)); + assert!(KAD_QUERY_TIMEOUT_S > 0); + assert!(NETWORKING_CHANNEL_SIZE > 0); + assert!(RELAY_MANAGER_RESERVATION_INTERVAL > Duration::from_secs(0)); + assert!(RESEND_IDENTIFY_INVERVAL > Duration::from_secs(0)); +} + +#[test] +fn test_config_reasonable_values() { + // Test that packet size is reasonable (not too small or large) + assert!(MAX_PACKET_SIZE >= 1024); // At least 1KB + assert!(MAX_PACKET_SIZE <= 1024 * 1024 * 10); // Not more than 10MB + + // Test that timeouts are reasonable + assert!(REQUEST_TIMEOUT_DEFAULT_S >= 10); // At least 10 seconds + assert!(REQUEST_TIMEOUT_DEFAULT_S <= 300); // Not more than 5 minutes + + // Test that group size is reasonable + assert!(CLOSE_GROUP_SIZE >= 3); // At least 3 nodes for redundancy + assert!(CLOSE_GROUP_SIZE <= 20); // Not too many nodes +} \ No newline at end of file diff --git a/ant-networking/src/tests/mod.rs b/ant-networking/src/tests/mod.rs new file mode 100644 index 0000000000..c057a366e2 --- /dev/null +++ b/ant-networking/src/tests/mod.rs @@ -0,0 +1,3 @@ +mod types_test; +mod network_test; +mod config_test; \ No newline at end of file diff --git a/ant-networking/src/tests/network_test.rs b/ant-networking/src/tests/network_test.rs new file mode 100644 index 0000000000..79d54c77d1 --- /dev/null +++ b/ant-networking/src/tests/network_test.rs @@ -0,0 +1,86 @@ +use crate::network::{ + error::{GetRecordError, NetworkError}, + record::{GetRecordCfg, PutRecordCfg, VerificationKind}, + types::PayeeQuote, +}; +use libp2p::PeerId; +use std::time::Duration; + +#[test] +fn test_network_error_display() { + let errors = vec![ + NetworkError::Record("missing data".into()), + NetworkError::Connection("timeout".into()), + NetworkError::Other("unknown 
error".into()), + ]; + + for error in errors { + let display_string = format!("{}", error); + match error { + NetworkError::Record(_) => assert!(display_string.contains("Record error")), + NetworkError::Connection(_) => assert!(display_string.contains("Connection error")), + NetworkError::Other(_) => assert!(display_string.contains("Network error")), + } + } +} + +#[test] +fn test_get_record_error_display() { + let errors = vec![ + GetRecordError::NotFound, + GetRecordError::VerificationFailed("invalid signature".into()), + GetRecordError::Network(NetworkError::Other("network down".into())), + ]; + + for error in errors { + let display_string = format!("{}", error); + match error { + GetRecordError::NotFound => assert!(display_string.contains("Record not found")), + GetRecordError::VerificationFailed(_) => assert!(display_string.contains("Verification failed")), + GetRecordError::Network(_) => assert!(display_string.contains("Network error")), + } + } +} + +#[test] +fn test_record_configs() { + // Test GetRecordCfg + let get_cfg = GetRecordCfg { + timeout_secs: 30, + verification: VerificationKind::Full, + }; + assert_eq!(get_cfg.timeout_secs, 30); + assert_eq!(get_cfg.verification, VerificationKind::Full); + + // Test PutRecordCfg + let put_cfg = PutRecordCfg { + timeout_secs: 60, + replication: 3, + }; + assert_eq!(put_cfg.timeout_secs, 60); + assert_eq!(put_cfg.replication, 3); +} + +#[test] +fn test_verification_kinds() { + assert_ne!(VerificationKind::None, VerificationKind::Signature); + assert_ne!(VerificationKind::None, VerificationKind::Full); + assert_ne!(VerificationKind::Signature, VerificationKind::Full); +} + +#[test] +fn test_payee_quote() { + let peer_id = PeerId::random(); + let price = 100; + let expiry = Duration::from_secs(3600); + + let quote = PayeeQuote { + peer_id, + price, + expiry, + }; + + assert_eq!(quote.peer_id, peer_id); + assert_eq!(quote.price, price); + assert_eq!(quote.expiry, expiry); +} \ No newline at end of file diff --git a/ant-networking/src/tests/types_test.rs b/ant-networking/src/tests/types_test.rs new file mode 100644 index 0000000000..60479d130f --- /dev/null +++ b/ant-networking/src/tests/types_test.rs @@ -0,0 +1,150 @@ +use crate::{ + types::{NetworkAddress, NetworkMetricsRecorder, NodeIssue}, + config::{ + CLOSE_GROUP_SIZE, CONNECTION_KEEP_ALIVE_TIMEOUT, KAD_QUERY_TIMEOUT_S, MAX_PACKET_SIZE, + NETWORKING_CHANNEL_SIZE, NetworkConfig, REQUEST_TIMEOUT_DEFAULT_S, + }, +}; +use libp2p::PeerId; +use std::{sync::Mutex, time::Duration}; + +#[test] +fn test_network_address_creation() { + let peer_id = PeerId::random(); + let distance = 42; + let addr = NetworkAddress::new(peer_id, distance); + + assert_eq!(addr.peer_id, peer_id); + assert_eq!(addr.distance, distance); + assert_eq!(addr.holder_count(), 0); +} + +#[test] +fn test_network_address_holders() { + let mut addr = NetworkAddress::new(PeerId::random(), 42); + let holder1 = PeerId::random(); + let holder2 = PeerId::random(); + + addr.add_holder(holder1); + assert_eq!(addr.holder_count(), 1); + assert!(addr.holders.contains(&holder1)); + + // Adding same holder twice should not increase count + addr.add_holder(holder1); + assert_eq!(addr.holder_count(), 1); + + addr.add_holder(holder2); + assert_eq!(addr.holder_count(), 2); + assert!(addr.holders.contains(&holder2)); +} + +#[test] +fn test_node_issue_display() { + let issues = vec![ + NodeIssue::ConnectionFailed("timeout".into()), + NodeIssue::RecordStoreFailed("disk full".into()), + NodeIssue::NetworkError("connection reset".into()), + ]; + + 
for issue in issues {
+        let display_string = format!("{}", issue);
+        assert!(!display_string.is_empty());
+        match issue {
+            NodeIssue::ConnectionFailed(_) => assert!(display_string.contains("Connection failed")),
+            NodeIssue::RecordStoreFailed(_) => assert!(display_string.contains("Record store failed")),
+            NodeIssue::NetworkError(_) => assert!(display_string.contains("Network error")),
+        }
+    }
+}
+
+#[test]
+fn test_network_config_defaults() {
+    let config = NetworkConfig::default();
+
+    assert_eq!(config.max_packet_size, MAX_PACKET_SIZE);
+    assert_eq!(config.close_group_size, CLOSE_GROUP_SIZE);
+    assert_eq!(config.request_timeout, Duration::from_secs(REQUEST_TIMEOUT_DEFAULT_S));
+    assert_eq!(config.connection_keep_alive, CONNECTION_KEEP_ALIVE_TIMEOUT);
+    assert_eq!(config.kad_query_timeout, Duration::from_secs(KAD_QUERY_TIMEOUT_S));
+    assert_eq!(config.channel_size, NETWORKING_CHANNEL_SIZE);
+}
+
+#[test]
+fn test_network_address_empty_holders() {
+    let addr = NetworkAddress::new(PeerId::random(), 0);
+    assert!(addr.holders.is_empty());
+    assert_eq!(addr.holder_count(), 0);
+}
+
+#[test]
+fn test_network_address_multiple_holders() {
+    let mut addr = NetworkAddress::new(PeerId::random(), 10);
+    let holders: Vec<PeerId> = (0..5).map(|_| PeerId::random()).collect();
+
+    for holder in &holders {
+        addr.add_holder(*holder);
+    }
+
+    assert_eq!(addr.holder_count(), 5);
+    for holder in holders {
+        assert!(addr.holders.contains(&holder));
+    }
+}
+
+#[test]
+fn test_node_issue_details() {
+    let error_msg = "test error message";
+
+    let connection_issue = NodeIssue::ConnectionFailed(error_msg.into());
+    let record_issue = NodeIssue::RecordStoreFailed(error_msg.into());
+    let network_issue = NodeIssue::NetworkError(error_msg.into());
+
+    assert!(format!("{}", connection_issue).contains(error_msg));
+    assert!(format!("{}", record_issue).contains(error_msg));
+    assert!(format!("{}", network_issue).contains(error_msg));
+}
+
+// Add a mock implementation of NetworkMetricsRecorder for testing
+#[derive(Default)]
+struct MockMetricsRecorder {
+    close_group_size: Mutex<usize>,
+    connection_count: Mutex<usize>,
+    record_store_size: Mutex<usize>,
+    last_issue: Mutex<Option<NodeIssue>>,
+}
+
+impl NetworkMetricsRecorder for MockMetricsRecorder {
+    fn record_close_group_size(&self, size: usize) {
+        *self.close_group_size.lock().unwrap() = size;
+    }
+
+    fn record_connection_count(&self, count: usize) {
+        *self.connection_count.lock().unwrap() = count;
+    }
+
+    fn record_record_store_size(&self, size: usize) {
+        *self.record_store_size.lock().unwrap() = size;
+    }
+
+    fn record_node_issue(&self, issue: NodeIssue) {
+        *self.last_issue.lock().unwrap() = Some(issue);
+    }
+}
+
+#[test]
+fn test_metrics_recorder() {
+    let recorder = MockMetricsRecorder::default();
+
+    recorder.record_close_group_size(5);
+    recorder.record_connection_count(10);
+    recorder.record_record_store_size(100);
+    recorder.record_node_issue(NodeIssue::ConnectionFailed("test".into()));
+
+    assert_eq!(*recorder.close_group_size.lock().unwrap(), 5);
+    assert_eq!(*recorder.connection_count.lock().unwrap(), 10);
+    assert_eq!(*recorder.record_store_size.lock().unwrap(), 100);
+    assert!(matches!(
+        *recorder.last_issue.lock().unwrap(),
+        Some(NodeIssue::ConnectionFailed(_))
+    ));
+}
diff --git a/ant-networking/src/types.rs b/ant-networking/src/types.rs
new file mode 100644
index 0000000000..9080c70b38
--- /dev/null
+++ b/ant-networking/src/types.rs
@@ -0,0 +1,74 @@
+use libp2p::PeerId;
+use std::collections::HashSet;
+
+/// Represents a network address with additional metadata
+#[derive(Debug, Clone)]
+pub struct NetworkAddress {
+    /// The peer ID associated with this network address
+    pub peer_id: PeerId,
+    /// Set of peers that hold this address
+    pub holders: HashSet<PeerId>,
+    /// Distance metric from the local node
+    pub distance: u32,
+}
+
+impl NetworkAddress {
+    /// Creates a new NetworkAddress instance
+    ///
+    /// # Arguments
+    /// * `peer_id` - The peer ID for this address
+    /// * `distance` - Distance metric from local node
+    pub fn new(peer_id: PeerId, distance: u32) -> Self {
+        Self {
+            peer_id,
+            holders: HashSet::new(),
+            distance,
+        }
+    }
+
+    /// Adds a holder to this network address
+    ///
+    /// # Arguments
+    /// * `holder` - The peer ID of the holder to add
+    pub fn add_holder(&mut self, holder: PeerId) {
+        self.holders.insert(holder);
+    }
+
+    /// Returns the number of holders for this address
+    pub fn holder_count(&self) -> usize {
+        self.holders.len()
+    }
+}
+
+/// Represents issues that can occur with a node
+#[derive(Debug, Clone)]
+pub enum NodeIssue {
+    /// Connection to the node failed
+    ConnectionFailed(String),
+    /// Failed to store or retrieve records
+    RecordStoreFailed(String),
+    /// General network-related error
+    NetworkError(String),
+}
+
+impl std::fmt::Display for NodeIssue {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            NodeIssue::ConnectionFailed(msg) => write!(f, "Connection failed: {}", msg),
+            NodeIssue::RecordStoreFailed(msg) => write!(f, "Record store failed: {}", msg),
+            NodeIssue::NetworkError(msg) => write!(f, "Network error: {}", msg),
+        }
+    }
+}
+
+/// A trait for handling network metrics
+pub trait NetworkMetricsRecorder: Send + Sync {
+    /// Record the current size of the close group
+    fn record_close_group_size(&self, size: usize);
+    /// Record the current number of connections
+    fn record_connection_count(&self, count: usize);
+    /// Record the current size of the record store
+    fn record_record_store_size(&self, size: usize);
+    /// Record an issue that occurred with a node
+    fn record_node_issue(&self, issue: NodeIssue);
+}
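+
+/// A recorder that discards every metric. Illustrative sketch, not part of
+/// the original change: useful as a stand-in where metrics are disabled.
+#[derive(Debug, Default, Clone, Copy)]
+pub struct NoopMetricsRecorder;
+
+impl NetworkMetricsRecorder for NoopMetricsRecorder {
+    fn record_close_group_size(&self, _size: usize) {}
+    fn record_connection_count(&self, _count: usize) {}
+    fn record_record_store_size(&self, _size: usize) {}
+    fn record_node_issue(&self, _issue: NodeIssue) {}
+}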