Skip to content

Commit

Permalink
feat(hermes): make readiness thresholds configurable (#1687)
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-bahjati authored Jun 12, 2024
1 parent 045cfee commit c3b61f2
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 37 deletions.
2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion apps/hermes/server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "hermes"
version = "0.5.13"
version = "0.5.14"
description = "Hermes is an agent that provides Verified Prices from the Pythnet Pyth Oracle."
edition = "2021"

Expand Down
21 changes: 13 additions & 8 deletions apps/hermes/server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use clap::{
Parser,
};

mod aggregate;
mod benchmarks;
mod metrics;
mod pythnet;
Expand All @@ -30,9 +31,17 @@ pub enum Options {

#[derive(Args, Clone, Debug)]
pub struct RunOptions {
/// Wormhole Options.
/// Aggregate Options
#[command(flatten)]
pub wormhole: wormhole::Options,
pub aggregate: aggregate::Options,

/// Benchmarks Options
#[command(flatten)]
pub benchmarks: benchmarks::Options,

/// Metrics Options
#[command(flatten)]
pub metrics: metrics::Options,

/// PythNet Options
#[command(flatten)]
Expand All @@ -42,13 +51,9 @@ pub struct RunOptions {
#[command(flatten)]
pub rpc: rpc::Options,

/// Benchmarks Options
#[command(flatten)]
pub benchmarks: benchmarks::Options,

/// Metrics Options
/// Wormhole Options.
#[command(flatten)]
pub metrics: metrics::Options,
pub wormhole: wormhole::Options,
}

#[derive(Args, Clone, Debug)]
Expand Down
21 changes: 21 additions & 0 deletions apps/hermes/server/src/config/aggregate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use {
clap::Args,
humantime::Duration,
};

#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Aggregate Options")]
#[group(id = "Aggregate")]
pub struct Options {
/// The duration of no aggregation after which the readiness of the state is considered stale.
#[arg(long = "aggregate-readiness-staleness-threshold")]
#[arg(env = "AGGREGATE_READINESS_STALENESS_THRESHOLD")]
#[arg(default_value = "30s")]
pub readiness_staleness_threshold: Duration,

/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
#[arg(long = "aggregate-readiness-max-allowed-slot-lag")]
#[arg(env = "AGGREGATE_READINESS_MAX_ALLOWED_SLOT_LAG")]
#[arg(default_value = "10")]
pub readiness_max_allowed_slot_lag: u64,
}
8 changes: 7 additions & 1 deletion apps/hermes/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,13 @@ async fn init() -> Result<()> {
let (update_tx, _) = tokio::sync::broadcast::channel(1000);

// Initialize a cache store with a 1000 element circular buffer.
let state = state::new(update_tx.clone(), 1000, opts.benchmarks.endpoint.clone());
let state = state::new(
update_tx.clone(),
1000,
opts.benchmarks.endpoint.clone(),
opts.aggregate.readiness_staleness_threshold.into(),
opts.aggregate.readiness_max_allowed_slot_lag,
);

// Listen for Ctrl+C so we can set the exit flag and wait for a graceful shutdown.
spawn(async move {
Expand Down
22 changes: 18 additions & 4 deletions apps/hermes/server/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ use {
price_feeds_metadata::PriceFeedMetaState,
wormhole::WormholeState,
},
aggregate::Slot,
prometheus_client::registry::Registry,
reqwest::Url,
std::sync::Arc,
std::{
sync::Arc,
time::Duration,
},
tokio::sync::broadcast::Sender,
};

Expand Down Expand Up @@ -64,13 +68,20 @@ pub fn new(
update_tx: Sender<AggregationEvent>,
cache_size: u64,
benchmarks_endpoint: Option<Url>,
readiness_staleness_threshold: Duration,
readiness_max_allowed_slot_lag: Slot,
) -> Arc<impl Metrics + Wormhole> {
let mut metrics_registry = Registry::default();
Arc::new(State {
cache: CacheState::new(cache_size),
benchmarks: BenchmarksState::new(benchmarks_endpoint),
price_feed_meta: PriceFeedMetaState::new(),
aggregates: AggregateState::new(update_tx, &mut metrics_registry),
aggregates: AggregateState::new(
update_tx,
readiness_staleness_threshold,
readiness_max_allowed_slot_lag,
&mut metrics_registry,
),
wormhole: WormholeState::new(),
metrics: MetricsState::new(metrics_registry),
})
Expand All @@ -85,15 +96,18 @@ pub mod test {
Wormhole,
},
crate::network::wormhole::GuardianSet,
std::sync::Arc,
std::{
sync::Arc,
time::Duration,
},
tokio::sync::broadcast::Receiver,
};

pub async fn setup_state(
cache_size: u64,
) -> (Arc<impl Aggregates>, Receiver<AggregationEvent>) {
let (update_tx, update_rx) = tokio::sync::broadcast::channel(1000);
let state = super::new(update_tx, cache_size, None);
let state = super::new(update_tx, cache_size, None, Duration::from_secs(30), 10);

// Add an initial guardian set with public key 0
Wormhole::update_guardian_set(
Expand Down
63 changes: 41 additions & 22 deletions apps/hermes/server/src/state/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use {
WormholeMerkleState,
},
crate::{
api::types::RpcPriceIdentifier,
network::wormhole::VaaBytes,
state::{
benchmarks::Benchmarks,
Expand Down Expand Up @@ -123,17 +122,29 @@ pub struct AggregateStateData {
/// probes.
pub latest_observed_slot: Option<Slot>,

/// The duration of no aggregation after which the readiness of the state is considered stale.
pub readiness_staleness_threshold: Duration,

/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
pub readiness_max_allowed_slot_lag: Slot,

/// Aggregate Specific Metrics
pub metrics: metrics::Metrics,
}

impl AggregateStateData {
pub fn new(metrics_registry: &mut Registry) -> Self {
pub fn new(
readiness_staleness_threshold: Duration,
readiness_max_allowed_slot_lag: Slot,
metrics_registry: &mut Registry,
) -> Self {
Self {
latest_completed_slot: None,
latest_completed_slot: None,
latest_completed_update_at: None,
latest_observed_slot: None,
metrics: metrics::Metrics::new(metrics_registry),
latest_observed_slot: None,
metrics: metrics::Metrics::new(metrics_registry),
readiness_staleness_threshold,
readiness_max_allowed_slot_lag,
}
}
}
Expand All @@ -144,9 +155,18 @@ pub struct AggregateState {
}

impl AggregateState {
pub fn new(update_tx: Sender<AggregationEvent>, metrics_registry: &mut Registry) -> Self {
pub fn new(
update_tx: Sender<AggregationEvent>,
readiness_staleness_threshold: Duration,
readiness_max_allowed_slot_lag: Slot,
metrics_registry: &mut Registry,
) -> Self {
Self {
data: RwLock::new(AggregateStateData::new(metrics_registry)),
data: RwLock::new(AggregateStateData::new(
readiness_staleness_threshold,
readiness_max_allowed_slot_lag,
metrics_registry,
)),
api_update_tx: update_tx,
}
}
Expand Down Expand Up @@ -193,12 +213,6 @@ pub struct PriceFeedsWithUpdateData {
pub update_data: Vec<Vec<u8>>,
}

const READINESS_STALENESS_THRESHOLD: Duration = Duration::from_secs(30);

/// The maximum allowed slot lag between the latest observed slot and the latest completed slot.
/// 10 slots is almost 5 seconds.
const READINESS_MAX_ALLOWED_SLOT_LAG: Slot = 10;

#[async_trait::async_trait]
pub trait Aggregates
where
Expand Down Expand Up @@ -388,24 +402,25 @@ where
}

async fn is_ready(&self) -> bool {
let metadata = self.into().data.read().await;
let state_data = self.into().data.read().await;
let price_feeds_metadata = PriceFeedMeta::retrieve_price_feeds_metadata(self)
.await
.unwrap();

let has_completed_recently = match metadata.latest_completed_update_at.as_ref() {
let has_completed_recently = match state_data.latest_completed_update_at.as_ref() {
Some(latest_completed_update_time) => {
latest_completed_update_time.elapsed() < READINESS_STALENESS_THRESHOLD
latest_completed_update_time.elapsed() < state_data.readiness_staleness_threshold
}
None => false,
};

let is_not_behind = match (
metadata.latest_completed_slot,
metadata.latest_observed_slot,
state_data.latest_completed_slot,
state_data.latest_observed_slot,
) {
(Some(latest_completed_slot), Some(latest_observed_slot)) => {
latest_observed_slot - latest_completed_slot <= READINESS_MAX_ALLOWED_SLOT_LAG
latest_observed_slot - latest_completed_slot
<= state_data.readiness_max_allowed_slot_lag
}
_ => false,
};
Expand Down Expand Up @@ -512,7 +527,10 @@ mod test {
use {
super::*,
crate::{
api::types::PriceFeedMetadata,
api::types::{
PriceFeedMetadata,
RpcPriceIdentifier,
},
state::test::setup_state,
},
futures::future::join_all,
Expand Down Expand Up @@ -881,8 +899,9 @@ mod test {
assert!(state.is_ready().await);

// Advance the clock to make the prices stale
MockClock::advance_system_time(READINESS_STALENESS_THRESHOLD);
MockClock::advance(READINESS_STALENESS_THRESHOLD);
let staleness_threshold = Duration::from_secs(30);
MockClock::advance_system_time(staleness_threshold);
MockClock::advance(staleness_threshold);
// Check the state is not ready
assert!(!state.is_ready().await);
}
Expand Down

0 comments on commit c3b61f2

Please sign in to comment.