From 1a13298e5f02f904d2fd583019fedfec2ee57219 Mon Sep 17 00:00:00 2001 From: Bob Aman Date: Fri, 9 Aug 2024 15:14:04 -0500 Subject: [PATCH 1/3] Describe some of the metrics --- crates/ext-processor/src/service.rs | 24 +++++++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/crates/ext-processor/src/service.rs b/crates/ext-processor/src/service.rs index f5d3abbe..45bdfc82 100644 --- a/crates/ext-processor/src/service.rs +++ b/crates/ext-processor/src/service.rs @@ -248,7 +248,15 @@ impl BulwarkProcessor { "combined_decision", "outcome" => "restricted", ); + metrics::describe_histogram!( + "combined_decision", + "Counters for each combined decision outcome." + ); metrics::register_histogram!("combined_decision_score"); + metrics::describe_histogram!( + "combined_decision_score", + "A histogram over the combined decision scores for all requests processed." + ); let redis_pool: Option> = if let Some(redis_addr) = config.state.redis_uri.as_ref() { @@ -879,7 +887,7 @@ impl ProcessorContext { ); metrics::histogram!( "combined_decision_score", - verdict.decision.pignistic().restrict + verdict.decision.pignistic().restrict, ); let mut decisions: Vec = Vec::with_capacity(self.plugin_instances.len()); @@ -913,12 +921,22 @@ impl ProcessorContext { decision.pignistic().restrict, "ref" => plugin_instance.plugin_reference(), ); + metrics::describe_histogram!( + "decision_score", + "A histogram over the individual plugin decision scores for all requests processed." + ); + // Measure the conflict between each individual decision and the combined decision metrics::histogram!( "decision_conflict", Decision::conflict(&[decision, verdict.decision]), "ref" => plugin_instance.plugin_reference(), ); + metrics::describe_histogram!( + "decision_conflict", + "A histogram over the individual plugin disagreement values for all requests processed." + ); + decisions.push(decision); } let request = self.request.clone(); @@ -949,6 +967,10 @@ impl ProcessorContext { // Measure total conflict in the combined decision metrics::histogram!("combined_conflict", Decision::conflict(&decisions)); + metrics::describe_histogram!( + "combined_conflict", + "A histogram over the combined disagreement values for all requests processed." + ); // Capturing stdio is always the last thing that happens and feedback should always be the second-to-last. self.capture_stdio().await; From 65c9c060966ba5fbad2644e2e331ec4bdef64f88 Mon Sep 17 00:00:00 2001 From: Bob Aman Date: Fri, 9 Aug 2024 15:14:20 -0500 Subject: [PATCH 2/3] Warn when exporting empty metrics --- src/admin.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/admin.rs b/src/admin.rs index e2cb9700..ebbd26c7 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -3,6 +3,7 @@ use super::*; use http::{HeaderMap, HeaderValue}; pub(super) use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle}; use std::fmt; +use tracing::warn; /// Axum state for the admin service. pub(super) struct AdminState { @@ -97,6 +98,9 @@ pub(super) async fn metrics_handler( let metrics = prometheus_handle.render(); // TODO: Add gzip compression support. body = metrics.into(); + if body.is_empty() { + warn!("exporting empty prometheus metrics"); + } } else { body = vec![]; } From 0df63d991e0cb349a366e9073de5e47b54effea5 Mon Sep 17 00:00:00 2001 From: Bob Aman Date: Sat, 10 Aug 2024 01:06:44 -0500 Subject: [PATCH 3/3] Prometheus metrics were broken because the crate got out of sync w/ parent dependency --- Cargo.lock | 46 ++++-------- Cargo.toml | 6 +- crates/ext-processor/src/service.rs | 106 ++++++++++++++++------------ crates/host/src/plugin.rs | 50 +++++++------ src/errors.rs | 4 +- src/main.rs | 2 +- 6 files changed, 109 insertions(+), 105 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bab8e167..56e150f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -410,7 +410,7 @@ dependencies = [ "deadpool-redis", "http 1.1.0", "hyper 1.3.1", - "metrics 0.21.1", + "metrics", "metrics-exporter-prometheus", "metrics-exporter-statsd", "quoted-string", @@ -482,7 +482,7 @@ dependencies = [ "futures", "http 1.1.0", "matchit 0.8.2", - "metrics 0.21.1", + "metrics", "prost", "prost-types", "redis", @@ -512,7 +512,7 @@ dependencies = [ "hex", "http 1.1.0", "http-body-util", - "metrics 0.21.1", + "metrics", "redis", "redis-test", "reqwest", @@ -580,9 +580,9 @@ checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" [[package]] name = "cadence" -version = "0.29.1" +version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39286bc075b023101dccdb79456a1334221c768b8faede0c2aff7ed29a9482d" +checksum = "2f338b979d9ebfff4bb9801ae8f3af0dc3615f7f1ca963f2e4782bcf9acb3753" dependencies = [ "crossbeam-channel", ] @@ -2036,17 +2036,6 @@ dependencies = [ "autocfg", ] -[[package]] -name = "metrics" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" -dependencies = [ - "ahash", - "metrics-macros", - "portable-atomic", -] - [[package]] name = "metrics" version = "0.23.0" @@ -2059,9 +2048,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.15.1" +version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf0af7a0d7ced10c0151f870e5e3f3f8bc9ffc5992d32873566ca1f9169ae776" +checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" dependencies = [ "base64 0.22.1", "http-body-util", @@ -2070,7 +2059,7 @@ dependencies = [ "hyper-util", "indexmap 2.2.6", "ipnet", - "metrics 0.23.0", + "metrics", "metrics-util", "quanta", "thiserror", @@ -2080,26 +2069,15 @@ dependencies = [ [[package]] name = "metrics-exporter-statsd" -version = "0.6.0" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e34a620eecf9e4321ebbef8f2f8e7cd22e098f11b65f2d987ce66faaa8918418" +checksum = "a0905009f54328c743a2046a86c88157621f473a2202cfa922cb716615c4b292" dependencies = [ "cadence", - "metrics 0.21.1", + "metrics", "thiserror", ] -[[package]] -name = "metrics-macros" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38b4faf00617defe497754acde3024865bc143d44a86799b24e191ecff91354f" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.68", -] - [[package]] name = "metrics-util" version = "0.17.0" @@ -2109,7 +2087,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.14.5", - "metrics 0.23.0", + "metrics", "num_cpus", "quanta", "sketches-ddsketch", diff --git a/Cargo.toml b/Cargo.toml index 02b16d3b..36f138ff 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,8 +34,8 @@ clap = { version = "4.4.3", features = ["derive"] } clap_complete = "4.5.2" color-eyre = "0.6.2" hyper = { version = "1.2.0", features = ["server"] } -metrics-exporter-prometheus = "0.15.0" -metrics-exporter-statsd = "0.6.0" +metrics-exporter-prometheus = "0.15.3" +metrics-exporter-statsd = "0.8.0" quoted-string = "0.6.1" tower = { version = "0.4.13", features = ["tokio", "tracing"] } tower-http = { version = "0.5.0", features = [ @@ -119,7 +119,7 @@ futures = "0.3" hex = "0.4.3" http = "1.0" matchit = "0.8.2" -metrics = "0.21.1" +metrics = "0.23.0" owo-colors = "3.5.0" redis = { version = "0.25", features = [ "tokio-comp", diff --git a/crates/ext-processor/src/service.rs b/crates/ext-processor/src/service.rs index 45bdfc82..7fb3e0a6 100644 --- a/crates/ext-processor/src/service.rs +++ b/crates/ext-processor/src/service.rs @@ -232,27 +232,31 @@ impl BulwarkProcessor { /// * `config` - The root of the Bulwark configuration structure to be used to initialize the service. pub async fn new(config: Config) -> Result { // Get all outcomes registered even if those outcomes don't happen immediately. - metrics::register_counter!( + metrics::counter!( "combined_decision", "outcome" => "trusted", - ); - metrics::register_counter!( + ) + .absolute(0); + metrics::counter!( "combined_decision", "outcome" => "accepted", - ); - metrics::register_counter!( + ) + .absolute(0); + metrics::counter!( "combined_decision", "outcome" => "suspected", - ); - metrics::register_counter!( + ) + .absolute(0); + metrics::counter!( "combined_decision", "outcome" => "restricted", - ); - metrics::describe_histogram!( + ) + .absolute(0); + metrics::describe_counter!( "combined_decision", "Counters for each combined decision outcome." ); - metrics::register_histogram!("combined_decision_score"); + metrics::histogram!("combined_decision_score").record(0.0); metrics::describe_histogram!( "combined_decision_score", "A histogram over the combined decision scores for all requests processed." @@ -352,16 +356,18 @@ impl BulwarkProcessor { let mut plugin_instance = plugin_instance.lock().await; let result = plugin_instance.handle_init().await; match result { - Ok(_) => metrics::increment_counter!( + Ok(_) => metrics::counter!( "plugin_wasm_on_init", "ref" => plugin_instance.plugin_reference(), "result" => "ok" - ), - Err(_) => metrics::increment_counter!( + ) + .increment(1), + Err(_) => metrics::counter!( "plugin_wasm_on_init", "ref" => plugin_instance.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } result } @@ -376,16 +382,18 @@ impl BulwarkProcessor { .handle_request_enrichment(request, labels) .await; match result { - Ok(_) => metrics::increment_counter!( + Ok(_) => metrics::counter!( "plugin_wasm_on_request", "ref" => plugin_instance.plugin_reference(), "result" => "ok" - ), - Err(_) => metrics::increment_counter!( + ) + .increment(1), + Err(_) => metrics::counter!( "plugin_wasm_on_request", "ref" => plugin_instance.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } result } @@ -400,16 +408,18 @@ impl BulwarkProcessor { .handle_request_decision(request, labels) .await; match result { - Ok(_) => metrics::increment_counter!( + Ok(_) => metrics::counter!( "plugin_wasm_on_request_decision", "ref" => plugin_instance.plugin_reference(), "result" => "ok" - ), - Err(_) => metrics::increment_counter!( + ) + .increment(1), + Err(_) => metrics::counter!( "plugin_wasm_on_request_decision", "ref" => plugin_instance.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } result } @@ -425,16 +435,18 @@ impl BulwarkProcessor { .handle_response_decision(request, response, labels) .await; match result { - Ok(_) => metrics::increment_counter!( + Ok(_) => metrics::counter!( "plugin_wasm_on_response_decision", "ref" => plugin_instance.plugin_reference(), "result" => "ok" - ), - Err(_) => metrics::increment_counter!( + ) + .increment(1), + Err(_) => metrics::counter!( "plugin_wasm_on_response_decision", "ref" => plugin_instance.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } result } @@ -451,16 +463,18 @@ impl BulwarkProcessor { .handle_decision_feedback(request, response, labels, verdict) .await; match result { - Ok(_) => metrics::increment_counter!( + Ok(_) => metrics::counter!( "plugin_wasm_on_decision_feedback", "ref" => plugin_instance.plugin_reference(), "result" => "ok" - ), - Err(_) => metrics::increment_counter!( + ) + .increment(1), + Err(_) => metrics::counter!( "plugin_wasm_on_decision_feedback", "ref" => plugin_instance.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } result } @@ -881,14 +895,13 @@ impl ProcessorContext { .verdict .as_ref() .expect("cannot execute feedback phase without verdict"); - metrics::increment_counter!( + metrics::counter!( "combined_decision", "outcome" => verdict.outcome.to_string(), - ); - metrics::histogram!( - "combined_decision_score", - verdict.decision.pignistic().restrict, - ); + ) + .increment(1); + metrics::histogram!("combined_decision_score",) + .record(verdict.decision.pignistic().restrict); let mut decisions: Vec = Vec::with_capacity(self.plugin_instances.len()); let mut feedback_phase_tasks = JoinSet::new(); @@ -918,9 +931,10 @@ impl ProcessorContext { }); metrics::histogram!( "decision_score", - decision.pignistic().restrict, + "ref" => plugin_instance.plugin_reference(), - ); + ) + .record(decision.pignistic().restrict); metrics::describe_histogram!( "decision_score", "A histogram over the individual plugin decision scores for all requests processed." @@ -929,9 +943,9 @@ impl ProcessorContext { // Measure the conflict between each individual decision and the combined decision metrics::histogram!( "decision_conflict", - Decision::conflict(&[decision, verdict.decision]), "ref" => plugin_instance.plugin_reference(), - ); + ) + .record(Decision::conflict(&[decision, verdict.decision])); metrics::describe_histogram!( "decision_conflict", "A histogram over the individual plugin disagreement values for all requests processed." @@ -966,7 +980,7 @@ impl ProcessorContext { join_all(feedback_phase_tasks, |_| {}).await; // Measure total conflict in the combined decision - metrics::histogram!("combined_conflict", Decision::conflict(&decisions)); + metrics::histogram!("combined_conflict").record(Decision::conflict(&decisions)); metrics::describe_histogram!( "combined_conflict", "A histogram over the combined disagreement values for all requests processed." @@ -1004,11 +1018,12 @@ impl ProcessorContext { .to_vec() .join(","), ); - metrics::increment_counter!( + metrics::counter!( "plugin_request_phase_decision", "outcome" => outcome.to_string(), "observe_only" => self.thresholds.observe_only.to_string(), - ); + ) + .increment(1); let mut restricted = false; let end_of_stream = self.request.body().is_empty(); @@ -1106,11 +1121,12 @@ impl ProcessorContext { .to_vec() .join(","), ); - metrics::increment_counter!( + metrics::counter!( "plugin_response_phase_decision", "outcome" => outcome.to_string(), "observe_only" => self.thresholds.observe_only.to_string(), - ); + ) + .increment(1); let response = self .response diff --git a/crates/host/src/plugin.rs b/crates/host/src/plugin.rs index dd85d4e9..dda5ac6d 100644 --- a/crates/host/src/plugin.rs +++ b/crates/host/src/plugin.rs @@ -436,14 +436,16 @@ impl PluginInstance { .call_handle_init(self.store.as_context_mut()) .await; match result { - Ok(Ok(_)) => metrics::increment_counter!( + Ok(Ok(_)) => metrics::counter!( "plugin_on_init", "ref" => self.plugin_reference(), "result" => "ok" - ), - Ok(Err(_)) | Err(_) => metrics::increment_counter!( + ) + .increment(1), + Ok(Err(_)) | Err(_) => metrics::counter!( "plugin_on_init", "ref" => self.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } // Initialization doesn't return anything unless there's an error @@ -483,14 +485,16 @@ impl PluginInstance { ) .await; match result { - Ok(Ok(_)) => metrics::increment_counter!( + Ok(Ok(_)) => metrics::counter!( "plugin_on_request", "ref" => self.plugin_reference(), "result" => "ok" - ), - Ok(Err(_)) | Err(_) => metrics::increment_counter!( + ) + .increment(1), + Ok(Err(_)) | Err(_) => metrics::counter!( "plugin_on_request", "ref" => self.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } let labels: HashMap = result??.into_iter().collect(); @@ -528,14 +532,16 @@ impl PluginInstance { ) .await; match result { - Ok(Ok(_)) => metrics::increment_counter!( + Ok(Ok(_)) => metrics::counter!( "plugin_on_request_decision", "ref" => self.plugin_reference(), "result" => "ok" - ), - Ok(Err(_)) | Err(_) => metrics::increment_counter!( + ) + .increment(1), + Ok(Err(_)) | Err(_) => metrics::counter!( "plugin_on_request_decision", "ref" => self.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } Ok(result??.into()) @@ -594,14 +600,16 @@ impl PluginInstance { ) .await; match result { - Ok(Ok(_)) => metrics::increment_counter!( + Ok(Ok(_)) => metrics::counter!( "plugin_on_request_body_decision", "ref" => self.plugin_reference(), "result" => "ok" - ), - Ok(Err(_)) | Err(_) => metrics::increment_counter!( + ) + .increment(1), + Ok(Err(_)) | Err(_) => metrics::counter!( "plugin_on_request_body_decision", "ref" => self.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } Ok(result??.into()) @@ -661,14 +669,16 @@ impl PluginInstance { ) .await; match result { - Ok(Ok(_)) => metrics::increment_counter!( + Ok(Ok(_)) => metrics::counter!( "plugin_on_decision_feedback", "ref" => self.plugin_reference(), "result" => "ok" - ), - Ok(Err(_)) | Err(_) => metrics::increment_counter!( + ) + .increment(1), + Ok(Err(_)) | Err(_) => metrics::counter!( "plugin_on_decision_feedback", "ref" => self.plugin_reference(), "result" => "error" - ), + ) + .increment(1), } // Decision feedback doesn't return anything unless there's an error diff --git a/src/errors.rs b/src/errors.rs index 1b8baff7..ef192f88 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -20,8 +20,8 @@ pub enum MetricsError { Prometheus(#[from] metrics_exporter_prometheus::BuildError), #[error("failed to install StatsD metrics exporter: {0}")] Statsd(#[from] metrics_exporter_statsd::StatsdError), - #[error("failed to install metrics exporter: {0}")] - Install(#[from] metrics::SetRecorderError), + #[error("failed to install StatsD metrics exporter: {0}")] + SetStatsd(#[from] metrics::SetRecorderError), } #[derive(thiserror::Error, Debug)] diff --git a/src/main.rs b/src/main.rs index d2770a5e..232acb85 100644 --- a/src/main.rs +++ b/src/main.rs @@ -183,7 +183,7 @@ async fn main() -> Result<(), Box> { .build(prefix) .map_err(MetricsError::from)?; - metrics::set_boxed_recorder(Box::new(recorder)).map_err(MetricsError::from)?; + metrics::set_global_recorder(recorder).map_err(MetricsError::from)?; } else { let thresholds = config_root.thresholds; prometheus_handle = Some(