From 786d18ce8ddf01fb37a6c3585621a99fdc97c98d Mon Sep 17 00:00:00 2001 From: dswij Date: Thu, 28 Dec 2023 16:31:47 +0800 Subject: [PATCH 1/4] feat: add timer support for `legacy::Pool` --- src/client/legacy/client.rs | 32 ++++++++++++++++--- src/client/legacy/pool.rs | 63 +++++++++++++++++++++++-------------- src/common/mod.rs | 1 + src/common/timer.rs | 39 +++++++++++++++++++++++ 4 files changed, 107 insertions(+), 28 deletions(-) create mode 100644 src/common/timer.rs diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index 48b17db..429f47a 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -22,7 +22,8 @@ use tracing::{debug, trace, warn}; use super::connect::HttpConnector; use super::connect::{Alpn, Connect, Connected, Connection}; use super::pool::{self, Ver}; -use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper}; + +use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper}; type BoxSendFuture = Pin + Send>>; @@ -975,6 +976,7 @@ pub struct Builder { #[cfg(feature = "http2")] h2_builder: hyper::client::conn::http2::Builder, pool_config: pool::Config, + pool_timer: Option, } impl Builder { @@ -999,13 +1001,34 @@ impl Builder { idle_timeout: Some(Duration::from_secs(90)), max_idle_per_host: std::usize::MAX, }, + pool_timer: None, } } /// Set an optional timeout for idle sockets being kept-alive. + /// A `Timer` is required for this to take effect /// /// Pass `None` to disable timeout. /// /// Default is 90 seconds. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "tokio")] + /// # fn run () { + /// use std::time::Duration; + /// use hyper_util::client::legacy::Client; + /// use hyper_util::rt::{TokioExecutor, TokioTimer}; + /// + /// let client = Client::builder(TokioExecutor::new()) + /// .pool_idle_timeout(Duration::from_secs(30)) + /// .timer(TokioTimer::new()) + /// .build_http(); + /// + /// # let infer: Client<_, http_body_util::Full> = client; + /// # } + /// # fn main() {} + /// ``` pub fn pool_idle_timeout(&mut self, val: D) -> &mut Self where D: Into>, @@ -1374,11 +1397,11 @@ impl Builder { /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer pub fn timer(&mut self, timer: M) -> &mut Self where - M: Timer + Send + Sync + 'static, + M: Timer + Clone + Send + Sync + 'static, { + self.pool_timer = Some(timer::Timer::new(timer.clone())); #[cfg(feature = "http2")] self.h2_builder.timer(timer); - // TODO(https://github.com/hyperium/hyper/issues/3167) set for pool as well self } @@ -1447,6 +1470,7 @@ impl Builder { B::Data: Send, { let exec = self.exec.clone(); + let timer = self.pool_timer.clone(); Client { config: self.client_config, exec: exec.clone(), @@ -1455,7 +1479,7 @@ impl Builder { #[cfg(feature = "http2")] h2_builder: self.h2_builder.clone(), connector, - pool: pool::Pool::new(self.pool_config, exec), + pool: pool::Pool::new(self.pool_config, exec, timer), } } } diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index 77f02d1..8aa9380 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -18,7 +18,10 @@ use futures_channel::oneshot; use futures_util::ready; use tracing::{debug, trace}; -use crate::common::exec::{self, Exec}; +use hyper::rt::Sleep; +use hyper::rt::Timer as _; + +use crate::common::{exec, exec::Exec, timer::Timer}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -97,6 +100,7 @@ struct PoolInner { // the Pool completely drops. That way, the interval can cancel immediately. idle_interval_ref: Option>, exec: Exec, + timer: Option, timeout: Option, } @@ -117,11 +121,13 @@ impl Config { } impl Pool { - pub fn new(config: Config, executor: E) -> Pool + pub fn new(config: Config, executor: E, timer: Option) -> Pool where E: hyper::rt::Executor + Send + Sync + Clone + 'static, + M: hyper::rt::Timer + Send + Sync + Clone + 'static, { let exec = Exec::new(executor); + let timer = timer.map(|t| Timer::new(t)); let inner = if config.is_enabled() { Some(Arc::new(Mutex::new(PoolInner { connecting: HashSet::new(), @@ -130,6 +136,7 @@ impl Pool { max_idle_per_host: config.max_idle_per_host, waiters: HashMap::new(), exec, + timer, timeout: config.idle_timeout, }))) } else { @@ -411,31 +418,27 @@ impl PoolInner { self.waiters.remove(key); } - fn spawn_idle_interval(&mut self, _pool_ref: &Arc>>) { - // TODO - /* - let (dur, rx) = { - if self.idle_interval_ref.is_some() { - return; - } - - if let Some(dur) = self.timeout { - let (tx, rx) = oneshot::channel(); - self.idle_interval_ref = Some(tx); - (dur, rx) - } else { - return; - } + fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { + if self.idle_interval_ref.is_some() { + return; + } + let Some(dur) = self.timeout else { return }; + let Some(timer) = self.timer.clone() else { + return; }; + let (tx, rx) = oneshot::channel(); + self.idle_interval_ref = Some(tx); let interval = IdleTask { - interval: tokio::time::interval(dur), + timer: timer.clone(), + duration: dur, + deadline: Instant::now(), + fut: timer.sleep_until(Instant::now()), // ready at first tick pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx, }; self.exec.execute(interval); - */ } } @@ -755,11 +758,12 @@ impl Expiration { } } -/* pin_project_lite::pin_project! { struct IdleTask { - #[pin] - interval: Interval, + timer: Timer, + duration: Duration, + deadline: Instant, + fut: Pin>, pool: WeakOpt>>, // This allows the IdleTask to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, @@ -784,7 +788,15 @@ impl Future for IdleTask { } } - ready!(this.interval.as_mut().poll_tick(cx)); + ready!(Pin::new(&mut this.fut).poll(cx)); + // Set this task to run after the next deadline + // If the poll missed the deadline by a lot, set the deadline + // from the current time instead + *this.deadline = *this.deadline + *this.duration; + if *this.deadline < Instant::now() - Duration::from_millis(5) { + *this.deadline = Instant::now() + *this.duration; + } + *this.fut = this.timer.sleep_until(*this.deadline); if let Some(inner) = this.pool.upgrade() { if let Ok(mut inner) = inner.lock() { @@ -797,7 +809,6 @@ impl Future for IdleTask { } } } -*/ impl WeakOpt { fn none() -> Self { @@ -825,6 +836,8 @@ mod tests { use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; use crate::rt::TokioExecutor; + use crate::common::timer; + #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct KeyImpl(http::uri::Scheme, http::uri::Authority); @@ -870,6 +883,7 @@ mod tests { max_idle_per_host: max_idle, }, TokioExecutor::new(), + Option::::None, ); pool.no_timer(); pool @@ -970,6 +984,7 @@ mod tests { max_idle_per_host: std::usize::MAX, }, TokioExecutor::new(), + Option::::None, ); let key = host_key("foo"); diff --git a/src/common/mod.rs b/src/common/mod.rs index f14b6c4..63b8288 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -6,6 +6,7 @@ mod lazy; pub(crate) mod rewind; #[cfg(feature = "client")] mod sync; +pub(crate) mod timer; #[cfg(feature = "client")] pub(crate) use exec::Exec; diff --git a/src/common/timer.rs b/src/common/timer.rs new file mode 100644 index 0000000..f7e4e8b --- /dev/null +++ b/src/common/timer.rs @@ -0,0 +1,39 @@ +#![allow(dead_code)] + +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use hyper::rt::Sleep; + +#[derive(Clone)] +pub(crate) struct Timer(Arc); + +// =====impl Timer===== +impl Timer { + pub(crate) fn new(inner: T) -> Self + where + T: hyper::rt::Timer + Send + Sync + 'static, + { + Self(Arc::new(inner)) + } +} + +impl fmt::Debug for Timer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Timer").finish() + } +} + +impl hyper::rt::Timer for Timer { + fn sleep(&self, duration: Duration) -> Pin> { + self.0.sleep(duration) + } + + fn sleep_until(&self, deadline: Instant) -> Pin> { + self.0.sleep_until(deadline) + } +} From 4bf4d29ed48f3a28c6f367e2c0da308dad9e5384 Mon Sep 17 00:00:00 2001 From: dswij Date: Thu, 28 Dec 2023 17:13:44 +0800 Subject: [PATCH 2/4] test(pool): re-enable pool expired checkout test --- src/client/legacy/pool.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index 8aa9380..417661f 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -834,7 +834,7 @@ mod tests { use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::rt::TokioExecutor; + use crate::rt::{TokioExecutor, TokioTimer}; use crate::common::timer; @@ -974,17 +974,14 @@ mod tests { } #[tokio::test] - #[ignore] // TODO async fn test_pool_timer_removes_expired() { - tokio::time::pause(); - let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(10)), max_idle_per_host: std::usize::MAX, }, TokioExecutor::new(), - Option::::None, + Some(TokioTimer::new()), ); let key = host_key("foo"); @@ -999,7 +996,7 @@ mod tests { ); // Let the timer tick passed the expiration... - tokio::time::advance(Duration::from_millis(30)).await; + tokio::time::sleep(Duration::from_millis(30)).await; // Yield so the Interval can reap... tokio::task::yield_now().await; From 2f36b1c57e80454a9201c8676f1e4195d6fea86b Mon Sep 17 00:00:00 2001 From: dswij Date: Thu, 28 Dec 2023 17:58:54 +0800 Subject: [PATCH 3/4] bye let-else --- src/client/legacy/pool.rs | 10 ++++++++-- src/common/timer.rs | 1 - 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index 417661f..7b93612 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -422,8 +422,14 @@ impl PoolInner { if self.idle_interval_ref.is_some() { return; } - let Some(dur) = self.timeout else { return }; - let Some(timer) = self.timer.clone() else { + let dur = if let Some(dur) = self.timeout { + dur + } else { + return; + }; + let timer = if let Some(timer) = self.timer.clone() { + timer + } else { return; }; let (tx, rx) = oneshot::channel(); diff --git a/src/common/timer.rs b/src/common/timer.rs index f7e4e8b..390be3b 100644 --- a/src/common/timer.rs +++ b/src/common/timer.rs @@ -1,7 +1,6 @@ #![allow(dead_code)] use std::fmt; -use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::time::Duration; From 56ccdf7add0b4a625a259cf4a8cba5ca6880d566 Mon Sep 17 00:00:00 2001 From: dswij Date: Fri, 29 Dec 2023 01:57:45 +0800 Subject: [PATCH 4/4] separate pool and h2 timer --- src/client/legacy/client.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index 429f47a..25f6f42 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -1005,7 +1005,7 @@ impl Builder { } } /// Set an optional timeout for idle sockets being kept-alive. - /// A `Timer` is required for this to take effect + /// A `Timer` is required for this to take effect. See `Builder::pool_timer` /// /// Pass `None` to disable timeout. /// @@ -1022,7 +1022,7 @@ impl Builder { /// /// let client = Client::builder(TokioExecutor::new()) /// .pool_idle_timeout(Duration::from_secs(30)) - /// .timer(TokioTimer::new()) + /// .pool_timer(TokioTimer::new()) /// .build_http(); /// /// # let infer: Client<_, http_body_util::Full> = client; @@ -1389,7 +1389,7 @@ impl Builder { self } - /// Provide a timer to be used for timeouts and intervals. + /// Provide a timer to be used for h2 /// /// See the documentation of [`h2::client::Builder::timer`] for more /// details. @@ -1397,14 +1397,22 @@ impl Builder { /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer pub fn timer(&mut self, timer: M) -> &mut Self where - M: Timer + Clone + Send + Sync + 'static, + M: Timer + Send + Sync + 'static, { - self.pool_timer = Some(timer::Timer::new(timer.clone())); #[cfg(feature = "http2")] self.h2_builder.timer(timer); self } + /// Provide a timer to be used for timeouts and intervals in connection pools. + pub fn pool_timer(&mut self, timer: M) -> &mut Self + where + M: Timer + Clone + Send + Sync + 'static, + { + self.pool_timer = Some(timer::Timer::new(timer.clone())); + self + } + /// Set the maximum write buffer size for each HTTP/2 stream. /// /// Default is currently 1MB, but may change.