Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(conductor)!: implement chain ID checks #1482

Merged
merged 24 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
f63218c
Implemented chain ID checks for conductor
ethanoroshiba Sep 10, 2024
6b249a4
Minor updates
ethanoroshiba Sep 10, 2024
2e2bec9
Helm update
ethanoroshiba Sep 10, 2024
000cc2d
Evm stack version bump
ethanoroshiba Sep 10, 2024
5389721
evm-rollup inherits celestiaChainId from evm-stack
ethanoroshiba Sep 11, 2024
a5841c7
fix charts and smoke test
ethanoroshiba Sep 11, 2024
a6a0922
Merge branch 'main' into ENG-73/conductor_chain_id_verification
ethanoroshiba Sep 11, 2024
94ea853
minor improvements
ethanoroshiba Sep 11, 2024
0629f41
Slight improvements
ethanoroshiba Sep 11, 2024
fc1c5e5
Merge branch 'main' into ENG-73/conductor_chain_id_verification
ethanoroshiba Sep 16, 2024
5d75531
Bump chart versions
ethanoroshiba Sep 16, 2024
8a6c3ab
Merge branch 'main' into ENG-73/conductor_chain_id_verification
ethanoroshiba Sep 20, 2024
ee4e63c
Requested changes
ethanoroshiba Sep 20, 2024
0e01787
Clean up tests
ethanoroshiba Sep 20, 2024
a3f704a
Merge branch 'main' into ENG-73/conductor_chain_id_verification
ethanoroshiba Sep 25, 2024
7898474
Requested changes
ethanoroshiba Sep 25, 2024
f71c000
tapl fmt
ethanoroshiba Sep 25, 2024
1940d37
Chart bumps
ethanoroshiba Sep 25, 2024
878e84f
Requested changes
ethanoroshiba Sep 30, 2024
8510b64
Merge branch 'main' into ENG-73/conductor_chain_id_verification
ethanoroshiba Sep 30, 2024
c0be38b
requested changes
ethanoroshiba Sep 30, 2024
05a50b3
Merge branch 'ENG-73/conductor_chain_id_verification' of https://gith…
ethanoroshiba Sep 30, 2024
4d1f5aa
fix lasta few nits
SuperFluffy Oct 1, 2024
91c9768
Remove last ErrReport
ethanoroshiba Oct 1, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions charts/evm-rollup/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.27.3
version: 0.27.4

# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
appVersion: "0.14.1"
appVersion: "0.14.2"

maintainers:
- name: wafflesvonmaple
Expand Down
2 changes: 2 additions & 0 deletions charts/evm-rollup/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ data:
OTEL_SERVICE_NAME: "{{ tpl .Values.otel.serviceNamePrefix . }}-conductor"
{{- if not .Values.global.dev }}
{{- else }}
ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID: "{{ tpl .Values.config.conductor.sequencerChainId . }}"
ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID: "{{ tpl .Values.config.conductor.celestiaChainId . }}"
{{- end }}
---
apiVersion: v1
Expand Down
4 changes: 3 additions & 1 deletion charts/evm-rollup/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ images:
conductor:
repo: ghcr.io/astriaorg/conductor
pullPolicy: IfNotPresent
tag: "0.20.1"
tag: "0.21.0"
devTag: latest


Expand Down Expand Up @@ -178,6 +178,8 @@ config:
sequencerGrpc: ""
# The maximum number of requests to make to the sequencer per second
sequencerRequestsPerSecond: 500
# The chain id of the celestia network the conductor communicates with
celestiaChainId: ""

celestia:
# if config.rollup.executionLevel is NOT 'SoftOnly' AND celestia-node is not enabled
Expand Down
6 changes: 3 additions & 3 deletions charts/evm-stack/Chart.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dependencies:
version: 0.3.6
- name: evm-rollup
repository: file://../evm-rollup
version: 0.27.3
version: 0.27.4
- name: composer
repository: file://../composer
version: 0.1.4
Expand All @@ -20,5 +20,5 @@ dependencies:
- name: blockscout-stack
repository: https://blockscout.github.io/helm-charts
version: 1.6.2
digest: sha256:6e62801b5f401ba653f88a5ed9d33a6de38b8bba5ba942d01a2af68371c8bfd8
generated: "2024-09-19T12:52:41.503045-07:00"
digest: sha256:b086adf099e986e3a5c1f7f25481aaf42ebf597029a70ee0bd3ff6711e6bdccf
generated: "2024-09-25T14:31:21.35488-05:00"
4 changes: 2 additions & 2 deletions charts/evm-stack/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ type: application
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
version: 0.6.0
version: 0.6.1

