Skip to content

Commit

Permalink
naming and structure
Browse files Browse the repository at this point in the history
  • Loading branch information
brayniac committed Oct 12, 2023
1 parent 38aa60f commit 3130e11
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 106 deletions.
20 changes: 10 additions & 10 deletions src/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,9 @@ mod handlers {
pub async fn prometheus_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let snapshots = SNAPSHOTS.read().await;
let metrics_state = METRICS_STATE.read().await;

let timestamp = snapshots
let timestamp = metrics_state
.timestamp
.duration_since(UNIX_EPOCH)
.unwrap()
Expand Down Expand Up @@ -162,13 +162,13 @@ mod handlers {
data.push(format!("# TYPE {name} gauge\n{name} {value}"));
}
} else if any.downcast_ref::<AtomicHistogram>().is_some() {
let key = if let Ok(h) = Histograms::try_from(metric.name()) {
let key = if let Ok(h) = HistogramMetric::try_from(metric.name()) {
h
} else {
continue;
};

if let Some(delta) = snapshots.deltas.get(&key) {
if let Some(delta) = metrics_state.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();
Expand Down Expand Up @@ -214,7 +214,7 @@ mod handlers {
pub async fn json_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let snapshots = SNAPSHOTS.read().await;
let metrics_state = METRICS_STATE.read().await;

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand All @@ -239,13 +239,13 @@ mod handlers {

data.push(format!("\"{name}\": {value}"));
} else if any.downcast_ref::<AtomicHistogram>().is_some() {
let key = if let Ok(h) = Histograms::try_from(metric.name()) {
let key = if let Ok(h) = HistogramMetric::try_from(metric.name()) {
h
} else {
continue;
};

if let Some(delta) = snapshots.deltas.get(&key) {
if let Some(delta) = metrics_state.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();
Expand Down Expand Up @@ -282,7 +282,7 @@ mod handlers {
pub async fn human_stats() -> Result<impl warp::Reply, Infallible> {
let mut data = Vec::new();

let snapshots = SNAPSHOTS.read().await;
let metrics_state = METRICS_STATE.read().await;

for metric in &metriken::metrics() {
if metric.name().starts_with("log_") {
Expand All @@ -307,13 +307,13 @@ mod handlers {

data.push(format!("\"{name}\": {value}"));
} else if any.downcast_ref::<AtomicHistogram>().is_some() {
let key = if let Ok(h) = Histograms::try_from(metric.name()) {
let key = if let Ok(h) = HistogramMetric::try_from(metric.name()) {
h
} else {
continue;
};

if let Some(delta) = snapshots.deltas.get(&key) {
if let Some(delta) = metrics_state.deltas.get(&key) {
let percentiles: Vec<f64> = PERCENTILES.iter().map(|p| p.1).collect();

let result = delta.percentiles(&percentiles).unwrap();
Expand Down
82 changes: 3 additions & 79 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use once_cell::sync::Lazy;
use ringlog::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::runtime::Builder;
use tokio::sync::RwLock;
use tokio::time::sleep;
Expand All @@ -31,85 +30,10 @@ use metrics::*;
type Instant = clocksource::Instant<clocksource::Nanoseconds<u64>>;
type UnixInstant = clocksource::UnixInstant<clocksource::Nanoseconds<u64>>;

type HistogramSnapshots = HashMap<Histograms, metriken::histogram::Snapshot>;

static RUNNING: AtomicBool = AtomicBool::new(true);

pub struct Snapshots {
timestamp: SystemTime,
previous: HistogramSnapshots,
deltas: HistogramSnapshots,
}

impl Default for Snapshots {
fn default() -> Self {
Self::new()
}
}

impl Snapshots {
pub fn new() -> Self {
let timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<AtomicHistogram>() {
if let Ok(key) = Histograms::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
current.insert(key, snapshot);
}
}
}
}

let deltas = current.clone();

Self {
timestamp,
previous: current,
deltas,
}
}

pub fn update(&mut self) {
self.timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<AtomicHistogram>() {
if let Ok(key) = Histograms::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
if let Some(previous) = self.previous.get(&key) {
self.deltas
.insert(key, snapshot.wrapping_sub(previous).unwrap());
}

current.insert(key, snapshot);
}
}
}
}

self.previous = current;
}
}

static SNAPSHOTS: Lazy<Arc<RwLock<Snapshots>>> =
Lazy::new(|| Arc::new(RwLock::new(Snapshots::new())));
static METRICS_STATE: Lazy<Arc<RwLock<HistogramMetricsSnapshot>>> =
Lazy::new(|| Arc::new(RwLock::new(Default::default())));

fn main() {
// custom panic hook to terminate whole process after unwinding
Expand Down Expand Up @@ -200,7 +124,7 @@ fn main() {
while RUNNING.load(Ordering::Relaxed) {
// acquire a lock and update the snapshots
{
let mut snapshots = SNAPSHOTS.write().await;
let mut snapshots = METRICS_STATE.write().await;
snapshots.update();
}

Expand Down
96 changes: 86 additions & 10 deletions src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
// for now, we use some of the metrics defined in the protocol crates

pub use protocol_memcache::*;

use ahash::HashMap;
use ahash::HashMapExt;
use metriken::Lazy;
use paste::paste;
use std::concat;
use std::time::SystemTime;

pub static PERCENTILES: &[(&str, f64)] = &[
("p25", 25.0),
Expand All @@ -16,10 +19,83 @@ pub static PERCENTILES: &[(&str, f64)] = &[
("p9999", 99.99),
];

pub struct HistogramMetricsSnapshot {
pub timestamp: SystemTime,
pub previous: HashMap<HistogramMetric, metriken::histogram::Snapshot>,
pub deltas: HashMap<HistogramMetric, metriken::histogram::Snapshot>,
}

impl Default for HistogramMetricsSnapshot {
fn default() -> Self {
Self::new()
}
}

impl HistogramMetricsSnapshot {
pub fn new() -> Self {
let timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<metriken::AtomicHistogram>() {
if let Ok(key) = HistogramMetric::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
current.insert(key, snapshot);
}
}
}
}

let deltas = current.clone();

Self {
timestamp,
previous: current,
deltas,
}
}

pub fn update(&mut self) {
self.timestamp = SystemTime::now();

let mut current = HashMap::new();

for metric in metriken::metrics().iter() {
let any = if let Some(any) = metric.as_any() {
any
} else {
continue;
};

if let Some(histogram) = any.downcast_ref::<metriken::AtomicHistogram>() {
if let Ok(key) = HistogramMetric::try_from(metric.name()) {
if let Some(snapshot) = histogram.snapshot() {
if let Some(previous) = self.previous.get(&key) {
self.deltas
.insert(key, snapshot.wrapping_sub(previous).unwrap());
}

current.insert(key, snapshot);
}
}
}
}

self.previous = current;
}
}

#[derive(Default)]
pub struct Snapshot {
pub struct MetricsSnapshot {
pub counters: HashMap<Counters, u64>,
pub histograms: HashMap<Histograms, histogram::Snapshot>,
pub histograms: HashMap<HistogramMetric, histogram::Snapshot>,
}

#[derive(Eq, Hash, PartialEq, Copy, Clone)]
Expand Down Expand Up @@ -76,7 +152,7 @@ impl Counters {
}
}

pub fn delta(&self, snapshot: &mut Snapshot) -> u64 {
pub fn delta(&self, snapshot: &mut MetricsSnapshot) -> u64 {
let curr = self.counter().value();
let prev = snapshot.counters.insert(*self, curr).unwrap_or(0);
curr.wrapping_sub(prev)
Expand All @@ -85,13 +161,13 @@ impl Counters {

#[allow(clippy::enum_variant_names)]
#[derive(Eq, Hash, PartialEq, Copy, Clone)]
pub enum Histograms {
pub enum HistogramMetric {
PubsubLatency,
PubsubPublishLatency,
ResponseLatency,
}

impl Histograms {
impl HistogramMetric {
pub fn histogram(&self) -> &metriken::AtomicHistogram {
match self {
Self::PubsubLatency => &PUBSUB_LATENCY,
Expand All @@ -100,7 +176,7 @@ impl Histograms {
}
}

pub fn delta(&self, snapshot: &mut Snapshot) -> Option<histogram::Snapshot> {
pub fn delta(&self, snapshot: &mut MetricsSnapshot) -> Option<histogram::Snapshot> {
if let Some(curr) = self.histogram().snapshot() {
match snapshot.histograms.insert(*self, curr.clone()) {
Some(prev) => Some(curr.wrapping_sub(&prev).unwrap()),
Expand All @@ -112,15 +188,15 @@ impl Histograms {
}
}

impl TryFrom<&str> for Histograms {
impl TryFrom<&str> for HistogramMetric {
type Error = ();
fn try_from(
other: &str,
) -> std::result::Result<Self, <Self as std::convert::TryFrom<&str>>::Error> {
match other {
"response_latency" => Ok(Histograms::ResponseLatency),
"pubsub_latency" => Ok(Histograms::PubsubLatency),
"pubsub_publish_latency" => Ok(Histograms::PubsubPublishLatency),
"response_latency" => Ok(Self::ResponseLatency),
"pubsub_latency" => Ok(Self::PubsubLatency),
"pubsub_publish_latency" => Ok(Self::PubsubPublishLatency),
_ => Err(()),
}
}
Expand Down
14 changes: 7 additions & 7 deletions src/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub fn log(config: &Config) {

let mut window_id = 0;

let mut snapshot = Snapshot::default();
let mut snapshot = MetricsSnapshot::default();

let mut prev = Instant::now();

Expand Down Expand Up @@ -59,7 +59,7 @@ pub fn log(config: &Config) {
}

/// Outputs client stats and returns the number of requests successfully sent
fn client_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 {
fn client_stats(snapshot: &mut MetricsSnapshot, elapsed: f64) -> u64 {
let connect_ok = Counters::ConnectOk.delta(snapshot);
let connect_ex = Counters::ConnectEx.delta(snapshot);
let connect_timeout = Counters::ConnectTimeout.delta(snapshot);
Expand All @@ -78,7 +78,7 @@ fn client_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 {

let connect_sr = 100.0 * connect_ok as f64 / connect_total as f64;

let response_latency = Histograms::ResponseLatency.delta(snapshot);
let response_latency = HistogramMetric::ResponseLatency.delta(snapshot);

output!(
"Client Connection: Open: {} Success Rate: {:.2} %",
Expand Down Expand Up @@ -151,14 +151,14 @@ fn client_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 {
}

/// Output pubsub metrics and return the number of successful publish operations
fn pubsub_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 {
fn pubsub_stats(snapshot: &mut MetricsSnapshot, elapsed: f64) -> u64 {
// publisher stats
let pubsub_tx_ex = Counters::PubsubTxEx.delta(snapshot);
let pubsub_tx_ok = Counters::PubsubTxOk.delta(snapshot);
let pubsub_tx_timeout = Counters::PubsubTxTimeout.delta(snapshot);
let pubsub_tx_total = Counters::PubsubTx.delta(snapshot);

let pubsub_publish_latency = Histograms::PubsubPublishLatency.delta(snapshot);
let pubsub_publish_latency = HistogramMetric::PubsubPublishLatency.delta(snapshot);

// subscriber stats
let pubsub_rx_ok = Counters::PubsubRxOk.delta(snapshot);
Expand All @@ -168,7 +168,7 @@ fn pubsub_stats(snapshot: &mut Snapshot, elapsed: f64) -> u64 {
let pubsub_rx_total = Counters::PubsubRx.delta(snapshot);

// end-to-end stats
let pubsub_latency = Histograms::PubsubLatency.delta(snapshot);
let pubsub_latency = HistogramMetric::PubsubLatency.delta(snapshot);

output!("Publishers: Current: {}", PUBSUB_PUBLISHER_CURR.value(),);

Expand Down Expand Up @@ -269,7 +269,7 @@ pub fn json(config: Config, ratelimit: Option<&Ratelimiter>) {

let mut window_id = 0;

let mut snapshot = Snapshot::default();
let mut snapshot = MetricsSnapshot::default();

while end > now {
std::thread::sleep(Duration::from_millis(1));
Expand Down

0 comments on commit 3130e11

Please sign in to comment.