Replace task run() return result with custom enum (#2429)
MitchTurner authored Nov 20, 2024
1 parent be4e33c commit 5e10eaa
Showing 18 changed files with 250 additions and 162 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -35,6 +35,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 
 ### Changed
 - [2378](https://github.com/FuelLabs/fuel-core/pull/2378): Use cached hash of the topic instead of calculating it on each publishing gossip message.
+- [2429](https://github.com/FuelLabs/fuel-core/pull/2429): Introduce a custom enum for representing the result of running service tasks.
 - [2377](https://github.com/FuelLabs/fuel-core/pull/2377): Add more errors that can be returned as responses when using protocol `/fuel/req_res/0.0.2`. The errors supported are `ProtocolV1EmptyResponse` (status code `0`) for converting empty responses sent via protocol `/fuel/req_res/0.0.1`, `RequestedRangeTooLarge` (status code `1`) if the client requests a range of objects, such as sealed block headers or transactions, that is too large, `Timeout` (status code `2`) if the remote peer takes too long to fulfill a request, or `SyncProcessorOutOfCapacity` if the remote peer is fulfilling too many requests concurrently.
 
 #### Breaking
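The changelog entry above is the heart of this commit: every `RunnableTask::run` implementation now returns a `TaskNextAction` instead of `anyhow::Result<bool>`, where the old `bool` meant `should_continue`. Below is a minimal sketch of what that enum could look like, inferred purely from the call sites in the hunks that follow (the `Continue`, `Stop`, and `ErrorContinue` variants, the `always_continue` helper, and the `From` conversion used by the PoA service); the real definition lives in `fuel-core-services` and may differ in detail.

```rust
// Hypothetical reconstruction of the enum introduced by this PR, inferred
// from its usage in the diff; the actual `fuel-core-services` definition
// may differ.
pub enum TaskNextAction {
    /// Run the task again.
    Continue,
    /// Stop the service.
    Stop,
    /// Report the error (e.g. log it) and run the task again.
    ErrorContinue(anyhow::Error),
}

impl TaskNextAction {
    /// Keep the task running whether or not the operation succeeded.
    pub fn always_continue(res: anyhow::Result<()>) -> Self {
        match res {
            Ok(()) => TaskNextAction::Continue,
            Err(err) => TaskNextAction::ErrorContinue(err),
        }
    }
}

/// Bridge from the old convention, where `Ok(true)` meant "keep running",
/// `Ok(false)` meant "stop", and `Err` aborted the task.
impl From<anyhow::Result<bool>> for TaskNextAction {
    fn from(result: anyhow::Result<bool>) -> Self {
        match result {
            Ok(true) => TaskNextAction::Continue,
            Ok(false) => TaskNextAction::Stop,
            Err(err) => TaskNextAction::ErrorContinue(err),
        }
    }
}
```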
15 changes: 10 additions & 5 deletions crates/fuel-core/src/graphql_api/api_service.rs
@@ -64,6 +64,7 @@ use fuel_core_services::{
     RunnableService,
     RunnableTask,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_storage::transactional::AtomicView;
 use fuel_core_types::fuel_types::BlockHeight;
@@ -196,11 +197,15 @@ impl RunnableService for GraphqlService {
 
 #[async_trait::async_trait]
 impl RunnableTask for Task {
-    async fn run(&mut self, _: &mut StateWatcher) -> anyhow::Result<bool> {
-        self.server.as_mut().await?;
-        // The `axum::Server` has its internal loop. If `await` is finished, we get an internal
-        // error or stop signal.
-        Ok(false /* should_continue */)
+    async fn run(&mut self, _: &mut StateWatcher) -> TaskNextAction {
+        match self.server.as_mut().await {
+            Ok(()) => {
+                // The `axum::Server` has its internal loop. If `await` is finished, we get an internal
+                // error or stop signal.
+                TaskNextAction::Stop
+            }
+            Err(err) => TaskNextAction::ErrorContinue(err.into()),
+        }
     }
 
     async fn shutdown(self) -> anyhow::Result<()> {
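The payoff of the new return type is that a task no longer has to collapse errors into a `bool` itself. Here is a hedged sketch of how a runner loop might interpret the three variants; this is an illustration of the intended semantics only, not the actual `ServiceRunner` internals in `fuel-core-services`.

```rust
use fuel_core_services::{RunnableTask, StateWatcher, TaskNextAction};

// Illustrative driver loop, assuming only the trait surface shown in this
// diff: the runner can now tell "stop" apart from "recoverable error"
// without each task logging and mapping errors into a bool.
async fn drive_task<T: RunnableTask>(mut task: T, mut watcher: StateWatcher) {
    loop {
        match task.run(&mut watcher).await {
            TaskNextAction::Continue => continue,
            TaskNextAction::ErrorContinue(err) => {
                // The error is surfaced centrally instead of in every task.
                tracing::error!("task iteration failed: {:?}", err);
                continue;
            }
            TaskNextAction::Stop => break,
        }
    }
}
```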
18 changes: 10 additions & 8 deletions crates/fuel-core/src/graphql_api/worker_service.rs
@@ -36,6 +36,7 @@ use fuel_core_services::{
     RunnableTask,
     ServiceRunner,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_storage::{
     Error as StorageError,
@@ -551,13 +552,12 @@
     TxPool: ports::worker::TxPool,
     D: ports::worker::OffChainDatabase,
 {
-    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
-        let should_continue;
+    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
         tokio::select! {
             biased;
 
             _ = watcher.while_started() => {
-                should_continue = false;
+                TaskNextAction::Stop
             }
 
             result = self.block_importer.next() => {
@@ -567,17 +567,19 @@
                     // In the case of an error, shut down the service to avoid a huge
                     // de-synchronization between on-chain and off-chain databases.
                     if let Err(e) = result {
-                        tracing::error!("Error processing block: {:?}", e);
-                        should_continue = self.continue_on_error;
+                        if self.continue_on_error {
+                            TaskNextAction::ErrorContinue(e)
+                        } else {
+                            TaskNextAction::Stop
+                        }
                     } else {
-                        should_continue = true
+                        TaskNextAction::Continue
                    }
                 } else {
-                    should_continue = false
+                    TaskNextAction::Stop
                 }
             }
         }
-        Ok(should_continue)
     }
 
     async fn shutdown(mut self) -> anyhow::Result<()> {
2 changes: 1 addition & 1 deletion crates/fuel-core/src/graphql_api/worker_service/tests.rs
@@ -46,7 +46,7 @@ async fn run__relayed_transaction_events_are_added_to_storage() {
     // when
     let mut task =
         worker_task_with_block_importer_and_db(block_importer, database.clone());
-    task.run(&mut state_watcher).await.unwrap();
+    task.run(&mut state_watcher).await;
     tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
 
     // then
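With `run` returning `TaskNextAction` instead of a `Result`, the test above simply drops the `.unwrap()`. If a test wanted to assert on the outcome explicitly, one option is to match on the returned variant; this is a fragment that would slot into the existing test body, and the expected `Continue` variant is an assumption on my part, while the real test just discards the value.

```rust
// Sketch only: reuses the `task` and `state_watcher` from the test above.
let action = task.run(&mut state_watcher).await;
assert!(
    matches!(action, TaskNextAction::Continue),
    "expected the worker to keep running after processing the block"
);
```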
6 changes: 3 additions & 3 deletions crates/fuel-core/src/service.rs
@@ -27,6 +27,7 @@ use fuel_core_services::{
     ServiceRunner,
     State,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_storage::{
     not_found,
@@ -428,7 +429,7 @@ impl RunnableService for Task {
 #[async_trait::async_trait]
 impl RunnableTask for Task {
     #[tracing::instrument(skip_all)]
-    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
+    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
         let mut stop_signals = vec![];
         for service in self.services.iter() {
             stop_signals.push(service.await_stop())
@@ -443,8 +444,7 @@
 
         // We received the stop signal from any of one source, so stop this service and
         // all sub-services.
-        let should_continue = false;
-        Ok(should_continue)
+        TaskNextAction::Stop
     }
 
     async fn shutdown(self) -> anyhow::Result<()> {
@@ -10,6 +10,7 @@ use fuel_core_services::{
     ServiceRunner,
     SharedMutex,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_storage::{
     not_found,
@@ -111,13 +112,12 @@ impl SharedState {
 
 #[async_trait::async_trait]
 impl RunnableTask for Task {
-    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
-        let should_continue;
+    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
         tokio::select! {
             biased;
 
             _ = watcher.while_started() => {
-                should_continue = false;
+                TaskNextAction::Stop
             }
 
             Some(event) = self.blocks_events.next() => {
@@ -135,16 +135,13 @@
                         }
                         Err(err) => {
                             tracing::error!("Failed to cache consensus parameters: {:?}", err);
-                            should_continue = false;
-                            return Ok(should_continue)
+                            return TaskNextAction::Stop
                         }
                     }
                 }
-                should_continue = true;
+                TaskNextAction::Continue
             }
         }
-
-        Ok(should_continue)
     }
 
     async fn shutdown(self) -> anyhow::Result<()> {
@@ -358,7 +355,7 @@ mod tests {
         // When
         let result_with_new_version = result_with_new_version(new_version);
         let _ = block_sender.send(result_with_new_version);
-        task.run(&mut StateWatcher::started()).await.unwrap();
+        task.run(&mut StateWatcher::started()).await;
 
         // Then
         assert_eq!(
52 changes: 30 additions & 22 deletions crates/services/consensus_module/poa/src/service.rs
@@ -42,6 +42,7 @@ use fuel_core_services::{
     Service as OtherService,
     ServiceRunner,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_storage::transactional::Changes;
 use fuel_core_types::{
@@ -518,16 +519,14 @@
     PB: PredefinedBlocks,
     C: GetTime,
 {
-    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
-        let should_continue;
+    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
         let mut sync_state = self.sync_task_handle.shared.clone();
         // make sure we're synced first
         if *sync_state.borrow_and_update() == SyncState::NotSynced {
            tokio::select! {
                biased;
                result = watcher.while_started() => {
-                    should_continue = result?.started();
-                    return Ok(should_continue);
+                    return result.map(|state| state.started()).into()
                }
                _ = sync_state.changed() => {}
            }
@@ -538,26 +537,37 @@
         }
 
         let next_height = self.next_height();
-        let maybe_block = self.predefined_blocks.get_block(&next_height)?;
+        let maybe_block = match self.predefined_blocks.get_block(&next_height) {
+            Ok(option) => option,
+            Err(err) => return TaskNextAction::ErrorContinue(err),
+        };
         if let Some(block) = maybe_block {
-            self.produce_predefined_block(&block).await?;
-            should_continue = true;
-            return Ok(should_continue)
+            let res = self.produce_predefined_block(&block).await;
+            return match res {
+                Ok(()) => TaskNextAction::Continue,
+                Err(err) => TaskNextAction::ErrorContinue(err),
+            }
         }
 
         let next_block_production: BoxFuture<()> = match self.trigger {
             Trigger::Never | Trigger::Instant => Box::pin(core::future::pending()),
-            Trigger::Interval { block_time } => Box::pin(sleep_until(
-                self.last_block_created
+            Trigger::Interval { block_time } => {
+                let next_block_time = match self
+                    .last_block_created
                     .checked_add(block_time)
-                    .ok_or(anyhow!("Time exceeds system limits"))?,
-            )),
+                    .ok_or(anyhow!("Time exceeds system limits"))
+                {
+                    Ok(time) => time,
+                    Err(err) => return TaskNextAction::ErrorContinue(err),
+                };
+                Box::pin(sleep_until(next_block_time))
+            }
         };
 
         tokio::select! {
             biased;
             _ = watcher.while_started() => {
-                should_continue = false;
+                TaskNextAction::Stop
             }
             request = self.request_receiver.recv() => {
                 if let Some(request) = request {
@@ -567,29 +577,27 @@
                             let _ = response.send(result);
                         }
                     }
-                    should_continue = true;
+                    TaskNextAction::Continue
                 } else {
                     tracing::error!("The PoA task should be the holder of the `Sender`");
-                    should_continue = false;
+                    TaskNextAction::Stop
                 }
             }
             _ = next_block_production => {
                 match self.on_timer().await.context("While processing timer event") {
-                    Ok(()) => should_continue = true,
+                    Ok(()) => TaskNextAction::Continue,
                     Err(err) => {
                         // Wait some time in case of error to avoid spamming retry block production
                         tokio::time::sleep(Duration::from_secs(1)).await;
-                        return Err(err);
+                        TaskNextAction::ErrorContinue(err)
                     }
-                };
+                }
             }
             _ = self.new_txs_watcher.changed() => {
-                self.on_txpool_event().await.context("While processing txpool event")?;
-                should_continue = true;
+                let res = self.on_txpool_event().await.context("While processing txpool event");
+                TaskNextAction::always_continue(res)
             }
         }
-
-        Ok(should_continue)
     }
 
     async fn shutdown(self) -> anyhow::Result<()> {
13 changes: 6 additions & 7 deletions crates/services/consensus_module/poa/src/sync.rs
@@ -11,6 +11,7 @@ use fuel_core_services::{
     RunnableService,
     RunnableTask,
     StateWatcher,
+    TaskNextAction,
 };
 use fuel_core_types::{
     blockchain::header::BlockHeader,
@@ -137,10 +138,7 @@
 
 #[async_trait::async_trait]
 impl RunnableTask for SyncTask {
-    #[tracing::instrument(level = "debug", skip_all, err, ret)]
-    async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
-        let mut should_continue = true;
-
+    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
         let tick: BoxFuture<tokio::time::Instant> = if let Some(timer) = &mut self.timer {
             Box::pin(timer.tick())
         } else {
@@ -150,7 +148,7 @@
         tokio::select! {
             biased;
             _ = watcher.while_started() => {
-                should_continue = false;
+                TaskNextAction::Stop
             }
             Some(latest_peer_count) = self.peer_connections_stream.next() => {
                 let sufficient_peers = latest_peer_count >= self.min_connected_reserved_peers;
@@ -172,6 +170,7 @@
                     }
                     _ => {},
                 }
+                TaskNextAction::Continue
             }
             Some(block_info) = self.block_stream.next() => {
                 let new_block_height = block_info.block_header.height();
@@ -205,6 +204,7 @@
                     }
                     _ => {}
                 }
+                TaskNextAction::Continue
             }
             _ = tick => {
                 if let InnerSyncState::SufficientPeers(block_header) = &self.inner_state {
@@ -215,10 +215,9 @@
                     };
                     self.update_sync_state(SyncState::Synced(Arc::new(block_header)));
                 }
+                TaskNextAction::Continue
             }
         }
-
-        Ok(should_continue)
     }
 
     async fn shutdown(self) -> anyhow::Result<()> {
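Taken together, here is a minimal, hypothetical `RunnableTask` implementation against the new signature, following the same shape as the services in this diff (select on the state watcher, map recoverable failures to `ErrorContinue`). Only the trait items and `TaskNextAction` variants visible above are taken from the PR; every other name is invented for illustration.

```rust
use async_trait::async_trait;
use fuel_core_services::{RunnableTask, StateWatcher, TaskNextAction};

/// Hypothetical task that drains a channel of jobs.
struct ExampleTask {
    jobs: tokio::sync::mpsc::Receiver<u64>,
}

#[async_trait]
impl RunnableTask for ExampleTask {
    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
        tokio::select! {
            biased;
            // Stop as soon as the service state leaves `Started`.
            _ = watcher.while_started() => TaskNextAction::Stop,
            job = self.jobs.recv() => match job {
                // A failed job is reported by the runner and the task keeps going.
                Some(job) => TaskNextAction::always_continue(process(job)),
                // Channel closed: nothing left to do, stop the service.
                None => TaskNextAction::Stop,
            },
        }
    }

    async fn shutdown(self) -> anyhow::Result<()> {
        Ok(())
    }
}

fn process(job: u64) -> anyhow::Result<()> {
    // Pretend work that may fail.
    anyhow::ensure!(job != 0, "job id must be non-zero");
    Ok(())
}
```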