From a33a100e38d45f8ff3722b3a21b5a76d622da328 Mon Sep 17 00:00:00 2001 From: Hannes Karppila Date: Mon, 5 Feb 2024 04:04:14 +0200 Subject: [PATCH] Decrease peer reputation on request timeouts and decode errors (#1574) Closes #1345. Closes #1346. Closes #1350. This PR stops discarding request errors from libp2p, and instead returns them to the sender of the request. Also penalizes peers for sending invalid responses or for not replying at all. Making penalty configurable should be a follow-up PR, as there are other penalties that should be configurable as well. TODO: - [x] Make timeout configurable: Already seems to be the case on master branch - [x] Add tests - [x] Fix current tests that for some reason don't terminate --------- Co-authored-by: xgreenx --- CHANGELOG.md | 1 + crates/services/p2p/src/behavior.rs | 2 +- crates/services/p2p/src/p2p_service.rs | 215 ++++++++++++++---- .../p2p/src/request_response/messages.rs | 32 ++- crates/services/p2p/src/service.rs | 52 +++-- 5 files changed, 221 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ce75bfffe03..e7f59edc3f6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ Description of the upcoming release here. - [#1585](https://github.com/FuelLabs/fuel-core/pull/1585): Let `NetworkBehaviour` macro generate `FuelBehaviorEvent` in p2p - [#1579](https://github.com/FuelLabs/fuel-core/pull/1579): The change extracts the off-chain-related logic from the executor and moves it to the GraphQL off-chain worker. It creates two new concepts - Off-chain and On-chain databases where the GraphQL worker has exclusive ownership of the database and may modify it without intersecting with the On-chain database. - [#1577](https://github.com/FuelLabs/fuel-core/pull/1577): Moved insertion of sealed blocks into the `BlockImporter` instead of the executor. +- [#1574](https://github.com/FuelLabs/fuel-core/pull/1574): Penalizes peers for sending invalid responses or for not replying at all. 
- [#1601](https://github.com/FuelLabs/fuel-core/pull/1601): Fix formatting in docs and check that `cargo doc` passes in the CI. - [#1636](https://github.com/FuelLabs/fuel-core/pull/1636): Add more docs to GraphQL DAP API. diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index a8ccd9a38f0..8771028efa8 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -187,7 +187,7 @@ impl FuelBehaviour { Ok(true) => { tracing::debug!(target: "fuel-p2p", "Sent a report for MessageId: {} from PeerId: {}", msg_id, propagation_source); if should_check_score { - return self.gossipsub.peer_score(propagation_source) + return self.gossipsub.peer_score(propagation_source); } } Ok(false) => { diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 3324b09f055..3f0c0724edc 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -27,9 +27,10 @@ use crate::{ request_response::messages::{ RequestError, RequestMessage, - ResponseChannelItem, + ResponseError, ResponseMessage, ResponseSendError, + ResponseSender, }, TryPeerId, }; @@ -96,14 +97,18 @@ pub struct FuelP2PService { /// Swarm handler for FuelBehaviour swarm: Swarm, - /// Holds the Sender(s) part of the Oneshot Channel from the NetworkOrchestrator - /// Once the ResponseMessage is received from the p2p Network - /// It will send it to the NetworkOrchestrator via its unique Sender - outbound_requests_table: HashMap, - - /// Holds the ResponseChannel(s) for the inbound requests from the p2p Network - /// Once the Response is prepared by the NetworkOrchestrator - /// It will send it to the specified Peer via its unique ResponseChannel + /// Holds active outbound requests and associated oneshot channels. + /// When we send a request to the p2p network, we add it here. The sender + /// must provide a channel to receive the response. 
+ /// Whenever a response (or an error) is received from the p2p network, + /// the request is removed from this table, and the channel is used to + /// send the result to the caller. + outbound_requests_table: HashMap, + + /// Holds active inbound requests and associated oneshot channels. + /// Whenever we're done processing the request, it's removed from this table, + /// and the channel is used to send the result to libp2p, which will forward it + /// to the peer that requested it. inbound_requests_table: HashMap>, /// NetworkCodec used as `` for encoding and decoding of Gossipsub messages @@ -248,7 +253,7 @@ impl FuelP2PService { loop { if let SwarmEvent::NewListenAddr { .. } = self.swarm.select_next_some().await { - break + break; } } } @@ -296,7 +301,7 @@ impl FuelP2PService { &mut self, peer_id: Option, message_request: RequestMessage, - channel_item: ResponseChannelItem, + on_response: ResponseSender, ) -> Result { let peer_id = match peer_id { Some(peer_id) => peer_id, @@ -305,7 +310,7 @@ impl FuelP2PService { let peers_count = self.peer_manager.total_peers_connected(); if peers_count == 0 { - return Err(RequestError::NoPeersConnected) + return Err(RequestError::NoPeersConnected); } let mut range = rand::thread_rng(); @@ -318,8 +323,7 @@ impl FuelP2PService { .behaviour_mut() .send_request_msg(message_request, &peer_id); - self.outbound_requests_table - .insert(request_id, channel_item); + self.outbound_requests_table.insert(request_id, on_response); Ok(request_id) } @@ -332,7 +336,7 @@ impl FuelP2PService { ) -> Result<(), ResponseSendError> { let Some(channel) = self.inbound_requests_table.remove(&request_id) else { debug!("ResponseChannel for {:?} does not exist!", request_id); - return Err(ResponseSendError::ResponseChannelDoesNotExist) + return Err(ResponseSendError::ResponseChannelDoesNotExist); }; if self @@ -342,7 +346,7 @@ impl FuelP2PService { .is_err() { debug!("Failed to send ResponseMessage for {:?}", request_id); - return 
Err(ResponseSendError::SendingResponseFailed) + return Err(ResponseSendError::SendingResponseFailed); } Ok(()) @@ -531,14 +535,14 @@ impl FuelP2PService { { let _ = self.swarm.disconnect_peer_id(peer_id); } else if initial_connection { - return Some(FuelP2PEvent::PeerConnected(peer_id)) + return Some(FuelP2PEvent::PeerConnected(peer_id)); } } PeerReportEvent::PeerDisconnected { peer_id } => { if self.peer_manager.handle_peer_disconnect(peer_id) { let _ = self.swarm.dial(peer_id); } - return Some(FuelP2PEvent::PeerDisconnected(peer_id)) + return Some(FuelP2PEvent::PeerDisconnected(peer_id)); } } None @@ -560,7 +564,7 @@ impl FuelP2PService { return Some(FuelP2PEvent::InboundRequestMessage { request_id, request_message: request, - }) + }); } request_response::Message::Response { request_id, @@ -572,26 +576,35 @@ impl FuelP2PService { return None; }; - let send_ok = match (channel, response) { - ( - ResponseChannelItem::Transactions(channel), - ResponseMessage::Transactions(transactions), - ) => channel.send(transactions).is_ok(), - ( - ResponseChannelItem::SealedHeaders(channel), - ResponseMessage::SealedHeaders(headers), - ) => channel.send((peer, headers)).is_ok(), - - (_, _) => { - tracing::error!( - "Mismatching request and response channel types" - ); - return None; - } + let send_ok = match channel { + ResponseSender::SealedHeaders(c) => match response { + ResponseMessage::SealedHeaders(v) => { + c.send((peer, Ok(v))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + } + }, + ResponseSender::Transactions(c) => match response { + ResponseMessage::Transactions(v) => { + c.send((peer, Ok(v))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + } + }, }; if !send_ok { - debug!("Failed to send through the channel for {:?}", request_id); + 
warn!("Failed to send through the channel for {:?}", request_id); } } }, @@ -601,6 +614,9 @@ impl FuelP2PService { request_id, } => { tracing::error!("RequestResponse inbound error for peer: {:?} with id: {:?} and error: {:?}", peer, request_id, error); + + // Drop the channel, as we can't send a response + let _ = self.inbound_requests_table.remove(&request_id); } request_response::Event::OutboundFailure { peer, @@ -609,7 +625,16 @@ impl FuelP2PService { } => { tracing::error!("RequestResponse outbound error for peer: {:?} with id: {:?} and error: {:?}", peer, request_id, error); - let _ = self.outbound_requests_table.remove(&request_id); + if let Some(channel) = self.outbound_requests_table.remove(&request_id) { + match channel { + ResponseSender::SealedHeaders(c) => { + let _ = c.send((peer, Err(ResponseError::P2P(error)))); + } + ResponseSender::Transactions(c) => { + let _ = c.send((peer, Err(ResponseError::P2P(error)))); + } + }; + } } _ => {} } @@ -697,8 +722,9 @@ mod tests { peer_manager::PeerInfo, request_response::messages::{ RequestMessage, - ResponseChannelItem, + ResponseError, ResponseMessage, + ResponseSender, }, service::to_message_acceptance, }; @@ -1203,6 +1229,7 @@ mod tests { // let's update our BlockHeight node_b.update_block_height(latest_block_height); } + tracing::info!("Node B Event: {:?}", node_b_event); } } @@ -1511,7 +1538,7 @@ mod tests { match request_msg.clone() { RequestMessage::SealedHeaders(range) => { let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); - assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::SealedHeaders(tx_orchestrator)).is_ok()); + assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseSender::SealedHeaders(tx_orchestrator)).is_ok()); let tx_test_end = tx_test_end.clone(); tokio::spawn(async move { @@ -1519,7 +1546,7 @@ mod tests { let expected = arbitrary_headers_for_range(range.clone()); - if let Ok((_, sealed_headers)) = response_message { + if let Ok((_, 
Ok(sealed_headers))) = response_message { let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b)); let _ = tx_test_end.send(check).await; } else { @@ -1530,13 +1557,13 @@ mod tests { } RequestMessage::Transactions(_range) => { let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); - assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseChannelItem::Transactions(tx_orchestrator)).is_ok()); + assert!(node_a.send_request_msg(None, request_msg.clone(), ResponseSender::Transactions(tx_orchestrator)).is_ok()); let tx_test_end = tx_test_end.clone(); tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok(Some(transactions)) = response_message { + if let Ok((_, Ok(Some(transactions)))) = response_message { let check = transactions.len() == 1 && transactions[0].0.len() == 5; let _ = tx_test_end.send(check).await; } else { @@ -1589,6 +1616,83 @@ mod tests { request_response_works_with(RequestMessage::SealedHeaders(arbitrary_range)).await } + /// We send a request for transactions, but it's responded by only headers + #[tokio::test] + #[instrument] + async fn invalid_response_type_is_detected() { + let mut p2p_config = + Config::default_initialized("invalid_response_type_is_detected"); + + // Node A + let mut node_a = build_service_from_config(p2p_config.clone()).await; + + // Node B + p2p_config.bootstrap_nodes = node_a.multiaddrs(); + let mut node_b = build_service_from_config(p2p_config.clone()).await; + + let (tx_test_end, mut rx_test_end) = mpsc::channel::(1); + + let mut request_sent = false; + + loop { + tokio::select! 
{ + message_sent = rx_test_end.recv() => { + // we received a signal to end the test + assert!(message_sent.unwrap(), "Received incorrect or missing message"); + break; + } + node_a_event = node_a.next_event() => { + if let Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height: _ }) = node_a_event { + if node_a.peer_manager.get_peer_info(&peer_id).is_some() { + // 0. verifies that we've got at least a single peer address to request message from + if !request_sent { + request_sent = true; + + let (tx_orchestrator, rx_orchestrator) = oneshot::channel(); + assert!(node_a.send_request_msg(None, RequestMessage::Transactions(0..2), ResponseSender::Transactions(tx_orchestrator)).is_ok()); + let tx_test_end = tx_test_end.clone(); + + tokio::spawn(async move { + let response_message = rx_orchestrator.await; + + match response_message { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::TypeMismatch))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error: {:?}", err); + }, + Err(_) => { + let _ = tx_test_end.send(false).await; + panic!("Channel closed unexpectedly"); + }, + } + }); + } + } + } + + tracing::info!("Node A Event: {:?}", node_a_event); + }, + node_b_event = node_b.next_event() => { + // 2. 
Node B receives the RequestMessage from Node A initiated by the NetworkOrchestrator + if let Some(FuelP2PEvent::InboundRequestMessage{ request_id, request_message: _ }) = &node_b_event { + let sealed_headers: Vec<_> = arbitrary_headers_for_range(1..3); + let _ = node_b.send_response_msg(*request_id, ResponseMessage::SealedHeaders(Some(sealed_headers))); + } + + tracing::info!("Node B Event: {:?}", node_b_event); + } + }; + } + } + #[tokio::test] #[instrument] async fn req_res_outbound_timeout_works() { @@ -1596,13 +1700,14 @@ mod tests { Config::default_initialized("req_res_outbound_timeout_works"); // Node A - // setup request timeout to 0 in order for the Request to fail - p2p_config.set_request_timeout = Duration::from_secs(0); + // setup request timeout to 1ms in order for the Request to fail + p2p_config.set_request_timeout = Duration::from_millis(1); let mut node_a = build_service_from_config(p2p_config.clone()).await; // Node B p2p_config.bootstrap_nodes = node_a.multiaddrs(); + p2p_config.set_request_timeout = Duration::from_secs(20); let mut node_b = build_service_from_config(p2p_config.clone()).await; let (tx_test_end, mut rx_test_end) = tokio::sync::mpsc::channel(1); @@ -1627,7 +1732,7 @@ mod tests { // Request successfully sent let requested_block_height = RequestMessage::SealedHeaders(0..0); - assert!(node_a.send_request_msg(None, requested_block_height, ResponseChannelItem::SealedHeaders(tx_orchestrator)).is_ok()); + assert!(node_a.send_request_msg(None, requested_block_height, ResponseSender::SealedHeaders(tx_orchestrator)).is_ok()); // 2b. there should be ONE pending outbound requests in the table assert_eq!(node_a.outbound_requests_table.len(), 1); @@ -1636,8 +1741,21 @@ mod tests { tokio::spawn(async move { // 3. Simulating NetworkOrchestrator receiving a Timeout Error Message! 
- if (rx_orchestrator.await).is_err() { - let _ = tx_test_end.send(()).await; + match rx_orchestrator.await { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly")}, + Ok((_, Err(ResponseError::P2P(_)))) => { + // Got timeout as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Channel closed unexpectedly: {:?}", e)}, } }); } @@ -1646,7 +1764,8 @@ mod tests { tracing::info!("Node A Event: {:?}", node_a_event); }, - _ = rx_test_end.recv() => { + recv = rx_test_end.recv() => { + assert_eq!(recv, Some(true), "Test failed"); // we received a signal to end the test // 4. there should be ZERO pending outbound requests in the table // after the Outbound Request Failed with Timeout diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index b0afb931058..517156c3642 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -2,7 +2,10 @@ use fuel_core_types::{ blockchain::SealedBlockHeader, services::p2p::Transactions, }; -use libp2p::PeerId; +use libp2p::{ + request_response::OutboundFailure, + PeerId, +}; use serde::{ Deserialize, Serialize, @@ -23,25 +26,36 @@ pub enum RequestMessage { Transactions(Range), } -/// Holds oneshot channels for specific responses -#[derive(Debug)] -pub enum ResponseChannelItem { - SealedHeaders(oneshot::Sender<(PeerId, Option>)>), - Transactions(oneshot::Sender>>), -} - -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub enum ResponseMessage { SealedHeaders(Option>), Transactions(Option>), } +pub type OnResponse = oneshot::Sender<(PeerId, Result)>; + +#[derive(Debug)] +pub enum ResponseSender { + SealedHeaders(OnResponse>>), 
+ Transactions(OnResponse>>), +} + #[derive(Debug, Error)] pub enum RequestError { #[error("Not currently connected to any peers")] NoPeersConnected, } +#[derive(Debug, Error)] +pub enum ResponseError { + /// This is the raw error from [`libp2p-request-response`] + #[error("P2P outbound error {0}")] + P2P(OutboundFailure), + /// The peer responded with an invalid response type + #[error("Peer response message was of incorrect type")] + TypeMismatch, +} + /// Errors than can occur when attempting to send a response #[derive(Debug, Eq, PartialEq, Error)] pub enum ResponseSendError { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index dd75ac8708f..c6d35ff8a33 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -18,9 +18,10 @@ use crate::{ P2pDb, }, request_response::messages::{ + OnResponse, RequestMessage, - ResponseChannelItem, ResponseMessage, + ResponseSender, }, }; use anyhow::anyhow; @@ -96,12 +97,12 @@ enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: oneshot::Sender<(PeerId, Option>)>, + channel: OnResponse>>, }, GetTransactions { block_height_range: Range, from_peer: PeerId, - channel: oneshot::Sender>>, + channel: OnResponse>>, }, // Responds back to the p2p network RespondWithGossipsubMessageReport((GossipsubMessageInfo, GossipsubMessageAcceptance)), @@ -162,7 +163,7 @@ pub trait TaskP2PService: Send { &mut self, peer_id: Option, request_msg: RequestMessage, - channel_item: ResponseChannelItem, + on_response: ResponseSender, ) -> anyhow::Result<()>; fn send_response_msg( @@ -216,9 +217,9 @@ impl TaskP2PService for FuelP2PService { &mut self, peer_id: Option, request_msg: RequestMessage, - channel_item: ResponseChannelItem, + on_response: ResponseSender, ) -> anyhow::Result<()> { - self.send_request_msg(peer_id, request_msg, channel_item)?; + self.send_request_msg(peer_id, request_msg, on_response)?; Ok(()) } @@ -530,25 +531,22 @@ where let peer_ids = 
self.p2p_service.get_peer_ids(); let _ = channel.send(peer_ids); } - Some(TaskRequest::GetSealedHeaders { block_height_range, channel: response}) => { + Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => { + let channel = ResponseSender::SealedHeaders(channel); let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); - let channel_item = ResponseChannelItem::SealedHeaders(response); // Note: this range has already been checked for // validity in `SharedState::get_sealed_block_headers`. - let block_height = BlockHeight::from(block_height_range.end.saturating_sub(1)); - let peer = self.p2p_service - .get_peer_id_with_height(&block_height); - let found_peers = self.p2p_service.send_request_msg(peer, request_msg, channel_item).is_ok(); - if !found_peers { - tracing::debug!("No peers found for block at height {:?}", block_height); + let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); + let peer = self.p2p_service.get_peer_id_with_height(&height); + if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { + tracing::warn!("No peers found for block at height {:?}", height); } } Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { + let channel = ResponseSender::Transactions(channel); let request_msg = RequestMessage::Transactions(block_height_range); - let channel_item = ResponseChannelItem::Transactions(channel); - self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel_item) - .expect("We always a peer here, so send has a target"); + self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { // report_message(&mut self.p2p_service, message, acceptance); @@ -717,10 +715,10 @@ impl SharedState { }) .await?; - receiver - .await - .map(|(peer_id, headers)| (peer_id.to_bytes(), headers)) - .map_err(|e| 
anyhow!("{}", e)) + let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + + let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + Ok((peer_id.to_bytes(), data)) } pub async fn get_transactions_from_peer( @@ -738,7 +736,15 @@ impl SharedState { }; self.request_sender.send(request).await?; - receiver.await.map_err(|e| anyhow!("{}", e)) + let (response_from_peer, response) = + receiver.await.map_err(|e| anyhow!("{e}"))?; + assert_eq!( + peer_id, + response_from_peer.to_bytes(), + "Bug: response from non-requested peer" + ); + + response.map_err(|e| anyhow!("Invalid response from peer {e:?}")) } pub fn broadcast_transaction( @@ -975,7 +981,7 @@ pub mod tests { &mut self, _peer_id: Option, _request_msg: RequestMessage, - _channel_item: ResponseChannelItem, + _on_response: ResponseSender, ) -> anyhow::Result<()> { todo!() }