From 28f3a2e2d9525bf2f6373e755e2d6dc0c2f97821 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 13 Mar 2024 16:06:50 +0000 Subject: [PATCH] feat: add `EtlConfig` as well as setting the directory to datadir (#7124) Co-authored-by: Mikhail Sozin Co-authored-by: Misha Co-authored-by: Alexey Shekhirin --- Cargo.lock | 2 ++ bin/reth/src/commands/debug_cmd/execution.rs | 2 +- bin/reth/src/commands/import.rs | 4 +-- bin/reth/src/commands/stage/run.rs | 15 +++++++-- book/cli/reth/recover/storage-tries.md | 14 +++++++++ book/cli/reth/stage/run.md | 5 ++- crates/config/src/config.rs | 31 ++++++++++++++++--- crates/consensus/beacon/Cargo.toml | 1 + .../consensus/beacon/src/engine/test_utils.rs | 3 +- crates/etl/src/lib.rs | 19 +++++++++--- crates/node-builder/src/builder.rs | 8 ++++- crates/node-core/src/node_config.rs | 4 +-- crates/stages/Cargo.toml | 1 + crates/stages/benches/criterion.rs | 3 +- crates/stages/src/lib.rs | 3 +- crates/stages/src/sets.rs | 19 ++++++------ crates/stages/src/stages/headers.rs | 9 +++--- crates/stages/src/stages/tx_lookup.rs | 18 ++++++----- 18 files changed, 118 insertions(+), 43 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 023c4d32311f..bad56fba7ed1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5598,6 +5598,7 @@ dependencies = [ "metrics", "reth-beacon-consensus-core", "reth-blockchain-tree", + "reth-config", "reth-db", "reth-downloaders", "reth-interfaces", @@ -6659,6 +6660,7 @@ dependencies = [ "rayon", "reth-blockchain-tree", "reth-codecs", + "reth-config", "reth-db", "reth-downloaders", "reth-eth-wire", diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index 4714056adb6b..9d9b4e2123c3 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -127,7 +127,7 @@ impl Command { header_downloader, body_downloader, factory.clone(), - stage_conf.etl.etl_file_size, + stage_conf.etl.clone(), ) .set(SenderRecoveryStage { commit_threshold: stage_conf.sender_recovery.commit_threshold, diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index 41264bf987e2..fa5d4488aa99 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -179,8 +179,6 @@ impl ImportCommand { let max_block = file_client.max_block().unwrap_or(0); - let etl_file_size = config.stages.etl.etl_file_size; - let mut pipeline = Pipeline::builder() .with_tip_sender(tip_tx) // we want to sync all blocks the file client provides or 0 if empty @@ -193,7 +191,7 @@ impl ImportCommand { header_downloader, body_downloader, factory.clone(), - etl_file_size, + config.stages.etl, ) .set(SenderRecoveryStage { commit_threshold: config.stages.sender_recovery.commit_threshold, diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 42747f75c341..78792c830e77 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -14,7 +14,7 @@ use crate::{ }; use clap::Parser; use reth_beacon_consensus::BeaconConsensus; -use reth_config::Config; +use reth_config::{config::EtlConfig, Config}; use reth_db::init_db; use reth_downloaders::bodies::bodies::BodiesDownloaderBuilder; use reth_node_ethereum::EthEvmConfig; @@ -86,6 +86,10 @@ pub struct Command { #[arg(long)] etl_file_size: Option, + /// Directory where to collect ETL files + #[arg(long)] + etl_dir: Option, + /// Normally, running the stage requires unwinding for stages that already /// have been run, in order to not rewrite to the same database slots. /// @@ -155,7 +159,12 @@ impl Command { let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1); - let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024); + let etl_config = EtlConfig::new( + Some( + self.etl_dir.unwrap_or_else(|| EtlConfig::from_datadir(&data_dir.data_dir_path())), + ), + self.etl_file_size.unwrap_or(EtlConfig::default_file_size()), + ); let (mut exec_stage, mut unwind_stage): (Box>, Option>>) = match self.stage { @@ -235,7 +244,7 @@ impl Command { ) } StageEnum::TxLookup => { - (Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None) + (Box::new(TransactionLookupStage::new(batch_size, etl_config, None)), None) } StageEnum::AccountHashing => { (Box::new(AccountHashingStage::new(1, batch_size)), None) diff --git a/book/cli/reth/recover/storage-tries.md b/book/cli/reth/recover/storage-tries.md index 778edb804400..f3be1b573c4f 100644 --- a/book/cli/reth/recover/storage-tries.md +++ b/book/cli/reth/recover/storage-tries.md @@ -41,6 +41,20 @@ Options: -h, --help Print help (see a summary with '-h') +Database: + --db.log-level + Database logging level. Levels higher than "notice" require a debug build + + Possible values: + - fatal: Enables logging for critical conditions, i.e. assertion failures + - error: Enables logging for error conditions + - warn: Enables logging for warning conditions + - notice: Enables logging for normal but significant condition + - verbose: Enables logging for verbose informational + - debug: Enables logging for debug-level messages + - trace: Enables logging for trace debug-level messages + - extra: Enables logging for extra debug-level messages + Logging: --log.stdout.format The format to use for logs written to stdout diff --git a/book/cli/reth/stage/run.md b/book/cli/reth/stage/run.md index a56964a7af0c..ade508237170 100644 --- a/book/cli/reth/stage/run.md +++ b/book/cli/reth/stage/run.md @@ -62,7 +62,10 @@ Options: Batch size for stage execution and unwind --etl-file-size - Size for temporary file during ETL stages + The maximum size in bytes of data held in memory before being flushed to disk as a file + + --etl-dir + Directory where to collect ETL files -s, --skip-unwind Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots. diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 2ddac702bc58..2f603edb1a27 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -5,7 +5,10 @@ use reth_network::{NetworkConfigBuilder, PeersConfig, SessionsConfig}; use reth_primitives::PruneModes; use secp256k1::SecretKey; use serde::{Deserialize, Deserializer, Serialize}; -use std::{path::PathBuf, time::Duration}; +use std::{ + path::{Path, PathBuf}, + time::Duration, +}; /// Configuration for the reth node. #[derive(Debug, Clone, Default, Deserialize, PartialEq, Serialize)] @@ -238,16 +241,36 @@ impl Default for TransactionLookupConfig { } /// Common ETL related configuration. -#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] +#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)] #[serde(default)] pub struct EtlConfig { + /// Data directory where temporary files are created. + pub dir: Option, /// The maximum size in bytes of data held in memory before being flushed to disk as a file. - pub etl_file_size: usize, + pub file_size: usize, } impl Default for EtlConfig { fn default() -> Self { - Self { etl_file_size: 500 * (1024 * 1024) } + Self { dir: None, file_size: Self::default_file_size() } + } +} + +impl EtlConfig { + /// Creates an ETL configuration + pub fn new(dir: Option, file_size: usize) -> Self { + Self { dir, file_size } + } + + /// Return default ETL directory from datadir path. + pub fn from_datadir(path: &Path) -> PathBuf { + path.join("etl-tmp") + } + + /// Default size in bytes of data held in memory before being flushed to disk as a file. + pub const fn default_file_size() -> usize { + // 500 MB + 500 * (1024 * 1024) } } diff --git a/crates/consensus/beacon/Cargo.toml b/crates/consensus/beacon/Cargo.toml index 0e25da10388a..0c5aafa9cd82 100644 --- a/crates/consensus/beacon/Cargo.toml +++ b/crates/consensus/beacon/Cargo.toml @@ -55,6 +55,7 @@ reth-tracing.workspace = true reth-revm.workspace = true reth-downloaders.workspace = true reth-node-ethereum.workspace = true +reth-config.workspace = true assert_matches.workspace = true diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 6a5df757a928..55b37f812a6d 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -6,6 +6,7 @@ use crate::{ use reth_blockchain_tree::{ config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree, }; +use reth_config::config::EtlConfig; use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE}; type DatabaseEnv = TempDatabase; use reth_downloaders::{ @@ -406,7 +407,7 @@ where header_downloader, body_downloader, executor_factory.clone(), - 500 * (1024 * 1024), + EtlConfig::default(), )) } }; diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs index 89488dc7f7b1..595ae02c680a 100644 --- a/crates/etl/src/lib.rs +++ b/crates/etl/src/lib.rs @@ -18,7 +18,7 @@ use std::{ cmp::Reverse, collections::BinaryHeap, io::{self, BufReader, BufWriter, Read, Seek, SeekFrom, Write}, - path::Path, + path::{Path, PathBuf}, }; use rayon::prelude::*; @@ -42,6 +42,8 @@ where ::Encoded: std::fmt::Debug, ::Compressed: std::fmt::Debug, { + /// Parent directory where to create ETL files + parent_dir: Option, /// Directory for temporary file storage dir: Option, /// Collection of temporary ETL files @@ -66,8 +68,9 @@ where /// Create a new collector with some capacity. /// /// Once the capacity (in bytes) is reached, the data is sorted and flushed to disk. - pub fn new(buffer_capacity_bytes: usize) -> Self { + pub fn new(buffer_capacity_bytes: usize, parent_dir: Option) -> Self { Self { + parent_dir, dir: None, buffer_size_bytes: 0, files: Vec::new(), @@ -115,7 +118,15 @@ where /// doesn't exist, it will be created. fn dir(&mut self) -> io::Result<&TempDir> { if self.dir.is_none() { - self.dir = Some(TempDir::new()?); + self.dir = match &self.parent_dir { + Some(dir) => { + if !dir.exists() { + std::fs::create_dir_all(dir)?; + } + Some(TempDir::new_in(dir)?) + } + None => Some(TempDir::new()?), + }; } Ok(self.dir.as_ref().unwrap()) } @@ -273,7 +284,7 @@ mod tests { let mut entries: Vec<_> = (0..10_000).map(|id| (TxHash::random(), id as TxNumber)).collect(); - let mut collector = Collector::new(1024); + let mut collector = Collector::new(1024, None); assert!(collector.dir.is_none()); for (k, v) in entries.clone() { diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index a5461d6edcdd..80840603bf1d 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -19,6 +19,7 @@ use reth_beacon_consensus::{ BeaconConsensusEngine, }; use reth_blockchain_tree::{BlockchainTreeConfig, ShareableBlockchainTree}; +use reth_config::config::EtlConfig; use reth_db::{ database::Database, database_metrics::{DatabaseMetadata, DatabaseMetrics}, @@ -512,7 +513,7 @@ where executor, data_dir, mut config, - reth_config, + mut reth_config, .. } = ctx; @@ -556,6 +557,11 @@ where hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone()))); info!(target: "reth::cli", "StaticFileProducer initialized"); + // Make sure ETL doesn't default to /tmp/, but to whatever datadir is set to + if reth_config.stages.etl.dir.is_none() { + reth_config.stages.etl.dir = Some(EtlConfig::from_datadir(&data_dir.data_dir_path())); + } + // Configure the pipeline let (mut pipeline, client) = if config.dev.dev { info!(target: "reth::cli", "Starting Reth in dev mode"); diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 1424c30e993e..93ea33fe9fca 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -837,7 +837,7 @@ impl NodeConfig { header_downloader, body_downloader, factory.clone(), - stage_config.etl.etl_file_size, + stage_config.etl.clone(), ) .set(SenderRecoveryStage { commit_threshold: stage_config.sender_recovery.commit_threshold, @@ -871,7 +871,7 @@ impl NodeConfig { .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( stage_config.transaction_lookup.chunk_size, - stage_config.etl.etl_file_size, + stage_config.etl.clone(), prune_modes.transaction_lookup, )) .set(IndexAccountHistoryStage::new( diff --git a/crates/stages/Cargo.toml b/crates/stages/Cargo.toml index 11aa2f545013..d51005a497de 100644 --- a/crates/stages/Cargo.toml +++ b/crates/stages/Cargo.toml @@ -22,6 +22,7 @@ reth-trie = { workspace = true, features = ["metrics"] } reth-tokio-util.workspace = true reth-etl.workspace = true reth-static-file.workspace = true +reth-config.workspace = true # async tokio = { workspace = true, features = ["sync"] } diff --git a/crates/stages/benches/criterion.rs b/crates/stages/benches/criterion.rs index cdeb8b5350cc..59ad5916789c 100644 --- a/crates/stages/benches/criterion.rs +++ b/crates/stages/benches/criterion.rs @@ -4,6 +4,7 @@ use criterion::{ BenchmarkGroup, Criterion, }; use pprof::criterion::{Output, PProfProfiler}; +use reth_config::config::EtlConfig; use reth_db::{test_utils::TempDatabase, DatabaseEnv}; use reth_primitives::{stage::StageCheckpoint, BlockNumber}; @@ -57,7 +58,7 @@ fn transaction_lookup(c: &mut Criterion) { let mut group = c.benchmark_group("Stages"); // don't need to run each stage for that many times group.sample_size(10); - let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None); + let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, EtlConfig::default(), None); let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS); diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 785bada6c3ee..6e123f750b5c 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -27,6 +27,7 @@ //! # use reth_provider::HeaderSyncMode; //! # use reth_provider::test_utils::create_test_provider_factory; //! # use reth_static_file::StaticFileProducer; +//! # use reth_config::config::EtlConfig; //! # //! # let chain_spec = MAINNET.clone(); //! # let consensus: Arc = Arc::new(TestConsensus::default()); @@ -59,7 +60,7 @@ //! headers_downloader, //! bodies_downloader, //! executor_factory, -//! 500*1024*1024, +//! EtlConfig::default(), //! ) //! ) //! .build(provider_factory, static_file_producer); diff --git a/crates/stages/src/sets.rs b/crates/stages/src/sets.rs index b706740963a0..4e24e9a9234b 100644 --- a/crates/stages/src/sets.rs +++ b/crates/stages/src/sets.rs @@ -54,6 +54,7 @@ use crate::{ }, StageSet, StageSetBuilder, }; +use reth_config::config::EtlConfig; use reth_db::database::Database; use reth_interfaces::{ consensus::Consensus, @@ -100,7 +101,7 @@ impl DefaultStages { header_downloader: H, body_downloader: B, executor_factory: EF, - etl_file_size: usize, + etl_config: EtlConfig, ) -> Self where EF: ExecutorFactory, @@ -112,7 +113,7 @@ impl DefaultStages { consensus, header_downloader, body_downloader, - etl_file_size, + etl_config, ), executor_factory, } @@ -164,8 +165,8 @@ pub struct OnlineStages { header_downloader: H, /// The block body downloader body_downloader: B, - /// The size of temporary files in bytes for ETL data collector. - etl_file_size: usize, + /// ETL configuration + etl_config: EtlConfig, } impl OnlineStages { @@ -176,9 +177,9 @@ impl OnlineStages { consensus: Arc, header_downloader: H, body_downloader: B, - etl_file_size: usize, + etl_config: EtlConfig, ) -> Self { - Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_file_size } + Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_config } } } @@ -203,7 +204,7 @@ where mode: HeaderSyncMode, header_downloader: H, consensus: Arc, - etl_file_size: usize, + etl_config: EtlConfig, ) -> StageSetBuilder { StageSetBuilder::default() .add_stage(HeaderStage::new( @@ -211,7 +212,7 @@ where header_downloader, mode, consensus.clone(), - etl_file_size, + etl_config, )) .add_stage(bodies) } @@ -231,7 +232,7 @@ where self.header_downloader, self.header_mode, self.consensus.clone(), - self.etl_file_size, + self.etl_config.clone(), )) .add_stage(BodyStage::new(self.body_downloader)) } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index 2dc39306e47a..a1e03fb0f8a2 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -1,6 +1,7 @@ use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use futures_util::StreamExt; use reth_codecs::Compact; +use reth_config::config::EtlConfig; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -74,7 +75,7 @@ where downloader: Downloader, mode: HeaderSyncMode, consensus: Arc, - etl_file_size: usize, + etl_config: EtlConfig, ) -> Self { Self { provider: database, @@ -82,8 +83,8 @@ where mode, consensus, sync_gap: None, - hash_collector: Collector::new(etl_file_size / 2), - header_collector: Collector::new(etl_file_size / 2), + hash_collector: Collector::new(etl_config.file_size / 2, etl_config.dir.clone()), + header_collector: Collector::new(etl_config.file_size / 2, etl_config.dir.clone()), is_etl_ready: false, } } @@ -420,7 +421,7 @@ mod tests { (*self.downloader_factory)(), HeaderSyncMode::Tip(self.channel.1.clone()), self.consensus.clone(), - 500 * (1024 * 1024), + EtlConfig::default(), ) } } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index fdc673c76d3d..d45c25c3e574 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,5 +1,6 @@ use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use num_traits::Zero; +use reth_config::config::EtlConfig; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -32,20 +33,20 @@ pub struct TransactionLookupStage { /// The maximum number of lookup entries to hold in memory before pushing them to /// [`reth_etl::Collector`]. chunk_size: u64, - etl_file_size: usize, + etl_config: EtlConfig, prune_mode: Option, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024, prune_mode: None } + Self { chunk_size: 5_000_000, etl_config: EtlConfig::default(), prune_mode: None } } } impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(chunk_size: u64, etl_file_size: usize, prune_mode: Option) -> Self { - Self { chunk_size, etl_file_size, prune_mode } + pub fn new(chunk_size: u64, etl_config: EtlConfig, prune_mode: Option) -> Self { + Self { chunk_size, etl_config, prune_mode } } } @@ -100,7 +101,8 @@ impl Stage for TransactionLookupStage { } // 500MB temporary files - let mut hash_collector: Collector = Collector::new(self.etl_file_size); + let mut hash_collector: Collector = + Collector::new(self.etl_config.file_size, self.etl_config.dir.clone()); debug!( target: "sync::stages::transaction_lookup", @@ -398,7 +400,7 @@ mod tests { struct TransactionLookupTestRunner { db: TestStageDB, chunk_size: u64, - etl_file_size: usize, + etl_config: EtlConfig, prune_mode: Option, } @@ -407,7 +409,7 @@ mod tests { Self { db: TestStageDB::default(), chunk_size: 1000, - etl_file_size: 500 * 1024 * 1024, + etl_config: EtlConfig::default(), prune_mode: None, } } @@ -458,7 +460,7 @@ mod tests { fn stage(&self) -> Self::S { TransactionLookupStage { chunk_size: self.chunk_size, - etl_file_size: self.etl_file_size, + etl_config: self.etl_config.clone(), prune_mode: self.prune_mode, } }