Skip to content

Commit

Permalink
Merge pull request #2 from paradigmxyz/main
Browse files Browse the repository at this point in the history
Revert "refactor(stages): input target reached & output done checks" …
  • Loading branch information
dereksione authored Jun 12, 2023
2 parents 9a8c680 + 7ec4b0a commit b8877e9
Show file tree
Hide file tree
Showing 26 changed files with 379 additions and 313 deletions.
34 changes: 21 additions & 13 deletions bin/reth/src/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,30 @@ impl Command {

let mut account_hashing_done = false;
while !account_hashing_done {
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = account_hashing_stage.execute(&mut tx, input).await?;
account_hashing_done = output.is_done(input);
let output = account_hashing_stage
.execute(
&mut tx,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
account_hashing_done = output.done;
}

let mut storage_hashing_done = false;
while !storage_hashing_done {
let input = ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
};
let output = storage_hashing_stage.execute(&mut tx, input).await?;
storage_hashing_done = output.is_done(input);
let output = storage_hashing_stage
.execute(
&mut tx,
ExecInput {
target: Some(block),
checkpoint: progress.map(StageCheckpoint::new),
},
)
.await?;
storage_hashing_done = output.done;
}

let incremental_result = merkle_stage
Expand Down Expand Up @@ -162,7 +170,7 @@ impl Command {
loop {
let clean_result = merkle_stage.execute(&mut tx, clean_input).await;
assert!(clean_result.is_ok(), "Clean state root calculation failed");
if clean_result.unwrap().is_done(clean_input) {
if clean_result.unwrap().done {
break
}
}
Expand Down
3 changes: 1 addition & 2 deletions bin/reth/src/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ impl NodeState {
pipeline_position,
pipeline_total,
stage_id,
result: ExecOutput { checkpoint },
done,
result: ExecOutput { checkpoint, done },
} => {
self.current_checkpoint = checkpoint;

Expand Down
15 changes: 10 additions & 5 deletions bin/reth/src/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,16 @@ async fn dry_run(

let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
exec_output = exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
}

tx.drop()?;
Expand Down
15 changes: 10 additions & 5 deletions bin/reth/src/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,16 @@ async fn dry_run(

let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = exec_stage.execute(&mut tx, exec_input).await?.is_done(exec_input);
exec_output = exec_stage
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.done;
}

tx.drop()?;
Expand Down
18 changes: 10 additions & 8 deletions bin/reth/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,19 @@ async fn dry_run(
let mut tx = Transaction::new(&output_db)?;
let mut exec_output = false;
while !exec_output {
let exec_input = reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
};
exec_output = MerkleStage::Execution {
// Forces updating the root instead of calculating from scratch
clean_threshold: u64::MAX,
clean_threshold: u64::MAX, /* Forces updating the root instead of calculating from
* scratch */
}
.execute(&mut tx, exec_input)
.execute(
&mut tx,
reth_stages::ExecInput {
target: Some(to),
checkpoint: Some(StageCheckpoint::new(from)),
},
)
.await?
.is_done(exec_input);
.done;
}

tx.drop()?;
Expand Down
13 changes: 5 additions & 8 deletions bin/reth/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use reth_stages::{
IndexAccountHistoryStage, IndexStorageHistoryStage, MerkleStage, SenderRecoveryStage,
StorageHashingStage, TransactionLookupStage,
},
ExecInput, Stage, UnwindInput,
ExecInput, ExecOutput, Stage, UnwindInput,
};
use std::{any::Any, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc};
use tracing::*;
Expand Down Expand Up @@ -236,13 +236,10 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};

loop {
let result = exec_stage.execute(&mut tx, input).await?;
if result.is_done(input) {
break
}

input.checkpoint = Some(result.checkpoint);
while let ExecOutput { checkpoint: stage_progress, done: false } =
exec_stage.execute(&mut tx, input).await?
{
input.checkpoint = Some(stage_progress);

if self.commit {
tx.commit()?;
Expand Down
87 changes: 46 additions & 41 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1370,7 +1370,6 @@ mod tests {
chain_spec: Arc<ChainSpec>,
pipeline_exec_outputs: VecDeque<Result<ExecOutput, StageError>>,
executor_results: Vec<PostState>,
max_block: Option<BlockNumber>,
) -> (TestBeaconConsensusEngine, TestEnv<Arc<Env<WriteMap>>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
Expand All @@ -1382,13 +1381,10 @@ mod tests {

// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(H256::default());
let mut pipeline_builder = Pipeline::builder()
let pipeline = Pipeline::builder()
.add_stages(TestStages::new(pipeline_exec_outputs, Default::default()))
.with_tip_sender(tip_tx);
if let Some(max_block) = max_block {
pipeline_builder = pipeline_builder.with_max_block(max_block);
}
let pipeline = pipeline_builder.build(db.clone());
.with_tip_sender(tip_tx)
.build(db.clone());

// Setup blockchain tree
let externals =
Expand All @@ -1408,7 +1404,7 @@ mod tests {
blockchain_provider,
Box::<TokioTaskExecutor>::default(),
Box::<NoopSyncStateUpdater>::default(),
max_block,
None,
false,
payload_builder,
None,
Expand Down Expand Up @@ -1443,7 +1439,6 @@ mod tests {
chain_spec,
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let res = spawn_consensus_engine(consensus_engine);

Expand Down Expand Up @@ -1473,7 +1468,6 @@ mod tests {
chain_spec,
VecDeque::from([Err(StageError::ChannelClosed)]),
Vec::default(),
Some(1),
);
let mut rx = spawn_consensus_engine(consensus_engine);

Expand Down Expand Up @@ -1513,11 +1507,10 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(1), done: true }),
Err(StageError::ChannelClosed),
]),
Vec::default(),
Some(2),
);
let rx = spawn_consensus_engine(consensus_engine);

Expand All @@ -1530,9 +1523,7 @@ mod tests {

assert_matches!(
rx.await,
Ok(
Err(BeaconConsensusEngineError::Pipeline(n))
) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
Ok(Err(BeaconConsensusEngineError::Pipeline(n))) if matches!(*n.as_ref(),PipelineError::Stage(StageError::ChannelClosed))
);
}

Expand All @@ -1546,12 +1537,15 @@ mod tests {
.paris_activated()
.build(),
);
let (consensus_engine, env) = setup_consensus_engine(
let (mut consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(max_block) })]),
VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(max_block),
done: true,
})]),
Vec::default(),
Some(max_block),
);
consensus_engine.sync.set_max_block(max_block);
let rx = spawn_consensus_engine(consensus_engine);

