Skip to content

Commit

Permalink
fix: correctly poll NetworkState (#12973)
Browse files Browse the repository at this point in the history
  • Loading branch information
klkvr authored Nov 28, 2024
1 parent da53d76 commit 793fc23
Showing 1 changed file with 29 additions and 29 deletions.
58 changes: 29 additions & 29 deletions crates/net/network/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,7 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}

/// Handle the outcome of processed response, for example directly queue another request.
fn on_block_response_outcome(
&mut self,
outcome: BlockResponseOutcome,
) -> Option<StateAction<N>> {
fn on_block_response_outcome(&mut self, outcome: BlockResponseOutcome) {
match outcome {
BlockResponseOutcome::Request(peer, request) => {
self.handle_block_request(peer, request);
Expand All @@ -397,29 +394,26 @@ impl<N: NetworkPrimitives> NetworkState<N> {
self.peers_manager.apply_reputation_change(&peer, reputation_change);
}
}
None
}

/// Invoked when received a response from a connected peer.
///
/// Delegates the response result to the fetcher which may return an outcome specific
/// instruction that needs to be handled in [`Self::on_block_response_outcome`]. This could be
/// a follow-up request or an instruction to slash the peer's reputation.
fn on_eth_response(
&mut self,
peer: PeerId,
resp: PeerResponseResult<N>,
) -> Option<StateAction<N>> {
match resp {
fn on_eth_response(&mut self, peer: PeerId, resp: PeerResponseResult<N>) {
let outcome = match resp {
PeerResponseResult::BlockHeaders(res) => {
let outcome = self.state_fetcher.on_block_headers_response(peer, res)?;
self.on_block_response_outcome(outcome)
self.state_fetcher.on_block_headers_response(peer, res)
}
PeerResponseResult::BlockBodies(res) => {
let outcome = self.state_fetcher.on_block_bodies_response(peer, res)?;
self.on_block_response_outcome(outcome)
self.state_fetcher.on_block_bodies_response(peer, res)
}
_ => None,
};

if let Some(outcome) = outcome {
self.on_block_response_outcome(outcome);
}
}

Expand All @@ -443,13 +437,14 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}
}

// need to buffer results here to make borrow checker happy
let mut closed_sessions = Vec::new();
let mut received_responses = Vec::new();
loop {
// need to buffer results here to make borrow checker happy
let mut closed_sessions = Vec::new();
let mut received_responses = Vec::new();

// poll all connected peers for responses
for (id, peer) in &mut self.active_peers {
if let Some(mut response) = peer.pending_response.take() {
// poll all connected peers for responses
for (id, peer) in &mut self.active_peers {
let Some(mut response) = peer.pending_response.take() else { continue };
match response.poll(cx) {
Poll::Ready(res) => {
// check if the error is due to a closed channel to the session
Expand All @@ -460,7 +455,8 @@ impl<N: NetworkPrimitives> NetworkState<N> {
"Request canceled, response channel from session closed."
);
// if the channel is closed, this means the peer session is also
// closed, in which case we can invoke the [Self::on_closed_session]
// closed, in which case we can invoke the
// [Self::on_closed_session]
// immediately, preventing followup requests and propagate the
// connection dropped error
closed_sessions.push(*id);
Expand All @@ -474,15 +470,17 @@ impl<N: NetworkPrimitives> NetworkState<N> {
}
};
}
}

for peer in closed_sessions {
self.on_session_closed(peer)
}
for peer in closed_sessions {
self.on_session_closed(peer)
}

if received_responses.is_empty() {
break;
}

for (peer_id, resp) in received_responses {
if let Some(action) = self.on_eth_response(peer_id, resp) {
self.queued_messages.push_back(action);
for (peer_id, resp) in received_responses {
self.on_eth_response(peer_id, resp);
}
}

Expand All @@ -491,6 +489,8 @@ impl<N: NetworkPrimitives> NetworkState<N> {
self.on_peer_action(action);
}

// We need to poll again tn case we have received any responses because they may have
// triggered follow-up requests.
if self.queued_messages.is_empty() {
return Poll::Pending
}
Expand Down

0 comments on commit 793fc23

Please sign in to comment.