Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added header values #51

Merged
merged 13 commits into from
Jun 4, 2024
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ chrono = "0.4.31"
serde_json = "1.0.111"
futures-util = "0.3.30"
tracing = "0.1.40"
uuid = { version = "1.8.0", features = ["v4"] }

[build-dependencies]
tonic-build = "0.10.2"
Expand Down
3 changes: 2 additions & 1 deletion examples/simple-source/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ path = "src/main.rs"
tonic = "0.10.2"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch = "main" }
chrono = "0.4.30"
chrono = "0.4.30"
uuid = "1.2.0"
10 changes: 7 additions & 3 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
}

pub(crate) mod simple_source {
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use std::collections::HashMap;
use std::sync::Arc;
use std::{
collections::HashSet,
sync::atomic::{AtomicUsize, Ordering},
sync::RwLock,
};

use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use tokio::{sync::mpsc::Sender, time::Instant};
use tonic::async_trait;
use uuid::Uuid;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down Expand Up @@ -52,7 +54,8 @@ pub(crate) mod simple_source {
self.read_idx
.store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
let offset = self.read_idx.load(Ordering::Relaxed);

headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4()));
shubhamdixit863 marked this conversation as resolved.
Show resolved Hide resolved
let shared_headers = Arc::new(headers);
// send the message to the transmitter
transmitter
.send(Message {
Expand All @@ -63,6 +66,7 @@ pub(crate) mod simple_source {
},
event_time: chrono::offset::Utc::now(),
keys: vec![],
headers: Arc::clone(&shared_headers),
})
.await
.unwrap();
Expand Down
2 changes: 2 additions & 0 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ message MapRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;

}

