Skip to content

Commit

Permalink
feat: add open SSE streams metric
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Aug 24, 2024
1 parent cf3071e commit 7358989
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
14 changes: 14 additions & 0 deletions boltzr/src/api/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ use std::convert::Infallible;
use std::sync::Arc;
use tracing::{error, trace};

struct SseGuard;

impl Drop for SseGuard {
fn drop(&mut self) {
#[cfg(feature = "metrics")]
metrics::gauge!(crate::metrics::SSE_OPEN_COUNT).decrement(1);
}
}

#[derive(Deserialize, Debug)]
pub struct IdParams {
pub id: String,
Expand All @@ -23,13 +32,18 @@ where
{
trace!("New SSE status stream for swap: {}", params.id);

#[cfg(feature = "metrics")]
metrics::gauge!(crate::metrics::SSE_OPEN_COUNT).increment(1);

let mut rx = state.swap_status_update_tx.subscribe();
state
.swap_infos
.fetch_status_info(&[params.id.clone()])
.await;

Sse::new(try_stream! {
let _guard = SseGuard;

loop {
match rx.recv().await {
Ok(events) => {
Expand Down
1 change: 1 addition & 0 deletions boltzr/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod server;

pub const SSE_OPEN_COUNT: &str = "sse_open_count";
pub const GRPC_REQUEST_COUNT: &str = "grpc_request_count";
pub const WEBHOOK_CALL_COUNT: &str = "webhook_call_count";
pub const WEBSOCKET_OPEN_COUNT: &str = "websocket_open_count";
6 changes: 6 additions & 0 deletions boltzr/src/metrics/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,12 @@ impl Server {
"number of open WebSockets"
);

describe_gauge!(
crate::metrics::SSE_OPEN_COUNT,
Unit::Count,
"number of open SSE streams",
);

handle
}
}
Expand Down
18 changes: 13 additions & 5 deletions boltzr/src/ws/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ pub trait SwapInfos {
async fn fetch_status_info(&self, ids: &[String]);
}

struct WsConnectionGuard;

impl Drop for WsConnectionGuard {
fn drop(&mut self) {
trace!("Closing socket");

#[cfg(feature = "metrics")]
metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).decrement(1);
}
}

#[derive(Serialize, Deserialize, PartialEq, Debug)]
struct ErrorResponse {
error: String,
Expand Down Expand Up @@ -140,6 +151,8 @@ where
#[cfg(feature = "metrics")]
metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).increment(1);

let _guard = WsConnectionGuard;

let mut interval = tokio::time::interval(Duration::from_millis(PING_INTERVAL_MS));
let (mut ws_sender, mut ws_receiver) = ws_stream.split();

Expand Down Expand Up @@ -245,11 +258,6 @@ where
},
}
}

#[cfg(feature = "metrics")]
metrics::gauge!(crate::metrics::WEBSOCKET_OPEN_COUNT).decrement(1);

trace!("Closing socket");
}

async fn handle_message(
Expand Down

0 comments on commit 7358989

Please sign in to comment.