Skip to content

Commit

Permalink
feat: port legacy client integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dswij committed Dec 9, 2023
1 parent 99409f5 commit 110f522
Show file tree
Hide file tree
Showing 3 changed files with 297 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ bytes = "1"
http-body-util = "0.1.0"
tokio = { version = "1", features = ["macros", "test-util"] }
tokio-test = "0.4"
pretty_env_logger = "0.5"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
pnet_datalink = "0.34.0"
Expand Down
149 changes: 149 additions & 0 deletions tests/legacy/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
mod test_utils;

use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::net::TcpListener;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, FutureExt, TryFutureExt};
use futures_util::stream::StreamExt;
use futures_util::task::{Context, Poll};
use futures_util::{self, Future, Stream};
use http::Uri;
use http_body_util::{BodyExt, Empty, Full};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tokio_test::block_on;

use hyper::body::{Body, Bytes};
use hyper::Request;
use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;

use test_utils::DebugConnector;

pub fn runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.expect("new rt")
}

#[test]
fn drop_body_before_eof_closes_connection() {
// https://github.com/hyperium/hyper/issues/1353
let _ = pretty_env_logger::try_init();

let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let rt = runtime();
let (closes_tx, closes) = mpsc::channel::<()>(10);
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(
DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx),
);
let (tx1, rx1) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let body = vec![b'x'; 1024 * 128];
write!(
sock,
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
body.len()
)
.expect("write head");
let _ = sock.write_all(&body);
let _ = tx1.send(());
});

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
.body(Empty::<Bytes>::new())
.unwrap();
let res = client.request(req).map_ok(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK);
});
let rx = rx1;
rt.block_on(async move {
let (res, _) = future::join(res, rx).await;
res.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
});
rt.block_on(closes.into_future()).0.expect("closes");
}

#[tokio::test]
async fn drop_client_closes_idle_connections() {
let _ = pretty_env_logger::try_init();

let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let (closes_tx, mut closes) = mpsc::channel(10);

let (tx1, rx1) = oneshot::channel();
let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>();

thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
let body = [b'x'; 64];
write!(
sock,
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n",
body.len()
)
.expect("write head");
let _ = sock.write_all(&body);
let _ = tx1.send(());

// prevent this thread from closing until end of test, so the connection
// stays open and idle until Client is dropped
runtime().block_on(client_drop_rx.into_future())
});

let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes(
HttpConnector::new(),
closes_tx,
));

let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
.body(Empty::<Bytes>::new())
.unwrap();
let res = client.request(req).map_ok(move |res| {
assert_eq!(res.status(), hyper::StatusCode::OK);
});
let rx = rx1;
let (res, _) = future::join(res, rx).await;
res.unwrap();

// not closed yet, just idle
future::poll_fn(|ctx| {
assert!(Pin::new(&mut closes).poll_next(ctx).is_pending());
Poll::Ready(())
})
.await;

// drop to start the connections closing
drop(client);

// and wait a few ticks for the connections to close
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out"));
futures_util::pin_mut!(t);
let close = closes.into_future().map(|(opt, _)| opt.expect("closes"));
future::select(t, close).await;
}
147 changes: 147 additions & 0 deletions tests/test_utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use std::io;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use futures_channel::{mpsc, oneshot};
use futures_util::task::{Context, Poll};
use futures_util::Future;
use futures_util::TryFutureExt;
use hyper::Uri;
use tokio::io::ReadBuf;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;

use hyper::rt::Read;
use hyper::rt::ReadBufCursor;

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::{Connected, Connection};
use hyper_util::rt::TokioIo;

#[derive(Clone)]
pub struct DebugConnector {
http: HttpConnector,
closes: mpsc::Sender<()>,
connects: Arc<AtomicUsize>,
is_proxy: bool,
alpn_h2: bool,
}

impl DebugConnector {
pub fn new() -> DebugConnector {
let http = HttpConnector::new();
let (tx, _) = mpsc::channel(10);
DebugConnector::with_http_and_closes(http, tx)
}

pub fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector {
DebugConnector {
http,
closes,
connects: Arc::new(AtomicUsize::new(0)),
is_proxy: false,
alpn_h2: false,
}
}

fn proxy(mut self) -> Self {
self.is_proxy = true;
self
}
}

impl tower_service::Service<Uri> for DebugConnector {
type Response = DebugStream;
type Error = <HttpConnector as tower_service::Service<Uri>>::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// don't forget to check inner service is ready :)
tower_service::Service::<Uri>::poll_ready(&mut self.http, cx)
}

fn call(&mut self, dst: Uri) -> Self::Future {
self.connects.fetch_add(1, Ordering::SeqCst);
let closes = self.closes.clone();
let is_proxy = self.is_proxy;
let is_alpn_h2 = self.alpn_h2;
Box::pin(self.http.call(dst).map_ok(move |tcp| DebugStream {
tcp,
on_drop: closes,
is_alpn_h2,
is_proxy,
}))
}
}

pub struct DebugStream {
tcp: TokioIo<TcpStream>,
on_drop: mpsc::Sender<()>,
is_alpn_h2: bool,
is_proxy: bool,
}

impl Drop for DebugStream {
fn drop(&mut self) {
let _ = self.on_drop.try_send(());
}
}

impl Connection for DebugStream {
fn connected(&self) -> Connected {
let connected = self.tcp.connected().proxy(self.is_proxy);

if self.is_alpn_h2 {
connected.negotiated_h2()
} else {
connected
}
}
}

impl hyper::rt::Read for DebugStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: ReadBufCursor<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Read::poll_read(Pin::new(&mut self.tcp), cx, buf)
}
}

impl hyper::rt::Write for DebugStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write(Pin::new(&mut self.tcp), cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_flush(Pin::new(&mut self.tcp), cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
hyper::rt::Write::poll_shutdown(Pin::new(&mut self.tcp), cx)
}

fn is_write_vectored(&self) -> bool {
hyper::rt::Write::is_write_vectored(&self.tcp)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
hyper::rt::Write::poll_write_vectored(Pin::new(&mut self.tcp), cx, bufs)
}
}

0 comments on commit 110f522

Please sign in to comment.