From ce23703abff84995aefc5ea378d09da4660bf017 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 31 Oct 2024 21:55:13 +0530 Subject: [PATCH 1/5] feat: remove listener as external addr on error --- sn_networking/src/event/swarm.rs | 20 ++- sn_networking/src/external_address.rs | 219 ++++++++++++++++---------- 2 files changed, 158 insertions(+), 81 deletions(-) diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index bffdfa425d..95ae4b2d0f 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -300,7 +300,6 @@ impl SwarmDriver { } } } - SwarmEvent::NewListenAddr { mut address, listener_id, @@ -322,7 +321,7 @@ impl SwarmDriver { self.swarm.add_external_address(address.clone()); } else { self.external_address_manager - .add_listen_addr_as_external_address(address.clone(), &mut self.swarm); + .on_new_listen_addr(address.clone(), &mut self.swarm); } } @@ -562,6 +561,23 @@ impl SwarmDriver { event_string = "ExternalAddrExpired"; info!(%address, "external address: expired"); } + SwarmEvent::ExpiredListenAddr { + listener_id, + address, + } => { + event_string = "ExpiredListenAddr"; + info!("Listen address has expired. {listener_id:?} on {address:?}"); + self.external_address_manager + .on_expired_listen_addr(address, &self.swarm); + } + SwarmEvent::ListenerError { listener_id, error } => { + event_string = "ListenerError"; + warn!("ListenerError {listener_id:?} with non-fatal error {error:?}"); + } + SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { + event_string = "NewExternalAddrOfPeer"; + debug!(%peer_id, %address, "New external address of peer"); + } other => { event_string = "Other"; diff --git a/sn_networking/src/external_address.rs b/sn_networking/src/external_address.rs index 4b64f10cc3..acaca7d806 100644 --- a/sn_networking/src/external_address.rs +++ b/sn_networking/src/external_address.rs @@ -13,6 +13,10 @@ use std::{collections::HashMap, net::IpAddr}; /// The maximum number of reports before an candidate address is confirmed const MAX_REPORTS_BEFORE_CONFIRMATION: u8 = 3; +/// The maximum number of reports for a confirmed address before switching to a new IP address +const MAX_REPORTS_BEFORE_SWITCHING_IP: u8 = 10; +/// The maximum number of confirmed addresses needed before switching to a new IP address +const MAX_CONFIRMED_ADDRESSES_BEFORE_SWITCHING_IP: u8 = 5; /// The maximum number of candidates to store const MAX_CANDIDATES: usize = 50; @@ -78,42 +82,37 @@ impl ExternalAddressManager { { state.increment_reports(); - match state { - ExternalAddressState::Candidate { - num_reports, - ip_address, - .. - } => { - if *num_reports >= MAX_REPORTS_BEFORE_CONFIRMATION { - // if the IP address of our confirmed address is the same as the new address, then add it - let confirmed = if let Some(current_ip_address) = self.current_ip_address { - current_ip_address == *ip_address - } else { - true - }; + if state.is_candidate() { + if state.num_reports() >= MAX_REPORTS_BEFORE_CONFIRMATION { + // if the IP address of our confirmed address is the same as the new address, then add it + let confirmed = if let Some(current_ip_address) = self.current_ip_address { + current_ip_address == *state.ip_address() + } else { + true + }; - if confirmed { - info!("External address confirmed, adding it to swarm: {address:?}"); - swarm.add_external_address(address.clone()); - *state = ExternalAddressState::Confirmed { - address: address.clone(), - num_reports: *num_reports, - ip_address: *ip_address, - }; + if confirmed { + info!("External address confirmed, adding it to swarm: {address:?}"); + swarm.add_external_address(address.clone()); + *state = ExternalAddressState::Confirmed { + address: address.clone(), + num_reports: state.num_reports(), + ip_address: *state.ip_address(), + }; - Self::print_swarm_state(swarm); - return; - } else { - debug!( - "External address {address:?} is not confirmed due to mismatched IP address. Checking if we can switch to new IP." - ); - } + Self::print_swarm_state(swarm); + return; + } else { + debug!( + "External address {address:?} is not confirmed due to mismatched IP address. Checking if we can switch to new IP." + ); } } - ExternalAddressState::Confirmed { .. } => { - debug!("External address: {address:?} is already confirmed. Do nothing"); - return; - } + } else { + debug!( + "External address: {address:?} is already confirmed or a listener. Do nothing" + ); + return; } } // check if we need to update to new ip. @@ -129,7 +128,7 @@ impl ExternalAddressManager { } = state { if current_ip_address != *ip_address - && *num_reports >= MAX_REPORTS_BEFORE_CONFIRMATION + && *num_reports >= MAX_REPORTS_BEFORE_SWITCHING_IP { *new_ip_map.entry(ip_address).or_insert(0) += 1; } @@ -139,8 +138,8 @@ impl ExternalAddressManager { if let Some((&&new_ip, count)) = new_ip_map.iter().sorted_by_key(|(_, count)| *count).last() { - if *count >= 3 { - info!("New IP map as count>=3: {new_ip_map:?}"); + if *count >= MAX_CONFIRMED_ADDRESSES_BEFORE_SWITCHING_IP { + info!("New IP map as count>= {MAX_CONFIRMED_ADDRESSES_BEFORE_SWITCHING_IP}: {new_ip_map:?}"); self.switch_to_new_ip(new_ip, swarm); return; } @@ -157,6 +156,7 @@ impl ExternalAddressManager { .iter() .any(|state| state.multiaddr() == &address) { + // incremented in the previous find(). debug!( "External address {address:?} already exists in manager. Report count incremented." ); @@ -177,11 +177,7 @@ impl ExternalAddressManager { /// Adds a non-local listen-addr to the swarm and the manager. /// If the IP address of the listen-addr is different from the current IP address, then we directly /// switch to the new IP address. - pub fn add_listen_addr_as_external_address( - &mut self, - listen_addr: Multiaddr, - swarm: &mut Swarm, - ) { + pub fn on_new_listen_addr(&mut self, listen_addr: Multiaddr, swarm: &mut Swarm) { // only add our global addresses let address = if multiaddr_is_global(&listen_addr) { let Some(address) = self.craft_external_address(&listen_addr) else { @@ -197,14 +193,19 @@ impl ExternalAddressManager { return; }; + // set the current IP address if it is not set + if self.current_ip_address.is_none() { + self.current_ip_address = Some(ip_address); + } + + // Switch to new IP early. if let Some(current_ip_address) = self.current_ip_address { if current_ip_address != ip_address { - // add as candidate with MAX_REPORTS to be confirmed inside switch_to_new_ip - self.address_states.push(ExternalAddressState::Candidate { + self.address_states.push(ExternalAddressState::Listener { address: address.clone(), - num_reports: MAX_REPORTS_BEFORE_CONFIRMATION, ip_address, }); + // this will add it as external addr self.switch_to_new_ip(ip_address, swarm); return; } @@ -218,33 +219,65 @@ impl ExternalAddressManager { match state { ExternalAddressState::Candidate { ip_address, .. } => { info!("Listen Addr was found as a candidate. Adding it as external to the swarm {address:?}"); + swarm.add_external_address(address.clone()); - *state = ExternalAddressState::Confirmed { + *state = ExternalAddressState::Listener { address: address.clone(), - num_reports: MAX_REPORTS_BEFORE_CONFIRMATION, ip_address: *ip_address, }; Self::print_swarm_state(swarm); return; } - ExternalAddressState::Confirmed { .. } => { - debug!("Listen address is already confirmed {address:?}. Do nothing"); + ExternalAddressState::Confirmed { ip_address, .. } => { + debug!("Listen address was found as confirmed. Changing it to Listener {address:?}."); + *state = ExternalAddressState::Listener { + address: address.clone(), + ip_address: *ip_address, + }; + return; + } + ExternalAddressState::Listener { .. } => { + debug!("Listen address is already a listener {address:?}. Do nothing"); return; } } } - // if it is a new one, add it as a confirmed address + // if it is a new one, add it as a Listener info!("Listen Addr was not found in the manager. Adding it as external to the swarm {address:?}"); - self.address_states.push(ExternalAddressState::Confirmed { + self.address_states.push(ExternalAddressState::Listener { address: address.clone(), - num_reports: MAX_REPORTS_BEFORE_CONFIRMATION, ip_address, }); swarm.add_external_address(address); } + /// Remove a listen-addr from the manager if expired. + pub fn on_expired_listen_addr(&mut self, listen_addr: Multiaddr, swarm: &Swarm) { + let address = if multiaddr_is_global(&listen_addr) { + let Some(address) = self.craft_external_address(&listen_addr) else { + error!("Listen address is ill formed, ignoring {listen_addr:?}"); + return; + }; + address + } else { + debug!("Listen address is not global, ignoring: {listen_addr:?}"); + return; + }; + + self.address_states.retain(|state| { + if state.multiaddr() == &address { + debug!("Removing listen address from manager: {address:?}"); + // Todo: should we call swarm.remove_listener()? or is it already removed? Confirm with the below debug. + Self::print_swarm_state(swarm); + false + } else { + true + } + }); + } + /// Switch to a new IP address. The old external addresses are removed and the new ones are added. /// The new IP address is set as the current IP address. fn switch_to_new_ip(&mut self, new_ip: IpAddr, swarm: &mut Swarm) { @@ -253,40 +286,49 @@ impl ExternalAddressManager { // remove all the old confirmed addresses with different ip let mut removed_addresses = Vec::new(); - for state in &mut self.address_states { - if let ExternalAddressState::Confirmed { - address, - ip_address, - .. - } = state - { - if *ip_address != new_ip { - removed_addresses.push(address.clone()); - swarm.remove_external_address(address); - } + let mut to_remove_indices = Vec::new(); + for (idx, state) in &mut self.address_states.iter().enumerate() { + if state.is_candidate() { + continue; + } + + if state.ip_address() != &new_ip { + // todo: should we remove listener from swarm? + swarm.remove_external_address(state.multiaddr()); + removed_addresses.push(state.multiaddr().clone()); + to_remove_indices.push(idx); } } + for idx in to_remove_indices.iter().rev() { + self.address_states.remove(*idx); + } info!("Removed addresses due to change of IP: {removed_addresses:?}"); - self.address_states - .retain(|state| !matches!(state, ExternalAddressState::Confirmed { .. })); - // add the new confirmed addresses with new ip for state in &mut self.address_states { - if let ExternalAddressState::Candidate { - address, - num_reports, - ip_address, - } = state - { - if *ip_address == new_ip && *num_reports >= MAX_REPORTS_BEFORE_CONFIRMATION { - info!("Switching to new IP, adding confirmed address: {address:?}"); - swarm.add_external_address(address.clone()); - *state = ExternalAddressState::Confirmed { - address: address.clone(), - num_reports: *num_reports, - ip_address: *ip_address, - }; + if state.ip_address() == &new_ip { + match state { + ExternalAddressState::Candidate { + address, + num_reports, + ip_address, + } => { + if *num_reports >= MAX_REPORTS_BEFORE_SWITCHING_IP { + info!("Switching to new IP, adding confirmed address: {address:?}"); + swarm.add_external_address(address.clone()); + *state = ExternalAddressState::Confirmed { + address: address.clone(), + num_reports: *num_reports, + ip_address: *ip_address, + }; + } + } + + ExternalAddressState::Listener { address, .. } => { + info!("Switching to new IP, adding listen address as external address {address:?}"); + swarm.add_external_address(address.clone()); + } + _ => {} } } } @@ -311,7 +353,7 @@ impl ExternalAddressManager { Some(output_address) } - fn print_swarm_state(swarm: &mut Swarm) { + fn print_swarm_state(swarm: &Swarm) { let listen_addr = swarm.listeners().collect::>(); info!("All Listen addresses: {listen_addr:?}"); let external_addr = swarm.external_addresses().collect::>(); @@ -331,6 +373,10 @@ enum ExternalAddressState { num_reports: u8, ip_address: IpAddr, }, + Listener { + address: Multiaddr, + ip_address: IpAddr, + }, } impl ExternalAddressState { @@ -338,6 +384,15 @@ impl ExternalAddressState { match self { Self::Candidate { address, .. } => address, Self::Confirmed { address, .. } => address, + Self::Listener { address, .. } => address, + } + } + + fn ip_address(&self) -> &IpAddr { + match self { + Self::Candidate { ip_address, .. } => ip_address, + Self::Confirmed { ip_address, .. } => ip_address, + Self::Listener { ip_address, .. } => ip_address, } } @@ -350,6 +405,7 @@ impl ExternalAddressState { match self { Self::Candidate { num_reports, .. } => *num_reports = num_reports.saturating_add(1), Self::Confirmed { num_reports, .. } => *num_reports = num_reports.saturating_add(1), + Self::Listener { .. } => {} } } @@ -357,6 +413,11 @@ impl ExternalAddressState { match self { Self::Candidate { num_reports, .. } => *num_reports, Self::Confirmed { num_reports, .. } => *num_reports, + Self::Listener { .. } => u8::MAX, } } + + fn is_candidate(&self) -> bool { + matches!(self, Self::Candidate { .. }) + } } From b160676bc65b1f5d6124af102a3b3fe46410278c Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 31 Oct 2024 22:20:51 +0530 Subject: [PATCH 2/5] feat: remove external address on too many connection error --- sn_networking/src/event/swarm.rs | 13 ++- sn_networking/src/external_address.rs | 116 +++++++++++++++++++++++++- sn_networking/src/lib.rs | 7 ++ 3 files changed, 131 insertions(+), 5 deletions(-) diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index 95ae4b2d0f..1a24db8776 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -15,6 +15,7 @@ use libp2p::mdns; #[cfg(feature = "open-metrics")] use libp2p::metrics::Recorder; use libp2p::{ + core::ConnectedPoint, kad::K_VALUE, multiaddr::Protocol, swarm::{ @@ -306,6 +307,9 @@ impl SwarmDriver { } => { event_string = "new listen addr"; + info!("Local node is listening {listener_id:?} on {address:?}"); + println!("Local node is listening on {address:?}"); // TODO: make it print only once + let local_peer_id = *self.swarm.local_peer_id(); // Make sure the address ends with `/p2p/`. In case of relay, `/p2p` is already there. if address.iter().last() != Some(Protocol::P2p(local_peer_id)) { @@ -326,9 +330,6 @@ impl SwarmDriver { } self.send_event(NetworkEvent::NewListenAddr(address.clone())); - - info!("Local node is listening {listener_id:?} on {address:?}"); - println!("Local node is listening on {address:?}"); // TODO: make it print only once } SwarmEvent::ListenerClosed { listener_id, @@ -358,6 +359,10 @@ impl SwarmDriver { } => { event_string = "ConnectionEstablished"; debug!(%peer_id, num_established, ?concurrent_dial_errors, "ConnectionEstablished ({connection_id:?}) in {established_in:?}: {}", endpoint_str(&endpoint)); + if let ConnectedPoint::Listener { local_addr, .. } = &endpoint { + self.external_address_manager + .on_established_incoming_connection(local_addr.clone()); + } let _ = self.live_connected_peers.insert( connection_id, @@ -528,6 +533,8 @@ impl SwarmDriver { } else { debug!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); } + self.external_address_manager + .on_incoming_connection_error(local_addr.clone(), &mut self.swarm); let _ = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); } diff --git a/sn_networking/src/external_address.rs b/sn_networking/src/external_address.rs index acaca7d806..4adb3222b3 100644 --- a/sn_networking/src/external_address.rs +++ b/sn_networking/src/external_address.rs @@ -6,10 +6,13 @@ // KIND, either express or implied. Please review the Licences for the specific language governing // permissions and limitations relating to use of the SAFE Network Software. -use crate::{driver::NodeBehaviour, multiaddr_get_ip, multiaddr_is_global}; +use crate::{driver::NodeBehaviour, multiaddr_get_ip, multiaddr_get_port, multiaddr_is_global}; use itertools::Itertools; use libp2p::{multiaddr::Protocol, Multiaddr, PeerId, Swarm}; -use std::{collections::HashMap, net::IpAddr}; +use std::{ + collections::{HashMap, HashSet}, + net::IpAddr, +}; /// The maximum number of reports before an candidate address is confirmed const MAX_REPORTS_BEFORE_CONFIRMATION: u8 = 3; @@ -28,9 +31,44 @@ const MAX_CANDIDATES: usize = 50; pub struct ExternalAddressManager { /// All the external addresses of the node address_states: Vec, + /// The current IP address of all the external addresses. current_ip_address: Option, /// The peer id of the node peer_id: PeerId, + // Port -> (ok, error) count + connection_stats: HashMap, + // Bad ports + bad_ports: HashSet, +} + +#[derive(Debug, Default)] +struct PortStats { + ok: usize, + error: usize, +} + +impl PortStats { + fn success_rate(&self) -> f64 { + if self.ok + self.error == 0 { + 0.0 + } else { + self.ok as f64 / (self.ok + self.error) as f64 + } + } + + fn is_faulty(&self) -> bool { + // Give the address a chance to prove itself + if self.ok + self.error < 10 { + return false; + } + + // Still give the address a chance to prove itself + if self.ok + self.error < 100 { + return self.success_rate() < 0.5; + } + + self.success_rate() < 0.9 + } } impl ExternalAddressManager { @@ -39,6 +77,8 @@ impl ExternalAddressManager { address_states: Vec::new(), current_ip_address: None, peer_id, + connection_stats: HashMap::new(), + bad_ports: HashSet::new(), } } @@ -75,6 +115,15 @@ impl ExternalAddressManager { return; }; + let Some(port) = multiaddr_get_port(&address) else { + return; + }; + + if self.bad_ports.contains(&port) { + debug!("External address had problem earlier, ignoring: {address:?}"); + return; + } + if let Some(state) = self .address_states .iter_mut() @@ -278,6 +327,65 @@ impl ExternalAddressManager { }); } + pub fn on_incoming_connection_error( + &mut self, + on_address: Multiaddr, + swarm: &mut Swarm, + ) { + let Some(port) = multiaddr_get_port(&on_address) else { + return; + }; + + let stats = self.connection_stats.entry(port).or_default(); + stats.error = stats.error.saturating_add(1); + + if stats.is_faulty() { + info!("Connection on port {port} is considered as faulty. Removing all addresses with this port"); + // remove all the addresses with this port + let mut removed_confirmed = Vec::new(); + let mut removed_candidates = Vec::new(); + let mut to_remove_indices = Vec::new(); + + for (idx, state) in &mut self.address_states.iter().enumerate() { + if state.is_confirmed() || state.is_candidate() { + let Some(state_port) = multiaddr_get_port(state.multiaddr()) else { + continue; + }; + + if state_port == port { + if state.is_confirmed() { + removed_confirmed.push(state.multiaddr().clone()); + } else { + removed_candidates.push(state.multiaddr().clone()); + } + to_remove_indices.push(idx); + } + } + } + for idx in to_remove_indices.iter().rev() { + swarm.remove_external_address(self.address_states[*idx].multiaddr()); + self.address_states.remove(*idx); + } + if !removed_candidates.is_empty() { + debug!("Removed external candidates due to connection errors on port {port}: {removed_candidates:?}"); + } + if !removed_confirmed.is_empty() { + info!("Removed external addresses due to connection errors on port {port}: {removed_confirmed:?}"); + } + Self::print_swarm_state(swarm); + } + } + + /// Reset the incoming connection errors for a port + pub fn on_established_incoming_connection(&mut self, on_address: Multiaddr) { + let Some(port) = multiaddr_get_port(&on_address) else { + return; + }; + + let stats = self.connection_stats.entry(port).or_default(); + stats.ok = stats.ok.saturating_add(1); + } + /// Switch to a new IP address. The old external addresses are removed and the new ones are added. /// The new IP address is set as the current IP address. fn switch_to_new_ip(&mut self, new_ip: IpAddr, swarm: &mut Swarm) { @@ -420,4 +528,8 @@ impl ExternalAddressState { fn is_candidate(&self) -> bool { matches!(self, Self::Candidate { .. }) } + + fn is_confirmed(&self) -> bool { + matches!(self, Self::Confirmed { .. }) + } } diff --git a/sn_networking/src/lib.rs b/sn_networking/src/lib.rs index cd5c513fad..a1c5484cf2 100644 --- a/sn_networking/src/lib.rs +++ b/sn_networking/src/lib.rs @@ -1286,6 +1286,13 @@ pub(crate) fn multiaddr_get_ip(addr: &Multiaddr) -> Option { }) } +pub(crate) fn multiaddr_get_port(addr: &Multiaddr) -> Option { + addr.iter().find_map(|p| match p { + Protocol::Udp(port) => Some(port), + _ => None, + }) +} + pub(crate) fn send_local_swarm_cmd(swarm_cmd_sender: Sender, cmd: LocalSwarmCmd) { let capacity = swarm_cmd_sender.capacity(); From fdccb3f4da29fb4ffe960e15e4a96637ed760ed0 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Mon, 4 Nov 2024 19:02:38 +0530 Subject: [PATCH 3/5] feat(network): enable addr or relay managers when required --- sn_networking/src/driver.rs | 35 ++++++++--- sn_networking/src/event/swarm.rs | 84 +++++++++++++++------------ sn_networking/src/external_address.rs | 2 +- sn_networking/src/relay_manager.rs | 30 +--------- 4 files changed, 76 insertions(+), 75 deletions(-) diff --git a/sn_networking/src/driver.rs b/sn_networking/src/driver.rs index e68415d2dd..c31235ada9 100644 --- a/sn_networking/src/driver.rs +++ b/sn_networking/src/driver.rs @@ -690,11 +690,23 @@ impl NetworkBuilder { let bootstrap = ContinuousNetworkDiscover::new(); let replication_fetcher = ReplicationFetcher::new(peer_id, network_event_sender.clone()); - let mut relay_manager = RelayManager::new(peer_id); - if !is_client { - relay_manager.enable_hole_punching(self.is_behind_home_network); - } - let external_address_manager = ExternalAddressManager::new(peer_id); + + // Enable relay manager for nodes behind home network + let relay_manager = if !is_client && self.is_behind_home_network { + let relay_manager = RelayManager::new(peer_id); + Some(relay_manager) + } else { + info!("Relay manager is disabled for this node."); + None + }; + // Enable external address manager for public nodes and not behind nat + let external_address_manager = if !is_client && !self.local && !self.is_behind_home_network + { + Some(ExternalAddressManager::new(peer_id)) + } else { + info!("External address manager is disabled for this node."); + None + }; let swarm_driver = SwarmDriver { swarm, @@ -707,6 +719,7 @@ impl NetworkBuilder { peers_in_rt: 0, bootstrap, relay_manager, + connected_relay_clients: Default::default(), external_address_manager, replication_fetcher, #[cfg(feature = "open-metrics")] @@ -799,8 +812,10 @@ pub struct SwarmDriver { pub(crate) close_group: Vec, pub(crate) peers_in_rt: usize, pub(crate) bootstrap: ContinuousNetworkDiscover, - pub(crate) external_address_manager: ExternalAddressManager, - pub(crate) relay_manager: RelayManager, + pub(crate) external_address_manager: Option, + pub(crate) relay_manager: Option, + /// The peers that are using our relay service. + pub(crate) connected_relay_clients: HashSet, /// The peers that are closer to our PeerId. Includes self. pub(crate) replication_fetcher: ReplicationFetcher, #[cfg(feature = "open-metrics")] @@ -933,7 +948,11 @@ impl SwarmDriver { } } } - _ = relay_manager_reservation_interval.tick() => self.relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes), + _ = relay_manager_reservation_interval.tick() => { + if let Some(relay_manager) = &mut self.relay_manager { + relay_manager.try_connecting_to_relay(&mut self.swarm, &self.bad_nodes) + } + }, } } } diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index 1a24db8776..0853949ada 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -63,8 +63,10 @@ impl SwarmDriver { relay_peer_id, .. } = *event { - self.relay_manager - .on_successful_reservation_by_client(&relay_peer_id, &mut self.swarm); + if let Some(relay_manager) = self.relay_manager.as_mut() { + relay_manager + .on_successful_reservation_by_client(&relay_peer_id, &mut self.swarm); + } } } #[cfg(feature = "upnp")] @@ -98,11 +100,10 @@ impl SwarmDriver { src_peer_id, renewed: _, } => { - self.relay_manager - .on_successful_reservation_by_server(src_peer_id); + self.connected_relay_clients.insert(src_peer_id); } libp2p::relay::Event::ReservationTimedOut { src_peer_id } => { - self.relay_manager.on_reservation_timeout(src_peer_id); + self.connected_relay_clients.remove(&src_peer_id); } _ => {} } @@ -174,13 +175,15 @@ impl SwarmDriver { .any(|(_ilog2, peers)| peers.contains(&peer_id)); // Do not use an `already relayed` peer as `potential relay candidate`. - if !has_relayed && !is_bootstrap_peer && !self.is_client { - debug!("Adding candidate relay server {peer_id:?}, it's not a bootstrap node"); - self.relay_manager.add_potential_candidates( - &peer_id, - &addrs, - &info.protocols, - ); + if !has_relayed && !is_bootstrap_peer { + if let Some(relay_manager) = self.relay_manager.as_mut() { + debug!("Adding candidate relay server {peer_id:?}, it's not a bootstrap node"); + relay_manager.add_potential_candidates( + &peer_id, + &addrs, + &info.protocols, + ); + } } // When received an identify from un-dialed peer, try to dial it @@ -323,9 +326,13 @@ impl SwarmDriver { // all addresses are effectively external here... // this is needed for Kad Mode::Server self.swarm.add_external_address(address.clone()); + } else if let Some(external_add_manager) = + self.external_address_manager.as_mut() + { + external_add_manager.on_new_listen_addr(address.clone(), &mut self.swarm); } else { - self.external_address_manager - .on_new_listen_addr(address.clone(), &mut self.swarm); + // just for future reference. + warn!("External address manager is not enabled for a public node. This should not happen."); } } @@ -338,8 +345,9 @@ impl SwarmDriver { } => { event_string = "listener closed"; info!("Listener {listener_id:?} with add {addresses:?} has been closed for {reason:?}"); - self.relay_manager - .on_listener_closed(&listener_id, &mut self.swarm); + if let Some(relay_manager) = self.relay_manager.as_mut() { + relay_manager.on_listener_closed(&listener_id, &mut self.swarm); + } } SwarmEvent::IncomingConnection { connection_id, @@ -359,9 +367,11 @@ impl SwarmDriver { } => { event_string = "ConnectionEstablished"; debug!(%peer_id, num_established, ?concurrent_dial_errors, "ConnectionEstablished ({connection_id:?}) in {established_in:?}: {}", endpoint_str(&endpoint)); - if let ConnectedPoint::Listener { local_addr, .. } = &endpoint { - self.external_address_manager - .on_established_incoming_connection(local_addr.clone()); + if let Some(external_addr_manager) = self.external_address_manager.as_mut() { + if let ConnectedPoint::Listener { local_addr, .. } = &endpoint { + external_addr_manager + .on_established_incoming_connection(local_addr.clone()); + } } let _ = self.live_connected_peers.insert( @@ -533,8 +543,10 @@ impl SwarmDriver { } else { debug!("IncomingConnectionError from local_addr:?{local_addr:?}, send_back_addr {send_back_addr:?} on {connection_id:?} with error {error:?}"); } - self.external_address_manager - .on_incoming_connection_error(local_addr.clone(), &mut self.swarm); + if let Some(external_addr_manager) = self.external_address_manager.as_mut() { + external_addr_manager + .on_incoming_connection_error(local_addr.clone(), &mut self.swarm); + } let _ = self.live_connected_peers.remove(&connection_id); self.record_connection_metrics(); } @@ -548,16 +560,8 @@ impl SwarmDriver { SwarmEvent::NewExternalAddrCandidate { address } => { event_string = "NewExternalAddrCandidate"; - if !self.is_client - // If we are behind a home network, then our IP is returned here. We should be only having - // relay server as our external address - // todo: can our relay address be reported here? If so, maybe we should add them. - && !self.is_behind_home_network - // When running a local network, we just need the local listen address to work. - && !self.local - { - self.external_address_manager - .add_external_address_candidate(address, &mut self.swarm); + if let Some(external_addr_manager) = self.external_address_manager.as_mut() { + external_addr_manager.add_external_address_candidate(address, &mut self.swarm); } } SwarmEvent::ExternalAddrConfirmed { address } => { @@ -574,17 +578,14 @@ impl SwarmDriver { } => { event_string = "ExpiredListenAddr"; info!("Listen address has expired. {listener_id:?} on {address:?}"); - self.external_address_manager - .on_expired_listen_addr(address, &self.swarm); + if let Some(external_addr_manager) = self.external_address_manager.as_mut() { + external_addr_manager.on_expired_listen_addr(address, &self.swarm); + } } SwarmEvent::ListenerError { listener_id, error } => { event_string = "ListenerError"; warn!("ListenerError {listener_id:?} with non-fatal error {error:?}"); } - SwarmEvent::NewExternalAddrOfPeer { peer_id, address } => { - event_string = "NewExternalAddrOfPeer"; - debug!(%peer_id, %address, "New external address of peer"); - } other => { event_string = "Other"; @@ -659,7 +660,14 @@ impl SwarmDriver { } // skip if the peer is a relay server that we're connected to - if self.relay_manager.keep_alive_peer(peer_id) { + if let Some(relay_manager) = self.relay_manager.as_ref() { + if relay_manager.keep_alive_peer(peer_id) { + return true; // retain peer + } + } + + // skip if the peer is a node that is being relayed through us + if self.connected_relay_clients.contains(peer_id) { return true; // retain peer } diff --git a/sn_networking/src/external_address.rs b/sn_networking/src/external_address.rs index 4adb3222b3..79b8f3a9a7 100644 --- a/sn_networking/src/external_address.rs +++ b/sn_networking/src/external_address.rs @@ -24,7 +24,7 @@ const MAX_CONFIRMED_ADDRESSES_BEFORE_SWITCHING_IP: u8 = 5; const MAX_CANDIDATES: usize = 50; /// Manages the external addresses of a Public node. For a relayed node, the RelayManager should deal with -/// adding and removing external addresses. We don't manage "local" addresses here. +/// adding and removing external addresses. Also, we don't manage "local" addresses here. // TODO: // 1. if the max candidate is reached, kick out the oldest candidate sorted by # of reports #[derive(Debug)] diff --git a/sn_networking/src/relay_manager.rs b/sn_networking/src/relay_manager.rs index 8628b08151..92a1fb8888 100644 --- a/sn_networking/src/relay_manager.rs +++ b/sn_networking/src/relay_manager.rs @@ -23,14 +23,11 @@ pub(crate) fn is_a_relayed_peer(addrs: &HashSet) -> bool { .any(|multiaddr| multiaddr.iter().any(|p| matches!(p, Protocol::P2pCircuit))) } -/// To manager relayed connections. +/// Manage the relay servers that we are connected to. +/// This is the client side of the relay server protocol. #[derive(Debug)] pub(crate) struct RelayManager { self_peer_id: PeerId, - // server states - reserved_by: HashSet, - // client states - enable_client: bool, candidates: VecDeque<(PeerId, Multiaddr)>, waiting_for_reservation: BTreeMap, connected_relays: BTreeMap, @@ -43,8 +40,6 @@ impl RelayManager { pub(crate) fn new(self_peer_id: PeerId) -> Self { Self { self_peer_id, - reserved_by: Default::default(), - enable_client: false, connected_relays: Default::default(), waiting_for_reservation: Default::default(), candidates: Default::default(), @@ -52,17 +47,10 @@ impl RelayManager { } } - pub(crate) fn enable_hole_punching(&mut self, enable: bool) { - info!("Setting relay client mode to {enable:?}"); - self.enable_client = enable; - } - /// Should we keep this peer alive? Closing a connection to that peer would remove that server from the listen addr. pub(crate) fn keep_alive_peer(&self, peer_id: &PeerId) -> bool { self.connected_relays.contains_key(peer_id) || self.waiting_for_reservation.contains_key(peer_id) - // but servers provide connections to bad nodes. - || self.reserved_by.contains(peer_id) } /// Add a potential candidate to the list if it satisfies all the identify checks and also supports the relay server @@ -100,10 +88,6 @@ impl RelayManager { swarm: &mut Swarm, bad_nodes: &BadNodes, ) { - if !self.enable_client { - return; - } - if self.connected_relays.len() >= MAX_CONCURRENT_RELAY_CONNECTIONS || self.candidates.is_empty() { @@ -159,16 +143,6 @@ impl RelayManager { } } - /// Update relay server state on incoming reservation from a client - pub(crate) fn on_successful_reservation_by_server(&mut self, peer_id: PeerId) { - self.reserved_by.insert(peer_id); - } - - /// Update relay server state on reservation timeout - pub(crate) fn on_reservation_timeout(&mut self, peer_id: PeerId) { - self.reserved_by.remove(&peer_id); - } - /// Update client state after we've successfully made reservation with a relay. pub(crate) fn on_successful_reservation_by_client( &mut self, From 339189219caf40a2734088dd1e5d77f2fc8db2b5 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Sat, 9 Nov 2024 20:19:37 +0530 Subject: [PATCH 4/5] feat(network): add ws addresses as external address --- sn_networking/src/external_address.rs | 30 ++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/sn_networking/src/external_address.rs b/sn_networking/src/external_address.rs index 79b8f3a9a7..ad71dd2c16 100644 --- a/sn_networking/src/external_address.rs +++ b/sn_networking/src/external_address.rs @@ -443,7 +443,10 @@ impl ExternalAddressManager { Self::print_swarm_state(swarm); } - /// Craft a proper address to avoid any ill formed addresses + /// Craft a proper address Ws or Quic address to avoid any ill formed addresses + /// Example: + /// /ip4/131.131.131.131/tcp/53620/ws/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5 + /// /ip4/131.131.131.131/udp/53620/quic-v1/p2p/12D3KooWD2aV1f3qkhggzEFaJ24CEFYkSdZF5RKoMLpU6CwExYV5 fn craft_external_address(&self, given_address: &Multiaddr) -> Option { let mut output_address = Multiaddr::empty(); @@ -451,11 +454,28 @@ impl ExternalAddressManager { .iter() .find(|protocol| matches!(protocol, Protocol::Ip4(_)))?; output_address.push(ip); - let port = given_address + + if let Some(ws_protocol) = given_address + .iter() + .find(|protocol| matches!(protocol, Protocol::Ws(_))) + { + let port = given_address + .iter() + .find(|protocol| matches!(protocol, Protocol::Tcp(_)))?; + output_address.push(port); + output_address.push(ws_protocol); + } else if given_address .iter() - .find(|protocol| matches!(protocol, Protocol::Udp(_)))?; - output_address.push(port); - output_address.push(Protocol::QuicV1); + .any(|protocol| matches!(protocol, Protocol::QuicV1)) + { + let port = given_address + .iter() + .find(|protocol| matches!(protocol, Protocol::Udp(_)))?; + output_address.push(port); + output_address.push(Protocol::QuicV1); + } else { + return None; + } output_address.push(Protocol::P2p(self.peer_id)); Some(output_address) From 55f413f4b47da812d575f165bdfd5a7634dc8fc2 Mon Sep 17 00:00:00 2001 From: Roland Sherwin Date: Thu, 14 Nov 2024 05:04:13 +0530 Subject: [PATCH 5/5] chore: remove listen address print statement --- sn_networking/src/event/swarm.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/sn_networking/src/event/swarm.rs b/sn_networking/src/event/swarm.rs index 0853949ada..f4d7db952f 100644 --- a/sn_networking/src/event/swarm.rs +++ b/sn_networking/src/event/swarm.rs @@ -311,7 +311,6 @@ impl SwarmDriver { event_string = "new listen addr"; info!("Local node is listening {listener_id:?} on {address:?}"); - println!("Local node is listening on {address:?}"); // TODO: make it print only once let local_peer_id = *self.swarm.local_peer_id(); // Make sure the address ends with `/p2p/`. In case of relay, `/p2p` is already there.