diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index 1585c4f..d2ffa64 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -100,7 +100,7 @@ jobs: env: # Can't enable tcp feature since Miri does not support the tokio runtime MIRIFLAGS: "-Zmiri-disable-isolation" - run: cargo miri test + run: cargo miri test --all-features features: name: features diff --git a/Cargo.toml b/Cargo.toml index d21f02e..d69999b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,7 @@ bytes = "1" http-body-util = "0.1.0" tokio = { version = "1", features = ["macros", "test-util"] } tokio-test = "0.4" +pretty_env_logger = "0.5" [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] pnet_datalink = "0.34.0" diff --git a/src/rt/tokio.rs b/src/rt/tokio.rs index 0a6b678..a75c0e3 100644 --- a/src/rt/tokio.rs +++ b/src/rt/tokio.rs @@ -72,6 +72,11 @@ impl TokioIo { &self.inner } + /// Mut borrow the inner type. + pub fn inner_mut(&mut self) -> &mut T { + &mut self.inner + } + /// Consume this wrapper and get the inner type. pub fn into_inner(self) -> T { self.inner diff --git a/tests/legacy_client.rs b/tests/legacy_client.rs new file mode 100644 index 0000000..3aab054 --- /dev/null +++ b/tests/legacy_client.rs @@ -0,0 +1,878 @@ +mod test_utils; + +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener}; +use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::task::Poll; +use std::thread; +use std::time::Duration; + +use futures_channel::{mpsc, oneshot}; +use futures_util::future::{self, FutureExt, TryFutureExt}; +use futures_util::stream::StreamExt; +use futures_util::{self, Stream}; +use http_body_util::BodyExt; +use http_body_util::{Empty, Full, StreamBody}; + +use hyper::body::Bytes; +use hyper::body::Frame; +use hyper::Request; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Client; +use hyper_util::rt::{TokioExecutor, TokioIo}; + +use test_utils::{DebugConnector, DebugStream}; + +pub fn runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .expect("new rt") +} + +fn s(buf: &[u8]) -> &str { + std::str::from_utf8(buf).expect("from_utf8") +} + +#[cfg(not(miri))] +#[test] +fn drop_body_before_eof_closes_connection() { + // https://github.com/hyperium/hyper/issues/1353 + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + let (closes_tx, closes) = mpsc::channel::<()>(10); + let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build( + DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx), + ); + let (tx1, rx1) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + let body = vec![b'x'; 1024 * 128]; + write!( + sock, + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + body.len() + ) + .expect("write head"); + let _ = sock.write_all(&body); + let _ = tx1.send(()); + }); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req).map_ok(move |res| { + assert_eq!(res.status(), hyper::StatusCode::OK); + }); + let rx = rx1; + rt.block_on(async move { + let (res, _) = future::join(res, rx).await; + res.unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + }); + rt.block_on(closes.into_future()).0.expect("closes"); +} + +#[cfg(not(miri))] +#[tokio::test] +async fn drop_client_closes_idle_connections() { + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let (closes_tx, mut closes) = mpsc::channel(10); + + let (tx1, rx1) = oneshot::channel(); + let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + let body = [b'x'; 64]; + write!( + sock, + "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", + body.len() + ) + .expect("write head"); + let _ = sock.write_all(&body); + let _ = tx1.send(()); + + // prevent this thread from closing until end of test, so the connection + // stays open and idle until Client is dropped + runtime().block_on(client_drop_rx.into_future()) + }); + + let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes( + HttpConnector::new(), + closes_tx, + )); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req).map_ok(move |res| { + assert_eq!(res.status(), hyper::StatusCode::OK); + }); + let rx = rx1; + let (res, _) = future::join(res, rx).await; + res.unwrap(); + + // not closed yet, just idle + future::poll_fn(|ctx| { + assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); + Poll::Ready(()) + }) + .await; + + // drop to start the connections closing + drop(client); + + // and wait a few ticks for the connections to close + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(t, close).await; +} + +#[cfg(not(miri))] +#[tokio::test] +async fn drop_response_future_closes_in_progress_connection() { + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let (closes_tx, closes) = mpsc::channel(10); + + let (tx1, rx1) = oneshot::channel(); + let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + // we never write a response head + // simulates a slow server operation + let _ = tx1.send(()); + + // prevent this thread from closing until end of test, so the connection + // stays open and idle until Client is dropped + let _ = client_drop_rx.recv(); + }); + + let res = { + let client = Client::builder(TokioExecutor::new()).build( + DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx), + ); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + client.request(req).map(|_| unreachable!()) + }; + + future::select(res, rx1).await; + + // res now dropped + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(t, close).await; +} + +#[cfg(not(miri))] +#[tokio::test] +async fn drop_response_body_closes_in_progress_connection() { + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let (closes_tx, closes) = mpsc::channel(10); + + let (tx1, rx1) = oneshot::channel(); + let (_client_drop_tx, client_drop_rx) = std::sync::mpsc::channel::<()>(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + write!( + sock, + "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n" + ) + .expect("write head"); + let _ = tx1.send(()); + + // prevent this thread from closing until end of test, so the connection + // stays open and idle until Client is dropped + let _ = client_drop_rx.recv(); + }); + + let rx = rx1; + let res = { + let client = Client::builder(TokioExecutor::new()).build( + DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx), + ); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + // notably, haven't read body yet + client.request(req) + }; + + let (res, _) = future::join(res, rx).await; + // drop the body + res.unwrap(); + + // and wait a few ticks to see the connection drop + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(t, close).await; +} + +#[cfg(not(miri))] +#[tokio::test] +async fn no_keep_alive_closes_connection() { + // https://github.com/hyperium/hyper/issues/1383 + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let (closes_tx, closes) = mpsc::channel(10); + + let (tx1, rx1) = oneshot::channel(); + let (_tx2, rx2) = std::sync::mpsc::channel::<()>(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .unwrap(); + let _ = tx1.send(()); + + // prevent this thread from closing until end of test, so the connection + // stays open and idle until Client is dropped + let _ = rx2.recv(); + }); + + let client = Client::builder(TokioExecutor::new()) + .pool_max_idle_per_host(0) + .build(DebugConnector::with_http_and_closes( + HttpConnector::new(), + closes_tx, + )); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req).map_ok(move |res| { + assert_eq!(res.status(), hyper::StatusCode::OK); + }); + let rx = rx1; + let (res, _) = future::join(res, rx).await; + res.unwrap(); + + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(close, t).await; +} + +#[cfg(not(miri))] +#[tokio::test] +async fn socket_disconnect_closes_idle_conn() { + // notably when keep-alive is enabled + let _ = pretty_env_logger::try_init(); + + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let (closes_tx, closes) = mpsc::channel(10); + + let (tx1, rx1) = oneshot::channel(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .unwrap(); + let _ = tx1.send(()); + }); + + let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes( + HttpConnector::new(), + closes_tx, + )); + + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req).map_ok(move |res| { + assert_eq!(res.status(), hyper::StatusCode::OK); + }); + let rx = rx1; + + let (res, _) = future::join(res, rx).await; + res.unwrap(); + + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(t, close).await; +} + +#[cfg(not(miri))] +#[test] +fn connect_call_is_lazy() { + // We especially don't want connects() triggered if there's + // idle connections that the Checkout would have found + let _ = pretty_env_logger::try_init(); + + let _rt = runtime(); + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + assert_eq!(connects.load(Ordering::Relaxed), 0); + let req = Request::builder() + .uri("http://hyper.local/a") + .body(Empty::::new()) + .unwrap(); + let _fut = client.request(req); + // internal Connect::connect should have been lazy, and not + // triggered an actual connect yet. + assert_eq!(connects.load(Ordering::Relaxed), 0); +} + +#[cfg(not(miri))] +#[test] +fn client_keep_alive_0() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + let _ = tx1.send(()); + + let n2 = sock.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 2"); + let _ = tx2.send(()); + }); + + assert_eq!(connects.load(Ordering::SeqCst), 0); + + let rx = rx1; + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::SeqCst), 1); + + // sleep real quick to let the threadpool put connection in ready + // state and back into client pool + thread::sleep(Duration::from_millis(50)); + + let rx = rx2; + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + + assert_eq!( + connects.load(Ordering::SeqCst), + 1, + "second request should still only have 1 connect" + ); + drop(client); +} + +#[cfg(not(miri))] +#[test] +fn client_keep_alive_extra_body() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 5\r\n\r\nhello") + .expect("write 1"); + // the body "hello", while ignored because its a HEAD request, should mean the connection + // cannot be put back in the pool + let _ = tx1.send(()); + + let mut sock2 = server.accept().unwrap().0; + let n2 = sock2.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock2 + .write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 2"); + let _ = tx2.send(()); + }); + + assert_eq!(connects.load(Ordering::Relaxed), 0); + + let rx = rx1; + let req = Request::builder() + .method("HEAD") + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 1); + + let rx = rx2; + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + + assert_eq!(connects.load(Ordering::Relaxed), 2); +} + +#[cfg(not(miri))] +#[tokio::test] +async fn client_keep_alive_when_response_before_request_body_ends() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + + let (closes_tx, mut closes) = mpsc::channel::<()>(10); + let connector = DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx); + let connects = connector.connects.clone(); + let client = Client::builder(TokioExecutor::new()).build(connector.clone()); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + let (_tx3, rx3) = std::sync::mpsc::channel::<()>(); + + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + // after writing the response, THEN stream the body + let _ = tx1.send(()); + + sock.read(&mut buf).expect("read 2"); + let _ = tx2.send(()); + + // prevent this thread from closing until end of test, so the connection + // stays open and idle until Client is dropped + let _ = rx3.recv(); + }); + + assert_eq!(connects.load(Ordering::Relaxed), 0); + + let delayed_body = rx1 + .then(|_| Box::pin(tokio::time::sleep(Duration::from_millis(200)))) + .map(|_| Ok::<_, ()>(Frame::data(&b"hello a"[..]))) + .map_err(|_| -> hyper::Error { panic!("rx1") }) + .into_stream(); + + let req = Request::builder() + .method("POST") + .uri(&*format!("http://{}/a", addr)) + .body(StreamBody::new(delayed_body)) + .unwrap(); + let res = client.request(req).map_ok(move |res| { + assert_eq!(res.status(), hyper::StatusCode::OK); + }); + + future::join(res, rx2).await.0.unwrap(); + future::poll_fn(|ctx| { + assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); + Poll::Ready(()) + }) + .await; + + assert_eq!(connects.load(Ordering::Relaxed), 1); + + drop(client); + let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); + futures_util::pin_mut!(t); + let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); + future::select(t, close).await; +} + +#[cfg(not(miri))] +#[tokio::test] +async fn client_keep_alive_eager_when_chunked() { + // If a response body has been read to completion, with completion + // determined by some other factor, like decompression, and thus + // it is in't polled a final time to clear the final 0-len chunk, + // try to eagerly clear it so the connection can still be used. + + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let connector = DebugConnector::new(); + let connects = connector.connects.clone(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all( + b"\ + HTTP/1.1 200 OK\r\n\ + transfer-encoding: chunked\r\n\ + \r\n\ + 5\r\n\ + hello\r\n\ + 0\r\n\r\n\ + ", + ) + .expect("write 1"); + let _ = tx1.send(()); + + let n2 = sock.read(&mut buf).expect("read 2"); + assert_ne!(n2, 0, "bytes of second request"); + let second_get = "GET /b HTTP/1.1\r\n"; + assert_eq!(s(&buf[..second_get.len()]), second_get); + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 2"); + let _ = tx2.send(()); + }); + + assert_eq!(connects.load(Ordering::SeqCst), 0); + + let rx = rx1; + let req = Request::builder() + .uri(&*format!("http://{}/a", addr)) + .body(Empty::::new()) + .unwrap(); + let fut = client.request(req); + + let resp = future::join(fut, rx).map(|r| r.0).await.unwrap(); + assert_eq!(connects.load(Ordering::SeqCst), 1); + assert_eq!(resp.status(), 200); + assert_eq!(resp.headers()["transfer-encoding"], "chunked"); + + // Read the "hello" chunk... + let chunk = resp.collect().await.unwrap().to_bytes(); + assert_eq!(chunk, "hello"); + + // sleep real quick to let the threadpool put connection in ready + // state and back into client pool + tokio::time::sleep(Duration::from_millis(50)).await; + + let rx = rx2; + let req = Request::builder() + .uri(&*format!("http://{}/b", addr)) + .body(Empty::::new()) + .unwrap(); + let fut = client.request(req); + future::join(fut, rx).map(|r| r.0).await.unwrap(); + + assert_eq!( + connects.load(Ordering::SeqCst), + 1, + "second request should still only have 1 connect" + ); + drop(client); +} + +#[cfg(not(miri))] +#[test] +fn connect_proxy_sends_absolute_uri() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + let connector = DebugConnector::new().proxy(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + let n = sock.read(&mut buf).expect("read 1"); + let expected = format!( + "GET http://{addr}/foo/bar HTTP/1.1\r\nhost: {addr}\r\n\r\n", + addr = addr + ); + assert_eq!(s(&buf[..n]), expected); + + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + let _ = tx1.send(()); + }); + + let rx = rx1; + let req = Request::builder() + .uri(&*format!("http://{}/foo/bar", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); +} + +#[cfg(not(miri))] +#[test] +fn connect_proxy_http_connect_sends_authority_form() { + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + let connector = DebugConnector::new().proxy(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + //drop(server); + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + let n = sock.read(&mut buf).expect("read 1"); + let expected = format!( + "CONNECT {addr} HTTP/1.1\r\nhost: {addr}\r\n\r\n", + addr = addr + ); + assert_eq!(s(&buf[..n]), expected); + + sock.write_all(b"HTTP/1.1 200 OK\r\nContent-Length: 0\r\n\r\n") + .expect("write 1"); + let _ = tx1.send(()); + }); + + let rx = rx1; + let req = Request::builder() + .method("CONNECT") + .uri(&*format!("http://{}/useless/path", addr)) + .body(Empty::::new()) + .unwrap(); + let res = client.request(req); + rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); +} + +#[cfg(not(miri))] +#[test] +fn client_upgrade() { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + + let _ = pretty_env_logger::try_init(); + let server = TcpListener::bind("127.0.0.1:0").unwrap(); + let addr = server.local_addr().unwrap(); + let rt = runtime(); + + let connector = DebugConnector::new(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + let (tx1, rx1) = oneshot::channel(); + thread::spawn(move || { + let mut sock = server.accept().unwrap().0; + sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); + sock.set_write_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let mut buf = [0; 4096]; + sock.read(&mut buf).expect("read 1"); + sock.write_all( + b"\ + HTTP/1.1 101 Switching Protocols\r\n\ + Upgrade: foobar\r\n\ + \r\n\ + foobar=ready\ + ", + ) + .unwrap(); + let _ = tx1.send(()); + + let n = sock.read(&mut buf).expect("read 2"); + assert_eq!(&buf[..n], b"foo=bar"); + sock.write_all(b"bar=foo").expect("write 2"); + }); + + let rx = rx1; + + let req = Request::builder() + .method("GET") + .uri(&*format!("http://{}/up", addr)) + .body(Empty::::new()) + .unwrap(); + + let res = client.request(req); + let res = rt.block_on(future::join(res, rx).map(|r| r.0)).unwrap(); + + assert_eq!(res.status(), 101); + let upgraded = rt.block_on(hyper::upgrade::on(res)).expect("on_upgrade"); + + let parts = upgraded.downcast::().unwrap(); + assert_eq!(s(&parts.read_buf), "foobar=ready"); + + let mut io = parts.io; + rt.block_on(io.write_all(b"foo=bar")).unwrap(); + let mut vec = vec![]; + rt.block_on(io.read_to_end(&mut vec)).unwrap(); + assert_eq!(vec, b"bar=foo"); +} + +#[cfg(not(miri))] +#[test] +fn alpn_h2() { + use http::Response; + use hyper::service::service_fn; + use tokio::net::TcpListener; + + let _ = pretty_env_logger::try_init(); + let rt = runtime(); + let listener = rt + .block_on(TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0)))) + .unwrap(); + let addr = listener.local_addr().unwrap(); + let mut connector = DebugConnector::new(); + connector.alpn_h2 = true; + let connects = connector.connects.clone(); + + let client = Client::builder(TokioExecutor::new()).build(connector); + + rt.spawn(async move { + let (stream, _) = listener.accept().await.expect("accept"); + let stream = TokioIo::new(stream); + let _ = hyper::server::conn::http2::Builder::new(TokioExecutor::new()) + .serve_connection( + stream, + service_fn(|req| async move { + assert_eq!(req.headers().get("host"), None); + Ok::<_, hyper::Error>(Response::new(Full::::from("Hello, world"))) + }), + ) + .await + .expect("server"); + }); + + assert_eq!(connects.load(Ordering::SeqCst), 0); + + let url = format!("http://{}/a", addr) + .parse::<::hyper::Uri>() + .unwrap(); + let res1 = client.get(url.clone()); + let res2 = client.get(url.clone()); + let res3 = client.get(url.clone()); + rt.block_on(future::try_join3(res1, res2, res3)).unwrap(); + + // Since the client doesn't know it can ALPN at first, it will have + // started 3 connections. But, the server above will only handle 1, + // so the unwrapped responses futures show it still worked. + assert_eq!(connects.load(Ordering::SeqCst), 3); + + let res4 = client.get(url.clone()); + rt.block_on(res4).unwrap(); + + // HTTP/2 request allowed + let res5 = client.request( + Request::builder() + .uri(url) + .version(hyper::Version::HTTP_2) + .body(Empty::::new()) + .unwrap(), + ); + rt.block_on(res5).unwrap(); + + assert_eq!( + connects.load(Ordering::SeqCst), + 3, + "after ALPN, no more connects" + ); + drop(client); +} diff --git a/tests/test_utils/mod.rs b/tests/test_utils/mod.rs new file mode 100644 index 0000000..df3a65d --- /dev/null +++ b/tests/test_utils/mod.rs @@ -0,0 +1,175 @@ +use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +use futures_channel::mpsc; +use futures_util::task::{Context, Poll}; +use futures_util::Future; +use futures_util::TryFutureExt; +use hyper::Uri; +use tokio::io::{self, AsyncRead, AsyncWrite, ReadBuf}; +use tokio::net::TcpStream; + +use hyper::rt::ReadBufCursor; + +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::connect::{Connected, Connection}; +use hyper_util::rt::TokioIo; + +#[derive(Clone)] +pub struct DebugConnector { + pub http: HttpConnector, + pub closes: mpsc::Sender<()>, + pub connects: Arc, + pub is_proxy: bool, + pub alpn_h2: bool, +} + +impl DebugConnector { + pub fn new() -> DebugConnector { + let http = HttpConnector::new(); + let (tx, _) = mpsc::channel(10); + DebugConnector::with_http_and_closes(http, tx) + } + + pub fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector { + DebugConnector { + http, + closes, + connects: Arc::new(AtomicUsize::new(0)), + is_proxy: false, + alpn_h2: false, + } + } + + pub fn proxy(mut self) -> Self { + self.is_proxy = true; + self + } +} + +impl tower_service::Service for DebugConnector { + type Response = DebugStream; + type Error = >::Error; + type Future = Pin> + Send>>; + + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + // don't forget to check inner service is ready :) + tower_service::Service::::poll_ready(&mut self.http, cx) + } + + fn call(&mut self, dst: Uri) -> Self::Future { + self.connects.fetch_add(1, Ordering::SeqCst); + let closes = self.closes.clone(); + let is_proxy = self.is_proxy; + let is_alpn_h2 = self.alpn_h2; + Box::pin(self.http.call(dst).map_ok(move |tcp| DebugStream { + tcp, + on_drop: closes, + is_alpn_h2, + is_proxy, + })) + } +} + +pub struct DebugStream { + tcp: TokioIo, + on_drop: mpsc::Sender<()>, + is_alpn_h2: bool, + is_proxy: bool, +} + +impl Drop for DebugStream { + fn drop(&mut self) { + let _ = self.on_drop.try_send(()); + } +} + +impl Connection for DebugStream { + fn connected(&self) -> Connected { + let connected = self.tcp.connected().proxy(self.is_proxy); + + if self.is_alpn_h2 { + connected.negotiated_h2() + } else { + connected + } + } +} + +impl hyper::rt::Read for DebugStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: ReadBufCursor<'_>, + ) -> Poll> { + hyper::rt::Read::poll_read(Pin::new(&mut self.tcp), cx, buf) + } +} + +impl hyper::rt::Write for DebugStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(Pin::new(&mut self.tcp), cx, buf) + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_flush(Pin::new(&mut self.tcp), cx) + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(Pin::new(&mut self.tcp), cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.tcp) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(Pin::new(&mut self.tcp), cx, bufs) + } +} + +impl AsyncWrite for DebugStream { + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + Pin::new(self.tcp.inner_mut()).poll_shutdown(cx) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(self.tcp.inner_mut()).poll_flush(cx) + } + + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(self.tcp.inner_mut()).poll_write(cx, buf) + } +} + +impl AsyncRead for DebugStream { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(self.tcp.inner_mut()).poll_read(cx, buf) + } +}