From fb73fcaf8db5012561dbbea5eef66af874f66efe Mon Sep 17 00:00:00 2001 From: Yashash H L Date: Tue, 23 Jul 2024 09:07:08 +0530 Subject: [PATCH] review comments, panic handling for side input Signed-off-by: Yashash H L --- src/error.rs | 3 ++ src/map.rs | 6 +++- src/reduce.rs | 7 +++-- src/shared.rs | 17 ++-------- src/sideinput.rs | 70 +++++++++++++++++++++++++++--------------- src/sink.rs | 44 ++++++++++++++------------ src/source.rs | 16 +++++----- src/sourcetransform.rs | 6 +++- 8 files changed, 100 insertions(+), 69 deletions(-) diff --git a/src/error.rs b/src/error.rs index 55eeb9c..d517799 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,4 +25,7 @@ pub enum Error { #[error("Source Transformer Error: {0}")] SourceTransformerError(ErrorKind), + + #[error("SideInput Error: {0}")] + SideInputError(ErrorKind), } diff --git a/src/map.rs b/src/map.rs index 72664a9..3dfec6c 100644 --- a/src/map.rs +++ b/src/map.rs @@ -324,7 +324,11 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token); + let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(map_svc) diff --git a/src/reduce.rs b/src/reduce.rs index dbc64e3..270940b 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -832,8 +832,11 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = - shared::shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx), cln_token); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(reduce_svc) diff --git a/src/shared.rs b/src/shared.rs index 9d4a88a..6a4123c 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -8,14 +8,13 @@ use tokio::net::UnixListener; use tokio::signal; use tokio::sync::{mpsc, oneshot}; use tokio_stream::wrappers::UnixListenerStream; -use tokio_util::sync::CancellationToken; use tracing::info; // #[tracing::instrument(skip(path), fields(path = ?path.as_ref()))] #[tracing::instrument(fields(path = ? path.as_ref()))] fn write_info_file(path: impl AsRef) -> io::Result<()> { let parent = path.as_ref().parent().unwrap(); - std::fs::create_dir_all(parent)?; + fs::create_dir_all(parent)?; // TODO: make port-number and CPU meta-data configurable, e.g., ("CPU_LIMIT", "1") let metadata: HashMap = HashMap::new(); @@ -57,18 +56,13 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime) -> Option { /// shuts downs the gRPC server. This happens in 2 cases /// 1. there has been an internal error (one of the tasks failed) and we need to shutdown -/// 2. user is explictly asking us to shutdown +/// 2. user is explicitly asking us to shutdown /// Once the request for shutdown has be invoked, server will broadcast shutdown to all tasks /// through the cancellation-token. pub(crate) async fn shutdown_signal( mut shutdown_on_err: mpsc::Receiver<()>, shutdown_from_user: Option>, - cancel_token: CancellationToken, ) { - // will call cancel_token.cancel() when the function exits - // because of abort request, ctrl-c, or SIGTERM signal - let _drop_guard = cancel_token.drop_guard(); - let ctrl_c = async { signal::ctrl_c() .await @@ -180,12 +174,7 @@ mod tests { // Spawn a new task to call shutdown_signal let shutdown_signal_task = tokio::spawn(async move { - shutdown_signal( - internal_shutdown_rx, - Some(user_shutdown_rx), - CancellationToken::new(), - ) - .await; + shutdown_signal(internal_shutdown_rx, Some(user_shutdown_rx)).await; }); // Send a shutdown signal diff --git a/src/sideinput.rs b/src/sideinput.rs index b5d3bb6..17cca0e 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -1,13 +1,14 @@ +use crate::error::Error::SideInputError; +use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::shared; +use crate::shared::shutdown_signal; use std::fs; use std::path::PathBuf; - +use std::sync::Arc; use tokio::sync::{mpsc, oneshot}; use tokio_util::sync::CancellationToken; use tonic::{async_trait, Request, Response, Status}; -use crate::shared; -use crate::shared::shutdown_signal; - const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sideinput.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sideinput-server-info"; @@ -17,8 +18,9 @@ mod proto { } struct SideInputService { - handler: T, - _shutdown_tx: mpsc::Sender<()>, + handler: Arc, + shutdown_tx: mpsc::Sender<()>, + cancellation_token: CancellationToken, } /// The `SideInputer` trait defines a method for retrieving side input data. @@ -94,19 +96,35 @@ where &self, _: Request<()>, ) -> Result, Status> { - let msg = self.handler.retrieve_sideinput().await; - let si = match msg { - Some(value) => proto::SideInputResponse { - value, - no_broadcast: false, + let handler = Arc::clone(&self.handler); + let shutdown_tx = self.shutdown_tx.clone(); + let handle = tokio::spawn(async move { handler.retrieve_sideinput().await }); + + tokio::select! { + msg = handle => { + match msg { + Ok(Some(value)) => { + Ok(Response::new(proto::SideInputResponse { + value, + no_broadcast: false, + })) + } + Ok(None) => { + Ok(Response::new(proto::SideInputResponse { + value: Vec::new(), + no_broadcast: true, + })) + } + Err(e) => { + shutdown_tx.send(()).await.expect("Failed to send shutdown signal"); + Err(Status::internal(SideInputError(UserDefinedError(e.to_string())).to_string())) + } + } + } + _ = self.cancellation_token.cancelled() => { + Err(Status::internal(SideInputError(InternalError("Server is shutting down".to_string())).to_string())) }, - None => proto::SideInputResponse { - value: Vec::new(), - no_broadcast: true, - }, - }; - - Ok(Response::new(si)) + } } async fn is_ready(&self, _: Request<()>) -> Result, Status> { @@ -179,20 +197,22 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); + let cln_token = CancellationToken::new(); let sideinput_svc = SideInputService { - handler, - _shutdown_tx: internal_shutdown_tx, + handler: Arc::new(handler), + shutdown_tx: internal_shutdown_tx, + cancellation_token: cln_token.clone(), }; let sideinput_svc = proto::side_input_server::SideInputServer::new(sideinput_svc) .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shutdown_signal( - internal_shutdown_rx, - Some(shutdown_rx), - CancellationToken::new(), - ); + let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(sideinput_svc) diff --git a/src/sink.rs b/src/sink.rs index 5df361d..fc833e1 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -188,28 +188,28 @@ where // TODO: what should be the idle buffer size? let (tx, rx) = mpsc::channel::(1); - let writer_cln_token = cancellation_token.clone(); - + let reader_shutdown_tx = shutdown_tx.clone(); // spawn a task to read messages from the stream and send them to the user's sink handle - tokio::spawn(async move { + let reader_handle = tokio::spawn(async move { loop { - tokio::select! { - next_message = stream.message() => { - match next_message { - Ok(Some(message)) => { - // If sending fails, it means the receiver is dropped, and we should stop the task. - if tx.send(message.into()).await.is_err() { - break; - } - }, - // If there's an error or the stream ends, break the loop to stop the task. - Ok(None) | Err(_) => break, + match stream.message().await { + Ok(Some(message)) => { + // If sending fails, it means the receiver is dropped, and we should stop the task. + if let Err(e) = tx.send(message.into()).await { + tracing::error!("Failed to send message: {}", e); + break; } - }, - // Listen for cancellation. If triggered, break the loop to stop reading new messages. - _ = writer_cln_token.cancelled() => { + } + // If there's an error or the stream ends, break the loop to stop the task. + Ok(None) => break, + Err(e) => { + tracing::error!("Error reading message from stream: {}", e); + reader_shutdown_tx + .send(()) + .await + .expect("Sending shutdown signal to gRPC server"); break; - }, + } } } }); @@ -237,6 +237,8 @@ where }, _ = cancellation_token.cancelled() => { + // abort the reader task to stop reading messages from the stream + reader_handle.abort(); Err(Status::cancelled(SinkError(InternalError("Server is shutting down".to_string())).to_string())) } } @@ -336,7 +338,11 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(svc) diff --git a/src/source.rs b/src/source.rs index 2d8b29e..5d7713a 100644 --- a/src/source.rs +++ b/src/source.rs @@ -4,6 +4,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use crate::shared::{self, prost_timestamp_from_utc}; use chrono::{DateTime, Utc}; use tokio::sync::mpsc::{self, Sender}; use tokio::sync::oneshot; @@ -11,8 +12,6 @@ use tokio_stream::wrappers::ReceiverStream; use tokio_util::sync::CancellationToken; use tonic::{async_trait, Request, Response, Status}; -use crate::shared::{self, prost_timestamp_from_utc}; - const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/source.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info"; @@ -25,6 +24,7 @@ pub mod proto { struct SourceService { handler: Arc, _shutdown_tx: Sender<()>, + _cancellation_token: CancellationToken, } #[async_trait] @@ -267,21 +267,23 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1); + let cln_token = CancellationToken::new(); let source_service = SourceService { handler: Arc::new(handler), _shutdown_tx: internal_shutdown_tx, + _cancellation_token: cln_token.clone(), }; let source_svc = proto::source_server::SourceServer::new(source_service) .max_decoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shared::shutdown_signal( - internal_shutdown_rx, - Some(shutdown_rx), - CancellationToken::new(), - ); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(source_svc) diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 1deccbd..068113f 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -349,7 +349,11 @@ impl Server { .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); - let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx), cln_token); + let shutdown = shared::shutdown_signal(internal_shutdown_rx, Some(shutdown_rx)); + + // will call cancel_token.cancel() when the function exits + // because of abort request, ctrl-c, or SIGTERM signal + let _drop_guard = cln_token.drop_guard(); tonic::transport::Server::builder() .add_service(sourcetrf_svc)