Skip to content

Commit

Permalink
chore: Fallback and Headers Support for Custom Sink (#49)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
Signed-off-by: Vigith Maurice <vigith@gmail.com>
Co-authored-by: Vigith Maurice <vigith@gmail.com>
  • Loading branch information
yhl25 and vigith authored May 22, 2024
1 parent adfe620 commit eabe854
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 50 deletions.
12 changes: 2 additions & 10 deletions examples/sink-log/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,9 @@ impl sink::Sinker for Logger {
Ok(v) => {
println!("{}", v);
// record the response
Response {
id: datum.id,
success: true,
err: "".to_string(),
}
Response::ok(datum.id)
}
Err(e) => Response {
id: datum.id,
success: true, // there is no point setting success to false as retrying is not going to help
err: format!("Invalid UTF-8 sequence: {}", e),
},
Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
};

// return the responses
Expand Down
14 changes: 12 additions & 2 deletions proto/sink.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ message SinkRequest {
google.protobuf.Timestamp event_time = 3;
google.protobuf.Timestamp watermark = 4;
string id = 5;
map<string, string> headers = 6;
}

/**
Expand All @@ -38,10 +39,19 @@ message SinkResponse {
message Result {
// id is the ID of the message, can be used to uniquely identify the message.
string id = 1;
// success denotes the status of persisting to disk. if set to false, it means writing to sink for the message failed.
bool success = 2;
// status denotes the status of persisting to sink. It can be SUCCESS, FAILURE, or FALLBACK.
Status status = 2;
// err_msg is the error message, set it if success is set to false.
string err_msg = 3;
}
repeated Result results = 1;
}

/*
* Status is the status of the response.
*/
enum Status {
SUCCESS = 0;
FAILURE = 1;
FALLBACK = 2;
}
120 changes: 82 additions & 38 deletions src/sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;
use std::env;
use std::future::Future;
use std::path::PathBuf;

Expand All @@ -11,6 +13,11 @@ const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/sink.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/sinker-server-info";

const DEFAULT_FB_SOCK_ADDR: &str = "/var/run/numaflow/fb-sink.sock";
const DEFAULT_FB_SERVER_INFO_FILE: &str = "/var/run/numaflow/fb-sinker-server-info";
const ENV_UD_CONTAINER_TYPE: &str = "NUMAFLOW_UD_CONTAINER_TYPE";
const UD_CONTAINER_FB_SINK: &str = "fb-udsink";


/// Numaflow Sink Proto definitions.
pub mod proto {
Expand Down Expand Up @@ -56,17 +63,9 @@ pub trait Sinker {
/// Ok(v) => {
/// println!("{}", v);
/// // record the response
/// Response {
/// id: datum.id,
/// success: true,
/// err: "".to_string(),
/// }
/// Response::ok(datum.id)
/// }
/// Err(e) => Response {
/// id: datum.id,
/// success: true, // there is no point setting success to false as retrying is not going to help
/// err: format!("Invalid UTF-8 sequence: {}", e),
/// },
/// Err(e) => Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
/// };
///
/// // return the responses
Expand All @@ -90,8 +89,10 @@ pub struct SinkRequest {
pub watermark: DateTime<Utc>,
/// Time of the element as seen at source or aligned after a reduce operation.
pub event_time: DateTime<Utc>,
/// ID is the unique id of the message to be send to the Sink.
/// ID is the unique id of the message to be sent to the Sink.
pub id: String,
/// Headers for the message.
pub headers: HashMap<String, String>,
}

impl From<proto::SinkRequest> for SinkRequest {
Expand All @@ -102,6 +103,7 @@ impl From<proto::SinkRequest> for SinkRequest {
watermark: shared::utc_from_timestamp(sr.watermark),
event_time: shared::utc_from_timestamp(sr.event_time),
id: sr.id,
headers: sr.headers,
}
}
}
Expand All @@ -110,27 +112,68 @@ impl From<proto::SinkRequest> for SinkRequest {
pub struct Response {
/// id is the unique ID of the message.
pub id: String,
/// success indicates whether the write to the sink was successful. If set to `false`, it will be
/// success indicates whether to write to the sink was successful. If set to `false`, it will be
/// retried, hence it is better to try till it is successful.
pub success: bool,
/// fallback is used to indicate that the message should be forwarded to the fallback sink.
pub fallback: bool,
/// err string is used to describe the error if [`Response::success`] was `false`.
pub err: String,
pub err: Option<String>,
}

impl Response {
/// Creates a new `Response` instance indicating a successful operation.
pub fn ok(id: String) -> Self {
Self {
id,
success: true,
fallback: false,
err: None,
}
}

/// Creates a new `Response` instance indicating a failed operation.
pub fn failure(id: String, err: String) -> Self {
Self {
id,
success: false,
fallback: false,
err: Some(err),
}
}

/// Creates a new `Response` instance indicating a failed operation with a fallback
/// set to 'true'. So that the message will be forwarded to the fallback sink.
pub fn fallback(id: String) -> Self {
Self {
id,
success: false,
fallback: true,
err: None,
}
}
}

impl From<Response> for proto::sink_response::Result {
fn from(r: Response) -> Self {
Self {
id: r.id,
success: r.success,
err_msg: r.err.to_string(),
status: if r.fallback {
proto::Status::Fallback as i32
} else if r.success {
proto::Status::Fallback as i32
} else {
proto::Status::Failure as i32
},
err_msg: r.err.unwrap_or_default(),
}
}
}

#[tonic::async_trait]
impl<T> proto::sink_server::Sink for SinkService<T>
where
T: Sinker + Send + Sync + 'static,
where
T: Sinker + Send + Sync + 'static,
{
async fn sink_fn(
&self,
Expand Down Expand Up @@ -185,10 +228,17 @@ pub struct Server<T> {

impl<T> Server<T> {
pub fn new(svc: T) -> Self {
let container_type = env::var(ENV_UD_CONTAINER_TYPE).unwrap_or_default();
let (sock_addr, server_info_file) = if container_type == UD_CONTAINER_FB_SINK {
(DEFAULT_FB_SOCK_ADDR.into(), DEFAULT_FB_SERVER_INFO_FILE.into())
} else {
(DEFAULT_SOCK_ADDR.into(), DEFAULT_SERVER_INFO_FILE.into())
};

Self {
sock_addr: DEFAULT_SOCK_ADDR.into(),
sock_addr,
max_message_size: DEFAULT_MAX_MESSAGE_SIZE,
server_info_file: DEFAULT_SERVER_INFO_FILE.into(),
server_info_file,
svc: Some(svc),
}
}
Expand Down Expand Up @@ -232,9 +282,9 @@ impl<T> Server<T> {
&mut self,
shutdown: F,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
F: Future<Output = ()>,
where
T: Sinker + Send + Sync + 'static,
F: Future<Output=()>,
{
let listener = shared::create_listener_stream(&self.sock_addr, &self.server_info_file)?;
let handler = self.svc.take().unwrap();
Expand All @@ -250,10 +300,10 @@ impl<T> Server<T> {
.map_err(Into::into)
}

/// Starts the gRPC server. Automatically registers singal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives.
/// Starts the gRPC server. Automatically registers signal handlers for SIGINT and SIGTERM and initiates graceful shutdown of gRPC server when either one of the singal arrives.
pub async fn start(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
where
T: Sinker + Send + Sync + 'static,
where
T: Sinker + Send + Sync + 'static,
{
self.start_with_shutdown(shared::shutdown_signal()).await
}
Expand All @@ -262,13 +312,14 @@ impl<T> Server<T> {
#[cfg(test)]
mod tests {
use std::{error::Error, time::Duration};
use tower::service_fn;

use crate::sink;
use crate::sink::proto::sink_client::SinkClient;
use tempfile::TempDir;
use tokio::sync::oneshot;
use tonic::transport::Uri;
use tower::service_fn;

use crate::sink;
use crate::sink::proto::sink_client::SinkClient;

#[tokio::test]
async fn sink_server() -> Result<(), Box<dyn Error>> {
Expand All @@ -289,17 +340,9 @@ mod tests {
Ok(v) => {
println!("{}", v);
// record the response
sink::Response {
id: datum.id,
success: true,
err: "".to_string(),
}
sink::Response::ok(datum.id)
}
Err(e) => sink::Response {
id: datum.id,
success: true, // there is no point setting success to false as retrying is not going to help
err: format!("Invalid UTF-8 sequence: {}", e),
},
Err(e) => sink::Response::failure(datum.id, format!("Invalid UTF-8 sequence: {}", e)),
};

// return the responses
Expand Down Expand Up @@ -347,6 +390,7 @@ mod tests {
watermark: Some(prost_types::Timestamp::default()),
event_time: Some(prost_types::Timestamp::default()),
id: "1".to_string(),
headers: Default::default(),
};

let resp = client.sink_fn(tokio_stream::iter(vec![request])).await?;
Expand Down

0 comments on commit eabe854

Please sign in to comment.