Skip to content

Commit

Permalink
Check-in generated code from protobuff files
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
  • Loading branch information
BulkBeing committed Oct 10, 2024
1 parent 0a3fc26 commit ee79a0a
Show file tree
Hide file tree
Showing 17 changed files with 3,591 additions and 33 deletions.
6 changes: 6 additions & 0 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
use std::env;

fn main() {
if env::var("PROTO_CODE_GEN").unwrap_or("0".to_string()) != "1" {
return;
}
tonic_build::configure()
.build_server(true)
.out_dir("src/servers")
.compile_protos(
&[
"proto/source.proto",
Expand Down
5 changes: 1 addition & 4 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ use crate::batchmap::proto::batch_map_server::BatchMap;
use crate::error::Error;
use crate::error::Error::BatchMapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::batchmap as proto;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";
/// Numaflow Batch Map Proto definitions.
pub mod proto {
tonic::include_proto!("batchmap.v1");
}

struct BatchMapService<T: BatchMapper> {
handler: Arc<T>,
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub mod sideinput;
/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
pub mod batchmap;

mod servers;

// Error handling on Numaflow SDKs!
//
// Any non-recoverable error will cause the process to shutdown with a non-zero exit status. All errors are non-recoverable.
Expand Down
6 changes: 1 addition & 5 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use tracing::{error, info};

use crate::error::{Error, ErrorKind};
use crate::map::proto::MapResponse;
use crate::servers::map as proto;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_CHANNEL_SIZE: usize = 1000;
Expand All @@ -21,11 +22,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Map Proto definitions.
pub mod proto {
tonic::include_proto!("map.v1");
}

struct MapService<T> {
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
Expand Down
6 changes: 1 addition & 5 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tonic::{async_trait, Request, Response, Status};
use crate::error::Error;
use crate::error::Error::ReduceError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::reduce as proto;
use crate::shared::{self, prost_timestamp_from_utc, ContainerType};

const KEY_JOIN_DELIMITER: &str = ":";
Expand All @@ -22,11 +23,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Reduce Proto definitions.
pub mod proto {
tonic::include_proto!("reduce.v1");
}

struct ReduceService<C> {
creator: Arc<C>,
shutdown_tx: Sender<()>,
Expand Down
27 changes: 27 additions & 0 deletions src/servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[path = "servers/batchmap.v1.rs"]
#[rustfmt::skip]
pub mod batchmap;

#[path = "servers/map.v1.rs"]
#[rustfmt::skip]
pub mod map;

#[path = "servers/reduce.v1.rs"]
#[rustfmt::skip]
pub mod reduce;

#[path = "servers/sideinput.v1.rs"]
#[rustfmt::skip]
pub mod sideinput;

#[path = "servers/sink.v1.rs"]
#[rustfmt::skip]
pub mod sink;

#[path = "servers/source.v1.rs"]
#[rustfmt::skip]
pub mod source;

#[path = "servers/sourcetransformer.v1.rs"]
#[rustfmt::skip]
pub mod sourcetransformer;
Loading

0 comments on commit ee79a0a

Please sign in to comment.