let _ = env
Expand Down Expand Up @@ -1588,9 +1582,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let mut engine_rx = spawn_consensus_engine(consensus_engine);
Expand All @@ -1617,9 +1613,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1664,11 +1662,10 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1713,9 +1710,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1749,11 +1748,10 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1799,11 +1797,10 @@ mod tests {
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
Ok(ExecOutput { done: true, checkpoint: StageCheckpoint::new(0) }),
]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1846,9 +1843,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let mut engine_rx = spawn_consensus_engine(consensus_engine);
Expand Down Expand Up @@ -1877,9 +1876,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1921,9 +1922,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::default(),
None,
);

let genesis = random_block(0, None, None, Some(0));
Expand Down Expand Up @@ -1976,9 +1979,11 @@ mod tests {
);
let (consensus_engine, env) = setup_consensus_engine(
chain_spec,
VecDeque::from([Ok(ExecOutput { checkpoint: StageCheckpoint::new(0) })]),
VecDeque::from([Ok(ExecOutput {
done: true,
checkpoint: StageCheckpoint::new(0),
})]),
Vec::from([exec_result2]),
None,
);

insert_blocks(env.db.as_ref(), [&data.genesis, &block1].into_iter());
Expand Down
Loading

0 comments on commit b8877e9

Please sign in to comment.