dependencies:
- name: celestia-node
version: 0.3.6
repository: "file://../celestia-node"
condition: celestia-node.enabled
- name: evm-rollup
version: 0.27.3
version: 0.27.4
repository: "file://../evm-rollup"
- name: composer
version: 0.1.4
Expand Down
2 changes: 2 additions & 0 deletions charts/evm-stack/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ global:
rollupName: ""
evmChainId: ""
sequencerChainId: ""
celestiaChainId: ""
otel:
endpoint: ""
tracesEndpoint: ""
Expand All @@ -29,6 +30,7 @@ evm-rollup:
config:
conductor:
sequencerChainId: "{{ .Values.global.sequencerChainId }}"
celestiaChainId: "{{ .Values.global.celestiaChainId }}"
sequencerRpc: "{{ .Values.global.sequencerRpc }}"
sequencerGrpc: "{{ .Values.global.sequencerGrpc }}"
otel:
Expand Down
1 change: 1 addition & 0 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ tower = { version = "0.4.13", features = ["limit"] }
# when updating.
celestia-rpc = "0.1.1"
celestia-types = { workspace = true }
celestia-tendermint = { workspace = true }
jsonrpsee = { version = "0.20", features = ["client-core", "macros"] }

[dev-dependencies]
Expand Down
6 changes: 6 additions & 0 deletions crates/astria-conductor/local.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ ASTRIA_CONDUCTOR_SEQUENCER_BLOCK_TIME_MS=2000
# CometBFT node.
ASTRIA_CONDUCTOR_SEQUENCER_REQUESTS_PER_SECOND=500

# The chain ID of the sequencer network the conductor should be communicating with.
ASTRIA_CONDUCTOR_EXPECTED_SEQUENCER_CHAIN_ID="test-sequencer-1000"

# The chain ID of the Celestia network the conductor should be communicating with.
ASTRIA_CONDUCTOR_EXPECTED_CELESTIA_CHAIN_ID="test-celestia-1000"

# Set to true to disable prometheus metrics.
ASTRIA_CONDUCTOR_NO_METRICS=true

