Skip to content

Commit

Permalink
add unit test for multiple requests case
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Jul 23, 2024
1 parent fb73fca commit 4663af9
Show file tree
Hide file tree
Showing 2 changed files with 242 additions and 0 deletions.
93 changes: 93 additions & 0 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ mod tests {
use tempfile::TempDir;
use tokio::net::UnixStream;
use tokio::sync::oneshot;
use tokio::time::sleep;
use tonic::transport::Uri;
use tower::service_fn;

Expand Down Expand Up @@ -504,4 +505,96 @@ mod tests {
assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}

#[tokio::test]
async fn panic_with_multiple_requests() -> Result<(), Box<dyn Error>> {
struct PanicCat;
#[tonic::async_trait]
impl map::Mapper for PanicCat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
if !input.keys.is_empty() && input.keys[0] == "key1" {
sleep(Duration::from_millis(20)).await;
panic!("Cat panicked");
}
// assume each request takes 100ms to process
sleep(Duration::from_millis(100)).await;
vec![]
}
}

let tmp_dir = TempDir::new()?;
let sock_file = tmp_dir.path().join("map.sock");
let server_info_file = tmp_dir.path().join("mapper-server-info");

let mut server = map::Server::new(PanicCat)
.with_server_info_file(&server_info_file)
.with_socket_file(&sock_file)
.with_max_message_size(10240);

assert_eq!(server.max_message_size(), 10240);
assert_eq!(server.server_info_file(), server_info_file);
assert_eq!(server.socket_file(), sock_file);

let (_shutdown_tx, shutdown_rx) = oneshot::channel();
let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });

tokio::time::sleep(Duration::from_millis(50)).await;

let channel = tonic::transport::Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(move |_: Uri| {
let sock_file = sock_file.clone();
async move {
Ok::<_, std::io::Error>(hyper_util::rt::TokioIo::new(
UnixStream::connect(sock_file).await?,
))
}
}))
.await?;

let mut client = MapClient::new(channel);

let mut client_one = client.clone();
tokio::spawn(async move {
let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["key2".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

// panic is only for requests with key "key1", since we have graceful shutdown
// the request should get processed.
let resp = client_one.map_fn(request).await;
assert!(resp.is_ok(), "Expected ok from server");
});

let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["key1".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

// panic happens for the key1 request, so we should expect error on the client side.
let resp = client.map_fn(request).await;
assert!(resp.is_err(), "Expected error from server");

if let Err(e) = resp {
assert_eq!(e.code(), tonic::Code::Internal);
assert!(e.message().contains("User Defined Error"));
}

// but since there is a panic, the server should shutdown.
for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(10)).await;
if task.is_finished() {
break;
}
}

assert!(task.is_finished(), "gRPC server is still running");
Ok(())
}
}
149 changes: 149 additions & 0 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1272,4 +1272,153 @@ mod tests {

Ok(())
}

#[tokio::test]
async fn panic_with_multiple_keys() -> Result<(), Box<dyn Error>> {
struct PanicReducerCreator;

impl reduce::ReducerCreator for PanicReducerCreator {
type R = PanicReducer;
fn create(&self) -> PanicReducer {
PanicReducer {}
}
}

struct PanicReducer;

#[tonic::async_trait]
impl reduce::Reducer for PanicReducer {
async fn reduce(
&self,
keys: Vec<String>,
mut input: mpsc::Receiver<reduce::ReduceRequest>,
_md: &reduce::Metadata,
) -> Vec<reduce::Message> {
let mut count = 0;
while input.recv().await.is_some() {
count += 1;
if count == 10 && keys[0] == "key2" {
panic!("Panic in reduce method");
}
}
vec![]
}
}
let (mut server, sock_file, _) = setup_server(PanicReducerCreator).await?;

let (_shutdown_tx, shutdown_rx) = oneshot::channel();

let task = tokio::spawn(async move { server.start_with_shutdown(shutdown_rx).await });

tokio::time::sleep(Duration::from_millis(50)).await;

let client = setup_client(sock_file.clone()).await?;

let (tx1, rx1) = mpsc::channel(1);

let (tx2, rx2) = mpsc::channel(1);

// Spawn a task to send ReduceRequests to the channel
tokio::spawn(async move {
let rr = reduce::proto::ReduceRequest {
payload: Some(reduce::proto::reduce_request::Payload {
keys: vec!["key1".to_string()],
value: vec![],
watermark: None,
event_time: None,
headers: Default::default(),
}),
operation: Some(reduce::proto::reduce_request::WindowOperation {
event: 0,
windows: vec![reduce::proto::Window {
start: Some(Timestamp {
seconds: 60000,
nanos: 0,
}),
end: Some(Timestamp {
seconds: 120000,
nanos: 0,
}),
slot: "slot-0".to_string(),
}],
}),
};

for _ in 0..20 {
tx1.send(rr.clone()).await.unwrap();
sleep(Duration::from_millis(10)).await;
}
});

tokio::spawn(async move {
let rr = reduce::proto::ReduceRequest {
payload: Some(reduce::proto::reduce_request::Payload {
keys: vec!["key2".to_string()],
value: vec![],
watermark: None,
event_time: None,
headers: Default::default(),
}),
operation: Some(reduce::proto::reduce_request::WindowOperation {
event: 0,
windows: vec![reduce::proto::Window {
start: Some(Timestamp {
seconds: 60000,
nanos: 0,
}),
end: Some(Timestamp {
seconds: 120000,
nanos: 0,
}),
slot: "slot-0".to_string(),
}],
}),
};

for _ in 0..10 {
tx2.send(rr.clone()).await.unwrap();
sleep(Duration::from_millis(10)).await;
}
});

// Convert the receiver end of the channel into a stream
let stream1 = ReceiverStream::new(rx1);

let stream2 = ReceiverStream::new(rx2);

// Create a tonic::Request from the stream
let request1 = Request::new(stream1);

let request2 = Request::new(stream2);

let mut first_client = client.clone();
tokio::spawn(async move {
let mut response_stream = first_client.reduce_fn(request1).await.unwrap().into_inner();
assert!(response_stream.message().await.is_ok());
});

let mut second_client = client.clone();
tokio::spawn(async move {
let mut response_stream = second_client
.reduce_fn(request2)
.await
.unwrap()
.into_inner();

if let Err(e) = response_stream.message().await {
assert_eq!(e.code(), tonic::Code::Internal);
assert!(e.message().contains("User Defined Error"))
}
});

for _ in 0..10 {
tokio::time::sleep(Duration::from_millis(100)).await;
if task.is_finished() {
break;
}
}
assert!(task.is_finished(), "gRPC server is still running");

Ok(())
}
}

0 comments on commit 4663af9

Please sign in to comment.