From 5f71995fd38090249d7bd09ded78b243ccf11733 Mon Sep 17 00:00:00 2001 From: Vigith Maurice Date: Wed, 20 Mar 2024 19:58:51 -0700 Subject: [PATCH] chore: rename pb module name (#33) --- src/map.rs | 31 ++++++++++++++----------------- src/sideinput.rs | 18 ++++++++---------- src/sink.rs | 35 ++++++++++++++++------------------- 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/src/map.rs b/src/map.rs index 0ace85a..32dae55 100644 --- a/src/map.rs +++ b/src/map.rs @@ -3,12 +3,9 @@ use std::path::PathBuf; use tokio::sync::oneshot; use tonic::{async_trait, Request, Response, Status}; -use crate::map::mapper::{ - map_response, map_server, MapRequest as RPCMapRequest, MapResponse, ReadyResponse, -}; use crate::shared; -mod mapper { +mod proto { tonic::include_proto!("map.v1"); } @@ -53,24 +50,24 @@ pub trait Mapper { } #[async_trait] -impl map_server::Map for MapService +impl proto::map_server::Map for MapService where T: Mapper + Send + Sync + 'static, { async fn map_fn( &self, - request: Request, - ) -> Result, Status> { + request: Request, + ) -> Result, Status> { let request = request.into_inner(); let result = self.handler.map(request.into()).await; - Ok(Response::new(MapResponse { + Ok(Response::new(proto::MapResponse { results: result.into_iter().map(|msg| msg.into()).collect(), })) } - async fn is_ready(&self, _: Request<()>) -> Result, Status> { - Ok(Response::new(ReadyResponse { ready: true })) + async fn is_ready(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(proto::ReadyResponse { ready: true })) } } @@ -85,9 +82,9 @@ pub struct Message { pub tags: Vec, } -impl From for map_response::Result { +impl From for proto::map_response::Result { fn from(value: Message) -> Self { - map_response::Result { + proto::map_response::Result { keys: value.keys, value: value.value, tags: value.tags, @@ -107,8 +104,8 @@ pub struct MapRequest { pub eventtime: DateTime, } -impl From for MapRequest { - fn from(value: RPCMapRequest) -> Self { +impl From for MapRequest { + fn from(value: proto::MapRequest) -> Self { Self { keys: value.keys, value: value.value, @@ -187,7 +184,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.map_svc.take().unwrap(); let map_svc = MapService { handler }; - let map_svc = map_server::MapServer::new(map_svc) + let map_svc = proto::map_server::MapServer::new(map_svc) .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); @@ -220,7 +217,7 @@ mod tests { use tower::service_fn; use crate::map; - use crate::map::mapper::map_client::MapClient; + use crate::map::proto::map_client::MapClient; use tempfile::TempDir; use tokio::sync::oneshot; use tonic::transport::Uri; @@ -267,7 +264,7 @@ mod tests { .await?; let mut client = MapClient::new(channel); - let request = tonic::Request::new(map::mapper::MapRequest { + let request = tonic::Request::new(map::proto::MapRequest { keys: vec!["first".into(), "second".into()], value: "hello".into(), watermark: Some(prost_types::Timestamp::default()), diff --git a/src/sideinput.rs b/src/sideinput.rs index 8ad8cca..ce0d3ee 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -1,8 +1,6 @@ use tonic::{async_trait, Request, Response, Status}; -use crate::sideinput::sideinputer::{side_input_server, ReadyResponse, SideInputResponse}; - -mod sideinputer { +mod proto { tonic::include_proto!("sideinput.v1"); } @@ -16,21 +14,21 @@ pub trait SideInputer { } #[async_trait] -impl side_input_server::SideInput for SideInputService +impl proto::side_input_server::SideInput for SideInputService where T: SideInputer + Send + Sync + 'static, { async fn retrieve_side_input( &self, _: Request<()>, - ) -> Result, Status> { + ) -> Result, Status> { let msg = self.handler.retrieve_sideinput().await; let si = match msg { - Some(value) => SideInputResponse { + Some(value) => proto::SideInputResponse { value, no_broadcast: false, }, - None => SideInputResponse { + None => proto::SideInputResponse { value: Vec::new(), no_broadcast: true, }, @@ -39,8 +37,8 @@ where Ok(Response::new(si)) } - async fn is_ready(&self, _: Request<()>) -> Result, Status> { - Ok(Response::new(ReadyResponse { ready: true })) + async fn is_ready(&self, _: Request<()>) -> Result, Status> { + Ok(Response::new(proto::ReadyResponse { ready: true })) } } @@ -58,7 +56,7 @@ where let si_svc = SideInputService { handler: m }; tonic::transport::Server::builder() - .add_service(side_input_server::SideInputServer::new(si_svc)) + .add_service(proto::side_input_server::SideInputServer::new(si_svc)) .serve_with_incoming(listener) .await .map_err(Into::into) diff --git a/src/sink.rs b/src/sink.rs index 57985f0..87d9cb2 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -4,15 +4,9 @@ use chrono::{DateTime, Utc}; use tokio::sync::{mpsc, oneshot}; use tonic::{Request, Status, Streaming}; -use crate::sink::sinker_grpc::{ - sink_response, - sink_server::{Sink, SinkServer}, - ReadyResponse, SinkRequest as RPCSinkRequest, SinkResponse, -}; - use crate::shared; -mod sinker_grpc { +mod proto { tonic::include_proto!("sink.v1"); } @@ -93,8 +87,8 @@ pub struct SinkRequest { pub id: String, } -impl From for SinkRequest { - fn from(sr: RPCSinkRequest) -> Self { +impl From for SinkRequest { + fn from(sr: proto::SinkRequest) -> Self { Self { keys: sr.keys, value: sr.value, @@ -116,7 +110,7 @@ pub struct Response { pub err: String, } -impl From for sink_response::Result { +impl From for proto::sink_response::Result { fn from(r: Response) -> Self { Self { id: r.id, @@ -127,14 +121,14 @@ impl From for sink_response::Result { } #[tonic::async_trait] -impl Sink for SinkService +impl proto::sink_server::Sink for SinkService where T: Sinker + Send + Sync + 'static, { async fn sink_fn( &self, - request: Request>, - ) -> Result, Status> { + request: Request>, + ) -> Result, Status> { let mut stream = request.into_inner(); // TODO: what should be the idle buffer size? @@ -160,13 +154,16 @@ where // wait for the sink handle to respond let responses = sink_handle.await; - Ok(tonic::Response::new(SinkResponse { + Ok(tonic::Response::new(proto::SinkResponse { results: responses.into_iter().map(|r| r.into()).collect(), })) } - async fn is_ready(&self, _: Request<()>) -> Result, Status> { - Ok(tonic::Response::new(ReadyResponse { ready: true })) + async fn is_ready( + &self, + _: Request<()>, + ) -> Result, Status> { + Ok(tonic::Response::new(proto::ReadyResponse { ready: true })) } } @@ -239,7 +236,7 @@ impl Server { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); let svc = SinkService { handler }; - let svc = SinkServer::new(svc) + let svc = proto::sink_server::SinkServer::new(svc) .max_encoding_message_size(self.max_message_size) .max_decoding_message_size(self.max_message_size); @@ -272,7 +269,7 @@ mod tests { use tower::service_fn; use crate::sink; - use crate::sink::sinker_grpc::sink_client::SinkClient; + use crate::sink::proto::sink_client::SinkClient; use tempfile::TempDir; use tokio::sync::oneshot; use tonic::transport::Uri; @@ -345,7 +342,7 @@ mod tests { .await?; let mut client = SinkClient::new(channel); - let request = sink::sinker_grpc::SinkRequest { + let request = sink::proto::SinkRequest { keys: vec!["first".into(), "second".into()], value: "hello".into(), watermark: Some(prost_types::Timestamp::default()),