Skip to content

Commit

Permalink
more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dswij committed Dec 25, 2023
1 parent ef21699 commit 65d8bdb
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 103 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ http-body-util = "0.1.0"
tokio = { version = "1", features = ["macros", "test-util", "tracing"] }
tokio-test = "0.4"
pretty_env_logger = "0.5"
console-subscriber = "*"

[target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies]
pnet_datalink = "0.34.0"
Expand Down
214 changes: 114 additions & 100 deletions tests/legacy_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use futures_channel::{mpsc, oneshot};
use futures_util::future::{self, FutureExt, TryFutureExt};
use futures_util::stream::StreamExt;
use futures_util::{self, Stream};
use http_body_util::{BodyStream, Empty, Full};
use http_body_util::BodyExt;
use http_body_util::{Empty, Full};

use hyper::body::Bytes;
use hyper::Request;
Expand Down Expand Up @@ -148,6 +149,7 @@ async fn drop_client_closes_idle_connections() {
future::select(t, close).await;
}

#[cfg(not(miri))]
#[tokio::test]
async fn drop_response_future_closes_in_progress_connection() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -196,6 +198,7 @@ async fn drop_response_future_closes_in_progress_connection() {
future::select(t, close).await;
}

#[cfg(not(miri))]
#[tokio::test]
async fn drop_response_body_closes_in_progress_connection() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -251,6 +254,7 @@ async fn drop_response_body_closes_in_progress_connection() {
future::select(t, close).await;
}

