Skip to content

Commit

Permalink
rewrite to allow reinsertion on recheck for edge case
Browse files Browse the repository at this point in the history
  • Loading branch information
Lilyjjo committed Sep 24, 2024
1 parent 0e889ef commit 0de80ec
Show file tree
Hide file tree
Showing 7 changed files with 489 additions and 157 deletions.
99 changes: 87 additions & 12 deletions crates/astria-sequencer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use astria_core::{
use astria_eyre::eyre::Result;
pub(crate) use mempool_state::get_account_balances;
use tokio::{
join,
sync::{
RwLock,
RwLockWriteGuard,
Expand Down Expand Up @@ -133,6 +132,7 @@ pub(crate) struct Mempool {
pending: Arc<RwLock<PendingTransactions>>,
parked: Arc<RwLock<ParkedTransactions<MAX_PARKED_TXS_PER_ACCOUNT>>>,
comet_bft_removal_cache: Arc<RwLock<RemovalCache>>,
contained_txs: Arc<RwLock<HashSet<[u8; 32]>>>,
}

impl Mempool {
Expand All @@ -145,19 +145,15 @@ impl Mempool {
NonZeroUsize::try_from(REMOVAL_CACHE_SIZE)
.expect("Removal cache cannot be zero sized"),
))),
contained_txs: Arc::new(RwLock::new(HashSet::new())),
}
}

/// Returns the number of transactions in the mempool.
#[must_use]
#[instrument(skip_all)]
pub(crate) async fn len(&self) -> usize {
#[rustfmt::skip]
let (pending_len, parked_len) = join!(
async { self.pending.read().await.len() },
async { self.parked.read().await.len() }
);
pending_len.saturating_add(parked_len)
self.contained_txs.read().await.len()
}

/// Inserts a transaction into the mempool and does not allow for transaction replacement.
Expand All @@ -184,11 +180,18 @@ impl Mempool {
// Release the lock asap.
drop(pending);
// try to add to parked queue
parked.add(
timemarked_tx,
match parked.add(
timemarked_tx.clone(),
current_account_nonce,
&current_account_balances,
)
) {
Ok(()) => {
// track in contained txs
self.contained_txs.write().await.insert(timemarked_tx.id());
Ok(())
}
Err(err) => Err(err),
}
}
error @ Err(
InsertionError::AlreadyPresent
Expand Down Expand Up @@ -220,6 +223,10 @@ impl Mempool {
);
}
}

// track in contained txs
self.contained_txs.write().await.insert(timemarked_tx.id());

Ok(())
}
}
Expand Down Expand Up @@ -266,10 +273,12 @@ impl Mempool {

// Add all removed to removal cache for cometbft.
let mut removal_cache = self.comet_bft_removal_cache.write().await;
// Add the original tx first, since it will also be listed in `removed_txs`. The second

// Add the original tx first to preserve its reason for removal. The second
// attempt to add it inside the loop below will be a no-op.
removal_cache.add(tx_hash, reason);
for removed_tx in removed_txs {
self.contained_txs.write().await.remove(&removed_tx);
removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated);
}
}
Expand All @@ -281,6 +290,12 @@ impl Mempool {
self.comet_bft_removal_cache.write().await.remove(tx_hash)
}

/// Returns true if the transaction is tracked as inserted.
#[instrument(skip_all)]
pub(crate) async fn tracked(&self, tx_hash: [u8; 32]) -> bool {
self.contained_txs.read().await.contains(&tx_hash)
}

/// Updates stored transactions to reflect current blockchain state. Will remove transactions
/// that have stale nonces or are expired. Will also shift transation between pending and
/// parked to relfect changes in account balances.
Expand Down Expand Up @@ -380,10 +395,12 @@ impl Mempool {
drop(parked);
drop(pending);

// add to removal cache for cometbft
// add to removal cache for cometbft and remove from the tracked set
let mut removal_cache = self.comet_bft_removal_cache.write().await;
let mut tracked_txs = self.contained_txs.write().await;
for (tx_hash, reason) in removed_txs {
removal_cache.add(tx_hash, reason);
tracked_txs.remove(&tx_hash);
}
}

