diff --git a/crates/astria-sequencer/src/app/mod.rs b/crates/astria-sequencer/src/app/mod.rs index 34a96202f2..688c6c1ece 100644 --- a/crates/astria-sequencer/src/app/mod.rs +++ b/crates/astria-sequencer/src/app/mod.rs @@ -49,6 +49,7 @@ use cnidarium::{ StagedWriteBatch, StateDelta, StateRead, + StateWrite, Storage, }; use prost::Message as _; @@ -124,6 +125,14 @@ use crate::{ transaction::InvalidNonce, }; +// ephemeral store key for the cache of results of executing of transactions in `prepare_proposal`. +// cleared in `process_proposal` if we're the proposer. +const EXECUTION_RESULTS_KEY: &str = "execution_results"; + +// ephemeral store key for the cache of results of executing of transactions in `process_proposal`. +// cleared at the end of the block. +const POST_TRANSACTION_EXECUTION_RESULT_KEY: &str = "post_transaction_execution_result"; + /// The inter-block state being written to by the application. type InterBlockState = Arc>; @@ -166,10 +175,6 @@ pub(crate) struct App { // to the mempool to recost all transactions. recost_mempool: bool, - // cache of results of executing of transactions in `prepare_proposal` or `process_proposal`. - // cleared at the end of each block. - execution_results: Option>, - // the current `StagedWriteBatch` which contains the rocksdb write batch // of the current block being executed, created from the state delta, // and set after `finalize_block`. @@ -214,7 +219,6 @@ impl App { mempool, validator_address: None, executed_proposal_hash: Hash::default(), - execution_results: None, recost_mempool: false, write_batch: None, app_hash, @@ -301,10 +305,11 @@ impl App { // but `self.state` was changed due to executing the previous round's data. // // if the previous round was committed, then the state stays the same. + // + // this also clears the ephemeral storage. self.state = Arc::new(StateDelta::new(storage.latest_snapshot())); - // clear the cache of transaction execution results - self.execution_results = None; + // clear the cached executed proposal hash self.executed_proposal_hash = Hash::default(); } @@ -357,9 +362,9 @@ impl App { // generate commitment to sequence::Actions and deposits and commitment to the rollup IDs // included in the block let res = generate_rollup_datas_commitment(&signed_txs_included, deposits); - + let txs = res.into_transactions(included_tx_bytes); Ok(abci::response::PrepareProposal { - txs: res.into_transactions(included_tx_bytes), + txs, }) } @@ -382,6 +387,27 @@ impl App { debug!("skipping process_proposal as we are the proposer for this block"); self.validator_address = None; self.executed_proposal_hash = process_proposal.hash; + + // if we're the proposer, we should have the execution results from + // `prepare_proposal`. run the post-tx-execution hook to generate the + // `SequencerBlock` and to set `self.finalize_block`. + // + // we can't run this in `prepare_proposal` as we don't know the block hash there. + let Some(tx_results) = self.state.object_get(EXECUTION_RESULTS_KEY) else { + bail!("execution results must be present after executing transactions") + }; + + self.post_execute_transactions( + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; + return Ok(()); } self.metrics.increment_process_proposal_skipped_proposal(); @@ -394,7 +420,7 @@ impl App { self.update_state_for_new_round(&storage); - let mut txs = VecDeque::from(process_proposal.txs); + let mut txs = VecDeque::from(process_proposal.txs.clone()); let received_rollup_datas_root: [u8; 32] = txs .pop_front() .ok_or_eyre("no transaction commitment in proposal")? @@ -438,20 +464,17 @@ impl App { .filter_map(|bytes| signed_transaction_from_bytes(bytes.as_ref()).ok()) .collect::>(); - self.execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) + let tx_results = self + .execute_transactions_process_proposal(signed_txs.clone(), &mut block_size_constraints) .await .wrap_err("failed to execute transactions")?; - let Some(execution_results) = self.execution_results.as_ref() else { - bail!("execution results must be present after executing transactions") - }; - // all txs in the proposal should be deserializable and executable // if any txs were not deserializeable or executable, they would not have been - // added to the `execution_results` list, thus the length of `txs_to_include` - // will be shorter than that of `execution_results`. + // added to the `tx_results` list, thus the length of `txs_to_include` + // will be shorter than that of `tx_results`. ensure!( - execution_results.len() == expected_txs_len, + tx_results.len() == expected_txs_len, "transactions to be included do not match expected", ); self.metrics.record_proposal_transactions(signed_txs.len()); @@ -474,6 +497,16 @@ impl App { ); self.executed_proposal_hash = process_proposal.hash; + self.post_execute_transactions( + process_proposal.hash, + process_proposal.height, + process_proposal.time, + process_proposal.proposer_address, + process_proposal.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; Ok(()) } @@ -482,7 +515,7 @@ impl App { /// writing to the app's `StateDelta`. /// /// The result of execution of every transaction which is successful - /// is stored in `self.execution_results`. + /// is stored in ephemeral storage for usage in `process_proposal`. /// /// Returns the transactions which were successfully executed /// in both their [`SignedTransaction`] and raw bytes form. @@ -629,15 +662,20 @@ impl App { self.metrics .set_transactions_in_mempool_total(self.mempool.len().await); - self.execution_results = Some(execution_results); + // XXX: we need to unwrap the app's state arc to write + // to the ephemeral store. + // this is okay as we should have the only reference to the state + // at this point. + let mut state_tx = Arc::try_begin_transaction(&mut self.state) + .expect("state Arc should not be referenced elsewhere"); + state_tx.object_put(EXECUTION_RESULTS_KEY, execution_results); + let _ = state_tx.apply(); + Ok((validated_txs, included_signed_txs)) } /// Executes the given transactions, writing to the app's `StateDelta`. /// - /// The result of execution of every transaction which is successful - /// is stored in `self.execution_results`. - /// /// Unlike the usual flow of an ABCI application, this is called during /// the proposal phase, ie. `process_proposal`. /// @@ -653,8 +691,8 @@ impl App { &mut self, txs: Vec, block_size_constraints: &mut BlockSizeConstraints, - ) -> Result<()> { - let mut excluded_tx_count = 0_f64; + ) -> Result> { + let mut excluded_tx_count = 0u32; let mut execution_results = Vec::new(); for tx in txs { @@ -677,7 +715,7 @@ impl App { tx_data_bytes = tx_sequence_data_bytes, "excluding transaction: max block sequenced data limit reached" ); - excluded_tx_count += 1.0; + excluded_tx_count = excluded_tx_count.saturating_add(1); continue; } @@ -701,12 +739,12 @@ impl App { error = AsRef::::as_ref(&e), "failed to execute transaction, not including in block" ); - excluded_tx_count += 1.0; + excluded_tx_count = excluded_tx_count.saturating_add(1); } } } - if excluded_tx_count > 0.0 { + if excluded_tx_count > 0 { info!( excluded_tx_count = excluded_tx_count, included_tx_count = execution_results.len(), @@ -714,8 +752,7 @@ impl App { ); } - self.execution_results = Some(execution_results); - Ok(()) + Ok(execution_results) } /// sets up the state for execution of the block's transactions. @@ -774,18 +811,24 @@ impl App { Ok(()) } - /// Executes the given block, but does not write it to disk. + /// updates the app state after transaction execution, and generates the resulting + /// `SequencerBlock`. /// - /// `commit` must be called after this to write the block to disk. - /// - /// This is called by cometbft after the block has already been - /// committed by the network's consensus. - #[instrument(name = "App::finalize_block", skip_all)] - pub(crate) async fn finalize_block( + /// this must be called after a block's transactions are executed. + #[instrument(name = "App::post_execute_transactions", skip_all)] + async fn post_execute_transactions( &mut self, - finalize_block: abci::request::FinalizeBlock, - storage: Storage, - ) -> Result { + block_hash: Hash, + height: tendermint::block::Height, + time: tendermint::Time, + proposer_address: account::Id, + txs: Vec, + tx_results: Vec, + ) -> Result<()> { + let Hash::Sha256(block_hash) = block_hash else { + bail!("block hash is empty; this should not occur") + }; + let chain_id = self .state .get_chain_id() @@ -797,16 +840,71 @@ impl App { .await .wrap_err("failed to get sudo address from state")?; - // convert tendermint id to astria address; this assumes they are - // the same address, as they are both ed25519 keys - let proposer_address = finalize_block.proposer_address; + let end_block = self.end_block(height.value(), &sudo_address).await?; - let height = finalize_block.height; - let time = finalize_block.time; - let Hash::Sha256(block_hash) = finalize_block.hash else { - bail!("finalized block hash is empty; this should not occur") + // get deposits for this block from state's ephemeral cache and put them to storage. + let mut state_tx = StateDelta::new(self.state.clone()); + let deposits_in_this_block = self.state.get_cached_block_deposits(); + debug!( + deposits = %telemetry::display::json(&deposits_in_this_block), + "got block deposits from state" + ); + + state_tx + .put_deposits(&block_hash, deposits_in_this_block.clone()) + .wrap_err("failed to put deposits to state")?; + + // cometbft expects a result for every tx in the block, so we need to return a + // tx result for the commitments, even though they're not actually user txs. + // + // the tx_results passed to this function only contain results for every user + // transaction, not the commitment, so its length is len(txs) - 2. + let mut finalize_block_tx_results: Vec = Vec::with_capacity(txs.len()); + finalize_block_tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); + finalize_block_tx_results.extend(tx_results); + + let sequencer_block = SequencerBlock::try_from_block_info_and_data( + block_hash, + chain_id, + height, + time, + proposer_address, + txs, + deposits_in_this_block, + ) + .wrap_err("failed to convert block info and data to SequencerBlock")?; + state_tx + .put_sequencer_block(sequencer_block) + .wrap_err("failed to write sequencer block to state")?; + + let result = PostTransactionExecutionResult { + events: end_block.events, + validator_updates: end_block.validator_updates, + consensus_param_updates: end_block.consensus_param_updates, + tx_results: finalize_block_tx_results, }; + state_tx.object_put(POST_TRANSACTION_EXECUTION_RESULT_KEY, result); + + // events that occur after end_block are ignored here; + // there should be none anyways. + let _ = self.apply(state_tx); + + Ok(()) + } + + /// Executes the given block, but does not write it to disk. + /// + /// `commit` must be called after this to write the block to disk. + /// + /// This is called by cometbft after the block has already been + /// committed by the network's consensus. + #[instrument(name = "App::finalize_block", skip_all)] + pub(crate) async fn finalize_block( + &mut self, + finalize_block: abci::request::FinalizeBlock, + storage: Storage, + ) -> Result { // If we previously executed txs in a different proposal than is being processed, // reset cached state changes. if self.executed_proposal_hash != finalize_block.hash { @@ -819,13 +917,14 @@ impl App { rollup IDs commitment" ); - // cometbft expects a result for every tx in the block, so we need to return a - // tx result for the commitments, even though they're not actually user txs. - let mut tx_results: Vec = Vec::with_capacity(finalize_block.txs.len()); - tx_results.extend(std::iter::repeat(ExecTxResult::default()).take(2)); - // When the hash is not empty, we have already executed and cached the results if self.executed_proposal_hash.is_empty() { + // convert tendermint id to astria address; this assumes they are + // the same address, as they are both ed25519 keys + let proposer_address = finalize_block.proposer_address; + let height = finalize_block.height; + let time = finalize_block.time; + // we haven't executed anything yet, so set up the state for execution. let block_data = BlockData { misbehavior: finalize_block.misbehavior, @@ -839,6 +938,7 @@ impl App { .await .wrap_err("failed to execute block")?; + let mut tx_results = Vec::with_capacity(finalize_block.txs.len()); // skip the first two transactions, as they are the rollup data commitments for tx in finalize_block.txs.iter().skip(2) { let signed_tx = signed_transaction_from_bytes(tx) @@ -869,68 +969,48 @@ impl App { } } } - } else { - let execution_results = self.execution_results.take().expect( - "execution results must be present if txs were already executed during proposal \ - phase", - ); - tx_results.extend(execution_results); - }; - - let end_block = self.end_block(height.value(), &sudo_address).await?; - - // get deposits for this block from state's ephemeral cache and put them to storage. - let mut state_tx = StateDelta::new(self.state.clone()); - let deposits_in_this_block = self.state.get_cached_block_deposits(); - debug!( - deposits = %telemetry::display::json(&deposits_in_this_block), - "got block deposits from state" - ); - state_tx - .put_deposits(&block_hash, deposits_in_this_block.clone()) - .wrap_err("failed to put deposits to state")?; - let sequencer_block = SequencerBlock::try_from_block_info_and_data( - block_hash, - chain_id, - height, - time, - proposer_address, - finalize_block - .txs - .into_iter() - .map(std::convert::Into::into) - .collect(), - deposits_in_this_block, - ) - .wrap_err("failed to convert block info and data to SequencerBlock")?; - state_tx - .put_sequencer_block(sequencer_block) - .wrap_err("failed to write sequencer block to state")?; + self.post_execute_transactions( + finalize_block.hash, + height, + time, + proposer_address, + finalize_block.txs, + tx_results, + ) + .await + .wrap_err("failed to run post execute transactions handler")?; + } // update the priority of any txs in the mempool based on the updated app state if self.recost_mempool { self.metrics.increment_mempool_recosted(); } - update_mempool_after_finalization(&mut self.mempool, &state_tx, self.recost_mempool).await; + update_mempool_after_finalization(&mut self.mempool, &self.state, self.recost_mempool) + .await; - // events that occur after end_block are ignored here; - // there should be none anyways. - let _ = self.apply(state_tx); + let post_transaction_execution_result: PostTransactionExecutionResult = self + .state + .object_get(POST_TRANSACTION_EXECUTION_RESULT_KEY) + .expect( + "post_transaction_execution_result must be present, as txs were already executed \ + just now or during the proposal phase", + ); // prepare the `StagedWriteBatch` for a later commit. let app_hash = self - .prepare_commit(storage.clone()) + .prepare_commit(storage) .await .wrap_err("failed to prepare commit")?; - - Ok(abci::response::FinalizeBlock { - events: end_block.events, - validator_updates: end_block.validator_updates, - consensus_param_updates: end_block.consensus_param_updates, - tx_results, + let finalize_block = abci::response::FinalizeBlock { + events: post_transaction_execution_result.events, + validator_updates: post_transaction_execution_result.validator_updates, + consensus_param_updates: post_transaction_execution_result.consensus_param_updates, app_hash, - }) + tx_results: post_transaction_execution_result.tx_results, + }; + + Ok(finalize_block) } #[instrument(skip_all, err)] @@ -1189,3 +1269,11 @@ fn signed_transaction_from_bytes(bytes: &[u8]) -> Result { Ok(tx) } + +#[derive(Clone, Debug)] +struct PostTransactionExecutionResult { + events: Vec, + tx_results: Vec, + validator_updates: Vec, + consensus_param_updates: Option, +} diff --git a/crates/astria-sequencer/src/app/tests_app/mempool.rs b/crates/astria-sequencer/src/app/tests_app/mempool.rs index a147e643e0..27f9549629 100644 --- a/crates/astria-sequencer/src/app/tests_app/mempool.rs +++ b/crates/astria-sequencer/src/app/tests_app/mempool.rs @@ -266,6 +266,7 @@ async fn maintenance_recosting_promotes() { .insert(Arc::new(tx_recost.clone()), 0, judy_funds, tx_cost) .await .unwrap(); + assert_eq!(app.mempool.len().await, 2, "two txs in mempool"); // create block with prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -281,15 +282,42 @@ async fn maintenance_recosting_promotes() { let res = app .prepare_proposal(prepare_args, storage.clone()) .await - .expect(""); + .unwrap(); assert_eq!( res.txs.len(), 3, "only one transaction should've been valid (besides 2 generated txs)" ); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not yet removed" + ); + // set dummy hash app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + + let process_proposal = abci::request::ProcessProposal { + hash: app.executed_proposal_hash, + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: [1u8; 20].to_vec().try_into().unwrap(), + txs: res.txs.clone(), + proposed_last_commit: None, + misbehavior: vec![], + }; + app.process_proposal(process_proposal, storage.clone()) + .await + .unwrap(); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not + yet removed" + ); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { hash: app.executed_proposal_hash, @@ -304,10 +332,12 @@ async fn maintenance_recosting_promotes() { }, misbehavior: vec![], }; + app.finalize_block(finalize_block.clone(), storage.clone()) .await .unwrap(); app.commit(storage.clone()).await; + assert_eq!(app.mempool.len().await, 1, "recosted tx should remain"); // mempool re-costing should've occurred to allow other transaction to execute let prepare_args = abci::request::PrepareProposal { @@ -328,28 +358,9 @@ async fn maintenance_recosting_promotes() { assert_eq!( res.txs.len(), 3, - "only one transaction should've been valid (besides 2 generated txs)" + "one transaction should've been valid (besides 2 generated txs)" ); - // set dummy hash - app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); - // finalize with finalize block - let finalize_block = abci::request::FinalizeBlock { - hash: app.executed_proposal_hash, - height: 1u32.into(), - time: Time::now(), - next_validators_hash: Hash::default(), - proposer_address: [0u8; 20].to_vec().try_into().unwrap(), - txs: res.txs, - decided_last_commit: CommitInfo { - votes: vec![], - round: Round::default(), - }, - misbehavior: vec![], - }; - app.finalize_block(finalize_block.clone(), storage.clone()) - .await - .unwrap(); - app.commit(storage.clone()).await; + // see transfer went through assert_eq!( app.state @@ -459,8 +470,31 @@ async fn maintenance_funds_added_promotes() { 3, "only one transactions should've been valid (besides 2 generated txs)" ); + + app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + let process_proposal = abci::request::ProcessProposal { + hash: app.executed_proposal_hash, + height: Height::default(), + time: Time::now(), + next_validators_hash: Hash::default(), + proposer_address: [1u8; 20].to_vec().try_into().unwrap(), + txs: res.txs.clone(), + proposed_last_commit: None, + misbehavior: vec![], + }; + app.process_proposal(process_proposal, storage.clone()) + .await + .unwrap(); + assert_eq!( + app.mempool.len().await, + 2, + "two txs in mempool; one included in proposal is not + yet removed" + ); + // set dummy hash app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { hash: app.executed_proposal_hash, @@ -501,11 +535,10 @@ async fn maintenance_funds_added_promotes() { 3, "only one transactions should've been valid (besides 2 generated txs)" ); - // set dummy hash - app.executed_proposal_hash = Hash::try_from([97u8; 32].to_vec()).unwrap(); + // finalize with finalize block let finalize_block = abci::request::FinalizeBlock { - hash: app.executed_proposal_hash, + hash: Hash::try_from([97u8; 32].to_vec()).unwrap(), height: 1u32.into(), time: Time::now(), next_validators_hash: Hash::default(), diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 071fdac943..4e1868ceff 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -268,7 +268,7 @@ mod tests { txs, proposed_last_commit: None, misbehavior: vec![], - hash: Hash::default(), + hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(), next_validators_hash: Hash::default(), time: Time::now(), @@ -502,16 +502,17 @@ mod tests { let mut header = default_header(); header.data_hash = Some(Hash::try_from(data_hash.to_vec()).unwrap()); + mempool + .insert(signed_tx, 0, mock_balances(0, 0), mock_tx_cost(0, 0, 0)) + .await + .unwrap(); + let process_proposal = new_process_proposal_request(block_data.clone()); consensus_service .handle_request(ConsensusRequest::ProcessProposal(process_proposal)) .await .unwrap(); - mempool - .insert(signed_tx, 0, mock_balances(0, 0), mock_tx_cost(0, 0, 0)) - .await - .unwrap(); let finalize_block = request::FinalizeBlock { hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(),