From 345fcab9eced0d4f9a0a52c8f0c0df3140e528dd Mon Sep 17 00:00:00 2001 From: lilyjjo Date: Sat, 28 Sep 2024 12:52:10 -0400 Subject: [PATCH] respond to comment from @Fraser999 --- crates/astria-sequencer/src/app/test_utils.rs | 2 +- crates/astria-sequencer/src/grpc/sequencer.rs | 10 +- .../src/mempool/benchmarks.rs | 4 +- crates/astria-sequencer/src/mempool/mod.rs | 98 ++++++++++++++----- crates/astria-sequencer/src/metrics.rs | 19 +++- crates/astria-sequencer/src/sequencer.rs | 2 +- .../astria-sequencer/src/service/consensus.rs | 2 +- .../src/service/mempool/mod.rs | 23 ++--- .../src/service/mempool/tests.rs | 8 +- 9 files changed, 118 insertions(+), 50 deletions(-) diff --git a/crates/astria-sequencer/src/app/test_utils.rs b/crates/astria-sequencer/src/app/test_utils.rs index 0290b3ccf0..82584ce591 100644 --- a/crates/astria-sequencer/src/app/test_utils.rs +++ b/crates/astria-sequencer/src/app/test_utils.rs @@ -189,8 +189,8 @@ pub(crate) async fn initialize_app_with_storage( .await .expect("failed to create temp storage backing chain state"); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool, metrics).await.unwrap(); let genesis_state = genesis_state.unwrap_or_else(self::genesis_state); diff --git a/crates/astria-sequencer/src/grpc/sequencer.rs b/crates/astria-sequencer/src/grpc/sequencer.rs index 5263a09da6..b3411df7dd 100644 --- a/crates/astria-sequencer/src/grpc/sequencer.rs +++ b/crates/astria-sequencer/src/grpc/sequencer.rs @@ -221,6 +221,7 @@ mod test { sequencerblock::v1alpha1::SequencerBlock, }; use cnidarium::StateDelta; + use telemetry::Metrics; use super::*; use crate::{ @@ -246,7 +247,8 @@ mod test { async fn test_get_sequencer_block() { let block = make_test_sequencer_block(1); let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let 
metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); state_tx.put_block_height(1); state_tx.put_sequencer_block(block.clone()).unwrap(); @@ -264,7 +266,8 @@ mod test { #[tokio::test] async fn get_pending_nonce_in_mempool() { let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); @@ -307,7 +310,8 @@ mod test { use crate::accounts::StateWriteExt as _; let storage = cnidarium::TempStorage::new().await.unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut state_tx = StateDelta::new(storage.latest_snapshot()); let alice = get_alice_signing_key(); let alice_address = astria_address(&alice.address_bytes()); diff --git a/crates/astria-sequencer/src/mempool/benchmarks.rs b/crates/astria-sequencer/src/mempool/benchmarks.rs index d8df9dc718..7173478b38 100644 --- a/crates/astria-sequencer/src/mempool/benchmarks.rs +++ b/crates/astria-sequencer/src/mempool/benchmarks.rs @@ -17,6 +17,7 @@ use sha2::{ Digest as _, Sha256, }; +use telemetry::Metrics; use crate::{ app::test_utils::{ @@ -103,7 +104,8 @@ fn init_mempool() -> Mempool { .enable_all() .build() .unwrap(); - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let account_mock_balance = mock_balances(0, 0); let tx_mock_cost = mock_tx_cost(0, 0, 0); runtime.block_on(async { diff --git a/crates/astria-sequencer/src/mempool/mod.rs b/crates/astria-sequencer/src/mempool/mod.rs index e4ee742ce3..858dbfb43e 100644 --- a/crates/astria-sequencer/src/mempool/mod.rs +++ 
b/crates/astria-sequencer/src/mempool/mod.rs @@ -37,7 +37,10 @@ use transactions_container::{ TimemarkedTransaction, }; -use crate::accounts; +use crate::{ + accounts, + Metrics, +}; #[derive(Debug, Eq, PartialEq, Clone)] pub(crate) enum RemovalReason { @@ -133,11 +136,12 @@ pub(crate) struct Mempool { parked: Arc>>, comet_bft_removal_cache: Arc>, contained_txs: Arc>>, + metrics: &'static Metrics, } impl Mempool { #[must_use] - pub(crate) fn new() -> Self { + pub(crate) fn new(metrics: &'static Metrics) -> Self { Self { pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))), parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))), @@ -146,6 +150,7 @@ impl Mempool { .expect("Removal cache cannot be zero sized"), ))), contained_txs: Arc::new(RwLock::new(HashSet::new())), + metrics, } } @@ -156,6 +161,30 @@ impl Mempool { self.contained_txs.read().await.len() } + /// Adds a transaction to the mempool's tracked transactions. + /// Will increment logic error metrics and log error if transaction is already present. + fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { + if !contained_txs.insert(tx_hash) { + self.metrics.increment_mempool_tx_logic_error(); + error!( + "attempted to re-add transaction {tx_hash:?} to mempool's tracked container, is \ + logic error" + ); + } + } + + /// Removes a transaction from the mempool's tracked transactions. + /// Will increment logic error metrics and log error if transaction is not present. + fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) { + if !contained_txs.remove(&tx_hash) { + self.metrics.increment_mempool_tx_logic_error(); + error!( + "attempted to remove transaction non present {tx_hash:?} from mempool's tracked \ + container, is logic error" + ); + } + } + /// Inserts a transaction into the mempool and does not allow for transaction replacement. /// Will return the reason for insertion failure if failure occurs. 
#[instrument(skip_all)] @@ -214,18 +243,23 @@ impl Mempool { ); // promote the transactions for ttx in to_promote { + let tx_id = ttx.id(); if let Err(error) = pending.add(ttx, current_account_nonce, &current_account_balances) { + // stop tracking the transaction that failed to promote + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(tx_id, &mut contained_txs); error!( current_account_nonce, - "failed to promote transaction during insertion: {error:#}" + "failed to promote transaction {tx_id:?} during insertion: {error:#}" ); } } // track in contained txs - self.contained_txs.write().await.insert(timemarked_tx.id()); + let mut contained_txs = self.contained_txs.write().await; + self.add_to_contained_txs(timemarked_tx.id(), &mut contained_txs); Ok(()) } @@ -277,8 +311,9 @@ impl Mempool { // Add the original tx first to preserve its reason for removal. The second // attempt to add it inside the loop below will be a no-op. removal_cache.add(tx_hash, reason); + let mut contained_txs = self.contained_txs.write().await; for removed_tx in removed_txs { - self.contained_txs.write().await.remove(&removed_tx); + self.remove_from_contained_txs(removed_tx, &mut contained_txs); removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated); } } @@ -292,7 +327,7 @@ impl Mempool { /// Returns true if the transaction is tracked as inserted.
#[instrument(skip_all)] - pub(crate) async fn tracked(&self, tx_hash: [u8; 32]) -> bool { + pub(crate) async fn is_tracked(&self, tx_hash: [u8; 32]) -> bool { self.contained_txs.read().await.contains(&tx_hash) } @@ -371,18 +406,29 @@ impl Mempool { parked.find_promotables(&address, highest_pending_nonce, &remaining_balances); for tx in promtion_txs { + let tx_id = tx.id(); if let Err(error) = pending.add(tx, current_nonce, &current_balances) { + // remove from tracked + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(tx_id, &mut contained_txs); + // this shouldn't happen + self.metrics.increment_mempool_tx_logic_error(); error!( current_nonce, - "failed to promote transaction during maintenance: {error:#}" + "failed to promote transaction {tx_id:?} during maintenance: {error:#}" ); } } } else { // add demoted transactions to parked for tx in demotion_txs { + let tx_id = tx.id(); if let Err(err) = parked.add(tx, current_nonce, &current_balances) { + // remove from tracked + let mut contained_txs = self.contained_txs.write().await; + self.remove_from_contained_txs(tx_id, &mut contained_txs); // this shouldn't happen + self.metrics.increment_mempool_tx_logic_error(); error!( address = %telemetry::display::base64(&address), "failed to demote transaction during maintenance: {err:#}" @@ -428,6 +474,7 @@ impl Mempool { #[cfg(test)] mod test { use astria_core::crypto::SigningKey; + use telemetry::Metrics; use super::*; use crate::app::test_utils::{ @@ -441,7 +488,8 @@ mod test { #[tokio::test] async fn insert() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 0); @@ -502,7 +550,8 @@ mod test { // odder edge cases that can be hit if a node goes offline or fails to see // some transactions that other nodes include into
their proposed blocks. - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); let account_balances = mock_balances(100, 100); @@ -593,7 +642,8 @@ mod test { #[tokio::test] async fn run_maintenance_promotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); @@ -660,7 +710,8 @@ mod test { #[allow(clippy::too_many_lines)] #[tokio::test] async fn run_maintenance_demotion() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); @@ -740,7 +791,8 @@ mod test { #[tokio::test] async fn remove_invalid() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key = SigningKey::from([1; 32]); let account_balances = mock_balances(100, 100); let tx_cost = mock_tx_cost(10, 10, 10); @@ -843,7 +895,8 @@ mod test { #[tokio::test] async fn should_get_pending_nonce() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let signing_key_0 = SigningKey::from([1; 32]); let signing_key_1 = SigningKey::from([2; 32]); let signing_key_2 = SigningKey::from([3; 32]); @@ -967,7 +1020,8 @@ mod test { #[tokio::test] async fn tx_tracked_set() { - let mempool = Mempool::new(); + let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let 
signing_key = SigningKey::from([1; 32]); let signing_address = signing_key.verification_key().address_bytes(); let account_balances = mock_balances(100, 100); @@ -981,14 +1035,14 @@ mod test { .insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); - assert!(mempool.tracked(tx1.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); // check that the pending transaction is in the tracked set mempool .insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone()) .await .unwrap(); - assert!(mempool.tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx0.id().get()).await); // remove the transactions from the mempool mempool @@ -996,8 +1050,8 @@ mod test { .await; // check that the transactions are not in the tracked set - assert!(!mempool.tracked(tx0.id().get()).await); - assert!(!mempool.tracked(tx1.id().get()).await); + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); // re-insert the transactions into the mempool mempool @@ -1010,8 +1064,8 @@ mod test { .unwrap(); // check that the transactions are in the tracked set - assert!(mempool.tracked(tx0.id().get()).await); - assert!(mempool.tracked(tx1.id().get()).await); + assert!(mempool.is_tracked(tx0.id().get()).await); + assert!(mempool.is_tracked(tx1.id().get()).await); // remove the transacitons from the mempool via maintenance let mut mock_state = mock_state_getter().await; @@ -1019,7 +1073,7 @@ mod test { mempool.run_maintenance(&mock_state, false).await; // check that the transactions are not in the tracked set - assert!(!mempool.tracked(tx0.id().get()).await); - assert!(!mempool.tracked(tx1.id().get()).await); + assert!(!mempool.is_tracked(tx0.id().get()).await); + assert!(!mempool.is_tracked(tx1.id().get()).await); } } diff --git a/crates/astria-sequencer/src/metrics.rs b/crates/astria-sequencer/src/metrics.rs index 413196fe9d..3e961e3b41 100644 --- a/crates/astria-sequencer/src/metrics.rs +++ 
b/crates/astria-sequencer/src/metrics.rs @@ -38,6 +38,7 @@ pub struct Metrics { transaction_in_mempool_size_bytes: Histogram, transactions_in_mempool_total: Gauge, mempool_recosted: Counter, + mempool_tx_logic_error: Counter, } impl Metrics { @@ -154,6 +155,10 @@ impl Metrics { pub(crate) fn increment_mempool_recosted(&self) { self.mempool_recosted.increment(1); } + + pub(crate) fn increment_mempool_tx_logic_error(&self) { + self.mempool_tx_logic_error.increment(1); + } } impl telemetry::Metrics for Metrics { @@ -328,6 +333,14 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let mempool_tx_logic_error = builder + .new_counter_factory( + MEMPOOL_TX_LOGIC_ERROR, + "The number of times a transaction has been rejected due to logic errors in the \ + mempool", + )? + .register()?; + Ok(Self { prepare_proposal_excluded_transactions_cometbft_space, prepare_proposal_excluded_transactions_sequencer_space, @@ -354,6 +367,7 @@ impl telemetry::Metrics for Metrics { transaction_in_mempool_size_bytes, transactions_in_mempool_total, mempool_recosted, + mempool_tx_logic_error, }) } } @@ -380,7 +394,8 @@ metric_names!(const METRICS_NAMES: ACTIONS_PER_TRANSACTION_IN_MEMPOOL, TRANSACTION_IN_MEMPOOL_SIZE_BYTES, TRANSACTIONS_IN_MEMPOOL_TOTAL, - MEMPOOL_RECOSTED + MEMPOOL_RECOSTED, + MEMPOOL_TX_LOGIC_ERROR ); #[cfg(test)] @@ -394,6 +409,7 @@ mod tests { CHECK_TX_REMOVED_FAILED_STATELESS, CHECK_TX_REMOVED_TOO_LARGE, MEMPOOL_RECOSTED, + MEMPOOL_TX_LOGIC_ERROR, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_COMETBFT_SPACE, PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS_FAILED_EXECUTION, @@ -465,5 +481,6 @@ mod tests { "transactions_in_mempool_total", ); assert_const(MEMPOOL_RECOSTED, "mempool_recosted"); + assert_const(MEMPOOL_TX_LOGIC_ERROR, "mempool_tx_logic_error"); } } diff --git a/crates/astria-sequencer/src/sequencer.rs b/crates/astria-sequencer/src/sequencer.rs index 3bd1bfd64c..a6df51421b 100644 --- a/crates/astria-sequencer/src/sequencer.rs +++ 
b/crates/astria-sequencer/src/sequencer.rs @@ -84,7 +84,7 @@ impl Sequencer { .wrap_err("failed to load storage backing chain state")?; let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); + let mempool = Mempool::new(metrics); let app = App::new(snapshot, mempool.clone(), metrics) .await .wrap_err("failed to initialize app")?; diff --git a/crates/astria-sequencer/src/service/consensus.rs b/crates/astria-sequencer/src/service/consensus.rs index 4fd31287cb..052ea79750 100644 --- a/crates/astria-sequencer/src/service/consensus.rs +++ b/crates/astria-sequencer/src/service/consensus.rs @@ -473,8 +473,8 @@ mod test { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); app.init_chain(storage.clone(), genesis_state, vec![], "test".to_string()) .await diff --git a/crates/astria-sequencer/src/service/mempool/mod.rs b/crates/astria-sequencer/src/service/mempool/mod.rs index ddcfe2e499..212d823654 100644 --- a/crates/astria-sequencer/src/service/mempool/mod.rs +++ b/crates/astria-sequencer/src/service/mempool/mod.rs @@ -164,19 +164,16 @@ async fn handle_check_tx bool { let start_tracked_check = Instant::now(); - let result = mempool.tracked(tx_hash).await; + let result = mempool.is_tracked(tx_hash).await; - let finished_check_tracked = Instant::now(); - metrics.record_check_tx_duration_seconds_check_tracked( - finished_check_tracked.saturating_duration_since(start_tracked_check), - ); + metrics.record_check_tx_duration_seconds_check_tracked(start_tracked_check.elapsed()); result } /// Checks if the transaction has been removed from the appside mempool. 
/// -/// Returns an Err([`response::CheckTx`]) with an error code and message if the transaction has been +/// Returns an `Err(response::CheckTx)` with an error code and message if the transaction has been /// removed from the appside mempool. async fn check_removed_comet_bft( tx_hash: [u8; 32], @@ -230,17 +227,14 @@ async fn check_removed_comet_bft( } }; - let finished_removal_check = Instant::now(); - metrics.record_check_tx_duration_seconds_check_removed( - finished_removal_check.saturating_duration_since(start_removal_check), - ); + metrics.record_check_tx_duration_seconds_check_removed(start_removal_check.elapsed()); Ok(()) } /// Performs stateless checks on the transaction. /// -/// Returns an Err([`response::CheckTx`]) if the transaction fails any of the checks. +/// Returns an `Err(response::CheckTx)` if the transaction fails any of the checks. /// Otherwise, it returns the [`SignedTransaction`] to be inserted into the mempool. async fn stateless_checks( tx: Bytes, @@ -318,10 +312,7 @@ async fn stateless_checks( mempool: &AppMempool, diff --git a/crates/astria-sequencer/src/service/mempool/tests.rs b/crates/astria-sequencer/src/service/mempool/tests.rs index d9c22ad1f8..a31b6866d6 100644 --- a/crates/astria-sequencer/src/service/mempool/tests.rs +++ b/crates/astria-sequencer/src/service/mempool/tests.rs @@ -30,8 +30,8 @@ async fn future_nonce_ok() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -64,8 +64,8 @@ async fn rechecks_pass() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = 
Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -103,8 +103,8 @@ async fn can_reinsert_after_recheck_fail() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state(); @@ -152,8 +152,8 @@ async fn receck_adds_non_tracked_tx() { let storage = cnidarium::TempStorage::new().await.unwrap(); let snapshot = storage.latest_snapshot(); - let mut mempool = Mempool::new(); let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap())); + let mut mempool = Mempool::new(metrics); let mut app = App::new(snapshot, mempool.clone(), metrics).await.unwrap(); let genesis_state = crate::app::test_utils::genesis_state();