Expand Down Expand Up @@ -947,4 +964,62 @@ mod test {
"first removal reason should be presenved"
);
}

#[tokio::test]
async fn tx_tracked_set() {
let mempool = Mempool::new();
let signing_key = SigningKey::from([1; 32]);
let signing_address = signing_key.verification_key().address_bytes();
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);

let tx0 = mock_tx(0, &signing_key, "test");
let tx1 = mock_tx(1, &signing_key, "test");

// check that the parked transaction is in the tracked set
mempool
.insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
assert!(mempool.tracked(tx1.id().get()).await);

// check that the pending transaction is in the tracked set
mempool
.insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
assert!(mempool.tracked(tx0.id().get()).await);

// remove the transactions from the mempool
mempool
.remove_tx_invalid(tx0.clone(), RemovalReason::Expired)
.await;

// check that the transactions are not in the tracked set
assert!(!mempool.tracked(tx0.id().get()).await);
assert!(!mempool.tracked(tx1.id().get()).await);

// re-insert the transactions into the mempool
mempool
.insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
mempool
.insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();

// check that the transactions are in the tracked set
assert!(mempool.tracked(tx0.id().get()).await);
assert!(mempool.tracked(tx1.id().get()).await);

// remove the transacitons from the mempool via maintenance
let mut mock_state = mock_state_getter().await;
mock_state_put_account_nonce(&mut mock_state, signing_address, 2);
mempool.run_maintenance(&mock_state, false).await;

// check that the transactions are not in the tracked set
assert!(!mempool.tracked(tx0.id().get()).await);
assert!(!mempool.tracked(tx1.id().get()).await);
}
}
4 changes: 4 additions & 0 deletions crates/astria-sequencer/src/mempool/transactions_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ impl TimemarkedTransaction {
pub(super) fn cost(&self) -> &HashMap<IbcPrefixed, u128> {
&self.cost
}

pub(super) fn id(&self) -> [u8; 32] {
self.tx_hash
}
}

