diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 3cb7ad43..03f61084 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -97,6 +97,7 @@ mod filters { mod handlers { use super::*; use core::convert::Infallible; + use std::time::UNIX_EPOCH; use warp::http::StatusCode; /// Serves Prometheus / OpenMetrics text format metrics. All metrics have @@ -121,8 +122,11 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); + let timestamp = snapshots + .timestamp + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); for metric in &metriken::metrics() { if metric.name().starts_with("log_") { @@ -164,46 +168,30 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + + let result = delta.percentiles(&percentiles).unwrap(); + + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() + for (_label, percentile, value) in result { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); } else { - current.wrapping_sub(previous).unwrap() + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); } } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; - - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - - let result = delta.percentiles(&percentiles).unwrap(); - - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); - - for (_label, percentile, value) in result { - if let Some(description) = metric.description() { - data.push(format!( - "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } else { - data.push(format!( - "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } } } } @@ -228,9 +216,6 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); - for metric in &metriken::metrics() { if metric.name().starts_with("log_") { continue; @@ -260,36 +245,20 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } - - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() - } else { - current.wrapping_sub(previous).unwrap() - } - } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; - - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - let result = delta.percentiles(&percentiles).unwrap(); + let result = delta.percentiles(&percentiles).unwrap(); - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - for (label, _percentile, value) in result { - data.push(format!("\"{name}/{label}\": {value}",)); + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); + } } } } @@ -315,9 +284,6 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); - for metric in &metriken::metrics() { if metric.name().starts_with("log_") { continue; @@ -347,36 +313,20 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() - } else { - current.wrapping_sub(previous).unwrap() - } - } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; + let result = delta.percentiles(&percentiles).unwrap(); - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - let result = delta.percentiles(&percentiles).unwrap(); - - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); - - for (label, _percentile, value) in result { - data.push(format!("\"{name}/{label}\": {value}",)); + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); + } } } } diff --git a/src/main.rs b/src/main.rs index abd8ac36..63a73ab9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,8 @@ use metriken::{AtomicHistogram, Counter, Gauge}; use once_cell::sync::Lazy; use ringlog::*; use std::collections::HashMap; -use std::collections::VecDeque; use std::sync::Arc; +use std::time::SystemTime; use tokio::runtime::Builder; use tokio::sync::RwLock; use tokio::time::sleep; @@ -35,8 +35,81 @@ type HistogramSnapshots = HashMap; static RUNNING: AtomicBool = AtomicBool::new(true); -static SNAPSHOTS: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(VecDeque::new()))); +pub struct Snapshots { + timestamp: SystemTime, + previous: HistogramSnapshots, + deltas: HistogramSnapshots, +} + +impl Default for Snapshots { + fn default() -> Self { + Self::new() + } +} + +impl Snapshots { + pub fn new() -> Self { + let timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(histogram) = any.downcast_ref::() { + if let Ok(key) = Histograms::try_from(metric.name()) { + if let Some(snapshot) = histogram.snapshot() { + current.insert(key, snapshot); + } + } + } + } + + let deltas = current.clone(); + + Self { + timestamp, + previous: current, + deltas, + } + } + + pub fn update(&mut self) { + self.timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(histogram) = any.downcast_ref::() { + if let Ok(key) = Histograms::try_from(metric.name()) { + if let Some(snapshot) = histogram.snapshot() { + if let Some(previous) = self.previous.get(&key) { + self.deltas + .insert(key, snapshot.wrapping_sub(previous).unwrap()); + } + + current.insert(key, snapshot); + } + } + } + } + + self.previous = current; + } +} + +static SNAPSHOTS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(Snapshots::new()))); fn main() { // custom panic hook to terminate whole process after unwinding @@ -125,38 +198,13 @@ fn main() { // spawn thread to maintain histogram snapshots control_runtime.spawn(async { while RUNNING.load(Ordering::Relaxed) { - // build a current snapshot for all histograms - - let mut current = HashMap::new(); - - for metric in metriken::metrics().iter() { - let any = if let Some(any) = metric.as_any() { - any - } else { - continue; - }; - - if let Some(histogram) = any.downcast_ref::() { - if let Ok(key) = Histograms::try_from(metric.name()) { - if let Some(snapshot) = histogram.snapshot() { - current.insert(key, snapshot); - } - } - } - } - // acquire a lock and update the snapshots - - let mut snapshots = SNAPSHOTS.write().await; - - // we maintain 1 minute of history, which requires 61 secondly - // snapshots - if snapshots.len() == 61 { - let _ = snapshots.pop_front(); + { + let mut snapshots = SNAPSHOTS.write().await; + snapshots.update(); } - snapshots.push_back(current); - + // delay until next update sleep(core::time::Duration::from_secs(1)).await; } });