chore: handle panics inside user handlers (#67)
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
yhl25 authored Jul 29, 2024
1 parent db74f62 commit f02acf9
Showing 9 changed files with 746 additions and 172 deletions.
6 changes: 6 additions & 0 deletions src/error.rs
@@ -22,4 +22,10 @@ pub enum Error {

#[error("Source Error - {0}")]
SourceError(ErrorKind),

#[error("Source Transformer Error: {0}")]
SourceTransformerError(ErrorKind),

#[error("SideInput Error: {0}")]
SideInputError(ErrorKind),
}
11 changes: 6 additions & 5 deletions src/lib.rs
@@ -2,7 +2,7 @@
//! features. It will support all the core features eventually. It supports [Map], [Reduce], and
//! [User Defined Sinks].
//!
//! Please note that the Rust SDK is experimental and will be refactor in the future to make it more
//! Please note that the Rust SDK is experimental and will be refactored in the future to make it more
//! idiomatic.
//!
//! [Numaflow]: https://numaflow.numaproj.io/
@@ -35,7 +35,7 @@ pub mod sideinput;
// Error handling on Numaflow SDKs!
//
// Any non-recoverable error will cause the process to shutdown with a non-zero exit status. All errors are non-recoverable.
// If there are errors that are retriable, we (gRPC or Numaflow SDK) would have already retried it (hence not an error), that means,
// If there are errors that are retryable, we (gRPC or Numaflow SDK) would have already retried it (hence not an error), that means,
// all errors raised by the SDK are non-recoverable.
//
// Task Ordering and error propagation.
@@ -59,13 +59,14 @@ pub mod sideinput;
// |
// (user)
//
// If a task at level-3 has an error, then that error will be propagated to level-2 (service_fn) via an mpsc::channel using the response channel.
// If a task at level-3 has an error, then that error will be propagated to level-2 (service_fn) via a mpsc::channel using the response channel.
// The Response channel passes a Result type and by returning Err() in response channel, it notifies top service_fn that the task wants to abort itself.
// service_fn (level-2) will now use another mpsc::channel to tell the gRPC service to cancel all the service_fns. gRPC service will
// will ask all the level-2 service_fns to abort using the CancellationToken. service_fn will call abort on all the tasks it created using internal
// ask all the level-2 service_fns to abort using the CancellationToken. service_fn will call abort on all the tasks it created using internal
// mpsc::channel when CancellationToken has been dropped/cancelled.
//
// User can directly send shutdown request to the gRPC server which inturn cancels the CancellationToken.
// User can directly send a shutdown request to the gRPC server, which triggers a shutdown of the server by stopping the acceptance of
// new requests and draining the existing ones. Lastly, we cancel the cancellation token to make sure all the tasks are aborted.
//
// The above 3 level task ordering is only for complex cases like reduce, but for simpler endpoints like `map`, it only has 2 levels but
// the error propagation is handled the same way.
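To make the three-level ordering described above concrete, here is a minimal, hypothetical sketch of the propagation pattern, assuming only tokio and tokio-util (the channel shapes and names such as `resp_tx` are illustrative, not the SDK's actual internals): a level-3 task reports failure on its response channel, the level-2 service task forwards an internal shutdown signal, and level-1 cancels the CancellationToken so the remaining tasks abort.

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // level-3 -> level-2: response channel carrying a Result
    let (resp_tx, mut resp_rx) = mpsc::channel::<Result<String, String>>(1);
    // level-2 -> level-1: internal shutdown channel
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
    let token = CancellationToken::new();

    // level-3: user-driven task; any error is reported on the response channel.
    tokio::spawn(async move {
        let _ = resp_tx.send(Err("user handler failed".to_string())).await;
    });

    // level-2: service-level task; on Err it asks level-1 to shut everything down,
    // and it also watches the CancellationToken so it can abort its own work.
    let svc_token = token.clone();
    tokio::spawn(async move {
        tokio::select! {
            Some(Err(e)) = resp_rx.recv() => {
                eprintln!("task reported error: {e}; requesting shutdown");
                let _ = shutdown_tx.send(()).await;
            }
            _ = svc_token.cancelled() => {
                // level-1 asked us to abort
            }
        }
    });

    // level-1: the server layer; an internal shutdown signal cancels the token,
    // which in turn tells all level-2 tasks to abort.
    if shutdown_rx.recv().await.is_some() {
        token.cancel();
    }
}
```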
230 changes: 210 additions & 20 deletions src/map.rs
@@ -1,15 +1,16 @@
use crate::error::Error::MapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::shared;
use crate::shared::shutdown_signal;
use chrono::{DateTime, Utc};
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;

use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use tonic::{async_trait, Request, Response, Status};

use crate::shared;
use crate::shared::shutdown_signal;

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
@@ -21,10 +22,9 @@ pub mod proto {
}

struct MapService<T> {
handler: T,
// not used ATM
// PLEASE WRITE WHY
_shutdown_tx: mpsc::Sender<()>,
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
cancellation_token: CancellationToken,
}

/// Mapper trait for implementing Map handler.
@@ -71,11 +71,36 @@ where
request: Request<proto::MapRequest>,
) -> Result<Response<proto::MapResponse>, Status> {
let request = request.into_inner();
let result = self.handler.map(request.into()).await;
let handler = Arc::clone(&self.handler);
let handle = tokio::spawn(async move { handler.map(request.into()).await });
let shutdown_tx = self.shutdown_tx.clone();
let cancellation_token = self.cancellation_token.clone();

// Wait for the handler to finish processing the request. If the server is shutting down (token will be cancelled),
// then return an error.
tokio::select! {
result = handle => {
match result {
Ok(result) => Ok(Response::new(proto::MapResponse {
results: result.into_iter().map(|msg| msg.into()).collect(),
})),
Err(e) => {
tracing::error!("Error in map handler: {:?}", e);
// Send a shutdown signal to the server to do a graceful shutdown because there was
// a panic in the handler.
shutdown_tx
.send(())
.await
.expect("Sending shutdown signal to gRPC server");
Err(Status::internal(MapError(UserDefinedError(e.to_string())).to_string()))
}
}
},

Ok(Response::new(proto::MapResponse {
results: result.into_iter().map(|msg| msg.into()).collect(),
}))
_ = cancellation_token.cancelled() => {
Err(Status::internal(MapError(InternalError("Server is shutting down".to_string())).to_string()))
},
}
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
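The hunk above works because a panic inside a task created with `tokio::spawn` does not unwind into the caller: the runtime catches it and surfaces it as a `JoinError` on the `JoinHandle`. A minimal standalone sketch of just that mechanism (not the SDK's code, assuming only tokio):

```rust
#[tokio::main]
async fn main() {
    // Run the "user handler" on its own task so a panic stays isolated there.
    let handle = tokio::spawn(async {
        panic!("user handler panicked");
    });

    // The panic comes back as a JoinError instead of crashing the caller, so the
    // caller can report an internal error and trigger a graceful shutdown.
    match handle.await {
        Ok(()) => println!("handler finished"),
        Err(e) if e.is_panic() => eprintln!("handler panicked: {e}"),
        Err(e) => eprintln!("handler was cancelled: {e}"),
    }
}
```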
@@ -285,23 +310,24 @@ impl<T> Server<T> {
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
let cln_token = CancellationToken::new();

// Create a channel to send shutdown signal to the server to do graceful shutdown in case of non retryable errors.
let (internal_shutdown_tx, internal_shutdown_rx) = mpsc::channel(1);
let map_svc = MapService {
handler,
_shutdown_tx: internal_shutdown_tx,
handler: Arc::new(handler),
shutdown_tx: internal_shutdown_tx,
cancellation_token: cln_token.clone(),
};

let map_svc = proto::map_server::MapServer::new(map_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

let shutdown = shutdown_signal(
internal_shutdown_rx,
Some(shutdown_rx),
CancellationToken::new(),
);
let shutdown = shutdown_signal(internal_shutdown_rx, Some(shutdown_rx));

// will call cancel_token.cancel() on drop of _drop_guard
let _drop_guard = cln_token.drop_guard();

tonic::transport::Server::builder()
.add_service(map_svc)
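The `_drop_guard` above relies on `CancellationToken::drop_guard()`, which consumes the token and returns a guard that cancels it when dropped, for example when the server future returns. A minimal sketch of that pattern, assuming only tokio and tokio-util (the worker task is illustrative):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.clone();

    // Any task waiting on `cancelled()` gets to abort once the token is cancelled.
    let worker = tokio::spawn(async move {
        child.cancelled().await;
        println!("worker aborted because the token was cancelled");
    });

    {
        // The guard cancels the token automatically when it is dropped at the end
        // of this scope, so no explicit cancel() call is needed.
        let _guard = token.drop_guard();
    }

    worker.await.unwrap();
}
```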
@@ -338,6 +364,7 @@ mod tests {
use tempfile::TempDir;
use tokio::net::UnixStream;
use tokio::sync::oneshot;
use tokio::time::sleep;
use tonic::transport::Uri;
use tower::service_fn;

@@ -409,4 +436,167 @@
assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}

#[tokio::test]
async fn map_server_panic() -> Result<(), Box<dyn Error>> {
struct PanicCat;
#[tonic::async_trait]
impl map::Mapper for PanicCat {
async fn map(&self, _input: map::MapRequest) -> Vec<map::Message> {
panic!("PanicCat panicking");
}
}

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("mapper-server-info");

let mut server = map::Server::new(PanicCat)
.with_server_info_file(&server_info_file)
.with_socket_file(&sock_file)
.with_max_message_size(10240);

assert_eq!(server.max_message_size(), 10240);
assert_eq!(server.server_info_file(), server_info_file);
assert_eq!(server.socket_file(), sock_file);

let (_shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });

tokio::time::sleep(Duration::from_millis(50)).await;

let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_: Uri| {
let sock_file = sock_file.clone();
async move {
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
UnixStream::connect(sock_file).await?,
))
}
}))
.await?;

let mut client = MapClient::new(channel);
let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["first".into(), "second".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

// server should return an error because of the panic.
let resp = client.map_fn(request).await;
assert!(resp.is_err(), "Expected error from server");

if let Err(e) = resp {
assert_eq!(e.code(), tonic::Code::Internal);
assert!(e.message().contains("User Defined Error"));
}

// server should shut down gracefully because there was a panic in the handler.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
if task.is_finished() {
break;
}
}
assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}

// tests for panic when we have multiple inflight requests, only one of the requests
// causes panic, the other requests should be processed successfully and the server
// should shut down gracefully.
#[tokio::test]
async fn panic_with_multiple_requests() -> Result<(), Box<dyn Error>> {
struct PanicCat;
#[tonic::async_trait]
impl map::Mapper for PanicCat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
if !input.keys.is_empty() && input.keys[0] == "key1" {
sleep(Duration::from_millis(20)).await;
panic!("Cat panicked");
}
// assume each request takes 100ms to process
sleep(Duration::from_millis(100)).await;
vec![]
}
}

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("mapper-server-info");

let mut server = map::Server::new(PanicCat)
.with_server_info_file(&server_info_file)
.with_socket_file(&sock_file)
.with_max_message_size(10240);

assert_eq!(server.max_message_size(), 10240);
assert_eq!(server.server_info_file(), server_info_file);
assert_eq!(server.socket_file(), sock_file);

let (_shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });

tokio::time::sleep(Duration::from_millis(50)).await;

let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_: Uri| {
let sock_file = sock_file.clone();
async move {
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
UnixStream::connect(sock_file).await?,
))
}
}))
.await?;

let mut client = MapClient::new(channel);

let mut client_one = client.clone();
tokio::spawn(async move {
let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["key2".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

// panic is only for requests with key "key1", since we have graceful shutdown
// the request should get processed.
let resp = client_one.map_fn(request).await;
assert!(resp.is_ok(), "Expected ok from server");
});

let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["key1".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

// panic happens for the key1 request, so we should expect error on the client side.
let resp = client.map_fn(request).await;
assert!(resp.is_err(), "Expected error from server");

if let Err(e) = resp {
assert_eq!(e.code(), tonic::Code::Internal);
assert!(e.message().contains("User Defined Error"));
}

// but since there is a panic, the server should shutdown.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
if task.is_finished() {
break;
}
}

assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}
}