Skip to content

Commit

Permalink
feat(composer)!: Add chain_id check on executor build (#1175)
Browse files Browse the repository at this point in the history
## Summary
Added check to ensure the executor's configured chain_id and the
sequencer client's actual chain_id match.

## Background
This check ensures that the composer is communicating with the sequencer
RPC on the correct network.

## Changes
Added chain_id check to the composer executor builder.

Made executor builder async to accommodate for async communication with
sequencer client.

Moved nonce guard declaration from mock startup to individual tests in
executor tests.

Added function to mount a status response with a given chain_id to the
mock. This is so the executor builder can gather the status response
without throwing an HTTP 404 error.

## Testing
Added executor test to validate the failure of an executor build with a
chain_id mismatch.

## Related Issues
Part of #986

---------

Co-authored-by: Richard Janis Goldschmidt <github@aberrat.io>
Co-authored-by: Ethan Oroshiba <ethan@astria.org>
  • Loading branch information
3 people committed Jul 22, 2024
1 parent a6d3d96 commit b8f26d2
Show file tree
Hide file tree
Showing 3 changed files with 292 additions and 25 deletions.
84 changes: 80 additions & 4 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@ use futures::{
use pin_project_lite::pin_project;
use prost::Message as _;
use sequencer_client::{
tendermint_rpc::endpoint::broadcast::tx_sync,
tendermint_rpc::{
endpoint::broadcast::tx_sync,
Client as _,
},
Address,
SequencerClientExt as _,
};
use tendermint::crypto::Sha256;
use tokio::{
select,
sync::{
mpsc,
mpsc::error::SendTimeoutError,
mpsc::{
self,
error::SendTimeoutError,
},
watch,
},
time::{
Expand Down Expand Up @@ -94,7 +99,16 @@ pub(crate) use builder::Builder;
const BUNDLE_DRAINING_DURATION: Duration = Duration::from_secs(16);

type StdError = dyn std::error::Error;

#[derive(Debug, thiserror::Error)]
pub(crate) enum EnsureChainIdError {
#[error("failed to obtain sequencer chain ID after multiple retries")]
GetChainId(#[source] sequencer_client::tendermint_rpc::Error),
#[error("expected chain ID `{expected}`, but received `{actual}`")]
WrongChainId {
expected: String,
actual: tendermint::chain::Id,
},
}
/// The `Executor` interfaces with the sequencer. It handles account nonces, transaction signing,
/// and transaction submission.
/// The `Executor` receives `Vec<Action>` from the bundling logic, packages them with a nonce into
Expand Down Expand Up @@ -199,6 +213,17 @@ impl Executor {
/// An error is returned if connecting to the sequencer fails.
#[instrument(skip_all, fields(address = %self.address))]
pub(super) async fn run_until_stopped(mut self) -> eyre::Result<()> {
select!(
biased;
() = self.shutdown_token.cancelled() => {
info!("received shutdown signal while running initialization routines; exiting");
return Ok(());
}

res = self.pre_run_checks() => {
res.wrap_err("required pre-run checks failed")?;
}
);
let mut submission_fut: Fuse<Instrumented<SubmitFut>> = Fuse::terminated();
let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics)
.await
Expand Down Expand Up @@ -417,6 +442,57 @@ impl Executor {

reason.map(|_| ())
}

/// Performs initialization checks prior to running the executor
async fn pre_run_checks(&self) -> eyre::Result<()> {
self.ensure_chain_id_is_correct().await?;
Ok(())
}

/// Performs check to ensure the configured chain ID matches the remote chain ID
pub(crate) async fn ensure_chain_id_is_correct(&self) -> Result<(), EnsureChainIdError> {
let remote_chain_id = self
.get_sequencer_chain_id()
.await
.map_err(EnsureChainIdError::GetChainId)?;
if remote_chain_id.as_str() != self.sequencer_chain_id {
return Err(EnsureChainIdError::WrongChainId {
expected: self.sequencer_chain_id.clone(),
actual: remote_chain_id,
});
}
Ok(())
}

/// Fetch chain id from the sequencer client
async fn get_sequencer_chain_id(
&self,
) -> Result<tendermint::chain::Id, sequencer_client::tendermint_rpc::Error> {
let retry_config = tryhard::RetryFutureConfig::new(u32::MAX)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(20))
.on_retry(
|attempt: u32,
next_delay: Option<Duration>,
error: &sequencer_client::tendermint_rpc::Error| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to fetch sequencer genesis info; retrying after backoff",
);
futures::future::ready(())
},
);
let client_genesis: tendermint::Genesis =
tryhard::retry_fn(|| self.sequencer_client.genesis())
.with_config(retry_config)
.await?;
Ok(client_genesis.chain_id)
}
}

/// Queries the sequencer for the latest nonce with an exponential backoff
Expand Down
173 changes: 152 additions & 21 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use astria_core::{
generated::protocol::account::v1alpha1::NonceResponse,
primitive::v1::{
RollupId,
ROLLUP_ID_LEN,
Expand All @@ -16,6 +17,17 @@ use prost::Message;
use sequencer_client::SignedTransaction;
use serde_json::json;
use tempfile::NamedTempFile;
use tendermint::{
consensus::{
params::{
AbciParams,
ValidatorParams,
},
Params,
},
Genesis,
Time,
};
use tendermint_rpc::{
endpoint::broadcast::tx_sync,
request,
Expand All @@ -42,6 +54,7 @@ use wiremock::{

use crate::{
executor,
executor::EnsureChainIdError,
metrics::Metrics,
test_utils::sequence_action_of_max_size,
Config,
Expand Down Expand Up @@ -74,19 +87,9 @@ fn sequence_action() -> SequenceAction {
}

/// Start a mock sequencer server and mount a mock for the `accounts/nonce` query.
async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) {
use astria_core::generated::protocol::account::v1alpha1::NonceResponse;
async fn setup() -> (MockServer, Config, NamedTempFile) {
Lazy::force(&TELEMETRY);
let server = MockServer::start().await;
let startup_guard = mount_nonce_query_mock(
&server,
"accounts/nonce",
NonceResponse {
height: 0,
nonce: 0,
},
)
.await;

let keyfile = NamedTempFile::new().unwrap();
(&keyfile)
Expand All @@ -112,15 +115,37 @@ async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) {
grpc_addr: "127.0.0.1:0".parse().unwrap(),
fee_asset: "nria".parse().unwrap(),
};
(server, startup_guard, cfg, keyfile)
(server, cfg, keyfile)
}

/// Assert that given error is of correct type and contains the expected chain IDs.
#[track_caller]
fn assert_chain_id_err(
err: &EnsureChainIdError,
configured_expected: &str,
configured_actual: &tendermint::chain::Id,
) {
match err {
EnsureChainIdError::WrongChainId {
expected,
actual,
} => {
assert_eq!(*expected, configured_expected);
assert_eq!(*actual, *configured_actual);
}
other @ EnsureChainIdError::GetChainId(_) => {
panic!("expected `EnsureChainIdError::WrongChainId`, but got '{other:?}'")
}
}
}

/// Mount a mock for the `abci_query` endpoint.
async fn mount_nonce_query_mock(
server: &MockServer,
query_path: &str,
response: impl Message,
) -> MockGuard {
async fn mount_default_nonce_query_mock(server: &MockServer) -> MockGuard {
let query_path = "accounts/nonce";
let response = NonceResponse {
height: 0,
nonce: 0,
};
let expected_body = json!({
"method": "abci_query"
});
Expand Down Expand Up @@ -191,6 +216,53 @@ async fn mount_broadcast_tx_sync_seq_actions_mock(server: &MockServer) -> MockGu
.await
}

/// Mounts genesis file with specified sequencer chain ID
async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) {
Mock::given(body_partial_json(
json!({"jsonrpc": "2.0", "method": "genesis", "params": null}),
))
.respond_with(ResponseTemplate::new(200).set_body_json(
tendermint_rpc::response::Wrapper::new_with_id(
tendermint_rpc::Id::uuid_v4(),
Some(
tendermint_rpc::endpoint::genesis::Response::<serde_json::Value> {
genesis: Genesis {
genesis_time: Time::from_unix_timestamp(1, 1).unwrap(),
chain_id: mock_sequencer_chain_id.try_into().unwrap(),
initial_height: 1,
consensus_params: Params {
block: tendermint::block::Size {
max_bytes: 1024,
max_gas: 1024,
time_iota_ms: 1000,
},
evidence: tendermint::evidence::Params {
max_age_num_blocks: 1000,
max_age_duration: tendermint::evidence::Duration(
Duration::from_secs(3600),
),
max_bytes: 1_048_576,
},
validator: ValidatorParams {
pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519],
},
version: None,
abci: AbciParams::default(),
},
validators: vec![],
app_hash: tendermint::hash::AppHash::default(),
app_state: serde_json::Value::Null,
},
},
),
None,
),
))
.expect(1..)
.mount(server)
.await;
}

/// Helper to wait for the executor to connect to the mock sequencer
async fn wait_for_startup(
mut status: watch::Receiver<executor::Status>,
Expand All @@ -211,14 +283,16 @@ async fn wait_for_startup(

Ok(())
}

/// Test to check that the executor sends a signed transaction to the sequencer as soon as it
/// receives a `SequenceAction` that fills it beyond its `max_bundle_size`.
#[tokio::test]
async fn full_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -233,6 +307,7 @@ async fn full_bundle() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());
Expand Down Expand Up @@ -305,9 +380,10 @@ async fn full_bundle() {
#[tokio::test]
async fn bundle_triggered_by_block_timer() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -322,6 +398,7 @@ async fn bundle_triggered_by_block_timer() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());
Expand Down Expand Up @@ -391,9 +468,10 @@ async fn bundle_triggered_by_block_timer() {
#[tokio::test]
async fn two_seq_actions_single_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -408,8 +486,8 @@ async fn two_seq_actions_single_bundle() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());

// wait for sequencer to get the initial nonce request from sequencer
Expand Down Expand Up @@ -481,3 +559,56 @@ async fn two_seq_actions_single_bundle() {
);
}
}

/// Test to check that executor's chain ID check is properly checked against the sequencer's chain
/// ID
#[tokio::test]
async fn chain_id_mismatch_returns_error() {
use tendermint::chain::Id;

// set up sequencer mock
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));

// mount a status response with an incorrect chain_id
mount_genesis(&sequencer, "bad-chain-id").await;

// build the executor with the correct chain_id
let (executor, _executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
private_key_file: cfg.private_key_file.clone(),
sequencer_address_prefix: cfg.sequencer_address_prefix.clone(),
block_time_ms: cfg.block_time_ms,
max_bytes_per_bundle: cfg.max_bytes_per_bundle,
bundle_queue_capacity: cfg.bundle_queue_capacity,
shutdown_token: shutdown_token.clone(),
metrics,
}
.build()
.unwrap();

// ensure that run_until_stopped returns WrongChainId error
let err = executor.run_until_stopped().await.expect_err(
"should exit with an error when reading a bad chain ID, but exited with success",
);
let mut found = false;
for cause in err.chain() {
if let Some(err) = cause.downcast_ref::<EnsureChainIdError>() {
assert_chain_id_err(
err,
&cfg.sequencer_chain_id,
&Id::try_from("bad-chain-id".to_string()).unwrap(),
);
found = true;
break;
}
}

// ensure that the error chain contains the expected error
assert!(
found,
"expected `EnsureChainIdError::WrongChainId` in error chain, but it was not found"
);
}
Loading

0 comments on commit b8f26d2

Please sign in to comment.