Skip to content

Commit

Permalink
add relayer metrics for fee recording
Browse files Browse the repository at this point in the history
  • Loading branch information
Fraser999 committed Oct 25, 2024
1 parent 751ce8d commit f67d273
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 58 deletions.
12 changes: 1 addition & 11 deletions crates/astria-bridge-withdrawer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,8 @@ impl Metrics {
self.sequencer_submission_failure_count.increment(1);
}

#[expect(
clippy::cast_precision_loss,
reason = "metric with potential loss of precision, logging when it occurs"
)]
pub(crate) fn set_batch_total_settled_value(&self, value: u128) {
if value > u128::from(u32::MAX) {
tracing::warn!(
"{BATCH_TOTAL_SETTLED_VALUE} set with {value} which exceeds u32::MAX, precision \
loss in metric"
);
}
self.batch_total_settled_value.set(value as f64);
self.batch_total_settled_value.set(value);
}
}

Expand Down
75 changes: 60 additions & 15 deletions crates/astria-sequencer-relayer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub struct Metrics {
sequencer_height_fetch_failure_count: Counter,
sequencer_submission_height: Counter,
compression_ratio_for_astria_block: Gauge,
celestia_fees_total_utia: Gauge,
celestia_fees_utia_per_uncompressed_blob_byte: Gauge,
celestia_fees_utia_per_compressed_blob_byte: Gauge,
}

impl Metrics {
Expand Down Expand Up @@ -73,11 +76,28 @@ impl Metrics {
pub(crate) fn set_compression_ratio_for_astria_block(&self, ratio: f64) {
self.compression_ratio_for_astria_block.set(ratio);
}

pub(crate) fn set_celestia_fees_total_utia(&self, utia: u64) {
self.celestia_fees_total_utia.set(utia);
}

pub(crate) fn set_celestia_fees_utia_per_uncompressed_blob_byte(&self, utia: f64) {
self.celestia_fees_utia_per_uncompressed_blob_byte.set(utia);
}

pub(crate) fn set_celestia_fees_utia_per_compressed_blob_byte(&self, utia: f64) {
self.celestia_fees_utia_per_compressed_blob_byte.set(utia);
}
}

impl telemetry::Metrics for Metrics {
type Config = ();

#[expect(
clippy::too_many_lines,
reason = "this is reasonable as we have a lot of metrics to register; the function is not \
complex, just long"
)]
fn register(
builder: &mut RegisteringBuilder,
_config: &Self::Config,
Expand Down Expand Up @@ -171,6 +191,29 @@ impl telemetry::Metrics for Metrics {
)?
.register()?;

let celestia_fees_total_utia = builder
.new_gauge_factory(
CELESTIA_FEES_TOTAL_UTIA,
"The total Celestia fees in utia for the latest successful submission",
)?
.register()?;

let celestia_fees_utia_per_uncompressed_blob_byte = builder
.new_gauge_factory(
CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE,
"The Celestia fees in utia per uncompressed blob byte for the latest successful \
submission",
)?
.register()?;

let celestia_fees_utia_per_compressed_blob_byte = builder
.new_gauge_factory(
CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE,
"The Celestia fees in utia per compressed blob byte for the latest successful \
submission",
)?
.register()?;

Ok(Self {
celestia_submission_height,
celestia_submission_count,
Expand All @@ -184,6 +227,9 @@ impl telemetry::Metrics for Metrics {
sequencer_height_fetch_failure_count,
sequencer_submission_height,
compression_ratio_for_astria_block,
celestia_fees_total_utia,
celestia_fees_utia_per_uncompressed_blob_byte,
celestia_fees_utia_per_compressed_blob_byte,
})
}
}
Expand All @@ -200,25 +246,15 @@ metric_names!(const METRICS_NAMES:
SEQUENCER_BLOCK_FETCH_FAILURE_COUNT,
SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT,
SEQUENCER_SUBMISSION_HEIGHT,
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK,
CELESTIA_FEES_TOTAL_UTIA,
CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE,
CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE
);

