Skip to content

Commit

Permalink
add allowlisted affinity
Browse files Browse the repository at this point in the history
  • Loading branch information
longbowlu committed Jul 18, 2023
1 parent 0f0ae8d commit f1f151b
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 9 deletions.
11 changes: 7 additions & 4 deletions crates/anemo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ pub struct Config {
/// Maximum number of concurrent connections to have established at a given point in time.
///
/// This limit is applied in the following ways:
/// - Inbound connections from [`KnownPeers`] with [`PeerAffinity::High`] bypass this limit. All
/// other inbound connections are only accepted if the total number of inbound and outbound
/// - Inbound connections from [`KnownPeers`] with [`PeerAffinity::High`] or
/// [`PeerAffinity::Allowed`] bypass this limit. All other inbound
/// connections are only accepted if the total number of inbound and outbound
/// connections, irrespective of affinity, is less than this limit.
/// - Outbound connections explicitly made by the application via [`Network::connect`] or
/// [`Network::connect_with_peer_id`] bypass this limit.
/// - Outbound connections made in the background, due to configured [`KnownPeers`], to peers with
/// [`PeerAffinity::High`] bypass this limit and are always attempted, while peers with lower
/// affinity respect this limit.
/// [`PeerAffinity::High`] bypass this limit and are always attempted.
///
/// If unspecified, there will be no limit on the number of concurrent connections.
///
/// [`KnownPeers`]: crate::KnownPeers
/// [`PeerAffinity::Allowed`]: crate::types::PeerAffinity::Allowed
/// [`PeerAffinity::High`]: crate::types::PeerAffinity::High
/// [`Network::connect`]: crate::Network::connect
/// [`Network::connect_with_peer_id`]: crate::Network::connect_with_peer_id
Expand Down Expand Up @@ -501,6 +502,7 @@ impl EndpointConfigBuilder {
Ok(server)
}

#[allow(deprecated)]
fn client_config(
cert: rustls::Certificate,
pkcs8_der: rustls::PrivateKey,
Expand Down Expand Up @@ -561,6 +563,7 @@ impl EndpointConfig {
&self.quinn_client_config
}

#[allow(deprecated)]
pub fn client_config_with_expected_server_identity(
&self,
peer_id: PeerId,
Expand Down
6 changes: 4 additions & 2 deletions crates/anemo/src/network/connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,9 @@ impl ConnectionManager {
// TODO close the connection explicitly with a reason once we have machine
// readable errors. See https://github.com/MystenLabs/anemo/issues/13 for more info.
match known_peers.get(&connection.peer_id()) {
Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::High) => {
Some(PeerInfo { affinity, .. })
if matches!(affinity, PeerAffinity::High | PeerAffinity::Allowed) =>
{
// Do nothing, let the connection through
}
Some(PeerInfo { affinity, .. }) if matches!(affinity, PeerAffinity::Never) => {
Expand Down Expand Up @@ -374,7 +376,7 @@ impl ConnectionManager {
known_peers
.values()
.filter(|peer_info| {
!matches!(peer_info.affinity, PeerAffinity::Never)
matches!(peer_info.affinity, PeerAffinity::High)
&& peer_info.peer_id != self.endpoint.peer_id() // We don't dial ourself
&& !peer_info.address.is_empty() // The peer has an address we can dial
&& !active_peers.contains(&peer_info.peer_id) // The node is not already connected.
Expand Down
62 changes: 60 additions & 2 deletions crates/anemo/src/network/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::{types::PeerEvent, Network, NetworkRef, Request, Response, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use std::convert::Infallible;
use futures::FutureExt;
use std::{convert::Infallible, time::Duration};
use tower::{util::BoxCloneService, ServiceExt};
use tracing::trace;

Expand Down Expand Up @@ -241,6 +242,7 @@ async fn peers_with_affinity_never_are_not_dialed_in_the_background() -> Result<
let network_1 = build_network()?;
let network_2 = build_network()?;
let network_3 = build_network()?;
let network_4 = build_network()?;

let mut subscriber_1 = network_1.subscribe()?.0;

Expand All @@ -258,14 +260,21 @@ async fn peers_with_affinity_never_are_not_dialed_in_the_background() -> Result<
address: vec![network_3.local_addr().into()],
};
network_1.known_peers().insert(peer_info_3);
// Configure peer 4 with Allowed affinity
let peer_info_4 = crate::types::PeerInfo {
peer_id: network_4.peer_id(),
affinity: crate::types::PeerAffinity::Allowed,
address: vec![network_4.local_addr().into()],
};
network_1.known_peers().insert(peer_info_4);

// When peer 2 tries to connect peer 1 will reject it
network_2
.connect_with_peer_id(network_1.local_addr(), network_1.peer_id())
.await
.unwrap_err();

// We only ever see connections being made/lost with peer 3 and not peer 2
// We only ever see connections being made/lost with peer 3 and not peer 2 or 4
let peer_id_3 = network_3.peer_id();
assert_eq!(PeerEvent::NewPeer(peer_id_3), subscriber_1.recv().await?);

Expand Down Expand Up @@ -419,6 +428,55 @@ async fn basic_connectivity_check() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn basic_connectivity_check_with_allowlist_affinity() -> Result<()> {
use crate::types::PeerEvent::*;

let _guard = crate::init_tracing_for_testing();

let network_1 = build_network()?;
let network_2 = build_network()?;

let peer_id_1 = network_1.peer_id();
let peer_id_2 = network_2.peer_id();

let mut subscriber_1 = network_1.subscribe()?.0;
let mut subscriber_2 = network_2.subscribe()?.0;

network_1.known_peers().insert(crate::types::PeerInfo {
peer_id: peer_id_2,
affinity: crate::types::PeerAffinity::Allowed,
address: vec![network_2.local_addr().into()],
});

network_2.known_peers().insert(crate::types::PeerInfo {
peer_id: peer_id_1,
affinity: crate::types::PeerAffinity::Allowed,
address: vec![network_1.local_addr().into()],
});
// Peers shouldn't connect each other.
let mut timeout = tokio::time::sleep(Duration::from_secs(5)).boxed();
tokio::select! {
_ = subscriber_1.recv() => return Err(anyhow::anyhow!("peer 1 should not have received a peer event")),
_ = subscriber_2.recv() => return Err(anyhow::anyhow!("peer 2 should not have received a peer event")),
_ = &mut timeout => (),
};

// Now remove peer2 from network1 and add it back as High affinity.
network_1.known_peers().remove(&peer_id_2).unwrap();
network_1.known_peers().insert(crate::types::PeerInfo {
peer_id: peer_id_2,
affinity: crate::types::PeerAffinity::High,
address: vec![network_2.local_addr().into()],
});

// Expect both to have new peer events.
assert_eq!(NewPeer(peer_id_2), subscriber_1.recv().await?);
assert_eq!(NewPeer(peer_id_1), subscriber_2.recv().await?);

Ok(())
}

#[tokio::test]
async fn test_network_isolation() -> Result<()> {
let _guard = crate::init_tracing_for_testing();
Expand Down
3 changes: 2 additions & 1 deletion crates/anemo/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,9 @@ pub mod header {
pub enum PeerAffinity {
/// Always attempt to maintain a connection with this Peer.
High,
/// Not proactively attempt to estlish a connection but always accept inbound connection requests.
Allowed,
/// Never attempt to maintain a connection with this Peer.
///
/// Inbound connection requests from these Peers are rejected.
Never,
}
Expand Down

0 comments on commit f1f151b

Please sign in to comment.