Skip to content

Commit

Permalink
remove snapshot buffer and maintain previous and deltas only
Browse files Browse the repository at this point in the history
Changes the behavior to keep just a single set of previous
snapshots and cached deltas.

We assume secondly collection and will serve percentiles up to one
second stale, but with timestamps in the prometheus format to aid
in correlating with other metrics.
  • Loading branch information
brayniac committed Oct 12, 2023
1 parent 487534b commit 38aa60f
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 130 deletions.
146 changes: 48 additions & 98 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ mod filters {
mod handlers {
use super::*;
use core::convert::Infallible;
use std::time::UNIX_EPOCH;
use warp::http::StatusCode;

/// Serves Prometheus / OpenMetrics text format metrics. All metrics have
Expand All @@ -121,8 +122,11 @@ mod handlers {

let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();
let timestamp = snapshots
.timestamp
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand Down Expand Up @@ -164,46 +168,30 @@ mod handlers {
continue;
};

if current.is_none() {
continue;
}
if let Some(delta) = snapshots.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();

let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
if snapshots.len() < 61 {
current.clone()
for (_label, percentile, value) in result {
if let Some(description) = metric.description() {
data.push(format!(
"# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}",
percentile,
));
} else {
current.wrapping_sub(previous).unwrap()
data.push(format!(
"# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value} {timestamp}",
percentile,
));
}
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
};

let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();

let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();

for (_label, percentile, value) in result {
if let Some(description) = metric.description() {
data.push(format!(
"# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value}",
percentile,
));
} else {
data.push(format!(
"# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value}",
percentile,
));
}
}
}
}
Expand All @@ -228,9 +216,6 @@ mod handlers {

let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
continue;
Expand Down Expand Up @@ -260,36 +245,20 @@ mod handlers {
continue;
};

if current.is_none() {
continue;
}

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
if snapshots.len() < 61 {
current.clone()
} else {
current.wrapping_sub(previous).unwrap()
}
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
};

let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();
if let Some(delta) = snapshots.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();
let result = delta.percentiles(&percentiles).unwrap();

let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();
let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();

for (label, _percentile, value) in result {
data.push(format!("\"{name}/{label}\": {value}",));
for (label, _percentile, value) in result {
data.push(format!("\"{name}/{label}\": {value}",));
}
}
}
}
Expand All @@ -315,9 +284,6 @@ mod handlers {

let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
continue;
Expand Down Expand Up @@ -347,36 +313,20 @@ mod handlers {
continue;
};

if current.is_none() {
continue;
}
if let Some(delta) = snapshots.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
if snapshots.len() < 61 {
current.clone()
} else {
current.wrapping_sub(previous).unwrap()
}
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
};
let result = delta.percentiles(&percentiles).unwrap();

let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();
let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();

let result = delta.percentiles(&percentiles).unwrap();

let result: Vec<(&'static str, f64, u64)> = PERCENTILES
.iter()
.zip(result.iter())
.map(|((label, percentile), (_, value))| (*label, *percentile, value.end()))
.collect();

for (label, _percentile, value) in result {
data.push(format!("\"{name}/{label}\": {value}",));
for (label, _percentile, value) in result {
data.push(format!("\"{name}/{label}\": {value}",));
}
}
}
}
Expand Down
112 changes: 80 additions & 32 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use metriken::{AtomicHistogram, Counter, Gauge};
use once_cell::sync::Lazy;
use ringlog::*;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Builder;
use tokio::sync::RwLock;
use tokio::time::sleep;
Expand All @@ -35,8 +35,81 @@ type HistogramSnapshots = HashMap<Histograms, metriken::histogram::Snapshot>;

static RUNNING: AtomicBool = AtomicBool::new(true);

static SNAPSHOTS: Lazy<Arc<RwLock<VecDeque<HistogramSnapshots>>>> =
Lazy::new(|| Arc::new(RwLock::new(VecDeque::new())));
pub struct Snapshots {
timestamp: SystemTime,
previous: HistogramSnapshots,
deltas: HistogramSnapshots,
}

impl Default for Snapshots {
fn default() -> Self {
Self::new()
}
}

impl Snapshots {
pub fn new() -> Self {
let timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<AtomicHistogram>() {
if let Ok(key) = Histograms::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
current.insert(key, snapshot);
}
}
}
}

let deltas = current.clone();

Self {
timestamp,
previous: current,
deltas,
}
}

pub fn update(&mut self) {
self.timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<AtomicHistogram>() {
if let Ok(key) = Histograms::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
if let Some(previous) = self.previous.get(&key) {
self.deltas
.insert(key, snapshot.wrapping_sub(previous).unwrap());
}

current.insert(key, snapshot);
}
}
}
}

self.previous = current;
}
}

static SNAPSHOTS: Lazy<Arc<RwLock<Snapshots>>> =
Lazy::new(|| Arc::new(RwLock::new(Snapshots::new())));

fn main() {
// custom panic hook to terminate whole process after unwinding
Expand Down Expand Up @@ -125,38 +198,13 @@ fn main() {
// spawn thread to maintain histogram snapshots
control_runtime.spawn(async {
while RUNNING.load(Ordering::Relaxed) {
// build a current snapshot for all histograms

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<AtomicHistogram>() {
if let Ok(key) = Histograms::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
current.insert(key, snapshot);
}
}
}
}

// acquire a lock and update the snapshots

let mut snapshots = SNAPSHOTS.write().await;

// we maintain 1 minute of history, which requires 61 secondly
// snapshots
if snapshots.len() == 61 {
let _ = snapshots.pop_front();
{
let mut snapshots = SNAPSHOTS.write().await;
snapshots.update();
}

snapshots.push_back(current);

// delay until next update
sleep(core::time::Duration::from_secs(1)).await;
}
});
Expand Down

0 comments on commit 38aa60f

Please sign in to comment.