diff --git a/examples/mapt-event-time-filter/src/main.rs b/examples/mapt-event-time-filter/src/main.rs index 6e3cef0..1b76145 100644 --- a/examples/mapt-event-time-filter/src/main.rs +++ b/examples/mapt-event-time-filter/src/main.rs @@ -58,7 +58,7 @@ mod tests { let messages = filter_event_time(source_request); - assert_eq!((&messages).len(), 1); + assert_eq!(messages.len(), 1); assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "within_year_2022") } @@ -76,7 +76,7 @@ mod tests { let messages = filter_event_time(source_request); - assert_eq!((&messages).len(), 1); + assert_eq!(messages.len(), 1); assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "after_year_2022") } @@ -94,7 +94,10 @@ mod tests { let messages = filter_event_time(source_request); - assert_eq!((&messages).len(), 1); - assert_eq!((&messages)[0].tags.as_ref().unwrap()[0], "U+005C__DROP__") + assert_eq!(messages.len(), 1); + assert_eq!( + messages.first().unwrap().tags.as_ref().unwrap()[0], + "U+005C__DROP__" + ) } } diff --git a/examples/simple-source/src/main.rs b/examples/simple-source/src/main.rs index 1fe06e9..dd842c8 100644 --- a/examples/simple-source/src/main.rs +++ b/examples/simple-source/src/main.rs @@ -68,8 +68,8 @@ pub(crate) mod simple_source { } } - async fn pending(&self) -> usize { - self.yet_to_ack.read().unwrap().len() + async fn pending(&self) -> Option { + self.yet_to_ack.read().unwrap().len().into() } async fn partitions(&self) -> Option> { diff --git a/numaflow/src/sink.rs b/numaflow/src/sink.rs index 5b6a751..1171256 100644 --- a/numaflow/src/sink.rs +++ b/numaflow/src/sink.rs @@ -646,7 +646,7 @@ mod tests { let resp = resp_stream.message().await.unwrap().unwrap(); assert!(!resp.results.is_empty()); - let msg = &resp.results.get(0).unwrap(); + let msg = resp.results.first().unwrap(); assert_eq!(msg.err_msg, ""); assert_eq!(msg.id, "1"); @@ -660,7 +660,7 @@ mod tests { let resp = resp_stream.message().await.unwrap().unwrap(); assert!(!resp.results.is_empty()); assert!(resp.handshake.is_none()); - let msg = &resp.results.get(0).unwrap(); + let msg = resp.results.first().unwrap(); assert_eq!(msg.err_msg, ""); assert_eq!(msg.id, "2"); diff --git a/numaflow/src/source.rs b/numaflow/src/source.rs index 190f9f2..40d4070 100644 --- a/numaflow/src/source.rs +++ b/numaflow/src/source.rs @@ -54,7 +54,8 @@ pub trait Sourcer { /// Acknowledges the message that has been processed by the user-defined source. async fn ack(&self, offset: Vec); /// Returns the number of messages that are yet to be processed by the user-defined source. - async fn pending(&self) -> usize; + /// The None value can be returned if source doesn't support detecting the backlog. + async fn pending(&self) -> Option; /// Returns the partitions associated with the source. This will be used by the platform to determine /// the partitions to which the watermark should be published. Some sources might not have the concept of partitions. /// Kafka is an example of source where a reader can read from multiple partitions. @@ -324,12 +325,13 @@ where async fn pending_fn(&self, _: Request<()>) -> Result, Status> { // invoke the user-defined source's pending handler - let pending = self.handler.pending().await; + let pending = match self.handler.pending().await { + None => -1, + Some(val) => i64::try_from(val).unwrap_or(i64::MAX), + }; Ok(Response::new(proto::PendingResponse { - result: Some(proto::pending_response::Result { - count: pending as i64, - }), + result: Some(proto::pending_response::Result { count: pending }), })) } @@ -613,11 +615,11 @@ mod tests { } } - async fn pending(&self) -> usize { + async fn pending(&self) -> Option { // The pending function should return the number of pending messages that can be read from the source. // However, for this source the pending messages will always be 0. // For testing purposes, we return the number of messages that are not yet acknowledged as pending. - self.yet_to_ack.read().unwrap().len() + self.yet_to_ack.read().unwrap().len().into() } async fn partitions(&self) -> Option> {