Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add timer support for legacy::Pool #84

Merged
merged 4 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions src/client/legacy/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ use tracing::{debug, trace, warn};
use super::connect::HttpConnector;
use super::connect::{Alpn, Connect, Connected, Connection};
use super::pool::{self, Ver};
use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper};

use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};

type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

Expand Down Expand Up @@ -975,6 +976,7 @@ pub struct Builder {
#[cfg(feature = "http2")]
h2_builder: hyper::client::conn::http2::Builder<Exec>,
pool_config: pool::Config,
pool_timer: Option<timer::Timer>,
}

impl Builder {
Expand All @@ -999,13 +1001,34 @@ impl Builder {
idle_timeout: Some(Duration::from_secs(90)),
max_idle_per_host: std::usize::MAX,
},
pool_timer: None,
}
}
/// Set an optional timeout for idle sockets being kept-alive.
/// A `Timer` is required for this to take effect. See `Builder::pool_timer`
///
/// Pass `None` to disable timeout.
///
/// Default is 90 seconds.
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::{TokioExecutor, TokioTimer};
///
/// let client = Client::builder(TokioExecutor::new())
/// .pool_idle_timeout(Duration::from_secs(30))
/// .pool_timer(TokioTimer::new())
/// .build_http();
///
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # }
/// # fn main() {}
/// ```
pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
where
D: Into<Option<Duration>>,
Expand Down Expand Up @@ -1366,7 +1389,7 @@ impl Builder {
self
}

/// Provide a timer to be used for timeouts and intervals.
/// Provide a timer to be used for h2
///
/// See the documentation of [`h2::client::Builder::timer`] for more
/// details.
Expand All @@ -1378,7 +1401,15 @@ impl Builder {
{
#[cfg(feature = "http2")]
self.h2_builder.timer(timer);
// TODO(https://github.com/hyperium/hyper/issues/3167) set for pool as well
self
}

/// Provide a timer to be used for timeouts and intervals in connection pools.
pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
where
M: Timer + Clone + Send + Sync + 'static,
{
self.pool_timer = Some(timer::Timer::new(timer.clone()));
self
}

Expand Down Expand Up @@ -1447,6 +1478,7 @@ impl Builder {
B::Data: Send,
{
let exec = self.exec.clone();
let timer = self.pool_timer.clone();
Client {
config: self.client_config,
exec: exec.clone(),
Expand All @@ -1455,7 +1487,7 @@ impl Builder {
#[cfg(feature = "http2")]
h2_builder: self.h2_builder.clone(),
connector,
pool: pool::Pool::new(self.pool_config, exec),
pool: pool::Pool::new(self.pool_config, exec, timer),
}
}
}
Expand Down
76 changes: 47 additions & 29 deletions src/client/legacy/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use futures_channel::oneshot;
use futures_util::ready;
use tracing::{debug, trace};

use crate::common::exec::{self, Exec};
use hyper::rt::Sleep;
use hyper::rt::Timer as _;

use crate::common::{exec, exec::Exec, timer::Timer};

// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
Expand Down Expand Up @@ -97,6 +100,7 @@ struct PoolInner<T, K: Eq + Hash> {
// the Pool completely drops. That way, the interval can cancel immediately.
idle_interval_ref: Option<oneshot::Sender<Infallible>>,
exec: Exec,
timer: Option<Timer>,
timeout: Option<Duration>,
}

Expand All @@ -117,11 +121,13 @@ impl Config {
}

impl<T, K: Key> Pool<T, K> {
pub fn new<E>(config: Config, executor: E) -> Pool<T, K>
pub fn new<E, M>(config: Config, executor: E, timer: Option<M>) -> Pool<T, K>
where
E: hyper::rt::Executor<exec::BoxSendFuture> + Send + Sync + Clone + 'static,
M: hyper::rt::Timer + Send + Sync + Clone + 'static,
{
let exec = Exec::new(executor);
let timer = timer.map(|t| Timer::new(t));
let inner = if config.is_enabled() {
Some(Arc::new(Mutex::new(PoolInner {
connecting: HashSet::new(),
Expand All @@ -130,6 +136,7 @@ impl<T, K: Key> Pool<T, K> {
max_idle_per_host: config.max_idle_per_host,
waiters: HashMap::new(),
exec,
timer,
timeout: config.idle_timeout,
})))
} else {
Expand Down Expand Up @@ -411,31 +418,33 @@ impl<T: Poolable, K: Key> PoolInner<T, K> {
self.waiters.remove(key);
}

fn spawn_idle_interval(&mut self, _pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
// TODO
/*
let (dur, rx) = {
if self.idle_interval_ref.is_some() {
return;
}

if let Some(dur) = self.timeout {
let (tx, rx) = oneshot::channel();
self.idle_interval_ref = Some(tx);
(dur, rx)
} else {
return;
}
fn spawn_idle_interval(&mut self, pool_ref: &Arc<Mutex<PoolInner<T, K>>>) {
if self.idle_interval_ref.is_some() {
return;
}
let dur = if let Some(dur) = self.timeout {
dur
} else {
return;
};
let timer = if let Some(timer) = self.timer.clone() {
timer
} else {
return;
};
let (tx, rx) = oneshot::channel();
self.idle_interval_ref = Some(tx);

let interval = IdleTask {
interval: tokio::time::interval(dur),
timer: timer.clone(),
duration: dur,
deadline: Instant::now(),
fut: timer.sleep_until(Instant::now()), // ready at first tick
pool: WeakOpt::downgrade(pool_ref),
pool_drop_notifier: rx,
};

self.exec.execute(interval);
*/
}
}

Expand Down Expand Up @@ -755,11 +764,12 @@ impl Expiration {
}
}

/*
pin_project_lite::pin_project! {
struct IdleTask<T, K: Key> {
#[pin]
interval: Interval,
timer: Timer,
duration: Duration,
deadline: Instant,
fut: Pin<Box<dyn Sleep>>,
pool: WeakOpt<Mutex<PoolInner<T, K>>>,
// This allows the IdleTask to be notified as soon as the entire
// Pool is fully dropped, and shutdown. This channel is never sent on,
Expand All @@ -784,7 +794,15 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
}
}

ready!(this.interval.as_mut().poll_tick(cx));
ready!(Pin::new(&mut this.fut).poll(cx));
// Set this task to run after the next deadline
// If the poll missed the deadline by a lot, set the deadline
// from the current time instead
*this.deadline = *this.deadline + *this.duration;
if *this.deadline < Instant::now() - Duration::from_millis(5) {
*this.deadline = Instant::now() + *this.duration;
}
*this.fut = this.timer.sleep_until(*this.deadline);

if let Some(inner) = this.pool.upgrade() {
if let Ok(mut inner) = inner.lock() {
Expand All @@ -797,7 +815,6 @@ impl<T: Poolable + 'static, K: Key> Future for IdleTask<T, K> {
}
}
}
*/

impl<T> WeakOpt<T> {
fn none() -> Self {
Expand All @@ -823,7 +840,9 @@ mod tests {
use std::time::Duration;

use super::{Connecting, Key, Pool, Poolable, Reservation, WeakOpt};
use crate::rt::TokioExecutor;
use crate::rt::{TokioExecutor, TokioTimer};

use crate::common::timer;

#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct KeyImpl(http::uri::Scheme, http::uri::Authority);
Expand Down Expand Up @@ -870,6 +889,7 @@ mod tests {
max_idle_per_host: max_idle,
},
TokioExecutor::new(),
Option::<timer::Timer>::None,
);
pool.no_timer();
pool
Expand Down Expand Up @@ -960,16 +980,14 @@ mod tests {
}

#[tokio::test]
#[ignore] // TODO
async fn test_pool_timer_removes_expired() {
tokio::time::pause();

let pool = Pool::new(
super::Config {
idle_timeout: Some(Duration::from_millis(10)),
max_idle_per_host: std::usize::MAX,
},
TokioExecutor::new(),
Some(TokioTimer::new()),
);

let key = host_key("foo");
Expand All @@ -984,7 +1002,7 @@ mod tests {
);

// Let the timer tick passed the expiration...
tokio::time::advance(Duration::from_millis(30)).await;
tokio::time::sleep(Duration::from_millis(30)).await;
// Yield so the Interval can reap...
tokio::task::yield_now().await;

Expand Down
1 change: 1 addition & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod lazy;
pub(crate) mod rewind;
#[cfg(feature = "client")]
mod sync;
pub(crate) mod timer;

#[cfg(feature = "client")]
pub(crate) use exec::Exec;
Expand Down
38 changes: 38 additions & 0 deletions src/common/timer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#![allow(dead_code)]

use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;

use hyper::rt::Sleep;

#[derive(Clone)]
pub(crate) struct Timer(Arc<dyn hyper::rt::Timer + Send + Sync>);

// =====impl Timer=====
impl Timer {
pub(crate) fn new<T>(inner: T) -> Self
where
T: hyper::rt::Timer + Send + Sync + 'static,
{
Self(Arc::new(inner))
}
}

impl fmt::Debug for Timer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Timer").finish()
}
}

impl hyper::rt::Timer for Timer {
fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
self.0.sleep(duration)
}

fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
self.0.sleep_until(deadline)
}
}