From cd4158effa9a83c69eebec019c1a4e325622bea0 Mon Sep 17 00:00:00 2001 From: Green Baneling Date: Fri, 5 Jul 2024 16:17:57 +0200 Subject: [PATCH] Use dedicated thread for the block importer (#2014) Related: https://github.com/FuelLabs/fuel-core/issues/642 ### Before requesting review - [x] I have reviewed the code myself --- CHANGELOG.md | 1 + Cargo.lock | 2 +- Cargo.toml | 1 + crates/services/importer/Cargo.toml | 2 +- crates/services/importer/src/importer.rs | 90 +++++++++++++++++------- crates/services/p2p/Cargo.toml | 2 +- 6 files changed, 70 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b046584b0e2..45f97fdd7a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] ### Added +- [#2014](https://github.com/FuelLabs/fuel-core/pull/2014): Added a separate thread for the block importer. - [#2013](https://github.com/FuelLabs/fuel-core/pull/2013): Added a separate thread to process P2P database lookups. - [#2004](https://github.com/FuelLabs/fuel-core/pull/2004): Added new CLI argument `continue-services-on-error` to control internal flow of services. - [#2004](https://github.com/FuelLabs/fuel-core/pull/2004): Added handling of incorrect shutdown of the off-chain GraphQL worker by using state rewind feature. diff --git a/Cargo.lock b/Cargo.lock index d1e347059e1..647067f48dc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3061,9 +3061,9 @@ dependencies = [ "fuel-core-types", "mockall", "parking_lot", + "rayon", "test-case", "tokio", - "tokio-rayon", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index ad33c966879..9543090b6d5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -126,6 +126,7 @@ test-case = "3.3" impl-tools = "0.10" test-strategy = "0.3" parquet = { version = "49.0", default-features = false } +rayon = "1.10.0" bytes = "1.5.0" pretty_assertions = "1.4.0" proptest = "1.1" diff --git a/crates/services/importer/Cargo.toml b/crates/services/importer/Cargo.toml index b6dae64a88f..d68d54bdec9 100644 --- a/crates/services/importer/Cargo.toml +++ b/crates/services/importer/Cargo.toml @@ -16,8 +16,8 @@ fuel-core-metrics = { workspace = true } fuel-core-storage = { workspace = true } fuel-core-types = { workspace = true } parking_lot = { workspace = true } +rayon = { workspace = true } tokio = { workspace = true, features = ["full"] } -tokio-rayon = { workspace = true } tracing = { workspace = true } [dev-dependencies] diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index f77efef82d4..d877390cbaa 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -97,6 +97,7 @@ pub enum Error { StorageError(StorageError), UnsupportedConsensusVariant(String), ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError), + RayonTaskWasCanceled, } impl From for anyhow::Error { @@ -124,6 +125,7 @@ pub struct Importer { /// the block importer stops committing new blocks and waits for /// the resolution of the previous one. active_import_results: Arc, + process_thread: rayon::ThreadPool, } impl Importer { @@ -139,6 +141,10 @@ impl Importer { // that will not be processed. let max_block_notify_buffer = config.max_block_notify_buffer; let (broadcast, _) = broadcast::channel(max_block_notify_buffer); + let process_thread = rayon::ThreadPoolBuilder::new() + .num_threads(1) + .build() + .expect("Failed to create a thread pool for the block processing"); Self { database: Mutex::new(database), @@ -148,6 +154,7 @@ impl Importer { broadcast, active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)), guard: Semaphore::new(1), + process_thread, } } @@ -179,11 +186,28 @@ impl Importer { } } } + + async fn async_run(&self, op: OP) -> Result + where + OP: FnOnce() -> Output, + OP: Send, + Output: Send, + { + let (sender, receiver) = tokio::sync::oneshot::channel(); + self.process_thread.scope_fifo(|_| { + let result = op(); + let _ = sender.send(result); + }); + let result = receiver.await.map_err(|_| Error::RayonTaskWasCanceled)?; + Ok(result) + } } impl Importer where D: ImporterDatabase + Transactional, + E: Send + Sync, + V: Send + Sync, { /// The method commits the result of the block execution attaching the consensus data. /// It expects that the `UncommittedResult` contains the result of the block @@ -221,13 +245,15 @@ where }; let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?; - let mut guard = self - .database - .try_lock() - .expect("Semaphore prevents concurrent access to the database"); - let database = guard.deref_mut(); - - self._commit_result(result, permit, database) + self.async_run(move || { + let mut guard = self + .database + .try_lock() + .expect("Semaphore prevents concurrent access to the database"); + let database = guard.deref_mut(); + self._commit_result(result, permit, database) + }) + .await? } /// The method commits the result of the block execution and notifies about a new imported block. @@ -454,14 +480,18 @@ where let executor = self.executor.clone(); let verifier = self.verifier.clone(); - let (result, execute_time) = tokio_rayon::spawn_fifo(|| { - let start = Instant::now(); - let result = - Self::verify_and_execute_block_inner(executor, verifier, sealed_block); - let execute_time = start.elapsed().as_secs_f64(); - (result, execute_time) - }) - .await; + let (result, execute_time) = self + .async_run(|| { + let start = Instant::now(); + let result = Self::verify_and_execute_block_inner( + executor, + verifier, + sealed_block, + ); + let execute_time = start.elapsed().as_secs_f64(); + (result, execute_time) + }) + .await?; let result = result?; @@ -482,19 +512,29 @@ where }; let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?; - let start = Instant::now(); + let commit_result = self + .async_run(move || { + let mut guard = self + .database + .try_lock() + .expect("Semaphore prevents concurrent access to the database"); + let database = guard.deref_mut(); + + let start = Instant::now(); + self._commit_result(result, permit, database).map(|_| start) + }) + .await?; + + let time = if let Ok(start_instant) = commit_result { + let commit_time = start_instant.elapsed().as_secs_f64(); + execute_time + commit_time + } else { + execute_time + }; - let mut guard = self - .database - .try_lock() - .expect("Semaphore prevents concurrent access to the database"); - let database = guard.deref_mut(); - let commit_result = self._commit_result(result, permit, database); - let commit_time = start.elapsed().as_secs_f64(); - let time = execute_time + commit_time; importer_metrics().execute_and_commit_duration.observe(time); // return execution result - commit_result + commit_result.map(|_| ()) } } diff --git a/crates/services/p2p/Cargo.toml b/crates/services/p2p/Cargo.toml index fcf2a360c3d..ae75ce32a7e 100644 --- a/crates/services/p2p/Cargo.toml +++ b/crates/services/p2p/Cargo.toml @@ -42,7 +42,7 @@ prometheus-client = { workspace = true } quick-protobuf = "0.8.1" quick-protobuf-codec = "0.3.0" rand = { workspace = true } -rayon = "1.10.0" +rayon = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_with = { workspace = true } sha2 = "0.10"