From 7101a91e92b5b5384e1eab4e302cfef481319cbc Mon Sep 17 00:00:00 2001 From: bayk Date: Tue, 17 Dec 2024 15:49:30 -0800 Subject: [PATCH] Peers exchange/handling improvements --- p2p/src/peers.rs | 37 +++++++++-------- p2p/src/protocol.rs | 17 +++++--- p2p/src/serv.rs | 14 ++++++- p2p/src/store.rs | 33 +++++++++++---- p2p/src/types.rs | 9 ++++ servers/src/mwc/seed.rs | 60 ++++++++++++-------------- servers/src/mwc/sync/body_sync.rs | 67 +++++++++++++++--------------- servers/src/mwc/sync/sync_peers.rs | 14 ++----- servers/src/mwc/sync/syncer.rs | 22 +++++++++- 9 files changed, 161 insertions(+), 112 deletions(-) diff --git a/p2p/src/peers.rs b/p2p/src/peers.rs index db5f26ca9..6d0c212a2 100644 --- a/p2p/src/peers.rs +++ b/p2p/src/peers.rs @@ -143,7 +143,7 @@ impl Peers { ban_reason: ReasonForBan::None, last_connected: Utc::now().timestamp(), }; - debug!("Adding newly connected peer {}.", peer_data.addr); + info!("Adding newly connected Healthy peer {}.", peer_data.addr); peers.insert(peer_data.addr.clone(), peer); } if let Err(e) = self.save_peer(&peer_data) { @@ -164,7 +164,7 @@ impl Peers { ban_reason, last_connected: Utc::now().timestamp(), }; - debug!("Banning peer {}, ban_reason={:?}", addr, ban_reason); + info!("Banning peer {}, ban_reason={:?}", addr, ban_reason); self.save_peer(&peer_data) } @@ -223,7 +223,7 @@ impl Peers { message: &str, ) -> Result<(), Error> { info!( - "Trying to ban peer {}, ban_reason {:?}, {}", + "Banning peer {}, ban_reason {:?}, {}", peer_addr, ban_reason, message ); // Update the peer in peers db @@ -232,7 +232,10 @@ impl Peers { // Update the peer in the peers Vec match self.get_connected_peer(peer_addr) { Some(peer) => { - info!("Banning peer {}, ban_reason {:?}", peer_addr, ban_reason); + debug!( + "Updating online peer with Ban {}, ban_reason {:?}", + peer_addr, ban_reason + ); // setting peer status will get it removed at the next clean_peer peer.send_ban_reason(ban_reason)?; peer.set_banned(); @@ -372,8 +375,8 @@ impl Peers { } /// Find peers in store (not necessarily connected) and return their data - pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec { - match self.store.find_peers(state, cap, count) { + pub fn find_peers(&self, state: State, cap: Capabilities) -> Vec { + match self.store.find_peers(state, cap) { Ok(peers) => peers, Err(e) => { error!("failed to find peers: {:?}", e); @@ -428,15 +431,15 @@ impl Peers { for peer in self.iter() { let ref peer: &Peer = peer.as_ref(); if peer.is_banned() { - debug!("clean_peers {:?}, peer banned", peer.info.addr); + info!("clean_peers {:?}, peer banned", peer.info.addr); rm.push(peer.info.addr.clone()); } else if !peer.is_connected() { - debug!("clean_peers {:?}, not connected", peer.info.addr); + info!("clean_peers {:?}, not connected", peer.info.addr); rm.push(peer.info.addr.clone()); } else if peer.is_abusive() { let received = peer.tracker().received_bytes.read().count_per_min(); let sent = peer.tracker().sent_bytes.read().count_per_min(); - debug!( + info!( "clean_peers {:?}, abusive ({} sent, {} recv)", peer.info.addr, sent, received, ); @@ -447,7 +450,7 @@ impl Peers { match self.adapter.total_difficulty() { Ok(total_difficulty) => { if stuck && diff < total_difficulty { - debug!("clean_peers {:?}, stuck peer", peer.info.addr); + info!("clean_peers {:?}, stuck peer", peer.info.addr); let _ = self.update_state(&peer.info.addr, State::Defunct); rm.push(peer.info.addr.clone()); } @@ -612,11 +615,6 @@ impl ChainAdapter for Peers { if !self.adapter.block_received(b, peer_info, opts)? { // if the peer sent us a block that's intrinsically bad // they are either mistaken or malevolent, both of which require a ban - debug!( - "Received a bad block {} from {}, the peer will be banned", - hash, - peer_info.addr.clone(), - ); self.ban_peer( &peer_info.addr, ReasonForBan::BadBlock, @@ -642,7 +640,6 @@ impl ChainAdapter for Peers { "Received a bad compact block {} from {}, the peer will be banned", hash, peer_info.addr ); - debug!("{}", msg); self.ban_peer(&peer_info.addr, ReasonForBan::BadCompactBlock, &msg) .map_err(|e| chain::Error::Other(format!("ban peer error {}", e)))?; Ok(false) @@ -832,7 +829,11 @@ impl NetAdapter for Peers { /// Find good peers we know with the provided capability and return their /// addresses. fn find_peer_addrs(&self, capab: Capabilities) -> Vec { - let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize); + let peers: Vec = self + .find_peers(State::Healthy, capab) + .into_iter() + .take(MAX_PEER_ADDRS as usize) + .collect(); trace!("find_peer_addrs: {} healthy peers picked", peers.len()); map_vec!(peers, |p| p.addr.clone()) } @@ -854,7 +855,7 @@ impl NetAdapter for Peers { flags: State::Healthy, last_banned: 0, ban_reason: ReasonForBan::None, - last_connected: Utc::now().timestamp(), + last_connected: 0, }; to_save.push(peer); } diff --git a/p2p/src/protocol.rs b/p2p/src/protocol.rs index dee1c792b..60e122417 100644 --- a/p2p/src/protocol.rs +++ b/p2p/src/protocol.rs @@ -23,7 +23,7 @@ use crate::msg::{ SegmentRequest, SegmentResponse, StartHeadersHashResponse, TxHashSetArchive, Type, }; use crate::serv::Server; -use crate::types::{Error, NetAdapter, PeerAddr, PeerAddr::Onion, PeerInfo}; +use crate::types::{Error, NetAdapter, PeerAddr, PeerInfo}; use std::sync::Arc; pub struct Protocol { @@ -199,9 +199,12 @@ impl MessageHandler for Protocol { } Message::GetPeerAddrs(get_peers) => { - let peers = + let mut peers = adapter.find_peer_addrs(get_peers.capabilities & !Capabilities::TOR_ADDRESS); + // Loopbacks really not interesting for other peers. Loopbacks are possible because of the local setup + peers.retain(|p| !p.is_loopback()); + // if this peer does not support TOR, do not send them the tor peers. // doing so will cause them to ban us because it's not part of the old protocol. let peers = if !get_peers.capabilities.contains(Capabilities::TOR_ADDRESS) { @@ -230,7 +233,7 @@ impl MessageHandler for Protocol { let mut peers: Vec = Vec::new(); for peer in peer_addrs.peers { match peer.clone() { - Onion(address) => { + PeerAddr::Onion(address) => { let self_address = self.server.self_onion_address.as_ref(); if self_address.is_none() { peers.push(peer); @@ -242,8 +245,12 @@ impl MessageHandler for Protocol { } } } - _ => { - peers.push(peer); + PeerAddr::Ip(_) => { + if peer.is_loopback() { + debug!("Not pushing loopback addresse = {:?}", peer); + } else { + peers.push(peer); + } } } } diff --git a/p2p/src/serv.rs b/p2p/src/serv.rs index 635a1d5ab..7ce483a52 100644 --- a/p2p/src/serv.rs +++ b/p2p/src/serv.rs @@ -38,6 +38,7 @@ use crate::types::{ }; use crate::util::secp::pedersen::RangeProof; use crate::util::StopState; +use crate::PeerAddr::Ip; use mwc_chain::txhashset::Segmenter; use mwc_chain::SyncState; @@ -210,7 +211,12 @@ impl Server { } debug!("not self, connecting to {}", address); } - _ => {} + Ip(_) => { + if addr.is_loopback() { + debug!("error trying to connect with self: {:?}", addr); + return Err(Error::PeerWithSelf); + } + } } } @@ -339,7 +345,11 @@ impl Server { self.sync_state.clone(), self.clone(), )?; - self.peers.add_connected(Arc::new(peer))?; + // if we are using TOR, we must reject the local addressed because it comes from the proxy + // Will add peer after it share the TOR address + if self.self_onion_address.is_none() { + self.peers.add_connected(Arc::new(peer))?; + } Ok(()) } diff --git a/p2p/src/store.rs b/p2p/src/store.rs index c3494570f..b49680e5b 100644 --- a/p2p/src/store.rs +++ b/p2p/src/store.rs @@ -17,12 +17,12 @@ use chrono::Utc; use num::FromPrimitive; -use rand::seq::SliceRandom; use rand::thread_rng; use crate::mwc_core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer}; use crate::types::{Capabilities, PeerAddr, ReasonForBan}; use mwc_store::{self, option_to_not_found, to_key, Error}; +use mwc_util::secp::rand::Rng; const DB_NAME: &str = "peerV2"; const STORE_SUBPATH: &str = "peers"; @@ -168,12 +168,7 @@ impl PeerStore { } /// Find some peers in our local db. - pub fn find_peers( - &self, - state: State, - cap: Capabilities, - count: usize, - ) -> Result, Error> { + pub fn find_peers(&self, state: State, cap: Capabilities) -> Result, Error> { // All new peers has flags Capabilities::UNKNOWN, that is why we better to return themn as well. // Node will try to connect to them and find the capability. let mut peers = self @@ -183,8 +178,20 @@ impl PeerStore { && (p.capabilities == Capabilities::UNKNOWN || p.capabilities.contains(cap)) }) .collect::>(); - peers[..].shuffle(&mut thread_rng()); - Ok(peers.iter().take(count).cloned().collect()) + // We want last used to go first. + let peers_num = peers.len(); + if peers_num > 1 { + peers.sort_by_key(|p| -p.last_connected); + // Then shuffle most of them + let shuffle_steps = peers_num / 4; + let mut rng = thread_rng(); + for _ in 0..shuffle_steps { + let i1 = rng.gen_range(0, peers_num); + let i2 = rng.gen_range(0, peers_num); + peers.swap(i1, i2); + } + } + Ok(peers) } /// Iterator over all known peers. @@ -213,6 +220,14 @@ impl PeerStore { batch.get_ser::(&peer_key(peer_addr)[..], None), || format!("Peer at address: {}", peer_addr), )?; + + if peer.flags != new_state { + info!( + "Changing peer {:?} state form {:?} to {:?}", + peer_addr, peer.flags, new_state + ); + } + peer.flags = new_state; if new_state == State::Banned { peer.last_banned = Utc::now().timestamp(); diff --git a/p2p/src/types.rs b/p2p/src/types.rs index cfad2a96a..de4a2e2c3 100644 --- a/p2p/src/types.rs +++ b/p2p/src/types.rs @@ -350,6 +350,15 @@ impl PeerAddr { } } } + + pub fn is_loopback(&self) -> bool { + match self { + Ip(ip) => ip.ip().is_loopback(), + Onion(_) => { + false // we can't detect self onion address here in any case + } + } + } } /// Configuration for the peer-to-peer server. diff --git a/servers/src/mwc/seed.rs b/servers/src/mwc/seed.rs index 516dcee69..e4caf6eb6 100644 --- a/servers/src/mwc/seed.rs +++ b/servers/src/mwc/seed.rs @@ -44,7 +44,7 @@ const PEERS_CHECK_TIME_BOOST: i64 = 3; const PEERS_MONITOR_INTERVAL: i64 = 60; const PEER_RECONNECT_INTERVAL: i64 = 600; -const PEER_MAX_INITIATE_CONNECTIONS: usize = 200; +const PEER_MAX_INITIATE_CONNECTIONS: usize = 50; const PEER_PING_INTERVAL: i64 = 10; @@ -130,7 +130,6 @@ pub fn connect_and_monitor( p2p_server.config.clone(), use_tor_connection, tx.clone(), - peers.is_boosting_mode(), ); if peers.is_sync_mode() { @@ -188,7 +187,6 @@ fn monitor_peers( config: p2p::P2PConfig, use_tor_connection: bool, tx: mpsc::Sender, - is_boost: bool, ) { // regularly check if we need to acquire more peers and if so, gets // them from db @@ -296,20 +294,20 @@ fn monitor_peers( // and queue them up for a connection attempt // intentionally make too many attempts (2x) as some (most?) will fail // as many nodes in our db are not publicly accessible - let max_peer_attempts = if is_boost { 500 } else { 128 }; - let new_peers = peers.find_peers( - p2p::State::Healthy, - boost_peers_capabilities, - max_peer_attempts as usize, - ); + let new_peers = peers.find_peers(p2p::State::Healthy, boost_peers_capabilities); // Only queue up connection attempts for candidate peers where we // are confident we do not yet know about this peer. // The call to is_known() may fail due to contention on the peers map. // Do not attempt any connection where is_known() fails for any reason. + let mut max_addresses = 0; for p in new_peers { if let Ok(false) = peers.is_known(&p.addr) { tx.send(p.addr.clone()).unwrap(); + max_addresses += 1; + if max_addresses > 200 { + break; + } } } } @@ -345,10 +343,9 @@ fn connect_to_seeds_and_peers( let mut found_peers = peers.find_peers( p2p::State::Healthy, p2p::Capabilities::PEER_LIST | peers.get_boost_peers_capabilities(), - 100, ); if found_peers.is_empty() { - found_peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST, 100); + found_peers = peers.find_peers(p2p::State::Healthy, p2p::Capabilities::PEER_LIST); } // if so, get their addresses, otherwise use our seeds @@ -366,9 +363,14 @@ fn connect_to_seeds_and_peers( } // connect to this initial set of peer addresses (either seeds or from our local db). + let mut max_addresses = 0; for addr in peer_addrs { if !peers_deny.as_slice().contains(&addr) { let _ = tx.send(addr); + max_addresses += 1; + if max_addresses > 200 { + break; + } } } } @@ -392,33 +394,33 @@ fn listen_for_addrs( // It is expected that peers are come with expected capabilites { let mut addrs: Vec = rx.try_iter().collect(); + if listen_q_addrs.len() > PEER_MAX_INITIATE_CONNECTIONS * 5 { + listen_q_addrs.drain(0..listen_q_addrs.len() - PEER_MAX_INITIATE_CONNECTIONS * 5); + } listen_q_addrs.append(&mut addrs); } + let now = Utc::now(); + let connection_time_limit = now - Duration::seconds(PEER_RECONNECT_INTERVAL); + connecting_history.retain(|_, time| *time > connection_time_limit); + + listen_q_addrs + .retain(|p| !(peers.is_known(p).unwrap_or(false) || connecting_history.contains_key(p))); + // If we have a healthy number of outbound peers then we are done here. debug_assert!(!peers.enough_outbound_peers()); - let now = Utc::now(); while !listen_q_addrs.is_empty() { debug_assert!(connections_in_action.load(Ordering::Relaxed) >= 0); - if connecting_history.len() as i32 + connections_in_action.load(Ordering::Relaxed) - > PEER_MAX_INITIATE_CONNECTIONS as i32 - { + if connections_in_action.load(Ordering::Relaxed) > PEER_MAX_INITIATE_CONNECTIONS as i32 { break; } let addr = listen_q_addrs.pop().expect("listen_q_addrs is not empty"); // listen_q_addrs can have duplicated requests or already processed, so still need to dedup - if let Some(last_connect_time) = connecting_history.get(&addr) { - if *last_connect_time + Duration::seconds(PEER_RECONNECT_INTERVAL) > now { - debug!( - "peer_connect: ignore a duplicate request to {}. previous connecting time: {}", - addr, - last_connect_time.format("%H:%M:%S%.3f").to_string(), - ); - continue; - } + if peers.is_known(&addr).unwrap_or(false) || connecting_history.contains_key(&addr) { + continue; } connecting_history.insert(addr.clone(), now); @@ -490,7 +492,7 @@ fn listen_for_addrs( let _ = peers_c.update_state(&addr_c, p2p::State::Healthy); } Err(e) => { - debug!("Connection to the peer {} was rejected, {}", addr_c, e); + info!("Connection to the peer {} was rejected, {}", addr_c, e); let _ = peers_c.update_state(&addr_c, p2p::State::Defunct); } } @@ -499,14 +501,6 @@ fn listen_for_addrs( }) .expect("failed to launch peer_connect thread"); } - - // shrink the connecting history. - // put a threshold here to avoid frequent shrinking in every call - if connecting_history.len() > PEER_MAX_INITIATE_CONNECTIONS * 10 { - let now = Utc::now(); - connecting_history - .retain(|_, time| *time + Duration::seconds(PEER_RECONNECT_INTERVAL) > now); - } } pub fn default_dns_seeds() -> Box Vec + Send> { diff --git a/servers/src/mwc/sync/body_sync.rs b/servers/src/mwc/sync/body_sync.rs index 4d657433c..8a68915cf 100644 --- a/servers/src/mwc/sync/body_sync.rs +++ b/servers/src/mwc/sync/body_sync.rs @@ -268,43 +268,44 @@ impl BodySync { peers: &Arc, sync_peers: &SyncPeers, ) { + // Applying valid only to initiated requests. Others and duplicated shouldn't affect this workflow if let Some(peer_adr) = self.request_tracker.remove_request(block_hash, peer) { - if valid_block { - if peer_adr == *peer { + if peer_adr == *peer { + if valid_block { sync_peers.report_ok_response(peer); + } else { + sync_peers.report_error_response( + peer, + format!("Get bad block {} for peer {}", block_hash, peer), + ); } - } - } - - if !valid_block { - sync_peers.report_error_response( - peer, - format!("Get bad block {} for peer {}", block_hash, peer), - ); - } - // let's request next package since we get this one... - if self.request_tracker.get_update_requests_to_next_ask() == 0 { - if let Ok(head) = self.chain.head() { - let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers( - peers, - self.pibd_params.get_blocks_request_per_peer(), - *self.required_capabilities.read(), - head.height, - &self.request_tracker, - ); - if !peers.is_empty() { - // requested_blocks, check for expiration - let mut need_request = self.request_tracker.calculate_needed_requests( - peers.len(), - excluded_requests as usize, - excluded_peers as usize, - self.pibd_params.get_blocks_request_per_peer(), - self.pibd_params.get_blocks_request_limit(), - ); - if need_request > 0 { - if let Err(e) = self.send_requests(&mut need_request, &peers, sync_peers) { - error!("Unable to call send_requests, error: {}", e); + // let's request next package since we get this one... + if self.request_tracker.get_update_requests_to_next_ask() == 0 { + if let Ok(head) = self.chain.head() { + let (peers, excluded_requests, excluded_peers) = sync_utils::get_sync_peers( + peers, + self.pibd_params.get_blocks_request_per_peer(), + *self.required_capabilities.read(), + head.height, + &self.request_tracker, + ); + if !peers.is_empty() { + // requested_blocks, check for expiration + let mut need_request = self.request_tracker.calculate_needed_requests( + peers.len(), + excluded_requests as usize, + excluded_peers as usize, + self.pibd_params.get_blocks_request_per_peer(), + self.pibd_params.get_blocks_request_limit(), + ); + if need_request > 0 { + if let Err(e) = + self.send_requests(&mut need_request, &peers, sync_peers) + { + error!("Unable to call send_requests, error: {}", e); + } + } } } } diff --git a/servers/src/mwc/sync/sync_peers.rs b/servers/src/mwc/sync/sync_peers.rs index 28207a9bb..e9954cf78 100644 --- a/servers/src/mwc/sync/sync_peers.rs +++ b/servers/src/mwc/sync/sync_peers.rs @@ -16,7 +16,6 @@ // Normally we would put that into the base class, but rust doesn't support that. use mwc_p2p::{PeerAddr, Peers, ReasonForBan}; -use mwc_util::secp::rand::Rng; use mwc_util::RwLock; use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Arc; @@ -163,25 +162,18 @@ impl SyncPeers { let mut offline_peers: Vec = Vec::new(); for cp in check_peers.iter() { if let Some(status) = peers_status.get_mut(cp) { - let (mut ban, offline, comment) = status.check_for_ban(cp); + let (ban, offline, comment) = status.check_for_ban(cp); let peer_addr = PeerAddr::from_str(cp); - if offline { - // let's ban some offlines with a chances... - let mut rng = rand::thread_rng(); - if rng.gen_range(0, 7) == 3 { - ban = true; - } - } if ban { if let Err(e) = peers.ban_peer(&peer_addr, ReasonForBan::PibdFailure, &comment) { warn!("ban_peer is failed with error: {}", e); } + status.reset(); self.banned_peers.write().insert(peer_addr.clone()); } - if ban || offline { + if offline { offline_peers.push(peer_addr); - status.reset(); } } } diff --git a/servers/src/mwc/sync/syncer.rs b/servers/src/mwc/sync/syncer.rs index 2e013a216..ea35a4856 100644 --- a/servers/src/mwc/sync/syncer.rs +++ b/servers/src/mwc/sync/syncer.rs @@ -18,7 +18,8 @@ use crate::mwc::sync::sync_manager::SyncManager; use crate::mwc::sync::sync_utils::SyncRequestResponses; use crate::p2p; use crate::util::StopState; -use mwc_p2p::Capabilities; +use chrono::Utc; +use mwc_p2p::{Capabilities, Peer}; use std::sync::Arc; use std::thread; use std::time; @@ -114,6 +115,7 @@ impl SyncRunner { } // Main syncing loop + let mut last_peer_dump = Utc::now(); let mut sleep_time = 1000; loop { if self.stop_state.is_stopped() { @@ -123,6 +125,24 @@ impl SyncRunner { // waiting time for 1000ms is reasonable. thread::sleep(time::Duration::from_millis(sleep_time)); + // Onle in a while let's dump the peers. Needed to understand how network is doing + let now = Utc::now(); + if (now - last_peer_dump).num_seconds() > 60 { + last_peer_dump = now; + let peers: Vec> = self.peers.iter().connected().into_iter().collect(); + info!("Has connected peers: {}", peers.len()); + for p in peers { + info!( + "Peer: {:?} {:?} H:{} Diff:{} Cap: {}", + p.info.addr, + p.info.direction, + p.info.height(), + p.info.total_difficulty().to_num(), + p.info.capabilities.bits() + ); + } + } + // run each sync stage, each of them deciding whether they're needed // except for state sync that only runs if body sync return true (means txhashset is needed) let sync_reponse = self.sync_manager.request(&self.peers);