diff --git a/Cargo.lock b/Cargo.lock index 522729f894..2630d1141e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1577,9 +1577,9 @@ dependencies = [ [[package]] name = "celestia-tendermint" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95f93b5cbbd62b6cfde961889bf05d5fe19e70d8500c4465694306ed2695ac23" +checksum = "ce8c92a01145f79a0f3ac7c44a43a9b5ee58e8a4c716b56d98833a3848db1afd" dependencies = [ "bytes", "celestia-tendermint-proto", @@ -1606,9 +1606,9 @@ dependencies = [ [[package]] name = "celestia-tendermint-proto" -version = "0.32.1" +version = "0.32.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8f7d49c1ececa30a4587c5fe8a4035b786b78a3253ed0f9636de591b3dc2b37" +checksum = "9a95746c5221a74d7b913a415fdbb9e7c90e1b4d818dbbff59bddc034cfce2ec" dependencies = [ "bytes", "flex-error", diff --git a/charts/evm-rollup/Chart.yaml b/charts/evm-rollup/Chart.yaml index e8838becc7..8322d55832 100644 --- a/charts/evm-rollup/Chart.yaml +++ b/charts/evm-rollup/Chart.yaml @@ -15,13 +15,13 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.27.3 +version: 0.27.4 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.14.1" +appVersion: "0.14.2" maintainers: - name: wafflesvonmaple diff --git a/charts/evm-rollup/templates/configmap.yaml b/charts/evm-rollup/templates/configmap.yaml index c69d34ef09..a8e38f5c34 100644 --- a/charts/evm-rollup/templates/configmap.yaml +++ b/charts/evm-rollup/templates/configmap.yaml @@ -31,6 +31,8 @@ data: OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceNamePrefix . }}-conductor" {{- if not .Values.global.dev }} {{- else }} + ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.conductor.sequencerChainId . }}" + ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID: "{{ tpl .Values.config.conductor.celestiaChainId . }}" {{- end }} --- apiVersion: v1 diff --git a/charts/evm-rollup/values.yaml b/charts/evm-rollup/values.yaml index fae0fd36cc..96d9167468 100644 --- a/charts/evm-rollup/values.yaml +++ b/charts/evm-rollup/values.yaml @@ -16,7 +16,7 @@ images: conductor: repo: ghcr.io/astriaorg/conductor pullPolicy: IfNotPresent - tag: "0.20.1" + tag: "0.21.0" devTag: latest @@ -178,6 +178,8 @@ config: sequencerGrpc: "" # The maximum number of requests to make to the sequencer per second sequencerRequestsPerSecond: 500 + # The chain id of the celestia network the conductor communicates with + celestiaChainId: "" celestia: # if config.rollup.executionLevel is NOT 'SoftOnly' AND celestia-node is not enabled diff --git a/charts/evm-stack/Chart.lock b/charts/evm-stack/Chart.lock index 9563164e67..fc9ac843ba 100644 --- a/charts/evm-stack/Chart.lock +++ b/charts/evm-stack/Chart.lock @@ -4,7 +4,7 @@ dependencies: version: 0.3.6 - name: evm-rollup repository: file://../evm-rollup - version: 0.27.3 + version: 0.27.4 - name: composer repository: file://../composer version: 0.1.4 @@ -20,5 +20,5 @@ dependencies: - name: blockscout-stack repository: https://blockscout.github.io/helm-charts version: 1.6.2 -digest: sha256:6e62801b5f401ba653f88a5ed9d33a6de38b8bba5ba942d01a2af68371c8bfd8 -generated: "2024-09-19T12:52:41.503045-07:00" +digest: sha256:b086adf099e986e3a5c1f7f25481aaf42ebf597029a70ee0bd3ff6711e6bdccf +generated: "2024-09-25T14:31:21.35488-05:00" diff --git a/charts/evm-stack/Chart.yaml b/charts/evm-stack/Chart.yaml index b584a7e985..be8c826308 100644 --- a/charts/evm-stack/Chart.yaml +++ b/charts/evm-stack/Chart.yaml @@ -15,7 +15,7 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.6.0 +version: 0.6.1 dependencies: - name: celestia-node @@ -23,7 +23,7 @@ dependencies: repository: "file://../celestia-node" condition: celestia-node.enabled - name: evm-rollup - version: 0.27.3 + version: 0.27.4 repository: "file://../evm-rollup" - name: composer version: 0.1.4 diff --git a/charts/evm-stack/values.yaml b/charts/evm-stack/values.yaml index 6aba3ae143..c37991a2a5 100644 --- a/charts/evm-stack/values.yaml +++ b/charts/evm-stack/values.yaml @@ -13,6 +13,7 @@ global: rollupName: "" evmChainId: "" sequencerChainId: "" + celestiaChainId: "" otel: endpoint: "" tracesEndpoint: "" @@ -29,6 +30,7 @@ evm-rollup: config: conductor: sequencerChainId: "{{ .Values.global.sequencerChainId }}" + celestiaChainId: "{{ .Values.global.celestiaChainId }}" sequencerRpc: "{{ .Values.global.sequencerRpc }}" sequencerGrpc: "{{ .Values.global.sequencerGrpc }}" otel: diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index bb2cbc7a0d..15c250bd2f 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -68,6 +68,7 @@ tower = { version = "0.4.13", features = ["limit"] } # when updating. celestia-rpc = "0.1.1" celestia-types = { workspace = true } +celestia-tendermint = { workspace = true } jsonrpsee = { version = "0.20", features = ["client-core", "macros"] } [dev-dependencies] diff --git a/crates/astria-conductor/local.env.example b/crates/astria-conductor/local.env.example index 6237d3109b..9a1eb3e43d 100644 --- a/crates/astria-conductor/local.env.example +++ b/crates/astria-conductor/local.env.example @@ -73,6 +73,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000 # CometBFT node. ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500 +# The chain ID of the sequencer network the conductor should be communicating with. +ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID="test-sequencer-1000" + +# The chain ID of the Celestia network the conductor should be communicating with. +ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID="test-celestia-1000" + # Set to true to enable prometheus metrics. ASTRIA_CONDUCTOR_NO_METRICS=true diff --git a/crates/astria-conductor/src/celestia/builder.rs b/crates/astria-conductor/src/celestia/builder.rs index 7860d439ee..ef33a28cbd 100644 --- a/crates/astria-conductor/src/celestia/builder.rs +++ b/crates/astria-conductor/src/celestia/builder.rs @@ -23,6 +23,8 @@ pub(crate) struct Builder { pub(crate) executor: executor::Handle, pub(crate) sequencer_cometbft_client: SequencerClient, pub(crate) sequencer_requests_per_second: u32, + pub(crate) expected_celestia_chain_id: String, + pub(crate) expected_sequencer_chain_id: String, pub(crate) shutdown: CancellationToken, pub(crate) metrics: &'static Metrics, } @@ -37,6 +39,8 @@ impl Builder { executor, sequencer_cometbft_client, sequencer_requests_per_second, + expected_celestia_chain_id, + expected_sequencer_chain_id, shutdown, metrics, } = self; @@ -50,6 +54,8 @@ impl Builder { executor, sequencer_cometbft_client, sequencer_requests_per_second, + expected_celestia_chain_id, + expected_sequencer_chain_id, shutdown, metrics, }) diff --git a/crates/astria-conductor/src/celestia/mod.rs b/crates/astria-conductor/src/celestia/mod.rs index 5de1d5b093..8dc6904630 100644 --- a/crates/astria-conductor/src/celestia/mod.rs +++ b/crates/astria-conductor/src/celestia/mod.rs @@ -11,9 +11,11 @@ use astria_core::{ use astria_eyre::eyre::{ self, bail, + ensure, WrapErr as _, }; use bytes::Bytes; +use celestia_rpc::HeaderClient as _; use celestia_types::nmt::Namespace; use futures::{ future::{ @@ -139,6 +141,12 @@ pub(crate) struct Reader { /// (usually to verify block data retrieved from Celestia blobs). sequencer_requests_per_second: u32, + /// The chain ID of the Celestia network the reader should be communicating with. + expected_celestia_chain_id: String, + + /// The chain ID of the Sequencer the reader should be communicating with. + expected_sequencer_chain_id: String, + /// Token to listen for Conductor being shut down. shutdown: CancellationToken, @@ -147,7 +155,7 @@ pub(crate) struct Reader { impl Reader { pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> { - let (executor, sequencer_chain_id) = select!( + let ((), executor, sequencer_chain_id) = select!( () = self.shutdown.clone().cancelled_owned() => { info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(|| info!("received shutdown signal while waiting for Celestia reader task to initialize") @@ -169,7 +177,20 @@ impl Reader { #[instrument(skip_all, err)] async fn initialize( &mut self, - ) -> eyre::Result<(executor::Handle, tendermint::chain::Id)> { + ) -> eyre::Result<((), executor::Handle, tendermint::chain::Id)> { + let validate_celestia_chain_id = async { + let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client) + .await + .wrap_err("failed to fetch Celestia chain ID")?; + let expected_celestia_chain_id = &self.expected_celestia_chain_id; + ensure!( + self.expected_celestia_chain_id == actual_celestia_chain_id.as_str(), + "expected Celestia chain id `{expected_celestia_chain_id}` does not match actual: \ + `{actual_celestia_chain_id}`" + ); + Ok(()) + }; + let wait_for_init_executor = async { self.executor .wait_for_init() @@ -177,16 +198,55 @@ impl Reader { .wrap_err("handle to executor failed while waiting for it being initialized") }; - let get_sequencer_chain_id = async { - get_sequencer_chain_id(self.sequencer_cometbft_client.clone()) - .await - .wrap_err("failed to get sequencer chain ID") + let get_and_validate_sequencer_chain_id = async { + let actual_sequencer_chain_id = + get_sequencer_chain_id(self.sequencer_cometbft_client.clone()) + .await + .wrap_err("failed to get sequencer chain ID")?; + let expected_sequencer_chain_id = &self.expected_sequencer_chain_id; + ensure!( + self.expected_sequencer_chain_id == actual_sequencer_chain_id.to_string(), + "expected Celestia chain id `{expected_sequencer_chain_id}` does not match \ + actual: `{actual_sequencer_chain_id}`" + ); + Ok(actual_sequencer_chain_id) }; - try_join!(wait_for_init_executor, get_sequencer_chain_id) + try_join!( + validate_celestia_chain_id, + wait_for_init_executor, + get_and_validate_sequencer_chain_id + ) } } +#[instrument(skip_all, err)] +async fn get_celestia_chain_id( + celestia_client: &CelestiaClient, +) -> eyre::Result { + let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, next_delay: Option, error: &jsonrpsee::core::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to fetch celestia network header info; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + let network_head = tryhard::retry_fn(|| celestia_client.header_network_head()) + .with_config(retry_config) + .await?; + Ok(network_head.chain_id().clone()) +} + struct RunningReader { block_cache: BlockCache, diff --git a/crates/astria-conductor/src/conductor/inner.rs b/crates/astria-conductor/src/conductor/inner.rs index 243f69569e..c52bd32536 100644 --- a/crates/astria-conductor/src/conductor/inner.rs +++ b/crates/astria-conductor/src/conductor/inner.rs @@ -6,6 +6,7 @@ use std::{ use astria_eyre::eyre::{ self, eyre, + Result, WrapErr as _, }; use itertools::Itertools as _; @@ -47,7 +48,7 @@ enum ExitReason { ShutdownSignal, TaskFailed { name: &'static str, - error: eyre::ErrReport, + error: eyre::Report, }, } @@ -55,12 +56,12 @@ pin_project! { /// A handle returned by [`ConductorInner::spawn`]. pub(super) struct InnerHandle { shutdown_token: CancellationToken, - task: Option>, + task: Option>>, } } impl Future for InnerHandle { - type Output = Result; + type Output = Result, tokio::task::JoinError>; fn poll( self: std::pin::Pin<&mut Self>, @@ -132,6 +133,7 @@ impl ConductorInner { sequencer_grpc_client, sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms), + expected_sequencer_chain_id: cfg.expected_sequencer_chain_id.clone(), shutdown: shutdown_token.clone(), executor: executor_handle.clone(), } @@ -153,6 +155,8 @@ impl ConductorInner { executor: executor_handle.clone(), sequencer_cometbft_client: sequencer_cometbft_client.clone(), sequencer_requests_per_second: cfg.sequencer_requests_per_second, + expected_celestia_chain_id: cfg.expected_celestia_chain_id, + expected_sequencer_chain_id: cfg.expected_sequencer_chain_id, shutdown: shutdown_token.clone(), metrics, } @@ -172,7 +176,7 @@ impl ConductorInner { /// /// # Panics /// Panics if it could not install a signal handler. - async fn run_until_stopped(mut self) -> RestartOrShutdown { + async fn run_until_stopped(mut self) -> Result { info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running")); let exit_reason = select! { @@ -219,7 +223,7 @@ impl ConductorInner { /// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds /// to abort the remaining tasks. #[instrument(skip_all)] - async fn shutdown(mut self, exit_reason: ExitReason) -> RestartOrShutdown { + async fn shutdown(mut self, exit_reason: ExitReason) -> Result { self.shutdown_token.cancel(); let mut restart_or_shutdown = RestartOrShutdown::Shutdown; @@ -273,7 +277,15 @@ impl ConductorInner { } info!("shutting down"); - restart_or_shutdown + if let ExitReason::TaskFailed { + error, .. + } = exit_reason + { + if matches!(restart_or_shutdown, RestartOrShutdown::Shutdown) { + return Err(error); + } + } + Ok(restart_or_shutdown) } } @@ -289,7 +301,7 @@ fn report_exit(exit_reason: &ExitReason, message: &str) { } #[instrument(skip_all)] -fn check_for_restart(name: &str, err: &eyre::ErrReport) -> bool { +fn check_for_restart(name: &str, err: &eyre::Report) -> bool { if name != ConductorInner::EXECUTOR { return false; } diff --git a/crates/astria-conductor/src/conductor/mod.rs b/crates/astria-conductor/src/conductor/mod.rs index 796081ba48..f7450d0668 100644 --- a/crates/astria-conductor/src/conductor/mod.rs +++ b/crates/astria-conductor/src/conductor/mod.rs @@ -1,8 +1,14 @@ mod inner; -use std::future::Future; +use std::{ + future::Future, + task::ready, +}; -use astria_eyre::eyre; +use astria_eyre::eyre::{ + self, + Result, +}; use inner::{ ConductorInner, InnerHandle, @@ -51,7 +57,7 @@ impl Handle { } impl Future for Handle { - type Output = Result, tokio::task::JoinError>; + type Output = eyre::Result<()>; fn poll( self: std::pin::Pin<&mut Self>, @@ -63,7 +69,9 @@ impl Future for Handle { .task .as_mut() .expect("the Conductor handle must not be polled after shutdown"); - task.poll_unpin(cx) + + let res = ready!(task.poll_unpin(cx)); + std::task::Poll::Ready(crate::utils::flatten(res)) } } @@ -128,17 +136,18 @@ impl Conductor { #[instrument(skip_all, err)] async fn shutdown_or_restart( &mut self, - exit_reason: Result, + exit_reason: Result, JoinError>, ) -> eyre::Result<&'static str> { match exit_reason { - Ok(restart_or_shutdown) => match restart_or_shutdown { + Ok(Ok(restart_or_shutdown)) => match restart_or_shutdown { RestartOrShutdown::Restart => { self.restart(); return Ok("restarting"); } - RestartOrShutdown::Shutdown => Ok("conductor exiting"), + RestartOrShutdown::Shutdown => Ok("shutting down"), }, - Err(err) => Err(eyre::ErrReport::from(err).wrap_err("conductor failed")), + Ok(Err(err)) => Err(err.wrap_err("conductor exited with an error")), + Err(err) => Err(eyre::Report::new(err).wrap_err("conductor panicked")), } } diff --git a/crates/astria-conductor/src/config.rs b/crates/astria-conductor/src/config.rs index 0cb8c0969d..e8211b1714 100644 --- a/crates/astria-conductor/src/config.rs +++ b/crates/astria-conductor/src/config.rs @@ -63,6 +63,12 @@ pub struct Config { /// The number of requests per second that will be sent to Sequencer. pub sequencer_requests_per_second: u32, + /// The chain ID of the sequencer network the conductor should be communiacting with. + pub expected_sequencer_chain_id: String, + + /// The chain ID of the Celestia network the conductor should be communicating with. + pub expected_celestia_chain_id: String, + /// Address of the RPC server for execution pub execution_rpc_url: String, diff --git a/crates/astria-conductor/src/sequencer/builder.rs b/crates/astria-conductor/src/sequencer/builder.rs index 2cdaf4bf62..c71aa0e7d8 100644 --- a/crates/astria-conductor/src/sequencer/builder.rs +++ b/crates/astria-conductor/src/sequencer/builder.rs @@ -10,6 +10,7 @@ pub(crate) struct Builder { pub(crate) sequencer_grpc_client: SequencerGrpcClient, pub(crate) sequencer_cometbft_client: sequencer_client::HttpClient, pub(crate) sequencer_block_time: Duration, + pub(crate) expected_sequencer_chain_id: String, pub(crate) shutdown: CancellationToken, } @@ -20,6 +21,7 @@ impl Builder { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, + expected_sequencer_chain_id, shutdown, } = self; super::Reader { @@ -27,6 +29,7 @@ impl Builder { sequencer_grpc_client, sequencer_cometbft_client, sequencer_block_time, + expected_sequencer_chain_id, shutdown, } } diff --git a/crates/astria-conductor/src/sequencer/mod.rs b/crates/astria-conductor/src/sequencer/mod.rs index 719fb72050..524fbc00d7 100644 --- a/crates/astria-conductor/src/sequencer/mod.rs +++ b/crates/astria-conductor/src/sequencer/mod.rs @@ -6,6 +6,7 @@ use astria_core::sequencerblock::v1alpha1::block::FilteredSequencerBlock; use astria_eyre::eyre::{ self, bail, + ensure, Report, WrapErr as _, }; @@ -77,6 +78,9 @@ pub(crate) struct Reader { /// height. sequencer_block_time: Duration, + /// The chain ID of the sequencer network the reader should be communicating with. + expected_sequencer_chain_id: String, + /// Token to listen for Conductor being shut down. shutdown: CancellationToken, } @@ -99,6 +103,17 @@ impl Reader { #[instrument(skip_all, err)] async fn initialize(&mut self) -> eyre::Result> { + let actual_sequencer_chain_id = + get_sequencer_chain_id(self.sequencer_cometbft_client.clone()) + .await + .wrap_err("failed to get chain ID from Sequencer")?; + let expected_sequencer_chain_id = &self.expected_sequencer_chain_id; + ensure!( + self.expected_sequencer_chain_id == actual_sequencer_chain_id.as_str(), + "expected chain id `{expected_sequencer_chain_id}` does not match actual: \ + `{actual_sequencer_chain_id}`" + ); + self.executor .wait_for_init() .await @@ -313,3 +328,35 @@ fn report_exit(reason: eyre::Result<&str>, message: &str) -> eyre::Result<()> { } } } + +#[instrument(skip_all, err)] +async fn get_sequencer_chain_id( + client: sequencer_client::HttpClient, +) -> eyre::Result { + use sequencer_client::Client as _; + + let retry_config = tryhard::RetryFutureConfig::new(u32::MAX) + .exponential_backoff(Duration::from_millis(100)) + .max_delay(Duration::from_secs(20)) + .on_retry( + |attempt: u32, next_delay: Option, error: &tendermint_rpc::Error| { + let wait_duration = next_delay + .map(humantime::format_duration) + .map(tracing::field::display); + warn!( + attempt, + wait_duration, + error = error as &dyn std::error::Error, + "attempt to fetch sequencer genesis info; retrying after backoff", + ); + futures::future::ready(()) + }, + ); + + let genesis: tendermint::Genesis = tryhard::retry_fn(|| client.genesis()) + .with_config(retry_config) + .await + .wrap_err("failed to get genesis info from Sequencer after a lot of attempts")?; + + Ok(genesis.chain_id) +} diff --git a/crates/astria-conductor/tests/blackbox/firm_only.rs b/crates/astria-conductor/tests/blackbox/firm_only.rs index 51cf03db5f..9bc1fa9bf3 100644 --- a/crates/astria-conductor/tests/blackbox/firm_only.rs +++ b/crates/astria-conductor/tests/blackbox/firm_only.rs @@ -1,14 +1,42 @@ use std::time::Duration; -use astria_conductor::config::CommitLevel; +use astria_conductor::{ + config::CommitLevel, + Conductor, + Config, +}; +use astria_core::generated::execution::v1alpha2::{ + GetCommitmentStateRequest, + GetGenesisInfoRequest, +}; use futures::future::{ join, join4, }; +use serde_json::json; +use telemetry::metrics; use tokio::time::timeout; +use wiremock::{ + matchers::{ + body_partial_json, + header, + }, + Mock, + ResponseTemplate, +}; use crate::{ - helpers::spawn_conductor, + celestia_network_head, + commitment_state, + genesis_info, + helpers::{ + make_config, + spawn_conductor, + MockGrpc, + CELESTIA_BEARER_TOKEN, + CELESTIA_CHAIN_ID, + SEQUENCER_CHAIN_ID, + }, mount_celestia_blobs, mount_celestia_header_network_head, mount_executed_block, @@ -377,3 +405,111 @@ async fn fetch_from_later_celestia_height() { within 1000ms", ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn exits_on_celestia_chain_id_mismatch() { + use astria_grpc_mock::{ + matcher, + response as GrpcResponse, + Mock as GrpcMock, + }; + + // FIXME (https://github.com/astriaorg/astria/issues/1602) + // We have to create our own test conductor and perform mounts manually because `TestConductor` + // implements the `Drop` trait, which disallows us from taking ownership of its tasks and + // awaiting their completion. + + let mock_grpc = MockGrpc::spawn().await; + let mock_http = wiremock::MockServer::start().await; + + let config = Config { + celestia_node_http_url: mock_http.uri(), + execution_rpc_url: format!("http://{}", mock_grpc.local_addr), + sequencer_cometbft_url: mock_http.uri(), + sequencer_grpc_url: format!("http://{}", mock_grpc.local_addr), + execution_commit_level: CommitLevel::FirmOnly, + ..make_config() + }; + + let (metrics, _) = metrics::ConfigBuilder::new() + .set_global_recorder(false) + .build(&()) + .unwrap(); + let metrics = Box::leak(Box::new(metrics)); + + let conductor = { + let conductor = Conductor::new(config, metrics).unwrap(); + conductor.spawn() + }; + + GrpcMock::for_rpc_given( + "get_genesis_info", + matcher::message_type::(), + ) + .respond_with(GrpcResponse::constant_response( + genesis_info!(sequencer_genesis_block_height: 1, + celestia_block_variance: 10,), + )) + .expect(0..) + .mount(&mock_grpc.mock_server) + .await; + + GrpcMock::for_rpc_given( + "get_commitment_state", + matcher::message_type::(), + ) + .respond_with(GrpcResponse::constant_response(commitment_state!(firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + base_celestia_height: 1,))) + .expect(0..) + .mount(&mock_grpc.mock_server) + .await; + + let bad_chain_id = "bad_chain_id"; + + Mock::given(body_partial_json( + json!({"jsonrpc": "2.0", "method": "header.NetworkHead"}), + )) + .and(header( + "authorization", + &*format!("Bearer {CELESTIA_BEARER_TOKEN}"), + )) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ + "jsonrpc": "2.0", + "id": 0, + "result": celestia_network_head!(height: 1u32, chain_id: bad_chain_id), + }))) + .expect(1..) + .mount(&mock_http) + .await; + + let res = conductor.await; + match res { + Ok(()) => panic!("conductor should have exited with an error, no error received"), + Err(e) => { + let mut source = e.source(); + while source.is_some() { + let err = source.unwrap(); + if err.to_string().contains( + format!( + "expected Celestia chain id `{CELESTIA_CHAIN_ID}` does not match actual: \ + `{bad_chain_id}`" + ) + .as_str(), + ) { + return; + } + source = err.source(); + } + panic!("expected exit due to chain ID mismatch, but got a different error: {e:?}") + } + } +} diff --git a/crates/astria-conductor/tests/blackbox/helpers/macros.rs b/crates/astria-conductor/tests/blackbox/helpers/macros.rs index b416db47ff..357e7e7ee1 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/macros.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/macros.rs @@ -16,6 +16,9 @@ macro_rules! block { #[macro_export] macro_rules! celestia_network_head { (height: $height:expr) => { + celestia_network_head!(height: $height, chain_id: $crate::helpers::CELESTIA_CHAIN_ID) + }; + (height: $height:expr,chain_id: $chain_id:expr $(,)?) => { ::celestia_types::ExtendedHeader { header: ::celestia_tendermint::block::header::Header { height: $height.into(), @@ -23,7 +26,7 @@ macro_rules! celestia_network_head { block: 0, app: 0, }, - chain_id: "test_celestia-1000".try_into().unwrap(), + chain_id: $chain_id.try_into().unwrap(), time: ::celestia_tendermint::Time::from_unix_timestamp(1, 1).unwrap(), last_block_id: None, last_commit_hash: ::celestia_tendermint::Hash::Sha256([0; 32]), @@ -158,7 +161,7 @@ macro_rules! mount_celestia_header_network_head { ) => { $test_env .mount_celestia_header_network_head( - $crate::celestia_network_head!(height: $height) + $crate::celestia_network_head!(height: $height, chain_id: $crate::helpers::CELESTIA_CHAIN_ID), ) .await; } @@ -365,7 +368,7 @@ macro_rules! mount_sequencer_validator_set { #[macro_export] macro_rules! mount_sequencer_genesis { ($test_env:ident) => { - $test_env.mount_genesis().await; + $test_env.mount_genesis(SEQUENCER_CHAIN_ID).await; }; } diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 04d1c6aee9..8e95499ceb 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -43,6 +43,7 @@ use astria_eyre; pub use mock_grpc::MockGrpc; use serde_json::json; use tracing::debug; +use wiremock::MockServer; pub const CELESTIA_BEARER_TOKEN: &str = "ABCDEFGH"; @@ -50,6 +51,7 @@ pub const ROLLUP_ID: RollupId = RollupId::new([42; 32]); pub static ROLLUP_ID_BYTES: Bytes = Bytes::from_static(&RollupId::get(ROLLUP_ID)); pub const SEQUENCER_CHAIN_ID: &str = "test_sequencer-1000"; +pub const CELESTIA_CHAIN_ID: &str = "test_celestia-1000"; pub const INITIAL_SOFT_HASH: [u8; 64] = [1; 64]; pub const INITIAL_FIRM_HASH: [u8; 64] = [1; 64]; @@ -245,6 +247,7 @@ impl TestConductor { header, }, Mock, + Request, ResponseTemplate, }; Mock::given(body_partial_json( @@ -254,11 +257,15 @@ impl TestConductor { "authorization", &*format!("Bearer {CELESTIA_BEARER_TOKEN}"), )) - .respond_with(ResponseTemplate::new(200).set_body_json(json!({ - "jsonrpc": "2.0", - "id": 0, - "result": extended_header - }))) + .respond_with(move |request: &Request| { + let body: serde_json::Value = serde_json::from_slice(&request.body).unwrap(); + let id = body.get("id"); + ResponseTemplate::new(200).set_body_json(json!({ + "jsonrpc": "2.0", + "id": id, + "result": extended_header + })) + }) .expect(1..) .mount(&self.mock_http) .await; @@ -294,66 +301,8 @@ impl TestConductor { .await; } - pub async fn mount_genesis(&self) { - use tendermint::{ - consensus::{ - params::{ - AbciParams, - ValidatorParams, - }, - Params, - }, - genesis::Genesis, - time::Time, - }; - use wiremock::{ - matchers::body_partial_json, - Mock, - ResponseTemplate, - }; - Mock::given(body_partial_json( - json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), - )) - .respond_with(ResponseTemplate::new(200).set_body_json( - tendermint_rpc::response::Wrapper::new_with_id( - tendermint_rpc::Id::uuid_v4(), - Some( - tendermint_rpc::endpoint::genesis::Response:: { - genesis: Genesis { - genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), - chain_id: SEQUENCER_CHAIN_ID.try_into().unwrap(), - initial_height: 1, - consensus_params: Params { - block: tendermint::block::Size { - max_bytes: 1024, - max_gas: 1024, - time_iota_ms: 1000, - }, - evidence: tendermint::evidence::Params { - max_age_num_blocks: 1000, - max_age_duration: tendermint::evidence::Duration( - Duration::from_secs(3600), - ), - max_bytes: 1_048_576, - }, - validator: ValidatorParams { - pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], - }, - version: None, - abci: AbciParams::default(), - }, - validators: vec![], - app_hash: tendermint::hash::AppHash::default(), - app_state: serde_json::Value::Null, - }, - }, - ), - None, - ), - )) - .expect(1..) - .mount(&self.mock_http) - .await; + pub async fn mount_genesis(&self, chain_id: &str) { + mount_genesis(&self.mock_http, chain_id).await; } pub async fn mount_get_genesis_info(&self, genesis_info: GenesisInfo) { @@ -501,7 +450,69 @@ impl TestConductor { } } -fn make_config() -> Config { +pub async fn mount_genesis(mock_http: &MockServer, chain_id: &str) { + use tendermint::{ + consensus::{ + params::{ + AbciParams, + ValidatorParams, + }, + Params, + }, + genesis::Genesis, + time::Time, + }; + use wiremock::{ + matchers::body_partial_json, + Mock, + ResponseTemplate, + }; + Mock::given(body_partial_json( + json!({"jsonrpc": "2.0", "method": "genesis", "params": null}), + )) + .respond_with(ResponseTemplate::new(200).set_body_json( + tendermint_rpc::response::Wrapper::new_with_id( + tendermint_rpc::Id::uuid_v4(), + Some( + tendermint_rpc::endpoint::genesis::Response:: { + genesis: Genesis { + genesis_time: Time::from_unix_timestamp(1, 1).unwrap(), + chain_id: chain_id.try_into().unwrap(), + initial_height: 1, + consensus_params: Params { + block: tendermint::block::Size { + max_bytes: 1024, + max_gas: 1024, + time_iota_ms: 1000, + }, + evidence: tendermint::evidence::Params { + max_age_num_blocks: 1000, + max_age_duration: tendermint::evidence::Duration( + Duration::from_secs(3600), + ), + max_bytes: 1_048_576, + }, + validator: ValidatorParams { + pub_key_types: vec![tendermint::public_key::Algorithm::Ed25519], + }, + version: None, + abci: AbciParams::default(), + }, + validators: vec![], + app_hash: tendermint::hash::AppHash::default(), + app_state: serde_json::Value::Null, + }, + }, + ), + None, + ), + )) + .expect(1..) + .mount(mock_http) + .await; +} + +pub(crate) fn make_config() -> Config { Config { celestia_block_time_ms: 12000, celestia_node_http_url: "http://127.0.0.1:26658".into(), @@ -511,6 +522,8 @@ fn make_config() -> Config { sequencer_cometbft_url: "http://127.0.0.1:26657".into(), sequencer_requests_per_second: 500, sequencer_block_time_ms: 2000, + expected_celestia_chain_id: CELESTIA_CHAIN_ID.into(), + expected_sequencer_chain_id: SEQUENCER_CHAIN_ID.into(), execution_rpc_url: "http://127.0.0.1:50051".into(), log: "info".into(), execution_commit_level: astria_conductor::config::CommitLevel::SoftAndFirm, diff --git a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs index c3834f4cfe..1054c43f80 100644 --- a/crates/astria-conductor/tests/blackbox/soft_and_firm.rs +++ b/crates/astria-conductor/tests/blackbox/soft_and_firm.rs @@ -22,6 +22,7 @@ use crate::{ mount_sequencer_genesis, mount_sequencer_validator_set, mount_update_commitment_state, + SEQUENCER_CHAIN_ID, }; /// Tests if a single block is executed and the rollup's state updated (first soft, then firm). diff --git a/crates/astria-conductor/tests/blackbox/soft_only.rs b/crates/astria-conductor/tests/blackbox/soft_only.rs index 4795979f2c..6f924c2eb2 100644 --- a/crates/astria-conductor/tests/blackbox/soft_only.rs +++ b/crates/astria-conductor/tests/blackbox/soft_only.rs @@ -1,20 +1,38 @@ use std::time::Duration; -use astria_conductor::config::CommitLevel; +use astria_conductor::{ + config::CommitLevel, + Conductor, + Config, +}; +use astria_core::generated::execution::v1alpha2::{ + GetCommitmentStateRequest, + GetGenesisInfoRequest, +}; use futures::future::{ join, join4, }; +use telemetry::metrics; use tokio::time::timeout; use crate::{ - helpers::spawn_conductor, + commitment_state, + genesis_info, + helpers::{ + make_config, + mount_genesis, + spawn_conductor, + MockGrpc, + }, mount_abci_info, mount_executed_block, mount_get_commitment_state, mount_get_filtered_sequencer_block, mount_get_genesis_info, + mount_sequencer_genesis, mount_update_commitment_state, + SEQUENCER_CHAIN_ID, }; #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -42,6 +60,8 @@ async fn simple() { base_celestia_height: 1, ); + mount_sequencer_genesis!(test_conductor); + mount_abci_info!( test_conductor, latest_sequencer_height: 3, @@ -113,6 +133,8 @@ async fn submits_two_heights_in_succession() { base_celestia_height: 1, ); + mount_sequencer_genesis!(test_conductor); + mount_abci_info!( test_conductor, latest_sequencer_height: 4, @@ -217,6 +239,8 @@ async fn skips_already_executed_heights() { base_celestia_height: 1, ); + mount_sequencer_genesis!(test_conductor); + mount_abci_info!( test_conductor, latest_sequencer_height: 7, @@ -288,6 +312,8 @@ async fn requests_from_later_genesis_height() { base_celestia_height: 1, ); + mount_sequencer_genesis!(test_conductor); + mount_abci_info!( test_conductor, latest_sequencer_height: 12, @@ -333,3 +359,96 @@ async fn requests_from_later_genesis_height() { within 1000ms", ); } + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn exits_on_sequencer_chain_id_mismatch() { + use astria_grpc_mock::{ + matcher, + response as GrpcResponse, + Mock as GrpcMock, + }; + + // FIXME (https://github.com/astriaorg/astria/issues/1602) + // We have to create our own test conductor and perform mounts manually because `TestConductor` + // implements the `Drop` trait, which disallows us from taking ownership of its tasks and + // awaiting their completion. + + let mock_grpc = MockGrpc::spawn().await; + let mock_http = wiremock::MockServer::start().await; + + let config = Config { + celestia_node_http_url: mock_http.uri(), + execution_rpc_url: format!("http://{}", mock_grpc.local_addr), + sequencer_cometbft_url: mock_http.uri(), + sequencer_grpc_url: format!("http://{}", mock_grpc.local_addr), + execution_commit_level: CommitLevel::SoftOnly, + ..make_config() + }; + + let (metrics, _) = metrics::ConfigBuilder::new() + .set_global_recorder(false) + .build(&()) + .unwrap(); + let metrics = Box::leak(Box::new(metrics)); + + let conductor = { + let conductor = Conductor::new(config, metrics).unwrap(); + conductor.spawn() + }; + + GrpcMock::for_rpc_given( + "get_genesis_info", + matcher::message_type::(), + ) + .respond_with(GrpcResponse::constant_response( + genesis_info!(sequencer_genesis_block_height: 1, + celestia_block_variance: 10,), + )) + .expect(0..) + .mount(&mock_grpc.mock_server) + .await; + + GrpcMock::for_rpc_given( + "get_commitment_state", + matcher::message_type::(), + ) + .respond_with(GrpcResponse::constant_response(commitment_state!(firm: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + soft: ( + number: 1, + hash: [1; 64], + parent: [0; 64], + ), + base_celestia_height: 1,))) + .expect(0..) + .mount(&mock_grpc.mock_server) + .await; + + let bad_chain_id = "bad_chain_id"; + mount_genesis(&mock_http, bad_chain_id).await; + + let res = conductor.await; + match res { + Ok(()) => panic!("conductor should have exited with an error, no error received"), + Err(e) => { + let mut source = e.source(); + while source.is_some() { + let err = source.unwrap(); + if err.to_string().contains( + format!( + "expected chain id `{SEQUENCER_CHAIN_ID}` does not match actual: \ + `{bad_chain_id}`" + ) + .as_str(), + ) { + return; + } + source = err.source(); + } + panic!("expected exit due to chain ID mismatch, but got a different error: {e:?}") + } + } +} diff --git a/dev/values/rollup/dev.yaml b/dev/values/rollup/dev.yaml index e53a8c73e7..d40db84220 100644 --- a/dev/values/rollup/dev.yaml +++ b/dev/values/rollup/dev.yaml @@ -6,6 +6,7 @@ global: sequencerRpc: http://node0-sequencer-rpc-service.astria-dev-cluster.svc.cluster.local:26657 sequencerGrpc: http://node0-sequencer-grpc-service.astria-dev-cluster.svc.cluster.local:8080 sequencerChainId: sequencer-test-chain-0 + celestiaChainId: celestia-local-0 evm-rollup: genesis: