diff --git a/Cargo.lock b/Cargo.lock index 53cc0d28..10a0166d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -92,12 +92,6 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "ascii" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" - [[package]] name = "async-channel" version = "1.9.0" @@ -337,12 +331,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "chunked_transfer" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a" - [[package]] name = "clang-sys" version = "1.6.1" @@ -1952,7 +1940,6 @@ dependencies = [ "session", "sha2", "slab", - "tiny_http", "tokio", "tokio-boring", "toml 0.7.8", @@ -2372,18 +2359,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "tiny_http" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" -dependencies = [ - "ascii", - "chunked_transfer", - "httpdate", - "log", -] - [[package]] name = "tinyvec" version = "1.6.0" diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 42ffd22f..44a8a4a3 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -5,7 +5,6 @@ use std::sync::Arc; /// The HTTP admin server. pub async fn http(config: Config, ratelimit: Option>) { - let admin = filters::admin(ratelimit); let addr = config @@ -20,8 +19,8 @@ pub async fn http(config: Config, ratelimit: Option>) { } mod filters { - use warp::Filter; use super::*; + use warp::Filter; /// The combined set of admin endpoint filters pub fn admin( @@ -120,8 +119,10 @@ mod handlers { pub async fn prometheus_stats() -> Result { let mut data = Vec::new(); - let current = CURRENT.read().await; - let previous = PREVIOUS.read().await; + let snapshots = SNAPSHOTS.read().await; + + let previous = snapshots.front(); + let current = snapshots.back(); for metric in &metriken::metrics() { if metric.name().starts_with("log_") { @@ -163,9 +164,15 @@ mod handlers { continue; }; - let delta = match (current.get(&key), previous.get(&key)) { - (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), - (Some(current), None) => current.clone(), + if current.is_none() { + continue; + } + + let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { + (Some(current), Some(Some(previous))) => { + current.wrapping_sub(previous).unwrap() + } + (Some(current), Some(None)) | (Some(current), None) => current.clone(), _ => { continue; } @@ -215,8 +222,10 @@ mod handlers { pub async fn json_stats() -> Result { let mut data = Vec::new(); - let current = CURRENT.read().await; - let previous = PREVIOUS.read().await; + let snapshots = SNAPSHOTS.read().await; + + let previous = snapshots.front(); + let current = snapshots.back(); for metric in &metriken::metrics() { if metric.name().starts_with("log_") { @@ -247,9 +256,15 @@ mod handlers { continue; }; - let delta = match (current.get(&key), previous.get(&key)) { - (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), - (Some(current), None) => current.clone(), + if current.is_none() { + continue; + } + + let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { + (Some(current), Some(Some(previous))) => { + current.wrapping_sub(previous).unwrap() + } + (Some(current), Some(None)) | (Some(current), None) => current.clone(), _ => { continue; } @@ -290,8 +305,10 @@ mod handlers { pub async fn human_stats() -> Result { let mut data = Vec::new(); - let current = CURRENT.read().await; - let previous = PREVIOUS.read().await; + let snapshots = SNAPSHOTS.read().await; + + let previous = snapshots.front(); + let current = snapshots.back(); for metric in &metriken::metrics() { if metric.name().starts_with("log_") { @@ -322,9 +339,15 @@ mod handlers { continue; }; - let delta = match (current.get(&key), previous.get(&key)) { - (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), - (Some(current), None) => current.clone(), + if current.is_none() { + continue; + } + + let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { + (Some(current), Some(Some(previous))) => { + current.wrapping_sub(previous).unwrap() + } + (Some(current), Some(None)) | (Some(current), None) => current.clone(), _ => { continue; } diff --git a/src/main.rs b/src/main.rs index d8ca2ef5..e8447ce2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,3 @@ -use tokio::sync::RwLock; -use std::time::SystemTime; -use std::sync::Arc; use crate::clients::launch_clients; use crate::pubsub::launch_pubsub; use crate::workload::{launch_workload, Generator}; @@ -12,7 +9,10 @@ use core::time::Duration; use metriken::{AtomicHistogram, Counter, Gauge}; use ringlog::*; use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync::Arc; use tokio::runtime::Builder; +use tokio::sync::RwLock; use tokio::time::sleep; mod admin; @@ -32,8 +32,8 @@ type UnixInstant = clocksource::UnixInstant>; static RUNNING: AtomicBool = AtomicBool::new(true); -static CURRENT: Arc>> = Arc::new(RwLock::new(HashMap::new())); -static PREVIOUS: Arc>> = Arc::new(RwLock::new(HashMap::new())); +static SNAPSHOTS: Arc>>> = + Arc::new(RwLock::new(VecDeque::new())); fn main() { // custom panic hook to terminate whole process after unwinding @@ -122,12 +122,10 @@ fn main() { // spawn thread to maintain histogram snapshots control_runtime.spawn(async { while RUNNING.load(Ordering::Relaxed) { + // build a current snapshot for all histograms - let current = CURRENT.write().await; - let previous = PREVIOUS.write().await; + let mut current = HashMap::new(); - *previous = current.clone(); - for metric in metriken::metrics().iter() { let any = if let Some(any) = metric.as_any() { any @@ -144,6 +142,16 @@ fn main() { } } + // acquire a lock and update the snapshots + + let mut snapshots = SNAPSHOTS.write().await; + + if snapshots.len() == 60 { + let _ = snapshots.pop_front(); + } + + snapshots.push_back(current); + sleep(core::time::Duration::from_secs(1)).await; } });