From 42719bf4226101cfea4bf437bcb48699f298c4b9 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 21 Oct 2024 14:36:56 +0200 Subject: [PATCH 1/4] Add a way to fetch transactions without specifying a peer in P2P --- crates/services/p2p/src/service.rs | 45 ++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 3ff09da0beb..a2798c4bdc6 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -115,6 +115,10 @@ pub enum TaskRequest { channel: OnResponse>>, }, GetTransactions { + block_height_range: Range, + channel: OnResponse>>, + }, + GetTransactionsFromPeer { block_height_range: Range, from_peer: PeerId, channel: OnResponse>>, @@ -165,6 +169,9 @@ impl Debug for TaskRequest { TaskRequest::GetTransactions { .. } => { write!(f, "TaskRequest::GetTransactions") } + TaskRequest::GetTransactionsFromPeer { .. } => { + write!(f, "TaskRequest::GetTransactionsFromPeer") + } TaskRequest::TxPoolGetAllTxIds { .. } => { write!(f, "TaskRequest::TxPoolGetAllTxIds") } @@ -856,7 +863,16 @@ where tracing::warn!("No peers found for block at height {:?}", height); } } - Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { + Some(TaskRequest::GetTransactions {block_height_range, channel }) => { + let channel = ResponseSender::Transactions(channel); + let request_msg = RequestMessage::Transactions(block_height_range.clone()); + let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); + let peer = self.p2p_service.get_peer_id_with_height(&height); + if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { + tracing::warn!("No peers found for block at height {:?}", block_height_range.end); + } + } + Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => { let channel = ResponseSender::Transactions(channel); let request_msg = RequestMessage::Transactions(block_height_range); self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); @@ -1020,6 +1036,31 @@ impl SharedState { Ok((peer_id.to_bytes(), data)) } + pub async fn get_transactions( + &self, + range: Range, + ) -> anyhow::Result<(Vec, Option>)> { + let (sender, receiver) = oneshot::channel(); + + if range.is_empty() { + return Err(anyhow!( + "Cannot retrieve transactions for an empty range of block heights" + )); + } + + self.request_sender + .send(TaskRequest::GetTransactions { + block_height_range: range, + channel: sender, + }) + .await?; + + let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + + let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + Ok((peer_id.to_bytes(), data)) + } + pub async fn get_transactions_from_peer( &self, peer_id: FuelPeerId, @@ -1028,7 +1069,7 @@ impl SharedState { let (sender, receiver) = oneshot::channel(); let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); - let request = TaskRequest::GetTransactions { + let request = TaskRequest::GetTransactionsFromPeer { block_height_range: range, from_peer, channel: sender, From c80f3acea550dc6382922cedc9f43057841ccb95 Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Mon, 21 Oct 2024 14:41:03 +0200 Subject: [PATCH 2/4] Update CANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e4e728c2881..25b9955881a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Added - [2321](https://github.com/FuelLabs/fuel-core/pull/2321): New metrics for the txpool: "The size of transactions in the txpool" (`txpool_tx_size`), "The time spent by a transaction in the txpool in seconds" (`txpool_tx_time_in_txpool_seconds`), The number of transactions in the txpool (`txpool_number_of_transactions`), "The number of transactions pending verification before entering the txpool" (`txpool_number_of_transactions_pending_verification`), "The number of executable transactions in the txpool" (`txpool_number_of_executable_transactions`), "The time it took to select transactions for inclusion in a block in nanoseconds" (`txpool_select_transaction_time_nanoseconds`), The time it took to insert a transaction in the txpool in milliseconds (`txpool_insert_transaction_time_milliseconds`). +- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. ## [Version 0.40.0] From 130cb86d3c7115ae9831f04b3a50a4a3125aca6e Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Tue, 22 Oct 2024 09:55:52 +0200 Subject: [PATCH 3/4] Cahnge print ot err in case no peer. --- crates/services/p2p/src/service.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index a2798c4bdc6..a6ac3a903ee 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -869,7 +869,7 @@ where let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); let peer = self.p2p_service.get_peer_id_with_height(&height); if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { - tracing::warn!("No peers found for block at height {:?}", block_height_range.end); + return Err(anyhow!("No peers found for block at height {:?}", height)); } } Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => { From 024c33f8a407ee6f49b83a8f0529635f84abdb3a Mon Sep 17 00:00:00 2001 From: AurelienFT Date: Wed, 30 Oct 2024 10:42:11 +0100 Subject: [PATCH 4/4] Add returned errors in channel when peer selection failed. --- crates/services/p2p/src/p2p_service.rs | 144 +++++++++++++----- .../p2p/src/request_response/messages.rs | 11 +- crates/services/p2p/src/service.rs | 49 +++--- 3 files changed, 144 insertions(+), 60 deletions(-) diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 05dd2ec1b38..47d189ae85f 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -677,17 +677,31 @@ impl FuelP2PService { V2ResponseMessage::SealedHeaders(v) => { // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 // Change type of ResponseSender and remove the .ok() here - c.send((peer, Ok(v.ok()))).is_ok() + c.send(Ok((peer, Ok(v.ok())))).is_ok() } _ => { warn!( "Invalid response type received for request {:?}", request_id ); - c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() } }, ResponseSender::Transactions(c) => match response { + V2ResponseMessage::Transactions(v) => { + c.send(Ok((peer, Ok(v.ok())))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() + } + }, + ResponseSender::TransactionsFromPeer(c) => match response { V2ResponseMessage::Transactions(v) => { c.send((peer, Ok(v.ok()))).is_ok() } @@ -750,9 +764,12 @@ impl FuelP2PService { if let Some(channel) = self.outbound_requests_table.remove(&request_id) { match channel { ResponseSender::SealedHeaders(c) => { - let _ = c.send((peer, Err(ResponseError::P2P(error)))); + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); } ResponseSender::Transactions(c) => { + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); + } + ResponseSender::TransactionsFromPeer(c) => { let _ = c.send((peer, Err(ResponseError::P2P(error)))); } ResponseSender::TxPoolAllTransactionsIds(c) => { @@ -1700,9 +1717,25 @@ mod tests { let expected = arbitrary_headers_for_range(range.clone()); - if let Ok((_, Ok(sealed_headers))) = response_message { - let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b)); - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(sealed_headers)))) => { + let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b)); + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any headers"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1717,9 +1750,25 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { - let check = transactions.len() == 1 && transactions[0].0.len() == 5; - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(transactions)))) => { + let check = transactions.len() == 1 && transactions[0].0.len() == 5; + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any transactions"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1878,23 +1927,28 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - match response_message { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly"); - }, - Ok((_, Err(ResponseError::TypeMismatch))) => { - // Got Invalid Response Type as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(_) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly"); - }, + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::TypeMismatch))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } @@ -1964,21 +2018,29 @@ mod tests { tokio::spawn(async move { // 3. Simulating NetworkOrchestrator receiving a Timeout Error Message! - match rx_orchestrator.await { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly")}, - Ok((_, Err(ResponseError::P2P(_)))) => { - // Got timeout as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(e) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly: {:?}", e)}, + let response_message = rx_orchestrator.await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::P2P(_)))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 2a0e03ba2cd..f1c3b176f4b 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,6 +18,8 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; +use crate::service::TaskError; + pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; @@ -104,11 +106,16 @@ impl From for V1ResponseMessage { } pub type OnResponse = oneshot::Sender<(PeerId, Result)>; +// This type is more complex because it's used in tasks that need to select a peer to send the request and this +// can cause errors where the peer is not defined. +pub type OnResponseWithPeerSelection = + oneshot::Sender), TaskError>>; #[derive(Debug)] pub enum ResponseSender { - SealedHeaders(OnResponse>>), - Transactions(OnResponse>>), + SealedHeaders(OnResponseWithPeerSelection>>), + Transactions(OnResponseWithPeerSelection>>), + TransactionsFromPeer(OnResponse>>), TxPoolAllTransactionsIds(OnResponse>>), TxPoolFullTransactions(OnResponse>>>), } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 9ea4ded6289..30c5dd9310a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -21,6 +21,7 @@ use crate::{ }, request_response::messages::{ OnResponse, + OnResponseWithPeerSelection, RequestMessage, ResponseMessageErrorCode, ResponseSender, @@ -84,6 +85,7 @@ use std::{ ops::Range, sync::Arc, }; +use thiserror::Error; use tokio::{ sync::{ broadcast, @@ -104,6 +106,12 @@ const CHANNEL_SIZE: usize = 1024 * 10; pub type Service = ServiceRunner>; +#[derive(Debug, Error)] +pub enum TaskError { + #[error("No peer found to send request to")] + NoPeerFound, +} + pub enum TaskRequest { // Broadcast requests to p2p network BroadcastTransaction(Arc), @@ -113,11 +121,11 @@ pub enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: OnResponse>>, + channel: OnResponseWithPeerSelection>>, }, GetTransactions { block_height_range: Range, - channel: OnResponse>>, + channel: OnResponseWithPeerSelection>>, }, GetTransactionsFromPeer { block_height_range: Range, @@ -876,28 +884,29 @@ where } } Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => { - let channel = ResponseSender::SealedHeaders(channel); - let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); - // Note: this range has already been checked for // validity in `SharedState::get_sealed_block_headers`. let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); - let peer = self.p2p_service.get_peer_id_with_height(&height); - if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { - tracing::warn!("No peers found for block at height {:?}", height); - } + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; + let channel = ResponseSender::SealedHeaders(channel); + let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); } Some(TaskRequest::GetTransactions {block_height_range, channel }) => { + let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; let channel = ResponseSender::Transactions(channel); let request_msg = RequestMessage::Transactions(block_height_range.clone()); - let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); - let peer = self.p2p_service.get_peer_id_with_height(&height); - if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { - return Err(anyhow!("No peers found for block at height {:?}", height)); - } + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); } Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => { - let channel = ResponseSender::Transactions(channel); + let channel = ResponseSender::TransactionsFromPeer(channel); let request_msg = RequestMessage::Transactions(block_height_range); self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); } @@ -1054,7 +1063,10 @@ impl SharedState { }) .await?; - let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; Ok((peer_id.to_bytes(), data)) @@ -1079,7 +1091,10 @@ impl SharedState { }) .await?; - let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; Ok((peer_id.to_bytes(), data))