Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(composer)!: Add chain_id check on executor build #1175

Merged
merged 38 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ee76511
feat(composer): check chain_id against sequencer_chain_id
eoroshiba May 31, 2024
6259c3a
feat(composer): check sequencer_chain_id against client chain_id
eoroshiba Jun 4, 2024
451dec2
Merge from upstream
eoroshiba Jun 4, 2024
89f03ab
Merge branch 'main' into eoroshiba
eoroshiba Jun 4, 2024
58d47b2
Swapped Genesis for Sequencer Client Status
eoroshiba Jun 5, 2024
c020f7e
Added chain_id check on composer startup. Formatted.
eoroshiba Jun 7, 2024
406a97a
Added chain_id check on composer startup
eoroshiba Jun 7, 2024
31ad1e9
minor syntax changes, added comments for test cases
eoroshiba Jun 7, 2024
39c9be1
Merge remote-tracking branch 'upstream' into eoroshiba
eoroshiba Jun 7, 2024
e915b8f
merge upstream changes
eoroshiba Jun 7, 2024
abf0144
minor updates
eoroshiba Jun 10, 2024
4923b8f
merge from upstream
eoroshiba Jun 10, 2024
42250cb
minor changes
eoroshiba Jun 10, 2024
5382a75
Merge branch 'main' of https://github.com/eoroshiba/astria into eoros…
eoroshiba Jun 13, 2024
edf423e
cargo format
eoroshiba Jun 13, 2024
7f1d9a1
make build function sync, remove status mock guard, ad-hoc json creation
eoroshiba Jun 13, 2024
b69f7a7
move response json inside mount, move chain_id check out of run_until…
eoroshiba Jun 13, 2024
f3c96ff
use tendermint genesis, move validation to executor, other minor changes
eoroshiba Jun 15, 2024
f7201de
Merge branch 'main' into eoroshiba
eoroshiba Jun 15, 2024
24e4b17
minor changes
eoroshiba Jun 15, 2024
aef7fb1
Merge branch 'eoroshiba' of https://github.com/eoroshiba/astria into …
eoroshiba Jun 15, 2024
b8d148f
minor changes
eoroshiba Jun 25, 2024
7a26d15
Merge remote-tracking branch 'upstream' into eoroshiba
eoroshiba Jun 25, 2024
be387af
minor changes
eoroshiba Jun 25, 2024
e3f13d7
Merge branch 'main' into eoroshiba
eoroshiba Jun 27, 2024
531bb4e
Merge remote-tracking branch 'upstream' into eoroshiba
eoroshiba Jul 1, 2024
d3c638c
Merge branch 'eoroshiba' of https://github.com/eoroshiba/astria into …
eoroshiba Jul 1, 2024
8523e04
Streamline chain_id mismatch implementation and testing
eoroshiba Jul 1, 2024
f5c9fd4
Suggested change from @SuperFluffy
eoroshiba Jul 1, 2024
03d14dc
Add helper function for asserting chain ID error
eoroshiba Jul 1, 2024
b853f2a
Merge branch 'eoroshiba' of https://github.com/eoroshiba/astria into …
eoroshiba Jul 1, 2024
33ae83d
minor improvements requested by @superfluffy
eoroshiba Jul 2, 2024
7d1a2e9
Merge branch 'main' into eoroshiba
eoroshiba Jul 2, 2024
4d2ebb7
minor update for clippy
eoroshiba Jul 3, 2024
2f86488
Merge branch 'eoroshiba' of https://github.com/eoroshiba/astria into …
eoroshiba Jul 3, 2024
0a931f5
@superfluffy suggestion
eoroshiba Jul 3, 2024
01c938e
Merge branch 'astriaorg:main' into eoroshiba
eoroshiba Jul 7, 2024
c23101b
Merge branch 'main' into eoroshiba
ethanoroshiba Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 80 additions & 4 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,21 @@ use futures::{
use pin_project_lite::pin_project;
use prost::Message as _;
use sequencer_client::{
tendermint_rpc::endpoint::broadcast::tx_sync,
tendermint_rpc::{
endpoint::broadcast::tx_sync,
Client as _,
},
Address,
SequencerClientExt as _,
};
use tendermint::crypto::Sha256;
use tokio::{
select,
sync::{
mpsc,
mpsc::error::SendTimeoutError,
mpsc::{
self,
error::SendTimeoutError,
},
watch,
},
time::{
Expand Down Expand Up @@ -94,7 +99,16 @@ pub(crate) use builder::Builder;
const BUNDLE_DRAINING_DURATION: Duration = Duration::from_secs(16);

type StdError = dyn std::error::Error;

#[derive(Debug, thiserror::Error)]
pub(crate) enum EnsureChainIdError {
#[error("failed to obtain sequencer chain ID after multiple retries")]
GetChainId(#[source] sequencer_client::tendermint_rpc::Error),
#[error("expected chain ID `{expected}`, but received `{actual}`")]
WrongChainId {
expected: String,
actual: tendermint::chain::Id,
},
}
/// The `Executor` interfaces with the sequencer. It handles account nonces, transaction signing,
/// and transaction submission.
/// The `Executor` receives `Vec<Action>` from the bundling logic, packages them with a nonce into
Expand Down Expand Up @@ -199,6 +213,17 @@ impl Executor {
/// An error is returned if connecting to the sequencer fails.
#[instrument(skip_all, fields(address = %self.address))]
pub(super) async fn run_until_stopped(mut self) -> eyre::Result<()> {
select!(
biased;
() = self.shutdown_token.cancelled() => {
info!("received shutdown signal while running initialization routines; exiting");
return Ok(());
}

res = self.pre_run_checks() => {
res.wrap_err("required pre-run checks failed")?;
}
);
let mut submission_fut: Fuse<Instrumented<SubmitFut>> = Fuse::terminated();
let mut nonce = get_latest_nonce(self.sequencer_client.clone(), self.address, self.metrics)
.await
Expand Down Expand Up @@ -417,6 +442,57 @@ impl Executor {

reason.map(|_| ())
}

/// Performs initialization checks prior to running the executor
async fn pre_run_checks(&self) -> eyre::Result<()> {
self.ensure_chain_id_is_correct().await?;
Ok(())
}

/// Performs check to ensure the configured chain ID matches the remote chain ID
pub(crate) async fn ensure_chain_id_is_correct(&self) -> Result<(), EnsureChainIdError> {
let remote_chain_id = self
.get_sequencer_chain_id()
.await
eoroshiba marked this conversation as resolved.
Show resolved Hide resolved
.map_err(EnsureChainIdError::GetChainId)?;
if remote_chain_id.as_str() != self.sequencer_chain_id {
return Err(EnsureChainIdError::WrongChainId {
expected: self.sequencer_chain_id.clone(),
actual: remote_chain_id,
});
}
Ok(())
}

/// Fetch chain id from the sequencer client
async fn get_sequencer_chain_id(
&self,
) -> Result<tendermint::chain::Id, sequencer_client::tendermint_rpc::Error> {
let retry_config = tryhard::RetryFutureConfig::new(u32::MAX)
.exponential_backoff(Duration::from_millis(100))
.max_delay(Duration::from_secs(20))
.on_retry(
|attempt: u32,
next_delay: Option<Duration>,
error: &sequencer_client::tendermint_rpc::Error| {
let wait_duration = next_delay
.map(humantime::format_duration)
.map(tracing::field::display);
warn!(
attempt,
wait_duration,
error = error as &dyn std::error::Error,
"attempt to fetch sequencer genesis info; retrying after backoff",
);
futures::future::ready(())
},
);
let client_genesis: tendermint::Genesis =
tryhard::retry_fn(|| self.sequencer_client.genesis())
.with_config(retry_config)
.await?;
Ok(client_genesis.chain_id)
}
}

/// Queries the sequencer for the latest nonce with an exponential backoff
Expand Down
173 changes: 152 additions & 21 deletions crates/astria-composer/src/executor/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::{
};

use astria_core::{
generated::protocol::account::v1alpha1::NonceResponse,
primitive::v1::{
RollupId,
ROLLUP_ID_LEN,
Expand All @@ -16,6 +17,17 @@ use prost::Message;
use sequencer_client::SignedTransaction;
use serde_json::json;
use tempfile::NamedTempFile;
use tendermint::{
consensus::{
params::{
AbciParams,
ValidatorParams,
},
Params,
},
Genesis,
Time,
};
use tendermint_rpc::{
endpoint::broadcast::tx_sync,
request,
Expand All @@ -42,6 +54,7 @@ use wiremock::{

use crate::{
executor,
executor::EnsureChainIdError,
metrics::Metrics,
test_utils::sequence_action_of_max_size,
Config,
Expand Down Expand Up @@ -74,19 +87,9 @@ fn sequence_action() -> SequenceAction {
}

/// Start a mock sequencer server and mount a mock for the `accounts/nonce` query.
async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) {
use astria_core::generated::protocol::account::v1alpha1::NonceResponse;
async fn setup() -> (MockServer, Config, NamedTempFile) {
Lazy::force(&TELEMETRY);
let server = MockServer::start().await;
let startup_guard = mount_nonce_query_mock(
&server,
"accounts/nonce",
NonceResponse {
height: 0,
nonce: 0,
},
)
.await;

let keyfile = NamedTempFile::new().unwrap();
(&keyfile)
Expand All @@ -112,15 +115,37 @@ async fn setup() -> (MockServer, MockGuard, Config, NamedTempFile) {
grpc_addr: "127.0.0.1:0".parse().unwrap(),
fee_asset: "nria".parse().unwrap(),
};
(server, startup_guard, cfg, keyfile)
(server, cfg, keyfile)
}

/// Assert that given error is of correct type and contains the expected chain IDs.
#[track_caller]
fn assert_chain_id_err(
eoroshiba marked this conversation as resolved.
Show resolved Hide resolved
err: &EnsureChainIdError,
configured_expected: &str,
configured_actual: &tendermint::chain::Id,
) {
match err {
EnsureChainIdError::WrongChainId {
expected,
actual,
} => {
assert_eq!(*expected, configured_expected);
assert_eq!(*actual, *configured_actual);
}
other @ EnsureChainIdError::GetChainId(_) => {
panic!("expected `EnsureChainIdError::WrongChainId`, but got '{other:?}'")
}
}
}

/// Mount a mock for the `abci_query` endpoint.
async fn mount_nonce_query_mock(
server: &MockServer,
query_path: &str,
response: impl Message,
) -> MockGuard {
async fn mount_default_nonce_query_mock(server: &MockServer) -> MockGuard {
let query_path = "accounts/nonce";
let response = NonceResponse {
height: 0,
nonce: 0,
};
let expected_body = json!({
"method": "abci_query"
});
Expand Down Expand Up @@ -191,6 +216,53 @@ async fn mount_broadcast_tx_sync_seq_actions_mock(server: &MockServer) -> MockGu
.await
}

/// Mounts genesis file with specified sequencer chain ID
async fn mount_genesis(server: &MockServer, mock_sequencer_chain_id: &str) {
eoroshiba marked this conversation as resolved.
Show resolved Hide resolved
Mock::given(body_partial_json(
json!({"jsonrpc": "2.0", "method": "genesis", "params": null}),
))
.respond_with(ResponseTemplate::new(200).set_body_json(
tendermint_rpc::response::Wrapper::new_with_id(
tendermint_rpc::Id::uuid_v4(),
Some(
tendermint_rpc::endpoint::genesis::Response::<serde_json::Value> {
genesis: Genesis {
genesis_time: Time::from_unix_timestamp(1, 1).unwrap(),
chain_id: mock_sequencer_chain_id.try_into().unwrap(),
initial_height: 1,
consensus_params: Params {
block: tendermint::block::Size {
max_bytes: 1024,
max_gas: 1024,
time_iota_ms: 1000,
},
evidence: tendermint::evidence::Params {
max_age_num_blocks: 1000,
max_age_duration: tendermint::evidence::Duration(
Duration::from_secs(3600),
),
max_bytes: 1_048_576,
},
validator: ValidatorParams {
pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519],
},
version: None,
abci: AbciParams::default(),
},
validators: vec![],
app_hash: tendermint::hash::AppHash::default(),
app_state: serde_json::Value::Null,
},
},
),
None,
),
))
.expect(1..)
.mount(server)
.await;
}

/// Helper to wait for the executor to connect to the mock sequencer
async fn wait_for_startup(
mut status: watch::Receiver<executor::Status>,
Expand All @@ -211,14 +283,16 @@ async fn wait_for_startup(

Ok(())
}

/// Test to check that the executor sends a signed transaction to the sequencer as soon as it
/// receives a `SequenceAction` that fills it beyond its `max_bundle_size`.
#[tokio::test]
async fn full_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -233,6 +307,7 @@ async fn full_bundle() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());
Expand Down Expand Up @@ -305,9 +380,10 @@ async fn full_bundle() {
#[tokio::test]
async fn bundle_triggered_by_block_timer() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -322,6 +398,7 @@ async fn bundle_triggered_by_block_timer() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());
Expand Down Expand Up @@ -391,9 +468,10 @@ async fn bundle_triggered_by_block_timer() {
#[tokio::test]
async fn two_seq_actions_single_bundle() {
// set up the executor, channel for writing seq actions, and the sequencer mock
let (sequencer, nonce_guard, cfg, _keyfile) = setup().await;
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));
mount_genesis(&sequencer, &cfg.sequencer_chain_id).await;
let (executor, executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
Expand All @@ -408,8 +486,8 @@ async fn two_seq_actions_single_bundle() {
.build()
.unwrap();

let nonce_guard = mount_default_nonce_query_mock(&sequencer).await;
let status = executor.subscribe();

let _executor_task = tokio::spawn(executor.run_until_stopped());

// wait for sequencer to get the initial nonce request from sequencer
Expand Down Expand Up @@ -481,3 +559,56 @@ async fn two_seq_actions_single_bundle() {
);
}
}

/// Test to check that executor's chain ID check is properly checked against the sequencer's chain
/// ID
#[tokio::test]
async fn chain_id_mismatch_returns_error() {
use tendermint::chain::Id;

// set up sequencer mock
let (sequencer, cfg, _keyfile) = setup().await;
let shutdown_token = CancellationToken::new();
let metrics = Box::leak(Box::new(Metrics::new(cfg.parse_rollups().unwrap().keys())));

// mount a status response with an incorrect chain_id
mount_genesis(&sequencer, "bad-chain-id").await;

// build the executor with the correct chain_id
let (executor, _executor_handle) = executor::Builder {
sequencer_url: cfg.sequencer_url.clone(),
sequencer_chain_id: cfg.sequencer_chain_id.clone(),
private_key_file: cfg.private_key_file.clone(),
sequencer_address_prefix: cfg.sequencer_address_prefix.clone(),
block_time_ms: cfg.block_time_ms,
max_bytes_per_bundle: cfg.max_bytes_per_bundle,
bundle_queue_capacity: cfg.bundle_queue_capacity,
shutdown_token: shutdown_token.clone(),
metrics,
}
.build()
.unwrap();

// ensure that run_until_stopped returns WrongChainId error
let err = executor.run_until_stopped().await.expect_err(
"should exit with an error when reading a bad chain ID, but exited with success",
);
let mut found = false;
for cause in err.chain() {
if let Some(err) = cause.downcast_ref::<EnsureChainIdError>() {
assert_chain_id_err(
err,
&cfg.sequencer_chain_id,
&Id::try_from("bad-chain-id".to_string()).unwrap(),
);
found = true;
break;
}
}

// ensure that the error chain contains the expected error
assert!(
found,
"expected `EnsureChainIdError::WrongChainId` in error chain, but it was not found"
);
}
Loading
Loading