Skip to content

Commit

Permalink
chore: rename pb module name (#33)
Browse files Browse the repository at this point in the history
  • Loading branch information
vigith authored Mar 21, 2024
1 parent 19f975a commit 5f71995
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 46 deletions.
31 changes: 14 additions & 17 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,9 @@ use std::path::PathBuf;
use tokio::sync::oneshot;
use tonic::{async_trait, Request, Response, Status};

use crate::map::mapper::{
map_response, map_server, MapRequest as RPCMapRequest, MapResponse, ReadyResponse,
};
use crate::shared;

mod mapper {
mod proto {
tonic::include_proto!("map.v1");
}

Expand Down Expand Up @@ -53,24 +50,24 @@ pub trait Mapper {
}

#[async_trait]
impl<T> map_server::Map for MapService<T>
impl<T> proto::map_server::Map for MapService<T>
where
T: Mapper + Send + Sync + 'static,
{
async fn map_fn(
&self,
request: Request<RPCMapRequest>,
) -> Result<Response<MapResponse>, Status> {
request: Request<proto::MapRequest>,
) -> Result<Response<proto::MapResponse>, Status> {
let request = request.into_inner();
let result = self.handler.map(request.into()).await;

Ok(Response::new(MapResponse {
Ok(Response::new(proto::MapResponse {
results: result.into_iter().map(|msg| msg.into()).collect(),
}))
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
Ok(Response::new(ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand All @@ -85,9 +82,9 @@ pub struct Message {
pub tags: Vec<String>,
}

impl From<Message> for map_response::Result {
impl From<Message> for proto::map_response::Result {
fn from(value: Message) -> Self {
map_response::Result {
proto::map_response::Result {
keys: value.keys,
value: value.value,
tags: value.tags,
Expand All @@ -107,8 +104,8 @@ pub struct MapRequest {
pub eventtime: DateTime<Utc>,
}

impl From<RPCMapRequest> for MapRequest {
fn from(value: RPCMapRequest) -> Self {
impl From<proto::MapRequest> for MapRequest {
fn from(value: proto::MapRequest) -> Self {
Self {
keys: value.keys,
value: value.value,
Expand Down Expand Up @@ -187,7 +184,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.map_svc.take().unwrap();
let map_svc = MapService { handler };
let map_svc = map_server::MapServer::new(map_svc)
let map_svc = proto::map_server::MapServer::new(map_svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

Expand Down Expand Up @@ -220,7 +217,7 @@ mod tests {
use tower::service_fn;

use crate::map;
use crate::map::mapper::map_client::MapClient;
use crate::map::proto::map_client::MapClient;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
Expand Down Expand Up @@ -267,7 +264,7 @@ mod tests {
.await?;

let mut client = MapClient::new(channel);
let request = tonic::Request::new(map::mapper::MapRequest {
let request = tonic::Request::new(map::proto::MapRequest {
keys: vec!["first".into(), "second".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
Expand Down
18 changes: 8 additions & 10 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use tonic::{async_trait, Request, Response, Status};

use crate::sideinput::sideinputer::{side_input_server, ReadyResponse, SideInputResponse};

mod sideinputer {
mod proto {
tonic::include_proto!("sideinput.v1");
}

Expand All @@ -16,21 +14,21 @@ pub trait SideInputer {
}

#[async_trait]
impl<T> side_input_server::SideInput for SideInputService<T>
impl<T> proto::side_input_server::SideInput for SideInputService<T>
where
T: SideInputer + Send + Sync + 'static,
{
async fn retrieve_side_input(
&self,
_: Request<()>,
) -> Result<Response<SideInputResponse>, Status> {
) -> Result<Response<proto::SideInputResponse>, Status> {
let msg = self.handler.retrieve_sideinput().await;
let si = match msg {
Some(value) => SideInputResponse {
Some(value) => proto::SideInputResponse {
value,
no_broadcast: false,
},
None => SideInputResponse {
None => proto::SideInputResponse {
value: Vec::new(),
no_broadcast: true,
},
Expand All @@ -39,8 +37,8 @@ where
Ok(Response::new(si))
}

async fn is_ready(&self, _: Request<()>) -> Result<Response<ReadyResponse>, Status> {
Ok(Response::new(ReadyResponse { ready: true }))
async fn is_ready(&self, _: Request<()>) -> Result<Response<proto::ReadyResponse>, Status> {
Ok(Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand All @@ -58,7 +56,7 @@ where
let si_svc = SideInputService { handler: m };

tonic::transport::Server::builder()
.add_service(side_input_server::SideInputServer::new(si_svc))
.add_service(proto::side_input_server::SideInputServer::new(si_svc))
.serve_with_incoming(listener)
.await
.map_err(Into::into)
Expand Down
35 changes: 16 additions & 19 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,9 @@ use chrono::{DateTime, Utc};
use tokio::sync::{mpsc, oneshot};
use tonic::{Request, Status, Streaming};

use crate::sink::sinker_grpc::{
sink_response,
sink_server::{Sink, SinkServer},
ReadyResponse, SinkRequest as RPCSinkRequest, SinkResponse,
};

use crate::shared;

mod sinker_grpc {
mod proto {
tonic::include_proto!("sink.v1");
}

Expand Down Expand Up @@ -93,8 +87,8 @@ pub struct SinkRequest {
pub id: String,
}

impl From<RPCSinkRequest> for SinkRequest {
fn from(sr: RPCSinkRequest) -> Self {
impl From<proto::SinkRequest> for SinkRequest {
fn from(sr: proto::SinkRequest) -> Self {
Self {
keys: sr.keys,
value: sr.value,
Expand All @@ -116,7 +110,7 @@ pub struct Response {
pub err: String,
}

impl From<Response> for sink_response::Result {
impl From<Response> for proto::sink_response::Result {
fn from(r: Response) -> Self {
Self {
id: r.id,
Expand All @@ -127,14 +121,14 @@ impl From<Response> for sink_response::Result {
}

#[tonic::async_trait]
impl<T> Sink for SinkService<T>
impl<T> proto::sink_server::Sink for SinkService<T>
where
T: Sinker + Send + Sync + 'static,
{
async fn sink_fn(
&self,
request: Request<Streaming<RPCSinkRequest>>,
) -> Result<tonic::Response<SinkResponse>, Status> {
request: Request<Streaming<proto::SinkRequest>>,
) -> Result<tonic::Response<proto::SinkResponse>, Status> {
let mut stream = request.into_inner();

// TODO: what should be the idle buffer size?
Expand All @@ -160,13 +154,16 @@ where
// wait for the sink handle to respond
let responses = sink_handle.await;

Ok(tonic::Response::new(SinkResponse {
Ok(tonic::Response::new(proto::SinkResponse {
results: responses.into_iter().map(|r| r.into()).collect(),
}))
}

async fn is_ready(&self, _: Request<()>) -> Result<tonic::Response<ReadyResponse>, Status> {
Ok(tonic::Response::new(ReadyResponse { ready: true }))
async fn is_ready(
&self,
_: Request<()>,
) -> Result<tonic::Response<proto::ReadyResponse>, Status> {
Ok(tonic::Response::new(proto::ReadyResponse { ready: true }))
}
}

Expand Down Expand Up @@ -239,7 +236,7 @@ impl<T> Server<T> {
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
let svc = SinkService { handler };
let svc = SinkServer::new(svc)
let svc = proto::sink_server::SinkServer::new(svc)
.max_encoding_message_size(self.max_message_size)
.max_decoding_message_size(self.max_message_size);

Expand Down Expand Up @@ -272,7 +269,7 @@ mod tests {
use tower::service_fn;

use crate::sink;
use crate::sink::sinker_grpc::sink_client::SinkClient;
use crate::sink::proto::sink_client::SinkClient;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
Expand Down Expand Up @@ -345,7 +342,7 @@ mod tests {
.await?;

let mut client = SinkClient::new(channel);
let request = sink::sinker_grpc::SinkRequest {
let request = sink::proto::SinkRequest {
keys: vec!["first".into(), "second".into()],
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
Expand Down

0 comments on commit 5f71995

Please sign in to comment.