Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added header values #51

Merged
merged 13 commits into from
Jun 4, 2024
2 changes: 2 additions & 0 deletions proto/map.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ message MapRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;

}

/**
Expand Down
1 change: 1 addition & 0 deletions proto/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message ReduceRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}

/**
Expand Down
1 change: 1 addition & 0 deletions proto/sourcetransform.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ message SourceTransformRequest {
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}

/**
Expand Down
7 changes: 7 additions & 0 deletions src/map.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;

Expand Down Expand Up @@ -87,6 +88,7 @@ pub struct Message {
pub value: Vec<u8>,
/// Tags are used for [conditional forwarding](https://numaflow.numaproj.io/user-guide/reference/conditional-forwarding/).
pub tags: Vec<String>,

}

impl From<Message> for proto::map_response::Result {
Expand All @@ -109,6 +111,9 @@ pub struct MapRequest {
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,

}

impl From<proto::MapRequest> for MapRequest {
Expand All @@ -118,6 +123,7 @@ impl From<proto::MapRequest> for MapRequest {
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers: value.headers
}
}
}
Expand Down Expand Up @@ -268,6 +274,7 @@ mod tests {
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers:Default::default()
});

let resp = client.map_fn(request).await?;
Expand Down
3 changes: 3 additions & 0 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ pub struct ReduceRequest {
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<proto::ReduceRequest> for ReduceRequest {
Expand All @@ -204,6 +206,7 @@ impl From<proto::ReduceRequest> for ReduceRequest {
value: mr.value,
watermark: shared::utc_from_timestamp(mr.watermark),
eventtime: shared::utc_from_timestamp(mr.event_time),
headers: mr.headers,
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::future::Future;
use std::path::PathBuf;

Expand Down Expand Up @@ -88,6 +89,8 @@ pub struct SourceTransformRequest {
pub watermark: DateTime<Utc>,
/// event_time is the time of the element as seen at source or aligned after a reduce operation.
pub eventtime: DateTime<Utc>,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<Message> for proto::source_transform_response::Result {
Expand All @@ -108,6 +111,7 @@ impl From<proto::SourceTransformRequest> for SourceTransformRequest {
value: value.value,
watermark: shared::utc_from_timestamp(value.watermark),
eventtime: shared::utc_from_timestamp(value.event_time),
headers:value.headers
}
}
}
Expand Down Expand Up @@ -289,6 +293,7 @@ mod tests {
value: "hello".into(),
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
headers:Default::default()
});

let resp = client.source_transform_fn(request).await?;
Expand Down
Loading