Skip to content

Commit

Permalink
chore: Incorporate Proto Definition Changes For Reduce (#52)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
yhl25 and vigith authored Jun 29, 2024
1 parent 6901b0d commit 7dff517
Show file tree
Hide file tree
Showing 23 changed files with 1,305 additions and 266 deletions.
2 changes: 1 addition & 1 deletion .rustfmt.toml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
edition = "2021"
indent_style = "Block"
indent_style = "Block"
6 changes: 4 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,22 @@ name = "numaflow"
path = "src/lib.rs"

[dependencies]
tonic = "0.10.2"
tonic = "0.11.0"
prost = "0.12.3"
prost-types = "0.12.3"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.10"
tokio-stream = { version = "0.1.14", features = ["net"] }
serde = { version = "1.0.194", features = ["derive"] }
chrono = "0.4.31"
serde_json = "1.0.111"
futures-util = "0.3.30"
tracing = "0.1.40"
uuid = { version = "1.8.0", features = ["v4"] }
thiserror = "1.0"

[build-dependencies]
tonic-build = "0.10.2"
tonic-build = "0.11.0"

[dev-dependencies]
tempfile = "3.9.0"
Expand Down
5 changes: 1 addition & 4 deletions examples/map-cat/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use numaflow::map;
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand All @@ -11,9 +10,7 @@ struct Cat;
#[tonic::async_trait]
impl map::Mapper for Cat {
async fn map(&self, input: map::MapRequest) -> Vec<map::Message> {
let message = map::Message::new(input.value)
.keys(input.keys)
.tags(vec![]);
let message = map::Message::new(input.value).keys(input.keys).tags(vec![]);
vec![message]
}
}
5 changes: 2 additions & 3 deletions examples/map-tickgen-serde/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use chrono::{SecondsFormat, TimeZone, Utc};
use numaflow::map;
use numaflow::map::Message;
use serde::Serialize;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
map::Server::new(TickGen).start().await
}

use chrono::{SecondsFormat, TimeZone, Utc};
use serde::Serialize;

struct TickGen;

#[derive(serde::Deserialize)]
Expand Down
4 changes: 2 additions & 2 deletions examples/reduce-counter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,6 @@ name = "server"
path = "src/main.rs"

[dependencies]
tonic = "0.9"
tonic = "0.11.0"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] }
numaflow = { git = "https://github.com/numaproj/numaflow-rs.git", branch="reduce" }
numaflow-rs = { path = "../../" }
50 changes: 22 additions & 28 deletions examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
use numaflow::reduce::start_uds_server;
use numaflow::reduce;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let reduce_handler = counter::Counter::new();

start_uds_server(reduce_handler).await?;

async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let handler_creator = counter::CounterCreator {};
reduce::Server::new(handler_creator).start().await?;
Ok(())
}

