Skip to content

Commit

Permalink
add cache for immutable check-tx results in sequencer
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraser999 committed Jul 15, 2024
1 parent 1555c03 commit 9046a25
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 154 deletions.
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/astria-sequencer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ anyhow = "1"
borsh = { version = "1", features = ["derive"] }
matchit = "0.7.2"
priority-queue = "2.0.2"
quick_cache = "0.6.0"
tower = "0.4"
tower-abci = "0.12.0"
tower-actor = "0.1.0"
Expand Down
6 changes: 4 additions & 2 deletions crates/astria-sequencer/src/app/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use astria_core::{
crypto::SigningKey,
primitive::v1::{
Expand Down Expand Up @@ -152,7 +154,7 @@ pub(crate) async fn initialize_app(
app
}

pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
pub(crate) fn get_mock_tx(nonce: u32) -> Arc<SignedTransaction> {
let (alice_signing_key, _) = get_alice_signing_key_and_address();
let tx = UnsignedTransaction {
params: TransactionParams::builder()
Expand All @@ -169,5 +171,5 @@ pub(crate) fn get_mock_tx(nonce: u32) -> SignedTransaction {
],
};

tx.into_signed(&alice_signing_key)
Arc::new(tx.into_signed(&alice_signing_key))
}
10 changes: 5 additions & 5 deletions crates/astria-sequencer/src/app/tests_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ async fn app_execution_results_match_proposal_vs_after_proposal() {
// don't commit the result, now call prepare_proposal with the same data.
// this will reset the app state.
// this simulates executing the same block as a validator (specifically the proposer).
app.mempool.insert(signed_tx, 0).await.unwrap();
app.mempool.insert(Arc::new(signed_tx), 0).await.unwrap();

let proposer_address = [88u8; 20].to_vec().try_into().unwrap();
let prepare_proposal = PrepareProposal {
Expand Down Expand Up @@ -553,8 +553,8 @@ async fn app_prepare_proposal_cometbft_max_bytes_overflow_ok() {
}
.into_signed(&alice_signing_key);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand Down Expand Up @@ -626,8 +626,8 @@ async fn app_prepare_proposal_sequencer_max_bytes_overflow_ok() {
}
.into_signed(&alice_signing_key);

app.mempool.insert(tx_pass, 0).await.unwrap();
app.mempool.insert(tx_overflow, 0).await.unwrap();
app.mempool.insert(Arc::new(tx_pass), 0).await.unwrap();
app.mempool.insert(Arc::new(tx_overflow), 0).await.unwrap();

// send to prepare_proposal
let prepare_args = abci::request::PrepareProposal {
Expand Down
33 changes: 16 additions & 17 deletions crates/astria-sequencer/src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,11 @@ pub(crate) struct EnqueuedTransaction {
}

impl EnqueuedTransaction {
fn new(signed_tx: SignedTransaction) -> Self {
fn new(signed_tx: Arc<SignedTransaction>) -> Self {
let address = crate::address::base_prefixed(signed_tx.verification_key().address_bytes());
Self {
tx_hash: signed_tx.sha256_of_proto_encoding(),
signed_tx: Arc::new(signed_tx),
signed_tx,
address,
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ pub(crate) enum RemovalReason {
FailedPrepareProposal(String),
}

const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes
const TX_TTL: Duration = Duration::from_secs(600); // 10 minutes
const REMOVAL_CACHE_SIZE: usize = 4096;

/// `RemovalCache` is used to signal to `CometBFT` that a
Expand Down Expand Up @@ -230,7 +230,7 @@ impl Mempool {
/// note: the oldest timestamp from found priorities is maintained.
pub(crate) async fn insert(
&self,
tx: SignedTransaction,
tx: Arc<SignedTransaction>,
current_account_nonce: u32,
) -> anyhow::Result<()> {
let enqueued_tx = EnqueuedTransaction::new(tx);
Expand Down Expand Up @@ -310,10 +310,7 @@ impl Mempool {

/// checks if a transaction was flagged to be removed from the `CometBFT` mempool
/// and removes entry
pub(crate) async fn check_removed_comet_bft(
&mut self,
tx_hash: [u8; 32],
) -> Option<RemovalReason> {
pub(crate) async fn check_removed_comet_bft(&self, tx_hash: [u8; 32]) -> Option<RemovalReason> {
self.comet_bft_removal_cache.write().await.remove(tx_hash)
}

Expand Down Expand Up @@ -497,21 +494,21 @@ mod test {
// Check enqueued txs compare equal if and only if their tx hashes are equal.
let tx0 = EnqueuedTransaction {
tx_hash: [0; 32],
signed_tx: Arc::new(get_mock_tx(0)),
signed_tx: get_mock_tx(0),
address: crate::address::base_prefixed(
get_mock_tx(0).verification_key().address_bytes(),
),
};
let other_tx0 = EnqueuedTransaction {
tx_hash: [0; 32],
signed_tx: Arc::new(get_mock_tx(1)),
signed_tx: get_mock_tx(1),
address: crate::address::base_prefixed(
get_mock_tx(1).verification_key().address_bytes(),
),
};
let tx1 = EnqueuedTransaction {
tx_hash: [1; 32],
signed_tx: Arc::new(get_mock_tx(0)),
signed_tx: get_mock_tx(0),
address: crate::address::base_prefixed(
get_mock_tx(0).verification_key().address_bytes(),
),
Expand Down Expand Up @@ -610,16 +607,17 @@ mod test {

// Insert txs from a different signer with nonces 100 and 102.
let other_signing_key = SigningKey::from([1; 32]);
let other_mock_tx = |nonce: u32| -> SignedTransaction {
let other_mock_tx = |nonce: u32| -> Arc<SignedTransaction> {
let actions = get_mock_tx(0).actions().to_vec();
UnsignedTransaction {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions,
}
.into_signed(&other_signing_key)
.into_signed(&other_signing_key);
Arc::new(tx)
};
mempool.insert(other_mock_tx(100), 0).await.unwrap();
mempool.insert(other_mock_tx(102), 0).await.unwrap();
Expand Down Expand Up @@ -748,16 +746,17 @@ mod test {

// Insert txs from a different signer with nonces 100 and 101.
let other_signing_key = SigningKey::from([1; 32]);
let other_mock_tx = |nonce: u32| -> SignedTransaction {
let other_mock_tx = |nonce: u32| -> Arc<SignedTransaction> {
let actions = get_mock_tx(0).actions().to_vec();
UnsignedTransaction {
let tx = UnsignedTransaction {
params: TransactionParams::builder()
.nonce(nonce)
.chain_id("test")
.build(),
actions,
}
.into_signed(&other_signing_key)
.into_signed(&other_signing_key);
Arc::new(tx)
};
mempool.insert(other_mock_tx(100), 0).await.unwrap();
mempool.insert(other_mock_tx(101), 0).await.unwrap();
Expand Down
32 changes: 32 additions & 0 deletions crates/astria-sequencer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub(crate) struct Metrics {
check_tx_duration_seconds_check_balance: Histogram,
check_tx_duration_seconds_check_removed: Histogram,
check_tx_duration_seconds_insert_to_app_mempool: Histogram,
check_tx_cache_hit: Counter,
check_tx_cache_miss: Counter,
actions_per_transaction_in_mempool: Histogram,
transaction_in_mempool_size_bytes: Histogram,
transactions_in_mempool_total: Gauge,
Expand Down Expand Up @@ -184,6 +186,20 @@ impl Metrics {
CHECK_TX_STAGE => "insert to app mempool"
);

describe_counter!(
CHECK_TX_CACHE_HIT,
Unit::Count,
"The number of `check_tx` attempts which have been retrieved from the cached results"
);
let check_tx_cache_hit = counter!(CHECK_TX_CACHE_HIT);

describe_counter!(
CHECK_TX_CACHE_MISS,
Unit::Count,
"The number of `check_tx` rechecks which have not been found in the cached results"
);
let check_tx_cache_miss = counter!(CHECK_TX_CACHE_MISS);

describe_histogram!(
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
Unit::Count,
Expand Down Expand Up @@ -226,6 +242,8 @@ impl Metrics {
check_tx_duration_seconds_check_balance,
check_tx_duration_seconds_check_removed,
check_tx_duration_seconds_insert_to_app_mempool,
check_tx_cache_hit,
check_tx_cache_miss,
actions_per_transaction_in_mempool,
transaction_in_mempool_size_bytes,
transactions_in_mempool_total,
Expand Down Expand Up @@ -329,6 +347,14 @@ impl Metrics {
.record(duration);
}

pub(crate) fn increment_check_tx_cache_hit(&self) {
self.check_tx_cache_hit.increment(1);
}

pub(crate) fn increment_check_tx_cache_miss(&self) {
self.check_tx_cache_miss.increment(1);
}

pub(crate) fn record_actions_per_transaction_in_mempool(&self, count: usize) {
// allow: precision loss is unlikely (values too small) but also unimportant in histograms.
#[allow(clippy::cast_precision_loss)]
Expand Down Expand Up @@ -362,6 +388,8 @@ metric_names!(pub const METRICS_NAMES:
CHECK_TX_REMOVED_STALE_NONCE,
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
CHECK_TX_DURATION_SECONDS,
CHECK_TX_CACHE_HIT,
CHECK_TX_CACHE_MISS,
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
TRANSACTION_IN_MEMPOOL_SIZE_BYTES,
TRANSACTIONS_IN_MEMPOOL_TOTAL
Expand All @@ -371,6 +399,8 @@ metric_names!(pub const METRICS_NAMES:
mod tests {
use super::{
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
CHECK_TX_CACHE_HIT,
CHECK_TX_CACHE_MISS,
CHECK_TX_DURATION_SECONDS,
CHECK_TX_REMOVED_ACCOUNT_BALANCE,
CHECK_TX_REMOVED_EXPIRED,
Expand Down Expand Up @@ -437,6 +467,8 @@ mod tests {
"check_tx_removed_account_balance",
);
assert_const(CHECK_TX_DURATION_SECONDS, "check_tx_duration_seconds");
assert_const(CHECK_TX_CACHE_HIT, "check_tx_cache_hit");
assert_const(CHECK_TX_CACHE_MISS, "check_tx_cache_miss");
assert_const(
ACTIONS_PER_TRANSACTION_IN_MEMPOOL,
"actions_per_transaction_in_mempool",
Expand Down
8 changes: 6 additions & 2 deletions crates/astria-sequencer/src/service/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ mod test {
use std::{
collections::HashMap,
str::FromStr,
sync::Arc,
};

use astria_core::{
Expand Down Expand Up @@ -311,7 +312,10 @@ mod test {
let signed_tx = tx.into_signed(&signing_key);
let tx_bytes = signed_tx.clone().into_raw().encode_to_vec();
let txs = vec![tx_bytes.into()];
mempool.insert(signed_tx.clone(), 0).await.unwrap();
mempool
.insert(Arc::new(signed_tx.clone()), 0)
.await
.unwrap();

let res = generate_rollup_datas_commitment(&vec![signed_tx], HashMap::new());

Expand Down Expand Up @@ -530,7 +534,7 @@ mod test {
.await
.unwrap();

mempool.insert(signed_tx, 0).await.unwrap();
mempool.insert(Arc::new(signed_tx), 0).await.unwrap();
let finalize_block = request::FinalizeBlock {
hash: Hash::try_from([0u8; 32].to_vec()).unwrap(),
height: 1u32.into(),
Expand Down
Loading

0 comments on commit 9046a25

Please sign in to comment.