diff --git a/proto/map.proto b/proto/map.proto index 780acb1..807e1bf 100644 --- a/proto/map.proto +++ b/proto/map.proto @@ -28,7 +28,7 @@ message MapRequest { // This ID is used to uniquely identify a map request string id = 2; optional Handshake handshake = 3; - optional Status status = 4; + optional TransmissionStatus status = 4; } /* @@ -42,7 +42,7 @@ message Handshake { /* * Status message to indicate the status of the message. */ -message Status { +message TransmissionStatus { bool eot = 1; } @@ -59,7 +59,7 @@ message MapResponse { // This ID is used to refer the responses to the request it corresponds to. string id = 2; optional Handshake handshake = 3; - optional Status status = 4; + optional TransmissionStatus status = 4; } /** diff --git a/proto/sink.proto b/proto/sink.proto index 300e570..db68e69 100644 --- a/proto/sink.proto +++ b/proto/sink.proto @@ -25,14 +25,11 @@ message SinkRequest { string id = 5; map headers = 6; } - message Status { - bool eot = 1; - } // Required field indicating the request. Request request = 1; // Required field indicating the status of the request. // If eot is set to true, it indicates the end of transmission. - Status status = 2; + TransmissionStatus status = 2; // optional field indicating the handshake message. optional Handshake handshake = 3; } @@ -52,6 +49,13 @@ message ReadyResponse { bool ready = 1; } +/** + * TransmissionStatus is the status of the transmission. + */ +message TransmissionStatus { + bool eot = 1; +} + /* * Status is the status of the response. */ @@ -75,4 +79,5 @@ message SinkResponse { } Result result = 1; optional Handshake handshake = 2; + optional TransmissionStatus status = 3; } \ No newline at end of file diff --git a/src/batchmap.rs b/src/batchmap.rs index ff43e37..f602037 100644 --- a/src/batchmap.rs +++ b/src/batchmap.rs @@ -344,7 +344,7 @@ where results: vec![], id: "".to_string(), handshake: None, - status: Some(proto::Status { eot: true }), + status: Some(proto::TransmissionStatus { eot: true }), })) .await .expect("Sending response to channel"); @@ -677,7 +677,7 @@ mod tests { request: None, id: "3".to_string(), handshake: None, - status: Some(batchmap::proto::Status { eot: true }), + status: Some(batchmap::proto::TransmissionStatus { eot: true }), }; let resp = client @@ -778,7 +778,7 @@ mod tests { request: None, id: "11".to_string(), handshake: None, - status: Some(batchmap::proto::Status { eot: true }), + status: Some(batchmap::proto::TransmissionStatus { eot: true }), }; requests.push(eot_request); diff --git a/src/servers/map.v1.rs b/src/servers/map.v1.rs index d9d4275..c632b01 100644 --- a/src/servers/map.v1.rs +++ b/src/servers/map.v1.rs @@ -11,7 +11,7 @@ pub struct MapRequest { #[prost(message, optional, tag = "3")] pub handshake: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub status: ::core::option::Option, + pub status: ::core::option::Option, } /// Nested message and enum types in `MapRequest`. pub mod map_request { @@ -43,7 +43,7 @@ pub struct Handshake { /// /// Status message to indicate the status of the message. #[derive(Clone, Copy, PartialEq, ::prost::Message)] -pub struct Status { +pub struct TransmissionStatus { #[prost(bool, tag = "1")] pub eot: bool, } @@ -59,7 +59,7 @@ pub struct MapResponse { #[prost(message, optional, tag = "3")] pub handshake: ::core::option::Option, #[prost(message, optional, tag = "4")] - pub status: ::core::option::Option, + pub status: ::core::option::Option, } /// Nested message and enum types in `MapResponse`. pub mod map_response { diff --git a/src/servers/sink.v1.rs b/src/servers/sink.v1.rs index c4a3c29..5fb32cb 100644 --- a/src/servers/sink.v1.rs +++ b/src/servers/sink.v1.rs @@ -9,7 +9,7 @@ pub struct SinkRequest { /// Required field indicating the status of the request. /// If eot is set to true, it indicates the end of transmission. #[prost(message, optional, tag = "2")] - pub status: ::core::option::Option, + pub status: ::core::option::Option, /// optional field indicating the handshake message. #[prost(message, optional, tag = "3")] pub handshake: ::core::option::Option, @@ -34,11 +34,6 @@ pub mod sink_request { ::prost::alloc::string::String, >, } - #[derive(Clone, Copy, PartialEq, ::prost::Message)] - pub struct Status { - #[prost(bool, tag = "1")] - pub eot: bool, - } } /// /// Handshake message between client and server to indicate the start of transmission. @@ -56,6 +51,13 @@ pub struct ReadyResponse { pub ready: bool, } /// * +/// TransmissionStatus is the status of the transmission. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct TransmissionStatus { + #[prost(bool, tag = "1")] + pub eot: bool, +} +/// * /// SinkResponse is the individual response of each message written to the sink. #[derive(Clone, PartialEq, ::prost::Message)] pub struct SinkResponse { @@ -63,6 +65,8 @@ pub struct SinkResponse { pub result: ::core::option::Option, #[prost(message, optional, tag = "2")] pub handshake: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub status: ::core::option::Option, } /// Nested message and enum types in `SinkResponse`. pub mod sink_response { diff --git a/src/sink.rs b/src/sink.rs index 5807279..ef07cfa 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -264,10 +264,20 @@ where .send(Ok(SinkResponse { result: Some(response.into()), handshake: None, + status: None, })) .await .expect("Sending response to channel"); } + // send an EOT message to the client to indicate the end of transmission for this batch + resp_tx + .send(Ok(SinkResponse { + result: None, + handshake: None, + status: Some(sink_pb::TransmissionStatus { eot: true }), + })) + .await + .expect("Sending EOT message to channel"); }); let mut global_stream_ended = false; @@ -377,6 +387,7 @@ where .send(Ok(SinkResponse { result: None, handshake: Some(handshake), + status: None, })) .await .map_err(|e| { @@ -512,17 +523,17 @@ impl Drop for Server { mod tests { use std::{error::Error, time::Duration}; + use crate::servers::sink::TransmissionStatus; + use crate::sink; + use crate::sink::sink_pb::sink_client::SinkClient; + use crate::sink::sink_pb::sink_request::Request; + use crate::sink::sink_pb::Handshake; use tempfile::TempDir; use tokio::net::UnixStream; use tokio::sync::oneshot; use tonic::transport::Uri; use tower::service_fn; - use crate::sink; - use crate::sink::sink_pb::sink_client::SinkClient; - use crate::sink::sink_pb::sink_request::{Request, Status}; - use crate::sink::sink_pb::Handshake; - #[tokio::test] async fn sink_server() -> Result<(), Box> { struct Logger; @@ -600,7 +611,7 @@ mod tests { let eot_request = sink::sink_pb::SinkRequest { request: None, - status: Some(Status { eot: true }), + status: Some(TransmissionStatus { eot: true }), handshake: None, }; @@ -621,8 +632,9 @@ mod tests { .sink_fn(tokio_stream::iter(vec![ handshake_request, request, - eot_request, + eot_request.clone(), request_two, + eot_request, ])) .await?; @@ -638,6 +650,13 @@ mod tests { assert_eq!(msg.err_msg, ""); assert_eq!(msg.id, "1"); + // eot for first request + let resp = resp_stream.message().await.unwrap().unwrap(); + assert!(resp.result.is_none()); + assert!(resp.handshake.is_none()); + let msg = &resp.status.unwrap(); + assert!(msg.eot); + let resp = resp_stream.message().await.unwrap().unwrap(); assert!(resp.result.is_some()); assert!(resp.handshake.is_none()); @@ -645,6 +664,13 @@ mod tests { assert_eq!(msg.err_msg, ""); assert_eq!(msg.id, "2"); + // eot for second request + let resp = resp_stream.message().await.unwrap().unwrap(); + assert!(resp.result.is_none()); + assert!(resp.handshake.is_none()); + let msg = &resp.status.unwrap(); + assert!(msg.eot); + shutdown_tx .send(()) .expect("Sending shutdown signal to gRPC server"); @@ -735,7 +761,7 @@ mod tests { requests.push(sink::sink_pb::SinkRequest { request: None, - status: Some(Status { eot: true }), + status: Some(TransmissionStatus { eot: true }), handshake: None, });