Skip to content

Commit

Permalink
Added messages drop to reducer and source transform
Browse files Browse the repository at this point in the history
Signed-off-by: shubham <shubhamdixit863@gmail.com>
  • Loading branch information
shubhamdixit863 committed May 22, 2024
1 parent cd19b14 commit e2bddab
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 0 deletions.
61 changes: 61 additions & 0 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ const WIN_END_TIME: &str = "x-numaflow-win-end-time";
const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info";
const DROP: &str ="U+005C__DROP__";

/// Numaflow Reduce Proto definitions.
pub mod proto {
Expand Down Expand Up @@ -175,6 +176,7 @@ pub struct Metadata {
}

/// Message is the response from the user's [`Reducer::reduce`].
#[derive(Default)]
pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
/// be an empty collection. It is mainly used in creating a partition in [`Reducer::reduce`].
Expand All @@ -185,6 +187,65 @@ pub struct Message {
pub tags: Vec<String>,
}


impl Message {
pub fn new_message(value: Vec<u8>)->Self{
Message {
value,
..Default::default() // Use default values for keys and tags
}
}

pub fn message_to_drop()->Self{
Message {
tags:vec![DROP.parse().unwrap()],
..Default::default() // Use default values for keys and tags
}
}

pub fn with_keys(mut self,keys:Vec<String>)-> Self{
self.keys=keys;
self
}

pub fn with_tags(mut self,tags:Vec<String>)-> Self{
self.tags=tags;
self
}

pub fn keys(mut self) ->Vec<String>{
self.keys
}
pub fn value(mut self) ->Vec<u8>{
self.value
}

pub fn tags(mut self) ->Vec<String>{
self.tags
}

}

pub struct Messages{
messages:Vec<Message>
}

impl Messages {
fn message_builder() -> Self {
Messages {
messages: Vec::new(),
}
}
fn append(mut self, msg: Message) -> Self {
self.messages.push(msg);
self
}

fn items( &self) ->&Vec<Message>{
&self.messages
}
}

/// Incoming request into the reducer handler of [`Reducer`].
pub struct ReduceRequest {
/// Set of keys in the (key, value) terminology of map/reduce paradigm.
Expand Down
60 changes: 60 additions & 0 deletions src/sourcetransform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sourcetransform.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sourcetransformer-server-info";

const DROP: &str ="U+005C__DROP__";
/// Numaflow SourceTransformer Proto definitions.
pub mod proto {
tonic::include_proto!("sourcetransformer.v1");
Expand Down Expand Up @@ -62,6 +63,7 @@ pub trait SourceTransformer {
}

/// Message is the response struct from the [`SourceTransformer::transform`] .
#[derive(Default)]
pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
/// be an empty collection.
Expand All @@ -75,6 +77,64 @@ pub struct Message {
pub tags: Vec<String>,
}

impl Message {
pub fn new_message(value: Vec<u8>)->Self{
Message {
value,
..Default::default() // Use default values for keys and tags
}
}

pub fn message_to_drop()->Self{
Message {
tags:vec![DROP.parse().unwrap()],
..Default::default() // Use default values for keys and tags
}
}

pub fn with_keys(mut self,keys:Vec<String>)-> Self{
self.keys=keys;
self
}

pub fn with_tags(mut self,tags:Vec<String>)-> Self{
self.tags=tags;
self
}

pub fn keys(mut self) ->Vec<String>{
self.keys
}
pub fn value(mut self) ->Vec<u8>{
self.value
}

pub fn tags(mut self) ->Vec<String>{
self.tags
}

}

pub struct Messages{
messages:Vec<crate::reduce::Message>
}

impl Messages {
fn message_builder() -> Self {
Messages {
messages: Vec::new(),
}
}
fn append(mut self, msg: crate::reduce::Message) -> Self {
self.messages.push(msg);
self
}

fn items( &self) ->&Vec<crate::reduce::Message>{
&self.messages
}
}

/// Incoming request to the Source Transformer.
pub struct SourceTransformRequest {
/// keys are the keys in the (key, value) terminology of map/reduce paradigm.
Expand Down

0 comments on commit e2bddab

Please sign in to comment.