Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(relay): make reservation to our closest peers #2578

Open
wants to merge 1 commit into
base: rc-2024.12.1-hotfix3
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 35 additions & 31 deletions ant-networking/src/relay_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@
// permissions and limitations relating to use of the SAFE Network Software.

use crate::driver::{BadNodes, NodeBehaviour};
use ant_protocol::NetworkAddress;
use itertools::Itertools;
use libp2p::{
core::transport::ListenerId, multiaddr::Protocol, Multiaddr, PeerId, StreamProtocol, Swarm,
};
use rand::Rng;
use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::collections::{BTreeMap, HashMap, HashSet};

const MAX_CONCURRENT_RELAY_CONNECTIONS: usize = 4;
const MAX_POTENTIAL_CANDIDATES: usize = 1000;
Expand All @@ -28,7 +29,7 @@ pub(crate) fn is_a_relayed_peer(addrs: &HashSet<Multiaddr>) -> bool {
#[derive(Debug)]
pub(crate) struct RelayManager {
self_peer_id: PeerId,
candidates: VecDeque<(PeerId, Multiaddr)>,
candidates: Vec<(PeerId, Multiaddr)>,
waiting_for_reservation: BTreeMap<PeerId, Multiaddr>,
connected_relays: BTreeMap<PeerId, Multiaddr>,

Expand Down Expand Up @@ -72,7 +73,7 @@ impl RelayManager {
// Hence here can add the addr directly.
if let Some(relay_addr) = Self::craft_relay_address(addr, Some(*peer_id)) {
debug!("Adding {peer_id:?} with {relay_addr:?} as a potential relay candidate");
self.candidates.push_back((*peer_id, relay_addr));
self.candidates.push((*peer_id, relay_addr));
}
}
} else {
Expand Down Expand Up @@ -101,44 +102,47 @@ impl RelayManager {
// todo: should we remove all our other `listen_addr`? And should we block from adding `add_external_address` if
// we're behind nat?

// Pick a random candidate from the vector. Check if empty, or `gen_range` panics for empty range.
// Pick a random closest candidate from the vector. Check if empty, or `gen_range` panics for empty range.
let self_peer_id = NetworkAddress::from_peer(self.self_peer_id);
self.candidates.sort_by_key(|(peer_id, _)| {
self_peer_id.distance(&NetworkAddress::from_peer(*peer_id))
});
let index = if self.candidates.is_empty() {
debug!("No more relay candidates.");
break;
} else {
rand::thread_rng().gen_range(0..self.candidates.len())
let max_range = std::cmp::min(self.candidates.len(), 5);
rand::thread_rng().gen_range(0..max_range)
};

if let Some((peer_id, relay_addr)) = self.candidates.remove(index) {
// skip if detected as a bad node
if let Some((_, is_bad)) = bad_nodes.get(&peer_id) {
if *is_bad {
debug!("Peer {peer_id:?} is considered as a bad node. Skipping it.");
continue;
}
}

if self.connected_relays.contains_key(&peer_id)
|| self.waiting_for_reservation.contains_key(&peer_id)
{
debug!("We are already using {peer_id:?} as a relay server. Skipping.");
let (peer_id, relay_addr) = self.candidates.swap_remove(index);
// skip if detected as a bad node
if let Some((_, is_bad)) = bad_nodes.get(&peer_id) {
if *is_bad {
debug!("Peer {peer_id:?} is considered as a bad node. Skipping it.");
continue;
}
}

if self.connected_relays.contains_key(&peer_id)
|| self.waiting_for_reservation.contains_key(&peer_id)
{
debug!("We are already using {peer_id:?} as a relay server. Skipping.");
continue;
}

match swarm.listen_on(relay_addr.clone()) {
Ok(id) => {
info!("Sending reservation to relay {peer_id:?} on {relay_addr:?}");
self.waiting_for_reservation.insert(peer_id, relay_addr);
self.relayed_listener_id_map.insert(id, peer_id);
n_reservations += 1;
}
Err(err) => {
error!("Error while trying to listen on the relay addr: {err:?} on {relay_addr:?}");
}
match swarm.listen_on(relay_addr.clone()) {
Ok(id) => {
info!("Sending reservation to relay {peer_id:?} on {relay_addr:?}");
self.waiting_for_reservation.insert(peer_id, relay_addr);
self.relayed_listener_id_map.insert(id, peer_id);
n_reservations += 1;
}
Err(err) => {
error!(
"Error while trying to listen on the relay addr: {err:?} on {relay_addr:?}"
);
}
} else {
debug!("No more relay candidates.");
break;
}
}
}
Expand Down
Loading