diff --git a/src/client/legacy/client.rs b/src/client/legacy/client.rs index 48b17db..25f6f42 100644 --- a/src/client/legacy/client.rs +++ b/src/client/legacy/client.rs @@ -22,7 +22,8 @@ use tracing::{debug, trace, warn}; use super::connect::HttpConnector; use super::connect::{Alpn, Connect, Connected, Connection}; use super::pool::{self, Ver}; -use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper}; + +use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper}; type BoxSendFuture = Pin + Send>>; @@ -975,6 +976,7 @@ pub struct Builder { #[cfg(feature = "http2")] h2_builder: hyper::client::conn::http2::Builder, pool_config: pool::Config, + pool_timer: Option, } impl Builder { @@ -999,13 +1001,34 @@ impl Builder { idle_timeout: Some(Duration::from_secs(90)), max_idle_per_host: std::usize::MAX, }, + pool_timer: None, } } /// Set an optional timeout for idle sockets being kept-alive. + /// A `Timer` is required for this to take effect. See `Builder::pool_timer` /// /// Pass `None` to disable timeout. /// /// Default is 90 seconds. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "tokio")] + /// # fn run () { + /// use std::time::Duration; + /// use hyper_util::client::legacy::Client; + /// use hyper_util::rt::{TokioExecutor, TokioTimer}; + /// + /// let client = Client::builder(TokioExecutor::new()) + /// .pool_idle_timeout(Duration::from_secs(30)) + /// .pool_timer(TokioTimer::new()) + /// .build_http(); + /// + /// # let infer: Client<_, http_body_util::Full> = client; + /// # } + /// # fn main() {} + /// ``` pub fn pool_idle_timeout(&mut self, val: D) -> &mut Self where D: Into>, @@ -1366,7 +1389,7 @@ impl Builder { self } - /// Provide a timer to be used for timeouts and intervals. + /// Provide a timer to be used for h2 /// /// See the documentation of [`h2::client::Builder::timer`] for more /// details. @@ -1378,7 +1401,15 @@ impl Builder { { #[cfg(feature = "http2")] self.h2_builder.timer(timer); - // TODO(https://github.com/hyperium/hyper/issues/3167) set for pool as well + self + } + + /// Provide a timer to be used for timeouts and intervals in connection pools. + pub fn pool_timer(&mut self, timer: M) -> &mut Self + where + M: Timer + Clone + Send + Sync + 'static, + { + self.pool_timer = Some(timer::Timer::new(timer.clone())); self } @@ -1447,6 +1478,7 @@ impl Builder { B::Data: Send, { let exec = self.exec.clone(); + let timer = self.pool_timer.clone(); Client { config: self.client_config, exec: exec.clone(), @@ -1455,7 +1487,7 @@ impl Builder { #[cfg(feature = "http2")] h2_builder: self.h2_builder.clone(), connector, - pool: pool::Pool::new(self.pool_config, exec), + pool: pool::Pool::new(self.pool_config, exec, timer), } } } diff --git a/src/client/legacy/pool.rs b/src/client/legacy/pool.rs index 77f02d1..7b93612 100644 --- a/src/client/legacy/pool.rs +++ b/src/client/legacy/pool.rs @@ -18,7 +18,10 @@ use futures_channel::oneshot; use futures_util::ready; use tracing::{debug, trace}; -use crate::common::exec::{self, Exec}; +use hyper::rt::Sleep; +use hyper::rt::Timer as _; + +use crate::common::{exec, exec::Exec, timer::Timer}; // FIXME: allow() required due to `impl Trait` leaking types to this lint #[allow(missing_debug_implementations)] @@ -97,6 +100,7 @@ struct PoolInner { // the Pool completely drops. That way, the interval can cancel immediately. idle_interval_ref: Option>, exec: Exec, + timer: Option, timeout: Option, } @@ -117,11 +121,13 @@ impl Config { } impl Pool { - pub fn new(config: Config, executor: E) -> Pool + pub fn new(config: Config, executor: E, timer: Option) -> Pool where E: hyper::rt::Executor + Send + Sync + Clone + 'static, + M: hyper::rt::Timer + Send + Sync + Clone + 'static, { let exec = Exec::new(executor); + let timer = timer.map(|t| Timer::new(t)); let inner = if config.is_enabled() { Some(Arc::new(Mutex::new(PoolInner { connecting: HashSet::new(), @@ -130,6 +136,7 @@ impl Pool { max_idle_per_host: config.max_idle_per_host, waiters: HashMap::new(), exec, + timer, timeout: config.idle_timeout, }))) } else { @@ -411,31 +418,33 @@ impl PoolInner { self.waiters.remove(key); } - fn spawn_idle_interval(&mut self, _pool_ref: &Arc>>) { - // TODO - /* - let (dur, rx) = { - if self.idle_interval_ref.is_some() { - return; - } - - if let Some(dur) = self.timeout { - let (tx, rx) = oneshot::channel(); - self.idle_interval_ref = Some(tx); - (dur, rx) - } else { - return; - } + fn spawn_idle_interval(&mut self, pool_ref: &Arc>>) { + if self.idle_interval_ref.is_some() { + return; + } + let dur = if let Some(dur) = self.timeout { + dur + } else { + return; + }; + let timer = if let Some(timer) = self.timer.clone() { + timer + } else { + return; }; + let (tx, rx) = oneshot::channel(); + self.idle_interval_ref = Some(tx); let interval = IdleTask { - interval: tokio::time::interval(dur), + timer: timer.clone(), + duration: dur, + deadline: Instant::now(), + fut: timer.sleep_until(Instant::now()), // ready at first tick pool: WeakOpt::downgrade(pool_ref), pool_drop_notifier: rx, }; self.exec.execute(interval); - */ } } @@ -755,11 +764,12 @@ impl Expiration { } } -/* pin_project_lite::pin_project! { struct IdleTask { - #[pin] - interval: Interval, + timer: Timer, + duration: Duration, + deadline: Instant, + fut: Pin>, pool: WeakOpt>>, // This allows the IdleTask to be notified as soon as the entire // Pool is fully dropped, and shutdown. This channel is never sent on, @@ -784,7 +794,15 @@ impl Future for IdleTask { } } - ready!(this.interval.as_mut().poll_tick(cx)); + ready!(Pin::new(&mut this.fut).poll(cx)); + // Set this task to run after the next deadline + // If the poll missed the deadline by a lot, set the deadline + // from the current time instead + *this.deadline = *this.deadline + *this.duration; + if *this.deadline < Instant::now() - Duration::from_millis(5) { + *this.deadline = Instant::now() + *this.duration; + } + *this.fut = this.timer.sleep_until(*this.deadline); if let Some(inner) = this.pool.upgrade() { if let Ok(mut inner) = inner.lock() { @@ -797,7 +815,6 @@ impl Future for IdleTask { } } } -*/ impl WeakOpt { fn none() -> Self { @@ -823,7 +840,9 @@ mod tests { use std::time::Duration; use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt}; - use crate::rt::TokioExecutor; + use crate::rt::{TokioExecutor, TokioTimer}; + + use crate::common::timer; #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct KeyImpl(http::uri::Scheme, http::uri::Authority); @@ -870,6 +889,7 @@ mod tests { max_idle_per_host: max_idle, }, TokioExecutor::new(), + Option::::None, ); pool.no_timer(); pool @@ -960,16 +980,14 @@ mod tests { } #[tokio::test] - #[ignore] // TODO async fn test_pool_timer_removes_expired() { - tokio::time::pause(); - let pool = Pool::new( super::Config { idle_timeout: Some(Duration::from_millis(10)), max_idle_per_host: std::usize::MAX, }, TokioExecutor::new(), + Some(TokioTimer::new()), ); let key = host_key("foo"); @@ -984,7 +1002,7 @@ mod tests { ); // Let the timer tick passed the expiration... - tokio::time::advance(Duration::from_millis(30)).await; + tokio::time::sleep(Duration::from_millis(30)).await; // Yield so the Interval can reap... tokio::task::yield_now().await; diff --git a/src/common/mod.rs b/src/common/mod.rs index f14b6c4..63b8288 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -6,6 +6,7 @@ mod lazy; pub(crate) mod rewind; #[cfg(feature = "client")] mod sync; +pub(crate) mod timer; #[cfg(feature = "client")] pub(crate) use exec::Exec; diff --git a/src/common/timer.rs b/src/common/timer.rs new file mode 100644 index 0000000..390be3b --- /dev/null +++ b/src/common/timer.rs @@ -0,0 +1,38 @@ +#![allow(dead_code)] + +use std::fmt; +use std::pin::Pin; +use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; + +use hyper::rt::Sleep; + +#[derive(Clone)] +pub(crate) struct Timer(Arc); + +// =====impl Timer===== +impl Timer { + pub(crate) fn new(inner: T) -> Self + where + T: hyper::rt::Timer + Send + Sync + 'static, + { + Self(Arc::new(inner)) + } +} + +impl fmt::Debug for Timer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Timer").finish() + } +} + +impl hyper::rt::Timer for Timer { + fn sleep(&self, duration: Duration) -> Pin> { + self.0.sleep(duration) + } + + fn sleep_until(&self, deadline: Instant) -> Pin> { + self.0.sleep_until(deadline) + } +}