#[cfg(test)]
mod tests {
use super::{
BLOBS_PER_CELESTIA_TX,
BLOCKS_PER_CELESTIA_TX,
BYTES_PER_CELESTIA_TX,
CELESTIA_PAYLOAD_CREATION_LATENCY,
CELESTIA_SUBMISSION_COUNT,
CELESTIA_SUBMISSION_FAILURE_COUNT,
CELESTIA_SUBMISSION_HEIGHT,
CELESTIA_SUBMISSION_LATENCY,
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK,
SEQUENCER_BLOCK_FETCH_FAILURE_COUNT,
SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT,
SEQUENCER_SUBMISSION_HEIGHT,
};
use super::*;

#[track_caller]
fn assert_const(actual: &'static str, suffix: &str) {
Expand Down Expand Up @@ -257,5 +293,14 @@ mod tests {
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK,
"compression_ratio_for_astria_block",
);
assert_const(CELESTIA_FEES_TOTAL_UTIA, "celestia_fees_total_utia");
assert_const(
CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE,
"celestia_fees_utia_per_uncompressed_blob_byte",
);
assert_const(
CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE,
"celestia_fees_utia_per_compressed_blob_byte",
);
}
}
46 changes: 29 additions & 17 deletions crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl CelestiaClient {
&mut self,
blobs: Arc<Vec<Blob>>,
maybe_last_error: Option<TrySubmitError>,
) -> Result<BlobTx, TrySubmitError> {
) -> Result<BlobTxAndFee, TrySubmitError> {
info!("fetching cost params and account info from celestia app");
let (blob_params, auth_params, min_gas_price, base_account) = tokio::try_join!(
self.fetch_blob_params(),
Expand Down Expand Up @@ -196,7 +196,7 @@ impl CelestiaClient {
"prepared blob transaction for celestia app"
);

Ok(new_blob_tx(&signed_tx, blobs.iter()))
Ok(BlobTxAndFee::new(&signed_tx, blobs.iter(), fee))
}

#[instrument(skip_all, err(level = Level::WARN))]
Expand Down Expand Up @@ -775,22 +775,34 @@ fn new_signed_tx(
}
}

