diff --git a/Cargo.toml b/Cargo.toml index d7cbab6..9f00866 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,7 @@ chrono = "0.4.31" serde_json = "1.0.111" futures-util = "0.3.30" tracing = "0.1.40" +uuid = { version = "1.8.0", features = ["v4"] } [build-dependencies] tonic-build = "0.10.2" diff --git a/examples/simple-source/Cargo.toml b/examples/simple-source/Cargo.toml index 7a4bcf4..e611fe8 100644 --- a/examples/simple-source/Cargo.toml +++ b/examples/simple-source/Cargo.toml @@ -11,4 +11,5 @@ path = "src/main.rs" tonic = "0.10.2" tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" } -chrono = "0.4.30" \ No newline at end of file +chrono = "0.4.30" +uuid = "1.2.0" \ No newline at end of file diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index e4ed64f..b1e0d39 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -7,15 +7,17 @@ async fn main() -> Result<(), Box> { } pub(crate) mod simple_source { + use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer}; + use std::collections::HashMap; + use std::sync::Arc; use std::{ collections::HashSet, sync::atomic::{AtomicUsize, Ordering}, sync::RwLock, }; - - use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer}; use tokio::{sync::mpsc::Sender, time::Instant}; use tonic::async_trait; + use uuid::Uuid; /// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks /// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK @@ -52,7 +54,9 @@ pub(crate) mod simple_source { self.read_idx .store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed); let offset = self.read_idx.load(Ordering::Relaxed); - + let mut headers = HashMap::new(); + headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4())); + let shared_headers = Arc::new(headers); // send the message to the transmitter transmitter .send(Message { @@ -63,6 +67,7 @@ pub(crate) mod simple_source { }, event_time: chrono::offset::Utc::now(), keys: vec![], + headers: Arc::clone(&shared_headers), }) .await .unwrap(); diff --git a/proto/map.proto b/proto/map.proto index a8ab49b..07433dd 100644 --- a/proto/map.proto +++ b/proto/map.proto @@ -21,6 +21,8 @@ message MapRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; + } /** diff --git a/proto/reduce.proto b/proto/reduce.proto index 81571e1..a789f97 100644 --- a/proto/reduce.proto +++ b/proto/reduce.proto @@ -22,6 +22,7 @@ message ReduceRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } /** diff --git a/proto/sourcetransform.proto b/proto/sourcetransform.proto index b18b7f3..18e045c 100644 --- a/proto/sourcetransform.proto +++ b/proto/sourcetransform.proto @@ -23,6 +23,7 @@ message SourceTransformRequest { bytes value = 2; google.protobuf.Timestamp event_time = 3; google.protobuf.Timestamp watermark = 4; + map headers = 5; } /** diff --git a/src/map.rs b/src/map.rs index 20770c0..ed9bd19 100644 --- a/src/map.rs +++ b/src/map.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::future::Future; use std::path::PathBuf; @@ -200,6 +201,8 @@ pub struct MapRequest { pub watermark: DateTime, /// Time of the element as seen at source or aligned after a reduce operation. pub eventtime: DateTime, + /// Headers for the message. + pub headers: HashMap, } impl From for MapRequest { @@ -209,6 +212,7 @@ impl From for MapRequest { value: value.value, watermark: shared::utc_from_timestamp(value.watermark), eventtime: shared::utc_from_timestamp(value.event_time), + headers: value.headers, } } } @@ -359,6 +363,7 @@ mod tests { value: "hello".into(), watermark: Some(prost_types::Timestamp::default()), event_time: Some(prost_types::Timestamp::default()), + headers: Default::default(), }); let resp = client.map_fn(request).await?; diff --git a/src/reduce.rs b/src/reduce.rs index 1f2985a..8276663 100644 --- a/src/reduce.rs +++ b/src/reduce.rs @@ -291,6 +291,8 @@ pub struct ReduceRequest { pub watermark: DateTime, /// Time of the element as seen at source or aligned after a reduce operation. pub eventtime: DateTime, + /// Headers for the message. + pub headers: HashMap, } impl From for ReduceRequest { @@ -300,6 +302,7 @@ impl From for ReduceRequest { value: mr.value, watermark: shared::utc_from_timestamp(mr.watermark), eventtime: shared::utc_from_timestamp(mr.event_time), + headers: mr.headers, } } } diff --git a/src/sideinput.rs b/src/sideinput.rs index 1ac1503..ef46886 100644 --- a/src/sideinput.rs +++ b/src/sideinput.rs @@ -1,7 +1,7 @@ +use crate::shared; use std::future::Future; use std::path::PathBuf; use tonic::{async_trait, Request, Response, Status}; -use crate::shared; const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024; const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sideinput.sock"; @@ -167,9 +167,9 @@ impl Server { &mut self, shutdown: F, ) -> Result<(), Box> - where - T: SideInputer + Send + Sync + 'static, - F: Future, + where + T: SideInputer + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -187,8 +187,8 @@ impl Server { /// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives. pub async fn start(&mut self) -> Result<(), Box> - where - T: SideInputer + Send + Sync + 'static, + where + T: SideInputer + Send + Sync + 'static, { self.start_with_shutdown(shared::shutdown_signal()).await } diff --git a/src/sink.rs b/src/sink.rs index 2907371..1109028 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -18,7 +18,6 @@ const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-in const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE"; const UD_CONTAINER_FB_SINK: &str = "fb-udsink"; - /// Numaflow Sink Proto definitions. pub mod proto { tonic::include_proto!("sink.v1"); @@ -172,8 +171,8 @@ impl From for proto::sink_response::Result { #[tonic::async_trait] impl proto::sink_server::Sink for SinkService - where - T: Sinker + Send + Sync + 'static, +where + T: Sinker + Send + Sync + 'static, { async fn sink_fn( &self, @@ -230,7 +229,10 @@ impl Server { pub fn new(svc: T) -> Self { let container_type = env::var(ENV_UD_CONTAINER_TYPE).unwrap_or_default(); let (sock_addr, server_info_file) = if container_type == UD_CONTAINER_FB_SINK { - (DEFAULT_FB_SOCK_ADDR.into(), DEFAULT_FB_SERVER_INFO_FILE.into()) + ( + DEFAULT_FB_SOCK_ADDR.into(), + DEFAULT_FB_SERVER_INFO_FILE.into(), + ) } else { (DEFAULT_SOCK_ADDR.into(), DEFAULT_SERVER_INFO_FILE.into()) }; @@ -282,9 +284,9 @@ impl Server { &mut self, shutdown: F, ) -> Result<(), Box> - where - T: Sinker + Send + Sync + 'static, - F: Future, + where + T: Sinker + Send + Sync + 'static, + F: Future, { let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?; let handler = self.svc.take().unwrap(); @@ -302,8 +304,8 @@ impl Server { /// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives. pub async fn start(&mut self) -> Result<(), Box> - where - T: Sinker + Send + Sync + 'static, + where + T: Sinker + Send + Sync + 'static, { self.start_with_shutdown(shared::shutdown_signal()).await } @@ -342,7 +344,10 @@ mod tests { // record the response sink::Response::ok(datum.id) } - Err(e) => sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)), + Err(e) => sink::Response::failure( + datum.id, + format!("Invalid UTF-8 sequence: {}", e), + ), }; // return the responses diff --git a/src/source.rs b/src/source.rs index 07db146..6230ec8 100644 --- a/src/source.rs +++ b/src/source.rs @@ -1,5 +1,6 @@ #![warn(missing_docs)] +use std::collections::HashMap; use std::future::Future; use std::path::PathBuf; use std::sync::Arc; @@ -169,10 +170,12 @@ where &self, _request: Request<()>, ) -> Result, Status> { - let partitions = self.handler.partitions().await.unwrap_or_else(|| vec![std::env::var("NUMAFLOW_REPLICA") - .unwrap_or_default() - .parse::() - .unwrap_or_default()]); + let partitions = self.handler.partitions().await.unwrap_or_else(|| { + vec![std::env::var("NUMAFLOW_REPLICA") + .unwrap_or_default() + .parse::() + .unwrap_or_default()] + }); Ok(Response::new(proto::PartitionsResponse { result: Some(proto::partitions_response::Result { partitions }), })) @@ -193,6 +196,8 @@ pub struct Message { pub event_time: DateTime, /// Keys of the message. pub keys: Vec, + + pub headers: Arc>, } /// gRPC server for starting a [`Sourcer`] service @@ -288,7 +293,8 @@ impl Server { mod tests { use super::proto; use chrono::Utc; - use std::collections::HashSet; + use std::collections::{HashMap, HashSet}; + use std::sync::Arc; use std::vec; use std::{error::Error, time::Duration}; use tokio_stream::StreamExt; @@ -299,6 +305,7 @@ mod tests { use tokio::sync::mpsc::Sender; use tokio::sync::oneshot; use tonic::transport::Uri; + use uuid::Uuid; // A source that repeats the `num` for the requested count struct Repeater { @@ -320,7 +327,12 @@ mod tests { async fn read(&self, request: SourceReadRequest, transmitter: Sender) { let event_time = Utc::now(); let mut message_offsets = Vec::with_capacity(request.count); + + for i in 0..request.count { + let mut headers = HashMap::new(); + headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4())); + let shared_headers = Arc::new(headers); // we assume timestamp in nanoseconds would be unique on each read operation from our source let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i); transmitter @@ -332,6 +344,7 @@ mod tests { partition_id: 0, }, keys: vec![], + headers: Arc::clone(&shared_headers), }) .await .unwrap(); diff --git a/src/sourcetransform.rs b/src/sourcetransform.rs index 87fa9c4..b9be311 100644 --- a/src/sourcetransform.rs +++ b/src/sourcetransform.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::future::Future; use std::path::PathBuf; @@ -197,6 +198,8 @@ pub struct SourceTransformRequest { pub watermark: DateTime, /// event_time is the time of the element as seen at source or aligned after a reduce operation. pub eventtime: DateTime, + /// Headers for the message. + pub headers: HashMap, } impl From for proto::source_transform_response::Result { @@ -217,6 +220,7 @@ impl From for SourceTransformRequest { value: value.value, watermark: shared::utc_from_timestamp(value.watermark), eventtime: shared::utc_from_timestamp(value.event_time), + headers: value.headers, } } } @@ -398,6 +402,7 @@ mod tests { value: "hello".into(), watermark: Some(prost_types::Timestamp::default()), event_time: Some(prost_types::Timestamp::default()), + headers: Default::default(), }); let resp = client.source_transform_fn(request).await?;