Skip to content

Commit

Permalink
Merge pull request #940 from input-output-hk/reuse-pending-client-conns
Browse files Browse the repository at this point in the history
Reuse pending client connections
  • Loading branch information
NicolasDP authored Oct 14, 2019
2 parents fd5ad8e + b3981e5 commit 7170a41
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 100 deletions.
87 changes: 49 additions & 38 deletions jormungandr/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use futures::prelude::*;
use network_core::client::{self as core_client, Client as _};
use network_core::client::{BlockService, FragmentService, GossipService, P2pService};
use network_core::error as core_error;
use network_core::gossip::Node;
use network_core::gossip::{Gossip, Node};
use network_core::subscription::{BlockEvent, ChainPullRequest};
use slog::Logger;

Expand All @@ -34,6 +34,14 @@ where
sending_block_msg: Option<SendingBlockMsg>,
}

struct EarlyOutbound {
pub block_announcements: Subscription<Header>,
pub fragments: Subscription<Fragment>,
pub gossip: Subscription<Gossip<topology::NodeData>>,
pub block_solicitations: Subscription<Vec<HeaderHash>>,
pub chain_pulls: Subscription<ChainPullRequest<HeaderHash>>,
}

impl<S: BlockService> Client<S> {
pub fn remote_node_id(&self) -> topology::NodeId {
self.remote_node_id
Expand All @@ -58,43 +66,41 @@ where
fn subscribe(
service: S,
state: ConnectionState,
outbound: EarlyOutbound,
channels: Channels,
) -> impl Future<Item = (Self, PeerComms), Error = ()> {
let mut peer_comms = PeerComms::new();
) -> impl Future<Item = Self, Error = ()> {
let block_announcements = outbound.block_announcements;
let fragments = outbound.fragments;
let gossip = outbound.gossip;
let block_solicitations = outbound.block_solicitations;
let chain_pulls = outbound.chain_pulls;
let err_logger = state.logger().clone();
service
.ready()
.and_then(move |mut service| {
let block_req =
service.block_subscription(peer_comms.subscribe_to_block_announcements());
service
.ready()
.map(move |service| (service, peer_comms, block_req))
let block_req = service.block_subscription(block_announcements);
service.ready().map(move |service| (service, block_req))
})
.and_then(move |(mut service, mut peer_comms, block_req)| {
let content_req =
service.fragment_subscription(peer_comms.subscribe_to_fragments());
.and_then(move |(mut service, block_req)| {
let content_req = service.fragment_subscription(fragments);
service
.ready()
.map(move |service| (service, peer_comms, block_req, content_req))
.map(move |service| (service, block_req, content_req))
})
.and_then(move |(mut service, block_req, content_req)| {
let gossip_req = service.gossip_subscription(gossip);
block_req.join3(content_req, gossip_req).map(
move |(block_res, content_res, gossip_res)| {
(service, block_res, content_res, gossip_res)
},
)
})
.and_then(
move |(mut service, mut peer_comms, block_req, content_req)| {
let gossip_req = service.gossip_subscription(peer_comms.subscribe_to_gossip());
block_req.join3(content_req, gossip_req).map(
move |(block_res, content_res, gossip_res)| {
(service, peer_comms, block_res, content_res, gossip_res)
},
)
},
)
.map_err(move |err| {
info!(err_logger, "subscription request failed: {:?}", err);
})
.and_then(
move |(
service,
mut peer_comms,
(block_events, node_id),
(fragment_sub, node_id_1),
(gossip_sub, node_id_2),
Expand Down Expand Up @@ -131,12 +137,7 @@ where
logger.clone(),
);

// Plug the block solicitations and header pulls to be handled
// via client requests.
let block_solicitations = peer_comms.subscribe_to_block_solicitations();
let chain_pulls = peer_comms.subscribe_to_chain_pulls();

// Resolve with the client instance and communication handles.
// Resolve with the client instance.
let client = Client {
service,
logger,
Expand All @@ -148,7 +149,7 @@ where
chain_pulls,
sending_block_msg: None,
};
Ok((client, peer_comms))
Ok(client)
},
)
}
Expand Down Expand Up @@ -477,17 +478,26 @@ where
pub fn connect(
state: ConnectionState,
channels: Channels,
) -> impl Future<Item = (Client<grpc::Connection>, PeerComms), Error = ()> {
) -> (
PeerComms,
impl Future<Item = Client<grpc::Connection>, Error = ()>,
) {
let addr = state.connection;
let expected_block0 = state.global.block0_hash;
let connect_err_logger = state.logger().clone();
let ready_err_logger = state.logger().clone();
let handshake_err_logger = state.logger().clone();
let block0_mismatch_logger = state.logger().clone();
let mut peer_comms = PeerComms::new();
let outbound = EarlyOutbound {
block_announcements: peer_comms.subscribe_to_block_announcements(),
fragments: peer_comms.subscribe_to_fragments(),
gossip: peer_comms.subscribe_to_gossip(),
block_solicitations: peer_comms.subscribe_to_block_solicitations(),
chain_pulls: peer_comms.subscribe_to_chain_pulls(),
};

// TODO: we need to filter the `addr` to prevent to connect to invalid address

grpc::connect(addr, Some(state.global.as_ref().topology.node().id()))
let future = grpc::connect(addr, Some(state.global.as_ref().topology.node().id()))
.map_err(move |e| {
if let Some(e) = e.connect_error() {
info!(connect_err_logger, "error connecting to peer"; "reason" => %e);
Expand Down Expand Up @@ -524,9 +534,10 @@ pub fn connect(
}
})
})
.and_then(move |conn| Client::subscribe(conn, state, channels))
.map(move |(client, comms)| {
.and_then(move |conn| Client::subscribe(conn, state, outbound, channels))
.inspect(|client| {
debug!(client.logger(), "connected to peer");
(client, comms)
})
});

(peer_comms, future)
}
104 changes: 44 additions & 60 deletions jormungandr/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,23 +247,24 @@ pub fn start(
let conn_state = ConnectionState::new(state.clone(), &peer);
let state = state.clone();
info!(conn_state.logger(), "connecting to initial gossip peer");
let (mut comms, connecting) = client::connect(conn_state, conn_channels.clone());
service_info.spawn(
client::connect(conn_state, conn_channels.clone())
.and_then(move |(client, mut comms)| {
// TODO
connecting
.and_then(move |client| {
let node_id = client.remote_node_id();
let gossip = Gossip::from_nodes(iter::once(state.topology.node()));
match comms.try_send_gossip(gossip) {
Ok(()) => state.peers.insert_peer(node_id, comms),
Err(e) => {
warn!(
client.logger(),
"gossiping to peer failed just after connection: {:?}", e
);
return Err(());
}
if let Err(e) = comms.try_send_gossip(gossip) {
info!(
client.logger(),
"gossiping to peer failed just after connection: {:?}", e
);
return Err(());
}
Ok(client)
state.peers.insert_peer(node_id, comms);
let after_logger = client.logger().clone();
Ok(client.map(move |()| {
info!(after_logger, "client P2P connection closed");
}))
})
.and_then(|client| client),
);
Expand Down Expand Up @@ -336,19 +337,10 @@ fn handle_propagation_msg(msg: PropagateMsg, state: GlobalStateR, channels: Chan
if let Err(unreached_nodes) = res {
for node in unreached_nodes {
let msg = msg.clone();
connect_and_propagate_with(
node,
state.clone(),
channels.clone(),
|handles| match msg {
PropagateMsg::Block(header) => handles
.try_send_block_announcement(header)
.map_err(|e| e.kind()),
PropagateMsg::Fragment(fragment) => {
handles.try_send_fragment(fragment).map_err(|e| e.kind())
}
},
);
connect_and_propagate_with(node, state.clone(), channels.clone(), |comms| match msg {
PropagateMsg::Block(header) => comms.try_send_block_announcement(header).unwrap(),
PropagateMsg::Fragment(fragment) => comms.try_send_fragment(fragment).unwrap(),
});
}
}
}
Expand All @@ -359,8 +351,8 @@ fn send_gossip(state: GlobalStateR, channels: Channels) {
debug!(state.logger(), "sending gossip to node {}", node.id());
let res = state.peers.propagate_gossip_to(node.id(), gossip);
if let Err(gossip) = res {
connect_and_propagate_with(node, state.clone(), channels.clone(), |handles| {
handles.try_send_gossip(gossip).map_err(|e| e.kind())
connect_and_propagate_with(node, state.clone(), channels.clone(), |comms| {
comms.try_send_gossip(gossip).unwrap()
});
}
}
Expand All @@ -370,9 +362,9 @@ fn connect_and_propagate_with<F>(
node: topology::NodeData,
state: GlobalStateR,
channels: Channels,
once_connected: F,
use_comms: F,
) where
F: FnOnce(&mut PeerComms) -> Result<(), p2p::comm::ErrorKind> + Send + 'static,
F: FnOnce(&mut PeerComms),
{
let addr = match node.address() {
Some(addr) => addr,
Expand All @@ -391,45 +383,37 @@ fn connect_and_propagate_with<F>(
.logger()
.new(o!("node_id" => node_id.to_string()));
debug!(logger, "connecting to node");
let (mut comms, connecting) = client::connect(conn_state, channels.clone());
use_comms(&mut comms);
state.peers.insert_peer(node_id, comms);
let spawn_state = state.clone();
let err_state = state.clone();
let cf = client::connect(conn_state, channels.clone())
.and_then(move |(client, mut comms)| {
let conn_err_state = state.clone();
let cf = connecting
.map_err(move |()| {
conn_err_state.peers.remove_peer(node_id);
conn_err_state.topology.evict_node(node_id);
})
.and_then(move |client| {
let connected_node_id = client.remote_node_id();
if connected_node_id == node_id {
let res = once_connected(&mut comms);
match res {
Ok(()) => (),
Err(e) => {
info!(
client.logger(),
"propagation to peer failed just after connection: {:?}", e
);
return Err(());
}
}
} else {
if connected_node_id != node_id {
info!(
client.logger(),
"peer responded with different node id: {}", connected_node_id
);
state.peers.remove_peer(node_id);
state.topology.evict_node(node_id);
if let Some(comms) = state.peers.remove_peer(node_id) {
state.peers.insert_peer(connected_node_id, comms);
} else {
warn!(client.logger(), "peer no longer in map after connecting");
}
};

state.peers.insert_peer(connected_node_id, comms);

Ok(client)
let after_logger = client.logger().clone();
let future = client.map(move |()| {
info!(after_logger, "client P2P connection closed");
});
Ok(future)
})
.and_then(|client| client)
.then(move |res| {
info!(logger, "client P2P connection closed");
if let Err(()) = res {
err_state.topology.evict_node(node_id);
debug!(logger, "evicted node");
}
Ok(())
});
.and_then(|client| client);
spawn_state.spawn(cf);
}

Expand Down
4 changes: 2 additions & 2 deletions jormungandr/src/network/p2p/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,9 @@ impl Peers {
map.insert_peer(id, comms)
}

pub fn remove_peer(&self, id: topology::NodeId) {
pub fn remove_peer(&self, id: topology::NodeId) -> Option<PeerComms> {
let mut map = self.mutex.lock().unwrap();
map.remove_peer(id);
map.remove_peer(id)
}

pub fn subscribe_to_block_events(&self, id: topology::NodeId) -> BlockEventSubscription {
Expand Down

0 comments on commit 7170a41

Please sign in to comment.