diff --git a/crates/astria-composer/src/executor/mod.rs b/crates/astria-composer/src/executor/mod.rs index e2b35e734..c0e60240f 100644 --- a/crates/astria-composer/src/executor/mod.rs +++ b/crates/astria-composer/src/executor/mod.rs @@ -40,7 +40,10 @@ use futures::{ use pin_project_lite::pin_project; use prost::Message as _; use sequencer_client::{ - tendermint_rpc::endpoint::broadcast::tx_sync, + tendermint_rpc::{ + endpoint::broadcast::tx_sync, + Client as _, + }, Address, SequencerClientExt as _, }; @@ -48,8 +51,10 @@ use tendermint::crypto::Sha256; use tokio::{ select, sync::{ - mpsc, - mpsc::error::SendTimeoutError, + mpsc::{ + self, + error::SendTimeoutError, + }, watch, }, time::{ @@ -94,7 +99,16 @@ pub(crate) use builder::Builder; const BUNDLE_DRAINING_DURATION: Duration = Duration::from_secs(16); type StdError = dyn std::error::Error; - +#[derive(Debug, thiserror::Error)] +pub(crate) enum EnsureChainIdError { + #[error("failed to obtain sequencer chain ID after multiple retries")] + GetChainId(#[source] sequencer_client::tendermint_rpc::Error), + #[error("expected chain ID `{expected}`, but received `{actual}`")] + WrongChainId { + expected: String, + actual: tendermint::chain::Id, + }, +} /// The `Executor` interfaces with the sequencer. It handles account nonces, transaction signing, /// and transaction submission. /// The `Executor` receives `Vec` from the bundling logic, packages them with a nonce into @@ -199,6 +213,17 @@ impl Executor { /// An error is returned if connecting to the sequencer fails. #[instrument(skip_all, fields(address = %self.address))] pub(super) async fn run_until_stopped(mut self) -> eyre::Result<()> { + select!( + biased; + () = self.shutdown_token.cancelled() => { + info!("received shutdown signal while running initialization routines; exiting"); + return Ok(()); + } + + res = self.pre_run_checks() => { + res.wrap_err("required pre-run checks failed")?; + } + ); let mut submission_fut: Fuse> = Fuse::terminated(); let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics) .await @@ -417,6 +442,57 @@ impl Executor { reason.map(|_| ()) } + + /// Performs initialization checks prior to running the executor + async fn pre_run_checks(&self) -> eyre::Result<()> { + self.ensure_chain_id_is_correct().await?; + Ok(()) + } + + /// Performs check to ensure the configured chain ID matches the remote chain ID + pub(crate) async fn ensure_chain_id_is_correct(&self) -> Result<(), EnsureChainIdError> { + let remote_chain_id = self + .get_sequencer_chain_id() + .await + .map_err(EnsureChainIdError::GetChainId)?; + if remote_chain_id.as_str() != self.sequencer_chain_id { + return Err(EnsureChainIdError::WrongChainId { + expected: self.sequencer_chain_id.clone(), + actual: remote_chain_id, + }); + } + Ok(()) + } + + /// Fetch chain id from the sequencer client + async fn get_sequencer_chain_id( + &self, + ) -> Result { + let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, + next_delay: Option, + error: &sequencer_client::tendermint_rpc::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to fetch sequencer genesis info; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + let client_genesis: tendermint::Genesis = + tryhard::retry_fn(|| self.sequencer_client.genesis()) + .with_config(retry_config) + .await?; + Ok(client_genesis.chain_id) + } } /// Queries the sequencer for the latest nonce with an exponential backoff diff --git a/crates/astria-composer/src/executor/tests.rs b/crates/astria-composer/src/executor/tests.rs index 4dd4f15c6..463cddfd1 100644 --- a/crates/astria-composer/src/executor/tests.rs +++ b/crates/astria-composer/src/executor/tests.rs @@ -4,6 +4,7 @@ use std::{ }; use astria_core::{ + generated::protocol::account::v1alpha1::NonceResponse, primitive::v1::{ RollupId, ROLLUP_ID_LEN, @@ -16,6 +17,17 @@ use prost::Message; use sequencer_client::SignedTransaction; use serde_json::json; use tempfile::NamedTempFile; +use tendermint::{ + consensus::{ + params::{ + AbciParams, + ValidatorParams, + }, + Params, + }, + Genesis, + Time, +}; use tendermint_rpc::{ endpoint::broadcast::tx_sync, request, @@ -42,6 +54,7 @@ use wiremock::{ use crate::{ executor, + executor::EnsureChainIdError, metrics::Metrics, test_utils::sequence_action_of_max_size, Config, @@ -74,19 +87,9 @@ fn sequence_action() -> SequenceAction { } /// Start a mock sequencer server and mount a mock for the `accounts/nonce` query. -async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) { - use astria_core::generated::protocol::account::v1alpha1::NonceResponse; +async fn setup() -> (MockServer, Config, NamedTempFile) { Lazy::force(&TELEMETRY); let server = MockServer::start().await; - let startup_guard = mount_nonce_query_mock( - &server, - "accounts/nonce", - NonceResponse { - height: 0, - nonce: 0, - }, - ) - .await; let keyfile = NamedTempFile::new().unwrap(); (&keyfile) @@ -112,15 +115,37 @@ async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) { grpc_addr: "127.0.0.1:0".parse().unwrap(), fee_asset: "nria".parse().unwrap(), }; - (server, startup_guard, cfg, keyfile) + (server, cfg, keyfile) +} + +/// Assert that given error is of correct type and contains the expected chain IDs. +#[track_caller] +fn assert_chain_id_err( + err: &EnsureChainIdError, + configured_expected: &str, + configured_actual: &tendermint::chain::Id, +) { + match err { + EnsureChainIdError::WrongChainId { + expected, + actual, + } => { + assert_eq!(*expected, configured_expected); + assert_eq!(*actual, *configured_actual); + } + other @ EnsureChainIdError::GetChainId(_) => { + panic!("expected `EnsureChainIdError::WrongChainId`, but got '{other:?}'") + } + } } /// Mount a mock for the `abci_query` endpoint. -async fn mount_nonce_query_mock( - server: &MockServer, - query_path: &str, - response: impl Message, -) -> MockGuard { +async fn mount_default_nonce_query_mock(server: &MockServer) -> MockGuard { + let query_path = "accounts/nonce"; + let response = NonceResponse { + height: 0, + nonce: 0, + }; let expected_body = json!({ "method": "abci_query" }); @@ -191,6 +216,53 @@ async fn mount_broadcast_tx_sync_seq_actions_mock(server: &MockServer) -> MockGu .await } +/// Mounts genesis file with specified sequencer chain ID +async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) { + Mock::given(body_partial_json( + json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), + )) + .respond_with(ResponseTemplate::new(200).set_body_json( + tendermint_rpc::response::Wrapper::new_with_id( + tendermint_rpc::Id::uuid_v4(), + Some( + tendermint_rpc::endpoint::genesis::Response:: { + genesis: Genesis { + genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), + chain_id: mock_sequencer_chain_id.try_into().unwrap(), + initial_height: 1, + consensus_params: Params { + block: tendermint::block::Size { + max_bytes: 1024, + max_gas: 1024, + time_iota_ms: 1000, + }, + evidence: tendermint::evidence::Params { + max_age_num_blocks: 1000, + max_age_duration: tendermint::evidence::Duration( + Duration::from_secs(3600), + ), + max_bytes: 1_048_576, + }, + validator: ValidatorParams { + pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], + }, + version: None, + abci: AbciParams::default(), + }, + validators: vec![], + app_hash: tendermint::hash::AppHash::default(), + app_state: serde_json::Value::Null, + }, + }, + ), + None, + ), + )) + .expect(1..) + .mount(server) + .await; +} + /// Helper to wait for the executor to connect to the mock sequencer async fn wait_for_startup( mut status: watch::Receiver, @@ -211,14 +283,16 @@ async fn wait_for_startup( Ok(()) } + /// Test to check that the executor sends a signed transaction to the sequencer as soon as it /// receives a `SequenceAction` that fills it beyond its `max_bundle_size`. #[tokio::test] async fn full_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, nonce_guard, cfg, _keyfile) = setup().await; + let (sequencer, cfg, _keyfile) = setup().await; let shutdown_token = CancellationToken::new(); let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); + mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -233,6 +307,7 @@ async fn full_bundle() { .build() .unwrap(); + let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; let status = executor.subscribe(); let _executor_task = tokio::spawn(executor.run_until_stopped()); @@ -305,9 +380,10 @@ async fn full_bundle() { #[tokio::test] async fn bundle_triggered_by_block_timer() { // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, nonce_guard, cfg, _keyfile) = setup().await; + let (sequencer, cfg, _keyfile) = setup().await; let shutdown_token = CancellationToken::new(); let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); + mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -322,6 +398,7 @@ async fn bundle_triggered_by_block_timer() { .build() .unwrap(); + let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; let status = executor.subscribe(); let _executor_task = tokio::spawn(executor.run_until_stopped()); @@ -391,9 +468,10 @@ async fn bundle_triggered_by_block_timer() { #[tokio::test] async fn two_seq_actions_single_bundle() { // set up the executor, channel for writing seq actions, and the sequencer mock - let (sequencer, nonce_guard, cfg, _keyfile) = setup().await; + let (sequencer, cfg, _keyfile) = setup().await; let shutdown_token = CancellationToken::new(); let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); + mount_genesis(&sequencer, &cfg.sequencer_chain_id).await; let (executor, executor_handle) = executor::Builder { sequencer_url: cfg.sequencer_url.clone(), sequencer_chain_id: cfg.sequencer_chain_id.clone(), @@ -408,8 +486,8 @@ async fn two_seq_actions_single_bundle() { .build() .unwrap(); + let nonce_guard = mount_default_nonce_query_mock(&sequencer).await; let status = executor.subscribe(); - let _executor_task = tokio::spawn(executor.run_until_stopped()); // wait for sequencer to get the initial nonce request from sequencer @@ -481,3 +559,56 @@ async fn two_seq_actions_single_bundle() { ); } } + +/// Test to check that executor's chain ID check is properly checked against the sequencer's chain +/// ID +#[tokio::test] +async fn chain_id_mismatch_returns_error() { + use tendermint::chain::Id; + + // set up sequencer mock + let (sequencer, cfg, _keyfile) = setup().await; + let shutdown_token = CancellationToken::new(); + let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys()))); + + // mount a status response with an incorrect chain_id + mount_genesis(&sequencer, "bad-chain-id").await; + + // build the executor with the correct chain_id + let (executor, _executor_handle) = executor::Builder { + sequencer_url: cfg.sequencer_url.clone(), + sequencer_chain_id: cfg.sequencer_chain_id.clone(), + private_key_file: cfg.private_key_file.clone(), + sequencer_address_prefix: cfg.sequencer_address_prefix.clone(), + block_time_ms: cfg.block_time_ms, + max_bytes_per_bundle: cfg.max_bytes_per_bundle, + bundle_queue_capacity: cfg.bundle_queue_capacity, + shutdown_token: shutdown_token.clone(), + metrics, + } + .build() + .unwrap(); + + // ensure that run_until_stopped returns WrongChainId error + let err = executor.run_until_stopped().await.expect_err( + "should exit with an error when reading a bad chain ID, but exited with success", + ); + let mut found = false; + for cause in err.chain() { + if let Some(err) = cause.downcast_ref::() { + assert_chain_id_err( + err, + &cfg.sequencer_chain_id, + &Id::try_from("bad-chain-id".to_string()).unwrap(), + ); + found = true; + break; + } + } + + // ensure that the error chain contains the expected error + assert!( + found, + "expected `EnsureChainIdError::WrongChainId` in error chain, but it was not found" + ); +} diff --git a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs b/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs index 548633c27..546fce21d 100644 --- a/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs +++ b/crates/astria-composer/tests/blackbox/helper/mock_sequencer.rs @@ -1,5 +1,18 @@ +use std::time::Duration; + use prost::Message; use serde_json::json; +use tendermint::{ + consensus::{ + params::{ + AbciParams, + ValidatorParams, + }, + Params, + }, + Genesis, + Time, +}; use tendermint_rpc::{ response, Id, @@ -27,6 +40,7 @@ pub async fn start() -> (MockServer, MockGuard) { }, ) .await; + mount_genesis(&server, "test-chain-1").await; (server, startup_guard) } @@ -57,3 +71,49 @@ pub async fn mount_abci_query_mock( .mount_as_scoped(server) .await } + +async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) { + Mock::given(body_partial_json( + json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), + )) + .respond_with(ResponseTemplate::new(200).set_body_json( + tendermint_rpc::response::Wrapper::new_with_id( + tendermint_rpc::Id::uuid_v4(), + Some( + tendermint_rpc::endpoint::genesis::Response:: { + genesis: Genesis { + genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), + chain_id: mock_sequencer_chain_id.try_into().unwrap(), + initial_height: 1, + consensus_params: Params { + block: tendermint::block::Size { + max_bytes: 1024, + max_gas: 1024, + time_iota_ms: 1000, + }, + evidence: tendermint::evidence::Params { + max_age_num_blocks: 1000, + max_age_duration: tendermint::evidence::Duration( + Duration::from_secs(3600), + ), + max_bytes: 1_048_576, + }, + validator: ValidatorParams { + pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], + }, + version: None, + abci: AbciParams::default(), + }, + validators: vec![], + app_hash: tendermint::hash::AppHash::default(), + app_state: serde_json::Value::Null, + }, + }, + ), + None, + ), + )) + .expect(1..) + .mount(server) + .await; +}