Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change Sourcer API to support sources where pending can not be … #106

Merged
merged 1 commit into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!(messages.len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022")
}

Expand All @@ -76,7 +76,7 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!(messages.len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022")
}

Expand All @@ -94,7 +94,10 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__")
assert_eq!(messages.len(), 1);
assert_eq!(
messages.first().unwrap().tags.as_ref().unwrap()[0],
"U+005C__DROP__"
)
}
}
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ pub(crate) mod simple_source {
}
}

async fn pending(&self) -> usize {
self.yet_to_ack.read().unwrap().len()
async fn pending(&self) -> Option<usize> {
self.yet_to_ack.read().unwrap().len().into()
}

async fn partitions(&self) -> Option<Vec<i32>> {
Expand Down
4 changes: 2 additions & 2 deletions numaflow/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ mod tests {

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(!resp.results.is_empty());
let msg = &resp.results.get(0).unwrap();
let msg = resp.results.first().unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "1");

Expand All @@ -660,7 +660,7 @@ mod tests {
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(!resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.results.get(0).unwrap();
let msg = resp.results.first().unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "2");

Expand Down
16 changes: 9 additions & 7 deletions numaflow/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ pub trait Sourcer {
/// Acknowledges the message that has been processed by the user-defined source.
async fn ack(&self, offset: Vec<Offset>);
/// Returns the number of messages that are yet to be processed by the user-defined source.
async fn pending(&self) -> usize;
/// The None value can be returned if source doesn't support detecting the backlog.
async fn pending(&self) -> Option<usize>;
/// Returns the partitions associated with the source. This will be used by the platform to determine
/// the partitions to which the watermark should be published. Some sources might not have the concept of partitions.
/// Kafka is an example of source where a reader can read from multiple partitions.
Expand Down Expand Up @@ -324,12 +325,13 @@ where

async fn pending_fn(&self, _: Request<()>) -> Result<Response<proto::PendingResponse>, Status> {
// invoke the user-defined source's pending handler
let pending = self.handler.pending().await;
let pending = match self.handler.pending().await {
None => -1,
Some(val) => i64::try_from(val).unwrap_or(i64::MAX),
};

Ok(Response::new(proto::PendingResponse {
result: Some(proto::pending_response::Result {
count: pending as i64,
}),
result: Some(proto::pending_response::Result { count: pending }),
}))
}

Expand Down Expand Up @@ -613,11 +615,11 @@ mod tests {
}
}

async fn pending(&self) -> usize {
async fn pending(&self) -> Option<usize> {
// The pending function should return the number of pending messages that can be read from the source.
// However, for this source the pending messages will always be 0.
// For testing purposes, we return the number of messages that are not yet acknowledged as pending.
self.yet_to_ack.read().unwrap().len()
self.yet_to_ack.read().unwrap().len().into()
}

async fn partitions(&self) -> Option<Vec<i32>> {
Expand Down
Loading