Skip to content

Commit

Permalink
Merge pull request #401 from bulwark-security/fix-prometheus-metrics
Browse files Browse the repository at this point in the history
Fix prometheus metrics
  • Loading branch information
sporkmonger authored Aug 10, 2024
2 parents 6d8b9e8 + 0df63d9 commit d2688aa
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 101 deletions.
46 changes: 12 additions & 34 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ clap = { version = "4.4.3", features = ["derive"] }
clap_complete = "4.5.2"
color-eyre = "0.6.2"
hyper = { version = "1.2.0", features = ["server"] }
metrics-exporter-prometheus = "0.15.0"
metrics-exporter-statsd = "0.6.0"
metrics-exporter-prometheus = "0.15.3"
metrics-exporter-statsd = "0.8.0"
quoted-string = "0.6.1"
tower = { version = "0.4.13", features = ["tokio", "tracing"] }
tower-http = { version = "0.5.0", features = [
Expand Down Expand Up @@ -119,7 +119,7 @@ futures = "0.3"
hex = "0.4.3"
http = "1.0"
matchit = "0.8.2"
metrics = "0.21.1"
metrics = "0.23.0"
owo-colors = "3.5.0"
redis = { version = "0.25", features = [
"tokio-comp",
Expand Down
120 changes: 79 additions & 41 deletions crates/ext-processor/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,23 +232,35 @@ impl BulwarkProcessor {
/// * `config` - The root of the Bulwark configuration structure to be used to initialize the service.
pub async fn new(config: Config) -> Result<Self, PluginLoadError> {
// Get all outcomes registered even if those outcomes don't happen immediately.
metrics::register_counter!(
metrics::counter!(
"combined_decision",
"outcome" => "trusted",
);
metrics::register_counter!(
)
.absolute(0);
metrics::counter!(
"combined_decision",
"outcome" => "accepted",
);
metrics::register_counter!(
)
.absolute(0);
metrics::counter!(
"combined_decision",
"outcome" => "suspected",
);
metrics::register_counter!(
)
.absolute(0);
metrics::counter!(
"combined_decision",
"outcome" => "restricted",
)
.absolute(0);
metrics::describe_counter!(
"combined_decision",
"Counters for each combined decision outcome."
);
metrics::histogram!("combined_decision_score").record(0.0);
metrics::describe_histogram!(
"combined_decision_score",
"A histogram over the combined decision scores for all requests processed."
);
metrics::register_histogram!("combined_decision_score");

let redis_pool: Option<Arc<deadpool_redis::Pool>> =
if let Some(redis_addr) = config.state.redis_uri.as_ref() {
Expand Down Expand Up @@ -344,16 +356,18 @@ impl BulwarkProcessor {
let mut plugin_instance = plugin_instance.lock().await;
let result = plugin_instance.handle_init().await;
match result {
Ok(_) => metrics::increment_counter!(
Ok(_) => metrics::counter!(
"plugin_wasm_on_init",
"ref" => plugin_instance.plugin_reference(),
"result" => "ok"
),
Err(_) => metrics::increment_counter!(
)
.increment(1),
Err(_) => metrics::counter!(
"plugin_wasm_on_init",
"ref" => plugin_instance.plugin_reference(),
"result" => "error"
),
)
.increment(1),
}
result
}
Expand All @@ -368,16 +382,18 @@ impl BulwarkProcessor {
.handle_request_enrichment(request, labels)
.await;
match result {
Ok(_) => metrics::increment_counter!(
Ok(_) => metrics::counter!(
"plugin_wasm_on_request",
"ref" => plugin_instance.plugin_reference(),
"result" => "ok"
),
Err(_) => metrics::increment_counter!(
)
.increment(1),
Err(_) => metrics::counter!(
"plugin_wasm_on_request",
"ref" => plugin_instance.plugin_reference(),
"result" => "error"
),
)
.increment(1),
}
result
}
Expand All @@ -392,16 +408,18 @@ impl BulwarkProcessor {
.handle_request_decision(request, labels)
.await;
match result {
Ok(_) => metrics::increment_counter!(
Ok(_) => metrics::counter!(
"plugin_wasm_on_request_decision",
"ref" => plugin_instance.plugin_reference(),
"result" => "ok"
),
Err(_) => metrics::increment_counter!(
)
.increment(1),
Err(_) => metrics::counter!(
"plugin_wasm_on_request_decision",
"ref" => plugin_instance.plugin_reference(),
"result" => "error"
),
)
.increment(1),
}
result
}
Expand All @@ -417,16 +435,18 @@ impl BulwarkProcessor {
.handle_response_decision(request, response, labels)
.await;
match result {
Ok(_) => metrics::increment_counter!(
Ok(_) => metrics::counter!(
"plugin_wasm_on_response_decision",
"ref" => plugin_instance.plugin_reference(),
"result" => "ok"
),
Err(_) => metrics::increment_counter!(
)
.increment(1),
Err(_) => metrics::counter!(
"plugin_wasm_on_response_decision",
"ref" => plugin_instance.plugin_reference(),
"result" => "error"
),
)
.increment(1),
}
result
}
Expand All @@ -443,16 +463,18 @@ impl BulwarkProcessor {
.handle_decision_feedback(request, response, labels, verdict)
.await;
match result {
Ok(_) => metrics::increment_counter!(
Ok(_) => metrics::counter!(
"plugin_wasm_on_decision_feedback",
"ref" => plugin_instance.plugin_reference(),
"result" => "ok"
),
Err(_) => metrics::increment_counter!(
)
.increment(1),
Err(_) => metrics::counter!(
"plugin_wasm_on_decision_feedback",
"ref" => plugin_instance.plugin_reference(),
"result" => "error"
),
)
.increment(1),
}
result
}
Expand Down Expand Up @@ -873,14 +895,13 @@ impl ProcessorContext {
.verdict
.as_ref()
.expect("cannot execute feedback phase without verdict");
metrics::increment_counter!(
metrics::counter!(
"combined_decision",
"outcome" => verdict.outcome.to_string(),
);
metrics::histogram!(
"combined_decision_score",
verdict.decision.pignistic().restrict
);
)
.increment(1);
metrics::histogram!("combined_decision_score",)
.record(verdict.decision.pignistic().restrict);

let mut decisions: Vec<Decision> = Vec::with_capacity(self.plugin_instances.len());
let mut feedback_phase_tasks = JoinSet::new();
Expand Down Expand Up @@ -910,15 +931,26 @@ impl ProcessorContext {
});
metrics::histogram!(
"decision_score",
decision.pignistic().restrict,

"ref" => plugin_instance.plugin_reference(),
)
.record(decision.pignistic().restrict);
metrics::describe_histogram!(
"decision_score",
"A histogram over the individual plugin decision scores for all requests processed."
);

// Measure the conflict between each individual decision and the combined decision
metrics::histogram!(
"decision_conflict",
Decision::conflict(&[decision, verdict.decision]),
"ref" => plugin_instance.plugin_reference(),
)
.record(Decision::conflict(&[decision, verdict.decision]));
metrics::describe_histogram!(
"decision_conflict",
"A histogram over the individual plugin disagreement values for all requests processed."
);

decisions.push(decision);
}
let request = self.request.clone();
Expand Down Expand Up @@ -948,7 +980,11 @@ impl ProcessorContext {
join_all(feedback_phase_tasks, |_| {}).await;

// Measure total conflict in the combined decision
metrics::histogram!("combined_conflict", Decision::conflict(&decisions));
metrics::histogram!("combined_conflict").record(Decision::conflict(&decisions));
metrics::describe_histogram!(
"combined_conflict",
"A histogram over the combined disagreement values for all requests processed."
);

// Capturing stdio is always the last thing that happens and feedback should always be the second-to-last.
self.capture_stdio().await;
Expand Down Expand Up @@ -982,11 +1018,12 @@ impl ProcessorContext {
.to_vec()
.join(","),
);
metrics::increment_counter!(
metrics::counter!(
"plugin_request_phase_decision",
"outcome" => outcome.to_string(),
"observe_only" => self.thresholds.observe_only.to_string(),
);
)
.increment(1);

let mut restricted = false;
let end_of_stream = self.request.body().is_empty();
Expand Down Expand Up @@ -1084,11 +1121,12 @@ impl ProcessorContext {
.to_vec()
.join(","),
);
metrics::increment_counter!(
metrics::counter!(
"plugin_response_phase_decision",
"outcome" => outcome.to_string(),
"observe_only" => self.thresholds.observe_only.to_string(),
);
)
.increment(1);

let response = self
.response
Expand Down
Loading

0 comments on commit d2688aa

Please sign in to comment.