impl fmt::Display for TimemarkedTransaction {
Expand Down
52 changes: 28 additions & 24 deletions crates/astria-sequencer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ pub struct Metrics {
check_tx_removed_expired: Counter,
check_tx_removed_failed_execution: Counter,
check_tx_removed_failed_stateless: Counter,
check_tx_removed_stale_nonce: Counter,
check_tx_duration_seconds_parse_tx: Histogram,
check_tx_duration_seconds_check_stateless: Histogram,
check_tx_duration_seconds_check_nonce: Histogram,
check_tx_duration_seconds_fetch_nonce: Histogram,
check_tx_duration_seconds_check_tracked: Histogram,
check_tx_duration_seconds_check_chain_id: Histogram,
check_tx_duration_seconds_check_removed: Histogram,
check_tx_duration_seconds_convert_address: Histogram,
Expand Down Expand Up @@ -88,10 +88,6 @@ impl Metrics {
self.check_tx_removed_failed_stateless.increment(1);
}

pub(crate) fn increment_check_tx_removed_stale_nonce(&self) {
self.check_tx_removed_stale_nonce.increment(1);
}

pub(crate) fn record_check_tx_duration_seconds_parse_tx(&self, duration: Duration) {
self.check_tx_duration_seconds_parse_tx.record(duration);
}
Expand All @@ -101,8 +97,13 @@ impl Metrics {
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_nonce(&self, duration: Duration) {
self.check_tx_duration_seconds_check_nonce.record(duration);
pub(crate) fn record_check_tx_duration_seconds_fetch_nonce(&self, duration: Duration) {
self.check_tx_duration_seconds_fetch_nonce.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_tracked(&self, duration: Duration) {
self.check_tx_duration_seconds_check_tracked
.record(duration);
}

pub(crate) fn record_check_tx_duration_seconds_check_chain_id(&self, duration: Duration) {
Expand Down Expand Up @@ -260,22 +261,28 @@ impl telemetry::Metrics for Metrics {
)?
.register()?;

let check_tx_duration_seconds_fetch_nonce = builder
.new_histogram_factory(
CHECK_TX_DURATION_SECONDS_FETCH_NONCE,
"The amount of time taken in seconds to fetch an account's nonce",
)?
.register()?;

let check_tx_duration_seconds_check_tracked = builder
.new_histogram_factory(
CHECK_TX_DURATION_SECONDS_CHECK_TRACKED,
"The amount of time taken in seconds to check if the transaction is already in \
the mempool",
)?
.register()?;

let check_tx_removed_failed_stateless = builder
.new_counter_factory(
CHECK_TX_REMOVED_FAILED_STATELESS,
"The number of transactions that have been removed from the mempool due to \
failing the stateless check",
)?
.register()?;

let check_tx_removed_stale_nonce = builder
.new_counter_factory(
CHECK_TX_REMOVED_STALE_NONCE,
"The number of transactions that have been removed from the mempool due to having \
a stale nonce",
)?
.register()?;

let mut check_tx_duration_factory = builder.new_histogram_factory(
CHECK_TX_DURATION_SECONDS,
"The amount of time taken in seconds to successfully complete the various stages of \
Expand All @@ -286,8 +293,6 @@ impl telemetry::Metrics for Metrics {
)?;
let check_tx_duration_seconds_check_stateless = check_tx_duration_factory
.register_with_labels(&[(CHECK_TX_STAGE, "stateless check".to_string())])?;
let check_tx_duration_seconds_check_nonce = check_tx_duration_factory
.register_with_labels(&[(CHECK_TX_STAGE, "nonce check".to_string())])?;
let check_tx_duration_seconds_check_chain_id = check_tx_duration_factory
.register_with_labels(&[(CHECK_TX_STAGE, "chain id check".to_string())])?;
let check_tx_duration_seconds_check_removed = check_tx_duration_factory
Expand Down Expand Up @@ -335,10 +340,10 @@ impl telemetry::Metrics for Metrics {
check_tx_removed_expired,
check_tx_removed_failed_execution,
check_tx_removed_failed_stateless,
check_tx_removed_stale_nonce,
check_tx_duration_seconds_parse_tx,
check_tx_duration_seconds_check_stateless,
check_tx_duration_seconds_check_nonce,
check_tx_duration_seconds_fetch_nonce,
check_tx_duration_seconds_check_tracked,
check_tx_duration_seconds_check_chain_id,
check_tx_duration_seconds_check_removed,
check_tx_duration_seconds_convert_address,
Expand All @@ -365,12 +370,13 @@ metric_names!(const METRICS_NAMES:
CHECK_TX_REMOVED_EXPIRED,
CHECK_TX_REMOVED_FAILED_EXECUTION,
CHECK_TX_REMOVED_FAILED_STATELESS,
CHECK_TX_REMOVED_STALE_NONCE,
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
CHECK_TX_DURATION_SECONDS,
CHECK_TX_DURATION_SECONDS_CONVERT_ADDRESS,
CHECK_TX_DURATION_SECONDS_FETCH_BALANCES,
CHECK_TX_DURATION_SECONDS_FETCH_NONCE,
CHECK_TX_DURATION_SECONDS_FETCH_TX_COST,
CHECK_TX_DURATION_SECONDS_CHECK_TRACKED,
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
TRANSACTIONS_IN_MEMPOOL_TOTAL,
Expand All @@ -386,7 +392,6 @@ mod tests {
CHECK_TX_REMOVED_EXPIRED,
CHECK_TX_REMOVED_FAILED_EXECUTION,
CHECK_TX_REMOVED_FAILED_STATELESS,
CHECK_TX_REMOVED_STALE_NONCE,
CHECK_TX_REMOVED_TOO_LARGE,
MEMPOOL_RECOSTED,
PREPARE_PROPOSAL_EXCLUDED_TRANSACTIONS,
Expand Down Expand Up @@ -442,7 +447,6 @@ mod tests {
CHECK_TX_REMOVED_FAILED_STATELESS,
"check_tx_removed_failed_stateless",
);
assert_const(CHECK_TX_REMOVED_STALE_NONCE, "check_tx_removed_stale_nonce");
assert_const(
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
"check_tx_removed_account_balance",
Expand Down
Loading

0 comments on commit 0de80ec

Please sign in to comment.