fn new_blob_tx<'a>(signed_tx: &Tx, blobs: impl Iterator<Item = &'a Blob>) -> BlobTx {
// From https://github.com/celestiaorg/celestia-core/blob/v1.29.0-tm-v0.34.29/pkg/consts/consts.go#L19
const BLOB_TX_TYPE_ID: &str = "BLOB";
pub(in crate::relayer) struct BlobTxAndFee {
pub(in crate::relayer) tx: BlobTx,
pub(in crate::relayer) fee: u64,
}

let blobs = blobs
.map(|blob| PbBlob {
namespace_id: Bytes::from(blob.namespace.id().to_vec()),
namespace_version: u32::from(blob.namespace.version()),
data: Bytes::from(blob.data.clone()),
share_version: u32::from(blob.share_version),
})
.collect();
BlobTx {
tx: Bytes::from(signed_tx.encode_to_vec()),
blobs,
type_id: BLOB_TX_TYPE_ID.to_string(),
impl BlobTxAndFee {
fn new<'a>(signed_tx: &Tx, blobs: impl Iterator<Item = &'a Blob>, fee: u64) -> Self {
// From https://github.com/celestiaorg/celestia-core/blob/v1.29.0-tm-v0.34.29/pkg/consts/consts.go#L19
const BLOB_TX_TYPE_ID: &str = "BLOB";

let blobs = blobs
.map(|blob| PbBlob {
namespace_id: Bytes::from(blob.namespace.id().to_vec()),
namespace_version: u32::from(blob.namespace.version()),
data: Bytes::from(blob.data.clone()),
share_version: u32::from(blob.share_version),
})
.collect();
let tx = BlobTx {
tx: Bytes::from(signed_tx.encode_to_vec()),
blobs,
type_id: BLOB_TX_TYPE_ID.to_string(),
};

Self {
tx,
fee,
}
}
}

Expand Down
53 changes: 42 additions & 11 deletions crates/astria-sequencer-relayer/src/relayer/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ use crate::{
mod conversion;
use conversion::NextSubmission;

/// A simple, passive object to allow the Celestia fee to be returned along with the
/// `StartedSubmission` state when attempting to submit.
struct StartedSubmissionAndFee {
new_state: StartedSubmission,
fee: Option<u64>,
}

#[derive(Clone)]
pub(super) struct BlobSubmitterHandle {
tx: mpsc::Sender<SequencerBlock>,
Expand Down Expand Up @@ -349,10 +356,12 @@ async fn submit_blobs(
started_submission: StartedSubmission,
metrics: &'static Metrics,
) -> eyre::Result<StartedSubmission> {
let total_data_uncompressed_size = data.uncompressed_size();
let total_data_compressed_size = data.compressed_size();
info!(
blocks = %telemetry::display::json(&data.input_metadata()),
total_data_uncompressed_size = data.uncompressed_size(),
total_data_compressed_size = data.compressed_size(),
total_data_uncompressed_size,
total_data_compressed_size,
compression_ratio = data.compression_ratio(),
"initiated submission of sequencer blocks converted to Celestia blobs",
);
Expand All @@ -368,7 +377,10 @@ async fn submit_blobs(
let largest_sequencer_height = data.greatest_sequencer_height();
let blobs = data.into_blobs();

let new_state = submit_with_retry(
let StartedSubmissionAndFee {
new_state,
fee,
} = submit_with_retry(
client,
blobs,
state.clone(),
Expand All @@ -383,6 +395,17 @@ async fn submit_blobs(
metrics.absolute_set_sequencer_submission_height(largest_sequencer_height.value());
metrics.absolute_set_celestia_submission_height(celestia_height);
metrics.record_celestia_submission_latency(start.elapsed());
#[expect(
clippy::cast_precision_loss,
reason = "precision loss here is unlikely, but is acceptable for these metrics"
)]
if let Some(fee) = fee {
metrics.set_celestia_fees_total_utia(fee);
let cost_uncompressed = fee as f64 / total_data_uncompressed_size as f64;
metrics.set_celestia_fees_utia_per_uncompressed_blob_byte(cost_uncompressed);
let cost_compressed = fee as f64 / total_data_compressed_size as f64;
metrics.set_celestia_fees_utia_per_compressed_blob_byte(cost_compressed);
}

info!(%celestia_height, "successfully submitted blobs to Celestia");

Expand Down Expand Up @@ -453,7 +476,7 @@ async fn submit_with_retry(
started_submission: StartedSubmission,
largest_sequencer_height: SequencerHeight,
metrics: &'static Metrics,
) -> eyre::Result<StartedSubmission> {
) -> eyre::Result<StartedSubmissionAndFee> {
// Moving the span into `on_retry`, because tryhard spawns these in a tokio
// task, losing the span.
let span = Span::current();
Expand Down Expand Up @@ -501,7 +524,7 @@ async fn submit_with_retry(

let blobs = Arc::new(blobs);

let final_state = tryhard::retry_fn(move || {
let final_state_and_fee = tryhard::retry_fn(move || {
try_submit(
client.clone(),
blobs.clone(),
Expand All @@ -514,7 +537,7 @@ async fn submit_with_retry(
.in_current_span()
.await
.wrap_err("finished trying to submit")?;
Ok(final_state)
Ok(final_state_and_fee)
}

#[instrument(skip_all, err(level = Level::WARN))]
Expand All @@ -524,7 +547,7 @@ async fn try_submit(
started_submission: StartedSubmission,
largest_sequencer_height: SequencerHeight,
last_error_receiver: watch::Receiver<Option<SubmissionError>>,
) -> Result<StartedSubmission, SubmissionError> {
) -> Result<StartedSubmissionAndFee, SubmissionError> {
// Get the error from the last attempt to `try_submit`.
let maybe_last_error = last_error_receiver.borrow().clone();
let maybe_try_submit_error = match maybe_last_error {
Expand All @@ -534,7 +557,10 @@ async fn try_submit(
try_confirm_submission_from_failed_attempt(client.clone(), prepared_submission)
.await?
{
return Ok(new_state);
return Ok(StartedSubmissionAndFee {
new_state,
fee: None,
});
}
None
}
Expand All @@ -545,18 +571,23 @@ async fn try_submit(
None => None,
};

let blob_tx = client.try_prepare(blobs, maybe_try_submit_error).await?;
let blob_tx_hash = BlobTxHash::compute(&blob_tx);
let blob_tx_and_fee = client.try_prepare(blobs, maybe_try_submit_error).await?;
let blob_tx_hash = BlobTxHash::compute(&blob_tx_and_fee.tx);

let prepared_submission = started_submission
.into_prepared(largest_sequencer_height, blob_tx_hash)
.await
.map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?;

match client.try_submit(blob_tx_hash, blob_tx).await {
let fee = blob_tx_and_fee.fee;
match client.try_submit(blob_tx_hash, blob_tx_and_fee.tx).await {
Ok(celestia_height) => prepared_submission
.into_started(celestia_height)
.await
.map(|new_state| StartedSubmissionAndFee {
new_state,
fee: Some(fee),
})
.map_err(|error| SubmissionError::Unrecoverable(Arc::new(error))),
Err(TrySubmitError::FailedToBroadcastTx(error)) if error.is_timeout() => {
Err(SubmissionError::BroadcastTxTimedOut(prepared_submission))
Expand Down
4 changes: 1 addition & 3 deletions crates/astria-telemetry/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,14 @@ serde_json = { workspace = true, optional = true }
serde_with = { version = "3.7.0", optional = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = "0.23.0"
tracing-subscriber = { version = "0.3.17", features = [
"fmt",
"env-filter",
"json",
] }

[dev-dependencies]
tracing = { workspace = true }

[features]
display = [
"dep:base64",
Expand Down
Loading

0 comments on commit f67d273

Please sign in to comment.