From 9f00b1863730d1335ca0385ecba42e8c3d5a8883 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Tue, 24 Sep 2024 17:23:19 -0400 Subject: [PATCH 1/4] refactor of App to have post_execute_transactions hook so SequencerBlock can be constructed during proposal --- crates/astria-sequencer/src/app/mod.rs | 280 +++++++++++------- crates/astria-sequencer/src/app/test_utils.rs | 4 +- crates/astria-sequencer/src/sequencer.rs | 2 +- .../astria-sequencer/src/service/consensus.rs | 4 +- 4 files changed, 185 insertions(+), 105 deletions(-) diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 66a9e2c5a..ca3b4e1e4 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -68,6 +68,7 @@ use tendermint::{ AppHash, Hash, }; +use tokio::sync::watch; use tracing::{ debug, info, @@ -162,9 +163,13 @@ pub(crate) struct App { // to the mempool to recost all transactions. recost_mempool: bool, - // cache of results of executing of transactions in `prepare_proposal` or `process_proposal`. - // cleared at the end of each block. - execution_results: Option>, + // cache of results of executing of transactions in `prepare_proposal`. + // cleared in `process_proposal` if we're the proposer. + execution_results: Option>, + + // cache of results of executing of transactions in `process_proposal`. + // cleared at the end of the block. + finalize_block: Option, // the current `StagedWriteBatch` which contains the rocksdb write batch // of the current block being executed, created from the state delta, @@ -179,13 +184,24 @@ pub(crate) struct App { #[allow(clippy::struct_field_names)] app_hash: AppHash, + latest_proposed_block: Option>, + latest_committed_block: Option>, + metrics: &'static Metrics, } +// TODO: replace w proto/core type +pub(crate) struct SequencerBlockCommit { + block_hash: Hash, + height: u64, +} + impl App { pub(crate) async fn new( snapshot: Snapshot, mempool: Mempool, + latest_proposed_block: Option>, + latest_committed_block: Option>, metrics: &'static Metrics, ) -> Result { debug!("initializing App instance"); @@ -210,9 +226,12 @@ impl App { validator_address: None, executed_proposal_hash: Hash::default(), execution_results: None, + finalize_block: None, recost_mempool: false, write_batch: None, app_hash, + latest_proposed_block, + latest_committed_block, metrics, }) } @@ -288,7 +307,7 @@ impl App { self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); // clear the cache of transaction execution results - self.execution_results = None; + self.finalize_block = None; self.executed_proposal_hash = Hash::default(); } @@ -345,9 +364,9 @@ impl App { // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs // included in the block let res = generate_rollup_datas_commitment(&signed_txs_included, deposits); - + let txs = res.into_transactions(included_tx_bytes); Ok(abci::response::PrepareProposal { - txs: res.into_transactions(included_tx_bytes), + txs, }) } @@ -370,6 +389,27 @@ impl App { debug!("skipping process_proposal as we are the proposer for this block"); self.validator_address = None; self.executed_proposal_hash = process_proposal.hash; + + // if we're the proposer, we should have the execution results from + // `prepare_proposal`. run the post-tx-execution hook to set + // `self.finalize_block`. we can't run this in `prepare_proposal` as + // we don't know the block hash there. + let Some(tx_results) = self.execution_results.take() else { + bail!("execution results must be present after executing transactions") + }; + + self.post_execute_transactions( + storage, + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; + return Ok(()); } self.metrics.increment_process_proposal_skipped_proposal(); @@ -382,7 +422,7 @@ impl App { self.update_state_for_new_round(&storage); - let mut txs = VecDeque::from(process_proposal.txs); + let mut txs = VecDeque::from(process_proposal.txs.clone()); let received_rollup_datas_root: [u8; 32] = txs .pop_front() .ok_or_eyre("no transaction commitment in proposal")? @@ -426,20 +466,17 @@ impl App { .filter_map(|bytes| signed_transaction_from_bytes(bytes.as_ref()).ok()) .collect::>(); - self.execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) + let tx_results = self + .execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) .await .wrap_err("failed to execute transactions")?; - let Some(execution_results) = self.execution_results.as_ref() else { - bail!("execution results must be present after executing transactions") - }; - // all txs in the proposal should be deserializable and executable // if any txs were not deserializeable or executable, they would not have been - // added to the `execution_results` list, thus the length of `txs_to_include` - // will be shorter than that of `execution_results`. + // added to the `tx_results` list, thus the length of `txs_to_include` + // will be shorter than that of `tx_results`. ensure!( - execution_results.len() == expected_txs_len, + tx_results.len() == expected_txs_len, "transactions to be included do not match expected", ); self.metrics.record_proposal_transactions(signed_txs.len()); @@ -466,6 +503,17 @@ impl App { ); self.executed_proposal_hash = process_proposal.hash; + self.post_execute_transactions( + storage, + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; Ok(()) } @@ -645,8 +693,8 @@ impl App { &mut self, txs: Vec, block_size_constraints: &mut BlockSizeConstraints, - ) -> Result<()> { - let mut excluded_tx_count = 0_f64; + ) -> Result> { + let mut excluded_tx_count = 0; let mut execution_results = Vec::new(); for tx in txs { @@ -669,7 +717,7 @@ impl App { tx_data_bytes = tx_sequence_data_bytes, "excluding transaction: max block sequenced data limit reached" ); - excluded_tx_count += 1.0; + excluded_tx_count += 1; continue; } @@ -693,12 +741,12 @@ impl App { error = AsRef::::as_ref(&e), "failed to execute transaction, not including in block" ); - excluded_tx_count += 1.0; + excluded_tx_count += 1; } } } - if excluded_tx_count > 0.0 { + if excluded_tx_count > 0 { info!( excluded_tx_count = excluded_tx_count, included_tx_count = execution_results.len(), @@ -706,8 +754,7 @@ impl App { ); } - self.execution_results = Some(execution_results); - Ok(()) + Ok(execution_results) } /// sets up the state for execution of the block's transactions. @@ -766,18 +813,17 @@ impl App { Ok(()) } - /// Executes the given block, but does not write it to disk. - /// - /// `commit` must be called after this to write the block to disk. - /// - /// This is called by cometbft after the block has already been - /// committed by the network's consensus. - #[instrument(name = "App::finalize_block", skip_all)] - pub(crate) async fn finalize_block( + #[instrument(name = "App::post_execute_transactions", skip_all)] + async fn post_execute_transactions( &mut self, - finalize_block: abci::request::FinalizeBlock, storage: Storage, - ) -> Result { + block_hash: Hash, + height: tendermint::block::Height, + time: tendermint::Time, + proposer_address: account::Id, + txs: Vec, + tx_results: Vec, + ) -> Result<()> { let chain_id = self .state .get_chain_id() @@ -789,15 +835,96 @@ impl App { .await .wrap_err("failed to get sudo address from state")?; + let end_block = self.end_block(height.value(), sudo_address).await?; + + // get and clear block deposits from state + let mut state_tx = StateDelta::new(self.state.clone()); + let deposits = self + .state + .get_block_deposits() + .await + .wrap_err("failed to get block deposits in end_block")?; + state_tx + .clear_block_deposits() + .await + .wrap_err("failed to clear block deposits")?; + debug!( + deposits = %telemetry::display::json(&deposits), + "got block deposits from state" + ); + + let Hash::Sha256(block_hash) = block_hash else { + bail!("block hash is empty; this should not occur") + }; + + // cometbft expects a result for every tx in the block, so we need to return a + // tx result for the commitments, even though they're not actually user txs. + // + // the tx_results passed to this function only contain results for every user + // transaction, not the commitment, so its length is len(txs) - 2. + let mut finalize_block_tx_results: Vec = Vec::with_capacity(txs.len()); + finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); + finalize_block_tx_results.extend(tx_results); + + let sequencer_block = SequencerBlock::try_from_block_info_and_data( + block_hash, + chain_id, + height, + time, + proposer_address, + txs, + deposits, + ) + .wrap_err("failed to convert block info and data to SequencerBlock")?; + state_tx + .put_sequencer_block(sequencer_block) + .wrap_err("failed to write sequencer block to state")?; + + // update the priority of any txs in the mempool based on the updated app state + if self.recost_mempool { + self.metrics.increment_mempool_recosted(); + } + update_mempool_after_finalization(&mut self.mempool, &state_tx, self.recost_mempool).await; + + // events that occur after end_block are ignored here; + // there should be none anyways. + let _ = self.apply(state_tx); + + // prepare the `StagedWriteBatch` for a later commit. + let app_hash = self + .prepare_commit(storage.clone()) + .await + .wrap_err("failed to prepare commit")?; + + let finalize_block = abci::response::FinalizeBlock { + events: end_block.events, + validator_updates: end_block.validator_updates, + consensus_param_updates: end_block.consensus_param_updates, + app_hash, + tx_results: finalize_block_tx_results, + }; + self.finalize_block = Some(finalize_block); + Ok(()) + } + + /// Executes the given block, but does not write it to disk. + /// + /// `commit` must be called after this to write the block to disk. + /// + /// This is called by cometbft after the block has already been + /// committed by the network's consensus. + #[instrument(name = "App::finalize_block", skip_all)] + pub(crate) async fn finalize_block( + &mut self, + finalize_block: abci::request::FinalizeBlock, + storage: Storage, + ) -> Result { // convert tendermint id to astria address; this assumes they are // the same address, as they are both ed25519 keys let proposer_address = finalize_block.proposer_address; let height = finalize_block.height; let time = finalize_block.time; - let Hash::Sha256(block_hash) = finalize_block.hash else { - bail!("finalized block hash is empty; this should not occur") - }; // If we previously executed txs in a different proposal than is being processed, // reset cached state changes. @@ -811,11 +938,6 @@ impl App { rollup IDs commitment" ); - // cometbft expects a result for every tx in the block, so we need to return a - // tx result for the commitments, even though they're not actually user txs. - let mut tx_results: Vec = Vec::with_capacity(finalize_block.txs.len()); - tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); - // When the hash is not empty, we have already executed and cached the results if self.executed_proposal_hash.is_empty() { // we haven't executed anything yet, so set up the state for execution. @@ -831,6 +953,7 @@ impl App { .await .wrap_err("failed to execute block")?; + let mut tx_results = Vec::with_capacity(finalize_block.txs.len()); // skip the first two transactions, as they are the rollup data commitments for tx in finalize_block.txs.iter().skip(2) { let signed_tx = signed_transaction_from_bytes(tx) @@ -861,73 +984,26 @@ impl App { } } } - } else { - let execution_results = self.execution_results.take().expect( - "execution results must be present if txs were already executed during proposal \ - phase", - ); - tx_results.extend(execution_results); - }; - - let end_block = self.end_block(height.value(), sudo_address).await?; - // get and clear block deposits from state - let mut state_tx = StateDelta::new(self.state.clone()); - let deposits = self - .state - .get_block_deposits() - .await - .wrap_err("failed to get block deposits in end_block")?; - state_tx - .clear_block_deposits() + self.post_execute_transactions( + storage, + finalize_block.hash, + height, + time, + proposer_address, + finalize_block.txs, + tx_results, + ) .await - .wrap_err("failed to clear block deposits")?; - debug!( - deposits = %telemetry::display::json(&deposits), - "got block deposits from state" - ); - - let sequencer_block = SequencerBlock::try_from_block_info_and_data( - block_hash, - chain_id, - height, - time, - proposer_address, - finalize_block - .txs - .into_iter() - .map(std::convert::Into::into) - .collect(), - deposits, - ) - .wrap_err("failed to convert block info and data to SequencerBlock")?; - state_tx - .put_sequencer_block(sequencer_block) - .wrap_err("failed to write sequencer block to state")?; - - // update the priority of any txs in the mempool based on the updated app state - if self.recost_mempool { - self.metrics.increment_mempool_recosted(); + .wrap_err("failed to run post execute transactions handler")?; } - update_mempool_after_finalization(&mut self.mempool, &state_tx, self.recost_mempool).await; - - // events that occur after end_block are ignored here; - // there should be none anyways. - let _ = self.apply(state_tx); - // prepare the `StagedWriteBatch` for a later commit. - let app_hash = self - .prepare_commit(storage.clone()) - .await - .wrap_err("failed to prepare commit")?; + let finalize_block = self.finalize_block.take().expect( + "finalize_block result must be present, as txs were already executed just now or \ + during the proposal phase", + ); - Ok(abci::response::FinalizeBlock { - events: end_block.events, - validator_updates: end_block.validator_updates, - consensus_param_updates: end_block.consensus_param_updates, - tx_results, - app_hash, - }) + Ok(finalize_block) } #[instrument(skip_all, err)] diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index afe705fb6..046c141e3 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -191,7 +191,9 @@ pub(crate) async fn initialize_app_with_storage( let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); + let mut app = App::new(snapshot, mempool, None, None, metrics) + .await + .unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index 3bd1bfd64..7d2ec241c 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -85,7 +85,7 @@ impl Sequencer { let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); - let app = App::new(snapshot, mempool.clone(), metrics) + let app = App::new(snapshot, mempool.clone(), None, None, metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 4fd31287c..00761a252 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -475,7 +475,9 @@ mod test { let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); + let mut app = App::new(snapshot, mempool.clone(), None, None, metrics) + .await + .unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await .unwrap(); From 62d76f69b606e3ce3dd68724d9f27cee8d014141 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Sep 2024 14:51:38 -0400 Subject: [PATCH 2/4] clean up and fix unit tests --- crates/astria-sequencer/src/app/mod.rs | 68 ++++++--------- crates/astria-sequencer/src/app/test_utils.rs | 4 +- .../src/app/tests_app/mempool.rs | 83 +++++++++++++------ crates/astria-sequencer/src/sequencer.rs | 2 +- .../astria-sequencer/src/service/consensus.rs | 15 ++-- 5 files changed, 94 insertions(+), 78 deletions(-) diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index ca3b4e1e4..cf859d16c 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -68,7 +68,6 @@ use tendermint::{ AppHash, Hash, }; -use tokio::sync::watch; use tracing::{ debug, info, @@ -184,24 +183,13 @@ pub(crate) struct App { #[allow(clippy::struct_field_names)] app_hash: AppHash, - latest_proposed_block: Option>, - latest_committed_block: Option>, - metrics: &'static Metrics, } -// TODO: replace w proto/core type -pub(crate) struct SequencerBlockCommit { - block_hash: Hash, - height: u64, -} - impl App { pub(crate) async fn new( snapshot: Snapshot, mempool: Mempool, - latest_proposed_block: Option>, - latest_committed_block: Option>, metrics: &'static Metrics, ) -> Result { debug!("initializing App instance"); @@ -230,8 +218,6 @@ impl App { recost_mempool: false, write_batch: None, app_hash, - latest_proposed_block, - latest_committed_block, metrics, }) } @@ -399,7 +385,6 @@ impl App { }; self.post_execute_transactions( - storage, process_proposal.hash, process_proposal.height, process_proposal.time, @@ -504,7 +489,6 @@ impl App { self.executed_proposal_hash = process_proposal.hash; self.post_execute_transactions( - storage, process_proposal.hash, process_proposal.height, process_proposal.time, @@ -694,7 +678,7 @@ impl App { txs: Vec, block_size_constraints: &mut BlockSizeConstraints, ) -> Result> { - let mut excluded_tx_count = 0; + let mut excluded_tx_count = 0u32; let mut execution_results = Vec::new(); for tx in txs { @@ -717,7 +701,7 @@ impl App { tx_data_bytes = tx_sequence_data_bytes, "excluding transaction: max block sequenced data limit reached" ); - excluded_tx_count += 1; + excluded_tx_count = excluded_tx_count.saturating_add(1); continue; } @@ -741,7 +725,7 @@ impl App { error = AsRef::::as_ref(&e), "failed to execute transaction, not including in block" ); - excluded_tx_count += 1; + excluded_tx_count = excluded_tx_count.saturating_add(1); } } } @@ -816,7 +800,6 @@ impl App { #[instrument(name = "App::post_execute_transactions", skip_all)] async fn post_execute_transactions( &mut self, - storage: Storage, block_hash: Hash, height: tendermint::block::Height, time: tendermint::Time, @@ -880,27 +863,18 @@ impl App { .put_sequencer_block(sequencer_block) .wrap_err("failed to write sequencer block to state")?; - // update the priority of any txs in the mempool based on the updated app state - if self.recost_mempool { - self.metrics.increment_mempool_recosted(); - } - update_mempool_after_finalization(&mut self.mempool, &state_tx, self.recost_mempool).await; - // events that occur after end_block are ignored here; // there should be none anyways. let _ = self.apply(state_tx); - // prepare the `StagedWriteBatch` for a later commit. - let app_hash = self - .prepare_commit(storage.clone()) - .await - .wrap_err("failed to prepare commit")?; + // use a dummy app hash here - the actual app hash will be filled out in finalize_block. + let dummy_app_hash = AppHash::default(); let finalize_block = abci::response::FinalizeBlock { events: end_block.events, validator_updates: end_block.validator_updates, consensus_param_updates: end_block.consensus_param_updates, - app_hash, + app_hash: dummy_app_hash, tx_results: finalize_block_tx_results, }; self.finalize_block = Some(finalize_block); @@ -919,13 +893,6 @@ impl App { finalize_block: abci::request::FinalizeBlock, storage: Storage, ) -> Result { - // convert tendermint id to astria address; this assumes they are - // the same address, as they are both ed25519 keys - let proposer_address = finalize_block.proposer_address; - - let height = finalize_block.height; - let time = finalize_block.time; - // If we previously executed txs in a different proposal than is being processed, // reset cached state changes. if self.executed_proposal_hash != finalize_block.hash { @@ -940,6 +907,12 @@ impl App { // When the hash is not empty, we have already executed and cached the results if self.executed_proposal_hash.is_empty() { + // convert tendermint id to astria address; this assumes they are + // the same address, as they are both ed25519 keys + let proposer_address = finalize_block.proposer_address; + let height = finalize_block.height; + let time = finalize_block.time; + // we haven't executed anything yet, so set up the state for execution. let block_data = BlockData { misbehavior: finalize_block.misbehavior, @@ -986,7 +959,6 @@ impl App { } self.post_execute_transactions( - storage, finalize_block.hash, height, time, @@ -998,11 +970,25 @@ impl App { .wrap_err("failed to run post execute transactions handler")?; } - let finalize_block = self.finalize_block.take().expect( + let mut finalize_block = self.finalize_block.take().expect( "finalize_block result must be present, as txs were already executed just now or \ during the proposal phase", ); + // update the priority of any txs in the mempool based on the updated app state + if self.recost_mempool { + self.metrics.increment_mempool_recosted(); + } + update_mempool_after_finalization(&mut self.mempool, &self.state, self.recost_mempool) + .await; + + // prepare the `StagedWriteBatch` for a later commit. + let app_hash = self + .prepare_commit(storage) + .await + .wrap_err("failed to prepare commit")?; + finalize_block.app_hash = app_hash; + Ok(finalize_block) } diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 046c141e3..afe705fb6 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -191,9 +191,7 @@ pub(crate) async fn initialize_app_with_storage( let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut app = App::new(snapshot, mempool, None, None, metrics) - .await - .unwrap(); + let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/app/tests_app/mempool.rs b/crates/astria-sequencer/src/app/tests_app/mempool.rs index 65d327af8..b2e409e6b 100644 --- a/crates/astria-sequencer/src/app/tests_app/mempool.rs +++ b/crates/astria-sequencer/src/app/tests_app/mempool.rs @@ -279,6 +279,7 @@ async fn maintenance_recosting_promotes() { .insert(Arc::new(tx_recost.clone()), 0, judy_funds, tx_cost) .await .unwrap(); + assert_eq!(app.mempool.len().await, 2, "two txs in mempool"); // create block with prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -294,15 +295,42 @@ async fn maintenance_recosting_promotes() { let res = app .prepare_proposal(prepare_args, storage.clone()) .await - .expect(""); + .unwrap(); assert_eq!( res.txs.len(), 3, "only one transaction should've been valid (besides 2 generated txs)" ); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not yet removed" + ); + // set dummy hash app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + + let process_proposal = abci::request::ProcessProposal { + hash: app.executed_proposal_hash, + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: [1u8; 20].to_vec().try_into().unwrap(), + txs: res.txs.clone(), + proposed_last_commit: None, + misbehavior: vec![], + }; + app.process_proposal(process_proposal, storage.clone()) + .await + .unwrap(); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not + yet removed" + ); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { hash: app.executed_proposal_hash, @@ -317,10 +345,12 @@ async fn maintenance_recosting_promotes() { }, misbehavior: vec![], }; + app.finalize_block(finalize_block.clone(), storage.clone()) .await .unwrap(); app.commit(storage.clone()).await; + assert_eq!(app.mempool.len().await, 1, "recosted tx should remain"); // mempool re-costing should've occurred to allow other transaction to execute let prepare_args = abci::request::PrepareProposal { @@ -341,28 +371,9 @@ async fn maintenance_recosting_promotes() { assert_eq!( res.txs.len(), 3, - "only one transaction should've been valid (besides 2 generated txs)" + "one transaction should've been valid (besides 2 generated txs)" ); - // set dummy hash - app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); - // finalize with finalize block - let finalize_block = abci::request::FinalizeBlock { - hash: app.executed_proposal_hash, - height: 1u32.into(), - time: Time::now(), - next_validators_hash: Hash::default(), - proposer_address: [0u8; 20].to_vec().try_into().unwrap(), - txs: res.txs, - decided_last_commit: CommitInfo { - votes: vec![], - round: Round::default(), - }, - misbehavior: vec![], - }; - app.finalize_block(finalize_block.clone(), storage.clone()) - .await - .unwrap(); - app.commit(storage.clone()).await; + // see transfer went through assert_eq!( app.state @@ -476,8 +487,31 @@ async fn maintenance_funds_added_promotes() { 3, "only one transactions should've been valid (besides 2 generated txs)" ); + + app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + let process_proposal = abci::request::ProcessProposal { + hash: app.executed_proposal_hash, + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: [1u8; 20].to_vec().try_into().unwrap(), + txs: res.txs.clone(), + proposed_last_commit: None, + misbehavior: vec![], + }; + app.process_proposal(process_proposal, storage.clone()) + .await + .unwrap(); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not + yet removed" + ); + // set dummy hash app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { hash: app.executed_proposal_hash, @@ -518,11 +552,10 @@ async fn maintenance_funds_added_promotes() { 3, "only one transactions should've been valid (besides 2 generated txs)" ); - // set dummy hash - app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { - hash: app.executed_proposal_hash, + hash: Hash::try_from([97u8; 32].to_vec()).unwrap(), height: 1u32.into(), time: Time::now(), next_validators_hash: Hash::default(), diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index 7d2ec241c..3bd1bfd64 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ b/crates/astria-sequencer/src/sequencer.rs @@ -85,7 +85,7 @@ impl Sequencer { let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); - let app = App::new(snapshot, mempool.clone(), None, None, metrics) + let app = App::new(snapshot, mempool.clone(), metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 00761a252..987ff5c44 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -271,7 +271,7 @@ mod test { txs, proposed_last_commit: None, misbehavior: vec![], - hash: Hash::default(), + hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(), next_validators_hash: Hash::default(), time: Time::now(), @@ -475,9 +475,7 @@ mod test { let snapshot = storage.latest_snapshot(); let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); - let mut app = App::new(snapshot, mempool.clone(), None, None, metrics) - .await - .unwrap(); + let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await .unwrap(); @@ -507,16 +505,17 @@ mod test { let mut header = default_header(); header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap()); + mempool + .insert(signed_tx, 0, mock_balances(0, 0), mock_tx_cost(0, 0, 0)) + .await + .unwrap(); + let process_proposal = new_process_proposal_request(block_data.clone()); consensus_service .handle_request(ConsensusRequest::ProcessProposal(process_proposal)) .await .unwrap(); - mempool - .insert(signed_tx, 0, mock_balances(0, 0), mock_tx_cost(0, 0, 0)) - .await - .unwrap(); let finalize_block = request::FinalizeBlock { hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(), From 621bb0932e08fb267fd0dcc51b28b3afa49e0fcc Mon Sep 17 00:00:00 2001 From: elizabeth Date: Wed, 25 Sep 2024 14:57:00 -0400 Subject: [PATCH 3/4] cleanup --- crates/astria-sequencer/src/app/mod.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index cf859d16c..f02b613c4 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -293,6 +293,7 @@ impl App { self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); // clear the cache of transaction execution results + self.execution_results = None; self.finalize_block = None; self.executed_proposal_hash = Hash::default(); } @@ -377,9 +378,10 @@ impl App { self.executed_proposal_hash = process_proposal.hash; // if we're the proposer, we should have the execution results from - // `prepare_proposal`. run the post-tx-execution hook to set - // `self.finalize_block`. we can't run this in `prepare_proposal` as - // we don't know the block hash there. + // `prepare_proposal`. run the post-tx-execution hook to generate the + // `SequencerBlock` and to set `self.finalize_block`. + // + // we can't run this in `prepare_proposal` as we don't know the block hash there. let Some(tx_results) = self.execution_results.take() else { bail!("execution results must be present after executing transactions") }; @@ -797,6 +799,10 @@ impl App { Ok(()) } + /// updates the app state after transaction execution, and generates the resulting + /// `SequencerBlock`. + /// + /// this must be called after a block's transactions are executed. #[instrument(name = "App::post_execute_transactions", skip_all)] async fn post_execute_transactions( &mut self, From 591fd94c89ae27b26c5b067f423dd2a657f87d41 Mon Sep 17 00:00:00 2001 From: elizabeth Date: Fri, 27 Sep 2024 16:50:42 -0400 Subject: [PATCH 4/4] move execution_results and finalize_block from App to object ephemeral store --- crates/astria-sequencer/src/app/mod.rs | 80 ++++++++++++++++---------- 1 file changed, 49 insertions(+), 31 deletions(-) diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index bb2d96137..faaf844aa 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -48,6 +48,7 @@ use cnidarium::{ StagedWriteBatch, StateDelta, StateRead, + StateWrite, Storage, }; use prost::Message as _; @@ -120,6 +121,14 @@ use crate::{ transaction::InvalidNonce, }; +// ephemeral store key for the cache of results of executing of transactions in `prepare_proposal`. +// cleared in `process_proposal` if we're the proposer. +const EXECUTION_RESULTS_KEY: &str = "execution_results"; + +// ephemeral store key for the cache of results of executing of transactions in `process_proposal`. +// cleared at the end of the block. +const POST_TRANSACTION_EXECUTION_RESULT_KEY: &str = "post_transaction_execution_result"; + /// The inter-block state being written to by the application. type InterBlockState = Arc>; @@ -162,14 +171,6 @@ pub(crate) struct App { // to the mempool to recost all transactions. recost_mempool: bool, - // cache of results of executing of transactions in `prepare_proposal`. - // cleared in `process_proposal` if we're the proposer. - execution_results: Option>, - - // cache of results of executing of transactions in `process_proposal`. - // cleared at the end of the block. - finalize_block: Option, - // the current `StagedWriteBatch` which contains the rocksdb write batch // of the current block being executed, created from the state delta, // and set after `finalize_block`. @@ -213,8 +214,6 @@ impl App { mempool, validator_address: None, executed_proposal_hash: Hash::default(), - execution_results: None, - finalize_block: None, recost_mempool: false, write_batch: None, app_hash, @@ -290,11 +289,11 @@ impl App { // but `self.state` was changed due to executing the previous round's data. // // if the previous round was committed, then the state stays the same. + // + // this also clears the ephemeral storage. self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); - // clear the cache of transaction execution results - self.execution_results = None; - self.finalize_block = None; + // clear the cached executed proposal hash self.executed_proposal_hash = Hash::default(); } @@ -378,7 +377,7 @@ impl App { // `SequencerBlock` and to set `self.finalize_block`. // // we can't run this in `prepare_proposal` as we don't know the block hash there. - let Some(tx_results) = self.execution_results.take() else { + let Some(tx_results) = self.state.object_get(EXECUTION_RESULTS_KEY) else { bail!("execution results must be present after executing transactions") }; @@ -647,7 +646,11 @@ impl App { self.metrics .set_transactions_in_mempool_total(self.mempool.len().await); - self.execution_results = Some(execution_results); + let mut state_tx = Arc::try_begin_transaction(&mut self.state) + .expect("state Arc should not be referenced elsewhere"); + state_tx.object_put(EXECUTION_RESULTS_KEY, execution_results); + let _ = state_tx.apply(); + Ok((validated_txs, included_signed_txs)) } @@ -857,21 +860,19 @@ impl App { .put_sequencer_block(sequencer_block) .wrap_err("failed to write sequencer block to state")?; - // events that occur after end_block are ignored here; - // there should be none anyways. - let _ = self.apply(state_tx); - - // use a dummy app hash here - the actual app hash will be filled out in finalize_block. - let dummy_app_hash = AppHash::default(); - - let finalize_block = abci::response::FinalizeBlock { + let result = PostTransactionExecutionResult { events: end_block.events, validator_updates: end_block.validator_updates, consensus_param_updates: end_block.consensus_param_updates, - app_hash: dummy_app_hash, tx_results: finalize_block_tx_results, }; - self.finalize_block = Some(finalize_block); + + state_tx.object_put(POST_TRANSACTION_EXECUTION_RESULT_KEY, result); + + // events that occur after end_block are ignored here; + // there should be none anyways. + let _ = self.apply(state_tx); + Ok(()) } @@ -964,11 +965,6 @@ impl App { .wrap_err("failed to run post execute transactions handler")?; } - let mut finalize_block = self.finalize_block.take().expect( - "finalize_block result must be present, as txs were already executed just now or \ - during the proposal phase", - ); - // update the priority of any txs in the mempool based on the updated app state if self.recost_mempool { self.metrics.increment_mempool_recosted(); @@ -976,12 +972,26 @@ impl App { update_mempool_after_finalization(&mut self.mempool, &self.state, self.recost_mempool) .await; + let post_transaction_execution_result: PostTransactionExecutionResult = self + .state + .object_get(POST_TRANSACTION_EXECUTION_RESULT_KEY) + .expect( + "post_transaction_execution_result must be present, as txs were already executed \ + just now or during the proposal phase", + ); + // prepare the `StagedWriteBatch` for a later commit. let app_hash = self .prepare_commit(storage) .await .wrap_err("failed to prepare commit")?; - finalize_block.app_hash = app_hash; + let finalize_block = abci::response::FinalizeBlock { + events: post_transaction_execution_result.events, + validator_updates: post_transaction_execution_result.validator_updates, + consensus_param_updates: post_transaction_execution_result.consensus_param_updates, + app_hash, + tx_results: post_transaction_execution_result.tx_results, + }; Ok(finalize_block) } @@ -1237,3 +1247,11 @@ fn signed_transaction_from_bytes(bytes: &[u8]) -> Result { Ok(tx) } + +#[derive(Clone, Debug)] +struct PostTransactionExecutionResult { + events: Vec, + tx_results: Vec, + validator_updates: Vec, + consensus_param_updates: Option, +}