Skip to content

Commit

Permalink
Make ETL file size configurable (#6927)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
Co-authored-by: joshieDo <ranriver@protonmail.com>
  • Loading branch information
3 people authored Mar 13, 2024
1 parent c9c269b commit 5d6ac4c
Show file tree
Hide file tree
Showing 14 changed files with 87 additions and 14 deletions.
1 change: 1 addition & 0 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ impl Command {
header_downloader,
body_downloader,
factory.clone(),
stage_conf.etl.etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
Expand Down
4 changes: 4 additions & 0 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ impl ImportCommand {
reth_revm::EvmProcessorFactory::new(self.chain.clone(), EthEvmConfig::default());

let max_block = file_client.max_block().unwrap_or(0);

let etl_file_size = config.stages.etl.etl_file_size;

let mut pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
Expand All @@ -190,6 +193,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand Down
8 changes: 7 additions & 1 deletion bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ pub struct Command {
#[arg(long)]
batch_size: Option<u64>,

/// The maximum size in bytes of data held in memory before being flushed to disk as a file.
#[arg(long)]
etl_file_size: Option<usize>,

/// Normally, running the stage requires unwinding for stages that already
/// have been run, in order to not rewrite to the same database slots.
///
Expand Down Expand Up @@ -151,6 +155,8 @@ impl Command {

let batch_size = self.batch_size.unwrap_or(self.to - self.from + 1);

let etl_file_size = self.etl_file_size.unwrap_or(500 * 1024 * 1024);

let (mut exec_stage, mut unwind_stage): (Box<dyn Stage<_>>, Option<Box<dyn Stage<_>>>) =
match self.stage {
StageEnum::Bodies => {
Expand Down Expand Up @@ -229,7 +235,7 @@ impl Command {
)
}
StageEnum::TxLookup => {
(Box::new(TransactionLookupStage::new(batch_size, None)), None)
(Box::new(TransactionLookupStage::new(batch_size, etl_file_size, None)), None)
}
StageEnum::AccountHashing => {
(Box::new(AccountHashingStage::new(1, batch_size)), None)
Expand Down
2 changes: 1 addition & 1 deletion book/cli/reth/node.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ RPC:
--rpc-max-tracing-requests <COUNT>
Maximum number of concurrent tracing requests

[default: 8]
[default: 10]

--rpc-max-blocks-per-filter <COUNT>
Maximum number of blocks that could be scanned per filter request. (0 = entire chain)
Expand Down
3 changes: 3 additions & 0 deletions book/cli/reth/stage/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ Options:
--batch-size <BATCH_SIZE>
Batch size for stage execution and unwind

--etl-file-size <ETL_FILE_SIZE>
          The maximum size in bytes of data held in memory before being flushed to disk as a file

-s, --skip-unwind
Normally, running the stage requires unwinding for stages that already have been run, in order to not rewrite to the same database slots.

Expand Down
13 changes: 13 additions & 0 deletions book/run/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,19 @@ The storage history indexing stage builds an index of what blocks a particular s
commit_threshold = 100000
```

### `etl`

An ETL (extract, transform, load) data collector. Used mainly to insert data into `MDBX` in a sorted manner.

```toml
[stages.etl]
# The maximum size in bytes of data held in memory before being flushed to disk as a file.
#
# A lower threshold corresponds to more frequent flushes,
# but lowers peak memory usage
etl_file_size = 524_288_000 # 500 * 1024 * 1024
```

## The `[peers]` section

The peers section is used to configure how the networking component of reth establishes and maintains connections to peers.
Expand Down
18 changes: 17 additions & 1 deletion crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ pub struct StageConfig {
pub index_account_history: IndexHistoryConfig,
/// Index Storage History stage configuration.
pub index_storage_history: IndexHistoryConfig,
/// Common ETL related configuration.
pub etl: EtlConfig,
}

/// Header stage configuration.
Expand Down Expand Up @@ -235,7 +237,21 @@ impl Default for TransactionLookupConfig {
}
}

/// History History stage configuration.
/// Common ETL related configuration.
///
/// ETL (extract, transform, load) collectors buffer data in memory and flush it
/// to sorted temporary files on disk once a size threshold is reached.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct EtlConfig {
    /// The maximum size in bytes of data held in memory before being flushed to disk as a file.
    pub etl_file_size: usize,
}

impl EtlConfig {
    /// Default maximum ETL file size: 500 MiB.
    ///
    /// Named so callers don't re-spell the `500 * 1024 * 1024` magic number.
    pub const DEFAULT_FILE_SIZE: usize = 500 * 1024 * 1024;
}

impl Default for EtlConfig {
    fn default() -> Self {
        Self { etl_file_size: Self::DEFAULT_FILE_SIZE }
    }
}

/// History stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct IndexHistoryConfig {
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ where
header_downloader,
body_downloader,
executor_factory.clone(),
500 * (1024 * 1024),
))
}
};
Expand Down
2 changes: 2 additions & 0 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,6 +837,7 @@ impl NodeConfig {
header_downloader,
body_downloader,
factory.clone(),
stage_config.etl.etl_file_size,
)
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -870,6 +871,7 @@ impl NodeConfig {
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.chunk_size,
stage_config.etl.etl_file_size,
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/benches/criterion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn transaction_lookup(c: &mut Criterion) {
let mut group = c.benchmark_group("Stages");
// don't need to run each stage for that many times
group.sample_size(10);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, None);
let stage = TransactionLookupStage::new(DEFAULT_NUM_BLOCKS, 500 * 1024 * 1024, None);

let db = setup::txs_testdata(DEFAULT_NUM_BLOCKS);

Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
//! headers_downloader,
//! bodies_downloader,
//! executor_factory,
//! 500*1024*1024,
//! )
//! )
//! .build(provider_factory, static_file_producer);
Expand Down
17 changes: 15 additions & 2 deletions crates/stages/src/sets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
header_downloader: H,
body_downloader: B,
executor_factory: EF,
etl_file_size: usize,
) -> Self
where
EF: ExecutorFactory,
Expand All @@ -111,6 +112,7 @@ impl<Provider, H, B, EF> DefaultStages<Provider, H, B, EF> {
consensus,
header_downloader,
body_downloader,
etl_file_size,
),
executor_factory,
}
Expand Down Expand Up @@ -162,6 +164,8 @@ pub struct OnlineStages<Provider, H, B> {
header_downloader: H,
/// The block body downloader
body_downloader: B,
/// The size of temporary files in bytes for ETL data collector.
etl_file_size: usize,
}

impl<Provider, H, B> OnlineStages<Provider, H, B> {
Expand All @@ -172,8 +176,9 @@ impl<Provider, H, B> OnlineStages<Provider, H, B> {
consensus: Arc<dyn Consensus>,
header_downloader: H,
body_downloader: B,
etl_file_size: usize,
) -> Self {
Self { provider, header_mode, consensus, header_downloader, body_downloader }
Self { provider, header_mode, consensus, header_downloader, body_downloader, etl_file_size }
}
}

Expand All @@ -198,9 +203,16 @@ where
mode: HeaderSyncMode,
header_downloader: H,
consensus: Arc<dyn Consensus>,
etl_file_size: usize,
) -> StageSetBuilder<DB> {
StageSetBuilder::default()
.add_stage(HeaderStage::new(provider, header_downloader, mode, consensus.clone()))
.add_stage(HeaderStage::new(
provider,
header_downloader,
mode,
consensus.clone(),
etl_file_size,
))
.add_stage(bodies)
}
}
Expand All @@ -219,6 +231,7 @@ where
self.header_downloader,
self.header_mode,
self.consensus.clone(),
self.etl_file_size,
))
.add_stage(BodyStage::new(self.body_downloader))
}
Expand Down
6 changes: 4 additions & 2 deletions crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,16 @@ where
downloader: Downloader,
mode: HeaderSyncMode,
consensus: Arc<dyn Consensus>,
etl_file_size: usize,
) -> Self {
Self {
provider: database,
downloader,
mode,
consensus,
sync_gap: None,
hash_collector: Collector::new(100 * (1024 * 1024)),
header_collector: Collector::new(100 * (1024 * 1024)),
hash_collector: Collector::new(etl_file_size / 2),
header_collector: Collector::new(etl_file_size / 2),
is_etl_ready: false,
}
}
Expand Down Expand Up @@ -419,6 +420,7 @@ mod tests {
(*self.downloader_factory)(),
HeaderSyncMode::Tip(self.channel.1.clone()),
self.consensus.clone(),
500 * (1024 * 1024),
)
}
}
Expand Down
23 changes: 17 additions & 6 deletions crates/stages/src/stages/tx_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,19 +32,20 @@ pub struct TransactionLookupStage {
/// The maximum number of lookup entries to hold in memory before pushing them to
/// [`reth_etl::Collector`].
chunk_size: u64,
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupStage {
fn default() -> Self {
Self { chunk_size: 5_000_000, prune_mode: None }
Self { chunk_size: 5_000_000, etl_file_size: 500 * 1024 * 1024, prune_mode: None }
}
}

impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(chunk_size: u64, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, prune_mode }
pub fn new(chunk_size: u64, etl_file_size: usize, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, etl_file_size, prune_mode }
}
}

Expand Down Expand Up @@ -99,7 +100,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
}

        // Temporary files of up to `etl_file_size` bytes (configurable via `[stages.etl]`)
let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(500 * (1024 * 1024));
let mut hash_collector: Collector<TxHash, TxNumber> = Collector::new(self.etl_file_size);

debug!(
target: "sync::stages::transaction_lookup",
Expand Down Expand Up @@ -397,12 +398,18 @@ mod tests {
struct TransactionLookupTestRunner {
db: TestStageDB,
chunk_size: u64,
etl_file_size: usize,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None }
Self {
db: TestStageDB::default(),
chunk_size: 1000,
etl_file_size: 500 * 1024 * 1024,
prune_mode: None,
}
}
}

Expand Down Expand Up @@ -449,7 +456,11 @@ mod tests {
}

fn stage(&self) -> Self::S {
TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode }
TransactionLookupStage {
chunk_size: self.chunk_size,
etl_file_size: self.etl_file_size,
prune_mode: self.prune_mode,
}
}
}

Expand Down

0 comments on commit 5d6ac4c

Please sign in to comment.