Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Bidirectional Streaming for Source #84

Merged
merged 7 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,9 @@ pub(crate) mod simple_source {
self.yet_to_ack.write().unwrap().extend(message_offsets)
}

async fn ack(&self, offsets: Vec<Offset>) {
for offset in offsets {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}
async fn ack(&self, offset: Offset) {
let x = &String::from_utf8(offset.offset).unwrap();
self.yet_to_ack.write().unwrap().remove(x);
}

async fn pending(&self) -> usize {
Expand Down
48 changes: 36 additions & 12 deletions proto/source.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,17 @@ package source.v1;

service Source {
// Read returns a stream of datum responses.
// The size of the returned ReadResponse is less than or equal to the num_records specified in ReadRequest.
// If the request timeout is reached on server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
rpc ReadFn(ReadRequest) returns (stream ReadResponse);
// The size of the returned ReadResponse is less than or equal to the num_records specified in each ReadRequest.
// If the request timeout is reached on the server side, the returned ReadResponse will contain all the datum that have been read (which could be an empty list).
// The server will continue to read and respond to subsequent ReadRequests until the client closes the stream.
rpc ReadFn(stream ReadRequest) returns (stream ReadResponse);

// AckFn acknowledges a list of datum offsets.
// AckFn acknowledges a stream of datum offsets.
// When AckFn is called, it implicitly indicates that the datum stream has been processed by the source vertex.
// The caller (numa) expects the AckFn to be successful, and it does not expect any errors.
// If there are some irrecoverable errors when the callee (UDSource) is processing the AckFn request,
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(AckRequest) returns (AckResponse);
// then it is best to crash because there are no other retry mechanisms possible.
rpc AckFn(stream AckRequest) returns (AckResponse);

// PendingFn returns the number of pending records at the user defined source.
rpc PendingFn(google.protobuf.Empty) returns (PendingResponse);
Expand Down Expand Up @@ -60,9 +61,35 @@ message ReadResponse {
// We add this optional field to support the use case where the user defined source can provide keys for the datum.
// e.g. Kafka and Redis Stream message usually include information about the keys.
repeated string keys = 4;
// Optional list of headers associated with the datum.
// Headers are the metadata associated with the datum.
// e.g. Kafka and Redis Stream message usually include information about the headers.
map<string, string> headers = 5;
}
message Status {
// Code to indicate the status of the response.
enum Code {
SUCCESS = 0;
FAILURE = 1;
}

// Error to indicate the error type. If the code is FAILURE, then the error field will be populated.
enum Error {
UNACKED = 0;
OTHER = 1;
}

// End of transmission flag.
bool eot = 1;
Code code = 2;
Error error = 3;
optional string msg = 4;
}
// Required field holding the result.
Result result = 1;
// Status of the response. Holds the end of transmission flag and the status code.
//
Status status = 2;
}

/*
Expand All @@ -71,11 +98,8 @@ message ReadResponse {
*/
message AckRequest {
message Request {
// Required field holding a list of offsets to be acknowledged.
// The offsets must be strictly corresponding to the previously read batch,
// meaning the offsets must be in the same order as the datum responses in the ReadResponse.
// By enforcing ordering, we can save deserialization effort on the server side, assuming the server keeps a local copy of the raw/un-serialized offsets.
repeated Offset offsets = 1;
// Required field holding the offset to be acked
Offset offset = 1;
}
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
Request request = 1;
Expand Down Expand Up @@ -146,4 +170,4 @@ message Offset {
// It is useful for sources that have multiple partitions. e.g. Kafka.
// If the partition_id is not specified, it is assumed that the source has a single partition.
int32 partition_id = 2;
}
}
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use thiserror::Error;

pub type Result<T> = std::result::Result<T, Error>;

#[derive(Error, Debug, Clone)]
pub enum ErrorKind {
#[error("User Defined Error: {0}")]
Expand Down
Loading
Loading