Expand Down
6 changes: 6 additions & 0 deletions crates/astria-conductor/src/celestia/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub(crate) struct Builder {
pub(crate) executor: executor::Handle,
pub(crate) sequencer_cometbft_client: SequencerClient,
pub(crate) sequencer_requests_per_second: u32,
pub(crate) expected_celestia_chain_id: String,
pub(crate) expected_sequencer_chain_id: String,
pub(crate) shutdown: CancellationToken,
pub(crate) metrics: &'static Metrics,
}
Expand All @@ -37,6 +39,8 @@ impl Builder {
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
} = self;
Expand All @@ -50,6 +54,8 @@ impl Builder {
executor,
sequencer_cometbft_client,
sequencer_requests_per_second,
expected_celestia_chain_id,
expected_sequencer_chain_id,
shutdown,
metrics,
})
Expand Down
74 changes: 67 additions & 7 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use astria_core::{
use astria_eyre::eyre::{
self,
bail,
ensure,
WrapErr as _,
};
use bytes::Bytes;
use celestia_rpc::HeaderClient as _;
use celestia_types::nmt::Namespace;
use futures::{
future::{
Expand Down Expand Up @@ -139,6 +141,12 @@ pub(crate) struct Reader {
/// (usually to verify block data retrieved from Celestia blobs).
sequencer_requests_per_second: u32,

/// The chain ID of the Celestia network the reader should be communicating with.
expected_celestia_chain_id: String,

/// The chain ID of the Sequencer the reader should be communicating with.
expected_sequencer_chain_id: String,

/// Token to listen for Conductor being shut down.
shutdown: CancellationToken,

Expand All @@ -147,7 +155,7 @@ pub(crate) struct Reader {

impl Reader {
pub(crate) async fn run_until_stopped(mut self) -> eyre::Result<()> {
let (executor, sequencer_chain_id) = select!(
let ((), executor, sequencer_chain_id) = select!(
() = self.shutdown.clone().cancelled_owned() => {
info_span!("conductor::celestia::Reader::run_until_stopped").in_scope(||
info!("received shutdown signal while waiting for Celestia reader task to initialize")
Expand All @@ -169,24 +177,76 @@ impl Reader {
#[instrument(skip_all, err)]
async fn initialize(
ethanoroshiba marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
) -> eyre::Result<(executor::Handle<StateIsInit>, tendermint::chain::Id)> {
) -> eyre::Result<((), executor::Handle<StateIsInit>, tendermint::chain::Id)> {
let validate_celestia_chain_id = async {
let actual_celestia_chain_id = get_celestia_chain_id(&self.celestia_client)
.await
.wrap_err("failed to fetch Celestia chain ID")?;
let expected_celestia_chain_id = &self.expected_celestia_chain_id;
ensure!(
self.expected_celestia_chain_id == actual_celestia_chain_id.as_str(),
"expected Celestia chain id `{expected_celestia_chain_id}` does not match actual: \
`{actual_celestia_chain_id}`"
);
Ok(())
};

let wait_for_init_executor = async {
self.executor
.wait_for_init()
.await
.wrap_err("handle to executor failed while waiting for it being initialized")
};

let get_sequencer_chain_id = async {
get_sequencer_chain_id(self.sequencer_cometbft_client.clone())
.await
.wrap_err("failed to get sequencer chain ID")
// Fetches the chain ID reported by the sequencer and checks it against the
// configured `expected_sequencer_chain_id`. Resolves to the fetched chain ID
// when they match and to an error otherwise.
let get_and_validate_sequencer_chain_id = async {
    let actual_sequencer_chain_id =
        get_sequencer_chain_id(self.sequencer_cometbft_client.clone())
            .await
            .wrap_err("failed to get sequencer chain ID")?;
    // Bound to a local so it can be captured by name in the format string below.
    let expected_sequencer_chain_id = &self.expected_sequencer_chain_id;
    // Refuse to continue when connected to a sequencer network other than the
    // configured one. The message must name the sequencer (not Celestia) so that
    // operators can tell which of the two chain ID checks failed.
    ensure!(
        self.expected_sequencer_chain_id == actual_sequencer_chain_id.to_string(),
        "expected sequencer chain id `{expected_sequencer_chain_id}` does not match \
         actual: `{actual_sequencer_chain_id}`"
    );
    Ok(actual_sequencer_chain_id)
};

try_join!(wait_for_init_executor, get_sequencer_chain_id)
try_join!(
validate_celestia_chain_id,
wait_for_init_executor,
get_and_validate_sequencer_chain_id
)
}
}

/// Fetches the chain ID of the Celestia network that `celestia_client` talks to.
///
/// The chain ID is taken from the network head header returned by the client.
/// The request is wrapped in a retry policy of up to `u32::MAX` attempts with
/// exponential backoff starting at 100ms and individual delays capped at 20s.
/// A warning is emitted from the retry hook each time a retry is scheduled.
///
/// # Errors
/// Returns the last request error if the retry policy gives up.
#[instrument(skip_all, err)]
async fn get_celestia_chain_id(
    celestia_client: &CelestiaClient,
) -> eyre::Result<celestia_tendermint::chain::Id> {
    // Backoff schedule first; the logging hook is attached separately below.
    let backoff = tryhard::RetryFutureConfig::new(u32::MAX)
        .exponential_backoff(Duration::from_millis(100))
        .max_delay(Duration::from_secs(20));

    let retry_config = backoff.on_retry(
        |attempt: u32, next_delay: Option<Duration>, error: &jsonrpsee::core::Error| {
            // Render the upcoming delay in human-readable form for the log record.
            let wait_duration = next_delay
                .map(|delay| tracing::field::display(humantime::format_duration(delay)));
            warn!(
                attempt,
                wait_duration,
                error = error as &dyn std::error::Error,
                "attempt to fetch celestia network header info; retrying after backoff",
            );
            futures::future::ready(())
        },
    );

    let head = tryhard::retry_fn(|| celestia_client.header_network_head())
        .with_config(retry_config)
        .await?;
    let chain_id = head.chain_id().clone();
    Ok(chain_id)
}

struct RunningReader {
block_cache: BlockCache<ReconstructedBlock>,

Expand Down
22 changes: 17 additions & 5 deletions crates/astria-conductor/src/conductor/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::{
use astria_eyre::eyre::{
self,
eyre,
Result,
WrapErr as _,
};
use itertools::Itertools as _;
Expand Down Expand Up @@ -55,12 +56,12 @@ pin_project! {
/// A handle returned by [`ConductorInner::spawn`].
pub(super) struct InnerHandle {
shutdown_token: CancellationToken,
task: Option<tokio::task::JoinHandle<RestartOrShutdown>>,
task: Option<tokio::task::JoinHandle<Result<RestartOrShutdown>>>,
}
}

impl Future for InnerHandle {
type Output = Result<RestartOrShutdown, tokio::task::JoinError>;
type Output = Result<Result<RestartOrShutdown>, tokio::task::JoinError>;

fn poll(
self: std::pin::Pin<&mut Self>,
Expand Down Expand Up @@ -132,6 +133,7 @@ impl ConductorInner {
sequencer_grpc_client,
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_block_time: Duration::from_millis(cfg.sequencer_block_time_ms),
expected_sequencer_chain_id: cfg.expected_sequencer_chain_id.clone(),
shutdown: shutdown_token.clone(),
executor: executor_handle.clone(),
}
Expand All @@ -153,6 +155,8 @@ impl ConductorInner {
executor: executor_handle.clone(),
sequencer_cometbft_client: sequencer_cometbft_client.clone(),
sequencer_requests_per_second: cfg.sequencer_requests_per_second,
expected_celestia_chain_id: cfg.expected_celestia_chain_id,
expected_sequencer_chain_id: cfg.expected_sequencer_chain_id,
shutdown: shutdown_token.clone(),
metrics,
}
Expand All @@ -172,7 +176,7 @@ impl ConductorInner {
///
/// # Panics
/// Panics if it could not install a signal handler.
async fn run_until_stopped(mut self) -> RestartOrShutdown {
async fn run_until_stopped(mut self) -> Result<RestartOrShutdown> {
info_span!("Conductor::run_until_stopped").in_scope(|| info!("conductor is running"));

let exit_reason = select! {
Expand Down Expand Up @@ -219,7 +223,7 @@ impl ConductorInner {
/// because kubernetes issues SIGKILL 30 seconds after SIGTERM, giving 5 seconds
/// to abort the remaining tasks.
#[instrument(skip_all)]
async fn shutdown(mut self, exit_reason: ExitReason) -> RestartOrShutdown {
async fn shutdown(mut self, exit_reason: ExitReason) -> Result<RestartOrShutdown> {
self.shutdown_token.cancel();
let mut restart_or_shutdown = RestartOrShutdown::Shutdown;

Expand Down Expand Up @@ -273,7 +277,15 @@ impl ConductorInner {
}
info!("shutting down");

restart_or_shutdown
if let ExitReason::TaskFailed {
error, ..
} = exit_reason
{
if matches!(restart_or_shutdown, RestartOrShutdown::Shutdown) {
return Err(error);
}
}
Ok(restart_or_shutdown)
}
}

Expand Down
10 changes: 7 additions & 3 deletions crates/astria-conductor/src/conductor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ mod inner;

use std::future::Future;

use astria_eyre::eyre;
use astria_eyre::eyre::{
self,
Result,
};
use inner::{
ConductorInner,
InnerHandle,
Expand Down Expand Up @@ -128,16 +131,17 @@ impl Conductor {
#[instrument(skip_all, err)]
async fn shutdown_or_restart(
&mut self,
exit_reason: Result<RestartOrShutdown, JoinError>,
exit_reason: Result<Result<RestartOrShutdown>, JoinError>,
) -> eyre::Result<&'static str> {
match exit_reason {
Ok(restart_or_shutdown) => match restart_or_shutdown {
Ok(Ok(restart_or_shutdown)) => match restart_or_shutdown {
RestartOrShutdown::Restart => {
self.restart();
return Ok("restarting");
}
RestartOrShutdown::Shutdown => Ok("conductor exiting"),
},
// The conductor task was joined successfully but itself returned an error.
Ok(Err(err)) => Err(err.wrap_err("conductor failed")),
Err(err) => Err(eyre::ErrReport::from(err).wrap_err("conductor failed")),
ethanoroshiba marked this conversation as resolved.
Show resolved Hide resolved
}
}
Expand Down
Loading
Loading