Skip to content

Commit

Permalink
feat: change Sourcer API to support sources where pending can not be … (
Browse files Browse the repository at this point in the history
#106)

Signed-off-by: Sreekanth <prsreekanth920@gmail.com>
  • Loading branch information
BulkBeing authored Nov 21, 2024
1 parent 152ffb9 commit d0a7377
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 15 deletions.
11 changes: 7 additions & 4 deletions examples/mapt-event-time-filter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!(messages.len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022")
}

Expand All @@ -76,7 +76,7 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!(messages.len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022")
}

Expand All @@ -94,7 +94,10 @@ mod tests {

let messages = filter_event_time(source_request);

assert_eq!((&messages).len(), 1);
assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__")
assert_eq!(messages.len(), 1);
assert_eq!(
messages.first().unwrap().tags.as_ref().unwrap()[0],
"U+005C__DROP__"
)
}
}
4 changes: 2 additions & 2 deletions examples/simple-source/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ pub(crate) mod simple_source {
}
}

async fn pending(&self) -> usize {
self.yet_to_ack.read().unwrap().len()
async fn pending(&self) -> Option<usize> {
self.yet_to_ack.read().unwrap().len().into()
}

async fn partitions(&self) -> Option<Vec<i32>> {
Expand Down
4 changes: 2 additions & 2 deletions numaflow/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ mod tests {

let resp = resp_stream.message().await.unwrap().unwrap();
assert!(!resp.results.is_empty());
let msg = &resp.results.get(0).unwrap();
let msg = resp.results.first().unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "1");

Expand All @@ -660,7 +660,7 @@ mod tests {
let resp = resp_stream.message().await.unwrap().unwrap();
assert!(!resp.results.is_empty());
assert!(resp.handshake.is_none());
let msg = &resp.results.get(0).unwrap();
let msg = resp.results.first().unwrap();
assert_eq!(msg.err_msg, "");
assert_eq!(msg.id, "2");

Expand Down
16 changes: 9 additions & 7 deletions numaflow/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ pub trait Sourcer {
/// Acknowledges the message that has been processed by the user-defined source.
async fn ack(&self, offset: Vec<Offset>);
/// Returns the number of messages that are yet to be processed by the user-defined source.
async fn pending(&self) -> usize;
/// The None value can be returned if source doesn't support detecting the backlog.
async fn pending(&self) -> Option<usize>;
/// Returns the partitions associated with the source. This will be used by the platform to determine
/// the partitions to which the watermark should be published. Some sources might not have the concept of partitions.
/// Kafka is an example of source where a reader can read from multiple partitions.
Expand Down Expand Up @@ -324,12 +325,13 @@ where

async fn pending_fn(&self, _: Request<()>) -> Result<Response<proto::PendingResponse>, Status> {
// invoke the user-defined source's pending handler
let pending = self.handler.pending().await;
let pending = match self.handler.pending().await {
None => -1,
Some(val) => i64::try_from(val).unwrap_or(i64::MAX),
};

Ok(Response::new(proto::PendingResponse {
result: Some(proto::pending_response::Result {
count: pending as i64,
}),
result: Some(proto::pending_response::Result { count: pending }),
}))
}

Expand Down Expand Up @@ -613,11 +615,11 @@ mod tests {
}
}

async fn pending(&self) -> usize {
async fn pending(&self) -> Option<usize> {
// The pending function should return the number of pending messages that can be read from the source.
// However, for this source the pending messages will always be 0.
// For testing purposes, we return the number of messages that are not yet acknowledged as pending.
self.yet_to_ack.read().unwrap().len()
self.yet_to_ack.read().unwrap().len().into()
}

async fn partitions(&self) -> Option<Vec<i32>> {
Expand Down

0 comments on commit d0a7377

Please sign in to comment.