Skip to content

Commit

Permalink
fix(hermes): handle ws notifier shut down properly
Browse files Browse the repository at this point in the history
  • Loading branch information
ali-bahjati committed Oct 19, 2023
1 parent cefdd84 commit 29dc108
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 40 deletions.
95 changes: 56 additions & 39 deletions hermes/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ use {
atomic::Ordering,
Arc,
},
tokio::{
signal,
sync::mpsc::Receiver,
},
tokio::sync::mpsc::Receiver,
tower_http::cors::CorsLayer,
utoipa::OpenApi,
utoipa_swagger_ui::SwaggerUi,
Expand Down Expand Up @@ -58,16 +55,61 @@ impl ApiState {
}
}

/// This method provides a background service that responds to REST requests
///
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
/// packages they are based on (tokio & hyper).
#[tracing::instrument(skip(opts, state, update_rx))]
pub async fn run(
pub async fn spawn(
opts: RunOptions,
state: Arc<State>,
mut update_rx: Receiver<AggregationEvent>,
) -> Result<()> {
let state = {
let opts = opts.clone();
ApiState::new(
state,
opts.rpc.ws_whitelist,
opts.rpc.requester_ip_header_name,
)
};

let rpc_server = tokio::spawn(run(opts, state.clone()));

let ws_notifier = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));

while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
tokio::select! {
update = update_rx.recv() => {
match update {
None => {
// When the received message is None it means the channel has been closed. This
// should never happen as the channel is never closed. As we can't recover from
// this we shut down the application.
tracing::error!("Failed to receive update from store.");
crate::SHOULD_EXIT.store(true, Ordering::Release);
break;
}
Some(event) => {
notify_updates(state.ws.clone(), event).await;
},
}
},
_ = interval.tick() => {}
}
}

tracing::info!("Shutting down Websocket notifier...")
});


let _ = tokio::join!(ws_notifier, rpc_server);
Ok(())
}

/// This method provides a background service that responds to REST requests
///
/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the
/// packages they are based on (tokio & hyper).
#[tracing::instrument(skip(opts, state))]
pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> {
tracing::info!(endpoint = %opts.rpc.listen_addr, "Starting RPC Server.");

#[derive(OpenApi)]
Expand Down Expand Up @@ -98,12 +140,6 @@ pub async fn run(
)]
struct ApiDoc;

let state = ApiState::new(
state,
opts.rpc.ws_whitelist,
opts.rpc.requester_ip_header_name,
);

// Initialize Axum Router. Note the type here is a `Router<State>` due to the use of the
// `with_state` method which replaces `Body` with `State` in the type signature.
let app = Router::new();
Expand Down Expand Up @@ -131,35 +167,16 @@ pub async fn run(
// default value for this parameter).
.layer(Extension(QsQueryConfig::new(5, false)));

tokio::spawn(async move {
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
match update_rx.recv().await {
None => {
// When the received message is None it means the channel has been closed. This
// should never happen as the channel is never closed. As we can't recover from
// this we shut down the application.
tracing::error!("Failed to receive update from store.");
crate::SHOULD_EXIT.store(true, Ordering::Release);
break;
}
Some(event) => {
notify_updates(state.ws.clone(), event).await;
}
}
}

tracing::info!("Shutting down websocket updates...")
});

// Binds the axum's server to the configured address and port. This is a blocking call and will
// not return until the server is shutdown.
axum::Server::try_bind(&opts.rpc.listen_addr)?
.serve(app.into_make_service())
.with_graceful_shutdown(async {
// Ignore Ctrl+C errors, either way we need to shut down. The main Ctrl+C handler
// should also have triggered so we will let that one print the shutdown warning.
let _ = signal::ctrl_c().await;
crate::SHOULD_EXIT.store(true, Ordering::Release);
while !crate::SHOULD_EXIT.load(Ordering::Acquire) {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}

tracing::info!("Shutting down RPC server...");
})
.await?;

Expand Down
10 changes: 10 additions & 0 deletions hermes/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ pub struct Subscriber {
sender: SplitSink<WebSocket, Message>,
price_feeds_with_config: HashMap<PriceIdentifier, PriceFeedClientConfig>,
ping_interval: tokio::time::Interval,
exit_check_interval: tokio::time::Interval,
responded_to_ping: bool,
}

Expand All @@ -287,6 +288,7 @@ impl Subscriber {
sender,
price_feeds_with_config: HashMap::new(),
ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
exit_check_interval: tokio::time::interval(Duration::from_secs(5)),
responded_to_ping: true, // We start with true so we don't close the connection immediately
}
}
Expand Down Expand Up @@ -330,6 +332,14 @@ impl Subscriber {
self.responded_to_ping = false;
self.sender.send(Message::Ping(vec![])).await?;
Ok(())
},
_ = self.exit_check_interval.tick() => {
if crate::SHOULD_EXIT.load(Ordering::Acquire) {
self.sender.close().await?;
self.closed = true;
return Err(anyhow!("Application is shutting down. Closing connection."));
}
Ok(())
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion hermes/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ async fn init() -> Result<()> {
let tasks = join_all([
Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))),
Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))),
Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))),
Box::pin(spawn(api::spawn(opts.clone(), store.clone(), update_rx))),
])
.await;

Expand Down
Empty file removed hermes/src/metrics_server.rs
Empty file.
3 changes: 3 additions & 0 deletions hermes/src/network/wormhole.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,9 @@ pub async fn spawn(opts: RunOptions, state: Arc<State>) -> Result<()> {

tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}

tracing::info!("Shutting down Wormhole gRPC service...");

Ok(())
}

Expand Down

0 comments on commit 29dc108

Please sign in to comment.