Skip to content

Commit

Permalink
respond to comment from @Fraser999
Browse files Browse the repository at this point in the history
  • Loading branch information
Lilyjjo committed Sep 30, 2024
1 parent 83e4718 commit af24538
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 50 deletions.
2 changes: 1 addition & 1 deletion crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,8 +189,8 @@ pub(crate) async fn initialize_app_with_storage(
.await
.expect("failed to create temp storage backing chain state");
let snapshot = storage.latest_snapshot();
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mut app = App::new(snapshot, mempool, metrics).await.unwrap();

let genesis_state = genesis_state.unwrap_or_else(self::genesis_state);
Expand Down
10 changes: 7 additions & 3 deletions crates/astria-sequencer/src/grpc/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ mod tests {
sequencerblock::v1alpha1::SequencerBlock,
};
use cnidarium::StateDelta;
use telemetry::Metrics;

use super::*;
use crate::{
Expand All @@ -246,7 +247,8 @@ mod tests {
async fn test_get_sequencer_block() {
let block = make_test_sequencer_block(1);
let storage = cnidarium::TempStorage::new().await.unwrap();
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mut state_tx = StateDelta::new(storage.latest_snapshot());
state_tx.put_block_height(1);
state_tx.put_sequencer_block(block.clone()).unwrap();
Expand All @@ -264,7 +266,8 @@ mod tests {
#[tokio::test]
async fn get_pending_nonce_in_mempool() {
let storage = cnidarium::TempStorage::new().await.unwrap();
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);

let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
Expand Down Expand Up @@ -307,7 +310,8 @@ mod tests {
use crate::accounts::StateWriteExt as _;

let storage = cnidarium::TempStorage::new().await.unwrap();
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let mut state_tx = StateDelta::new(storage.latest_snapshot());
let alice = get_alice_signing_key();
let alice_address = astria_address(&alice.address_bytes());
Expand Down
4 changes: 3 additions & 1 deletion crates/astria-sequencer/src/mempool/benchmarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sha2::{
Digest as _,
Sha256,
};
use telemetry::Metrics;

use crate::{
app::test_utils::{
Expand Down Expand Up @@ -103,7 +104,8 @@ fn init_mempool<T: MempoolSize>() -> Mempool {
.enable_all()
.build()
.unwrap();
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let account_mock_balance = mock_balances(0, 0);
let tx_mock_cost = mock_tx_cost(0, 0, 0);
runtime.block_on(async {
Expand Down
98 changes: 76 additions & 22 deletions crates/astria-sequencer/src/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ use transactions_container::{
TimemarkedTransaction,
};

use crate::accounts;
use crate::{
accounts,
Metrics,
};

#[derive(Debug, Eq, PartialEq, Clone)]
pub(crate) enum RemovalReason {
Expand Down Expand Up @@ -133,11 +136,12 @@ pub(crate) struct Mempool {
parked: Arc<RwLock<ParkedTransactions<MAX_PARKED_TXS_PER_ACCOUNT>>>,
comet_bft_removal_cache: Arc<RwLock<RemovalCache>>,
contained_txs: Arc<RwLock<HashSet<[u8; 32]>>>,
metrics: &'static Metrics,
}

impl Mempool {
#[must_use]
pub(crate) fn new() -> Self {
pub(crate) fn new(metrics: &'static Metrics) -> Self {
Self {
pending: Arc::new(RwLock::new(PendingTransactions::new(TX_TTL))),
parked: Arc::new(RwLock::new(ParkedTransactions::new(TX_TTL))),
Expand All @@ -146,6 +150,7 @@ impl Mempool {
.expect("Removal cache cannot be zero sized"),
))),
contained_txs: Arc::new(RwLock::new(HashSet::new())),
metrics,
}
}

Expand All @@ -156,6 +161,30 @@ impl Mempool {
self.contained_txs.read().await.len()
}

/// Adds a transaction to the mempool's tracked transactions.
/// Will increment logic error metrics and log error if transaction is already present.
fn add_to_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) {
if !contained_txs.insert(tx_hash) {
self.metrics.increment_mempool_tx_logic_error();
error!(
"attempted to re-add transaction {tx_hash:?} to mempool's tracked container, is \
logic error"
);
}
}

/// Removes a transaction from the mempool's tracked transactions.
/// Will increment logic error metrics and log error if transaction is not present.
fn remove_from_contained_txs(&self, tx_hash: [u8; 32], contained_txs: &mut HashSet<[u8; 32]>) {
if !contained_txs.remove(&tx_hash) {
self.metrics.increment_mempool_tx_logic_error();
error!(
"attempted to remove transaction non present {tx_hash:?} from mempool's tracked \
container, is logic error"
);
}
}

/// Inserts a transaction into the mempool and does not allow for transaction replacement.
/// Will return the reason for insertion failure if failure occurs.
#[instrument(skip_all)]
Expand Down Expand Up @@ -214,18 +243,23 @@ impl Mempool {
);
// promote the transactions
for ttx in to_promote {
let tx_id = ttx.id();
if let Err(error) =
pending.add(ttx, current_account_nonce, &current_account_balances)
{
// remove from tracked
let mut contained_txs = self.contained_txs.write().await;
self.remove_from_contained_txs(timemarked_tx.id(), &mut contained_txs);
error!(
current_account_nonce,
"failed to promote transaction during insertion: {error:#}"
"failed to promote transaction {tx_id:?} during insertion: {error:#}"
);
}
}

// track in contained txs
self.contained_txs.write().await.insert(timemarked_tx.id());
let mut contained_txs = self.contained_txs.write().await;
self.add_to_contained_txs(timemarked_tx.id(), &mut contained_txs);

Ok(())
}
Expand Down Expand Up @@ -277,8 +311,9 @@ impl Mempool {
// Add the original tx first to preserve its reason for removal. The second
// attempt to add it inside the loop below will be a no-op.
removal_cache.add(tx_hash, reason);
let mut contained_txs = self.contained_txs.write().await;
for removed_tx in removed_txs {
self.contained_txs.write().await.remove(&removed_tx);
self.remove_from_contained_txs(removed_tx, &mut contained_txs);
removal_cache.add(removed_tx, RemovalReason::LowerNonceInvalidated);
}
}
Expand All @@ -292,7 +327,7 @@ impl Mempool {

/// Returns true if the transaction is tracked as inserted.
#[instrument(skip_all)]
pub(crate) async fn tracked(&self, tx_hash: [u8; 32]) -> bool {
pub(crate) async fn is_tracked(&self, tx_hash: [u8; 32]) -> bool {
self.contained_txs.read().await.contains(&tx_hash)
}

Expand Down Expand Up @@ -371,18 +406,29 @@ impl Mempool {
parked.find_promotables(&address, highest_pending_nonce, &remaining_balances);

for tx in promtion_txs {
let tx_id = tx.id();
if let Err(error) = pending.add(tx, current_nonce, &current_balances) {
// remove from tracked
let mut contained_txs = self.contained_txs.write().await;
self.remove_from_contained_txs(tx_id, &mut contained_txs);
// this shouldn't happen
self.metrics.increment_mempool_tx_logic_error();
error!(
current_nonce,
"failed to promote transaction during maintenance: {error:#}"
"failed to promote transaction {tx_id:?} during maintenance: {error:#}"
);
}
}
} else {
// add demoted transactions to parked
for tx in demotion_txs {
let tx_id = tx.id();
if let Err(err) = parked.add(tx, current_nonce, &current_balances) {
// remove from tracked
let mut contained_txs = self.contained_txs.write().await;
self.remove_from_contained_txs(tx_id, &mut contained_txs);
// this shouldn't happen
self.metrics.increment_mempool_tx_logic_error();
error!(
address = %telemetry::display::base64(&address),
"failed to demote transaction during maintenance: {err:#}"
Expand Down Expand Up @@ -428,6 +474,7 @@ impl Mempool {
#[cfg(test)]
mod tests {
use astria_core::crypto::SigningKey;
use telemetry::Metrics;

use super::*;
use crate::app::test_utils::{
Expand All @@ -441,7 +488,8 @@ mod tests {

#[tokio::test]
async fn insert() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 0);
Expand Down Expand Up @@ -502,7 +550,8 @@ mod tests {
// odder edge cases that can be hit if a node goes offline or fails to see
// some transactions that other nodes include into their proposed blocks.

let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let signing_address = signing_key.verification_key().address_bytes();
let account_balances = mock_balances(100, 100);
Expand Down Expand Up @@ -593,7 +642,8 @@ mod tests {

#[tokio::test]
async fn run_maintenance_promotion() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let signing_address = signing_key.verification_key().address_bytes();

Expand Down Expand Up @@ -660,7 +710,8 @@ mod tests {
#[allow(clippy::too_many_lines)]
#[tokio::test]
async fn run_maintenance_demotion() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let signing_address = signing_key.verification_key().address_bytes();

Expand Down Expand Up @@ -740,7 +791,8 @@ mod tests {

#[tokio::test]
async fn remove_invalid() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let account_balances = mock_balances(100, 100);
let tx_cost = mock_tx_cost(10, 10, 10);
Expand Down Expand Up @@ -843,7 +895,8 @@ mod tests {

#[tokio::test]
async fn should_get_pending_nonce() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key_0 = SigningKey::from([1; 32]);
let signing_key_1 = SigningKey::from([2; 32]);
let signing_key_2 = SigningKey::from([3; 32]);
Expand Down Expand Up @@ -967,7 +1020,8 @@ mod tests {

#[tokio::test]
async fn tx_tracked_set() {
let mempool = Mempool::new();
let metrics = Box::leak(Box::new(Metrics::noop_metrics(&()).unwrap()));
let mempool = Mempool::new(metrics);
let signing_key = SigningKey::from([1; 32]);
let signing_address = signing_key.verification_key().address_bytes();
let account_balances = mock_balances(100, 100);
Expand All @@ -981,23 +1035,23 @@ mod tests {
.insert(tx1.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
assert!(mempool.tracked(tx1.id().get()).await);
assert!(mempool.is_tracked(tx1.id().get()).await);

// check that the pending transaction is in the tracked set
mempool
.insert(tx0.clone(), 0, account_balances.clone(), tx_cost.clone())
.await
.unwrap();
assert!(mempool.tracked(tx0.id().get()).await);
assert!(mempool.is_tracked(tx0.id().get()).await);

// remove the transactions from the mempool
mempool
.remove_tx_invalid(tx0.clone(), RemovalReason::Expired)
.await;

// check that the transactions are not in the tracked set
assert!(!mempool.tracked(tx0.id().get()).await);
assert!(!mempool.tracked(tx1.id().get()).await);
assert!(!mempool.is_tracked(tx0.id().get()).await);
assert!(!mempool.is_tracked(tx1.id().get()).await);

// re-insert the transactions into the mempool
mempool
Expand All @@ -1010,16 +1064,16 @@ mod tests {
.unwrap();

// check that the transactions are in the tracked set
assert!(mempool.tracked(tx0.id().get()).await);
assert!(mempool.tracked(tx1.id().get()).await);
assert!(mempool.is_tracked(tx0.id().get()).await);
assert!(mempool.is_tracked(tx1.id().get()).await);

// remove the transacitons from the mempool via maintenance
let mut mock_state = mock_state_getter().await;
mock_state_put_account_nonce(&mut mock_state, signing_address, 2);
mempool.run_maintenance(&mock_state, false).await;

// check that the transactions are not in the tracked set
assert!(!mempool.tracked(tx0.id().get()).await);
assert!(!mempool.tracked(tx1.id().get()).await);
assert!(!mempool.is_tracked(tx0.id().get()).await);
assert!(!mempool.is_tracked(tx1.id().get()).await);
}
}
Loading

0 comments on commit af24538

Please sign in to comment.