Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peers exchange/handling improvements #91

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions p2p/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl Peers {
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
};
debug!("Adding newly connected peer {}.", peer_data.addr);
info!("Adding newly connected Healthy peer {}.", peer_data.addr);
peers.insert(peer_data.addr.clone(), peer);
}
if let Err(e) = self.save_peer(&peer_data) {
Expand All @@ -164,7 +164,7 @@ impl Peers {
ban_reason,
last_connected: Utc::now().timestamp(),
};
debug!("Banning peer {}, ban_reason={:?}", addr, ban_reason);
info!("Banning peer {}, ban_reason={:?}", addr, ban_reason);
self.save_peer(&peer_data)
}

Expand Down Expand Up @@ -223,7 +223,7 @@ impl Peers {
message: &str,
) -> Result<(), Error> {
info!(
"Trying to ban peer {}, ban_reason {:?}, {}",
"Banning peer {}, ban_reason {:?}, {}",
peer_addr, ban_reason, message
);
// Update the peer in peers db
Expand All @@ -232,7 +232,10 @@ impl Peers {
// Update the peer in the peers Vec
match self.get_connected_peer(peer_addr) {
Some(peer) => {
info!("Banning peer {}, ban_reason {:?}", peer_addr, ban_reason);
debug!(
"Updating online peer with Ban {}, ban_reason {:?}",
peer_addr, ban_reason
);
// setting peer status will get it removed at the next clean_peer
peer.send_ban_reason(ban_reason)?;
peer.set_banned();
Expand Down Expand Up @@ -372,8 +375,8 @@ impl Peers {
}

/// Find peers in store (not necessarily connected) and return their data
pub fn find_peers(&self, state: State, cap: Capabilities, count: usize) -> Vec<PeerData> {
match self.store.find_peers(state, cap, count) {
pub fn find_peers(&self, state: State, cap: Capabilities) -> Vec<PeerData> {
match self.store.find_peers(state, cap) {
Ok(peers) => peers,
Err(e) => {
error!("failed to find peers: {:?}", e);
Expand Down Expand Up @@ -428,15 +431,15 @@ impl Peers {
for peer in self.iter() {
let ref peer: &Peer = peer.as_ref();
if peer.is_banned() {
debug!("clean_peers {:?}, peer banned", peer.info.addr);
info!("clean_peers {:?}, peer banned", peer.info.addr);
rm.push(peer.info.addr.clone());
} else if !peer.is_connected() {
debug!("clean_peers {:?}, not connected", peer.info.addr);
info!("clean_peers {:?}, not connected", peer.info.addr);
rm.push(peer.info.addr.clone());
} else if peer.is_abusive() {
let received = peer.tracker().received_bytes.read().count_per_min();
let sent = peer.tracker().sent_bytes.read().count_per_min();
debug!(
info!(
"clean_peers {:?}, abusive ({} sent, {} recv)",
peer.info.addr, sent, received,
);
Expand All @@ -447,7 +450,7 @@ impl Peers {
match self.adapter.total_difficulty() {
Ok(total_difficulty) => {
if stuck && diff < total_difficulty {
debug!("clean_peers {:?}, stuck peer", peer.info.addr);
info!("clean_peers {:?}, stuck peer", peer.info.addr);
let _ = self.update_state(&peer.info.addr, State::Defunct);
rm.push(peer.info.addr.clone());
}
Expand Down Expand Up @@ -612,11 +615,6 @@ impl ChainAdapter for Peers {
if !self.adapter.block_received(b, peer_info, opts)? {
// if the peer sent us a block that's intrinsically bad
// they are either mistaken or malevolent, both of which require a ban
debug!(
"Received a bad block {} from {}, the peer will be banned",
hash,
peer_info.addr.clone(),
);
self.ban_peer(
&peer_info.addr,
ReasonForBan::BadBlock,
Expand All @@ -642,7 +640,6 @@ impl ChainAdapter for Peers {
"Received a bad compact block {} from {}, the peer will be banned",
hash, peer_info.addr
);
debug!("{}", msg);
self.ban_peer(&peer_info.addr, ReasonForBan::BadCompactBlock, &msg)
.map_err(|e| chain::Error::Other(format!("ban peer error {}", e)))?;
Ok(false)
Expand Down Expand Up @@ -832,7 +829,11 @@ impl NetAdapter for Peers {
/// Find good peers we know with the provided capability and return their
/// addresses.
fn find_peer_addrs(&self, capab: Capabilities) -> Vec<PeerAddr> {
let peers = self.find_peers(State::Healthy, capab, MAX_PEER_ADDRS as usize);
let peers: Vec<PeerData> = self
.find_peers(State::Healthy, capab)
.into_iter()
.take(MAX_PEER_ADDRS as usize)
.collect();
trace!("find_peer_addrs: {} healthy peers picked", peers.len());
map_vec!(peers, |p| p.addr.clone())
}
Expand All @@ -854,7 +855,7 @@ impl NetAdapter for Peers {
flags: State::Healthy,
last_banned: 0,
ban_reason: ReasonForBan::None,
last_connected: Utc::now().timestamp(),
last_connected: 0,
};
to_save.push(peer);
}
Expand Down
17 changes: 12 additions & 5 deletions p2p/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::msg::{
SegmentRequest, SegmentResponse, StartHeadersHashResponse, TxHashSetArchive, Type,
};
use crate::serv::Server;
use crate::types::{Error, NetAdapter, PeerAddr, PeerAddr::Onion, PeerInfo};
use crate::types::{Error, NetAdapter, PeerAddr, PeerInfo};
use std::sync::Arc;

pub struct Protocol {
Expand Down Expand Up @@ -199,9 +199,12 @@ impl MessageHandler for Protocol {
}

Message::GetPeerAddrs(get_peers) => {
let peers =
let mut peers =
adapter.find_peer_addrs(get_peers.capabilities & !Capabilities::TOR_ADDRESS);

// Loopbacks really not interesting for other peers. Loopbacks are possible because of the local setup
peers.retain(|p| !p.is_loopback());

// if this peer does not support TOR, do not send them the tor peers.
// doing so will cause them to ban us because it's not part of the old protocol.
let peers = if !get_peers.capabilities.contains(Capabilities::TOR_ADDRESS) {
Expand Down Expand Up @@ -230,7 +233,7 @@ impl MessageHandler for Protocol {
let mut peers: Vec<PeerAddr> = Vec::new();
for peer in peer_addrs.peers {
match peer.clone() {
Onion(address) => {
PeerAddr::Onion(address) => {
let self_address = self.server.self_onion_address.as_ref();
if self_address.is_none() {
peers.push(peer);
Expand All @@ -242,8 +245,12 @@ impl MessageHandler for Protocol {
}
}
}
_ => {
peers.push(peer);
PeerAddr::Ip(_) => {
if peer.is_loopback() {
debug!("Not pushing loopback addresse = {:?}", peer);
} else {
peers.push(peer);
}
}
}
}
Expand Down
14 changes: 12 additions & 2 deletions p2p/src/serv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use crate::types::{
};
use crate::util::secp::pedersen::RangeProof;
use crate::util::StopState;
use crate::PeerAddr::Ip;
use mwc_chain::txhashset::Segmenter;
use mwc_chain::SyncState;

Expand Down Expand Up @@ -210,7 +211,12 @@ impl Server {
}
debug!("not self, connecting to {}", address);
}
_ => {}
Ip(_) => {
if addr.is_loopback() {
debug!("error trying to connect with self: {:?}", addr);
return Err(Error::PeerWithSelf);
}
}
}
}

Expand Down Expand Up @@ -339,7 +345,11 @@ impl Server {
self.sync_state.clone(),
self.clone(),
)?;
self.peers.add_connected(Arc::new(peer))?;
// if we are using TOR, we must reject the local addressed because it comes from the proxy
// Will add peer after it share the TOR address
if self.self_onion_address.is_none() {
self.peers.add_connected(Arc::new(peer))?;
}
Ok(())
}

Expand Down
33 changes: 24 additions & 9 deletions p2p/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

use chrono::Utc;
use num::FromPrimitive;
use rand::seq::SliceRandom;
use rand::thread_rng;

use crate::mwc_core::ser::{self, DeserializationMode, Readable, Reader, Writeable, Writer};
use crate::types::{Capabilities, PeerAddr, ReasonForBan};
use mwc_store::{self, option_to_not_found, to_key, Error};
use mwc_util::secp::rand::Rng;

const DB_NAME: &str = "peerV2";
const STORE_SUBPATH: &str = "peers";
Expand Down Expand Up @@ -168,12 +168,7 @@ impl PeerStore {
}

/// Find some peers in our local db.
pub fn find_peers(
&self,
state: State,
cap: Capabilities,
count: usize,
) -> Result<Vec<PeerData>, Error> {
pub fn find_peers(&self, state: State, cap: Capabilities) -> Result<Vec<PeerData>, Error> {
// All new peers has flags Capabilities::UNKNOWN, that is why we better to return themn as well.
// Node will try to connect to them and find the capability.
let mut peers = self
Expand All @@ -183,8 +178,20 @@ impl PeerStore {
&& (p.capabilities == Capabilities::UNKNOWN || p.capabilities.contains(cap))
})
.collect::<Vec<_>>();
peers[..].shuffle(&mut thread_rng());
Ok(peers.iter().take(count).cloned().collect())
// We want last used to go first.
let peers_num = peers.len();
if peers_num > 1 {
peers.sort_by_key(|p| -p.last_connected);
// Then shuffle most of them
let shuffle_steps = peers_num / 4;
let mut rng = thread_rng();
for _ in 0..shuffle_steps {
let i1 = rng.gen_range(0, peers_num);
let i2 = rng.gen_range(0, peers_num);
peers.swap(i1, i2);
}
}
Ok(peers)
}

/// Iterator over all known peers.
Expand Down Expand Up @@ -213,6 +220,14 @@ impl PeerStore {
batch.get_ser::<PeerData>(&peer_key(peer_addr)[..], None),
|| format!("Peer at address: {}", peer_addr),
)?;

if peer.flags != new_state {
info!(
"Changing peer {:?} state form {:?} to {:?}",
peer_addr, peer.flags, new_state
);
}

peer.flags = new_state;
if new_state == State::Banned {
peer.last_banned = Utc::now().timestamp();
Expand Down
9 changes: 9 additions & 0 deletions p2p/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,15 @@ impl PeerAddr {
}
}
}

pub fn is_loopback(&self) -> bool {
match self {
Ip(ip) => ip.ip().is_loopback(),
Onion(_) => {
false // we can't detect self onion address here in any case
}
}
}
}

/// Configuration for the peer-to-peer server.
Expand Down
Loading
Loading