diff --git a/CHANGES.md b/CHANGES.md index 9ad7f79..78a84fe 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [4.3.1] - 2024-11-05 + +* Do not rely on not_ready(), always check service readiness + ## [4.3.0] - 2024-11-04 * Use updated Service trait diff --git a/Cargo.toml b/Cargo.toml index 8e7bb0b..b5b9172 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ntex-mqtt" -version = "4.3.0" +version = "4.3.1" authors = ["ntex contributors "] description = "Client and Server framework for MQTT v5 and v3.1.1 protocols" documentation = "https://docs.rs/ntex-mqtt" diff --git a/src/io.rs b/src/io.rs index 3c9de34..271779d 100644 --- a/src/io.rs +++ b/src/io.rs @@ -11,8 +11,6 @@ use ntex_util::time::Seconds; type Response = ::Item; -const READY_COUNT: u8 = 32; - pin_project_lite::pin_project! { /// Dispatcher for mqtt protocol pub(crate) struct Dispatcher @@ -52,7 +50,6 @@ struct DispatcherInner>, U: Encoder + Decoder + 'stat read_max_timeout: Seconds, keepalive_timeout: Seconds, - ready_count: u8, response: Option>>, response_idx: usize, } @@ -142,7 +139,6 @@ where st: IoDispatcherState::Processing, response: None, response_idx: 0, - ready_count: 0, read_remains: 0, read_remains_prev: 0, read_max_timeout: Seconds::ZERO, @@ -452,19 +448,9 @@ where } fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll> { - // check service readiness - if self.flags.contains(Flags::READY) { - if self.ready_count != 0 && self.service.poll_not_ready(cx).is_pending() { - self.ready_count -= 1; - return Poll::Ready(self.check_error()); - } - self.flags.remove(Flags::READY); - } - match self.service.poll_ready(cx) { Poll::Ready(Ok(_)) => { - self.ready_count = READY_COUNT; - self.flags.insert(Flags::READY); + let _ = self.service.poll_not_ready(cx); Poll::Ready(self.check_error()) } // pause io read task @@ -646,7 +632,6 @@ mod tests { service: Pipeline::new(service.into_service()).bind(), response: None, response_idx: 0, - ready_count: 0, io: IoBoxed::from(io), st: IoDispatcherState::Processing, flags: if keepalive_timeout.is_zero() {