From 38aa60f1a1b29bdf1f71aa61aa06ecda5c73753e Mon Sep 17 00:00:00 2001 From: Brian Martin Date: Thu, 12 Oct 2023 12:20:14 -0700 Subject: [PATCH] remove snapshot buffer and maintain previous and deltas only Changes the behavior to keep just a single set of previous snapshots and cached deltas. We assume secondly collection and will serve percentiles up to one second stale, but with timestamps in the prometheus format to aid in correlating with other metrics. --- src/admin/mod.rs | 146 ++++++++++++++++------------------------------- src/main.rs | 112 +++++++++++++++++++++++++----------- 2 files changed, 128 insertions(+), 130 deletions(-) diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 3cb7ad43..03f61084 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -97,6 +97,7 @@ mod filters { mod handlers { use super::*; use core::convert::Infallible; + use std::time::UNIX_EPOCH; use warp::http::StatusCode; /// Serves Prometheus / OpenMetrics text format metrics. All metrics have @@ -121,8 +122,11 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); + let timestamp = snapshots + .timestamp + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); for metric in &metriken::metrics() { if metric.name().starts_with("log_") { @@ -164,46 +168,30 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + + let result = delta.percentiles(&percentiles).unwrap(); + + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() + for (_label, percentile, value) in result { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); } else { - current.wrapping_sub(previous).unwrap() + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}", + percentile, + )); } } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; - - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - - let result = delta.percentiles(&percentiles).unwrap(); - - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); - - for (_label, percentile, value) in result { - if let Some(description) = metric.description() { - data.push(format!( - "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } else { - data.push(format!( - "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } } } } @@ -228,9 +216,6 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); - for metric in &metriken::metrics() { if metric.name().starts_with("log_") { continue; @@ -260,36 +245,20 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } - - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() - } else { - current.wrapping_sub(previous).unwrap() - } - } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; - - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - let result = delta.percentiles(&percentiles).unwrap(); + let result = delta.percentiles(&percentiles).unwrap(); - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - for (label, _percentile, value) in result { - data.push(format!("\"{name}/{label}\": {value}",)); + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); + } } } } @@ -315,9 +284,6 @@ mod handlers { let snapshots = SNAPSHOTS.read().await; - let previous = snapshots.front(); - let current = snapshots.back(); - for metric in &metriken::metrics() { if metric.name().starts_with("log_") { continue; @@ -347,36 +313,20 @@ mod handlers { continue; }; - if current.is_none() { - continue; - } + if let Some(delta) = snapshots.deltas.get(&key) { + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); - let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) { - (Some(current), Some(Some(previous))) => { - if snapshots.len() < 61 { - current.clone() - } else { - current.wrapping_sub(previous).unwrap() - } - } - (Some(current), Some(None)) | (Some(current), None) => current.clone(), - _ => { - continue; - } - }; + let result = delta.percentiles(&percentiles).unwrap(); - let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); - let result = delta.percentiles(&percentiles).unwrap(); - - let result: Vec<(&'static str, f64, u64)> = PERCENTILES - .iter() - .zip(result.iter()) - .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) - .collect(); - - for (label, _percentile, value) in result { - data.push(format!("\"{name}/{label}\": {value}",)); + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); + } } } } diff --git a/src/main.rs b/src/main.rs index abd8ac36..63a73ab9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,8 +10,8 @@ use metriken::{AtomicHistogram, Counter, Gauge}; use once_cell::sync::Lazy; use ringlog::*; use std::collections::HashMap; -use std::collections::VecDeque; use std::sync::Arc; +use std::time::SystemTime; use tokio::runtime::Builder; use tokio::sync::RwLock; use tokio::time::sleep; @@ -35,8 +35,81 @@ type HistogramSnapshots = HashMap; static RUNNING: AtomicBool = AtomicBool::new(true); -static SNAPSHOTS: Lazy>>> = - Lazy::new(|| Arc::new(RwLock::new(VecDeque::new()))); +pub struct Snapshots { + timestamp: SystemTime, + previous: HistogramSnapshots, + deltas: HistogramSnapshots, +} + +impl Default for Snapshots { + fn default() -> Self { + Self::new() + } +} + +impl Snapshots { + pub fn new() -> Self { + let timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(histogram) = any.downcast_ref::() { + if let Ok(key) = Histograms::try_from(metric.name()) { + if let Some(snapshot) = histogram.snapshot() { + current.insert(key, snapshot); + } + } + } + } + + let deltas = current.clone(); + + Self { + timestamp, + previous: current, + deltas, + } + } + + pub fn update(&mut self) { + self.timestamp = SystemTime::now(); + + let mut current = HashMap::new(); + + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; + + if let Some(histogram) = any.downcast_ref::() { + if let Ok(key) = Histograms::try_from(metric.name()) { + if let Some(snapshot) = histogram.snapshot() { + if let Some(previous) = self.previous.get(&key) { + self.deltas + .insert(key, snapshot.wrapping_sub(previous).unwrap()); + } + + current.insert(key, snapshot); + } + } + } + } + + self.previous = current; + } +} + +static SNAPSHOTS: Lazy>> = + Lazy::new(|| Arc::new(RwLock::new(Snapshots::new()))); fn main() { // custom panic hook to terminate whole process after unwinding @@ -125,38 +198,13 @@ fn main() { // spawn thread to maintain histogram snapshots control_runtime.spawn(async { while RUNNING.load(Ordering::Relaxed) { - // build a current snapshot for all histograms - - let mut current = HashMap::new(); - - for metric in metriken::metrics().iter() { - let any = if let Some(any) = metric.as_any() { - any - } else { - continue; - }; - - if let Some(histogram) = any.downcast_ref::() { - if let Ok(key) = Histograms::try_from(metric.name()) { - if let Some(snapshot) = histogram.snapshot() { - current.insert(key, snapshot); - } - } - } - } - // acquire a lock and update the snapshots - - let mut snapshots = SNAPSHOTS.write().await; - - // we maintain 1 minute of history, which requires 61 secondly - // snapshots - if snapshots.len() == 61 { - let _ = snapshots.pop_front(); + { + let mut snapshots = SNAPSHOTS.write().await; + snapshots.update(); } - snapshots.push_back(current); - + // delay until next update sleep(core::time::Duration::from_secs(1)).await; } });