Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Oct 11, 2023
1 parent 6abce4b commit c18a3a2
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 51 deletions.
25 changes: 0 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 40 additions & 17 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::Arc;

/// The HTTP admin server.
pub async fn http(config: Config, ratelimit: Option<Arc<Ratelimiter>>) {

let admin = filters::admin(ratelimit);

let addr = config
Expand All @@ -20,8 +19,8 @@ pub async fn http(config: Config, ratelimit: Option<Arc<Ratelimiter>>) {
}

mod filters {
use warp::Filter;
use super::*;
use warp::Filter;

/// The combined set of admin endpoint filters
pub fn admin(
Expand Down Expand Up @@ -120,8 +119,10 @@ mod handlers {
pub async fn prometheus_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let current = CURRENT.read().await;
let previous = PREVIOUS.read().await;
let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand Down Expand Up @@ -163,9 +164,15 @@ mod handlers {
continue;
};

let delta = match (current.get(&key), previous.get(&key)) {
(Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(),
(Some(current), None) => current.clone(),
if current.is_none() {
continue;
}

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
current.wrapping_sub(previous).unwrap()
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
Expand Down Expand Up @@ -215,8 +222,10 @@ mod handlers {
pub async fn json_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let current = CURRENT.read().await;
let previous = PREVIOUS.read().await;
let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand Down Expand Up @@ -247,9 +256,15 @@ mod handlers {
continue;
};

let delta = match (current.get(&key), previous.get(&key)) {
(Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(),
(Some(current), None) => current.clone(),
if current.is_none() {
continue;
}

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
current.wrapping_sub(previous).unwrap()
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
Expand Down Expand Up @@ -290,8 +305,10 @@ mod handlers {
pub async fn human_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let current = CURRENT.read().await;
let previous = PREVIOUS.read().await;
let snapshots = SNAPSHOTS.read().await;

let previous = snapshots.front();
let current = snapshots.back();

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand Down Expand Up @@ -322,9 +339,15 @@ mod handlers {
continue;
};

let delta = match (current.get(&key), previous.get(&key)) {
(Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(),
(Some(current), None) => current.clone(),
if current.is_none() {
continue;
}

let delta = match (current.unwrap().get(&key), previous.map(|p| p.get(&key))) {
(Some(current), Some(Some(previous))) => {
current.wrapping_sub(previous).unwrap()
}
(Some(current), Some(None)) | (Some(current), None) => current.clone(),
_ => {
continue;
}
Expand Down
26 changes: 17 additions & 9 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
use tokio::sync::RwLock;
use std::time::SystemTime;
use std::sync::Arc;
use crate::clients::launch_clients;
use crate::pubsub::launch_pubsub;
use crate::workload::{launch_workload, Generator};
Expand All @@ -12,7 +9,10 @@ use core::time::Duration;
use metriken::{AtomicHistogram, Counter, Gauge};
use ringlog::*;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::Arc;
use tokio::runtime::Builder;
use tokio::sync::RwLock;
use tokio::time::sleep;

mod admin;
Expand All @@ -32,8 +32,8 @@ type UnixInstant = clocksource::UnixInstant<clocksource::Nanoseconds<u64>>;

static RUNNING: AtomicBool = AtomicBool::new(true);

static CURRENT: Arc<RwLock<HashMap<Histograms, metriken::histogram::Snapshot>>> = Arc::new(RwLock::new(HashMap::new()));
static PREVIOUS: Arc<RwLock<HashMap<Histograms, metriken::histogram::Snapshot>>> = Arc::new(RwLock::new(HashMap::new()));
static SNAPSHOTS: Arc<RwLock<VecDeque<HashMap<Histograms, metriken::histogram::Snapshot>>>> =
Arc::new(RwLock::new(VecDeque::new()));

fn main() {
// custom panic hook to terminate whole process after unwinding
Expand Down Expand Up @@ -122,12 +122,10 @@ fn main() {
// spawn thread to maintain histogram snapshots
control_runtime.spawn(async {
while RUNNING.load(Ordering::Relaxed) {
// build a current snapshot for all histograms

let current = CURRENT.write().await;
let previous = PREVIOUS.write().await;
let mut current = HashMap::new();

*previous = current.clone();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
Expand All @@ -144,6 +142,16 @@ fn main() {
}
}

// acquire a lock and update the snapshots

let mut snapshots = SNAPSHOTS.write().await;

if snapshots.len() == 60 {
let _ = snapshots.pop_front();
}

snapshots.push_back(current);

sleep(core::time::Duration::from_secs(1)).await;
}
});
Expand Down

0 comments on commit c18a3a2

Please sign in to comment.