diff --git a/boltzr/src/api/sse.rs b/boltzr/src/api/sse.rs index e71c4a67..8482942d 100644 --- a/boltzr/src/api/sse.rs +++ b/boltzr/src/api/sse.rs @@ -9,6 +9,15 @@ use std::convert::Infallible; use std::sync::Arc; use tracing::{error, trace}; +struct SseGuard; + +impl Drop for SseGuard { + fn drop(&mut self) { + #[cfg(feature = "metrics")] + metrics::gauge!(crate::metrics::SSE_OPEN_COUNT).decrement(1); + } +} + #[derive(Deserialize, Debug)] pub struct IdParams { pub id: String, @@ -23,6 +32,9 @@ where { trace!("New SSE status stream for swap: {}", params.id); + #[cfg(feature = "metrics")] + metrics::gauge!(crate::metrics::SSE_OPEN_COUNT).increment(1); + let mut rx = state.swap_status_update_tx.subscribe(); state .swap_infos @@ -30,6 +42,8 @@ where .await; Sse::new(try_stream! { + let _guard = SseGuard; + loop { match rx.recv().await { Ok(events) => { diff --git a/boltzr/src/metrics/mod.rs b/boltzr/src/metrics/mod.rs index 3fbce4a6..5fef275a 100644 --- a/boltzr/src/metrics/mod.rs +++ b/boltzr/src/metrics/mod.rs @@ -1,5 +1,6 @@ pub mod server; +pub const SSE_OPEN_COUNT: &str = "sse_open_count"; pub const GRPC_REQUEST_COUNT: &str = "grpc_request_count"; pub const WEBHOOK_CALL_COUNT: &str = "webhook_call_count"; pub const WEBSOCKET_OPEN_COUNT: &str = "websocket_open_count"; diff --git a/boltzr/src/metrics/server.rs b/boltzr/src/metrics/server.rs index 55b37edd..98b92e5d 100644 --- a/boltzr/src/metrics/server.rs +++ b/boltzr/src/metrics/server.rs @@ -139,6 +139,12 @@ impl Server { "number of open WebSockets" ); + describe_gauge!( + crate::metrics::SSE_OPEN_COUNT, + Unit::Count, + "number of open SSE streams", + ); + handle } } diff --git a/boltzr/src/ws/status.rs b/boltzr/src/ws/status.rs index f16b1f25..10e2e885 100644 --- a/boltzr/src/ws/status.rs +++ b/boltzr/src/ws/status.rs @@ -20,6 +20,17 @@ pub trait SwapInfos { async fn fetch_status_info(&self, ids: &[String]); } +struct WsConnectionGuard; + +impl Drop for WsConnectionGuard { + fn drop(&mut self) { + trace!("Closing socket"); + + #[cfg(feature = "metrics")] + metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).decrement(1); + } +} + #[derive(Serialize, Deserialize, PartialEq, Debug)] struct ErrorResponse { error: String, @@ -140,6 +151,8 @@ where #[cfg(feature = "metrics")] metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).increment(1); + let _guard = WsConnectionGuard; + let mut interval = tokio::time::interval(Duration::from_millis(PING_INTERVAL_MS)); let (mut ws_sender, mut ws_receiver) = ws_stream.split(); @@ -245,11 +258,6 @@ where }, } } - - #[cfg(feature = "metrics")] - metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).decrement(1); - - trace!("Closing socket"); } async fn handle_message(