From ea43ceabd2720991c95cee774d0375bad89c9e45 Mon Sep 17 00:00:00 2001 From: dswij Date: Sun, 10 Dec 2023 02:08:05 +0800 Subject: [PATCH] cleanup test --- tests/legacy/client.rs | 149 ---------------------------------------- tests/test_utils/mod.rs | 17 +---- 2 files changed, 1 insertion(+), 165 deletions(-) delete mode 100644 tests/legacy/client.rs diff --git a/tests/legacy/client.rs b/tests/legacy/client.rs deleted file mode 100644 index ea8f092..0000000 --- a/tests/legacy/client.rs +++ /dev/null @@ -1,149 +0,0 @@ -mod test_utils; - -use std::io::{self, Read, Write}; -use std::net::SocketAddr; -use std::net::TcpListener; -use std::pin::Pin; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; -use std::thread; -use std::time::Duration; - -use futures_channel::{mpsc, oneshot}; -use futures_util::future::{self, FutureExt, TryFutureExt}; -use futures_util::stream::StreamExt; -use futures_util::task::{Context, Poll}; -use futures_util::{self, Future, Stream}; -use http::Uri; -use http_body_util::{BodyExt, Empty, Full}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; -use tokio::net::TcpStream; -use tokio_test::block_on; - -use hyper::body::{Body, Bytes}; -use hyper::Request; -use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector}; -use hyper_util::client::legacy::Client; -use hyper_util::rt::TokioExecutor; - -use test_utils::DebugConnector; - -pub fn runtime() -> tokio::runtime::Runtime { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("new rt") -} - -#[test] -fn drop_body_before_eof_closes_connection() { - // https://github.com/hyperium/hyper/issues/1353 - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let rt = runtime(); - let (closes_tx, closes) = mpsc::channel::<()>(10); - let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build( - DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx), - ); - let (tx1, rx1) = oneshot::channel(); - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - let body = vec![b'x'; 1024 * 128]; - write!( - sock, - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - body.len() - ) - .expect("write head"); - let _ = sock.write_all(&body); - let _ = tx1.send(()); - }); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Empty::::new()) - .unwrap(); - let res = client.request(req).map_ok(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - }); - let rx = rx1; - rt.block_on(async move { - let (res, _) = future::join(res, rx).await; - res.unwrap(); - tokio::time::sleep(Duration::from_secs(1)).await; - }); - rt.block_on(closes.into_future()).0.expect("closes"); -} - -#[tokio::test] -async fn drop_client_closes_idle_connections() { - let _ = pretty_env_logger::try_init(); - - let server = TcpListener::bind("127.0.0.1:0").unwrap(); - let addr = server.local_addr().unwrap(); - let (closes_tx, mut closes) = mpsc::channel(10); - - let (tx1, rx1) = oneshot::channel(); - let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>(); - - thread::spawn(move || { - let mut sock = server.accept().unwrap().0; - sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); - sock.set_write_timeout(Some(Duration::from_secs(5))) - .unwrap(); - let mut buf = [0; 4096]; - sock.read(&mut buf).expect("read 1"); - let body = [b'x'; 64]; - write!( - sock, - "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", - body.len() - ) - .expect("write head"); - let _ = sock.write_all(&body); - let _ = tx1.send(()); - - // prevent this thread from closing until end of test, so the connection - // stays open and idle until Client is dropped - runtime().block_on(client_drop_rx.into_future()) - }); - - let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes( - HttpConnector::new(), - closes_tx, - )); - - let req = Request::builder() - .uri(&*format!("http://{}/a", addr)) - .body(Empty::::new()) - .unwrap(); - let res = client.request(req).map_ok(move |res| { - assert_eq!(res.status(), hyper::StatusCode::OK); - }); - let rx = rx1; - let (res, _) = future::join(res, rx).await; - res.unwrap(); - - // not closed yet, just idle - future::poll_fn(|ctx| { - assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); - Poll::Ready(()) - }) - .await; - - // drop to start the connections closing - drop(client); - - // and wait a few ticks for the connections to close - let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); - futures_util::pin_mut!(t); - let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); - future::select(t, close).await; -} diff --git a/tests/test_utils/mod.rs b/tests/test_utils/mod.rs index 2e76853..166befc 100644 --- a/tests/test_utils/mod.rs +++ b/tests/test_utils/mod.rs @@ -1,18 +1,14 @@ -use std::io; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use futures_channel::{mpsc, oneshot}; +use futures_channel::mpsc; use futures_util::task::{Context, Poll}; use futures_util::Future; use futures_util::TryFutureExt; use hyper::Uri; -use tokio::io::ReadBuf; -use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; -use hyper::rt::Read; use hyper::rt::ReadBufCursor; use hyper_util::client::legacy::connect::HttpConnector; @@ -29,12 +25,6 @@ pub struct DebugConnector { } impl DebugConnector { - pub fn new() -> DebugConnector { - let http = HttpConnector::new(); - let (tx, _) = mpsc::channel(10); - DebugConnector::with_http_and_closes(http, tx) - } - pub fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector { DebugConnector { http, @@ -44,11 +34,6 @@ impl DebugConnector { alpn_h2: false, } } - - fn proxy(mut self) -> Self { - self.is_proxy = true; - self - } } impl tower_service::Service for DebugConnector {