From 5e10eaa3cd3c6d169f4748d0f5e06ee3cfc21367 Mon Sep 17 00:00:00 2001 From: Mitchell Turner Date: Wed, 20 Nov 2024 07:30:28 +0700 Subject: [PATCH] Replace task `run()` return result with custom enum (#2429) --- CHANGELOG.md | 1 + .../fuel-core/src/graphql_api/api_service.rs | 15 ++-- .../src/graphql_api/worker_service.rs | 18 ++--- .../src/graphql_api/worker_service/tests.rs | 2 +- crates/fuel-core/src/service.rs | 6 +- .../adapters/consensus_parameters_provider.rs | 15 ++-- .../consensus_module/poa/src/service.rs | 52 ++++++++------ .../services/consensus_module/poa/src/sync.rs | 13 ++-- .../gas_price_service/src/v0/service.rs | 34 ++++++--- .../gas_price_service/src/v0/tests.rs | 4 +- .../src/v1/da_source_service/service.rs | 20 +++--- .../gas_price_service/src/v1/service.rs | 69 +++++++++++++------ crates/services/p2p/src/service.rs | 42 +++++------ crates/services/relayer/src/service.rs | 11 ++- crates/services/src/lib.rs | 1 + crates/services/src/service.rs | 64 ++++++++++++----- crates/services/sync/src/service.rs | 14 ++-- crates/services/txpool_v2/src/service.rs | 31 +++++---- 18 files changed, 250 insertions(+), 162 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d9072f0b3b0..fdd6424972a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ### Changed - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message. +- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce custom enum for representing result of running service tasks - [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge`(status code `1`) if the client requests a range of objects such as sealed block headers or transactions too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently. #### Breaking diff --git a/crates/fuel-core/src/graphql_api/api_service.rs b/crates/fuel-core/src/graphql_api/api_service.rs index 28a714f8b0e..e9a1411085a 100644 --- a/crates/fuel-core/src/graphql_api/api_service.rs +++ b/crates/fuel-core/src/graphql_api/api_service.rs @@ -64,6 +64,7 @@ use fuel_core_services::{ RunnableService, RunnableTask, StateWatcher, + TaskNextAction, }; use fuel_core_storage::transactional::AtomicView; use fuel_core_types::fuel_types::BlockHeight; @@ -196,11 +197,15 @@ impl RunnableService for GraphqlService { #[async_trait::async_trait] impl RunnableTask for Task { - async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result { - self.server.as_mut().await?; - // The `axum::Server` has its internal loop. If `await` is finished, we get an internal - // error or stop signal. - Ok(false /* should_continue */) + async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction { + match self.server.as_mut().await { + Ok(()) => { + // The `axum::Server` has its internal loop. If `await` is finished, we get an internal + // error or stop signal. + TaskNextAction::Stop + } + Err(err) => TaskNextAction::ErrorContinue(err.into()), + } } async fn shutdown(self) -> anyhow::Result<()> { diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs index 959733d4919..8d5b0bc923f 100644 --- a/crates/fuel-core/src/graphql_api/worker_service.rs +++ b/crates/fuel-core/src/graphql_api/worker_service.rs @@ -36,6 +36,7 @@ use fuel_core_services::{ RunnableTask, ServiceRunner, StateWatcher, + TaskNextAction, }; use fuel_core_storage::{ Error as StorageError, @@ -551,13 +552,12 @@ where TxPool: ports::worker::TxPool, D: ports::worker::OffChainDatabase, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let should_continue; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { - should_continue = false; + TaskNextAction::Stop } result = self.block_importer.next() => { @@ -567,17 +567,19 @@ where // In the case of an error, shut down the service to avoid a huge // de-synchronization between on-chain and off-chain databases. if let Err(e) = result { - tracing::error!("Error processing block: {:?}", e); - should_continue = self.continue_on_error; + if self.continue_on_error { + TaskNextAction::ErrorContinue(e) + } else { + TaskNextAction::Stop + } } else { - should_continue = true + TaskNextAction::Continue } } else { - should_continue = false + TaskNextAction::Stop } } } - Ok(should_continue) } async fn shutdown(mut self) -> anyhow::Result<()> { diff --git a/crates/fuel-core/src/graphql_api/worker_service/tests.rs b/crates/fuel-core/src/graphql_api/worker_service/tests.rs index 8b9ad758975..401d25db455 100644 --- a/crates/fuel-core/src/graphql_api/worker_service/tests.rs +++ b/crates/fuel-core/src/graphql_api/worker_service/tests.rs @@ -46,7 +46,7 @@ async fn run__relayed_transaction_events_are_added_to_storage() { // when let mut task = worker_task_with_block_importer_and_db(block_importer, database.clone()); - task.run(&mut state_watcher).await.unwrap(); + task.run(&mut state_watcher).await; tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; // then diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 6b42dd3960c..1f751f1de5e 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -27,6 +27,7 @@ use fuel_core_services::{ ServiceRunner, State, StateWatcher, + TaskNextAction, }; use fuel_core_storage::{ not_found, @@ -428,7 +429,7 @@ impl RunnableService for Task { #[async_trait::async_trait] impl RunnableTask for Task { #[tracing::instrument(skip_all)] - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { let mut stop_signals = vec![]; for service in self.services.iter() { stop_signals.push(service.await_stop()) @@ -443,8 +444,7 @@ impl RunnableTask for Task { // We received the stop signal from any of one source, so stop this service and // all sub-services. - let should_continue = false; - Ok(should_continue) + TaskNextAction::Stop } async fn shutdown(self) -> anyhow::Result<()> { diff --git a/crates/fuel-core/src/service/adapters/consensus_parameters_provider.rs b/crates/fuel-core/src/service/adapters/consensus_parameters_provider.rs index a0c54aac7be..5da5b88e10b 100644 --- a/crates/fuel-core/src/service/adapters/consensus_parameters_provider.rs +++ b/crates/fuel-core/src/service/adapters/consensus_parameters_provider.rs @@ -10,6 +10,7 @@ use fuel_core_services::{ ServiceRunner, SharedMutex, StateWatcher, + TaskNextAction, }; use fuel_core_storage::{ not_found, @@ -111,13 +112,12 @@ impl SharedState { #[async_trait::async_trait] impl RunnableTask for Task { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let should_continue; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { - should_continue = false; + TaskNextAction::Stop } Some(event) = self.blocks_events.next() => { @@ -135,16 +135,13 @@ impl RunnableTask for Task { } Err(err) => { tracing::error!("Failed to cache consensus parameters: {:?}", err); - should_continue = false; - return Ok(should_continue) + return TaskNextAction::Stop } } } - should_continue = true; + TaskNextAction::Continue } } - - Ok(should_continue) } async fn shutdown(self) -> anyhow::Result<()> { @@ -358,7 +355,7 @@ mod tests { // When let result_with_new_version = result_with_new_version(new_version); let _ = block_sender.send(result_with_new_version); - task.run(&mut StateWatcher::started()).await.unwrap(); + task.run(&mut StateWatcher::started()).await; // Then assert_eq!( diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs index ffb88f8db2a..795f4ced784 100644 --- a/crates/services/consensus_module/poa/src/service.rs +++ b/crates/services/consensus_module/poa/src/service.rs @@ -42,6 +42,7 @@ use fuel_core_services::{ Service as OtherService, ServiceRunner, StateWatcher, + TaskNextAction, }; use fuel_core_storage::transactional::Changes; use fuel_core_types::{ @@ -518,16 +519,14 @@ where PB: PredefinedBlocks, C: GetTime, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let should_continue; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { let mut sync_state = self.sync_task_handle.shared.clone(); // make sure we're synced first if *sync_state.borrow_and_update() == SyncState::NotSynced { tokio::select! { biased; result = watcher.while_started() => { - should_continue = result?.started(); - return Ok(should_continue); + return result.map(|state| state.started()).into() } _ = sync_state.changed() => {} } @@ -538,26 +537,37 @@ where } let next_height = self.next_height(); - let maybe_block = self.predefined_blocks.get_block(&next_height)?; + let maybe_block = match self.predefined_blocks.get_block(&next_height) { + Ok(option) => option, + Err(err) => return TaskNextAction::ErrorContinue(err), + }; if let Some(block) = maybe_block { - self.produce_predefined_block(&block).await?; - should_continue = true; - return Ok(should_continue) + let res = self.produce_predefined_block(&block).await; + return match res { + Ok(()) => TaskNextAction::Continue, + Err(err) => TaskNextAction::ErrorContinue(err), + } } let next_block_production: BoxFuture<()> = match self.trigger { Trigger::Never | Trigger::Instant => Box::pin(core::future::pending()), - Trigger::Interval { block_time } => Box::pin(sleep_until( - self.last_block_created + Trigger::Interval { block_time } => { + let next_block_time = match self + .last_block_created .checked_add(block_time) - .ok_or(anyhow!("Time exceeds system limits"))?, - )), + .ok_or(anyhow!("Time exceeds system limits")) + { + Ok(time) => time, + Err(err) => return TaskNextAction::ErrorContinue(err), + }; + Box::pin(sleep_until(next_block_time)) + } }; tokio::select! { biased; _ = watcher.while_started() => { - should_continue = false; + TaskNextAction::Stop } request = self.request_receiver.recv() => { if let Some(request) = request { @@ -567,29 +577,27 @@ where let _ = response.send(result); } } - should_continue = true; + TaskNextAction::Continue } else { tracing::error!("The PoA task should be the holder of the `Sender`"); - should_continue = false; + TaskNextAction::Stop } } _ = next_block_production => { match self.on_timer().await.context("While processing timer event") { - Ok(()) => should_continue = true, + Ok(()) => TaskNextAction::Continue, Err(err) => { // Wait some time in case of error to avoid spamming retry block production tokio::time::sleep(Duration::from_secs(1)).await; - return Err(err); + TaskNextAction::ErrorContinue(err) } - }; + } } _ = self.new_txs_watcher.changed() => { - self.on_txpool_event().await.context("While processing txpool event")?; - should_continue = true; + let res = self.on_txpool_event().await.context("While processing txpool event"); + TaskNextAction::always_continue(res) } } - - Ok(should_continue) } async fn shutdown(self) -> anyhow::Result<()> { diff --git a/crates/services/consensus_module/poa/src/sync.rs b/crates/services/consensus_module/poa/src/sync.rs index 4331ba3040f..febad5b7c0d 100644 --- a/crates/services/consensus_module/poa/src/sync.rs +++ b/crates/services/consensus_module/poa/src/sync.rs @@ -11,6 +11,7 @@ use fuel_core_services::{ RunnableService, RunnableTask, StateWatcher, + TaskNextAction, }; use fuel_core_types::{ blockchain::header::BlockHeader, @@ -137,10 +138,7 @@ impl RunnableService for SyncTask { #[async_trait::async_trait] impl RunnableTask for SyncTask { - #[tracing::instrument(level = "debug", skip_all, err, ret)] - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let mut should_continue = true; - + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { let tick: BoxFuture = if let Some(timer) = &mut self.timer { Box::pin(timer.tick()) } else { @@ -150,7 +148,7 @@ impl RunnableTask for SyncTask { tokio::select! { biased; _ = watcher.while_started() => { - should_continue = false; + TaskNextAction::Stop } Some(latest_peer_count) = self.peer_connections_stream.next() => { let sufficient_peers = latest_peer_count >= self.min_connected_reserved_peers; @@ -172,6 +170,7 @@ impl RunnableTask for SyncTask { } _ => {}, } + TaskNextAction::Continue } Some(block_info) = self.block_stream.next() => { let new_block_height = block_info.block_header.height(); @@ -205,6 +204,7 @@ impl RunnableTask for SyncTask { } _ => {} } + TaskNextAction::Continue } _ = tick => { if let InnerSyncState::SufficientPeers(block_header) = &self.inner_state { @@ -215,10 +215,9 @@ impl RunnableTask for SyncTask { }; self.update_sync_state(SyncState::Synced(Arc::new(block_header))); } + TaskNextAction::Continue } } - - Ok(should_continue) } async fn shutdown(self) -> anyhow::Result<()> { diff --git a/crates/services/gas_price_service/src/v0/service.rs b/crates/services/gas_price_service/src/v0/service.rs index 7ef806fdd23..7591c8a377b 100644 --- a/crates/services/gas_price_service/src/v0/service.rs +++ b/crates/services/gas_price_service/src/v0/service.rs @@ -12,6 +12,7 @@ use async_trait::async_trait; use fuel_core_services::{ RunnableTask, StateWatcher, + TaskNextAction, }; use fuel_gas_price_algorithm::v0::{ AlgorithmUpdaterV0, @@ -116,30 +117,41 @@ where } } +impl GasPriceServiceV0 +where + L2: L2BlockSource, + Metadata: MetadataStorage, +{ + async fn process_l2_block_res( + &mut self, + l2_block_res: crate::common::utils::Result, + ) -> anyhow::Result<()> { + tracing::info!("Received L2 block result: {:?}", l2_block_res); + let block = l2_block_res?; + + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + Ok(()) + } +} #[async_trait] impl RunnableTask for GasPriceServiceV0 where L2: L2BlockSource, Metadata: MetadataStorage, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let should_continue; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { tracing::debug!("Stopping gas price service"); - should_continue = false; + TaskNextAction::Stop } l2_block_res = self.l2_block_source.get_l2_block() => { - tracing::info!("Received L2 block result: {:?}", l2_block_res); - let block = l2_block_res?; - - tracing::debug!("Updating gas price algorithm"); - self.apply_block_info_to_gas_algorithm(block).await?; - should_continue = true; + let res = self.process_l2_block_res(l2_block_res).await; + TaskNextAction::always_continue(res) } } - Ok(should_continue) } async fn shutdown(mut self) -> anyhow::Result<()> { @@ -256,7 +268,7 @@ mod tests { let initial_price = read_algo.next_gas_price(); // when - service.run(&mut watcher).await.unwrap(); + service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); service.shutdown().await.unwrap(); diff --git a/crates/services/gas_price_service/src/v0/tests.rs b/crates/services/gas_price_service/src/v0/tests.rs index 7526975e467..8a224a482c6 100644 --- a/crates/services/gas_price_service/src/v0/tests.rs +++ b/crates/services/gas_price_service/src/v0/tests.rs @@ -172,7 +172,7 @@ async fn next_gas_price__affected_by_new_l2_block() { let mut watcher = StateWatcher::default(); // when - service.run(&mut watcher).await.unwrap(); + service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); service.shutdown().await.unwrap(); @@ -217,7 +217,7 @@ async fn next__new_l2_block_saves_old_metadata() { let mut watcher = StateWatcher::default(); let start = read_algo.next_gas_price(); - service.run(&mut watcher).await.unwrap(); + service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); service.shutdown().await.unwrap(); diff --git a/crates/services/gas_price_service/src/v1/da_source_service/service.rs b/crates/services/gas_price_service/src/v1/da_source_service/service.rs index 74f1fcafb42..328f73e6a20 100644 --- a/crates/services/gas_price_service/src/v1/da_source_service/service.rs +++ b/crates/services/gas_price_service/src/v1/da_source_service/service.rs @@ -3,6 +3,7 @@ use fuel_core_services::{ RunnableTask, ServiceRunner, StateWatcher, + TaskNextAction, }; use std::time::Duration; use tokio::{ @@ -15,7 +16,6 @@ use tokio::{ use crate::v1::da_source_service::DaBlockCosts; pub use anyhow::Result; -use fuel_core_services::stream::BoxFuture; #[derive(Clone)] pub struct SharedState(Sender); @@ -59,6 +59,12 @@ where source, } } + + async fn process_block_costs(&mut self) -> Result<()> { + let da_block_costs = self.source.request_da_block_cost().await?; + self.shared_state.0.send(da_block_costs)?; + Ok(()) + } } /// This trait is implemented by the sources to obtain the @@ -102,21 +108,17 @@ where { /// This function polls the source according to a polling interval /// described by the DaBlockCostsService - async fn run(&mut self, state_watcher: &mut StateWatcher) -> Result { - let continue_running; - + async fn run(&mut self, state_watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = state_watcher.while_started() => { - continue_running = false; + TaskNextAction::Stop } _ = self.poll_interval.tick() => { - let da_block_costs = self.source.request_da_block_cost().await?; - self.shared_state.0.send(da_block_costs)?; - continue_running = true; + let da_block_costs_res = self.process_block_costs().await; + TaskNextAction::always_continue(da_block_costs_res) } } - Ok(continue_running) } /// There are no shutdown hooks required by the sources *yet* diff --git a/crates/services/gas_price_service/src/v1/service.rs b/crates/services/gas_price_service/src/v1/service.rs index 5ab56160334..760464ba5ea 100644 --- a/crates/services/gas_price_service/src/v1/service.rs +++ b/crates/services/gas_price_service/src/v1/service.rs @@ -30,6 +30,7 @@ use fuel_core_services::{ RunnableService, RunnableTask, StateWatcher, + TaskNextAction, }; use fuel_gas_price_algorithm::{ v0::AlgorithmUpdaterV0, @@ -40,7 +41,12 @@ use fuel_gas_price_algorithm::{ }; use futures::FutureExt; use std::num::NonZeroU64; -use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::{ + error::RecvError, + Receiver, +}; + +use crate::common::utils::Result as GasPriceResult; /// The service that updates the gas price algorithm. pub struct GasPriceServiceV1 @@ -61,6 +67,38 @@ where da_source_channel: Receiver, } +impl GasPriceServiceV1 +where + L2: L2BlockSource, + Metadata: MetadataStorage, + DA: DaBlockCostsSource, +{ + async fn process_l2_block_res( + &mut self, + l2_block_res: GasPriceResult, + ) -> anyhow::Result<()> { + tracing::info!("Received L2 block result: {:?}", l2_block_res); + let block = l2_block_res?; + + tracing::debug!("Updating gas price algorithm"); + self.apply_block_info_to_gas_algorithm(block).await?; + Ok(()) + } + + async fn process_da_block_costs_res( + &mut self, + da_block_costs: Result, + ) -> anyhow::Result<()> { + tracing::info!("Received DA block costs: {:?}", da_block_costs); + let da_block_costs = da_block_costs?; + + tracing::debug!("Updating DA block costs"); + self.apply_da_block_costs_to_gas_algorithm(da_block_costs) + .await?; + Ok(()) + } +} + impl GasPriceServiceV1 where Metadata: MetadataStorage, @@ -194,32 +232,22 @@ where Metadata: MetadataStorage, DA: DaBlockCostsSource, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - let should_continue; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { tracing::debug!("Stopping gas price service"); - should_continue = false; + TaskNextAction::Stop } l2_block_res = self.l2_block_source.get_l2_block() => { - tracing::info!("Received L2 block result: {:?}", l2_block_res); - let block = l2_block_res?; - - tracing::debug!("Updating gas price algorithm"); - self.apply_block_info_to_gas_algorithm(block).await?; - should_continue = true; + let res = self.process_l2_block_res(l2_block_res).await; + TaskNextAction::always_continue(res) } da_block_costs = self.da_source_channel.recv() => { - tracing::info!("Received DA block costs: {:?}", da_block_costs); - let da_block_costs = da_block_costs?; - - tracing::debug!("Updating DA block costs"); - self.apply_da_block_costs_to_gas_algorithm(da_block_costs).await?; - should_continue = true; + let res = self.process_da_block_costs_res(da_block_costs).await; + TaskNextAction::always_continue(res) } } - Ok(should_continue) } async fn shutdown(mut self) -> anyhow::Result<()> { @@ -424,7 +452,7 @@ mod tests { let initial_price = read_algo.next_gas_price(); // when - service.run(&mut watcher).await.unwrap(); + service.run(&mut watcher).await; l2_block_sender.send(l2_block).await.unwrap(); service.shutdown().await.unwrap(); @@ -501,11 +529,10 @@ mod tests { service .da_source_adapter_handle .run(&mut da_source_watcher) - .await - .unwrap(); + .await; // when - service.run(&mut watcher).await.unwrap(); + service.run(&mut watcher).await; tokio::time::sleep(Duration::from_millis(100)).await; service.shutdown().await.unwrap(); diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 9e59f8a8f74..5a12de61abf 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -39,6 +39,7 @@ use fuel_core_services::{ ServiceRunner, StateWatcher, SyncProcessor, + TaskNextAction, TraceErr, }; use fuel_core_storage::transactional::AtomicView; @@ -877,26 +878,22 @@ where B: Broadcast + 'static, T: TxPool + 'static, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - tracing::debug!("P2P task is running"); - let mut should_continue; - + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { - should_continue = false; + TaskNextAction::Stop }, latest_block_height = self.next_block_height.next() => { if let Some(latest_block_height) = latest_block_height { let _ = self.p2p_service.update_block_height(latest_block_height); - should_continue = true; + TaskNextAction::Continue } else { - should_continue = false; + TaskNextAction::Stop } }, next_service_request = self.request_receiver.recv() => { - should_continue = true; match next_service_request { Some(TaskRequest::BroadcastTransaction(transaction)) => { let tx_id = transaction.id(&self.chain_id); @@ -912,7 +909,7 @@ where let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { let _ = channel.send(Err(TaskError::NoPeerFound)); - return Ok(should_continue); + return TaskNextAction::Continue }; let channel = ResponseSender::SealedHeaders(channel); let request_msg = RequestMessage::SealedHeaders(block_height_range.clone()); @@ -922,7 +919,7 @@ where let height = BlockHeight::from(block_height_range.end.saturating_sub(1)); let Some(peer) = self.p2p_service.get_peer_id_with_height(&height) else { let _ = channel.send(Err(TaskError::NoPeerFound)); - return Ok(should_continue); + return TaskNextAction::Continue }; let channel = ResponseSender::Transactions(channel); let request_msg = RequestMessage::Transactions(block_height_range.clone()); @@ -944,8 +941,10 @@ where self.p2p_service.send_request_msg(Some(from_peer), request_msg, channel).expect("We always have a peer here, so send has a target"); } Some(TaskRequest::RespondWithGossipsubMessageReport((message, acceptance))) => { - // report_message(&mut self.p2p_service, message, acceptance); - self.p2p_service.report_message(message, acceptance)?; + let res = self.p2p_service.report_message(message, acceptance); + if let Err(err) = res { + return TaskNextAction::ErrorContinue(err) + } } Some(TaskRequest::RespondWithPeerReport { peer_id, score, reporting_service }) => { let _ = self.p2p_service.report_peer(peer_id, score, reporting_service); @@ -971,12 +970,12 @@ where } None => { tracing::error!("The P2P `Task` should be holder of the `Sender`"); - should_continue = false; + return TaskNextAction::Stop } } + TaskNextAction::Continue } p2p_event = self.p2p_service.next_event() => { - should_continue = true; match p2p_event { Some(FuelP2PEvent::PeerInfoUpdated { peer_id, block_height }) => { let peer_id: Vec = peer_id.into(); @@ -998,7 +997,10 @@ where } }, Some(FuelP2PEvent::InboundRequestMessage { request_message, request_id }) => { - self.process_request(request_message, request_id)? + let res = self.process_request(request_message, request_id); + if let Err(err) = res { + return TaskNextAction::ErrorContinue(err) + } }, Some(FuelP2PEvent::NewSubscription { peer_id, tag }) => { if tag == GossipTopicTag::NewTx { @@ -1007,9 +1009,9 @@ where }, _ => (), } + TaskNextAction::Continue }, _ = tokio::time::sleep_until(self.next_check_time) => { - should_continue = true; let res = self.peer_heartbeat_reputation_checks(); match res { Ok(_) => tracing::debug!("Peer heartbeat reputation checks completed"), @@ -1018,11 +1020,9 @@ where } } self.next_check_time += self.heartbeat_check_interval; + TaskNextAction::Continue } } - - tracing::debug!("P2P task is finished"); - Ok(should_continue) } async fn shutdown(self) -> anyhow::Result<()> { @@ -1836,7 +1836,7 @@ pub mod tests { watcher: &mut StateWatcher, ) -> (FuelPeerId, AppScore, String) { loop { - task.run(watcher).await.unwrap(); + task.run(watcher).await; if let Ok((peer_id, recv_report, service)) = report_receiver.try_recv() { return (peer_id, recv_report, service); } @@ -1888,7 +1888,7 @@ pub mod tests { for _ in 0..100 { // When - task.run(&mut watcher).await.unwrap(); + task.run(&mut watcher).await; // Then block_processed_receiver diff --git a/crates/services/relayer/src/service.rs b/crates/services/relayer/src/service.rs index 7cf2b4519cf..4b8e56c0154 100644 --- a/crates/services/relayer/src/service.rs +++ b/crates/services/relayer/src/service.rs @@ -28,6 +28,7 @@ use fuel_core_services::{ RunnableTask, ServiceRunner, StateWatcher, + TaskNextAction, }; use fuel_core_types::{ blockchain::primitives::DaBlockHeight, @@ -218,7 +219,7 @@ where P: Middleware + 'static, D: RelayerDb + 'static, { - async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result { + async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction { let now = tokio::time::Instant::now(); let result = run::run(self).await; @@ -238,14 +239,12 @@ where if let Err(err) = result { if !self.retry_on_error { tracing::error!("Exiting due to Error in relayer task: {:?}", err); - let should_continue = false; - Ok(should_continue) + TaskNextAction::Stop } else { - Err(err) + TaskNextAction::ErrorContinue(err) } } else { - let should_continue = true; - Ok(should_continue) + TaskNextAction::Continue } } diff --git a/crates/services/src/lib.rs b/crates/services/src/lib.rs index e7fa438631f..717dc1f2c06 100644 --- a/crates/services/src/lib.rs +++ b/crates/services/src/lib.rs @@ -80,6 +80,7 @@ pub use service::{ RunnableTask, Service, ServiceRunner, + TaskNextAction, }; pub use state::{ State, diff --git a/crates/services/src/service.rs b/crates/services/src/service.rs index 83cb095700a..5b298ffe5e0 100644 --- a/crates/services/src/service.rs +++ b/crates/services/src/service.rs @@ -83,6 +83,43 @@ pub trait RunnableService: Send { ) -> anyhow::Result; } +/// The result of a single iteration of the service task +pub enum TaskNextAction { + /// Request the task to be run again + Continue, + /// Request the task to be abandoned + Stop, + /// Request the task to be run again, but report an error + ErrorContinue(anyhow::Error), +} + +impl TaskNextAction { + /// Creates a `TaskRunResult` from a `Result` where `Ok` means `Continue` and any error is reported + pub fn always_continue>( + res: Result, + ) -> TaskNextAction { + match res { + Ok(_) => TaskNextAction::Continue, + Err(e) => TaskNextAction::ErrorContinue(e.into()), + } + } +} + +impl From> for TaskNextAction { + fn from(result: Result) -> Self { + match result { + Ok(should_continue) => { + if should_continue { + TaskNextAction::Continue + } else { + TaskNextAction::Stop + } + } + Err(e) => TaskNextAction::ErrorContinue(e), + } + } +} + /// The trait is implemented by the service task and contains a single iteration of the infinity /// loop. #[async_trait::async_trait] @@ -96,7 +133,7 @@ pub trait RunnableTask: Send { /// `State::Started`. So first, the `run` method should return a value, and after, the service /// will stop. If the service should react to the state change earlier, it should handle it in /// the `run` loop on its own. See [`StateWatcher::while_started`]. - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result; + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction; /// Gracefully shutdowns the task after the end of the execution cycle. async fn shutdown(self) -> anyhow::Result<()>; @@ -361,14 +398,14 @@ async fn run_task( let result = tracked_result.extract(metric); match result { - Ok(should_continue) => { - if !should_continue { - tracing::debug!("stopping"); - break; - } + TaskNextAction::Continue => { tracing::debug!("run loop"); } - Err(e) => { + TaskNextAction::Stop => { + tracing::debug!("stopping"); + break; + } + TaskNextAction::ErrorContinue(e) => { let e: &dyn std::error::Error = &*e; tracing::error!(e); } @@ -447,7 +484,7 @@ mod tests { fn run<'_self, '_state, 'a>( &'_self mut self, state: &'_state mut StateWatcher - ) -> BoxFuture<'a, anyhow::Result> + ) -> BoxFuture<'a, TaskNextAction> where '_self: 'a, '_state: 'a, @@ -467,8 +504,7 @@ mod tests { let mut watcher = watcher.clone(); Box::pin(async move { watcher.while_started().await.unwrap(); - let should_continue = false; - Ok(should_continue) + TaskNextAction::Stop }) }); mock.expect_shutdown().times(1).returning(|| Ok(())); @@ -532,12 +568,8 @@ mod tests { mock.expect_shared_data().returning(|| EmptyShared); mock.expect_into_task().returning(|_, _| { let mut mock = MockTask::default(); - mock.expect_run().returning(|_| { - Box::pin(async move { - let should_continue = false; - Ok(should_continue) - }) - }); + mock.expect_run() + .returning(|_| Box::pin(async move { TaskNextAction::Stop })); mock.expect_shutdown() .times(1) .returning(|| panic!("Shutdown should fail")); diff --git a/crates/services/sync/src/service.rs b/crates/services/sync/src/service.rs index 7665f657a1c..5d1f4950943 100644 --- a/crates/services/sync/src/service.rs +++ b/crates/services/sync/src/service.rs @@ -27,6 +27,7 @@ use fuel_core_services::{ ServiceRunner, SharedMutex, StateWatcher, + TaskNextAction, }; use fuel_core_types::fuel_types::BlockHeight; use futures::StreamExt; @@ -118,9 +119,11 @@ where E: BlockImporterPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, { - #[tracing::instrument(level = "debug", skip_all, err, ret)] - async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result { - Ok(self.sync_heights.sync().await.is_some()) + async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction { + match self.sync_heights.sync().await { + None => TaskNextAction::Stop, + Some(_) => TaskNextAction::Continue, + } } async fn shutdown(self) -> anyhow::Result<()> { @@ -175,9 +178,8 @@ where E: BlockImporterPort + Send + Sync + 'static, C: ConsensusPort + Send + Sync + 'static, { - #[tracing::instrument(level = "debug", skip_all, err, ret)] - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { - self.0.import(watcher).await + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { + self.0.import(watcher).await.into() } async fn shutdown(self) -> anyhow::Result<()> { diff --git a/crates/services/txpool_v2/src/service.rs b/crates/services/txpool_v2/src/service.rs index c80952f7885..91c96964802 100644 --- a/crates/services/txpool_v2/src/service.rs +++ b/crates/services/txpool_v2/src/service.rs @@ -1,6 +1,7 @@ use crate::{ self as fuel_core_txpool, }; +use fuel_core_services::TaskNextAction; use fuel_core_metrics::txpool_metrics::txpool_metrics; use fuel_core_services::{ @@ -224,43 +225,43 @@ impl RunnableTask for Task where View: TxPoolPersistentStorage, { - async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result { + async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction { tokio::select! { biased; _ = watcher.while_started() => { - return Ok(false) + TaskNextAction::Stop } block_result = self.subscriptions.imported_blocks.next() => { if let Some(result) = block_result { self.import_block(result); - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Stop } } select_transaction_request = self.subscriptions.borrow_txpool.recv() => { if let Some(select_transaction_request) = select_transaction_request { self.borrow_txpool(select_transaction_request); - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Stop } } _ = self.pruner.ttl_timer.tick() => { self.try_prune_transactions(); - return Ok(true) + TaskNextAction::Continue } write_pool_request = self.subscriptions.write_pool.recv() => { if let Some(write_pool_request) = write_pool_request { self.process_write(write_pool_request); - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Continue } } @@ -269,27 +270,27 @@ where if let Some(tx) = data { self.manage_tx_from_p2p(tx, message_id, peer_id); } - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Stop } } new_peer_subscribed = self.subscriptions.new_tx_source.next() => { if let Some(peer_id) = new_peer_subscribed { self.manage_new_peer_subscribed(peer_id); - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Stop } } read_pool_request = self.subscriptions.read_pool.recv() => { if let Some(read_pool_request) = read_pool_request { self.process_read(read_pool_request); - return Ok(true) + TaskNextAction::Continue } else { - return Ok(false) + TaskNextAction::Stop } } }