From c3b61f21ef1e20ed822705acce8a87a574fa57a3 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 12 Jun 2024 13:58:01 +0200 Subject: [PATCH] feat(hermes): make readiness thresholds configurable (#1687) --- apps/hermes/server/Cargo.lock | 2 +- apps/hermes/server/Cargo.toml | 2 +- apps/hermes/server/src/config.rs | 21 +++++--- apps/hermes/server/src/config/aggregate.rs | 21 ++++++++ apps/hermes/server/src/main.rs | 8 ++- apps/hermes/server/src/state.rs | 22 ++++++-- apps/hermes/server/src/state/aggregate.rs | 63 ++++++++++++++-------- 7 files changed, 102 insertions(+), 37 deletions(-) create mode 100644 apps/hermes/server/src/config/aggregate.rs diff --git a/apps/hermes/server/Cargo.lock b/apps/hermes/server/Cargo.lock index 45e4efff29..ee91040047 100644 --- a/apps/hermes/server/Cargo.lock +++ b/apps/hermes/server/Cargo.lock @@ -1796,7 +1796,7 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "hermes" -version = "0.5.13" +version = "0.5.14" dependencies = [ "anyhow", "async-trait", diff --git a/apps/hermes/server/Cargo.toml b/apps/hermes/server/Cargo.toml index 2899a33cd3..1892eb0a58 100644 --- a/apps/hermes/server/Cargo.toml +++ b/apps/hermes/server/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hermes" -version = "0.5.13" +version = "0.5.14" description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle." edition = "2021" diff --git a/apps/hermes/server/src/config.rs b/apps/hermes/server/src/config.rs index 90003b6dea..cde7f78d62 100644 --- a/apps/hermes/server/src/config.rs +++ b/apps/hermes/server/src/config.rs @@ -7,6 +7,7 @@ use clap::{ Parser, }; +mod aggregate; mod benchmarks; mod metrics; mod pythnet; @@ -30,9 +31,17 @@ pub enum Options { #[derive(Args, Clone, Debug)] pub struct RunOptions { - /// Wormhole Options. + /// Aggregate Options #[command(flatten)] - pub wormhole: wormhole::Options, + pub aggregate: aggregate::Options, + + /// Benchmarks Options + #[command(flatten)] + pub benchmarks: benchmarks::Options, + + /// Metrics Options + #[command(flatten)] + pub metrics: metrics::Options, /// PythNet Options #[command(flatten)] @@ -42,13 +51,9 @@ pub struct RunOptions { #[command(flatten)] pub rpc: rpc::Options, - /// Benchmarks Options - #[command(flatten)] - pub benchmarks: benchmarks::Options, - - /// Metrics Options + /// Wormhole Options. #[command(flatten)] - pub metrics: metrics::Options, + pub wormhole: wormhole::Options, } #[derive(Args, Clone, Debug)] diff --git a/apps/hermes/server/src/config/aggregate.rs b/apps/hermes/server/src/config/aggregate.rs new file mode 100644 index 0000000000..6ba2a64f72 --- /dev/null +++ b/apps/hermes/server/src/config/aggregate.rs @@ -0,0 +1,21 @@ +use { + clap::Args, + humantime::Duration, +}; + +#[derive(Args, Clone, Debug)] +#[command(next_help_heading = "Aggregate Options")] +#[group(id = "Aggregate")] +pub struct Options { + /// The duration of no aggregation after which the readiness of the state is considered stale. + #[arg(long = "aggregate-readiness-staleness-threshold")] + #[arg(env = "AGGREGATE_READINESS_STALENESS_THRESHOLD")] + #[arg(default_value = "30s")] + pub readiness_staleness_threshold: Duration, + + /// The maximum allowed slot lag between the latest observed slot and the latest completed slot. + #[arg(long = "aggregate-readiness-max-allowed-slot-lag")] + #[arg(env = "AGGREGATE_READINESS_MAX_ALLOWED_SLOT_LAG")] + #[arg(default_value = "10")] + pub readiness_max_allowed_slot_lag: u64, +} diff --git a/apps/hermes/server/src/main.rs b/apps/hermes/server/src/main.rs index b0a58e7a81..b43ad1bfba 100644 --- a/apps/hermes/server/src/main.rs +++ b/apps/hermes/server/src/main.rs @@ -51,7 +51,13 @@ async fn init() -> Result<()> { let (update_tx, _) = tokio::sync::broadcast::channel(1000); // Initialize a cache store with a 1000 element circular buffer. - let state = state::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone()); + let state = state::new( + update_tx.clone(), + 1000, + opts.benchmarks.endpoint.clone(), + opts.aggregate.readiness_staleness_threshold.into(), + opts.aggregate.readiness_max_allowed_slot_lag, + ); // Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown. spawn(async move { diff --git a/apps/hermes/server/src/state.rs b/apps/hermes/server/src/state.rs index 59cf19435d..d481e9a3d0 100644 --- a/apps/hermes/server/src/state.rs +++ b/apps/hermes/server/src/state.rs @@ -12,9 +12,13 @@ use { price_feeds_metadata::PriceFeedMetaState, wormhole::WormholeState, }, + aggregate::Slot, prometheus_client::registry::Registry, reqwest::Url, - std::sync::Arc, + std::{ + sync::Arc, + time::Duration, + }, tokio::sync::broadcast::Sender, }; @@ -64,13 +68,20 @@ pub fn new( update_tx: Sender, cache_size: u64, benchmarks_endpoint: Option, + readiness_staleness_threshold: Duration, + readiness_max_allowed_slot_lag: Slot, ) -> Arc { let mut metrics_registry = Registry::default(); Arc::new(State { cache: CacheState::new(cache_size), benchmarks: BenchmarksState::new(benchmarks_endpoint), price_feed_meta: PriceFeedMetaState::new(), - aggregates: AggregateState::new(update_tx, &mut metrics_registry), + aggregates: AggregateState::new( + update_tx, + readiness_staleness_threshold, + readiness_max_allowed_slot_lag, + &mut metrics_registry, + ), wormhole: WormholeState::new(), metrics: MetricsState::new(metrics_registry), }) @@ -85,7 +96,10 @@ pub mod test { Wormhole, }, crate::network::wormhole::GuardianSet, - std::sync::Arc, + std::{ + sync::Arc, + time::Duration, + }, tokio::sync::broadcast::Receiver, }; @@ -93,7 +107,7 @@ pub mod test { cache_size: u64, ) -> (Arc, Receiver) { let (update_tx, update_rx) = tokio::sync::broadcast::channel(1000); - let state = super::new(update_tx, cache_size, None); + let state = super::new(update_tx, cache_size, None, Duration::from_secs(30), 10); // Add an initial guardian set with public key 0 Wormhole::update_guardian_set( diff --git a/apps/hermes/server/src/state/aggregate.rs b/apps/hermes/server/src/state/aggregate.rs index ad012b24e4..14bfb2314e 100644 --- a/apps/hermes/server/src/state/aggregate.rs +++ b/apps/hermes/server/src/state/aggregate.rs @@ -19,7 +19,6 @@ use { WormholeMerkleState, }, crate::{ - api::types::RpcPriceIdentifier, network::wormhole::VaaBytes, state::{ benchmarks::Benchmarks, @@ -123,17 +122,29 @@ pub struct AggregateStateData { /// probes. pub latest_observed_slot: Option, + /// The duration of no aggregation after which the readiness of the state is considered stale. + pub readiness_staleness_threshold: Duration, + + /// The maximum allowed slot lag between the latest observed slot and the latest completed slot. + pub readiness_max_allowed_slot_lag: Slot, + /// Aggregate Specific Metrics pub metrics: metrics::Metrics, } impl AggregateStateData { - pub fn new(metrics_registry: &mut Registry) -> Self { + pub fn new( + readiness_staleness_threshold: Duration, + readiness_max_allowed_slot_lag: Slot, + metrics_registry: &mut Registry, + ) -> Self { Self { - latest_completed_slot: None, + latest_completed_slot: None, latest_completed_update_at: None, - latest_observed_slot: None, - metrics: metrics::Metrics::new(metrics_registry), + latest_observed_slot: None, + metrics: metrics::Metrics::new(metrics_registry), + readiness_staleness_threshold, + readiness_max_allowed_slot_lag, } } } @@ -144,9 +155,18 @@ pub struct AggregateState { } impl AggregateState { - pub fn new(update_tx: Sender, metrics_registry: &mut Registry) -> Self { + pub fn new( + update_tx: Sender, + readiness_staleness_threshold: Duration, + readiness_max_allowed_slot_lag: Slot, + metrics_registry: &mut Registry, + ) -> Self { Self { - data: RwLock::new(AggregateStateData::new(metrics_registry)), + data: RwLock::new(AggregateStateData::new( + readiness_staleness_threshold, + readiness_max_allowed_slot_lag, + metrics_registry, + )), api_update_tx: update_tx, } } @@ -193,12 +213,6 @@ pub struct PriceFeedsWithUpdateData { pub update_data: Vec>, } -const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30); - -/// The maximum allowed slot lag between the latest observed slot and the latest completed slot. -/// 10 slots is almost 5 seconds. -const READINESS_MAX_ALLOWED_SLOT_LAG: Slot = 10; - #[async_trait::async_trait] pub trait Aggregates where @@ -388,24 +402,25 @@ where } async fn is_ready(&self) -> bool { - let metadata = self.into().data.read().await; + let state_data = self.into().data.read().await; let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self) .await .unwrap(); - let has_completed_recently = match metadata.latest_completed_update_at.as_ref() { + let has_completed_recently = match state_data.latest_completed_update_at.as_ref() { Some(latest_completed_update_time) => { - latest_completed_update_time.elapsed() < READINESS_STALENESS_THRESHOLD + latest_completed_update_time.elapsed() < state_data.readiness_staleness_threshold } None => false, }; let is_not_behind = match ( - metadata.latest_completed_slot, - metadata.latest_observed_slot, + state_data.latest_completed_slot, + state_data.latest_observed_slot, ) { (Some(latest_completed_slot), Some(latest_observed_slot)) => { - latest_observed_slot - latest_completed_slot <= READINESS_MAX_ALLOWED_SLOT_LAG + latest_observed_slot - latest_completed_slot + <= state_data.readiness_max_allowed_slot_lag } _ => false, }; @@ -512,7 +527,10 @@ mod test { use { super::*, crate::{ - api::types::PriceFeedMetadata, + api::types::{ + PriceFeedMetadata, + RpcPriceIdentifier, + }, state::test::setup_state, }, futures::future::join_all, @@ -881,8 +899,9 @@ mod test { assert!(state.is_ready().await); // Advance the clock to make the prices stale - MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD); - MockClock::advance(READINESS_STALENESS_THRESHOLD); + let staleness_threshold = Duration::from_secs(30); + MockClock::advance_system_time(staleness_threshold); + MockClock::advance(staleness_threshold); // Check the state is not ready assert!(!state.is_ready().await); }