From ba304f3504fd2272c924eea89651c3f3d356b0dc Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 17:31:20 +0530 Subject: [PATCH 1/5] chore(gas_price_service_v0): remove unused trait impl (#2410) ## Linked Issues/PRs - none ## Description `RunnableService` didn't need to be implemented for `GasPriceServiceV0` even in the tests. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --- .../gas_price_service/src/v0/service.rs | 47 ++++--------------- .../gas_price_service/src/v0/tests.rs | 34 +++++++------- 2 files changed, 27 insertions(+), 54 deletions(-) diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index b8d95650cd7..ee062097dfd 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -165,48 +165,19 @@ mod tests { }, ports::MetadataStorage, v0::{ - algorithm::SharedV0Algorithm, metadata::V0AlgorithmConfig, service::GasPriceServiceV0, uninitialized_task::initialize_algorithm, }, }; use fuel_core_services::{ - RunnableService, - Service, - ServiceRunner, + RunnableTask, StateWatcher, }; use fuel_core_types::fuel_types::BlockHeight; use std::sync::Arc; use tokio::sync::mpsc; - #[async_trait::async_trait] - impl RunnableService for GasPriceServiceV0 - where - L2: L2BlockSource, - Metadata: MetadataStorage, - { - const NAME: &'static str = "GasPriceServiceV0"; - type SharedData = SharedV0Algorithm; - type Task = Self; - type TaskParams = (); - - fn shared_data(&self) -> Self::SharedData { - self.shared_algo.clone() - } - - async fn into_task( - mut self, - _state_watcher: &StateWatcher, - _params: Self::TaskParams, - ) -> anyhow::Result { - let algorithm = self.algorithm_updater.algorithm(); - self.shared_algo.update(algorithm).await; - Ok(self) - } - } - struct FakeL2BlockSource { l2_block: mpsc::Receiver, } @@ -255,10 +226,12 @@ mod tests { gas_used: 60, block_gas_capacity: 100, }; + let (l2_block_sender, l2_block_receiver) = mpsc::channel(1); let l2_block_source = FakeL2BlockSource { l2_block: l2_block_receiver, }; + let metadata_storage = FakeMetadata::empty(); let l2_block_height = 0; let config = V0AlgorithmConfig { @@ -269,25 +242,23 @@ mod tests { }; let (algo_updater, shared_algo) = initialize_algorithm(&config, l2_block_height, &metadata_storage).unwrap(); - - let service = GasPriceServiceV0::new( + let mut service = GasPriceServiceV0::new( l2_block_source, metadata_storage, shared_algo, algo_updater, ); let read_algo = service.next_block_algorithm(); - let service = ServiceRunner::new(service); - let prev = read_algo.next_gas_price(); + let mut watcher = StateWatcher::default(); + let initial_price = read_algo.next_gas_price(); // when - service.start_and_await().await.unwrap(); + service.run(&mut watcher).await.unwrap(); l2_block_sender.send(l2_block).await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + service.shutdown().await.unwrap(); // then let actual_price = read_algo.next_gas_price(); - assert_ne!(prev, actual_price); - service.stop_and_await().await.unwrap(); + assert_ne!(initial_price, actual_price); } } diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index a43f4bca461..c395b39aeba 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -37,8 +37,8 @@ use fuel_core_services::{ BoxStream, IntoBoxStream, }, - Service, - ServiceRunner, + RunnableTask, + StateWatcher, }; use fuel_core_storage::{ transactional::AtomicView, @@ -59,7 +59,6 @@ use fuel_core_types::{ use std::{ ops::Deref, sync::Arc, - time::Duration, }; use tokio::sync::mpsc::Receiver; @@ -159,25 +158,25 @@ async fn next_gas_price__affected_by_new_l2_block() { let height = 0; let (algo_updater, shared_algo) = initialize_algorithm(&config, height, &metadata_storage).unwrap(); - let service = GasPriceServiceV0::new( + let mut service = GasPriceServiceV0::new( l2_block_source, metadata_storage, shared_algo, algo_updater, ); - let service = ServiceRunner::new(service); - let shared = service.shared.clone(); - let initial = shared.next_gas_price(); + + let read_algo = service.next_block_algorithm(); + let initial = read_algo.next_gas_price(); + let mut watcher = StateWatcher::default(); // when - service.start_and_await().await.unwrap(); + service.run(&mut watcher).await.unwrap(); l2_block_sender.send(l2_block).await.unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + service.shutdown().await.unwrap(); // then - let new = shared.next_gas_price(); + let new = read_algo.next_gas_price(); assert_ne!(initial, new); - service.stop_and_await().await.unwrap(); } #[tokio::test] @@ -202,7 +201,7 @@ async fn next__new_l2_block_saves_old_metadata() { let (algo_updater, shared_algo) = initialize_algorithm(&config, height, &metadata_storage).unwrap(); - let service = GasPriceServiceV0::new( + let mut service = GasPriceServiceV0::new( l2_block_source, metadata_storage, shared_algo, @@ -210,14 +209,17 @@ async fn next__new_l2_block_saves_old_metadata() { ); // when - let service = ServiceRunner::new(service); + let read_algo = service.next_block_algorithm(); + let mut watcher = StateWatcher::default(); + let start = read_algo.next_gas_price(); - service.start_and_await().await.unwrap(); + service.run(&mut watcher).await.unwrap(); l2_block_sender.send(l2_block).await.unwrap(); - tokio::time::sleep(Duration::from_millis(10)).await; + service.shutdown().await.unwrap(); // then - assert!(metadata_inner.lock().unwrap().is_some()); + let new = read_algo.next_gas_price(); + assert_ne!(start, new); } #[derive(Clone)] From c72afba9908fa1d3ca0eb643e5af74db534cacef Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Wed, 30 Oct 2024 14:33:42 +0100 Subject: [PATCH 2/5] Update tai64 to fix the wrong time offset (#2409) ## Linked Issues/PRs https://github.com/FuelLabs/fuel-core/issues/2338 https://github.com/RustCrypto/formats/pull/1583 https://github.com/RustCrypto/formats/pull/1590 ## Description I have fixed the wrong offset and added an automation for future update in crate `tai64`(https://github.com/RustCrypto/formats/pull/1583) and they merged it and made a new release (https://github.com/RustCrypto/formats/pull/1590). This PR updates to the new version. ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --- Cargo.lock | 4 ++-- crates/client/Cargo.toml | 2 +- crates/types/Cargo.toml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index def920c3350..67a97f72c3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8757,9 +8757,9 @@ dependencies = [ [[package]] name = "tai64" -version = "4.0.0" +version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed7401421025f4132e6c1f7af5e7f8287383969f36e6628016cd509b8d3da9dc" +checksum = "014639506e4f425c78e823eabf56e71c093f940ae55b43e58f682e7bc2f5887a" dependencies = [ "serde", ] diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index ad8f7d48078..99e3a1e74d0 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -27,7 +27,7 @@ itertools = { workspace = true } reqwest = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { version = "1.0", features = ["raw_value"] } -tai64 = { version = "4.0", features = ["serde"] } +tai64 = { version = "4.1", features = ["serde"] } thiserror = "1.0" tracing = "0.1" diff --git a/crates/types/Cargo.toml b/crates/types/Cargo.toml index 58b9268e6c1..deb1b2f8ab6 100644 --- a/crates/types/Cargo.toml +++ b/crates/types/Cargo.toml @@ -27,7 +27,7 @@ fuel-vm-private = { workspace = true, default-features = false, features = [ rand = { workspace = true, optional = true } secrecy = "0.8" serde = { workspace = true, features = ["derive"], optional = true } -tai64 = { version = "4.0", features = ["serde"] } +tai64 = { version = "4.1", features = ["serde"] } zeroize = "1.5" [features] From ec41f562343bc65ea5e3b8cda3fa49a60985b3d3 Mon Sep 17 00:00:00 2001 From: Aaryamann Challani <43716372+rymnc@users.noreply.github.com> Date: Wed, 30 Oct 2024 23:08:42 +0530 Subject: [PATCH 3/5] fix(block_producer): immediately return error if lock cannot be acquired during production (#2413) ## Linked Issues/PRs fixes #2412 ## Description Similar to how the lock is used in the sync process, we try to lock and immediately return an error if a block was already being produced. ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else? --- CHANGELOG.md | 1 + crates/services/producer/src/block_producer.rs | 10 +++++++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8e41f666a87..a6ed7c7bb58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Fixed - [2366](https://github.com/FuelLabs/fuel-core/pull/2366): The `importer_gas_price_for_block` metric is properly collected. - [2369](https://github.com/FuelLabs/fuel-core/pull/2369): The `transaction_insertion_time_in_thread_pool_milliseconds` metric is properly collected. +- [2413](https://github.com/FuelLabs/fuel-core/issues/2413): block production immediately errors if unable to lock the mutex. ### Changed diff --git a/crates/services/producer/src/block_producer.rs b/crates/services/producer/src/block_producer.rs index 0f63d2637d9..ef8747b9a33 100644 --- a/crates/services/producer/src/block_producer.rs +++ b/crates/services/producer/src/block_producer.rs @@ -115,7 +115,9 @@ where where Executor: ports::BlockProducer> + 'static, { - let _production_guard = self.lock.lock().await; + let _production_guard = self.lock.try_lock().map_err(|_| { + anyhow!("Failed to acquire the production lock, block production is already in progress") + })?; let mut transactions_source = predefined_block.transactions().to_vec(); @@ -185,8 +187,10 @@ where // 2. parallel throughput // - Execute block with production mode to correctly malleate txs outputs and block headers - // prevent simultaneous block production calls, the guard will drop at the end of this fn. - let _production_guard = self.lock.lock().await; + // prevent simultaneous block production calls + let _production_guard = self.lock.try_lock().map_err(|_| { + anyhow!("Failed to acquire the production lock, block production is already in progress") + })?; let gas_price = self.calculate_gas_price().await?; From 32b29a397621ea66d0f3e560a05a3f2a236f53da Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Thu, 31 Oct 2024 09:47:12 +0100 Subject: [PATCH 4/5] Add a way to fetch transactions in P2P without specifying a peer (#2376) ## Linked Issues/PRs This is a requirement for https://github.com/FuelLabs/fuel-core/pull/2361 ## Description This PR adds a way to fetch transactions with p2p but without giving a specific peer and let p2p choose the one they prefer. This will be used in #2361 ## Checklist - [x] Breaking changes are clearly marked as such in the PR description and changelog - [x] New behavior is reflected in tests - [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [x] I have reviewed the code myself - [x] I have created follow-up issues caused by this PR and linked them here --------- Co-authored-by: Green Baneling --- CHANGELOG.md | 1 + crates/services/p2p/src/p2p_service.rs | 144 +++++++++++++----- .../p2p/src/request_response/messages.rs | 11 +- crates/services/p2p/src/service.rs | 78 ++++++++-- 4 files changed, 180 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a6ed7c7bb58..3060675e042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). - [2347](https://github.com/FuelLabs/fuel-core/pull/2364): Add activity concept in order to protect against infinitely increasing DA gas price scenarios - [2362](https://github.com/FuelLabs/fuel-core/pull/2362): Added a new request_response protocol version `/fuel/req_res/0.0.2`. In comparison with `/fuel/req/0.0.1`, which returns an empty response when a request cannot be fulfilled, this version returns more meaningful error codes. Nodes still support the version `0.0.1` of the protocol to guarantee backward compatibility with fuel-core nodes. Empty responses received from nodes using the old protocol `/fuel/req/0.0.1` are automatically converted into an error `ProtocolV1EmptyResponse` with error code 0, which is also the only error code implemented. More specific error codes will be added in the future. - [2386](https://github.com/FuelLabs/fuel-core/pull/2386): Add a flag to define the maximum number of file descriptors that RocksDB can use. By default it's half of the OS limit. +- [2376](https://github.com/FuelLabs/fuel-core/pull/2376): Add a way to fetch transactions in P2P without specifying a peer. ## [Version 0.40.0] diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index 05dd2ec1b38..47d189ae85f 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -677,17 +677,31 @@ impl FuelP2PService { V2ResponseMessage::SealedHeaders(v) => { // TODO: https://github.com/FuelLabs/fuel-core/issues/1311 // Change type of ResponseSender and remove the .ok() here - c.send((peer, Ok(v.ok()))).is_ok() + c.send(Ok((peer, Ok(v.ok())))).is_ok() } _ => { warn!( "Invalid response type received for request {:?}", request_id ); - c.send((peer, Err(ResponseError::TypeMismatch))).is_ok() + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() } }, ResponseSender::Transactions(c) => match response { + V2ResponseMessage::Transactions(v) => { + c.send(Ok((peer, Ok(v.ok())))).is_ok() + } + _ => { + warn!( + "Invalid response type received for request {:?}", + request_id + ); + c.send(Ok((peer, Err(ResponseError::TypeMismatch)))) + .is_ok() + } + }, + ResponseSender::TransactionsFromPeer(c) => match response { V2ResponseMessage::Transactions(v) => { c.send((peer, Ok(v.ok()))).is_ok() } @@ -750,9 +764,12 @@ impl FuelP2PService { if let Some(channel) = self.outbound_requests_table.remove(&request_id) { match channel { ResponseSender::SealedHeaders(c) => { - let _ = c.send((peer, Err(ResponseError::P2P(error)))); + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); } ResponseSender::Transactions(c) => { + let _ = c.send(Ok((peer, Err(ResponseError::P2P(error))))); + } + ResponseSender::TransactionsFromPeer(c) => { let _ = c.send((peer, Err(ResponseError::P2P(error)))); } ResponseSender::TxPoolAllTransactionsIds(c) => { @@ -1700,9 +1717,25 @@ mod tests { let expected = arbitrary_headers_for_range(range.clone()); - if let Ok((_, Ok(sealed_headers))) = response_message { - let check = expected.iter().zip(sealed_headers.unwrap().iter()).all(|(a, b)| eq_except_metadata(a, b)); - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(sealed_headers)))) => { + let check = expected.iter().zip(sealed_headers.iter()).all(|(a, b)| eq_except_metadata(a, b)); + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any headers"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1717,9 +1750,25 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - if let Ok((_, Ok(Some(transactions)))) = response_message { - let check = transactions.len() == 1 && transactions[0].0.len() == 5; - let _ = tx_test_end.send(check).await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(Some(transactions)))) => { + let check = transactions.len() == 1 && transactions[0].0.len() == 5; + let _ = tx_test_end.send(check).await; + }, + Ok((_, Ok(None))) => { + tracing::error!("Node A did not return any transactions"); + let _ = tx_test_end.send(false).await; + }, + Ok((_, Err(e))) => { + tracing::error!("Error in P2P communication: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + Err(e) => { + tracing::error!("Error in P2P before sending message: {:?}", e); + let _ = tx_test_end.send(false).await; + }, + } } else { tracing::error!("Orchestrator failed to receive a message: {:?}", response_message); let _ = tx_test_end.send(false).await; @@ -1878,23 +1927,28 @@ mod tests { tokio::spawn(async move { let response_message = rx_orchestrator.await; - match response_message { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly"); - }, - Ok((_, Err(ResponseError::TypeMismatch))) => { - // Got Invalid Response Type as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(_) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly"); - }, + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::TypeMismatch))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } @@ -1964,21 +2018,29 @@ mod tests { tokio::spawn(async move { // 3. Simulating NetworkOrchestrator receiving a Timeout Error Message! - match rx_orchestrator.await { - Ok((_, Ok(_))) => { - let _ = tx_test_end.send(false).await; - panic!("Request succeeded unexpectedly")}, - Ok((_, Err(ResponseError::P2P(_)))) => { - // Got timeout as expected, so end test - let _ = tx_test_end.send(true).await; - }, - Ok((_, Err(err))) => { - let _ = tx_test_end.send(false).await; - panic!("Unexpected error: {:?}", err); - }, - Err(e) => { - let _ = tx_test_end.send(false).await; - panic!("Channel closed unexpectedly: {:?}", e)}, + let response_message = rx_orchestrator.await; + if let Ok(response) = response_message { + match response { + Ok((_, Ok(_))) => { + let _ = tx_test_end.send(false).await; + panic!("Request succeeded unexpectedly"); + }, + Ok((_, Err(ResponseError::P2P(_)))) => { + // Got Invalid Response Type as expected, so end test + let _ = tx_test_end.send(true).await; + }, + Ok((_, Err(err))) => { + let _ = tx_test_end.send(false).await; + panic!("Unexpected error in P2P communication: {:?}", err); + }, + Err(e) => { + let _ = tx_test_end.send(false).await; + panic!("Error in P2P before sending message: {:?}", e); + }, + } + } else { + let _ = tx_test_end.send(false).await; + panic!("Orchestrator failed to receive a message: {:?}", response_message); } }); } diff --git a/crates/services/p2p/src/request_response/messages.rs b/crates/services/p2p/src/request_response/messages.rs index 2a0e03ba2cd..f1c3b176f4b 100644 --- a/crates/services/p2p/src/request_response/messages.rs +++ b/crates/services/p2p/src/request_response/messages.rs @@ -18,6 +18,8 @@ use std::ops::Range; use thiserror::Error; use tokio::sync::oneshot; +use crate::service::TaskError; + pub(crate) const V1_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.1"; pub(crate) const V2_REQUEST_RESPONSE_PROTOCOL_ID: &str = "/fuel/req_res/0.0.2"; @@ -104,11 +106,16 @@ impl From for V1ResponseMessage { } pub type OnResponse = oneshot::Sender<(PeerId, Result)>; +// This type is more complex because it's used in tasks that need to select a peer to send the request and this +// can cause errors where the peer is not defined. +pub type OnResponseWithPeerSelection = + oneshot::Sender), TaskError>>; #[derive(Debug)] pub enum ResponseSender { - SealedHeaders(OnResponse>>), - Transactions(OnResponse>>), + SealedHeaders(OnResponseWithPeerSelection>>), + Transactions(OnResponseWithPeerSelection>>), + TransactionsFromPeer(OnResponse>>), TxPoolAllTransactionsIds(OnResponse>>), TxPoolFullTransactions(OnResponse>>>), } diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index c85e1e3a6c8..30c5dd9310a 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -21,6 +21,7 @@ use crate::{ }, request_response::messages::{ OnResponse, + OnResponseWithPeerSelection, RequestMessage, ResponseMessageErrorCode, ResponseSender, @@ -84,6 +85,7 @@ use std::{ ops::Range, sync::Arc, }; +use thiserror::Error; use tokio::{ sync::{ broadcast, @@ -104,6 +106,12 @@ const CHANNEL_SIZE: usize = 1024 * 10; pub type Service = ServiceRunner>; +#[derive(Debug, Error)] +pub enum TaskError { + #[error("No peer found to send request to")] + NoPeerFound, +} + pub enum TaskRequest { // Broadcast requests to p2p network BroadcastTransaction(Arc), @@ -113,9 +121,13 @@ pub enum TaskRequest { }, GetSealedHeaders { block_height_range: Range, - channel: OnResponse>>, + channel: OnResponseWithPeerSelection>>, }, GetTransactions { + block_height_range: Range, + channel: OnResponseWithPeerSelection>>, + }, + GetTransactionsFromPeer { block_height_range: Range, from_peer: PeerId, channel: OnResponse>>, @@ -167,6 +179,9 @@ impl Debug for TaskRequest { TaskRequest::GetTransactions { .. } => { write!(f, "TaskRequest::GetTransactions") } + TaskRequest::GetTransactionsFromPeer { .. } => { + write!(f, "TaskRequest::GetTransactionsFromPeer") + } TaskRequest::TxPoolGetAllTxIds { .. } => { write!(f, "TaskRequest::TxPoolGetAllTxIds") } @@ -869,19 +884,29 @@ where } } Some(TaskRequest::GetSealedHeaders { block_height_range, channel}) => { - let channel = ResponseSender::SealedHeaders(channel); - let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); - // Note: this range has already been checked for // validity in `SharedState::get_sealed_block_headers`. let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); - let peer = self.p2p_service.get_peer_id_with_height(&height); - if self.p2p_service.send_request_msg(peer, request_msg, channel).is_err() { - tracing::warn!("No peers found for block at height {:?}", height); - } + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; + let channel = ResponseSender::SealedHeaders(channel); + let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); } - Some(TaskRequest::GetTransactions { block_height_range, from_peer, channel }) => { + Some(TaskRequest::GetTransactions {block_height_range, channel }) => { + let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); + let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { + let _ = channel.send(Err(TaskError::NoPeerFound)); + return Ok(should_continue); + }; let channel = ResponseSender::Transactions(channel); + let request_msg = RequestMessage::Transactions(block_height_range.clone()); + self.p2p_service.send_request_msg(Some(peer), request_msg, channel).expect("We always have a peer here, so send has a target"); + } + Some(TaskRequest::GetTransactionsFromPeer { block_height_range, from_peer, channel }) => { + let channel = ResponseSender::TransactionsFromPeer(channel); let request_msg = RequestMessage::Transactions(block_height_range); self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always a peer here, so send has a target"); } @@ -1038,7 +1063,38 @@ impl SharedState { }) .await?; - let (peer_id, response) = receiver.await.map_err(|e| anyhow!("{e}"))?; + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; + + let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; + Ok((peer_id.to_bytes(), data)) + } + + pub async fn get_transactions( + &self, + range: Range, + ) -> anyhow::Result<(Vec, Option>)> { + let (sender, receiver) = oneshot::channel(); + + if range.is_empty() { + return Err(anyhow!( + "Cannot retrieve transactions for an empty range of block heights" + )); + } + + self.request_sender + .send(TaskRequest::GetTransactions { + block_height_range: range, + channel: sender, + }) + .await?; + + let (peer_id, response) = receiver + .await + .map_err(|e| anyhow!("{e}"))? + .map_err(|e| anyhow!("{e}"))?; let data = response.map_err(|e| anyhow!("Invalid response from peer {e:?}"))?; Ok((peer_id.to_bytes(), data)) @@ -1052,7 +1108,7 @@ impl SharedState { let (sender, receiver) = oneshot::channel(); let from_peer = PeerId::from_bytes(peer_id.as_ref()).expect("Valid PeerId"); - let request = TaskRequest::GetTransactions { + let request = TaskRequest::GetTransactionsFromPeer { block_height_range: range, from_peer, channel: sender, From 612d8674b2081294f3ab0bc3a5f793bc78020490 Mon Sep 17 00:00:00 2001 From: AurelienFT <32803821+AurelienFT@users.noreply.github.com> Date: Thu, 31 Oct 2024 10:52:16 +0100 Subject: [PATCH 5/5] Add a new code owner for tx pool (#2417) --- .github/CODEOWNERS | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 93428eaf29d..0a6593d25bd 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -2,4 +2,7 @@ * @xgreenx @Dentosal @MitchTurner # Code owners for the gas price algorithm -crates/fuel-gas-price-algorithm @MitchTurner @rafal-ch \ No newline at end of file +crates/fuel-gas-price-algorithm @MitchTurner @rafal-ch + +# Code owners for the transaction pool +crates/services/txpool_v2 @AurelienFT \ No newline at end of file