diff --git a/Cargo.lock b/Cargo.lock index d33fc097..fe052830 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -93,6 +93,12 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "ascii" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d92bec98840b8f03a5ff5413de5293bfcd8bf96467cf5452609f939ec6f5de16" + [[package]] name = "async-channel" version = "1.9.0" @@ -332,6 +338,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "chunked_transfer" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cca491388666e04d7248af3f60f0c40cfb0991c72205595d7c396e3510207d1a" + [[package]] name = "clang-sys" version = "1.6.1" @@ -514,15 +526,6 @@ version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a26ae43d7bcc3b814de94796a5e736d4029efb0ee900c12e2d54c993ad1a1e07" -[[package]] -name = "encoding_rs" -version = "0.8.33" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7268b386296a025e474d5140678f75d6de9493ae55a5d709eeb9dd08149945e1" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "equivalent" version = "1.0.1" @@ -779,31 +782,6 @@ version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c6201b9ff9fd90a5a3bac2e56a830d0caa509576f0e503818ee82c181b3437a" -[[package]] -name = "headers" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" -dependencies = [ - "base64 0.13.1", - "bitflags 1.3.2", - "bytes 1.4.0", - "headers-core", - "http", - "httpdate", - "mime", - "sha1", -] - -[[package]] -name = "headers-core" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" -dependencies = [ - "http", -] - [[package]] name = "heatmap" version = "0.7.7" @@ -1237,16 +1215,6 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1354,24 +1322,6 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bf78b1242a953be96e01b5f8ed8ffdfc8055c0a2b779899b3835e5d27a69dced" -[[package]] -name = "multer" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" -dependencies = [ - "bytes 1.4.0", - "encoding_rs", - "futures-util", - "http", - "httparse", - "log", - "memchr", - "mime", - "spin 0.9.8", - "version_check", -] - [[package]] name = "multimap" version = "0.8.3" @@ -1879,7 +1829,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin 0.5.2", + "spin", "untrusted", "web-sys", "winapi 0.3.9", @@ -1935,10 +1885,10 @@ dependencies = [ "session", "sha2", "slab", + "tiny_http", "tokio", "tokio-boring", "toml 0.7.6", - "warp", "zipf", ] @@ -2029,12 +1979,6 @@ dependencies = [ "windows-sys", ] -[[package]] -name = "scoped-tls" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e1cf6437eb19a8f4a6cc0f7dca544973b0b78843adbfeb3683d1a94a0024a294" - [[package]] name = "scopeguard" version = "1.2.0" @@ -2114,18 +2058,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_urlencoded" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3491c14715ca2294c4d6a88f15e84739788c1d030eed8c110436aafdaa2f3fd" -dependencies = [ - "form_urlencoded", - "itoa", - "ryu", - "serde", -] - [[package]] name = "session" version = "0.3.1" @@ -2139,17 +2071,6 @@ dependencies = [ "protocol-common", ] -[[package]] -name = "sha1" -version = "0.10.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" -dependencies = [ - "cfg-if 1.0.0", - "cpufeatures", - "digest", -] - [[package]] name = "sha1_smol" version = "1.0.0" @@ -2247,12 +2168,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" -[[package]] -name = "spin" -version = "0.9.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" - [[package]] name = "storage-types" version = "0.3.1" @@ -2353,6 +2268,18 @@ dependencies = [ "time-core", ] +[[package]] +name = "tiny_http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "389915df6413a2e74fb181895f933386023c71110878cd0825588928e64cdc82" +dependencies = [ + "ascii", + "chunked_transfer", + "httpdate", + "log", +] + [[package]] name = "tinyvec" version = "1.6.0" @@ -2441,18 +2368,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-tungstenite" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54319c93411147bced34cb5609a80e0a8e44c5999c93903a81cd866630ec0bfd" -dependencies = [ - "futures-util", - "log", - "tokio", - "tungstenite", -] - [[package]] name = "tokio-util" version = "0.7.8" @@ -2623,7 +2538,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", - "log", "pin-project-lite", "tracing-attributes", "tracing-core", @@ -2665,40 +2579,12 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" -[[package]] -name = "tungstenite" -version = "0.18.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30ee6ab729cd4cf0fd55218530c4522ed30b7b6081752839b68fcec8d0960788" -dependencies = [ - "base64 0.13.1", - "byteorder", - "bytes 1.4.0", - "http", - "httparse", - "log", - "rand", - "sha1", - "thiserror", - "url", - "utf-8", -] - [[package]] name = "typenum" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" -[[package]] -name = "unicase" -version = "2.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7d2d4dafb69621809a81864c9c1b864479e1235c0dd4e199924b9742439ed89" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-bidi" version = "0.3.13" @@ -2737,12 +2623,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "utf-8" -version = "0.7.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" - [[package]] name = "utf8parse" version = "0.2.1" @@ -2764,37 +2644,6 @@ dependencies = [ "try-lock", ] -[[package]] -name = "warp" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba431ef570df1287f7f8b07e376491ad54f84d26ac473489427231e1718e1f69" -dependencies = [ - "bytes 1.4.0", - "futures-channel", - "futures-util", - "headers", - "http", - "hyper 0.14.27", - "log", - "mime", - "mime_guess", - "multer", - "percent-encoding", - "pin-project", - "rustls-pemfile", - "scoped-tls", - "serde", - "serde_json", - "serde_urlencoded", - "tokio", - "tokio-stream", - "tokio-tungstenite", - "tokio-util", - "tower-service", - "tracing", -] - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index 9ae0cf6c..4397761a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -57,7 +57,7 @@ tokio = { version = "1.32.0", features = ["full"] } tokio-boring = "2.1.5" toml = "0.7.6" zipf = "7.0.1" -warp = "0.3.5" +tiny_http = "0.12.0" [workspace] members = [ diff --git a/src/admin/mod.rs b/src/admin/mod.rs index 63dafec8..c9183f8d 100644 --- a/src/admin/mod.rs +++ b/src/admin/mod.rs @@ -2,11 +2,18 @@ use crate::*; use ratelimit::Ratelimiter; use std::net::ToSocketAddrs; use std::sync::Arc; +use std::time::SystemTime; +use tiny_http::Method; +use tiny_http::Response; /// The HTTP admin server. pub async fn http(config: Config, ratelimit: Option>) { - let admin = filters::admin(ratelimit); + let http_server = tokio::task::spawn_blocking(|| http_server(config, ratelimit)); + http_server.await.unwrap(); +} + +pub fn http_server(config: Config, ratelimit: Option>) { let addr = config .general() .admin() @@ -15,339 +22,360 @@ pub async fn http(config: Config, ratelimit: Option>) { .next() .expect("couldn't determine listen address"); - warp::serve(admin).run(addr).await; -} - -mod filters { - use super::*; - - /// The combined set of admin endpoint filters - pub fn admin( - ratelimit: Option>, - ) -> impl Filter + Clone { - prometheus_stats() - .or(human_stats()) - .or(json_stats()) - .or(update_ratelimit(ratelimit)) - } - - /// Serves Prometheus / OpenMetrics text format metrics. - /// - /// GET /metrics - pub fn prometheus_stats( - ) -> impl Filter + Clone { - warp::path!("metrics") - .and(warp::get()) - .and_then(handlers::prometheus_stats) - } + let mut previous: HashMap = HashMap::new(); + let mut current: HashMap = HashMap::new(); + let mut snapshot_at = SystemTime::now(); - /// Serves a human readable metrics output. - /// - /// GET /vars - pub fn human_stats( - ) -> impl Filter + Clone { - warp::path!("vars") - .and(warp::get()) - .and_then(handlers::human_stats) - } + let server = tiny_http::Server::http(addr).unwrap(); - /// Serves JSON metrics output that is compatible with Twitter Server / - /// Finagle metrics endpoints. Multiple paths are provided for enhanced - /// compatibility with metrics collectors. - /// - /// GET /metrics.json - /// GET /vars.json - /// GET /admin/metrics.json - pub fn json_stats( - ) -> impl Filter + Clone { - warp::path!("metrics.json") - .and(warp::get()) - .and_then(handlers::json_stats) - .or(warp::path!("vars.json") - .and(warp::get()) - .and_then(handlers::json_stats)) - .or(warp::path!("admin" / "metrics.json") - .and(warp::get()) - .and_then(handlers::json_stats)) - } + loop { + let now = SystemTime::now(); - // TODO(bmartin): we should probably pass the rate in the body - - /// An endpoint that allows realtime adjustment of the workload ratelimit. - /// - /// PUT /ratelimit/:rate - pub fn update_ratelimit( - ratelimit: Option>, - ) -> impl Filter + Clone { - warp::path!("ratelimit" / u64) - .and(warp::put()) - .and(with_ratelimit(ratelimit)) - .and_then(handlers::update_ratelimit) - } + if now >= snapshot_at { + previous = current.clone(); + for metric in metriken::metrics().iter() { + let any = if let Some(any) = metric.as_any() { + any + } else { + continue; + }; - fn with_ratelimit( - ratelimit: Option>, - ) -> impl Filter>,), Error = std::convert::Infallible> + Clone - { - warp::any().map(move || ratelimit.clone()) - } -} + if let Some(histogram) = any.downcast_ref::() { + if let Ok(key) = Histograms::try_from(metric.name()) { + if let Some(snapshot) = histogram.snapshot() { + current.insert(key, snapshot); + } + } + } + } -/// An enum to wrap metric readings for various metric types. -pub enum Metric<'a> { - Counter(&'a str, Option<&'a str>, u64), - Gauge(&'a str, Option<&'a str>, i64), - Percentiles(&'a str, Option<&'a str>, Vec<(&'a str, f64, Option)>), -} + snapshot_at = now + core::time::Duration::from_secs(1); + } -impl<'a> Metric<'a> { - /// Returns the name of the metric. - pub fn name(&self) -> &'a str { - match self { - Self::Counter(name, _description, _value) => name, - Self::Gauge(name, _description, _value) => name, - Self::Percentiles(name, _description, _percentiles) => name, + if let Some(request) = match server.try_recv() { + Ok(rq) => rq, + Err(e) => { + println!("error: {}", e); + break; + } + } { + let url = request.url(); + let parts: Vec<&str> = url.split('?').collect(); + let url = parts[0]; + + match request.method() { + Method::Get => match url { + "/metrics" => { + let _ = request + .respond(Response::from_string(prometheus_stats(&previous, ¤t))); + } + "/ratelimit" => { + let _ = request.respond(Response::empty(404)); + } + "/vars" => { + let _ = request + .respond(Response::from_string(human_stats(&previous, ¤t))); + } + "/vars.json" | "/admin/metrics.json" | "/metrics.json" => { + let _ = + request.respond(Response::from_string(json_stats(&previous, ¤t))); + } + _ => { + let _ = request.respond(Response::empty(404)); + } + }, + Method::Put => { + if url.starts_with("/ratelimit/") { + if let Some(ref r) = ratelimit { + let parts: Vec<&str> = url.split('/').collect(); + + if parts.len() != 3 { + let _ = request.respond(Response::empty(500)); + } else if let Ok(rate) = parts[2].parse::() { + let amount = (rate as f64 / 1_000_000.0).ceil() as u64; + + // even though we might not have nanosecond level clock resolution, + // by using a nanosecond level duration, we achieve more accurate + // ratelimits. + let interval = + Duration::from_nanos(1_000_000_000 / (rate / amount)); + + let capacity = std::cmp::max(100, amount); + + r.set_max_tokens(capacity) + .expect("failed to set max tokens"); + r.set_refill_interval(interval) + .expect("failed to set refill interval"); + r.set_refill_amount(amount) + .expect("failed to set refill amount"); + + let _ = request.respond(Response::empty(200)); + } else { + let _ = request.respond(Response::empty(500)); + } + } else { + let _ = request.respond(Response::empty(404)); + } + } else { + let _ = request.respond(Response::empty(404)); + } + } + _ => { + let _ = request.respond(Response::empty(500)); + } + } } } } -impl<'a> TryFrom<&'a metriken::MetricEntry> for Metric<'a> { - type Error = (); +/// Produces Prometheus / OpenMetrics text format metrics. All metrics have +/// type information, some have descriptions as well. Percentiles read from +/// heatmaps are exposed with a `percentile` label where the value +/// corresponds to the percentile in the range of 0.0 - 100.0. +/// +/// See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md +/// +/// ```text +/// # TYPE some_counter counter +/// # HELP some_counter An unsigned 64bit monotonic counter. +/// counter 0 +/// # TYPE some_gauge gauge +/// # HELP some_gauge A signed 64bit gauge. +/// some_gauge 0 +/// # TYPE some_distribution{percentile="50.0"} gauge +/// some_distribution{percentile="50.0"} 0 +/// ``` +pub fn prometheus_stats( + previous: &HashMap, + current: &HashMap, +) -> String { + let mut data = Vec::new(); + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } - fn try_from(metric: &'a metriken::MetricEntry) -> Result { let any = match metric.as_any() { Some(any) => any, None => { - return Err(()); + continue; } }; + let name = metric.name(); + if let Some(counter) = any.downcast_ref::() { - return Ok(Metric::Counter( - (*metric).name(), - metric.description(), - counter.value(), - )); + let value = counter.value(); + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} counter\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} counter\n{name} {value}")); + } } else if let Some(gauge) = any.downcast_ref::() { - return Ok(Metric::Gauge( - metric.name(), - metric.description(), - gauge.value(), - )); - } else if let Some(histogram) = any.downcast_ref::() { - if let Some(snapshot) = histogram.snapshot() { - let p: Vec = PERCENTILES.iter().map(|(_, p)| *p).collect(); - if let Ok(result) = snapshot.percentiles(&p) { - let percentiles = result - .iter() - .zip(PERCENTILES.iter()) - .map(|((p, b), (l, _))| (*l, *p, Some(b.end()))) - .collect(); - return Ok(Metric::Percentiles( - metric.name(), - metric.description(), - percentiles, + let value = gauge.value(); + + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name} {value}" + )); + } else { + data.push(format!("# TYPE {name} gauge\n{name} {value}")); + } + } else if any.downcast_ref::().is_some() { + let key = if let Ok(h) = Histograms::try_from(metric.name()) { + h + } else { + continue; + }; + + let delta = match (current.get(&key), previous.get(&key)) { + (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), + (Some(current), None) => current.clone(), + _ => { + continue; + } + }; + + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + + let result = delta.percentiles(&percentiles).unwrap(); + + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); + + for (_label, percentile, value) in result { + if let Some(description) = metric.description() { + data.push(format!( + "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value}", + percentile, + )); + } else { + data.push(format!( + "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value}", + percentile, )); } } } - - Err(()) } + + data.sort(); + let mut content = data.join("\n"); + content += "\n"; + let parts: Vec<&str> = content.split('/').collect(); + parts.join("_") } -mod handlers { - use super::*; - use core::convert::Infallible; - use warp::http::StatusCode; - - /// Serves Prometheus / OpenMetrics text format metrics. All metrics have - /// type information, some have descriptions as well. Percentiles read from - /// heatmaps are exposed with a `percentile` label where the value - /// corresponds to the percentile in the range of 0.0 - 100.0. - /// - /// See: https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md - /// - /// ```text - /// # TYPE some_counter counter - /// # HELP some_counter An unsigned 64bit monotonic counter. - /// counter 0 - /// # TYPE some_gauge gauge - /// # HELP some_gauge A signed 64bit gauge. - /// some_gauge 0 - /// # TYPE some_distribution{percentile="50.0"} gauge - /// some_distribution{percentile="50.0"} 0 - /// ``` - pub async fn prometheus_stats() -> Result { - let mut data = Vec::new(); - - for metric in metriken::metrics() - .iter() - .map(Metric::try_from) - .filter_map(|m| m.ok()) - { - if metric.name().starts_with("log_") { +/// Produces JSON formatted metrics following the conventions of Finagle / +/// TwitterServer. Percentiles read from heatmaps will have a percentile +/// label appended to the metric name in the form `/p999` which would be the +/// 99.9th percentile. +/// +/// ```text +/// {"get/ok": 0,"client/request/p999": 0, ... } +/// ``` +pub fn json_stats( + previous: &HashMap, + current: &HashMap, +) -> String { + let mut data = Vec::new(); + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } + + let any = match metric.as_any() { + Some(any) => any, + None => { continue; } + }; - match metric { - Metric::Counter(name, description, value) => { - if let Some(description) = description { - data.push(format!( - "# TYPE {name} counter\n# HELP {name} {description}\n{name} {value}" - )); - } else { - data.push(format!("# TYPE {name} counter\n{name} {value}")); - } - } - Metric::Gauge(name, description, value) => { - if let Some(description) = description { - data.push(format!( - "# TYPE {name} gauge\n# HELP {name} {description}\n{name} {value}" - )); - } else { - data.push(format!("# TYPE {name} gauge\n{name} {value}")); - } - } - Metric::Percentiles(name, description, percentiles) => { - for (_label, percentile, value) in percentiles { - if let Some(value) = value { - if let Some(description) = description { - data.push(format!( - "# TYPE {name} gauge\n# HELP {name} {description}\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } else { - data.push(format!( - "# TYPE {name} gauge\n{name}{{percentile=\"{:02}\"}} {value}", - percentile, - )); - } - } - } - } - } - } + let name = metric.name(); - data.sort(); - let mut content = data.join("\n"); - content += "\n"; - let parts: Vec<&str> = content.split('/').collect(); - Ok(parts.join("_")) - } + if let Some(counter) = any.downcast_ref::() { + let value = counter.value(); + + data.push(format!("\"{name}\": {value}")); + } else if let Some(gauge) = any.downcast_ref::() { + let value = gauge.value(); - /// Serves JSON formatted metrics following the conventions of Finagle / - /// TwitterServer. Percentiles read from heatmaps will have a percentile - /// label appended to the metric name in the form `/p999` which would be the - /// 99.9th percentile. - /// - /// ```text - /// {"get/ok": 0,"client/request/p999": 0, ... } - /// ``` - pub async fn json_stats() -> Result { - let mut data = Vec::new(); - - for metric in metriken::metrics() - .iter() - .map(Metric::try_from) - .filter_map(|m| m.ok()) - { - if metric.name().starts_with("log_") { + data.push(format!("\"{name}\": {value}")); + } else if any.downcast_ref::().is_some() { + let key = if let Ok(h) = Histograms::try_from(metric.name()) { + h + } else { continue; - } + }; - match metric { - Metric::Counter(name, _description, value) => { - data.push(format!("\"{name}\": {value}")); - } - Metric::Gauge(name, _description, value) => { - data.push(format!("\"{name}\": {value}")); - } - Metric::Percentiles(name, _description, percentiles) => { - for (label, _percentile, value) in percentiles { - if let Some(value) = value { - data.push(format!("\"{name}/{label}\": {value}",)); - } - } + let delta = match (current.get(&key), previous.get(&key)) { + (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), + (Some(current), None) => current.clone(), + _ => { + continue; } + }; + + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + + let result = delta.percentiles(&percentiles).unwrap(); + + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); + + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); } } + } - data.sort(); - let mut content = "{".to_string(); - content += &data.join(","); - content += "}"; + data.sort(); + let mut content = "{".to_string(); + content += &data.join(","); + content += "}"; - Ok(content) - } + content +} + +/// Produces human readable stats. One metric per line with a `LF` as the +/// newline character (Unix-style). Percentiles will have percentile labels +/// appened with a `/` as a separator. +/// +/// ``` +/// get/ok: 0 +/// client/request/latency/p50: 0, +/// ``` +pub fn human_stats( + previous: &HashMap, + current: &HashMap, +) -> String { + let mut data = Vec::new(); + + for metric in &metriken::metrics() { + if metric.name().starts_with("log_") { + continue; + } - /// Serves human readable stats. One metric per line with a `LF` as the - /// newline character (Unix-style). Percentiles will have percentile labels - /// appened with a `/` as a separator. - /// - /// ``` - /// get/ok: 0 - /// client/request/latency/p50: 0, - /// ``` - pub async fn human_stats() -> Result { - let mut data = Vec::new(); - - for metric in metriken::metrics() - .iter() - .map(Metric::try_from) - .filter_map(|m| m.ok()) - { - if metric.name().starts_with("log_") { + let any = match metric.as_any() { + Some(any) => any, + None => { continue; } + }; - match metric { - Metric::Counter(name, _description, value) => { - data.push(format!("{name}: {value}")); - } - Metric::Gauge(name, _description, value) => { - data.push(format!("{name}: {value}")); - } - Metric::Percentiles(name, _description, percentiles) => { - for (label, _percentile, value) in percentiles { - if let Some(value) = value { - data.push(format!("{name}/{label}: {value}",)); - } - } + let name = metric.name(); + + if let Some(counter) = any.downcast_ref::() { + let value = counter.value(); + + data.push(format!("\"{name}\": {value}")); + } else if let Some(gauge) = any.downcast_ref::() { + let value = gauge.value(); + + data.push(format!("\"{name}\": {value}")); + } else if any.downcast_ref::().is_some() { + let key = if let Ok(h) = Histograms::try_from(metric.name()) { + h + } else { + continue; + }; + + let delta = match (current.get(&key), previous.get(&key)) { + (Some(current), Some(previous)) => current.wrapping_sub(previous).unwrap(), + (Some(current), None) => current.clone(), + _ => { + continue; } - } - } + }; - data.sort(); - let mut content = data.join("\n"); - content += "\n"; - Ok(content) - } + let percentiles: Vec = PERCENTILES.iter().map(|p| p.1).collect(); + + let result = delta.percentiles(&percentiles).unwrap(); - pub async fn update_ratelimit( - rate: u64, - ratelimit: Option>, - ) -> Result { - if let Some(r) = ratelimit { - let amount = (rate as f64 / 1_000_000.0).ceil() as u64; - - // even though we might not have nanosecond level clock resolution, - // by using a nanosecond level duration, we achieve more accurate - // ratelimits. - let interval = Duration::from_nanos(1_000_000_000 / (rate / amount)); - - let capacity = std::cmp::max(100, amount); - - r.set_max_tokens(capacity) - .expect("failed to set max tokens"); - r.set_refill_interval(interval) - .expect("failed to set refill interval"); - r.set_refill_amount(amount) - .expect("failed to set refill amount"); - - Ok(StatusCode::OK) - } else { - Ok(StatusCode::NOT_FOUND) + let result: Vec<(&'static str, f64, u64)> = PERCENTILES + .iter() + .zip(result.iter()) + .map(|((label, percentile), (_, value))| (*label, *percentile, value.end())) + .collect(); + + for (label, _percentile, value) in result { + data.push(format!("\"{name}/{label}\": {value}",)); + } } } + + data.sort(); + let mut content = data.join("\n"); + content += "\n"; + content } diff --git a/src/main.rs b/src/main.rs index 6dad1f9b..a8c73bc1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,7 +11,7 @@ use ringlog::*; use std::collections::HashMap; use tokio::runtime::Builder; use tokio::time::sleep; -use warp::Filter; +// use warp::Filter; mod admin; mod clients; diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs index bcc4849a..602bfaab 100644 --- a/src/metrics/mod.rs +++ b/src/metrics/mod.rs @@ -111,6 +111,20 @@ impl Histograms { } } +impl TryFrom<&str> for Histograms { + type Error = (); + fn try_from( + other: &str, + ) -> std::result::Result>::Error> { + match other { + "response_latency" => Ok(Histograms::ResponseLatency), + "pubsub_latency" => Ok(Histograms::PubsubLatency), + "pubsub_publish_latency" => Ok(Histograms::PubsubPublishLatency), + _ => Err(()), + } + } +} + #[macro_export] #[rustfmt::skip] macro_rules! counter {