Skip to content

Commit

Permalink
Use dedicated thread for the block importer (#2014)
Browse files Browse the repository at this point in the history
Related: #642

### Before requesting review
- [x] I have reviewed the code myself
  • Loading branch information
xgreenx authored Jul 5, 2024
1 parent da1f69f commit cd4158e
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 28 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
## [Unreleased]

### Added
- [#2014](https://github.com/FuelLabs/fuel-core/pull/2014): Added a separate thread for the block importer.
- [#2013](https://github.com/FuelLabs/fuel-core/pull/2013): Added a separate thread to process P2P database lookups.
- [#2004](https://github.com/FuelLabs/fuel-core/pull/2004): Added new CLI argument `continue-services-on-error` to control internal flow of services.
- [#2004](https://github.com/FuelLabs/fuel-core/pull/2004): Added handling of incorrect shutdown of the off-chain GraphQL worker by using state rewind feature.
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ test-case = "3.3"
impl-tools = "0.10"
test-strategy = "0.3"
parquet = { version = "49.0", default-features = false }
rayon = "1.10.0"
bytes = "1.5.0"
pretty_assertions = "1.4.0"
proptest = "1.1"
Expand Down
2 changes: 1 addition & 1 deletion crates/services/importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ fuel-core-metrics = { workspace = true }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true }
parking_lot = { workspace = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tokio-rayon = { workspace = true }
tracing = { workspace = true }

[dev-dependencies]
Expand Down
90 changes: 65 additions & 25 deletions crates/services/importer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ pub enum Error {
StorageError(StorageError),
UnsupportedConsensusVariant(String),
ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
RayonTaskWasCanceled,
}

impl From<Error> for anyhow::Error {
Expand Down Expand Up @@ -124,6 +125,7 @@ pub struct Importer<D, E, V> {
/// the block importer stops committing new blocks and waits for
/// the resolution of the previous one.
active_import_results: Arc<Semaphore>,
process_thread: rayon::ThreadPool,
}

impl<D, E, V> Importer<D, E, V> {
Expand All @@ -139,6 +141,10 @@ impl<D, E, V> Importer<D, E, V> {
// that will not be processed.
let max_block_notify_buffer = config.max_block_notify_buffer;
let (broadcast, _) = broadcast::channel(max_block_notify_buffer);
let process_thread = rayon::ThreadPoolBuilder::new()
.num_threads(1)
.build()
.expect("Failed to create a thread pool for the block processing");

Self {
database: Mutex::new(database),
Expand All @@ -148,6 +154,7 @@ impl<D, E, V> Importer<D, E, V> {
broadcast,
active_import_results: Arc::new(Semaphore::new(max_block_notify_buffer)),
guard: Semaphore::new(1),
process_thread,
}
}

Expand Down Expand Up @@ -179,11 +186,28 @@ impl<D, E, V> Importer<D, E, V> {
}
}
}

async fn async_run<OP, Output>(&self, op: OP) -> Result<Output, Error>
where
OP: FnOnce() -> Output,
OP: Send,
Output: Send,
{
let (sender, receiver) = tokio::sync::oneshot::channel();
self.process_thread.scope_fifo(|_| {
let result = op();
let _ = sender.send(result);
});
let result = receiver.await.map_err(|_| Error::RayonTaskWasCanceled)?;
Ok(result)
}
}

impl<D, E, V> Importer<D, E, V>
where
D: ImporterDatabase + Transactional,
E: Send + Sync,
V: Send + Sync,
{
/// The method commits the result of the block execution attaching the consensus data.
/// It expects that the `UncommittedResult` contains the result of the block
Expand Down Expand Up @@ -221,13 +245,15 @@ where
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();

self._commit_result(result, permit, database)
self.async_run(move || {
let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
self._commit_result(result, permit, database)
})
.await?
}

/// The method commits the result of the block execution and notifies about a new imported block.
Expand Down Expand Up @@ -454,14 +480,18 @@ where

let executor = self.executor.clone();
let verifier = self.verifier.clone();
let (result, execute_time) = tokio_rayon::spawn_fifo(|| {
let start = Instant::now();
let result =
Self::verify_and_execute_block_inner(executor, verifier, sealed_block);
let execute_time = start.elapsed().as_secs_f64();
(result, execute_time)
})
.await;
let (result, execute_time) = self
.async_run(|| {
let start = Instant::now();
let result = Self::verify_and_execute_block_inner(
executor,
verifier,
sealed_block,
);
let execute_time = start.elapsed().as_secs_f64();
(result, execute_time)
})
.await?;

let result = result?;

Expand All @@ -482,19 +512,29 @@ where
};
let permit = permit.map_err(Error::ActiveBlockResultsSemaphoreClosed)?;

let start = Instant::now();
let commit_result = self
.async_run(move || {
let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();

let start = Instant::now();
self._commit_result(result, permit, database).map(|_| start)
})
.await?;

let time = if let Ok(start_instant) = commit_result {
let commit_time = start_instant.elapsed().as_secs_f64();
execute_time + commit_time
} else {
execute_time
};

let mut guard = self
.database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
let commit_result = self._commit_result(result, permit, database);
let commit_time = start.elapsed().as_secs_f64();
let time = execute_time + commit_time;
importer_metrics().execute_and_commit_duration.observe(time);
// return execution result
commit_result
commit_result.map(|_| ())
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/services/p2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ prometheus-client = { workspace = true }
quick-protobuf = "0.8.1"
quick-protobuf-codec = "0.3.0"
rand = { workspace = true }
rayon = "1.10.0"
rayon = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_with = { workspace = true }
sha2 = "0.10"
Expand Down

0 comments on commit cd4158e

Please sign in to comment.