From 2caca6145044bef4c92f6c64d8b5731545a4514a Mon Sep 17 00:00:00 2001
From: AurelienFT <32803821+AurelienFT@users.noreply.github.com>
Date: Tue, 20 Aug 2024 18:24:54 +0200
Subject: [PATCH] Remove deadline clock in POA and replace with tokio time functions. (#2109)

## Linked Issues/PRs

Closes #1917

## Description

This PR removes the `DeadlineClock` structure and its usage in POA. It is replaced by a mechanism built directly on Tokio's time utilities: the main task computes the next block-production deadline from the time the last block was created and waits on it with `tokio::time::sleep_until` inside its `tokio::select!` loop. This change also keeps the service from hanging on an error: when block production fails, the service now waits one second and retries instead of stalling (a minimal sketch of the pattern follows the diff).

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them here

### After merging, notify other teams

---------

Co-authored-by: Hannes Karppila <2204863+Dentosal@users.noreply.github.com>
Co-authored-by: Green Baneling
---
 CHANGELOG.md                                  |   1 +
 .../src/graphql_api/worker_service.rs         |  23 +-
 crates/fuel-core/src/service/config.rs        |   3 +-
 .../poa/src/deadline_clock.rs                 | 230 ------------------
 .../services/consensus_module/poa/src/lib.rs  |   1 -
 .../consensus_module/poa/src/service.rs       |  80 +++---
 .../consensus_module/poa/src/service_test.rs  |  66 -----
 .../poa/src/service_test/trigger_tests.rs     |  62 +++++
 8 files changed, 113 insertions(+), 353 deletions(-)
 delete mode 100644 crates/services/consensus_module/poa/src/deadline_clock.rs

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ba93d7806de..c93193261c9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
 - [2096](https://github.com/FuelLabs/fuel-core/pull/2096): GraphQL endpoint to fetch blob byte code by its blob ID.
 
 ### Changed
+- [2106](https://github.com/FuelLabs/fuel-core/pull/2106): Remove deadline clock in POA and replace with tokio time functions.
 
 #### Breaking
 - [2051](https://github.com/FuelLabs/fuel-core/pull/2051): Misdocumented `CONSENSUS_KEY` environ variable has been removed, use `CONSENSUS_KEY_SECRET` instead. Also raises MSRV to `1.79.0`.
diff --git a/crates/fuel-core/src/graphql_api/worker_service.rs b/crates/fuel-core/src/graphql_api/worker_service.rs
index 6af5ae30ce7..1c19788d194 100644
--- a/crates/fuel-core/src/graphql_api/worker_service.rs
+++ b/crates/fuel-core/src/graphql_api/worker_service.rs
@@ -98,6 +98,7 @@ pub struct InitializeTask {
     chain_id: ChainId,
     continue_on_error: bool,
     tx_pool: TxPool,
+    blocks_events: BoxStream<SharedImportResult>,
     block_importer: BlockImporter,
     on_chain_database: OnChain,
     off_chain_database: OffChain,
@@ -456,6 +457,7 @@ where
             chain_id,
             tx_pool,
             block_importer,
+            blocks_events,
             on_chain_database,
             off_chain_database,
             continue_on_error,
@@ -463,49 +465,49 @@
 
         let mut task = Task {
             tx_pool,
-            block_importer: block_importer.block_events(),
+            block_importer: blocks_events,
             database: off_chain_database,
             chain_id,
             continue_on_error,
         };
+        let mut target_chain_height = on_chain_database.latest_height()?;
         // Process all blocks that were imported before the service started.
        // The block importer may produce some blocks on start-up during the
        // genesis stage or the recovery process. In this case, we need to
        // process these blocks because, without them,
        // our block height will be less than on the chain database.
        while let Some(Some(block)) = task.block_importer.next().now_or_never() {
+            target_chain_height = Some(*block.sealed_block.entity.header().height());
             task.process_block(block)?;
         }
 
-        sync_databases(&mut task, &on_chain_database, &block_importer)?;
+        sync_databases(&mut task, target_chain_height, &block_importer)?;
 
         Ok(task)
     }
 }
 
-fn sync_databases<TxPool, BlockImporter, OnChain, OffChain>(
+fn sync_databases<TxPool, BlockImporter, OffChain>(
     task: &mut Task<TxPool, OffChain>,
-    on_chain_database: &OnChain,
+    target_chain_height: Option<BlockHeight>,
     import_result_provider: &BlockImporter,
 ) -> anyhow::Result<()>
 where
     TxPool: ports::worker::TxPool,
     BlockImporter: ports::worker::BlockImporter,
-    OnChain: ports::worker::OnChainDatabase,
     OffChain: ports::worker::OffChainDatabase,
 {
     loop {
-        let on_chain_height = on_chain_database.latest_height()?;
         let off_chain_height = task.database.latest_height()?;
 
-        if on_chain_height < off_chain_height {
+        if target_chain_height < off_chain_height {
             return Err(anyhow::anyhow!(
-                "The on-chain database height is lower than the off-chain database height"
-            ));
+                "The target chain height is lower than the off-chain database height"
+            ));
         }
 
-        if on_chain_height == off_chain_height {
+        if target_chain_height == off_chain_height {
             break;
         }
 
@@ -587,6 +589,7 @@ where
 {
     ServiceRunner::new(InitializeTask {
         tx_pool,
+        blocks_events: block_importer.block_events(),
         block_importer,
         on_chain_database,
         off_chain_database,
diff --git a/crates/fuel-core/src/service/config.rs b/crates/fuel-core/src/service/config.rs
index f9514b73cbe..092909929a1 100644
--- a/crates/fuel-core/src/service/config.rs
+++ b/crates/fuel-core/src/service/config.rs
@@ -119,7 +119,8 @@ impl Config {
             #[cfg(not(feature = "rocksdb"))]
             database_type: DbType::InMemory,
             #[cfg(feature = "rocksdb")]
-            state_rewind_policy: Default::default(),
+            state_rewind_policy:
+                crate::state::historical_rocksdb::StateRewindPolicy::RewindFullRange,
         };
         let starting_gas_price = 0;
         let gas_price_change_percent = 0;
diff --git a/crates/services/consensus_module/poa/src/deadline_clock.rs b/crates/services/consensus_module/poa/src/deadline_clock.rs
deleted file mode 100644
index bf69991dcd8..00000000000
--- a/crates/services/consensus_module/poa/src/deadline_clock.rs
+++ /dev/null
@@ -1,230 +0,0 @@
-#![allow(dead_code)]
-
-use tokio::{
-    sync::mpsc::{
-        channel,
-        Receiver,
-        Sender,
-    },
-    task::JoinHandle,
-    time::{
-        Duration,
-        Instant,
-    },
-};
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum OnConflict {
-    /// Ignores the new write
-    Ignore,
-    /// Replaces the previous deadline
-    Overwrite,
-    /// Picks the earlier time
-    Min,
-    /// Picks the later time
-    Max,
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum ControlMessage {
-    /// Set the deadline
-    Set {
-        deadline: Instant,
-        on_conflict: OnConflict,
-    },
-    /// Stop the clock, clearing any deadlines
-    Clear,
-}
-
-async fn deadline_clock_task(
-    event_tx: Sender<Instant>,
-    mut control_rx: Receiver<ControlMessage>,
-) {
-    let mut active = false;
-
-    let sleep = tokio::time::sleep(Duration::new(0, 0));
-    tokio::pin!(sleep);
-
-    loop {
-        tokio::select! {
-            ctrl = control_rx.recv() => {
-                match ctrl {
-                    Some(ControlMessage::Set {deadline, on_conflict}) => {
-                        if !active {
-                            sleep.as_mut().reset(deadline);
-                        } else {
-                            match on_conflict {
-                                OnConflict::Ignore => {},
-                                OnConflict::Overwrite => {
-                                    sleep.as_mut().reset(deadline);
-                                },
-                                OnConflict::Min => {
-                                    let new_deadline = sleep.deadline().min(deadline);
-                                    sleep.as_mut().reset(new_deadline);
-                                },
-                                OnConflict::Max => {
-                                    let new_deadline = sleep.deadline().max(deadline);
-                                    sleep.as_mut().reset(new_deadline);
-                                },
-                            }
-                        }
-                        active = true;
-                    }
-                    Some(ControlMessage::Clear) => {
-                        // disable sleep timer
-                        active = false;
-                    }
-                    None => break
-                }
-            }
-            _ = &mut sleep, if active => {
-                // Trigger
-                active = false;
-                if event_tx.send(sleep.deadline()).await.is_err() {
-                    break;
-                }
-            }
-        }
-    }
-}
-
-/// A configurable deadline-mode clock that produces event to
-/// an associated channel when the timeout expires.
-pub struct DeadlineClock {
-    /// Invariant: bounded, limit = 1
-    event: Receiver<Instant>,
-    /// Invariant: bounded, limit = 1
-    control: Sender<ControlMessage>,
-    // This field must be defined after `control`, as closing it ends the clock task
-    _handle: JoinHandle<()>,
-}
-impl DeadlineClock {
-    pub fn new() -> Self {
-        let (event_tx, event_rx) = channel(1);
-        let (control_tx, control_rx) = channel(1);
-        let handle = tokio::spawn(deadline_clock_task(event_tx, control_rx));
-        Self {
-            event: event_rx,
-            control: control_tx,
-            _handle: handle,
-        }
-    }
-
-    /// Waits until the timeout expires. Sleeps forever when not timeout is set.
-    pub async fn wait(&mut self) -> Instant {
-        self.event
-            .recv()
-            .await
-            .expect("Deadline clock task has panicked")
-    }
-
-    /// Sets the timeout, optionally overwriting the existing value
-    pub async fn set_deadline(&self, deadline: Instant, on_conflict: OnConflict) {
-        self.control
-            .send(ControlMessage::Set {
-                deadline,
-                on_conflict,
-            })
-            .await
-            .expect("Deadline clock task has panicked");
-    }
-
-    /// Sets the timeout, optionally overwriting the existing value
-    pub async fn set_timeout(&self, after: Duration, on_conflict: OnConflict) {
-        self.set_deadline(
-            Instant::now()
-                .checked_add(after)
-                .expect("Setting timeout after many years doesn't make a lot of sense"),
-            on_conflict,
-        )
-        .await;
-    }
-
-    /// Clears the timeout, so that now event is produced when it expires.
-    /// If the event has already occurred, it will not be removed.
-    pub async fn clear(&self) {
-        self.control
-            .send(ControlMessage::Clear)
-            .await
-            .expect("Deadline clock task has panicked");
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use tokio::{
-        sync::mpsc,
-        time::{
-            self,
-            timeout,
-            Duration,
-        },
-    };
-
-    #[tokio::test]
-    async fn deadline_clock_realtime() {
-        let mut c = DeadlineClock::new();
-
-        c.set_timeout(Duration::from_millis(10), OnConflict::Overwrite)
-            .await;
-        timeout(Duration::from_millis(20), c.wait())
-            .await
-            .expect("Timed out");
-
-        c.set_timeout(Duration::from_millis(10), OnConflict::Overwrite)
-            .await;
-        c.clear().await;
-        timeout(Duration::from_millis(20), c.wait())
-            .await
-            .expect_err("Completed unexpectedly");
-
-        c.set_timeout(Duration::from_millis(10), OnConflict::Overwrite)
-            .await;
-        c.set_timeout(Duration::from_millis(100), OnConflict::Overwrite)
-            .await;
-        timeout(Duration::from_millis(20), c.wait())
-            .await
-            .expect_err("Completed unexpectedly");
-        timeout(Duration::from_millis(100), c.wait())
-            .await
-            .expect("Timed out");
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn deadline_clock_mocktime_expiration() {
-        let mut c = DeadlineClock::new();
-
-        c.set_timeout(Duration::from_millis(10), OnConflict::Overwrite)
-            .await;
-
-        // Must not expire immediately
-        assert_eq!(c.event.try_recv(), Err(mpsc::error::TryRecvError::Empty));
-
-        // Must not expire too soon
-        time::sleep(Duration::from_millis(5)).await;
-        assert_eq!(c.event.try_recv(), Err(mpsc::error::TryRecvError::Empty));
-
-        // Must expire too soon
-        time::sleep(Duration::from_millis(10)).await;
-        assert!(c.event.try_recv().is_ok(),);
-    }
-
-    #[tokio::test(start_paused = true)]
-    async fn deadline_clock_setting_deadline_to_past_triggers_it() {
-        let mut c = DeadlineClock::new();
-
-        let in_past1 = Instant::now() - Duration::from_millis(200);
-        let in_past2 = Instant::now() - Duration::from_millis(100);
-
-        // Must expire in any short amount of time
-        c.set_deadline(in_past1, OnConflict::Overwrite).await;
-        time::sleep(Duration::from_millis(1)).await;
-        assert!(c.event.try_recv().is_ok(),);
-
-        // Must expire in any short amount of time
-        c.set_deadline(in_past2, OnConflict::Overwrite).await;
-        time::sleep(Duration::from_millis(1)).await;
-        assert!(c.event.try_recv().is_ok(),);
-    }
-}
diff --git a/crates/services/consensus_module/poa/src/lib.rs b/crates/services/consensus_module/poa/src/lib.rs
index 477a342c01f..272c569ed61 100644
--- a/crates/services/consensus_module/poa/src/lib.rs
+++ b/crates/services/consensus_module/poa/src/lib.rs
@@ -5,7 +5,6 @@
 #![deny(warnings)]
 #![allow(clippy::blocks_in_conditions)] // False positives with tracing macros
 
-mod deadline_clock;
 mod sync;
 
 #[cfg(test)]
diff --git a/crates/services/consensus_module/poa/src/service.rs b/crates/services/consensus_module/poa/src/service.rs
index 1ccac8e9ca7..968359c25f7 100644
--- a/crates/services/consensus_module/poa/src/service.rs
+++ b/crates/services/consensus_module/poa/src/service.rs
@@ -11,15 +11,14 @@ use tokio::{
         mpsc,
         oneshot,
     },
-    time::Instant,
+    time::{
+        sleep_until,
+        Instant,
+    },
 };
 use tokio_stream::StreamExt;
 
 use crate::{
-    deadline_clock::{
-        DeadlineClock,
-        OnConflict,
-    },
     ports::{
         BlockImporter,
         BlockProducer,
@@ -37,7 +36,10 @@ use crate::{
     Trigger,
 };
 use fuel_core_services::{
-    stream::BoxStream,
+    stream::{
+        BoxFuture,
+        BoxStream,
+    },
     RunnableService,
     RunnableTask,
     Service as OtherService,
@@ -138,7 +140,6 @@ pub struct MainTask {
     predefined_blocks: PB,
     trigger: Trigger,
     /// Deadline clock, used by the triggers
-    timer: DeadlineClock,
    sync_task_handle: ServiceRunner<SyncTask>,
 }
@@ -197,7 +198,6 @@ where
             last_block_created,
             predefined_blocks,
             trigger,
-            timer: DeadlineClock::new(),
             sync_task_handle,
         }
     }
@@ -267,7 +267,6 @@ where
             self.next_height(),
             self.next_time(RequestType::Trigger)?,
             TransactionsSource::TxPool,
-            RequestType::Trigger,
         )
         .await
     }
@@ -288,7 +287,6 @@ where
                     self.next_height(),
                     block_time,
                     TransactionsSource::TxPool,
-                    RequestType::Manual,
                 )
                 .await?;
                 block_time = self.next_time(RequestType::Manual)?;
@@ -299,7 +297,6 @@ where
                     self.next_height(),
                     block_time,
                     TransactionsSource::SpecificTransactions(txs),
-                    RequestType::Manual,
                 )
                 .await?;
             }
@@ -312,7 +309,6 @@ where
         height: BlockHeight,
         block_time: Tai64,
         source: TransactionsSource,
-        request_type: RequestType,
     ) -> anyhow::Result<()> {
         let last_block_created = Instant::now();
         // verify signing key is set
@@ -350,8 +346,7 @@ where
         self.txpool.remove_txs(tx_ids_to_remove);
 
         // Sign the block and seal it
-        let seal = self.signer.seal_block(&block).await
-            .expect("Failed to seal block. Panicking for now, TODO: https://github.com/FuelLabs/fuel-core/issues/1917");
+        let seal = self.signer.seal_block(&block).await?;
         let block = SealedBlock {
             entity: block,
             consensus: seal,
@@ -369,25 +364,6 @@ where
         self.last_timestamp = block_time;
         self.last_block_created = last_block_created;
 
-        // Set timer for the next block
-        match (self.trigger, request_type) {
-            (Trigger::Never, RequestType::Manual) => (),
-            (Trigger::Never, RequestType::Trigger) => {
-                unreachable!("Trigger production will never produce blocks in never mode")
-            }
-            (Trigger::Instant, _) => {}
-            (Trigger::Interval { block_time }, RequestType::Trigger) => {
-                let deadline = last_block_created.checked_add(block_time).expect("It is impossible to overflow except in the case where we don't want to produce a block.");
-                self.timer.set_deadline(deadline, OnConflict::Min).await;
-            }
-            (Trigger::Interval { block_time }, RequestType::Manual) => {
-                let deadline = last_block_created.checked_add(block_time).expect("It is impossible to overflow except in the case where we don't want to produce a block.");
-                self.timer
-                    .set_deadline(deadline, OnConflict::Overwrite)
-                    .await;
-            }
-        }
-
         Ok(())
     }
 
@@ -464,7 +440,7 @@ where
         }
     }
 
-    async fn on_timer(&mut self, _at: Instant) -> anyhow::Result<()> {
+    async fn on_timer(&mut self) -> anyhow::Result<()> {
         match self.trigger {
             Trigger::Instant | Trigger::Never => {
                 unreachable!("Timer is never set in this mode");
@@ -517,12 +493,13 @@
 
         match self.trigger {
             Trigger::Never | Trigger::Instant => {}
-            Trigger::Interval { block_time } => {
-                self.timer
-                    .set_timeout(block_time, OnConflict::Overwrite)
-                    .await;
+            Trigger::Interval { .. } => {
+                return Ok(Self {
+                    last_block_created: Instant::now(),
+                    ..self
+                })
             }
-        };
+        }
 
         Ok(self)
     }
@@ -554,9 +531,6 @@ where
                 _ = self.tx_status_update_stream.next() => {
                     // ignore txpool events while syncing
                 }
-                _ = self.timer.wait() => {
-                    // ignore timer events while syncing
-                }
             }
         }
 
@@ -571,6 +545,16 @@ where
             should_continue = true;
             return Ok(should_continue)
         }
+
+        let next_block_production: BoxFuture<()> = match self.trigger {
+            Trigger::Never | Trigger::Instant => Box::pin(core::future::pending()),
+            Trigger::Interval { block_time } => Box::pin(sleep_until(
+                self.last_block_created
+                    .checked_add(block_time)
+                    .ok_or(anyhow!("Time exceeds system limits"))?,
+            )),
+        };
+
         tokio::select! {
            biased;
             _ = watcher.while_started() => {
@@ -604,9 +588,15 @@ where
                     should_continue = false;
                 }
             }
-            at = self.timer.wait() => {
-                self.on_timer(at).await.context("While processing timer event")?;
-                should_continue = true;
+            _ = next_block_production => {
+                match self.on_timer().await.context("While processing timer event") {
+                    Ok(()) => should_continue = true,
+                    Err(err) => {
+                        // Wait some time in case of error to avoid spamming block production retries
+                        tokio::time::sleep(Duration::from_secs(1)).await;
+                        return Err(err);
+                    }
+                };
             }
         }
 
diff --git a/crates/services/consensus_module/poa/src/service_test.rs b/crates/services/consensus_module/poa/src/service_test.rs
index afe7eddc2d9..9d1b87c008e 100644
--- a/crates/services/consensus_module/poa/src/service_test.rs
+++ b/crates/services/consensus_module/poa/src/service_test.rs
@@ -399,72 +399,6 @@ async fn remove_skipped_transactions() {
     assert!(task.produce_next_block().await.is_ok());
 }
 
-#[tokio::test]
-#[should_panic = "failed to sign block"]
-async fn panics_if_signing_fails() {
-    // The test verifies that if `BlockProducer` returns skipped transactions, they would
-    // be propagated to `TxPool` for removal.
-    let mut rng = StdRng::seed_from_u64(2322);
-    let secret_key = SecretKey::random(&mut rng);
-
-    let mut block_producer = MockBlockProducer::default();
-    block_producer
-        .expect_produce_and_execute_block()
-        .times(1)
-        .returning(move |_, _, _| {
-            Ok(UncommittedResult::new(
-                ExecutionResult {
-                    block: Default::default(),
-                    skipped_transactions: Default::default(),
-                    tx_status: Default::default(),
-                    events: Default::default(),
-                },
-                Default::default(),
-            ))
-        });
-
-    let mut block_importer = MockBlockImporter::default();
-
-    block_importer
-        .expect_commit_result()
-        .times(1)
-        .returning(|_| Ok(()));
-
-    block_importer
-        .expect_block_stream()
-        .returning(|| Box::pin(tokio_stream::pending()));
-
-    let TxPoolContext {
-        txpool,
-        status_sender: _,
-        txs: _,
-    } = MockTransactionPool::new_with_txs(vec![make_tx(&mut rng)]);
-
-    let signer = SignMode::Key(Secret::new(secret_key.into()));
-
-    let config = Config {
-        trigger: Trigger::Instant,
-        signer: signer.clone(),
-        metrics: false,
-        ..Default::default()
-    };
-
-    let p2p_port = generate_p2p_port();
-
-    let mut task = MainTask::new(
-        &BlockHeader::new_block(BlockHeight::from(1u32), Tai64::now()),
-        config,
-        txpool,
-        block_producer,
-        block_importer,
-        p2p_port,
-        FakeBlockSigner { succeeds: false },
-        InMemoryPredefinedBlocks::new(HashMap::new()),
-    );
-
-    let _ = task.produce_next_block().await; // This panics for now
-}
-
 #[tokio::test]
 async fn does_not_produce_when_txpool_empty_in_instant_mode() {
     // verify the PoA service doesn't trigger empty blocks to be produced when there are
diff --git a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
index 012b8848ec3..54c92fd184c 100644
--- a/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
+++ b/crates/services/consensus_module/poa/src/service_test/trigger_tests.rs
@@ -1,3 +1,9 @@
+use mockall::Sequence;
+use tokio::{
+    sync::Notify,
+    time::Instant,
+};
+
 use super::*;
 
 #[tokio::test(start_paused = true)] // Run with time paused, start/stop must still work
@@ -194,6 +200,62 @@ async fn interval_trigger_produces_blocks_periodically() -> anyhow::Result<()> {
     Ok(())
 }
 
+#[tokio::test(start_paused = true)]
+async fn service__if_commit_result_fails_then_retry_commit_result_after_one_second(
+) -> anyhow::Result<()> {
+    // given
+    let config = Config {
+        trigger: Trigger::Interval {
+            block_time: Duration::new(2, 0),
+        },
+        signer: SignMode::Key(test_signing_key()),
+        metrics: false,
+        ..Default::default()
+    };
+    let block_production_waitpoint = Arc::new(Notify::new());
+    let block_production_waitpoint_trigger = block_production_waitpoint.clone();
+
+    let mut ctx_builder = TestContextBuilder::new();
+    ctx_builder.with_config(config);
+    let mut mock_tx_pool = MockTransactionPool::no_tx_updates();
+    mock_tx_pool.expect_remove_txs().returning(|_| vec![]);
+    ctx_builder.with_txpool(mock_tx_pool);
+
+    let mut importer = MockBlockImporter::default();
+    let mut seq = Sequence::new();
+    // First attempt fails
+    importer
+        .expect_commit_result()
+        .times(1)
+        .in_sequence(&mut seq)
+        .returning(move |_| Err(anyhow::anyhow!("Error in production")));
+    // Second attempt should be triggered after 1 second and succeed
+    importer
+        .expect_commit_result()
+        .times(1)
+        .in_sequence(&mut seq)
+        .returning(move |_| {
+            block_production_waitpoint_trigger.notify_waiters();
+            Ok(())
+        });
+    importer
+        .expect_block_stream()
+        .returning(|| Box::pin(tokio_stream::pending()));
+    ctx_builder.with_importer(importer);
+    let test_ctx = ctx_builder.build();
+
+    let before_retry = Instant::now();
+
+    // when
+    block_production_waitpoint.notified().await;
+
+    // then
+    assert!(before_retry.elapsed() >= Duration::from_secs(1));
+
+    test_ctx.service.stop_and_await().await?;
+    Ok(())
+}
+
 #[tokio::test(start_paused = true)]
 async fn interval_trigger_doesnt_react_to_full_txpool() -> anyhow::Result<()> {
     let mut ctx = DefaultContext::new(Config {
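
The timer pattern this PR adopts, distilled into a self-contained sketch (assumes the `tokio` and `anyhow` crates; `produce_block` is a hypothetical stand-in, and the retry is inlined here, whereas the real task sleeps and then returns the error to its `ServiceRunner`, which restarts the loop):

```rust
use std::time::Duration;
use tokio::time::{sleep_until, Instant};

// Hypothetical stand-in for `produce_next_block`: fails on the first
// attempt so the retry path is exercised.
async fn produce_block(attempt: u32) -> anyhow::Result<()> {
    if attempt == 0 {
        anyhow::bail!("Error in production");
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let block_time = Duration::from_secs(2);
    let mut last_block_created = Instant::now();
    let mut attempt = 0u32;

    loop {
        // Like the PR: derive the next deadline from the last successful
        // production instead of keeping a separate deadline-clock task.
        let deadline = last_block_created
            .checked_add(block_time)
            .ok_or_else(|| anyhow::anyhow!("Time exceeds system limits"))?;

        tokio::select! {
            // Other branches (shutdown watcher, txpool events, imported
            // blocks) are omitted from this sketch.
            _ = sleep_until(deadline) => {
                match produce_block(attempt).await {
                    Ok(()) => {
                        last_block_created = Instant::now();
                        println!("block produced after {attempt} retry attempt(s)");
                        return Ok(());
                    }
                    Err(err) => {
                        // Back off for a second so a persistent failure
                        // cannot spam block-production retries.
                        eprintln!("production failed, retrying: {err}");
                        tokio::time::sleep(Duration::from_secs(1)).await;
                        attempt = attempt.saturating_add(1);
                    }
                }
            }
        }
    }
}
```

Because the deadline is recomputed from `last_block_created` on every loop iteration, a failed attempt never leaves the service without a pending timer, which is how the old `DeadlineClock` could cause it to hang.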
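And a paused-clock sketch of the retry assertion used by the new test (hypothetical test name; assumes tokio's `macros` and `test-util` features): with `start_paused = true`, Tokio auto-advances the mock clock whenever every task is idle, so the one-second back-off can be asserted via `Instant::elapsed` without any real waiting.

```rust
use std::{sync::Arc, time::Duration};
use tokio::{sync::Notify, time::Instant};

#[tokio::test(start_paused = true)]
async fn backoff_elapses_on_the_paused_clock() {
    let waitpoint = Arc::new(Notify::new());
    let trigger = waitpoint.clone();

    let before_retry = Instant::now();
    tokio::spawn(async move {
        // Stand-in for the service's error path: back off, then signal
        // that the retry happened. `notify_one` stores a permit, so the
        // signal is not lost even if no task is waiting yet.
        tokio::time::sleep(Duration::from_secs(1)).await;
        trigger.notify_one();
    });

    // The mock clock only advances while this task is parked here.
    waitpoint.notified().await;
    assert!(before_retry.elapsed() >= Duration::from_secs(1));
}
```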