From 8075d0704ebe92456cff44227f6a38de1b39adfb Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 19 Oct 2023 11:21:40 +0200 Subject: [PATCH] refactor(hermes): move metrics endpoint to a separate metrics server --- hermes/.envrc.sample | 1 + hermes/src/api.rs | 1 - hermes/src/api/rest.rs | 2 -- hermes/src/api/rest/metrics.rs | 20 ------------ hermes/src/config.rs | 5 +++ hermes/src/config/metrics.rs | 17 ++++++++++ hermes/src/main.rs | 2 ++ hermes/src/metrics_server.rs | 59 ++++++++++++++++++++++++++++++++++ 8 files changed, 84 insertions(+), 23 deletions(-) delete mode 100644 hermes/src/api/rest/metrics.rs create mode 100644 hermes/src/config/metrics.rs create mode 100644 hermes/src/metrics_server.rs diff --git a/hermes/.envrc.sample b/hermes/.envrc.sample index dfdc5e8ad5..67c4790aef 100644 --- a/hermes/.envrc.sample +++ b/hermes/.envrc.sample @@ -1,6 +1,7 @@ export PYTHNET_HTTP_ADDR=http://pythnet-http-rpc/ export PYTHNET_WS_ADDR=ws://pythnet-ws-rpc/ export RPC_LISTEN_ADDR=0.0.0.0:7575 +export METRICS_LISTEN_ADDR=0.0.0.0:7576 export BENCHMARKS_ENDPOINT=https://benchmarks.pyth.network export WORMHOLE_SPY_RPC_ADDR=http://spy-or-beacon-rpc export RUST_LOG=info diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 29500c14b6..900144eedc 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -153,7 +153,6 @@ pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { .route("/api/latest_vaas", get(rest::latest_vaas)) .route("/api/price_feed_ids", get(rest::price_feed_ids)) .route("/live", get(rest::live)) - .route("/metrics", get(rest::metrics)) .route("/ready", get(rest::ready)) .route("/ws", get(ws::ws_route_handler)) .route_layer(from_fn_with_state( diff --git a/hermes/src/api/rest.rs b/hermes/src/api/rest.rs index 24f9bc3b57..6e80fcb909 100644 --- a/hermes/src/api/rest.rs +++ b/hermes/src/api/rest.rs @@ -17,7 +17,6 @@ mod index; mod latest_price_feeds; mod latest_vaas; mod live; -mod metrics; mod price_feed_ids; mod ready; @@ -29,7 +28,6 @@ pub use { latest_price_feeds::*, latest_vaas::*, live::*, - metrics::*, price_feed_ids::*, ready::*, }; diff --git a/hermes/src/api/rest/metrics.rs b/hermes/src/api/rest/metrics.rs deleted file mode 100644 index eec581cba0..0000000000 --- a/hermes/src/api/rest/metrics.rs +++ /dev/null @@ -1,20 +0,0 @@ -//! Exposing prometheus metrics via HTTP in openmetrics format. - -use { - axum::{ - extract::State, - response::IntoResponse, - }, - prometheus_client::encoding::text::encode, -}; - -pub async fn metrics(State(state): State) -> impl IntoResponse { - let registry = state.state.metrics_registry.read().await; - let mut buffer = String::new(); - - // Should not fail if the metrics are valid and there is memory available - // to write to the buffer. - encode(&mut buffer, ®istry).unwrap(); - - buffer -} diff --git a/hermes/src/config.rs b/hermes/src/config.rs index 9716594ac0..90003b6dea 100644 --- a/hermes/src/config.rs +++ b/hermes/src/config.rs @@ -8,6 +8,7 @@ use clap::{ }; mod benchmarks; +mod metrics; mod pythnet; mod rpc; mod wormhole; @@ -44,6 +45,10 @@ pub struct RunOptions { /// Benchmarks Options #[command(flatten)] pub benchmarks: benchmarks::Options, + + /// Metrics Options + #[command(flatten)] + pub metrics: metrics::Options, } #[derive(Args, Clone, Debug)] diff --git a/hermes/src/config/metrics.rs b/hermes/src/config/metrics.rs new file mode 100644 index 0000000000..77a16d8775 --- /dev/null +++ b/hermes/src/config/metrics.rs @@ -0,0 +1,17 @@ +use { + clap::Args, + std::net::SocketAddr, +}; + +const DEFAULT_METRICS_SERVER_LISTEN_ADDR: &str = "127.0.0.1:33888"; + +#[derive(Args, Clone, Debug)] +#[command(next_help_heading = "Metrics Options")] +#[group(id = "Metrics")] +pub struct Options { + /// Address and port the RPC server will bind to. + #[arg(long = "metrics-server-listen-addr")] + #[arg(default_value = DEFAULT_METRICS_SERVER_LISTEN_ADDR)] + #[arg(env = "METRICS_LISTEN_ADDR")] + pub server_listen_addr: SocketAddr, +} diff --git a/hermes/src/main.rs b/hermes/src/main.rs index bc42dd14f2..b939062270 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -19,6 +19,7 @@ use { mod aggregate; mod api; mod config; +mod metrics_server; mod network; mod serde; mod state; @@ -61,6 +62,7 @@ async fn init() -> Result<()> { let tasks = join_all([ Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))), Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))), + Box::pin(spawn(metrics_server::run(opts.clone(), store.clone()))), Box::pin(spawn(api::spawn(opts.clone(), store.clone(), update_rx))), ]) .await; diff --git a/hermes/src/metrics_server.rs b/hermes/src/metrics_server.rs new file mode 100644 index 0000000000..213dfafffa --- /dev/null +++ b/hermes/src/metrics_server.rs @@ -0,0 +1,59 @@ +//! Metrics Server +//! +//! This server serves metrics over /metrics in OpenMetrics format. + +use { + crate::{ + config::RunOptions, + state::State as AppState, + }, + anyhow::Result, + axum::{ + extract::State, + response::IntoResponse, + routing::get, + Router, + }, + prometheus_client::encoding::text::encode, + std::sync::{ + atomic::Ordering, + Arc, + }, +}; + + +#[tracing::instrument(skip(opts, state))] +pub async fn run(opts: RunOptions, state: Arc) -> Result<()> { + tracing::info!(endpoint = %opts.metrics.server_listen_addr, "Starting Metrics Server."); + + let app = Router::new(); + let app = app + .route("/metrics", get(metrics)) + .with_state(state.clone()); + + // Binds the axum's server to the configured address and port. This is a blocking call and will + // not return until the server is shutdown. + axum::Server::try_bind(&opts.metrics.server_listen_addr)? + .serve(app.into_make_service()) + .with_graceful_shutdown(async { + while !crate::SHOULD_EXIT.load(Ordering::Acquire) { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + + tracing::info!("Shutting down metrics server..."); + }) + .await?; + + Ok(()) +} + +pub async fn metrics(State(state): State>) -> impl IntoResponse { + let registry = state.metrics_registry.read().await; + let mut buffer = String::new(); + + // Should not fail if the metrics are valid and there is memory available + // to write to the buffer. + encode(&mut buffer, ®istry).unwrap(); + + buffer +}