diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index e1dda26..ba0cbe6 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -1,54 +1,69 @@ -name: Rust +name: Tests on: push: - branches: [ main ] + branches: + - main + - release-* pull_request: - branches: [ main ] + branches: + - main + +defaults: + run: + shell: bash jobs: build: - name: Unit Tests + name: Clippy and Unit Tests runs-on: ubuntu-24.04 + timeout-minutes: 10 steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@v4 - - name: Set up Rust - uses: actions-rs/toolchain@v1 + - name: Install Rust + uses: actions-rust-lang/setup-rust-toolchain@v1 with: - toolchain: stable - override: true + cache-workspaces: | + . -> target + ./examples/batchmap-cat -> target + ./examples/batchmap-flatmap -> target + ./examples/map-cat -> target + ./examples/map-tickgen-serde -> target + ./examples/mapt-event-time-filter -> target + ./examples/reduce-counter -> target + ./examples/sideinput -> target + ./examples/sideinput/udf -> target + ./examples/simple-source -> target + ./examples/sink-log -> target + ./examples/source-transformer-now -> target - - name: Cache Cargo dependencies - uses: actions/cache@v2 - with: - path: | - ~/.cargo/bin/ - ~/.cargo/registry/index/ - ~/.cargo/registry/cache/ - ~/.cargo/git/db/ - target/ - key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }} - - - name: Install protoc (for Protocol Buffers) + - name: Configure sccache run: | - set -eux -o pipefail - PROTOC_VERSION=3.19.4 - PROTOC_ZIP=protoc-$PROTOC_VERSION-linux-x86_64.zip - curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/$PROTOC_ZIP - sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc - sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*' - sudo chmod +x /usr/local/bin/protoc - sudo find /usr/local/include -type f | xargs sudo chmod a+r - sudo find /usr/local/include -type d | xargs sudo chmod a+rx - rm -f $PROTOC_ZIP - - - name: Build - run: cargo build --verbose + echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV + echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV + + - name: Run sccache-cache + uses: mozilla-actions/sccache-action@v0.0.5 + + - name: Install dependencies + run: sudo apt-get install -y protobuf-compiler + + - name: Code Generation + run: make codegen + + - name: Ensure generated code is checked in + run: git diff --exit-code + + - name: Lint + run: make lint - name: Run tests - run: cargo test --verbose + run: make test + + - name: Documentation generation + run: RUSTFLAGS="-D warnings" cargo doc - - name: Check formatting - run: cargo fmt --all -- --check + - name: Dry run Cargo Publish + run: cargo publish --dry-run diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 0000000..6e898b5 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,46 @@ +name: Release + +on: + release: + types: [ created ] + +defaults: + run: + shell: bash + +jobs: + build: + name: Publish to crates.io + runs-on: ubuntu-24.04 + timeout-minutes: 12 + # run workflow only on numaproj/numaflow-rs repository + if: ${{ github.repository }} == "numaproj/numaflow-rs" + steps: + - name: Check out code + uses: actions/checkout@v4 + + - name: Install Rust + uses: actions-rust-lang/setup-rust-toolchain@v1 + with: + cache: false + + - name: Install dependencies + run: sudo apt-get install -y protobuf-compiler + + - name: Code Generation + run: make codegen + + - name: Ensure generated code is checked in + run: git diff --exit-code + + - name: Lint + run: make lint + + - name: Run tests + run: make test + + - name: Documentation generation + run: RUSTFLAGS="-D warnings" cargo doc + + - name: Dry run Cargo Publish + run: CARGO_REGISTRY_TOKEN=${{ secrets.CARGO_PUBLISH }} cargo publish diff --git a/Cargo.toml b/Cargo.toml index 9908598..7744487 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,6 +11,13 @@ homepage = "https://numaproj.github.io/numaflow/" repository = "https://github.com/numaproj/numaflow-rs" keywords = ["numaflow", "streaming", "messaging", "event-driven"] categories = ["network-programming", "api-bindings"] +exclude = [ + ".github/*", + ".gitignore", + ".dockerignore", + "hack/*", + "Makefile", +] [lib] @@ -18,9 +25,9 @@ name = "numaflow" path = "src/lib.rs" [dependencies] -tonic = "0.12.2" -prost = "0.13.2" -prost-types = "0.13.2" +tonic = "0.12.3" +prost = "0.13.3" +prost-types = "0.13.3" tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] } tokio-util = "0.7.12" tokio-stream = { version = "0.1.16", features = ["net"] } @@ -34,7 +41,7 @@ thiserror = "1.0" hyper-util = "0.1.7" [build-dependencies] -tonic-build = "0.12.2" +tonic-build = "0.12.3" [dev-dependencies] tempfile = "3.9.0" diff --git a/Makefile b/Makefile index 2e7ee8d..71ea7e9 100644 --- a/Makefile +++ b/Makefile @@ -3,15 +3,39 @@ # perform a cargo fmt on all directories containing a Cargo.toml file .PHONY: lint # find all directories containing Cargo.toml files -DIRS := $(shell find . -type f -name Cargo.toml -exec dirname {} \; | sort -u) +DIRS := $(shell find . -type f -name Cargo.toml -not -path "./target/*" -exec dirname {} \; | sort -u) $(info Included directories: $(DIRS)) -lint: +fmt: @for dir in $(DIRS); do \ echo "Formatting code in $$dir"; \ cargo fmt --all --manifest-path "$$dir/Cargo.toml"; \ done +# Check if all files are formatted and run clippy on all directories containing a Cargo.toml file +.PHONY: lint +lint: test-fmt clippy + +.PHONY: test-fmt +test-fmt: + @for dir in $(DIRS); do \ + echo "Checking if code is formatted in directory: $$dir"; \ + cargo fmt --all --check --manifest-path "$$dir/Cargo.toml" || { echo "Code is not formatted in $$dir"; exit 1; }; \ + done + +.PHONY: clippy +clippy: + @for dir in $(DIRS); do \ + echo "Running clippy in directory: $$dir"; \ + cargo clippy --workspace --manifest-path "$$dir/Cargo.toml" -- -D warnings || { echo "Clippy warnings/errors found in $$dir"; exit 1; }; \ + done + # run cargo test on the repository root .PHONY: test test: cargo test --workspace + +.PHONY: codegen +codegen: + # Change timestamps so that tonic_build code generation will always be triggered. + touch proto/* + PROTO_CODE_GEN=1 cargo build diff --git a/build.rs b/build.rs index 45943b8..0858267 100644 --- a/build.rs +++ b/build.rs @@ -1,7 +1,13 @@ +use std::env; + fn main() { + if env::var("PROTO_CODE_GEN").unwrap_or("0".to_string()) != "1" { + return; + } tonic_build::configure() .build_server(true) - .compile( + .out_dir("src/servers") + .compile_protos( &[ "proto/source.proto", "proto/sourcetransform.proto", diff --git a/examples/reduce-counter/src/main.rs b/examples/reduce-counter/src/main.rs index 8f8b17e..3f8e988 100644 --- a/examples/reduce-counter/src/main.rs +++ b/examples/reduce-counter/src/main.rs @@ -37,7 +37,7 @@ mod counter { &self, keys: Vec, mut input: Receiver, - md: &Metadata, + _md: &Metadata, ) -> Vec { let mut counter = 0; // the loop exits when input is closed which will happen only on close of book. diff --git a/examples/source-transformer-now/src/main.rs b/examples/source-transformer-now/src/main.rs index cdfc023..106a9f9 100644 --- a/examples/source-transformer-now/src/main.rs +++ b/examples/source-transformer-now/src/main.rs @@ -15,6 +15,9 @@ impl sourcetransform::SourceTransformer for NowCat { &self, input: sourcetransform::SourceTransformRequest, ) -> Vec { - vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())] + vec![ + sourcetransform::Message::new(input.value, chrono::offset::Utc::now()) + .keys(input.keys.clone()), + ] } } diff --git a/src/batchmap.rs b/src/batchmap.rs index 4112618..d1f19de 100644 --- a/src/batchmap.rs +++ b/src/batchmap.rs @@ -15,16 +15,13 @@ use crate::batchmap::proto::batch_map_server::BatchMap; use crate::error::Error; use crate::error::Error::BatchMapError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::servers::batchmap as proto; use crate::shared::{self, shutdown_signal, ContainerType}; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; const DROP: &str = "U+005C__DROP__"; -/// Numaflow Batch Map Proto definitions. -pub mod proto { - tonic::include_proto!("batchmap.v1"); -} struct BatchMapService { handler: Arc, @@ -106,7 +103,7 @@ impl From for Datum { } } } -/// Message is the response struct from the [`Mapper::map`] . +/// Message is the response struct from the [`Mapper::map`][`crate::map::Mapper::map`] . #[derive(Debug, PartialEq)] pub struct Message { /// Keys are a collection of strings which will be passed on to the next vertex as is. It can diff --git a/src/lib.rs b/src/lib.rs index db9482e..9948098 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,8 @@ pub mod sideinput; /// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers. pub mod batchmap; +mod servers; + // Error handling on Numaflow SDKs! // // Any non-recoverable error will cause the process to shutdown with a non-zero exit status. All errors are non-recoverable. diff --git a/src/map.rs b/src/map.rs index a12340e..51909cb 100644 --- a/src/map.rs +++ b/src/map.rs @@ -12,7 +12,7 @@ use tonic::{async_trait, Request, Response, Status, Streaming}; use tracing::{error, info}; use crate::error::{Error, ErrorKind}; -use crate::map::proto::MapResponse; +use crate::servers::map::{self as proto, MapResponse}; use crate::shared::{self, shutdown_signal, ContainerType}; const DEFAULT_CHANNEL_SIZE: usize = 1000; @@ -21,11 +21,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info"; const DROP: &str = "U+005C__DROP__"; -/// Numaflow Map Proto definitions. -pub mod proto { - tonic::include_proto!("map.v1"); -} - struct MapService { handler: Arc, shutdown_tx: mpsc::Sender<()>, diff --git a/src/reduce.rs b/src/reduce.rs index 5d304cd..6f58972 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -14,6 +14,7 @@ use tonic::{async_trait, Request, Response, Status}; use crate::error::Error; use crate::error::Error::ReduceError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; +pub use crate::servers::reduce as proto; use crate::shared::{self, prost_timestamp_from_utc, ContainerType}; const KEY_JOIN_DELIMITER: &str = ":"; @@ -22,11 +23,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info"; const DROP: &str = "U+005C__DROP__"; -/// Numaflow Reduce Proto definitions. -pub mod proto { - tonic::include_proto!("reduce.v1"); -} - struct ReduceService { creator: Arc, shutdown_tx: Sender<()>, @@ -80,7 +76,7 @@ pub trait ReducerCreator { /// Reducer trait for implementing Reduce handler. #[async_trait] pub trait Reducer { - /// reduce_handle is provided with a set of keys, a channel of [`Datum`], and [`Metadata`]. It + /// reduce_handle is provided with a set of keys, a channel of [`ReduceRequest`], and [`Metadata`]. It /// returns 0, 1, or more results as a [`Vec`] of [`Message`]. Reduce is a stateful operation and /// the channel is for the collection of keys and for that time [Window]. /// You can read more about reduce [here](https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/). @@ -103,7 +99,7 @@ pub trait Reducer { /// use numaflow::reduce::{Reducer, Metadata}; /// use tokio::sync::mpsc::Receiver; /// use tonic::async_trait; - /// use numaflow::reduce::proto::reduce_server::Reduce; + /// use numaflow::reduce::proto::reduce_server::Reduce; /// pub(crate) struct Counter {} /// /// pub(crate) struct CounterCreator {} diff --git a/src/servers.rs b/src/servers.rs new file mode 100644 index 0000000..1b9ee46 --- /dev/null +++ b/src/servers.rs @@ -0,0 +1,27 @@ +#[path = "servers/batchmap.v1.rs"] +#[rustfmt::skip] +pub mod batchmap; + +#[path = "servers/map.v1.rs"] +#[rustfmt::skip] +pub mod map; + +#[path = "servers/reduce.v1.rs"] +#[rustfmt::skip] +pub mod reduce; + +#[path = "servers/sideinput.v1.rs"] +#[rustfmt::skip] +pub mod sideinput; + +#[path = "servers/sink.v1.rs"] +#[rustfmt::skip] +pub mod sink; + +#[path = "servers/source.v1.rs"] +#[rustfmt::skip] +pub mod source; + +#[path = "servers/sourcetransformer.v1.rs"] +#[rustfmt::skip] +pub mod sourcetransformer; diff --git a/src/servers/batchmap.v1.rs b/src/servers/batchmap.v1.rs new file mode 100644 index 0000000..fdb254b --- /dev/null +++ b/src/servers/batchmap.v1.rs @@ -0,0 +1,426 @@ +// This file is @generated by prost-build. +/// * +/// BatchMapRequest represents a request element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BatchMapRequest { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "4")] + pub watermark: ::core::option::Option<::prost_types::Timestamp>, + #[prost(map = "string, string", tag = "5")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + /// This ID is used uniquely identify a map request + #[prost(string, tag = "6")] + pub id: ::prost::alloc::string::String, +} +/// * +/// BatchMapResponse represents a response element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct BatchMapResponse { + #[prost(message, repeated, tag = "1")] + pub results: ::prost::alloc::vec::Vec, + /// This ID is used to refer the responses to the request it corresponds to. + #[prost(string, tag = "2")] + pub id: ::prost::alloc::string::String, +} +/// Nested message and enum types in `BatchMapResponse`. +pub mod batch_map_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(string, repeated, tag = "3")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + } +} +/// * +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// Generated client implementations. +pub mod batch_map_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct BatchMapClient { + inner: tonic::client::Grpc, + } + impl BatchMapClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl BatchMapClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> BatchMapClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + BatchMapClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// IsReady is the heartbeat endpoint for gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/batchmap.v1.BatchMap/IsReady", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("batchmap.v1.BatchMap", "IsReady")); + self.inner.unary(req, path, codec).await + } + /// BatchMapFn is a bi-directional streaming rpc which applies a + /// Map function on each BatchMapRequest element of the stream and then returns streams + /// back MapResponse elements. + pub async fn batch_map_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/batchmap.v1.BatchMap/BatchMapFn", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert(GrpcMethod::new("batchmap.v1.BatchMap", "BatchMapFn")); + self.inner.streaming(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod batch_map_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with BatchMapServer. + #[async_trait] + pub trait BatchMap: std::marker::Send + std::marker::Sync + 'static { + /// IsReady is the heartbeat endpoint for gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the BatchMapFn method. + type BatchMapFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// BatchMapFn is a bi-directional streaming rpc which applies a + /// Map function on each BatchMapRequest element of the stream and then returns streams + /// back MapResponse elements. + async fn batch_map_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct BatchMapServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl BatchMapServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for BatchMapServer + where + T: BatchMap, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/batchmap.v1.BatchMap/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/batchmap.v1.BatchMap/BatchMapFn" => { + #[allow(non_camel_case_types)] + struct BatchMapFnSvc(pub Arc); + impl< + T: BatchMap, + > tonic::server::StreamingService + for BatchMapFnSvc { + type Response = super::BatchMapResponse; + type ResponseStream = T::BatchMapFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::batch_map_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = BatchMapFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for BatchMapServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "batchmap.v1.BatchMap"; + impl tonic::server::NamedService for BatchMapServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/map.v1.rs b/src/servers/map.v1.rs new file mode 100644 index 0000000..1091aba --- /dev/null +++ b/src/servers/map.v1.rs @@ -0,0 +1,432 @@ +// This file is @generated by prost-build. +/// * +/// MapRequest represents a request element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MapRequest { + #[prost(message, optional, tag = "1")] + pub request: ::core::option::Option, + /// This ID is used to uniquely identify a map request + #[prost(string, tag = "2")] + pub id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "3")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `MapRequest`. +pub mod map_request { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Request { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "4")] + pub watermark: ::core::option::Option<::prost_types::Timestamp>, + #[prost(map = "string, string", tag = "5")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + } +} +/// +/// Handshake message between client and server to indicate the start of transmission. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Handshake { + /// Required field indicating the start of transmission. + #[prost(bool, tag = "1")] + pub sot: bool, +} +/// * +/// MapResponse represents a response element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct MapResponse { + #[prost(message, repeated, tag = "1")] + pub results: ::prost::alloc::vec::Vec, + /// This ID is used to refer the responses to the request it corresponds to. + #[prost(string, tag = "2")] + pub id: ::prost::alloc::string::String, + #[prost(message, optional, tag = "3")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `MapResponse`. +pub mod map_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(string, repeated, tag = "3")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + } +} +/// * +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// Generated client implementations. +pub mod map_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct MapClient { + inner: tonic::client::Grpc, + } + impl MapClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl MapClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> MapClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + MapClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// MapFn applies a function to each map request element. + pub async fn map_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/map.v1.Map/MapFn"); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("map.v1.Map", "MapFn")); + self.inner.streaming(req, path, codec).await + } + /// IsReady is the heartbeat endpoint for gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/map.v1.Map/IsReady"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("map.v1.Map", "IsReady")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod map_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with MapServer. + #[async_trait] + pub trait Map: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the MapFn method. + type MapFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// MapFn applies a function to each map request element. + async fn map_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// IsReady is the heartbeat endpoint for gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct MapServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl MapServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for MapServer + where + T: Map, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/map.v1.Map/MapFn" => { + #[allow(non_camel_case_types)] + struct MapFnSvc(pub Arc); + impl tonic::server::StreamingService + for MapFnSvc { + type Response = super::MapResponse; + type ResponseStream = T::MapFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::map_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = MapFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/map.v1.Map/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for MapServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "map.v1.Map"; + impl tonic::server::NamedService for MapServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/reduce.v1.rs b/src/servers/reduce.v1.rs new file mode 100644 index 0000000..19f5dfd --- /dev/null +++ b/src/servers/reduce.v1.rs @@ -0,0 +1,490 @@ +// This file is @generated by prost-build. +/// * +/// ReduceRequest represents a request element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReduceRequest { + #[prost(message, optional, tag = "1")] + pub payload: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub operation: ::core::option::Option, +} +/// Nested message and enum types in `ReduceRequest`. +pub mod reduce_request { + /// WindowOperation represents a window operation. + /// For Aligned windows, OPEN, APPEND and CLOSE events are sent. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct WindowOperation { + #[prost(enumeration = "window_operation::Event", tag = "1")] + pub event: i32, + #[prost(message, repeated, tag = "2")] + pub windows: ::prost::alloc::vec::Vec, + } + /// Nested message and enum types in `WindowOperation`. + pub mod window_operation { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Event { + Open = 0, + Close = 1, + Append = 4, + } + impl Event { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Open => "OPEN", + Self::Close => "CLOSE", + Self::Append => "APPEND", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "OPEN" => Some(Self::Open), + "CLOSE" => Some(Self::Close), + "APPEND" => Some(Self::Append), + _ => None, + } + } + } + } + /// Payload represents a payload element. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Payload { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "4")] + pub watermark: ::core::option::Option<::prost_types::Timestamp>, + #[prost(map = "string, string", tag = "5")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + } +} +/// Window represents a window. +/// Since the client doesn't track keys, window doesn't have a keys field. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Window { + #[prost(message, optional, tag = "1")] + pub start: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "2")] + pub end: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag = "3")] + pub slot: ::prost::alloc::string::String, +} +/// * +/// ReduceResponse represents a response element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReduceResponse { + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, + /// window represents a window to which the result belongs. + #[prost(message, optional, tag = "2")] + pub window: ::core::option::Option, + /// EOF represents the end of the response for a window. + #[prost(bool, tag = "3")] + pub eof: bool, +} +/// Nested message and enum types in `ReduceResponse`. +pub mod reduce_response { + /// Result represents a result element. It contains the result of the reduce function. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(string, repeated, tag = "3")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + } +} +/// * +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// Generated client implementations. +pub mod reduce_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct ReduceClient { + inner: tonic::client::Grpc, + } + impl ReduceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl ReduceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> ReduceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + ReduceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// ReduceFn applies a reduce function to a request stream. + pub async fn reduce_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/reduce.v1.Reduce/ReduceFn", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("reduce.v1.Reduce", "ReduceFn")); + self.inner.streaming(req, path, codec).await + } + /// IsReady is the heartbeat endpoint for gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/reduce.v1.Reduce/IsReady"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("reduce.v1.Reduce", "IsReady")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod reduce_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with ReduceServer. + #[async_trait] + pub trait Reduce: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the ReduceFn method. + type ReduceFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// ReduceFn applies a reduce function to a request stream. + async fn reduce_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// IsReady is the heartbeat endpoint for gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct ReduceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl ReduceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for ReduceServer + where + T: Reduce, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/reduce.v1.Reduce/ReduceFn" => { + #[allow(non_camel_case_types)] + struct ReduceFnSvc(pub Arc); + impl tonic::server::StreamingService + for ReduceFnSvc { + type Response = super::ReduceResponse; + type ResponseStream = T::ReduceFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::reduce_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReduceFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/reduce.v1.Reduce/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for ReduceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "reduce.v1.Reduce"; + impl tonic::server::NamedService for ReduceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/sideinput.v1.rs b/src/servers/sideinput.v1.rs new file mode 100644 index 0000000..7ef86e6 --- /dev/null +++ b/src/servers/sideinput.v1.rs @@ -0,0 +1,396 @@ +// This file is @generated by prost-build. +/// SideInputResponse represents a response to a given side input retrieval request. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SideInputResponse { + /// value represents the latest value of the side input payload + #[prost(bytes = "vec", tag = "1")] + pub value: ::prost::alloc::vec::Vec, + /// noBroadcast indicates whether the side input value should be broadcasted to all + /// True if value should not be broadcasted + /// False if value should be broadcasted + #[prost(bool, tag = "2")] + pub no_broadcast: bool, +} +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// Generated client implementations. +pub mod side_input_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// SideInput is the gRPC service for user-defined Side Inputs. + /// It is used to propagate changes in the values of the provided Side Inputs + /// which allows access to slow updated data or configuration without needing to retrieve + /// it during each message processing. + /// Through this service we should should be able to:- + /// 1) Invoke retrieval request for a single Side Input parameter, which in turn should + /// check for updates and return its latest value. + /// 2) Provide a health check endpoint to indicate whether the service is ready to be used. + #[derive(Debug, Clone)] + pub struct SideInputClient { + inner: tonic::client::Grpc, + } + impl SideInputClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SideInputClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SideInputClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SideInputClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// RetrieveSideInput is the endpoint to retrieve the latest value of a given Side Input. + pub async fn retrieve_side_input( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sideinput.v1.SideInput/RetrieveSideInput", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sideinput.v1.SideInput", "RetrieveSideInput")); + self.inner.unary(req, path, codec).await + } + /// IsReady is the health check endpoint to indicate whether the service is ready to be used. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sideinput.v1.SideInput/IsReady", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("sideinput.v1.SideInput", "IsReady")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod side_input_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SideInputServer. + #[async_trait] + pub trait SideInput: std::marker::Send + std::marker::Sync + 'static { + /// RetrieveSideInput is the endpoint to retrieve the latest value of a given Side Input. + async fn retrieve_side_input( + &self, + request: tonic::Request<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// IsReady is the health check endpoint to indicate whether the service is ready to be used. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + /// SideInput is the gRPC service for user-defined Side Inputs. + /// It is used to propagate changes in the values of the provided Side Inputs + /// which allows access to slow updated data or configuration without needing to retrieve + /// it during each message processing. + /// Through this service we should should be able to:- + /// 1) Invoke retrieval request for a single Side Input parameter, which in turn should + /// check for updates and return its latest value. + /// 2) Provide a health check endpoint to indicate whether the service is ready to be used. + #[derive(Debug)] + pub struct SideInputServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SideInputServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SideInputServer + where + T: SideInput, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/sideinput.v1.SideInput/RetrieveSideInput" => { + #[allow(non_camel_case_types)] + struct RetrieveSideInputSvc(pub Arc); + impl tonic::server::UnaryService<()> + for RetrieveSideInputSvc { + type Response = super::SideInputResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::retrieve_side_input(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = RetrieveSideInputSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sideinput.v1.SideInput/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> + for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SideInputServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "sideinput.v1.SideInput"; + impl tonic::server::NamedService for SideInputServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/sink.v1.rs b/src/servers/sink.v1.rs new file mode 100644 index 0000000..c4a3c29 --- /dev/null +++ b/src/servers/sink.v1.rs @@ -0,0 +1,473 @@ +// This file is @generated by prost-build. +/// * +/// SinkRequest represents a request element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SinkRequest { + /// Required field indicating the request. + #[prost(message, optional, tag = "1")] + pub request: ::core::option::Option, + /// Required field indicating the status of the request. + /// If eot is set to true, it indicates the end of transmission. + #[prost(message, optional, tag = "2")] + pub status: ::core::option::Option, + /// optional field indicating the handshake message. + #[prost(message, optional, tag = "3")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `SinkRequest`. +pub mod sink_request { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Request { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "4")] + pub watermark: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, tag = "5")] + pub id: ::prost::alloc::string::String, + #[prost(map = "string, string", tag = "6")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + } + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct Status { + #[prost(bool, tag = "1")] + pub eot: bool, + } +} +/// +/// Handshake message between client and server to indicate the start of transmission. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Handshake { + /// Required field indicating the start of transmission. + #[prost(bool, tag = "1")] + pub sot: bool, +} +/// * +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// * +/// SinkResponse is the individual response of each message written to the sink. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SinkResponse { + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `SinkResponse`. +pub mod sink_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + /// id is the ID of the message, can be used to uniquely identify the message. + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + /// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK. + #[prost(enumeration = "super::Status", tag = "2")] + pub status: i32, + /// err_msg is the error message, set it if success is set to false. + #[prost(string, tag = "3")] + pub err_msg: ::prost::alloc::string::String, + } +} +/// +/// Status is the status of the response. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum Status { + Success = 0, + Failure = 1, + Fallback = 2, +} +impl Status { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Success => "SUCCESS", + Self::Failure => "FAILURE", + Self::Fallback => "FALLBACK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SUCCESS" => Some(Self::Success), + "FAILURE" => Some(Self::Failure), + "FALLBACK" => Some(Self::Fallback), + _ => None, + } + } +} +/// Generated client implementations. +pub mod sink_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SinkClient { + inner: tonic::client::Grpc, + } + impl SinkClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SinkClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SinkClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SinkClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// SinkFn writes the request to a user defined sink. + pub async fn sink_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/sink.v1.Sink/SinkFn"); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("sink.v1.Sink", "SinkFn")); + self.inner.streaming(req, path, codec).await + } + /// IsReady is the heartbeat endpoint for gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/sink.v1.Sink/IsReady"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("sink.v1.Sink", "IsReady")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod sink_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SinkServer. + #[async_trait] + pub trait Sink: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the SinkFn method. + type SinkFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// SinkFn writes the request to a user defined sink. + async fn sink_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// IsReady is the heartbeat endpoint for gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct SinkServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SinkServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SinkServer + where + T: Sink, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/sink.v1.Sink/SinkFn" => { + #[allow(non_camel_case_types)] + struct SinkFnSvc(pub Arc); + impl tonic::server::StreamingService + for SinkFnSvc { + type Response = super::SinkResponse; + type ResponseStream = T::SinkFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::sink_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SinkFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sink.v1.Sink/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SinkServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "sink.v1.Sink"; + impl tonic::server::NamedService for SinkServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/source.v1.rs b/src/servers/source.v1.rs new file mode 100644 index 0000000..68210fc --- /dev/null +++ b/src/servers/source.v1.rs @@ -0,0 +1,869 @@ +// This file is @generated by prost-build. +/// +/// Handshake message between client and server to indicate the start of transmission. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Handshake { + /// Required field indicating the start of transmission. + #[prost(bool, tag = "1")] + pub sot: bool, +} +/// +/// ReadRequest is the request for reading datum stream from user defined source. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadRequest { + /// Required field indicating the request. + #[prost(message, optional, tag = "1")] + pub request: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `ReadRequest`. +pub mod read_request { + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct Request { + /// Required field indicating the number of records to read. + #[prost(uint64, tag = "1")] + pub num_records: u64, + /// Required field indicating the request timeout in milliseconds. + /// uint32 can represent 2^32 milliseconds, which is about 49 days. + /// We don't use uint64 because time.Duration takes int64 as nano seconds. Using uint64 for milli will cause overflow. + #[prost(uint32, tag = "2")] + pub timeout_in_ms: u32, + } +} +/// +/// ReadResponse is the response for reading datum stream from user defined source. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ReadResponse { + /// Required field holding the result. + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, + /// Status of the response. Holds the end of transmission flag and the status code. + #[prost(message, optional, tag = "2")] + pub status: ::core::option::Option, + /// Handshake message between client and server to indicate the start of transmission. + #[prost(message, optional, tag = "3")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `ReadResponse`. +pub mod read_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + /// Required field holding the payload of the datum. + #[prost(bytes = "vec", tag = "1")] + pub payload: ::prost::alloc::vec::Vec, + /// Required field indicating the offset information of the datum. + #[prost(message, optional, tag = "2")] + pub offset: ::core::option::Option, + /// Required field representing the time associated with each datum. It is used for watermarking. + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + /// Optional list of keys associated with the datum. + /// Key is the "key" attribute in (key,value) as in the map-reduce paradigm. + /// We add this optional field to support the use case where the user defined source can provide keys for the datum. + /// e.g. Kafka and Redis Stream message usually include information about the keys. + #[prost(string, repeated, tag = "4")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Optional list of headers associated with the datum. + /// Headers are the metadata associated with the datum. + /// e.g. Kafka and Redis Stream message usually include information about the headers. + #[prost(map = "string, string", tag = "5")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + } + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Status { + /// End of transmission flag. + #[prost(bool, tag = "1")] + pub eot: bool, + #[prost(enumeration = "status::Code", tag = "2")] + pub code: i32, + #[prost(enumeration = "status::Error", optional, tag = "3")] + pub error: ::core::option::Option, + #[prost(string, optional, tag = "4")] + pub msg: ::core::option::Option<::prost::alloc::string::String>, + } + /// Nested message and enum types in `Status`. + pub mod status { + /// Code to indicate the status of the response. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Code { + Success = 0, + Failure = 1, + } + impl Code { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Success => "SUCCESS", + Self::Failure => "FAILURE", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SUCCESS" => Some(Self::Success), + "FAILURE" => Some(Self::Failure), + _ => None, + } + } + } + /// Error to indicate the error type. If the code is FAILURE, then the error field will be populated. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Error { + Unacked = 0, + Other = 1, + } + impl Error { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unacked => "UNACKED", + Self::Other => "OTHER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "UNACKED" => Some(Self::Unacked), + "OTHER" => Some(Self::Other), + _ => None, + } + } + } + } +} +/// +/// AckRequest is the request for acknowledging datum. +/// It takes a list of offsets to be acknowledged. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AckRequest { + /// Required field holding the request. The list will be ordered and will have the same order as the original Read response. + #[prost(message, optional, tag = "1")] + pub request: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `AckRequest`. +pub mod ack_request { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Request { + /// Required field holding the offset to be acked + #[prost(message, optional, tag = "1")] + pub offset: ::core::option::Option, + } +} +/// +/// AckResponse is the response for acknowledging datum. It contains one empty field confirming +/// the batch of offsets that have been successfully acknowledged. The contract between client and server +/// is that the server will only return the AckResponse if the ack request is successful. +/// If the server hangs during the ack request, the client can decide to timeout and error out the data forwarder. +/// The reason why we define such contract is that we always expect the server to be able to process the ack request. +/// Client is expected to send the AckRequest to the server with offsets that are strictly +/// corresponding to the previously read batch. If the client sends the AckRequest with offsets that are not, +/// it is considered as a client error and the server will not return the AckResponse. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct AckResponse { + /// Required field holding the result. + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, + /// Handshake message between client and server to indicate the start of transmission. + #[prost(message, optional, tag = "2")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `AckResponse`. +pub mod ack_response { + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct Result { + /// Required field indicating the ack request is successful. + #[prost(message, optional, tag = "1")] + pub success: ::core::option::Option<()>, + } +} +/// +/// ReadyResponse is the health check result for user defined source. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + /// Required field holding the health check result. + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// +/// PendingResponse is the response for the pending request. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct PendingResponse { + /// Required field holding the result. + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `PendingResponse`. +pub mod pending_response { + #[derive(Clone, Copy, PartialEq, ::prost::Message)] + pub struct Result { + /// Required field holding the number of pending records at the user defined source. + /// A negative count indicates that the pending information is not available. + #[prost(int64, tag = "1")] + pub count: i64, + } +} +/// +/// PartitionsResponse is the response for the partitions request. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PartitionsResponse { + /// Required field holding the result. + #[prost(message, optional, tag = "1")] + pub result: ::core::option::Option, +} +/// Nested message and enum types in `PartitionsResponse`. +pub mod partitions_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + /// Required field holding the list of partitions. + #[prost(int32, repeated, tag = "1")] + pub partitions: ::prost::alloc::vec::Vec, + } +} +/// +/// Offset is the offset of the datum. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Offset { + /// offset is the offset of the datum. This field is required. + /// We define Offset as a byte array because different input data sources can have different representations for Offset. + /// The only way to generalize it is to define it as a byte array, + /// Such that we can let the UDSource to de-serialize the offset using its own interpretation logics. + #[prost(bytes = "vec", tag = "1")] + pub offset: ::prost::alloc::vec::Vec, + /// Optional partition_id indicates which partition of the source the datum belongs to. + /// It is useful for sources that have multiple partitions. e.g. Kafka. + /// If the partition_id is not specified, it is assumed that the source has a single partition. + #[prost(int32, tag = "2")] + pub partition_id: i32, +} +/// Generated client implementations. +pub mod source_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SourceClient { + inner: tonic::client::Grpc, + } + impl SourceClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SourceClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SourceClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SourceClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// Read returns a stream of datum responses. + /// The size of the returned responses is less than or equal to the num_records specified in each ReadRequest. + /// If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list). + /// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream. + /// Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true. + pub async fn read_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/source.v1.Source/ReadFn"); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("source.v1.Source", "ReadFn")); + self.inner.streaming(req, path, codec).await + } + /// AckFn acknowledges a stream of datum offsets. + /// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex. + /// The caller (numa) expects the AckFn to be successful, and it does not expect any errors. + /// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request, + /// then it is best to crash because there are no other retry mechanisms possible. + /// Clients sends n requests and expects n responses. + pub async fn ack_fn( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/source.v1.Source/AckFn"); + let mut req = request.into_streaming_request(); + req.extensions_mut().insert(GrpcMethod::new("source.v1.Source", "AckFn")); + self.inner.streaming(req, path, codec).await + } + /// PendingFn returns the number of pending records at the user defined source. + pub async fn pending_fn( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/source.v1.Source/PendingFn", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("source.v1.Source", "PendingFn")); + self.inner.unary(req, path, codec).await + } + /// PartitionsFn returns the list of partitions for the user defined source. + pub async fn partitions_fn( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/source.v1.Source/PartitionsFn", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("source.v1.Source", "PartitionsFn")); + self.inner.unary(req, path, codec).await + } + /// IsReady is the heartbeat endpoint for user defined source gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/source.v1.Source/IsReady"); + let mut req = request.into_request(); + req.extensions_mut().insert(GrpcMethod::new("source.v1.Source", "IsReady")); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod source_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SourceServer. + #[async_trait] + pub trait Source: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the ReadFn method. + type ReadFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// Read returns a stream of datum responses. + /// The size of the returned responses is less than or equal to the num_records specified in each ReadRequest. + /// If the request timeout is reached on the server side, the returned responses will contain all the datum that have been read (which could be an empty list). + /// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream. + /// Once it has sent all the datum, the server will send a ReadResponse with the end of transmission flag set to true. + async fn read_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// Server streaming response type for the AckFn method. + type AckFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// AckFn acknowledges a stream of datum offsets. + /// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex. + /// The caller (numa) expects the AckFn to be successful, and it does not expect any errors. + /// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request, + /// then it is best to crash because there are no other retry mechanisms possible. + /// Clients sends n requests and expects n responses. + async fn ack_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result, tonic::Status>; + /// PendingFn returns the number of pending records at the user defined source. + async fn pending_fn( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + /// PartitionsFn returns the list of partitions for the user defined source. + async fn partitions_fn( + &self, + request: tonic::Request<()>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// IsReady is the heartbeat endpoint for user defined source gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct SourceServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SourceServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SourceServer + where + T: Source, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/source.v1.Source/ReadFn" => { + #[allow(non_camel_case_types)] + struct ReadFnSvc(pub Arc); + impl tonic::server::StreamingService + for ReadFnSvc { + type Response = super::ReadResponse; + type ResponseStream = T::ReadFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::read_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = ReadFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/source.v1.Source/AckFn" => { + #[allow(non_camel_case_types)] + struct AckFnSvc(pub Arc); + impl tonic::server::StreamingService + for AckFnSvc { + type Response = super::AckResponse; + type ResponseStream = T::AckFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request>, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::ack_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = AckFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/source.v1.Source/PendingFn" => { + #[allow(non_camel_case_types)] + struct PendingFnSvc(pub Arc); + impl tonic::server::UnaryService<()> for PendingFnSvc { + type Response = super::PendingResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::pending_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PendingFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/source.v1.Source/PartitionsFn" => { + #[allow(non_camel_case_types)] + struct PartitionsFnSvc(pub Arc); + impl tonic::server::UnaryService<()> + for PartitionsFnSvc { + type Response = super::PartitionsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::partitions_fn(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = PartitionsFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/source.v1.Source/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SourceServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "source.v1.Source"; + impl tonic::server::NamedService for SourceServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/servers/sourcetransformer.v1.rs b/src/servers/sourcetransformer.v1.rs new file mode 100644 index 0000000..d3c7dfe --- /dev/null +++ b/src/servers/sourcetransformer.v1.rs @@ -0,0 +1,463 @@ +// This file is @generated by prost-build. +/// +/// Handshake message between client and server to indicate the start of transmission. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct Handshake { + /// Required field indicating the start of transmission. + #[prost(bool, tag = "1")] + pub sot: bool, +} +/// * +/// SourceTransformerRequest represents a request element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SourceTransformRequest { + #[prost(message, optional, tag = "1")] + pub request: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `SourceTransformRequest`. +pub mod source_transform_request { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Request { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "4")] + pub watermark: ::core::option::Option<::prost_types::Timestamp>, + #[prost(map = "string, string", tag = "5")] + pub headers: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + /// This ID is used to uniquely identify a transform request + #[prost(string, tag = "6")] + pub id: ::prost::alloc::string::String, + } +} +/// * +/// SourceTransformerResponse represents a response element. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct SourceTransformResponse { + #[prost(message, repeated, tag = "1")] + pub results: ::prost::alloc::vec::Vec, + /// This ID is used to refer the responses to the request it corresponds to. + #[prost(string, tag = "2")] + pub id: ::prost::alloc::string::String, + /// Handshake message between client and server to indicate the start of transmission. + #[prost(message, optional, tag = "3")] + pub handshake: ::core::option::Option, +} +/// Nested message and enum types in `SourceTransformResponse`. +pub mod source_transform_response { + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Result { + #[prost(string, repeated, tag = "1")] + pub keys: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(bytes = "vec", tag = "2")] + pub value: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "3")] + pub event_time: ::core::option::Option<::prost_types::Timestamp>, + #[prost(string, repeated, tag = "4")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + } +} +/// * +/// ReadyResponse is the health check result. +#[derive(Clone, Copy, PartialEq, ::prost::Message)] +pub struct ReadyResponse { + #[prost(bool, tag = "1")] + pub ready: bool, +} +/// Generated client implementations. +pub mod source_transform_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + #[derive(Debug, Clone)] + pub struct SourceTransformClient { + inner: tonic::client::Grpc, + } + impl SourceTransformClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl SourceTransformClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> SourceTransformClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + SourceTransformClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// SourceTransformFn applies a function to each request element. + /// In addition to map function, SourceTransformFn also supports assigning a new event time to response. + /// SourceTransformFn can be used only at source vertex by source data transformer. + pub async fn source_transform_fn( + &mut self, + request: impl tonic::IntoStreamingRequest< + Message = super::SourceTransformRequest, + >, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sourcetransformer.v1.SourceTransform/SourceTransformFn", + ); + let mut req = request.into_streaming_request(); + req.extensions_mut() + .insert( + GrpcMethod::new( + "sourcetransformer.v1.SourceTransform", + "SourceTransformFn", + ), + ); + self.inner.streaming(req, path, codec).await + } + /// IsReady is the heartbeat endpoint for gRPC. + pub async fn is_ready( + &mut self, + request: impl tonic::IntoRequest<()>, + ) -> std::result::Result, tonic::Status> { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/sourcetransformer.v1.SourceTransform/IsReady", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("sourcetransformer.v1.SourceTransform", "IsReady"), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod source_transform_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with SourceTransformServer. + #[async_trait] + pub trait SourceTransform: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the SourceTransformFn method. + type SourceTransformFnStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result, + > + + std::marker::Send + + 'static; + /// SourceTransformFn applies a function to each request element. + /// In addition to map function, SourceTransformFn also supports assigning a new event time to response. + /// SourceTransformFn can be used only at source vertex by source data transformer. + async fn source_transform_fn( + &self, + request: tonic::Request>, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// IsReady is the heartbeat endpoint for gRPC. + async fn is_ready( + &self, + request: tonic::Request<()>, + ) -> std::result::Result, tonic::Status>; + } + #[derive(Debug)] + pub struct SourceTransformServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl SourceTransformServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for SourceTransformServer + where + T: SourceTransform, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/sourcetransformer.v1.SourceTransform/SourceTransformFn" => { + #[allow(non_camel_case_types)] + struct SourceTransformFnSvc(pub Arc); + impl< + T: SourceTransform, + > tonic::server::StreamingService + for SourceTransformFnSvc { + type Response = super::SourceTransformResponse; + type ResponseStream = T::SourceTransformFnStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request< + tonic::Streaming, + >, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::source_transform_fn(&inner, request) + .await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = SourceTransformFnSvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/sourcetransformer.v1.SourceTransform/IsReady" => { + #[allow(non_camel_case_types)] + struct IsReadySvc(pub Arc); + impl tonic::server::UnaryService<()> + for IsReadySvc { + type Response = super::ReadyResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call(&mut self, request: tonic::Request<()>) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::is_ready(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = IsReadySvc(inner); + let codec = tonic::codec::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for SourceTransformServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "sourcetransformer.v1.SourceTransform"; + impl tonic::server::NamedService for SourceTransformServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/src/shared.rs b/src/shared.rs index b22f585..5c6968c 100644 --- a/src/shared.rs +++ b/src/shared.rs @@ -137,9 +137,8 @@ pub(crate) fn prost_timestamp_from_utc(t: DateTime) -> Option { } /// shuts downs the gRPC server. This happens in 2 cases -/// 1. there has been an internal error (one of the tasks failed) and we need to shutdown -/// 2. user is explicitly asking us to shutdown -/// +/// 1. there has been an internal error (one of the tasks failed) and we need to shutdown +/// 2. user is explicitly asking us to shutdown /// Once the request for shutdown has be invoked, server will broadcast shutdown to all tasks /// through the cancellation-token. pub(crate) async fn shutdown_signal( diff --git a/src/sideinput.rs b/src/sideinput.rs index 93a6bd0..9980653 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -8,16 +8,13 @@ use tonic::{async_trait, Request, Response, Status}; use crate::error::Error::SideInputError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::servers::sideinput as proto; use crate::shared::{self, shutdown_signal, ContainerType}; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sideinput.sock"; const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sideinput-server-info"; -mod proto { - tonic::include_proto!("sideinput.v1"); -} - struct SideInputService { handler: Arc, shutdown_tx: mpsc::Sender<()>, diff --git a/src/sink.rs b/src/sink.rs index 329a3dc..7aca2dd 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -14,8 +14,8 @@ use tracing::{debug, info}; use crate::error::Error; use crate::error::Error::SinkError; use crate::error::ErrorKind::{InternalError, UserDefinedError}; +use crate::servers::sink::{self as sink_pb, SinkResponse}; use crate::shared::{self, ContainerType}; -use crate::sink::sink_pb::SinkResponse; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sink.sock"; @@ -27,10 +27,6 @@ const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE"; const UD_CONTAINER_FB_SINK: &str = "fb-udsink"; // TODO: use batch-size, blocked by https://github.com/numaproj/numaflow/issues/2026 const DEFAULT_CHANNEL_SIZE: usize = 1000; -/// Numaflow Sink Proto definitions. -pub mod sink_pb { - tonic::include_proto!("sink.v1"); -} struct SinkService { handler: Arc, diff --git a/src/source.rs b/src/source.rs index 6a67fbc..79adf63 100644 --- a/src/source.rs +++ b/src/source.rs @@ -15,6 +15,7 @@ use tracing::{error, info}; use crate::error::Error::SourceError; use crate::error::{Error, ErrorKind}; +use crate::servers::source as proto; use crate::shared::{self, prost_timestamp_from_utc, ContainerType}; use crate::source::proto::{AckRequest, AckResponse, ReadRequest, ReadResponse}; @@ -24,11 +25,6 @@ const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcer-server-info"; // TODO: use batch-size, blocked by https://github.com/numaproj/numaflow/issues/2026 const DEFAULT_CHANNEL_SIZE: usize = 1000; -/// Source Proto definitions. -pub mod proto { - tonic::include_proto!("source.v1"); -} - struct SourceService { handler: Arc, shutdown_tx: Sender<()>, diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index b3ad454..1895312 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -14,6 +14,7 @@ use tracing::{error, info}; use crate::error::Error::{self, SourceTransformerError}; use crate::error::ErrorKind; +use crate::servers::sourcetransformer as proto; use crate::shared::{self, prost_timestamp_from_utc, utc_from_timestamp, ContainerType}; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; @@ -23,11 +24,6 @@ const DEFAULT_CHANNEL_SIZE: usize = 1000; const DROP: &str = "U+005C__DROP__"; -/// Numaflow SourceTransformer Proto definitions. -pub mod proto { - tonic::include_proto!("sourcetransformer.v1"); -} - struct SourceTransformerService { handler: Arc, shutdown_tx: mpsc::Sender<()>, @@ -82,7 +78,7 @@ pub struct Message { /// Value is the value passed to the next vertex. pub value: Vec, /// Time for the given event. This will be used for tracking watermarks. If cannot be derived, set it to the incoming - /// event_time from the [`Datum`]. + /// event_time from the [`SourceTransformRequest`]. pub event_time: DateTime, /// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/). pub tags: Option>,