mod counter {
use numaflow::reduce::{Datum, Message};
use numaflow::reduce::{Metadata, Reducer};
use numaflow::reduce::{Message, ReduceRequest};
use numaflow::reduce::{Reducer, Metadata};
use tokio::sync::mpsc::Receiver;
use tonic::async_trait;

pub(crate) struct Counter {}

/// Stateless factory for [`Counter`] reducers; an instance of it is what `main` hands to
/// `reduce::Server::new`.
pub(crate) struct CounterCreator {}

impl numaflow::reduce::ReducerCreator for CounterCreator {
// The concrete reducer type produced by this creator.
type R = Counter;

// Builds a fresh `Counter` on every call.
// NOTE(review): presumably the SDK calls this once per reduce stream (key/window) — confirm against the SDK.
fn create(&self) -> Self::R {
Counter::new()
}
}

impl Counter {
pub(crate) fn new() -> Self {
Self {}
Expand All @@ -25,33 +33,19 @@ mod counter {

#[async_trait]
impl Reducer for Counter {
async fn reduce<T: Datum + Send + Sync + 'static, U: Metadata + Send + Sync + 'static>(
async fn reduce(
&self,
keys: Vec<String>,
mut input: Receiver<T>,
md: &U,
mut input: Receiver<ReduceRequest>,
md: &Metadata,
) -> Vec<Message> {
println!(
"Entering into UDF {:?} {:?}",
md.start_time(),
md.end_time()
);

let mut counter = 0;
// the loop exits when input is closed which will happen only on close of book.
while (input.recv().await).is_some() {
while input.recv().await.is_some() {
counter += 1;
}

println!(
"Returning from UDF {:?} {:?}",
md.start_time(),
md.end_time()
);
let message = reduce::Message::new(counter.to_string().into_bytes())
.keys(keys.clone())
.tags(vec![]);
let message = Message::new(counter.to_string().into_bytes()).tags(vec![]).keys(keys.clone());
vec![message]
}
}
}
}
2 changes: 1 addition & 1 deletion examples/side-input/manifests/simple-sideinput.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ spec:
- name: out
sink:
# A simple log printing sink
log: {}
log: { }
edges:
- from: in
to: si-log
Expand Down
13 changes: 5 additions & 8 deletions examples/side-input/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use std::time::{SystemTime, UNIX_EPOCH};
use numaflow::sideinput::start_uds_server;
use numaflow::sideinput::SideInputer;
use tonic::{async_trait};
use std::sync::Mutex;


use numaflow::sideinput::SideInputer;
use numaflow::sideinput::start_uds_server;
use tonic::async_trait;

struct SideInputHandler {
counter: Mutex<u32>,
Expand All @@ -20,8 +18,7 @@ impl SideInputHandler {

#[async_trait]
impl SideInputer for SideInputHandler {

async fn retrieve_sideinput(& self) -> Option<Vec<u8>> {
async fn retrieve_sideinput(&self) -> Option<Vec<u8>> {
let current_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards");
Expand All @@ -42,4 +39,4 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let side_input_handler = SideInputHandler::new();
start_uds_server(side_input_handler).await?;
Ok(())
}
}
5 changes: 3 additions & 2 deletions examples/sideinput-udf/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use notify::{RecursiveMode, Result, Watcher};
use numaflow::map::{MapRequest, Mapper, Message, Server};
use std::path::Path;

use notify::{RecursiveMode, Result, Watcher};
use numaflow::map::{Mapper, MapRequest, Message, Server};
use tokio::spawn;
use tonic::async_trait;

Expand Down
2 changes: 1 addition & 1 deletion examples/simple-source/manifests/simple-source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ spec:
scale:
min: 1
sink:
log: {}
log: { }
edges:
- from: in
to: out
5 changes: 2 additions & 3 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ pub(crate) mod simple_source {
use numaflow::source::{Message, Offset, SourceReadRequest, Sourcer};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::Arc;
use std::{
collections::HashSet,
collections::HashMap,
collections::HashSet,
sync::atomic::{AtomicUsize, Ordering},
sync::RwLock,
};
use tokio::{sync::mpsc::Sender, time::Instant};
use tonic::async_trait;
use uuid::Uuid;
use std::sync::Arc;

/// SimpleSource is a data generator which generates monotonically increasing offsets and data. It is a shared state which is protected using Locks
/// or Atomics to provide concurrent access. Numaflow actually does not require concurrent access but we are forced to do this because the SDK
Expand Down Expand Up @@ -55,7 +55,6 @@ pub(crate) mod simple_source {
let mut headers = HashMap::new();
headers.insert(String::from("x-txn-id"), String::from(Uuid::new_v4()));


// increment the read_idx which is used as the offset
self.read_idx
.store(self.read_idx.load(Ordering::Relaxed) + 1, Ordering::Relaxed);
Expand Down
1 change: 0 additions & 1 deletion examples/sink-log/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use numaflow::sink::{self, Response, SinkRequest};
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync>> {
Expand Down
1 change: 0 additions & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use numaflow::sourcetransform;
use std::error::Error;

/// A simple source transformer which assigns event time to the current time in utc.

Expand Down
47 changes: 41 additions & 6 deletions proto/reduce.proto
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,58 @@ service Reduce {
* ReduceRequest represents a request element.
*/
message ReduceRequest {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
// WindowOperation represents a window operation.
// For Aligned windows, OPEN, APPEND and CLOSE events are sent.
message WindowOperation {
enum Event {
OPEN = 0;
CLOSE = 1;
APPEND = 4;
}

Event event = 1;
repeated Window windows = 2;
}

// Payload represents a payload element.
message Payload {
repeated string keys = 1;
bytes value = 2;
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
map<string, string> headers = 5;
}

Payload payload = 1;
WindowOperation operation = 2;
}

// Window represents a window.
// Since the client doesn't track keys, window doesn't have a keys field.
message Window {
google.protobuf.Timestamp start = 1;
google.protobuf.Timestamp end = 2;
string slot = 3;
}

/**
* ReduceResponse represents a response element.
*/
message ReduceResponse {
// Result represents a result element. It contains the result of the reduce function.
message Result {
repeated string keys = 1;
bytes value = 2;
repeated string tags = 3;
}
repeated Result results = 1;

Result result = 1;

// window represents a window to which the result belongs.
Window window = 2;

// EOF represents the end of the response for a window.
bool EOF = 3;
}

/**
Expand Down
25 changes: 25 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
use thiserror::Error;

/// Category of an SDK error, carrying a human-readable message.
///
/// The `Display` implementation is generated by `thiserror` from the `#[error(...)]`
/// attributes below; the wrapped `String` is interpolated as `{0}`.
#[derive(Error, Debug, Clone)]
pub enum ErrorKind {
/// Displays as `User Defined Error: <message>`.
/// NOTE(review): presumably used for failures originating in user-supplied handler code — confirm at call sites.
#[error("User Defined Error: {0}")]
UserDefinedError(String),

/// Displays as `Internal Error: <message>`.
/// NOTE(review): presumably used for failures originating inside the SDK itself — confirm at call sites.
#[error("Internal Error: {0}")]
InternalError(String),
}

/// Top-level SDK error, tagged with the component it is reported for.
///
/// Every variant wraps an [`ErrorKind`]; the generated `Display` output is the
/// component prefix followed by the wrapped kind's own `Display` text,
/// e.g. `Reduce Error - Internal Error: <message>`.
#[derive(Error, Debug, Clone)]
pub enum Error {
/// Displays as `Map Error - <kind>`.
#[error("Map Error - {0}")]
MapError(ErrorKind),

/// Displays as `Reduce Error - <kind>`.
#[error("Reduce Error - {0}")]
ReduceError(ErrorKind),

/// Displays as `Sink Error - <kind>`.
#[error("Sink Error - {0}")]
SinkError(ErrorKind),

/// Displays as `Source Error - <kind>`.
#[error("Source Error - {0}")]
SourceError(ErrorKind),
}
42 changes: 42 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! [Numaflow]: https://numaflow.numaproj.io/
//! [Map]: https://numaflow.numaproj.io/user-guide/user-defined-functions/map/map/
//! [Reduce]: https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/
//! [User Defined Sources]: https://numaflow.numaproj.io/user-guide/sources/user-defined-sources/
//! [User Defined Sinks]: https://numaflow.numaproj.io/user-guide/sinks/user-defined-sinks/

/// start up code
Expand All @@ -30,3 +31,44 @@ pub mod sink;

/// building [side input](https://numaflow.numaproj.io/user-guide/reference/side-inputs/)
pub mod sideinput;

// Error handling on Numaflow SDKs!
//
// Any non-recoverable error will cause the process to shut down with a non-zero exit status. All errors are non-recoverable.
// If an error is retriable, we (gRPC or the Numaflow SDK) will have already retried it (hence it is not surfaced as an error); that means
// all errors raised by the SDK are non-recoverable.
//
// Task Ordering and error propagation.
//
// level-1 level-2 level-3
//
// +---> (service_fn) ->
// |
// |
// | +---> (task)
// | |
// | |
// (gRPC Service) ---+---> (service_fn) ---+---> (task)
// ^ | |
// | | |
// | | +---> (task)
// | |
// (shutdown) |
// | +---> (service_fn) ->
// |
// |
// (user)
//
// If a task at level-3 has an error, then that error will be propagated to level-2 (service_fn) via an mpsc::channel using the response channel.
// The response channel passes a Result type, and by returning Err() in the response channel, the task notifies the top service_fn that it wants to abort itself.
// service_fn (level-2) will now use another mpsc::channel to tell the gRPC service to cancel all the service_fns. The gRPC service
// will ask all the level-2 service_fns to abort using the CancellationToken. service_fn will call abort on all the tasks it created using an internal
// mpsc::channel when the CancellationToken has been dropped/cancelled.
//
// The user can directly send a shutdown request to the gRPC server, which in turn cancels the CancellationToken.
//
// The 3-level task ordering above applies only to complex cases like reduce; simpler endpoints like `map` have only 2 levels, but
// the error propagation is handled the same way.

/// error module
pub mod error;
Loading

0 comments on commit 7dff517

Please sign in to comment.