-
Notifications
You must be signed in to change notification settings - Fork 106
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: port legacy client integration tests
- Loading branch information
Showing
3 changed files
with
275 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
mod test_utils; | ||
|
||
use std::io::{Read, Write}; | ||
use std::net::TcpListener; | ||
use std::pin::Pin; | ||
use std::thread; | ||
use std::time::Duration; | ||
|
||
use futures_channel::{mpsc, oneshot}; | ||
use futures_util::future::{self, FutureExt, TryFutureExt}; | ||
use futures_util::stream::StreamExt; | ||
use futures_util::task::Poll; | ||
use futures_util::{self, Stream}; | ||
use http_body_util::Empty; | ||
|
||
use hyper::body::Bytes; | ||
use hyper::Request; | ||
use hyper_util::client::legacy::connect::HttpConnector; | ||
use hyper_util::client::legacy::Client; | ||
use hyper_util::rt::TokioExecutor; | ||
|
||
use test_utils::DebugConnector; | ||
|
||
pub fn runtime() -> tokio::runtime::Runtime { | ||
tokio::runtime::Builder::new_current_thread() | ||
.enable_all() | ||
.build() | ||
.expect("new rt") | ||
} | ||
|
||
#[test] | ||
fn drop_body_before_eof_closes_connection() { | ||
// https://github.com/hyperium/hyper/issues/1353 | ||
let _ = pretty_env_logger::try_init(); | ||
|
||
let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||
let addr = server.local_addr().unwrap(); | ||
let rt = runtime(); | ||
let (closes_tx, closes) = mpsc::channel::<()>(10); | ||
let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build( | ||
DebugConnector::with_http_and_closes(HttpConnector::new(), closes_tx), | ||
); | ||
let (tx1, rx1) = oneshot::channel(); | ||
thread::spawn(move || { | ||
let mut sock = server.accept().unwrap().0; | ||
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); | ||
sock.set_write_timeout(Some(Duration::from_secs(5))) | ||
.unwrap(); | ||
let mut buf = [0; 4096]; | ||
sock.read(&mut buf).expect("read 1"); | ||
let body = vec![b'x'; 1024 * 128]; | ||
write!( | ||
sock, | ||
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", | ||
body.len() | ||
) | ||
.expect("write head"); | ||
let _ = sock.write_all(&body); | ||
let _ = tx1.send(()); | ||
}); | ||
|
||
let req = Request::builder() | ||
.uri(&*format!("http://{}/a", addr)) | ||
.body(Empty::<Bytes>::new()) | ||
.unwrap(); | ||
let res = client.request(req).map_ok(move |res| { | ||
assert_eq!(res.status(), hyper::StatusCode::OK); | ||
}); | ||
let rx = rx1; | ||
rt.block_on(async move { | ||
let (res, _) = future::join(res, rx).await; | ||
res.unwrap(); | ||
tokio::time::sleep(Duration::from_secs(1)).await; | ||
}); | ||
rt.block_on(closes.into_future()).0.expect("closes"); | ||
} | ||
|
||
#[tokio::test] | ||
async fn drop_client_closes_idle_connections() { | ||
let _ = pretty_env_logger::try_init(); | ||
|
||
let server = TcpListener::bind("127.0.0.1:0").unwrap(); | ||
let addr = server.local_addr().unwrap(); | ||
let (closes_tx, mut closes) = mpsc::channel(10); | ||
|
||
let (tx1, rx1) = oneshot::channel(); | ||
let (_client_drop_tx, client_drop_rx) = oneshot::channel::<()>(); | ||
|
||
thread::spawn(move || { | ||
let mut sock = server.accept().unwrap().0; | ||
sock.set_read_timeout(Some(Duration::from_secs(5))).unwrap(); | ||
sock.set_write_timeout(Some(Duration::from_secs(5))) | ||
.unwrap(); | ||
let mut buf = [0; 4096]; | ||
sock.read(&mut buf).expect("read 1"); | ||
let body = [b'x'; 64]; | ||
write!( | ||
sock, | ||
"HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n", | ||
body.len() | ||
) | ||
.expect("write head"); | ||
let _ = sock.write_all(&body); | ||
let _ = tx1.send(()); | ||
|
||
// prevent this thread from closing until end of test, so the connection | ||
// stays open and idle until Client is dropped | ||
runtime().block_on(client_drop_rx.into_future()) | ||
}); | ||
|
||
let client = Client::builder(TokioExecutor::new()).build(DebugConnector::with_http_and_closes( | ||
HttpConnector::new(), | ||
closes_tx, | ||
)); | ||
|
||
let req = Request::builder() | ||
.uri(&*format!("http://{}/a", addr)) | ||
.body(Empty::<Bytes>::new()) | ||
.unwrap(); | ||
let res = client.request(req).map_ok(move |res| { | ||
assert_eq!(res.status(), hyper::StatusCode::OK); | ||
}); | ||
let rx = rx1; | ||
let (res, _) = future::join(res, rx).await; | ||
res.unwrap(); | ||
|
||
// not closed yet, just idle | ||
future::poll_fn(|ctx| { | ||
assert!(Pin::new(&mut closes).poll_next(ctx).is_pending()); | ||
Poll::Ready(()) | ||
}) | ||
.await; | ||
|
||
// drop to start the connections closing | ||
drop(client); | ||
|
||
// and wait a few ticks for the connections to close | ||
let t = tokio::time::sleep(Duration::from_millis(100)).map(|_| panic!("time out")); | ||
futures_util::pin_mut!(t); | ||
let close = closes.into_future().map(|(opt, _)| opt.expect("closes")); | ||
future::select(t, close).await; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
use std::pin::Pin; | ||
use std::sync::atomic::{AtomicUsize, Ordering}; | ||
use std::sync::Arc; | ||
|
||
use futures_channel::mpsc; | ||
use futures_util::task::{Context, Poll}; | ||
use futures_util::Future; | ||
use futures_util::TryFutureExt; | ||
use hyper::Uri; | ||
use tokio::net::TcpStream; | ||
|
||
use hyper::rt::ReadBufCursor; | ||
|
||
use hyper_util::client::legacy::connect::HttpConnector; | ||
use hyper_util::client::legacy::connect::{Connected, Connection}; | ||
use hyper_util::rt::TokioIo; | ||
|
||
#[derive(Clone)] | ||
pub struct DebugConnector { | ||
http: HttpConnector, | ||
closes: mpsc::Sender<()>, | ||
connects: Arc<AtomicUsize>, | ||
is_proxy: bool, | ||
alpn_h2: bool, | ||
} | ||
|
||
impl DebugConnector { | ||
pub fn with_http_and_closes(http: HttpConnector, closes: mpsc::Sender<()>) -> DebugConnector { | ||
DebugConnector { | ||
http, | ||
closes, | ||
connects: Arc::new(AtomicUsize::new(0)), | ||
is_proxy: false, | ||
alpn_h2: false, | ||
} | ||
} | ||
} | ||
|
||
impl tower_service::Service<Uri> for DebugConnector { | ||
type Response = DebugStream; | ||
type Error = <HttpConnector as tower_service::Service<Uri>>::Error; | ||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>; | ||
|
||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> { | ||
// don't forget to check inner service is ready :) | ||
tower_service::Service::<Uri>::poll_ready(&mut self.http, cx) | ||
} | ||
|
||
fn call(&mut self, dst: Uri) -> Self::Future { | ||
self.connects.fetch_add(1, Ordering::SeqCst); | ||
let closes = self.closes.clone(); | ||
let is_proxy = self.is_proxy; | ||
let is_alpn_h2 = self.alpn_h2; | ||
Box::pin(self.http.call(dst).map_ok(move |tcp| DebugStream { | ||
tcp, | ||
on_drop: closes, | ||
is_alpn_h2, | ||
is_proxy, | ||
})) | ||
} | ||
} | ||
|
||
pub struct DebugStream { | ||
tcp: TokioIo<TcpStream>, | ||
on_drop: mpsc::Sender<()>, | ||
is_alpn_h2: bool, | ||
is_proxy: bool, | ||
} | ||
|
||
impl Drop for DebugStream { | ||
fn drop(&mut self) { | ||
let _ = self.on_drop.try_send(()); | ||
} | ||
} | ||
|
||
impl Connection for DebugStream { | ||
fn connected(&self) -> Connected { | ||
let connected = self.tcp.connected().proxy(self.is_proxy); | ||
|
||
if self.is_alpn_h2 { | ||
connected.negotiated_h2() | ||
} else { | ||
connected | ||
} | ||
} | ||
} | ||
|
||
impl hyper::rt::Read for DebugStream { | ||
fn poll_read( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: ReadBufCursor<'_>, | ||
) -> Poll<Result<(), std::io::Error>> { | ||
hyper::rt::Read::poll_read(Pin::new(&mut self.tcp), cx, buf) | ||
} | ||
} | ||
|
||
impl hyper::rt::Write for DebugStream { | ||
fn poll_write( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
buf: &[u8], | ||
) -> Poll<Result<usize, std::io::Error>> { | ||
hyper::rt::Write::poll_write(Pin::new(&mut self.tcp), cx, buf) | ||
} | ||
|
||
fn poll_flush( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), std::io::Error>> { | ||
hyper::rt::Write::poll_flush(Pin::new(&mut self.tcp), cx) | ||
} | ||
|
||
fn poll_shutdown( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
) -> Poll<Result<(), std::io::Error>> { | ||
hyper::rt::Write::poll_shutdown(Pin::new(&mut self.tcp), cx) | ||
} | ||
|
||
fn is_write_vectored(&self) -> bool { | ||
hyper::rt::Write::is_write_vectored(&self.tcp) | ||
} | ||
|
||
fn poll_write_vectored( | ||
mut self: Pin<&mut Self>, | ||
cx: &mut Context<'_>, | ||
bufs: &[std::io::IoSlice<'_>], | ||
) -> Poll<Result<usize, std::io::Error>> { | ||
hyper::rt::Write::poll_write_vectored(Pin::new(&mut self.tcp), cx, bufs) | ||
} | ||
} |