/**
Expand Down
1 change: 1 addition & 0 deletions proto/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message ReduceRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}

/**
Expand Down
1 change: 1 addition & 0 deletions proto/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message SourceTransformRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}

/**
Expand Down
5 changes: 5 additions & 0 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;

Expand Down Expand Up @@ -200,6 +201,8 @@ pub struct MapRequest {
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<proto::MapRequest> for MapRequest {
Expand All @@ -209,6 +212,7 @@ impl From<proto::MapRequest> for MapRequest {
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers: value.headers,
}
}
}
Expand Down Expand Up @@ -359,6 +363,7 @@ mod tests {
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

let resp = client.map_fn(request).await?;
Expand Down
3 changes: 3 additions & 0 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,8 @@ pub struct ReduceRequest {
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<proto::ReduceRequest> for ReduceRequest {
Expand All @@ -300,6 +302,7 @@ impl From<proto::ReduceRequest> for ReduceRequest {
value: mr.value,
watermark: shared::utc_from_timestamp(mr.watermark),
eventtime: shared::utc_from_timestamp(mr.event_time),
headers: mr.headers,
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/sideinput.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::shared;
use std::future::Future;
use std::path::PathBuf;
use tonic::{async_trait, Request, Response, Status};
use crate::shared;

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sideinput.sock";
Expand Down Expand Up @@ -167,9 +167,9 @@ impl<T> Server<T> {
&mut self,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: SideInputer + Send + Sync + 'static,
F: Future<Output = ()>,
where
T: SideInputer + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
Expand All @@ -187,8 +187,8 @@ impl<T> Server<T> {

/// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the signal arrives.
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: SideInputer + Send + Sync + 'static,
where
T: SideInputer + Send + Sync + 'static,
{
self.start_with_shutdown(shared::shutdown_signal()).await
}
Expand Down
25 changes: 15 additions & 10 deletions src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-in
const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE";
const UD_CONTAINER_FB_SINK: &str = "fb-udsink";


/// Numaflow Sink Proto definitions.
pub mod proto {
tonic::include_proto!("sink.v1");
Expand Down Expand Up @@ -172,8 +171,8 @@ impl From<Response> for proto::sink_response::Result {

#[tonic::async_trait]
impl<T> proto::sink_server::Sink for SinkService<T>
where
T: Sinker + Send + Sync + 'static,
where
T: Sinker + Send + Sync + 'static,
{
async fn sink_fn(
&self,
Expand Down Expand Up @@ -230,7 +229,10 @@ impl<T> Server<T> {
pub fn new(svc: T) -> Self {
let container_type = env::var(ENV_UD_CONTAINER_TYPE).unwrap_or_default();
let (sock_addr, server_info_file) = if container_type == UD_CONTAINER_FB_SINK {
(DEFAULT_FB_SOCK_ADDR.into(), DEFAULT_FB_SERVER_INFO_FILE.into())
(
DEFAULT_FB_SOCK_ADDR.into(),
DEFAULT_FB_SERVER_INFO_FILE.into(),
)
} else {
(DEFAULT_SOCK_ADDR.into(), DEFAULT_SERVER_INFO_FILE.into())
};
Expand Down Expand Up @@ -282,9 +284,9 @@ impl<T> Server<T> {
&mut self,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
F: Future<Output=()>,
where
T: Sinker + Send + Sync + 'static,
F: Future<Output = ()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
Expand All @@ -302,8 +304,8 @@ impl<T> Server<T> {

/// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives.
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
where
T: Sinker + Send + Sync + 'static,
{
self.start_with_shutdown(shared::shutdown_signal()).await
}
Expand Down Expand Up @@ -342,7 +344,10 @@ mod tests {
// record the response
sink::Response::ok(datum.id)
}
Err(e) => sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
Err(e) => sink::Response::failure(
datum.id,
format!("Invalid UTF-8 sequence: {}", e),
),
};

// return the responses
Expand Down
23 changes: 18 additions & 5 deletions src/source.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![warn(missing_docs)]

use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
Expand Down Expand Up @@ -169,10 +170,12 @@ where
&self,
_request: Request<()>,
) -> Result<Response<proto::PartitionsResponse>, Status> {
let partitions = self.handler.partitions().await.unwrap_or_else(|| vec![std::env::var("NUMAFLOW_REPLICA")
.unwrap_or_default()
.parse::<i32>()
.unwrap_or_default()]);
let partitions = self.handler.partitions().await.unwrap_or_else(|| {
vec![std::env::var("NUMAFLOW_REPLICA")
.unwrap_or_default()
.parse::<i32>()
.unwrap_or_default()]
});
Ok(Response::new(proto::PartitionsResponse {
result: Some(proto::partitions_response::Result { partitions }),
}))
Expand All @@ -193,6 +196,8 @@ pub struct Message {
pub event_time: DateTime<Utc>,
/// Keys of the message.
pub keys: Vec<String>,

pub headers: Arc<HashMap<String, String>>,
}

/// gRPC server for starting a [`Sourcer`] service
Expand Down Expand Up @@ -288,7 +293,8 @@ impl<T> Server<T> {
mod tests {
use super::proto;
use chrono::Utc;
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::vec;
use std::{error::Error, time::Duration};
use tokio_stream::StreamExt;
Expand All @@ -299,6 +305,7 @@ mod tests {
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot;
use tonic::transport::Uri;
use uuid::Uuid;

// A source that repeats the `num` for the requested count
struct Repeater {
Expand All @@ -320,7 +327,12 @@ mod tests {
async fn read(&self, request: SourceReadRequest, transmitter: Sender<Message>) {
let event_time = Utc::now();
let mut message_offsets = Vec::with_capacity(request.count);


for i in 0..request.count {
let mut headers = HashMap::new();
headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4()));
let shared_headers = Arc::new(headers);
// we assume timestamp in nanoseconds would be unique on each read operation from our source
let offset = format!("{}-{}", event_time.timestamp_nanos_opt().unwrap(), i);
transmitter
Expand All @@ -332,6 +344,7 @@ mod tests {
partition_id: 0,
},
keys: vec![],
headers: Arc::clone(&shared_headers),
})
.await
.unwrap();
Expand Down
5 changes: 5 additions & 0 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;

Expand Down Expand Up @@ -197,6 +198,8 @@ pub struct SourceTransformRequest {
pub watermark: DateTime<Utc>,
/// event_time is the time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<Message> for proto::source_transform_response::Result {
Expand All @@ -217,6 +220,7 @@ impl From<proto::SourceTransformRequest> for SourceTransformRequest {
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers: value.headers,
}
}
}
Expand Down Expand Up @@ -398,6 +402,7 @@ mod tests {
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers: Default::default(),
});

let resp = client.source_transform_fn(request).await?;
Expand Down
Loading