diff --git a/hermes/src/api.rs b/hermes/src/api.rs index 891a0d7c15..29500c14b6 100644 --- a/hermes/src/api.rs +++ b/hermes/src/api.rs @@ -18,10 +18,7 @@ use { atomic::Ordering, Arc, }, - tokio::{ - signal, - sync::mpsc::Receiver, - }, + tokio::sync::mpsc::Receiver, tower_http::cors::CorsLayer, utoipa::OpenApi, utoipa_swagger_ui::SwaggerUi, @@ -58,16 +55,61 @@ impl ApiState { } } -/// This method provides a background service that responds to REST requests -/// -/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the -/// packages they are based on (tokio & hyper). #[tracing::instrument(skip(opts, state, update_rx))] -pub async fn run( +pub async fn spawn( opts: RunOptions, state: Arc, mut update_rx: Receiver, ) -> Result<()> { + let state = { + let opts = opts.clone(); + ApiState::new( + state, + opts.rpc.ws_whitelist, + opts.rpc.requester_ip_header_name, + ) + }; + + let rpc_server = tokio::spawn(run(opts, state.clone())); + + let ws_notifier = tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(5)); + + while !crate::SHOULD_EXIT.load(Ordering::Acquire) { + tokio::select! { + update = update_rx.recv() => { + match update { + None => { + // When the received message is None it means the channel has been closed. This + // should never happen as the channel is never closed. As we can't recover from + // this we shut down the application. + tracing::error!("Failed to receive update from store."); + crate::SHOULD_EXIT.store(true, Ordering::Release); + break; + } + Some(event) => { + notify_updates(state.ws.clone(), event).await; + }, + } + }, + _ = interval.tick() => {} + } + } + + tracing::info!("Shutting down Websocket notifier...") + }); + + + let _ = tokio::join!(ws_notifier, rpc_server); + Ok(()) +} + +/// This method provides a background service that responds to REST requests +/// +/// Currently this is based on Axum due to the simplicity and strong ecosystem support for the +/// packages they are based on (tokio & hyper). +#[tracing::instrument(skip(opts, state))] +pub async fn run(opts: RunOptions, state: ApiState) -> Result<()> { tracing::info!(endpoint = %opts.rpc.listen_addr, "Starting RPC Server."); #[derive(OpenApi)] @@ -98,12 +140,6 @@ pub async fn run( )] struct ApiDoc; - let state = ApiState::new( - state, - opts.rpc.ws_whitelist, - opts.rpc.requester_ip_header_name, - ); - // Initialize Axum Router. Note the type here is a `Router` due to the use of the // `with_state` method which replaces `Body` with `State` in the type signature. let app = Router::new(); @@ -131,35 +167,16 @@ pub async fn run( // default value for this parameter). .layer(Extension(QsQueryConfig::new(5, false))); - tokio::spawn(async move { - while !crate::SHOULD_EXIT.load(Ordering::Acquire) { - match update_rx.recv().await { - None => { - // When the received message is None it means the channel has been closed. This - // should never happen as the channel is never closed. As we can't recover from - // this we shut down the application. - tracing::error!("Failed to receive update from store."); - crate::SHOULD_EXIT.store(true, Ordering::Release); - break; - } - Some(event) => { - notify_updates(state.ws.clone(), event).await; - } - } - } - - tracing::info!("Shutting down websocket updates...") - }); - // Binds the axum's server to the configured address and port. This is a blocking call and will // not return until the server is shutdown. axum::Server::try_bind(&opts.rpc.listen_addr)? .serve(app.into_make_service()) .with_graceful_shutdown(async { - // Ignore Ctrl+C errors, either way we need to shut down. The main Ctrl+C handler - // should also have triggered so we will let that one print the shutdown warning. - let _ = signal::ctrl_c().await; - crate::SHOULD_EXIT.store(true, Ordering::Release); + while !crate::SHOULD_EXIT.load(Ordering::Acquire) { + tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; + } + + tracing::info!("Shutting down RPC server..."); }) .await?; diff --git a/hermes/src/api/ws.rs b/hermes/src/api/ws.rs index 892abb94da..4a4b6c143e 100644 --- a/hermes/src/api/ws.rs +++ b/hermes/src/api/ws.rs @@ -263,6 +263,7 @@ pub struct Subscriber { sender: SplitSink, price_feeds_with_config: HashMap, ping_interval: tokio::time::Interval, + exit_check_interval: tokio::time::Interval, responded_to_ping: bool, } @@ -287,6 +288,7 @@ impl Subscriber { sender, price_feeds_with_config: HashMap::new(), ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), + exit_check_interval: tokio::time::interval(Duration::from_secs(5)), responded_to_ping: true, // We start with true so we don't close the connection immediately } } @@ -330,6 +332,14 @@ impl Subscriber { self.responded_to_ping = false; self.sender.send(Message::Ping(vec![])).await?; Ok(()) + }, + _ = self.exit_check_interval.tick() => { + if crate::SHOULD_EXIT.load(Ordering::Acquire) { + self.sender.close().await?; + self.closed = true; + return Err(anyhow!("Application is shutting down. Closing connection.")); + } + Ok(()) } } } diff --git a/hermes/src/main.rs b/hermes/src/main.rs index c51bb4202c..bc42dd14f2 100644 --- a/hermes/src/main.rs +++ b/hermes/src/main.rs @@ -61,7 +61,7 @@ async fn init() -> Result<()> { let tasks = join_all([ Box::pin(spawn(network::wormhole::spawn(opts.clone(), store.clone()))), Box::pin(spawn(network::pythnet::spawn(opts.clone(), store.clone()))), - Box::pin(spawn(api::run(opts.clone(), store.clone(), update_rx))), + Box::pin(spawn(api::spawn(opts.clone(), store.clone(), update_rx))), ]) .await; diff --git a/hermes/src/metrics_server.rs b/hermes/src/metrics_server.rs deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/hermes/src/network/wormhole.rs b/hermes/src/network/wormhole.rs index ec725c7948..59e7137216 100644 --- a/hermes/src/network/wormhole.rs +++ b/hermes/src/network/wormhole.rs @@ -160,6 +160,9 @@ pub async fn spawn(opts: RunOptions, state: Arc) -> Result<()> { tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } + + tracing::info!("Shutting down Wormhole gRPC service..."); + Ok(()) }