#[cfg(not(miri))]
#[tokio::test]
async fn no_keep_alive_closes_connection() {
// https://github.com/hyperium/hyper/issues/1383
Expand Down Expand Up @@ -303,6 +307,7 @@ async fn no_keep_alive_closes_connection() {
future::select(close, t).await;
}

#[cfg(not(miri))]
#[tokio::test]
async fn socket_disconnect_closes_idle_conn() {
// notably when keep-alive is enabled
Expand Down Expand Up @@ -349,6 +354,7 @@ async fn socket_disconnect_closes_idle_conn() {
future::select(t, close).await;
}

#[cfg(not(miri))]
#[test]
fn connect_call_is_lazy() {
// We especially don't want connects() triggered if there's
Expand All @@ -372,6 +378,7 @@ fn connect_call_is_lazy() {
assert_eq!(connects.load(Ordering::Relaxed), 0);
}

#[cfg(not(miri))]
#[test]
fn client_keep_alive_0() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -438,6 +445,7 @@ fn client_keep_alive_0() {
drop(client);
}

#[cfg(not(miri))]
#[test]
fn client_keep_alive_extra_body() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -509,8 +517,9 @@ fn client_keep_alive_extra_body() {
//
// let connector = DebugConnector::new();
// let connects = connector.connects.clone();
// let executor = TokioExecutor::new();
//
// let client = Client::builder(TokioExecutor::new()).build(connector);
// let client = Client::builder(executor.clone()).build(connector.clone());
//
// let (tx1, rx1) = oneshot::channel();
// let (tx2, rx2) = oneshot::channel();
Expand Down Expand Up @@ -542,126 +551,128 @@ fn client_keep_alive_extra_body() {
// assert_eq!(connects.load(Ordering::Relaxed), 0);
//
// let delayed_body = rx1
// .then(|_| tokio::time::sleep(Duration::from_millis(200)))
// .map(|_| Ok::<_, ()>("hello a"))
// .then(|_| Box::pin(tokio::time::sleep(Duration::from_millis(200))))
// .map(|_| Ok::<_, ()>(Frame::data(&b"hello a"[..])))
// .map_err(|_| -> hyper::Error { panic!("rx1") })
// .into_stream();
//
// let rx = rx2;
// let req = Request::builder()
// .method("POST")
// .uri(&*format!("http://{}/a", addr))
// .body(BodyStream::new(delayed_body))
// .body(StreamBody::new(delayed_body))
// .unwrap();
// let client2 = client.clone();
//
// let client2 = Client::builder(executor.clone()).build(connector.clone());
// let res = client.request(req).map_ok(move |res| {
// assert_eq!(res.status(), hyper::StatusCode::OK);
// });
// // req 1
// let fut = future::join(client.request(req), rx)
// let fut = future::join(res, rx2)
// .then(|_| tokio::time::sleep(Duration::from_millis(200)))
// // req 2
// // req 2
// .then(move |()| {
// let rx = rx3.expect("thread panicked");
// let rx = rx3;
// let req = Request::builder()
// .uri(&*format!("http://{}/b", addr))
// .body(Empty::<Bytes>::new())
// .unwrap();
// future::join(client2.request(req), rx).map(|r| r.0)
// let res = client2.request(req).map_ok(move |res| {
// assert_eq!(res.status(), hyper::StatusCode::OK);
// });
// future::join(res, rx).map(|r| r.0)
// });
//
// rt.block_on(fut).unwrap();
//
// assert_eq!(connects.load(Ordering::Relaxed), 1);
// }

// #[tokio::test]
// async fn client_keep_alive_eager_when_chunked() {
// // If a response body has been read to completion, with completion
// // determined by some other factor, like decompression, and thus
// // it is in't polled a final time to clear the final 0-len chunk,
// // try to eagerly clear it so the connection can still be used.
//
// let _ = pretty_env_logger::try_init();
// let server = TcpListener::bind("127.0.0.1:0").unwrap();
// let addr = server.local_addr().unwrap();
// let connector = DebugConnector::new();
// let connects = connector.connects.clone();
//
// let client = Client::builder(TokioExecutor::new()).build(connector);
//
// let (tx1, rx1) = oneshot::channel();
// let (tx2, rx2) = oneshot::channel();
// thread::spawn(move || {
// let mut sock = server.accept().unwrap().0;
// //drop(server);
// sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
// sock.set_write_timeout(Some(Duration::from_secs(5)))
// .unwrap();
// let mut buf = [0; 4096];
// sock.read(&mut buf).expect("read 1");
// sock.write_all(
// b"\
// HTTP/1.1 200 OK\r\n\
// transfer-encoding: chunked\r\n\
// \r\n\
// 5\r\n\
// hello\r\n\
// 0\r\n\r\n\
// ",
// )
// .expect("write 1");
// let _ = tx1.send(());
//
// let n2 = sock.read(&mut buf).expect("read 2");
// assert_ne!(n2, 0, "bytes of second request");
// let second_get = "GET /b HTTP/1.1\r\n";
// assert_eq!(s(&buf[..second_get.len()]), second_get);
// sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
// .expect("write 2");
// let _ = tx2.send(());
// });
//
// assert_eq!(connects.load(Ordering::SeqCst), 0);
//
// let rx = rx1;
// let req = Request::builder()
// .uri(&*format!("http://{}/a", addr))
// .body(Empty::<Bytes>::new())
// .unwrap();
// let fut = client.request(req);
//
// let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap();
// assert_eq!(connects.load(Ordering::SeqCst), 1);
// assert_eq!(resp.status(), 200);
// assert_eq!(resp.headers()["transfer-encoding"], "chunked");
//
// // Read the "hello" chunk...
// let chunk = resp.body_mut().data().await.unwrap().unwrap();
// assert_eq!(chunk, "hello");
//
// // With our prior knowledge, we know that's the end of the body.
// // So just drop the body, without polling for the `0\r\n\r\n` end.
// drop(resp);
//
// // sleep real quick to let the threadpool put connection in ready
// // state and back into client pool
// tokio::time::sleep(Duration::from_millis(50)).await;
//
// let rx = rx2;
// let req = Request::builder()
// .uri(&*format!("http://{}/b", addr))
// .body(Empty::<Bytes>::new())
// .unwrap();
// let fut = client.request(req);
// future::join(fut, rx).map(|r| r.0).await.unwrap();
//
// assert_eq!(
// connects.load(Ordering::SeqCst),
// 1,
// "second request should still only have 1 connect"
// );
// drop(client);
// }
#[cfg(not(miri))]
#[tokio::test]
async fn client_keep_alive_eager_when_chunked() {
// If a response body has been read to completion, with completion
// determined by some other factor, like decompression, and thus
// it is in't polled a final time to clear the final 0-len chunk,
// try to eagerly clear it so the connection can still be used.

let _ = pretty_env_logger::try_init();
let server = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = server.local_addr().unwrap();
let connector = DebugConnector::new();
let connects = connector.connects.clone();

let client = Client::builder(TokioExecutor::new()).build(connector);

let (tx1, rx1) = oneshot::channel();
let (tx2, rx2) = oneshot::channel();
thread::spawn(move || {
let mut sock = server.accept().unwrap().0;
//drop(server);
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap();
sock.set_write_timeout(Some(Duration::from_secs(5)))
.unwrap();
let mut buf = [0; 4096];
sock.read(&mut buf).expect("read 1");
sock.write_all(
b"\
HTTP/1.1 200 OK\r\n\
transfer-encoding: chunked\r\n\
\r\n\
5\r\n\
hello\r\n\
0\r\n\r\n\
",
)
.expect("write 1");
let _ = tx1.send(());

let n2 = sock.read(&mut buf).expect("read 2");
assert_ne!(n2, 0, "bytes of second request");
let second_get = "GET /b HTTP/1.1\r\n";
assert_eq!(s(&buf[..second_get.len()]), second_get);
sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n")
.expect("write 2");
let _ = tx2.send(());
});

assert_eq!(connects.load(Ordering::SeqCst), 0);

let rx = rx1;
let req = Request::builder()
.uri(&*format!("http://{}/a", addr))
.body(Empty::<Bytes>::new())
.unwrap();
let fut = client.request(req);

let mut resp = future::join(fut, rx).map(|r| r.0).await.unwrap();
assert_eq!(connects.load(Ordering::SeqCst), 1);
assert_eq!(resp.status(), 200);
assert_eq!(resp.headers()["transfer-encoding"], "chunked");

// Read the "hello" chunk...
let chunk = resp.collect().await.unwrap().to_bytes();
assert_eq!(chunk, "hello");

// sleep real quick to let the threadpool put connection in ready
// state and back into client pool
tokio::time::sleep(Duration::from_millis(50)).await;

let rx = rx2;
let req = Request::builder()
.uri(&*format!("http://{}/b", addr))
.body(Empty::<Bytes>::new())
.unwrap();
let fut = client.request(req);
future::join(fut, rx).map(|r| r.0).await.unwrap();

assert_eq!(
connects.load(Ordering::SeqCst),
1,
"second request should still only have 1 connect"
);
drop(client);
}

#[cfg(not(miri))]
#[test]
fn connect_proxy_sends_absolute_uri() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -701,6 +712,7 @@ fn connect_proxy_sends_absolute_uri() {
rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
}

#[cfg(not(miri))]
#[test]
fn connect_proxy_http_connect_sends_authority_form() {
let _ = pretty_env_logger::try_init();
Expand Down Expand Up @@ -741,6 +753,7 @@ fn connect_proxy_http_connect_sends_authority_form() {
rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap();
}

#[cfg(not(miri))]
#[test]
fn client_upgrade() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
Expand Down Expand Up @@ -802,6 +815,7 @@ fn client_upgrade() {
assert_eq!(vec, b"bar=foo");
}

#[cfg(not(miri))]
#[test]
fn alpn_h2() {
use http::Response;
Expand Down
2 changes: 0 additions & 2 deletions tests/test_utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;

use hyper::rt::ReadBufCursor;
use hyper::rt::{Read, Write};

use hyper_util::client::legacy::connect::HttpConnector;
use hyper_util::client::legacy::connect::{Connected, Connection};
Expand Down Expand Up @@ -82,7 +81,6 @@ pub struct DebugStream {

impl Drop for DebugStream {
fn drop(&mut self) {
println!("DROPPING STREAM");
let _ = self.on_drop.try_send(());
}
}
Expand Down

0 comments on commit 65d8bdb

Please sign in to comment.