diff --git a/Cargo.lock b/Cargo.lock index 94e5939f2e..23016a8b07 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom 0.2.14", "once_cell", "version_check", "zerocopy", @@ -792,6 +793,7 @@ dependencies = [ "penumbra-tower-trace", "priority-queue", "prost", + "quick_cache", "rand 0.8.5", "rand_chacha 0.3.1", "regex", @@ -6029,6 +6031,18 @@ dependencies = [ "byteorder", ] +[[package]] +name = "quick_cache" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e00e03be638aaab399c951dba6bea0161f99f26df0ccab4ab0fc6eb9535bd48" +dependencies = [ + "ahash", + "equivalent", + "hashbrown 0.14.3", + "parking_lot", +] + [[package]] name = "quote" version = "1.0.36" diff --git a/crates/astria-sequencer/Cargo.toml b/crates/astria-sequencer/Cargo.toml index a85fb864d5..c4288c0081 100644 --- a/crates/astria-sequencer/Cargo.toml +++ b/crates/astria-sequencer/Cargo.toml @@ -27,6 +27,7 @@ anyhow = "1" borsh = { version = "1", features = ["derive"] } matchit = "0.7.2" priority-queue = "2.0.2" +quick_cache = "0.6.0" tower = "0.4" tower-abci = "0.12.0" tower-actor = "0.1.0" diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index d942e99ec5..90d4766a59 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use astria_core::{ crypto::SigningKey, primitive::v1::{ @@ -152,7 +154,7 @@ pub(crate) async fn initialize_app( app } -pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { +pub(crate) fn get_mock_tx(nonce: u32) -> Arc { let (alice_signing_key, _) = get_alice_signing_key_and_address(); let tx = UnsignedTransaction { params: TransactionParams::builder() @@ -169,5 +171,5 @@ pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction { ], }; - tx.into_signed(&alice_signing_key) + Arc::new(tx.into_signed(&alice_signing_key)) } diff --git a/crates/astria-sequencer/src/app/tests_app.rs b/crates/astria-sequencer/src/app/tests_app.rs index 9b89547c05..80edc30ba4 100644 --- a/crates/astria-sequencer/src/app/tests_app.rs +++ b/crates/astria-sequencer/src/app/tests_app.rs @@ -444,7 +444,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() { // don't commit the result, now call prepare_proposal with the same data. // this will reset the app state. // this simulates executing the same block as a validator (specifically the proposer). - app.mempool.insert(signed_tx, 0).await.unwrap(); + app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap(); let proposer_address = [88u8; 20].to_vec().try_into().unwrap(); let prepare_proposal = PrepareProposal { @@ -553,8 +553,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() { } .into_signed(&alice_signing_key); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { @@ -626,8 +626,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() { } .into_signed(&alice_signing_key); - app.mempool.insert(tx_pass, 0).await.unwrap(); - app.mempool.insert(tx_overflow, 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap(); + app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap(); // send to prepare_proposal let prepare_args = abci::request::PrepareProposal { diff --git a/crates/astria-sequencer/src/mempool.rs b/crates/astria-sequencer/src/mempool.rs index a33035d65e..953bc7811d 100644 --- a/crates/astria-sequencer/src/mempool.rs +++ b/crates/astria-sequencer/src/mempool.rs @@ -77,11 +77,11 @@ pub(crate) struct EnqueuedTransaction { } impl EnqueuedTransaction { - fn new(signed_tx: SignedTransaction) -> Self { + fn new(signed_tx: Arc) -> Self { let address = crate::address::base_prefixed(signed_tx.verification_key().address_bytes()); Self { tx_hash: signed_tx.sha256_of_proto_encoding(), - signed_tx: Arc::new(signed_tx), + signed_tx, address, } } @@ -139,7 +139,7 @@ pub(crate) enum RemovalReason { FailedPrepareProposal(String), } -const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes +const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes const REMOVAL_CACHE_SIZE: usize = 4096; /// `RemovalCache` is used to signal to `CometBFT` that a @@ -230,7 +230,7 @@ impl Mempool { /// note: the oldest timestamp from found priorities is maintained. pub(crate) async fn insert( &self, - tx: SignedTransaction, + tx: Arc, current_account_nonce: u32, ) -> anyhow::Result<()> { let enqueued_tx = EnqueuedTransaction::new(tx); @@ -310,10 +310,7 @@ impl Mempool { /// checks if a transaction was flagged to be removed from the `CometBFT` mempool /// and removes entry - pub(crate) async fn check_removed_comet_bft( - &mut self, - tx_hash: [u8; 32], - ) -> Option { + pub(crate) async fn check_removed_comet_bft(&self, tx_hash: [u8; 32]) -> Option { self.comet_bft_removal_cache.write().await.remove(tx_hash) } @@ -497,21 +494,21 @@ mod test { // Check enqueued txs compare equal if and only if their tx hashes are equal. let tx0 = EnqueuedTransaction { tx_hash: [0; 32], - signed_tx: Arc::new(get_mock_tx(0)), + signed_tx: get_mock_tx(0), address: crate::address::base_prefixed( get_mock_tx(0).verification_key().address_bytes(), ), }; let other_tx0 = EnqueuedTransaction { tx_hash: [0; 32], - signed_tx: Arc::new(get_mock_tx(1)), + signed_tx: get_mock_tx(1), address: crate::address::base_prefixed( get_mock_tx(1).verification_key().address_bytes(), ), }; let tx1 = EnqueuedTransaction { tx_hash: [1; 32], - signed_tx: Arc::new(get_mock_tx(0)), + signed_tx: get_mock_tx(0), address: crate::address::base_prefixed( get_mock_tx(0).verification_key().address_bytes(), ), @@ -610,16 +607,17 @@ mod test { // Insert txs from a different signer with nonces 100 and 102. let other_signing_key = SigningKey::from([1; 32]); - let other_mock_tx = |nonce: u32| -> SignedTransaction { + let other_mock_tx = |nonce: u32| -> Arc { let actions = get_mock_tx(0).actions().to_vec(); - UnsignedTransaction { + let tx = UnsignedTransaction { params: TransactionParams::builder() .nonce(nonce) .chain_id("test") .build(), actions, } - .into_signed(&other_signing_key) + .into_signed(&other_signing_key); + Arc::new(tx) }; mempool.insert(other_mock_tx(100), 0).await.unwrap(); mempool.insert(other_mock_tx(102), 0).await.unwrap(); @@ -748,16 +746,17 @@ mod test { // Insert txs from a different signer with nonces 100 and 101. let other_signing_key = SigningKey::from([1; 32]); - let other_mock_tx = |nonce: u32| -> SignedTransaction { + let other_mock_tx = |nonce: u32| -> Arc { let actions = get_mock_tx(0).actions().to_vec(); - UnsignedTransaction { + let tx = UnsignedTransaction { params: TransactionParams::builder() .nonce(nonce) .chain_id("test") .build(), actions, } - .into_signed(&other_signing_key) + .into_signed(&other_signing_key); + Arc::new(tx) }; mempool.insert(other_mock_tx(100), 0).await.unwrap(); mempool.insert(other_mock_tx(101), 0).await.unwrap(); diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index eadd2365f8..a1953ee263 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ b/crates/astria-sequencer/src/metrics.rs @@ -37,6 +37,8 @@ pub(crate) struct Metrics { check_tx_duration_seconds_check_balance: Histogram, check_tx_duration_seconds_check_removed: Histogram, check_tx_duration_seconds_insert_to_app_mempool: Histogram, + check_tx_cache_hit: Counter, + check_tx_cache_miss: Counter, actions_per_transaction_in_mempool: Histogram, transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, @@ -184,6 +186,20 @@ impl Metrics { CHECK_TX_STAGE => "insert to app mempool" ); + describe_counter!( + CHECK_TX_CACHE_HIT, + Unit::Count, + "The number of `check_tx` attempts which have been retrieved from the cached results" + ); + let check_tx_cache_hit = counter!(CHECK_TX_CACHE_HIT); + + describe_counter!( + CHECK_TX_CACHE_MISS, + Unit::Count, + "The number of `check_tx` rechecks which have not been found in the cached results" + ); + let check_tx_cache_miss = counter!(CHECK_TX_CACHE_MISS); + describe_histogram!( ACTIONS_PER_TRANSACTION_IN_MEMPOOL, Unit::Count, @@ -226,6 +242,8 @@ impl Metrics { check_tx_duration_seconds_check_balance, check_tx_duration_seconds_check_removed, check_tx_duration_seconds_insert_to_app_mempool, + check_tx_cache_hit, + check_tx_cache_miss, actions_per_transaction_in_mempool, transaction_in_mempool_size_bytes, transactions_in_mempool_total, @@ -329,6 +347,14 @@ impl Metrics { .record(duration); } + pub(crate) fn increment_check_tx_cache_hit(&self) { + self.check_tx_cache_hit.increment(1); + } + + pub(crate) fn increment_check_tx_cache_miss(&self) { + self.check_tx_cache_miss.increment(1); + } + pub(crate) fn record_actions_per_transaction_in_mempool(&self, count: usize) { // allow: precision loss is unlikely (values too small) but also unimportant in histograms. #[allow(clippy::cast_precision_loss)] @@ -362,6 +388,8 @@ metric_names!(pub const METRICS_NAMES: CHECK_TX_REMOVED_STALE_NONCE, CHECK_TX_REMOVED_ACCOUNT_BALANCE, CHECK_TX_DURATION_SECONDS, + CHECK_TX_CACHE_HIT, + CHECK_TX_CACHE_MISS, ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL @@ -371,6 +399,8 @@ metric_names!(pub const METRICS_NAMES: mod tests { use super::{ ACTIONS_PER_TRANSACTION_IN_MEMPOOL, + CHECK_TX_CACHE_HIT, + CHECK_TX_CACHE_MISS, CHECK_TX_DURATION_SECONDS, CHECK_TX_REMOVED_ACCOUNT_BALANCE, CHECK_TX_REMOVED_EXPIRED, @@ -437,6 +467,8 @@ mod tests { "check_tx_removed_account_balance", ); assert_const(CHECK_TX_DURATION_SECONDS, "check_tx_duration_seconds"); + assert_const(CHECK_TX_CACHE_HIT, "check_tx_cache_hit"); + assert_const(CHECK_TX_CACHE_MISS, "check_tx_cache_miss"); assert_const( ACTIONS_PER_TRANSACTION_IN_MEMPOOL, "actions_per_transaction_in_mempool", diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index acb92a0a80..02c6e94510 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -222,6 +222,7 @@ mod test { use std::{ collections::HashMap, str::FromStr, + sync::Arc, }; use astria_core::{ @@ -311,7 +312,10 @@ mod test { let signed_tx = tx.into_signed(&signing_key); let tx_bytes = signed_tx.clone().into_raw().encode_to_vec(); let txs = vec![tx_bytes.into()]; - mempool.insert(signed_tx.clone(), 0).await.unwrap(); + mempool + .insert(Arc::new(signed_tx.clone()), 0) + .await + .unwrap(); let res = generate_rollup_datas_commitment(&vec![signed_tx], HashMap::new()); @@ -530,7 +534,7 @@ mod test { .await .unwrap(); - mempool.insert(signed_tx, 0).await.unwrap(); + mempool.insert(Arc::new(signed_tx), 0).await.unwrap(); let finalize_block = request::FinalizeBlock { hash: Hash::try_from([0u8; 32].to_vec()).unwrap(), height: 1u32.into(), diff --git a/crates/astria-sequencer/src/service/mempool.rs b/crates/astria-sequencer/src/service/mempool.rs index dc93c3e991..4afcecb3ea 100644 --- a/crates/astria-sequencer/src/service/mempool.rs +++ b/crates/astria-sequencer/src/service/mempool.rs @@ -1,5 +1,6 @@ use std::{ pin::Pin, + sync::Arc, task::{ Context, Poll, @@ -14,12 +15,15 @@ use astria_core::{ transaction::v1alpha1::SignedTransaction, }, }; +use bytes::Bytes; use cnidarium::Storage; use futures::{ Future, FutureExt, }; use prost::Message as _; +use quick_cache::sync::Cache; +use sha2::Digest as _; use tendermint::v0_38::abci::{ request, response, @@ -28,10 +32,13 @@ use tendermint::v0_38::abci::{ }; use tower::Service; use tower_abci::BoxError; -use tracing::Instrument as _; +use tracing::{ + warn, + Instrument as _, +}; use crate::{ - accounts::state_ext::StateReadExt, + accounts::state_ext::StateReadExt as _, mempool::{ Mempool as AppMempool, RemovalReason, @@ -41,15 +48,21 @@ use crate::{ }; const MAX_TX_SIZE: usize = 256_000; // 256 KB +/// The number of entries in the immutable checks cache. +const CACHE_SIZE: usize = 10_000; + +type ImmutableChecksResult = Result, response::CheckTx>; -/// Mempool handles [`request::CheckTx`] abci requests. -// -/// It performs a stateless check of the given transaction, -/// returning a [`tendermint::v0_38::abci::response::CheckTx`]. +/// `Mempool` handles [`request::CheckTx`] abci requests. +/// +/// It performs stateless and stateful checks of the given transaction, +/// returning a [`response::CheckTx`]. #[derive(Clone)] pub(crate) struct Mempool { storage: Storage, inner: AppMempool, + /// A cache of recent results of immutable checks, indexed by tx hash. + cached_immutable_checks: Arc>, metrics: &'static Metrics, } @@ -58,6 +71,7 @@ impl Mempool { Self { storage, inner: mempool, + cached_immutable_checks: Arc::new(Cache::new(CACHE_SIZE)), metrics, } } @@ -76,12 +90,20 @@ impl Service for Mempool { use penumbra_tower_trace::v038::RequestExt as _; let span = req.create_span(); let storage = self.storage.clone(); - let mut mempool = self.inner.clone(); + let mempool = self.inner.clone(); + let cached_immutable_checks = self.cached_immutable_checks.clone(); let metrics = self.metrics; async move { let rsp = match req { MempoolRequest::CheckTx(req) => MempoolResponse::CheckTx( - handle_check_tx(req, storage.latest_snapshot(), &mut mempool, metrics).await, + handle_check_tx( + req, + storage.latest_snapshot(), + mempool, + cached_immutable_checks, + metrics, + ) + .await, ), }; Ok(rsp) @@ -97,132 +119,165 @@ impl Service for Mempool { /// as well as stateful checks (nonce and balance checks). /// /// If the tx passes all checks, status code 0 is returned. -#[allow(clippy::too_many_lines)] async fn handle_check_tx( - req: request::CheckTx, + request::CheckTx { + tx, + kind, + }: request::CheckTx, state: S, - mempool: &mut AppMempool, + mempool: AppMempool, + cached_immutable_checks: Arc>, metrics: &'static Metrics, ) -> response::CheckTx { - use sha2::Digest as _; - - let start_parsing = Instant::now(); - - let request::CheckTx { - tx, .. - } = req; + let start = Instant::now(); - let tx_hash = sha2::Sha256::digest(&tx).into(); + // So we don't waste time hashing a large object, we don't check the cache before the size + // check. let tx_len = tx.len(); - if tx_len > MAX_TX_SIZE { - mempool.remove(tx_hash).await; metrics.increment_check_tx_removed_too_large(); - return response::CheckTx { - code: AbciErrorCode::TRANSACTION_TOO_LARGE.into(), - log: format!( - "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got {}", - tx.len() + return new_error_response( + AbciErrorCode::TRANSACTION_TOO_LARGE, + format!( + "transaction size too large; allowed: {MAX_TX_SIZE} bytes, got: {tx_len} bytes", ), - info: AbciErrorCode::TRANSACTION_TOO_LARGE.to_string(), - ..response::CheckTx::default() - }; + ); } - let raw_signed_tx = match raw::SignedTransaction::decode(tx) { + // Ok to hash the tx now and check in the cache. + let tx_hash = sha2::Sha256::digest(&tx).into(); + let signed_tx = match cached_immutable_checks + .get_value_or_guard_async(&tx_hash) + .await + { + Ok(Ok(cached_tx)) => { + // The previous `parse_and_run_immutable_checks` call was `Ok`: rerun mutable checks. + metrics.increment_check_tx_cache_hit(); + cached_tx + } + Ok(Err(cached_error_response)) => { + // The previous `parse_and_run_immutable_checks` call was `Err`: just return it. + metrics.increment_check_tx_cache_hit(); + return cached_error_response; + } + Err(guard) => { + if kind == request::CheckTxKind::Recheck { + warn!( + tx_hash = %telemetry::display::base64(&tx_hash), + "got a cache miss for recheck of tx" + ); + metrics.increment_check_tx_cache_miss(); + } + let immutable_checks_result = + parse_tx_and_run_immutable_checks(tx, start, &state, metrics).await; + + if guard.insert(immutable_checks_result.clone()).is_err() { + warn!( + tx_hash = %telemetry::display::base64(&tx_hash), + "failed to cache the check tx result" + ); + } + + match immutable_checks_result { + Ok(tx) => tx, + Err(response) => return response, + } + } + }; + + run_mutable_checks(signed_tx, tx_hash, tx_len, state, mempool, metrics).await +} + +/// Parses and returns the signed tx from the request if and only if it passes immutable checks, +/// i.e. checks which will always pass or always fail. +async fn parse_tx_and_run_immutable_checks( + serialized_tx: Bytes, + mut start: Instant, + state: &S, + metrics: &'static Metrics, +) -> ImmutableChecksResult { + let raw_signed_tx = match raw::SignedTransaction::decode(serialized_tx) { Ok(tx) => tx, Err(e) => { - mempool.remove(tx_hash).await; - return response::CheckTx { - code: AbciErrorCode::INVALID_PARAMETER.into(), - log: e.to_string(), - info: "failed decoding bytes as a protobuf SignedTransaction".into(), - ..response::CheckTx::default() - }; + return Err(new_error_response( + AbciErrorCode::INVALID_PARAMETER, + format!("failed decoding bytes as a protobuf SignedTransaction: {e}"), + )); } }; let signed_tx = match SignedTransaction::try_from_raw(raw_signed_tx) { Ok(tx) => tx, Err(e) => { - mempool.remove(tx_hash).await; - return response::CheckTx { - code: AbciErrorCode::INVALID_PARAMETER.into(), - info: "the provided bytes was not a valid protobuf-encoded SignedTransaction, or \ - the signature was invalid" - .into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + return Err(new_error_response( + AbciErrorCode::INVALID_PARAMETER, + format!( + "the provided bytes were not a valid protobuf-encoded SignedTransaction, or \ + the signature was invalid: {e:#}" + ), + )); } }; - let finished_parsing = Instant::now(); - metrics.record_check_tx_duration_seconds_parse_tx( - finished_parsing.saturating_duration_since(start_parsing), - ); + let mut end = Instant::now(); + metrics.record_check_tx_duration_seconds_parse_tx(end.saturating_duration_since(start)); + start = end; if let Err(e) = transaction::check_stateless(&signed_tx).await { - mempool.remove(tx_hash).await; metrics.increment_check_tx_removed_failed_stateless(); - return response::CheckTx { - code: AbciErrorCode::INVALID_PARAMETER.into(), - info: "transaction failed stateless check".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + return Err(new_error_response( + AbciErrorCode::INVALID_PARAMETER, + format!("transaction failed stateless check: {e:#}"), + )); }; - let finished_check_stateless = Instant::now(); - metrics.record_check_tx_duration_seconds_check_stateless( - finished_check_stateless.saturating_duration_since(finished_parsing), - ); + end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_stateless(end.saturating_duration_since(start)); + start = end; - if let Err(e) = transaction::check_nonce_mempool(&signed_tx, &state).await { - mempool.remove(tx_hash).await; - metrics.increment_check_tx_removed_stale_nonce(); - return response::CheckTx { - code: AbciErrorCode::INVALID_NONCE.into(), - info: "failed verifying transaction nonce".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; - }; + if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, state).await { + return Err(new_error_response( + AbciErrorCode::INVALID_CHAIN_ID, + format!("{e:#}"), + )); + } - let finished_check_nonce = Instant::now(); - metrics.record_check_tx_duration_seconds_check_nonce( - finished_check_nonce.saturating_duration_since(finished_check_stateless), - ); + metrics.record_check_tx_duration_seconds_check_chain_id(start.elapsed()); - if let Err(e) = transaction::check_chain_id_mempool(&signed_tx, &state).await { - mempool.remove(tx_hash).await; - return response::CheckTx { - code: AbciErrorCode::INVALID_CHAIN_ID.into(), - info: "failed verifying chain id".into(), - log: e.to_string(), - ..response::CheckTx::default() + Ok(Arc::new(signed_tx)) +} + +async fn run_mutable_checks( + signed_tx: Arc, + tx_hash: [u8; 32], + tx_len: usize, + state: S, + mempool: AppMempool, + metrics: &'static Metrics, +) -> response::CheckTx { + let mut start = Instant::now(); + let current_account_nonce = + match transaction::get_current_nonce_if_tx_nonce_valid(&signed_tx, &state).await { + Ok(nonce) => nonce, + Err(e) => { + mempool.remove(tx_hash).await; + metrics.increment_check_tx_removed_stale_nonce(); + return new_error_response(AbciErrorCode::INVALID_NONCE, format!("{e:#}")); + } }; - } - let finished_check_chain_id = Instant::now(); - metrics.record_check_tx_duration_seconds_check_chain_id( - finished_check_chain_id.saturating_duration_since(finished_check_nonce), - ); + let mut end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_nonce(end.saturating_duration_since(start)); + start = end; if let Err(e) = transaction::check_balance_mempool(&signed_tx, &state).await { mempool.remove(tx_hash).await; metrics.increment_check_tx_removed_account_balance(); - return response::CheckTx { - code: AbciErrorCode::INSUFFICIENT_FUNDS.into(), - info: "failed verifying account balance".into(), - log: e.to_string(), - ..response::CheckTx::default() - }; + return new_error_response(AbciErrorCode::INSUFFICIENT_FUNDS, format!("{e:#}")); }; - let finished_check_balance = Instant::now(); - metrics.record_check_tx_duration_seconds_check_balance( - finished_check_balance.saturating_duration_since(finished_check_chain_id), - ); + end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_balance(end.saturating_duration_since(start)); + start = end; if let Some(removal_reason) = mempool.check_removed_comet_bft(tx_hash).await { mempool.remove(tx_hash).await; @@ -230,40 +285,27 @@ async fn handle_check_tx( match removal_reason { RemovalReason::Expired => { metrics.increment_check_tx_removed_expired(); - return response::CheckTx { - code: AbciErrorCode::TRANSACTION_EXPIRED.into(), - info: "transaction expired in app's mempool".into(), - log: "Transaction expired in the app's mempool".into(), - ..response::CheckTx::default() - }; + return new_error_response( + AbciErrorCode::TRANSACTION_EXPIRED, + "transaction expired in the app's mempool", + ); } RemovalReason::FailedPrepareProposal(err) => { metrics.increment_check_tx_removed_failed_execution(); - return response::CheckTx { - code: AbciErrorCode::TRANSACTION_FAILED.into(), - info: "transaction failed execution in prepare_proposal()".into(), - log: format!("transaction failed execution because: {err}"), - ..response::CheckTx::default() - }; + return new_error_response( + AbciErrorCode::TRANSACTION_FAILED, + format!("transaction failed execution: {err:#}"), + ); } } }; - let finished_check_removed = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_check_removed.saturating_duration_since(finished_check_balance), - ); + end = Instant::now(); + metrics.record_check_tx_duration_seconds_check_removed(end.saturating_duration_since(start)); + start = end; // tx is valid, push to mempool - let current_account_nonce = state - .get_account_nonce(crate::address::base_prefixed( - signed_tx.verification_key().address_bytes(), - )) - .await - .expect("can fetch account nonce"); - let actions_count = signed_tx.actions().len(); - mempool .insert(signed_tx, current_account_nonce) .await @@ -273,11 +315,129 @@ async fn handle_check_tx( ); let mempool_len = mempool.len().await; - metrics - .record_check_tx_duration_seconds_insert_to_app_mempool(finished_check_removed.elapsed()); + metrics.record_check_tx_duration_seconds_insert_to_app_mempool(start.elapsed()); metrics.record_actions_per_transaction_in_mempool(actions_count); metrics.record_transaction_in_mempool_size_bytes(tx_len); metrics.set_transactions_in_mempool_total(mempool_len); response::CheckTx::default() } + +fn new_error_response>(code: AbciErrorCode, log: T) -> response::CheckTx { + response::CheckTx { + code: tendermint::abci::Code::from(code), + info: code.info().to_string(), + log: log.as_ref().to_string(), + ..response::CheckTx::default() + } +} + +#[cfg(test)] +mod tests { + use astria_core::{ + crypto::SigningKey, + protocol::transaction::v1alpha1::{ + action::ValidatorUpdate, + Action, + TransactionParams, + UnsignedTransaction, + }, + }; + use cnidarium::{ + StateDelta, + TempStorage, + }; + use tendermint::abci::Code; + + use super::*; + use crate::{ + accounts::state_ext::StateWriteExt as _, + bridge::state_ext::StateWriteExt as _, + ibc::state_ext::StateWriteExt as _, + state_ext::StateWriteExt as _, + }; + + #[tokio::test] + async fn should_cache_failure() { + let storage = TempStorage::new().await.unwrap(); + let mempool = AppMempool::new(); + let cached_immutable_checks = Arc::new(Cache::new(CACHE_SIZE)); + let metrics = Box::leak(Box::new(Metrics::new())); + let request = request::CheckTx { + tx: Bytes::new(), + kind: request::CheckTxKind::New, + }; + let tx_hash: [u8; 32] = sha2::Sha256::digest(&request.tx).into(); + + // Should fail to parse and get added to the cache as `Err(response::CheckTx)`. + let response = handle_check_tx( + request, + storage.latest_snapshot(), + mempool.clone(), + cached_immutable_checks.clone(), + metrics, + ) + .await; + assert_eq!( + response.code, + AbciErrorCode::INVALID_PARAMETER.into(), + "{response:?}" + ); + assert_eq!(cached_immutable_checks.len(), 1); + let cached_result = cached_immutable_checks.get(&tx_hash).unwrap(); + assert_eq!(cached_result.unwrap_err(), response); + } + + #[tokio::test] + async fn should_cache_success() { + let nonce = 1; + let chain_id = "chain-id"; + + let storage = TempStorage::new().await.unwrap(); + let snapshot = storage.latest_snapshot(); + let mut state_delta = StateDelta::new(snapshot); + state_delta + .put_chain_id_and_revision_number(tendermint::chain::Id::try_from(chain_id).unwrap()); + state_delta.put_transfer_base_fee(1).unwrap(); + state_delta.put_ics20_withdrawal_base_fee(1).unwrap(); + state_delta.put_init_bridge_account_base_fee(1); + state_delta.put_bridge_lock_byte_cost_multiplier(1); + state_delta.put_bridge_sudo_change_base_fee(1); + let mempool = AppMempool::new(); + let cached_immutable_checks = Arc::new(Cache::new(CACHE_SIZE)); + let metrics = Box::leak(Box::new(Metrics::new())); + let signing_key = SigningKey::from([1; 32]); + let action = ValidatorUpdate { + power: 0, + verification_key: signing_key.verification_key(), + }; + let unsigned_tx = UnsignedTransaction { + actions: vec![Action::ValidatorUpdate(action)], + params: TransactionParams::builder() + .nonce(nonce) + .chain_id(chain_id) + .build(), + }; + let signed_tx = unsigned_tx.into_signed(&signing_key).to_raw(); + let request = request::CheckTx { + tx: signed_tx.encode_to_vec().into(), + kind: request::CheckTxKind::New, + }; + let tx_hash: [u8; 32] = sha2::Sha256::digest(&request.tx).into(); + + // Should parse, pass immutable checks and get added to the cache as + // `Ok(SignedTransaction)`. + let response = handle_check_tx( + request, + state_delta, + mempool.clone(), + cached_immutable_checks.clone(), + metrics, + ) + .await; + assert_eq!(response.code, Code::Ok, "{response:?}"); + assert_eq!(cached_immutable_checks.len(), 1); + let cached_result = cached_immutable_checks.get(&tx_hash).unwrap(); + assert_eq!(cached_result.unwrap().to_raw(), signed_tx); + } +} diff --git a/crates/astria-sequencer/src/transaction/checks.rs b/crates/astria-sequencer/src/transaction/checks.rs index 6af1de5db7..12bcdff3fc 100644 --- a/crates/astria-sequencer/src/transaction/checks.rs +++ b/crates/astria-sequencer/src/transaction/checks.rs @@ -27,17 +27,19 @@ use crate::{ state_ext::StateReadExt as _, }; -pub(crate) async fn check_nonce_mempool( +/// Returns the currently stored nonce of the tx signer's account if the tx nonce is not less than +/// it. +pub(crate) async fn get_current_nonce_if_tx_nonce_valid( tx: &SignedTransaction, state: &S, -) -> anyhow::Result<()> { +) -> anyhow::Result { let signer_address = crate::address::base_prefixed(tx.verification_key().address_bytes()); let curr_nonce = state .get_account_nonce(signer_address) .await .context("failed to get account nonce")?; ensure!(tx.nonce() >= curr_nonce, "nonce already used by account"); - Ok(()) + Ok(curr_nonce) } pub(crate) async fn check_chain_id_mempool( @@ -59,7 +61,7 @@ pub(crate) async fn check_balance_mempool( let signer_address = crate::address::base_prefixed(tx.verification_key().address_bytes()); check_balance_for_total_fees_and_transfers(tx.unsigned_transaction(), signer_address, state) .await - .context("failed to check balance for total fees and transfers")?; + .context("balance check for total fees and transfers failed")?; Ok(()) } diff --git a/crates/astria-sequencer/src/transaction/mod.rs b/crates/astria-sequencer/src/transaction/mod.rs index d7e8e8e06c..3ddb07ec8c 100644 --- a/crates/astria-sequencer/src/transaction/mod.rs +++ b/crates/astria-sequencer/src/transaction/mod.rs @@ -21,7 +21,7 @@ pub(crate) use checks::{ check_balance_for_total_fees_and_transfers, check_balance_mempool, check_chain_id_mempool, - check_nonce_mempool, + get_current_nonce_if_tx_nonce_valid, }; use tracing::instrument;