From f67d2733923247c3fb15970d581404e4bdc867b9 Mon Sep 17 00:00:00 2001 From: Fraser Hutchison Date: Thu, 24 Oct 2024 18:31:20 +0100 Subject: [PATCH] add relayer metrics for fee recording --- .../astria-bridge-withdrawer/src/metrics.rs | 12 +-- .../astria-sequencer-relayer/src/metrics.rs | 75 +++++++++++++++---- .../src/relayer/celestia_client/mod.rs | 46 +++++++----- .../src/relayer/write/mod.rs | 53 ++++++++++--- crates/astria-telemetry/Cargo.toml | 4 +- .../astria-telemetry/src/metrics/into_f64.rs | 40 +++++++++- 6 files changed, 172 insertions(+), 58 deletions(-) diff --git a/crates/astria-bridge-withdrawer/src/metrics.rs b/crates/astria-bridge-withdrawer/src/metrics.rs index 01f3e2f12b..8bdb7e7df0 100644 --- a/crates/astria-bridge-withdrawer/src/metrics.rs +++ b/crates/astria-bridge-withdrawer/src/metrics.rs @@ -46,18 +46,8 @@ impl Metrics { self.sequencer_submission_failure_count.increment(1); } - #[expect( - clippy::cast_precision_loss, - reason = "metric with potential loss of precision, logging when it occurs" - )] pub(crate) fn set_batch_total_settled_value(&self, value: u128) { - if value > u128::from(u32::MAX) { - tracing::warn!( - "{BATCH_TOTAL_SETTLED_VALUE} set with {value} which exceeds u32::MAX, precision \ - loss in metric" - ); - } - self.batch_total_settled_value.set(value as f64); + self.batch_total_settled_value.set(value); } } diff --git a/crates/astria-sequencer-relayer/src/metrics.rs b/crates/astria-sequencer-relayer/src/metrics.rs index 5665fed4ea..85491bd751 100644 --- a/crates/astria-sequencer-relayer/src/metrics.rs +++ b/crates/astria-sequencer-relayer/src/metrics.rs @@ -23,6 +23,9 @@ pub struct Metrics { sequencer_height_fetch_failure_count: Counter, sequencer_submission_height: Counter, compression_ratio_for_astria_block: Gauge, + celestia_fees_total_utia: Gauge, + celestia_fees_utia_per_uncompressed_blob_byte: Gauge, + celestia_fees_utia_per_compressed_blob_byte: Gauge, } impl Metrics { @@ -73,11 +76,28 @@ impl Metrics { pub(crate) fn set_compression_ratio_for_astria_block(&self, ratio: f64) { self.compression_ratio_for_astria_block.set(ratio); } + + pub(crate) fn set_celestia_fees_total_utia(&self, utia: u64) { + self.celestia_fees_total_utia.set(utia); + } + + pub(crate) fn set_celestia_fees_utia_per_uncompressed_blob_byte(&self, utia: f64) { + self.celestia_fees_utia_per_uncompressed_blob_byte.set(utia); + } + + pub(crate) fn set_celestia_fees_utia_per_compressed_blob_byte(&self, utia: f64) { + self.celestia_fees_utia_per_compressed_blob_byte.set(utia); + } } impl telemetry::Metrics for Metrics { type Config = (); + #[expect( + clippy::too_many_lines, + reason = "this is reasonable as we have a lot of metrics to register; the function is not \ + complex, just long" + )] fn register( builder: &mut RegisteringBuilder, _config: &Self::Config, @@ -171,6 +191,29 @@ impl telemetry::Metrics for Metrics { )? .register()?; + let celestia_fees_total_utia = builder + .new_gauge_factory( + CELESTIA_FEES_TOTAL_UTIA, + "The total Celestia fees in utia for the latest successful submission", + )? + .register()?; + + let celestia_fees_utia_per_uncompressed_blob_byte = builder + .new_gauge_factory( + CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE, + "The Celestia fees in utia per uncompressed blob byte for the latest successful \ + submission", + )? + .register()?; + + let celestia_fees_utia_per_compressed_blob_byte = builder + .new_gauge_factory( + CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE, + "The Celestia fees in utia per compressed blob byte for the latest successful \ + submission", + )? + .register()?; + Ok(Self { celestia_submission_height, celestia_submission_count, @@ -184,6 +227,9 @@ impl telemetry::Metrics for Metrics { sequencer_height_fetch_failure_count, sequencer_submission_height, compression_ratio_for_astria_block, + celestia_fees_total_utia, + celestia_fees_utia_per_uncompressed_blob_byte, + celestia_fees_utia_per_compressed_blob_byte, }) } } @@ -200,25 +246,15 @@ metric_names!(const METRICS_NAMES: SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, SEQUENCER_SUBMISSION_HEIGHT, - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + CELESTIA_FEES_TOTAL_UTIA, + CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE, + CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE ); #[cfg(test)] mod tests { - use super::{ - BLOBS_PER_CELESTIA_TX, - BLOCKS_PER_CELESTIA_TX, - BYTES_PER_CELESTIA_TX, - CELESTIA_PAYLOAD_CREATION_LATENCY, - CELESTIA_SUBMISSION_COUNT, - CELESTIA_SUBMISSION_FAILURE_COUNT, - CELESTIA_SUBMISSION_HEIGHT, - CELESTIA_SUBMISSION_LATENCY, - COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, - SEQUENCER_BLOCK_FETCH_FAILURE_COUNT, - SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT, - SEQUENCER_SUBMISSION_HEIGHT, - }; + use super::*; #[track_caller] fn assert_const(actual: &'static str, suffix: &str) { @@ -257,5 +293,14 @@ mod tests { COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, "compression_ratio_for_astria_block", ); + assert_const(CELESTIA_FEES_TOTAL_UTIA, "celestia_fees_total_utia"); + assert_const( + CELESTIA_FEES_UTIA_PER_UNCOMPRESSED_BLOB_BYTE, + "celestia_fees_utia_per_uncompressed_blob_byte", + ); + assert_const( + CELESTIA_FEES_UTIA_PER_COMPRESSED_BLOB_BYTE, + "celestia_fees_utia_per_compressed_blob_byte", + ); } } diff --git a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs index bc149b92f8..83d15478ee 100644 --- a/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/celestia_client/mod.rs @@ -154,7 +154,7 @@ impl CelestiaClient { &mut self, blobs: Arc>, maybe_last_error: Option, - ) -> Result { + ) -> Result { info!("fetching cost params and account info from celestia app"); let (blob_params, auth_params, min_gas_price, base_account) = tokio::try_join!( self.fetch_blob_params(), @@ -196,7 +196,7 @@ impl CelestiaClient { "prepared blob transaction for celestia app" ); - Ok(new_blob_tx(&signed_tx, blobs.iter())) + Ok(BlobTxAndFee::new(&signed_tx, blobs.iter(), fee)) } #[instrument(skip_all, err(level = Level::WARN))] @@ -775,22 +775,34 @@ fn new_signed_tx( } } -fn new_blob_tx<'a>(signed_tx: &Tx, blobs: impl Iterator) -> BlobTx { - // From https://github.com/celestiaorg/celestia-core/blob/v1.29.0-tm-v0.34.29/pkg/consts/consts.go#L19 - const BLOB_TX_TYPE_ID: &str = "BLOB"; +pub(in crate::relayer) struct BlobTxAndFee { + pub(in crate::relayer) tx: BlobTx, + pub(in crate::relayer) fee: u64, +} - let blobs = blobs - .map(|blob| PbBlob { - namespace_id: Bytes::from(blob.namespace.id().to_vec()), - namespace_version: u32::from(blob.namespace.version()), - data: Bytes::from(blob.data.clone()), - share_version: u32::from(blob.share_version), - }) - .collect(); - BlobTx { - tx: Bytes::from(signed_tx.encode_to_vec()), - blobs, - type_id: BLOB_TX_TYPE_ID.to_string(), +impl BlobTxAndFee { + fn new<'a>(signed_tx: &Tx, blobs: impl Iterator, fee: u64) -> Self { + // From https://github.com/celestiaorg/celestia-core/blob/v1.29.0-tm-v0.34.29/pkg/consts/consts.go#L19 + const BLOB_TX_TYPE_ID: &str = "BLOB"; + + let blobs = blobs + .map(|blob| PbBlob { + namespace_id: Bytes::from(blob.namespace.id().to_vec()), + namespace_version: u32::from(blob.namespace.version()), + data: Bytes::from(blob.data.clone()), + share_version: u32::from(blob.share_version), + }) + .collect(); + let tx = BlobTx { + tx: Bytes::from(signed_tx.encode_to_vec()), + blobs, + type_id: BLOB_TX_TYPE_ID.to_string(), + }; + + Self { + tx, + fee, + } } } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs index e9098cc4de..bed9880a20 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/mod.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/mod.rs @@ -77,6 +77,13 @@ use crate::{ mod conversion; use conversion::NextSubmission; +/// A simple, passive object to allow the Celestia fee to be returned along with the +/// `StartedSubmission` state when attempting to submit. +struct StartedSubmissionAndFee { + new_state: StartedSubmission, + fee: Option, +} + #[derive(Clone)] pub(super) struct BlobSubmitterHandle { tx: mpsc::Sender, @@ -349,10 +356,12 @@ async fn submit_blobs( started_submission: StartedSubmission, metrics: &'static Metrics, ) -> eyre::Result { + let total_data_uncompressed_size = data.uncompressed_size(); + let total_data_compressed_size = data.compressed_size(); info!( blocks = %telemetry::display::json(&data.input_metadata()), - total_data_uncompressed_size = data.uncompressed_size(), - total_data_compressed_size = data.compressed_size(), + total_data_uncompressed_size, + total_data_compressed_size, compression_ratio = data.compression_ratio(), "initiated submission of sequencer blocks converted to Celestia blobs", ); @@ -368,7 +377,10 @@ async fn submit_blobs( let largest_sequencer_height = data.greatest_sequencer_height(); let blobs = data.into_blobs(); - let new_state = submit_with_retry( + let StartedSubmissionAndFee { + new_state, + fee, + } = submit_with_retry( client, blobs, state.clone(), @@ -383,6 +395,17 @@ async fn submit_blobs( metrics.absolute_set_sequencer_submission_height(largest_sequencer_height.value()); metrics.absolute_set_celestia_submission_height(celestia_height); metrics.record_celestia_submission_latency(start.elapsed()); + #[expect( + clippy::cast_precision_loss, + reason = "precision loss here is unlikely, but is acceptable for these metrics" + )] + if let Some(fee) = fee { + metrics.set_celestia_fees_total_utia(fee); + let cost_uncompressed = fee as f64 / total_data_uncompressed_size as f64; + metrics.set_celestia_fees_utia_per_uncompressed_blob_byte(cost_uncompressed); + let cost_compressed = fee as f64 / total_data_compressed_size as f64; + metrics.set_celestia_fees_utia_per_compressed_blob_byte(cost_compressed); + } info!(%celestia_height, "successfully submitted blobs to Celestia"); @@ -453,7 +476,7 @@ async fn submit_with_retry( started_submission: StartedSubmission, largest_sequencer_height: SequencerHeight, metrics: &'static Metrics, -) -> eyre::Result { +) -> eyre::Result { // Moving the span into `on_retry`, because tryhard spawns these in a tokio // task, losing the span. let span = Span::current(); @@ -501,7 +524,7 @@ async fn submit_with_retry( let blobs = Arc::new(blobs); - let final_state = tryhard::retry_fn(move || { + let final_state_and_fee = tryhard::retry_fn(move || { try_submit( client.clone(), blobs.clone(), @@ -514,7 +537,7 @@ async fn submit_with_retry( .in_current_span() .await .wrap_err("finished trying to submit")?; - Ok(final_state) + Ok(final_state_and_fee) } #[instrument(skip_all, err(level = Level::WARN))] @@ -524,7 +547,7 @@ async fn try_submit( started_submission: StartedSubmission, largest_sequencer_height: SequencerHeight, last_error_receiver: watch::Receiver>, -) -> Result { +) -> Result { // Get the error from the last attempt to `try_submit`. let maybe_last_error = last_error_receiver.borrow().clone(); let maybe_try_submit_error = match maybe_last_error { @@ -534,7 +557,10 @@ async fn try_submit( try_confirm_submission_from_failed_attempt(client.clone(), prepared_submission) .await? { - return Ok(new_state); + return Ok(StartedSubmissionAndFee { + new_state, + fee: None, + }); } None } @@ -545,18 +571,23 @@ async fn try_submit( None => None, }; - let blob_tx = client.try_prepare(blobs, maybe_try_submit_error).await?; - let blob_tx_hash = BlobTxHash::compute(&blob_tx); + let blob_tx_and_fee = client.try_prepare(blobs, maybe_try_submit_error).await?; + let blob_tx_hash = BlobTxHash::compute(&blob_tx_and_fee.tx); let prepared_submission = started_submission .into_prepared(largest_sequencer_height, blob_tx_hash) .await .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error)))?; - match client.try_submit(blob_tx_hash, blob_tx).await { + let fee = blob_tx_and_fee.fee; + match client.try_submit(blob_tx_hash, blob_tx_and_fee.tx).await { Ok(celestia_height) => prepared_submission .into_started(celestia_height) .await + .map(|new_state| StartedSubmissionAndFee { + new_state, + fee: Some(fee), + }) .map_err(|error| SubmissionError::Unrecoverable(Arc::new(error))), Err(TrySubmitError::FailedToBroadcastTx(error)) if error.is_timeout() => { Err(SubmissionError::BroadcastTxTimedOut(prepared_submission)) diff --git a/crates/astria-telemetry/Cargo.toml b/crates/astria-telemetry/Cargo.toml index f724959a33..7cc30072a5 100644 --- a/crates/astria-telemetry/Cargo.toml +++ b/crates/astria-telemetry/Cargo.toml @@ -32,6 +32,7 @@ serde_json = { workspace = true, optional = true } serde_with = { version = "3.7.0", optional = true } thiserror = { workspace = true } tokio = { workspace = true } +tracing = { workspace = true } tracing-opentelemetry = "0.23.0" tracing-subscriber = { version = "0.3.17", features = [ "fmt", @@ -39,9 +40,6 @@ tracing-subscriber = { version = "0.3.17", features = [ "json", ] } -[dev-dependencies] -tracing = { workspace = true } - [features] display = [ "dep:base64", diff --git a/crates/astria-telemetry/src/metrics/into_f64.rs b/crates/astria-telemetry/src/metrics/into_f64.rs index e99165029b..b34dbe5246 100644 --- a/crates/astria-telemetry/src/metrics/into_f64.rs +++ b/crates/astria-telemetry/src/metrics/into_f64.rs @@ -1,3 +1,9 @@ +use tracing::warn; + +/// This is 2^53; the upper bound for guaranteed absence of precision loss when casting from an +/// integer to `f64`. +const PRECISION_LOSS_THRESHOLD: f64 = 9_007_199_254_740_992.0; + /// A trait to safely convert to `f64`. pub trait IntoF64 { /// Converts `self` to `f64`. @@ -64,6 +70,38 @@ impl IntoF64 for usize { reason = "precision loss is unlikely (values too small) but also unimportant in metrics" )] fn into_f64(self) -> f64 { - self as f64 + let value = self as f64; + if value > PRECISION_LOSS_THRESHOLD { + warn!("possible precision loss when casting {self}_usize to `f64`"); + } + value + } +} + +impl IntoF64 for u64 { + #[expect( + clippy::cast_precision_loss, + reason = "precision loss is unlikely (values too small) but also unimportant in metrics" + )] + fn into_f64(self) -> f64 { + let value = self as f64; + if value > PRECISION_LOSS_THRESHOLD { + warn!("possible precision loss when casting {self}_u64 to `f64`"); + } + value + } +} + +impl IntoF64 for u128 { + #[expect( + clippy::cast_precision_loss, + reason = "precision loss is unlikely (values too small) but also unimportant in metrics" + )] + fn into_f64(self) -> f64 { + let value = self as f64; + if value > PRECISION_LOSS_THRESHOLD { + warn!("possible precision loss when casting {self}_u128 to `f64`"); + } + value } }