From c0655854cc1c61b32c24c3e3192ebdafc5dd22b5 Mon Sep 17 00:00:00 2001 From: Programatik Date: Sat, 5 Nov 2022 18:26:33 +0300 Subject: [PATCH 01/10] feat(server): add AutoConn --- Cargo.toml | 5 +- src/lib.rs | 1 + src/rt/tokio_executor.rs | 2 +- src/server/mod.rs | 268 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 274 insertions(+), 2 deletions(-) create mode 100644 src/server/mod.rs diff --git a/Cargo.toml b/Cargo.toml index fa25f55..64376aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,9 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = "0.14.19" +hyper = { version = "1.0.0-rc.1", features = ["server", "http1", "http2"] } +http-body = "1.0.0-rc1" +http-body-util = "0.1.0-rc.1" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" @@ -31,6 +33,7 @@ tower-service = "0.3" tower = { version = "0.4", features = ["util"] } [dev-dependencies] +hyper = { version = "1.0.0-rc.1", features = ["full"] } tokio = { version = "1", features = ["macros", "test-util"] } [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] diff --git a/src/lib.rs b/src/lib.rs index a982be6..2f9cc70 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,5 +6,6 @@ pub use crate::error::{GenericError, Result}; pub mod client; pub mod common; pub mod rt; +pub mod server; mod error; diff --git a/src/rt/tokio_executor.rs b/src/rt/tokio_executor.rs index 9850398..1e21a54 100644 --- a/src/rt/tokio_executor.rs +++ b/src/rt/tokio_executor.rs @@ -3,7 +3,7 @@ use std::future::Future; /// Future executor that utilises `tokio` threads. #[non_exhaustive] -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone, Copy)] pub struct TokioExecutor {} impl Executor for TokioExecutor diff --git a/src/server/mod.rs b/src/server/mod.rs new file mode 100644 index 0000000..d16fc33 --- /dev/null +++ b/src/server/mod.rs @@ -0,0 +1,268 @@ +//! Server utilities. + +use crate::rt::tokio_executor::TokioExecutor; +use http::{Request, Response}; +use http_body::Body; +use hyper::{ + body::Incoming, + server::conn::{http1, http2}, + service::Service, +}; +use pin_project_lite::pin_project; +use std::{ + error::Error as StdError, + io::Cursor, + marker::Unpin, + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; + +const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// Http1 or Http2 connection. +pub struct AutoConn { + h1: http1::Builder, + h2: http2::Builder, +} + +impl AutoConn { + /// Create a new AutoConn. + pub fn new() -> Self { + Self { + h1: http1::Builder::new(), + h2: http2::Builder::new(TokioExecutor::new()), + } + } + + /// Http1 configuration. + pub fn h1(&mut self) -> &mut http1::Builder { + &mut self.h1 + } + + /// Http2 configuration. + pub fn h2(&mut self) -> &mut http2::Builder { + &mut self.h2 + } + + /// Bind a connection together with a [`Service`]. + pub async fn serve_connection(&self, mut io: I, service: S) -> crate::Result<()> + where + S: Service, Response = Response> + Send, + S::Future: Send + 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + let mut buf = Vec::new(); + + let protocol = loop { + if buf.len() < 24 { + io.read_buf(&mut buf).await?; + + let len = buf.len().min(H2_PREFACE.len()); + + if buf[0..len] != H2_PREFACE[0..len] { + break Protocol::H1; + } + } else { + break Protocol::H2; + } + }; + + let io = PrependAsyncRead::new(Cursor::new(buf), io); + + match protocol { + Protocol::H1 => self.h1.serve_connection(io, service).await?, + Protocol::H2 => self.h2.serve_connection(io, service).await?, + } + + Ok(()) + } +} + +enum Protocol { + H1, + H2, +} + +pin_project! { + struct PrependAsyncRead { + #[pin] + first: T1, + #[pin] + second: T2, + state: State, + } +} + +impl PrependAsyncRead { + fn new(first: T1, second: T2) -> Self { + Self { + first, + second, + state: State::First, + } + } +} + +enum State { + First, + Second, +} + +impl AsyncRead for PrependAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut this = self.project(); + + loop { + match &this.state { + State::First => { + let old_len = buf.filled().len(); + + match this.first.as_mut().poll_read(cx, buf) { + Poll::Ready(result) => match result { + Ok(()) => { + if buf.filled().len() == old_len { + *this.state = State::Second; + } else { + return Poll::Ready(Ok(())); + } + } + Err(e) => return Poll::Ready(Err(e)), + }, + Poll::Pending => return Poll::Pending, + } + } + State::Second => return this.second.as_mut().poll_read(cx, buf), + } + } + } +} + +impl AsyncWrite for PrependAsyncRead { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().second.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().second.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().second.poll_shutdown(cx) + } +} + +#[cfg(test)] +mod tests { + use super::AutoConn; + use crate::rt::tokio_executor::TokioExecutor; + use http::{Request, Response}; + use http_body::Body; + use http_body_util::{BodyExt, Empty, Full}; + use hyper::{body, body::Bytes, client, service::service_fn}; + use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; + use tokio::net::{TcpListener, TcpStream}; + + const BODY: &'static [u8] = b"Hello, world!"; + + #[tokio::test] + async fn http1() { + let addr = start_server().await; + let mut sender = connect_h1(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + #[tokio::test] + async fn http2() { + let addr = start_server().await; + let mut sender = connect_h2(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + async fn connect_h1(addr: SocketAddr) -> client::conn::http1::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http2::Builder::new() + .executor(TokioExecutor::new()) + .handshake(stream) + .await + .unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn start_server() -> SocketAddr { + let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + let listener = TcpListener::bind(addr).await.unwrap(); + + let local_addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + tokio::task::spawn(async move { + let _ = AutoConn::new() + .serve_connection(stream, service_fn(hello)) + .await; + }); + } + }); + + local_addr + } + + async fn hello(_req: Request) -> Result>, Infallible> { + Ok(Response::new(Full::new(Bytes::from(BODY)))) + } +} From 124fdb82985e71fa0d00c50de381407ef5b12ab6 Mon Sep 17 00:00:00 2001 From: Programatik Date: Fri, 18 Nov 2022 16:36:53 +0300 Subject: [PATCH 02/10] rename to AutoConnection, split code to modules and minor code improvements --- src/lib.rs | 1 + src/server/conn.rs | 180 ++++++++++++++++++++++ src/server/mod.rs | 267 +-------------------------------- src/util/mod.rs | 3 + src/util/prepend_async_read.rs | 83 ++++++++++ 5 files changed, 269 insertions(+), 265 deletions(-) create mode 100644 src/server/conn.rs create mode 100644 src/util/mod.rs create mode 100644 src/util/prepend_async_read.rs diff --git a/src/lib.rs b/src/lib.rs index 2f9cc70..9444b69 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,3 +9,4 @@ pub mod rt; pub mod server; mod error; +mod util; diff --git a/src/server/conn.rs b/src/server/conn.rs new file mode 100644 index 0000000..2eeea32 --- /dev/null +++ b/src/server/conn.rs @@ -0,0 +1,180 @@ +use crate::{rt::tokio_executor::TokioExecutor, util::PrependAsyncRead}; +use http::{Request, Response}; +use http_body::Body; +use hyper::{ + body::Incoming, + server::conn::{http1, http2}, + service::Service, +}; +use std::{error::Error as StdError, io::Cursor, marker::Unpin}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; + +const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// Http1 or Http2 connection. +pub struct AutoConnection { + h1: http1::Builder, + h2: http2::Builder, +} + +impl AutoConnection { + /// Create a new AutoConn. + pub fn new() -> Self { + Self { + h1: http1::Builder::new(), + h2: http2::Builder::new(TokioExecutor::new()), + } + } + + /// Http1 configuration. + pub fn h1(&mut self) -> &mut http1::Builder { + &mut self.h1 + } + + /// Http2 configuration. + pub fn h2(&mut self) -> &mut http2::Builder { + &mut self.h2 + } + + /// Bind a connection together with a [`Service`]. + pub async fn serve_connection(&self, mut io: I, service: S) -> crate::Result<()> + where + S: Service, Response = Response> + Send, + S::Future: Send + 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + let mut buf = Vec::new(); + + let protocol = loop { + if buf.len() < 24 { + io.read_buf(&mut buf).await?; + + let len = buf.len().min(H2_PREFACE.len()); + + if buf[0..len] != H2_PREFACE[0..len] { + break Protocol::H1; + } + } else { + break Protocol::H2; + } + }; + + let io = PrependAsyncRead::new(Cursor::new(buf), io); + + match protocol { + Protocol::H1 => self.h1.serve_connection(io, service).await?, + Protocol::H2 => self.h2.serve_connection(io, service).await?, + } + + Ok(()) + } +} + +enum Protocol { + H1, + H2, +} + +#[cfg(test)] +mod tests { + use super::AutoConnection; + use crate::rt::tokio_executor::TokioExecutor; + use http::{Request, Response}; + use http_body::Body; + use http_body_util::{BodyExt, Empty, Full}; + use hyper::{body, body::Bytes, client, service::service_fn}; + use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; + use tokio::net::{TcpListener, TcpStream}; + + const BODY: &'static [u8] = b"Hello, world!"; + + #[tokio::test] + async fn http1() { + let addr = start_server().await; + let mut sender = connect_h1(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + #[tokio::test] + async fn http2() { + let addr = start_server().await; + let mut sender = connect_h2(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + async fn connect_h1(addr: SocketAddr) -> client::conn::http1::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http2::Builder::new() + .executor(TokioExecutor::new()) + .handshake(stream) + .await + .unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn start_server() -> SocketAddr { + let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + let listener = TcpListener::bind(addr).await.unwrap(); + + let local_addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + tokio::task::spawn(async move { + let _ = AutoConnection::new() + .serve_connection(stream, service_fn(hello)) + .await; + }); + } + }); + + local_addr + } + + async fn hello(_req: Request) -> Result>, Infallible> { + Ok(Response::new(Full::new(Bytes::from(BODY)))) + } +} diff --git a/src/server/mod.rs b/src/server/mod.rs index d16fc33..53c2b91 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,268 +1,5 @@ //! Server utilities. -use crate::rt::tokio_executor::TokioExecutor; -use http::{Request, Response}; -use http_body::Body; -use hyper::{ - body::Incoming, - server::conn::{http1, http2}, - service::Service, -}; -use pin_project_lite::pin_project; -use std::{ - error::Error as StdError, - io::Cursor, - marker::Unpin, - pin::Pin, - task::{Context, Poll}, -}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, ReadBuf}; +pub use conn::AutoConnection; -const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - -/// Http1 or Http2 connection. -pub struct AutoConn { - h1: http1::Builder, - h2: http2::Builder, -} - -impl AutoConn { - /// Create a new AutoConn. - pub fn new() -> Self { - Self { - h1: http1::Builder::new(), - h2: http2::Builder::new(TokioExecutor::new()), - } - } - - /// Http1 configuration. - pub fn h1(&mut self) -> &mut http1::Builder { - &mut self.h1 - } - - /// Http2 configuration. - pub fn h2(&mut self) -> &mut http2::Builder { - &mut self.h2 - } - - /// Bind a connection together with a [`Service`]. - pub async fn serve_connection(&self, mut io: I, service: S) -> crate::Result<()> - where - S: Service, Response = Response> + Send, - S::Future: Send + 'static, - S::Error: Into>, - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin + 'static, - { - let mut buf = Vec::new(); - - let protocol = loop { - if buf.len() < 24 { - io.read_buf(&mut buf).await?; - - let len = buf.len().min(H2_PREFACE.len()); - - if buf[0..len] != H2_PREFACE[0..len] { - break Protocol::H1; - } - } else { - break Protocol::H2; - } - }; - - let io = PrependAsyncRead::new(Cursor::new(buf), io); - - match protocol { - Protocol::H1 => self.h1.serve_connection(io, service).await?, - Protocol::H2 => self.h2.serve_connection(io, service).await?, - } - - Ok(()) - } -} - -enum Protocol { - H1, - H2, -} - -pin_project! { - struct PrependAsyncRead { - #[pin] - first: T1, - #[pin] - second: T2, - state: State, - } -} - -impl PrependAsyncRead { - fn new(first: T1, second: T2) -> Self { - Self { - first, - second, - state: State::First, - } - } -} - -enum State { - First, - Second, -} - -impl AsyncRead for PrependAsyncRead { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let mut this = self.project(); - - loop { - match &this.state { - State::First => { - let old_len = buf.filled().len(); - - match this.first.as_mut().poll_read(cx, buf) { - Poll::Ready(result) => match result { - Ok(()) => { - if buf.filled().len() == old_len { - *this.state = State::Second; - } else { - return Poll::Ready(Ok(())); - } - } - Err(e) => return Poll::Ready(Err(e)), - }, - Poll::Pending => return Poll::Pending, - } - } - State::Second => return this.second.as_mut().poll_read(cx, buf), - } - } - } -} - -impl AsyncWrite for PrependAsyncRead { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().second.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().second.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().second.poll_shutdown(cx) - } -} - -#[cfg(test)] -mod tests { - use super::AutoConn; - use crate::rt::tokio_executor::TokioExecutor; - use http::{Request, Response}; - use http_body::Body; - use http_body_util::{BodyExt, Empty, Full}; - use hyper::{body, body::Bytes, client, service::service_fn}; - use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; - use tokio::net::{TcpListener, TcpStream}; - - const BODY: &'static [u8] = b"Hello, world!"; - - #[tokio::test] - async fn http1() { - let addr = start_server().await; - let mut sender = connect_h1(addr).await; - - let response = sender - .send_request(Request::new(Empty::::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[tokio::test] - async fn http2() { - let addr = start_server().await; - let mut sender = connect_h2(addr).await; - - let response = sender - .send_request(Request::new(Empty::::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - async fn connect_h1(addr: SocketAddr) -> client::conn::http1::SendRequest - where - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - { - let stream = TcpStream::connect(addr).await.unwrap(); - let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest - where - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - { - let stream = TcpStream::connect(addr).await.unwrap(); - let (sender, connection) = client::conn::http2::Builder::new() - .executor(TokioExecutor::new()) - .handshake(stream) - .await - .unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn start_server() -> SocketAddr { - let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); - let listener = TcpListener::bind(addr).await.unwrap(); - - let local_addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - loop { - let (stream, _) = listener.accept().await.unwrap(); - tokio::task::spawn(async move { - let _ = AutoConn::new() - .serve_connection(stream, service_fn(hello)) - .await; - }); - } - }); - - local_addr - } - - async fn hello(_req: Request) -> Result>, Infallible> { - Ok(Response::new(Full::new(Bytes::from(BODY)))) - } -} +mod conn; diff --git a/src/util/mod.rs b/src/util/mod.rs new file mode 100644 index 0000000..af11c17 --- /dev/null +++ b/src/util/mod.rs @@ -0,0 +1,3 @@ +pub(crate) use prepend_async_read::PrependAsyncRead; + +mod prepend_async_read; diff --git a/src/util/prepend_async_read.rs b/src/util/prepend_async_read.rs new file mode 100644 index 0000000..a7ad93a --- /dev/null +++ b/src/util/prepend_async_read.rs @@ -0,0 +1,83 @@ +use crate::common::ready; +use pin_project_lite::pin_project; +use std::{ + pin::Pin, + task::{Context, Poll}, +}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +pin_project! { + pub(crate) struct PrependAsyncRead { + #[pin] + first: T1, + #[pin] + second: T2, + state: State, + } +} + +impl PrependAsyncRead { + pub(crate) fn new(first: T1, second: T2) -> Self { + Self { + first, + second, + state: State::First, + } + } +} + +enum State { + First, + Second, +} + +impl AsyncRead for PrependAsyncRead { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let mut this = self.project(); + + loop { + match &this.state { + State::First => { + let old_len = buf.filled().len(); + + match ready!(this.first.as_mut().poll_read(cx, buf)) { + Ok(()) => { + if buf.filled().len() == old_len { + *this.state = State::Second; + } else { + return Poll::Ready(Ok(())); + } + } + Err(e) => return Poll::Ready(Err(e)), + } + } + State::Second => return this.second.as_mut().poll_read(cx, buf), + } + } + } +} + +impl AsyncWrite for PrependAsyncRead { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.project().second.poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().second.poll_flush(cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.project().second.poll_shutdown(cx) + } +} From f0c97364b63de5ae659ba25d01e397950ffa885b Mon Sep 17 00:00:00 2001 From: Programatik Date: Fri, 18 Nov 2022 16:41:54 +0300 Subject: [PATCH 03/10] move http-body-util to dev-dependencies --- Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 64376aa..260a82c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,11 +16,10 @@ publish = false # no accidents while in dev [dependencies] hyper = { version = "1.0.0-rc.1", features = ["server", "http1", "http2"] } -http-body = "1.0.0-rc1" -http-body-util = "0.1.0-rc.1" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" +http-body = "1.0.0-rc1" # Necessary to overcome msrv check of rust 1.49, as 1.15.0 failed once_cell = "=1.14" @@ -34,6 +33,7 @@ tower = { version = "0.4", features = ["util"] } [dev-dependencies] hyper = { version = "1.0.0-rc.1", features = ["full"] } +http-body-util = "0.1.0-rc.1" tokio = { version = "1", features = ["macros", "test-util"] } [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] From 6fca103e9d4806725457857d1f0da1c8183d3ff6 Mon Sep 17 00:00:00 2001 From: Programatik Date: Wed, 7 Dec 2022 01:19:59 +0300 Subject: [PATCH 04/10] builder api and replace PrependAsyncRead with Rewind --- Cargo.toml | 2 + src/common/mod.rs | 1 + src/common/rewind.rs | 160 +++++++++++++ src/lib.rs | 2 +- src/server/conn.rs | 180 --------------- src/server/conn/auto.rs | 407 +++++++++++++++++++++++++++++++++ src/server/conn/mod.rs | 3 + src/server/mod.rs | 4 +- src/util/mod.rs | 3 - src/util/prepend_async_read.rs | 83 ------- 10 files changed, 575 insertions(+), 270 deletions(-) create mode 100644 src/common/rewind.rs delete mode 100644 src/server/conn.rs create mode 100644 src/server/conn/auto.rs create mode 100644 src/server/conn/mod.rs delete mode 100644 src/util/mod.rs delete mode 100644 src/util/prepend_async_read.rs diff --git a/Cargo.toml b/Cargo.toml index 260a82c..db7995c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,7 @@ futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" http-body = "1.0.0-rc1" +bytes = "1" # Necessary to overcome msrv check of rust 1.49, as 1.15.0 failed once_cell = "=1.14" @@ -35,6 +36,7 @@ tower = { version = "0.4", features = ["util"] } hyper = { version = "1.0.0-rc.1", features = ["full"] } http-body-util = "0.1.0-rc.1" tokio = { version = "1", features = ["macros", "test-util"] } +tokio-test = "0.4" [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] pnet_datalink = "0.27.2" diff --git a/src/common/mod.rs b/src/common/mod.rs index 52b9917..9ad52a1 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -12,5 +12,6 @@ macro_rules! ready { pub(crate) use ready; pub(crate) mod exec; pub(crate) mod never; +pub(crate) mod rewind; pub(crate) use never::Never; diff --git a/src/common/rewind.rs b/src/common/rewind.rs new file mode 100644 index 0000000..fa0c12d --- /dev/null +++ b/src/common/rewind.rs @@ -0,0 +1,160 @@ +use std::marker::Unpin; +use std::{cmp, io}; + +use bytes::{Buf, Bytes}; +use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; + +use std::{ + pin::Pin, + task::{self, Poll}, +}; + +/// Combine a buffer with an IO, rewinding reads to use the buffer. +#[derive(Debug)] +pub(crate) struct Rewind { + pre: Option, + inner: T, +} + +impl Rewind { + #[cfg(test)] + pub(crate) fn new(io: T) -> Self { + Rewind { + pre: None, + inner: io, + } + } + + pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { + Rewind { + pre: Some(buf), + inner: io, + } + } + + #[cfg(test)] + pub(crate) fn rewind(&mut self, bs: Bytes) { + debug_assert!(self.pre.is_none()); + self.pre = Some(bs); + } + + // pub(crate) fn into_inner(self) -> (T, Bytes) { + // (self.inner, self.pre.unwrap_or_else(Bytes::new)) + // } + + // pub(crate) fn get_mut(&mut self) -> &mut T { + // &mut self.inner + // } +} + +impl AsyncRead for Rewind +where + T: AsyncRead + Unpin, +{ + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); + // Put back what's left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(())); + } + } + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Rewind +where + T: AsyncWrite + Unpin, +{ + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + bufs: &[io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +#[cfg(test)] +mod tests { + // FIXME: re-implement tests with `async/await`, this import should + // trigger a warning to remind us + use super::Rewind; + use bytes::Bytes; + use tokio::io::AsyncReadExt; + + #[cfg(not(miri))] + #[tokio::test] + async fn partial_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + // Read off some bytes, ensure we filled o1 + let mut buf = [0; 2]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // At this point we should have read everything that was in the MockStream + assert_eq!(&buf, &underlying); + } + + #[cfg(not(miri))] + #[tokio::test] + async fn full_rewind() { + let underlying = [104, 101, 108, 108, 111]; + + let mock = tokio_test::io::Builder::new().read(&underlying).build(); + + let mut stream = Rewind::new(mock); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + + // Rewind the stream so that it is as if we never read in the first place. + stream.rewind(Bytes::copy_from_slice(&buf[..])); + + let mut buf = [0; 5]; + stream.read_exact(&mut buf).await.expect("read1"); + } +} diff --git a/src/lib.rs b/src/lib.rs index 9444b69..02299cb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,5 @@ #![deny(missing_docs)] +#![cfg_attr(docsrs, feature(doc_cfg))] //! hyper utilities pub use crate::error::{GenericError, Result}; @@ -9,4 +10,3 @@ pub mod rt; pub mod server; mod error; -mod util; diff --git a/src/server/conn.rs b/src/server/conn.rs deleted file mode 100644 index 2eeea32..0000000 --- a/src/server/conn.rs +++ /dev/null @@ -1,180 +0,0 @@ -use crate::{rt::tokio_executor::TokioExecutor, util::PrependAsyncRead}; -use http::{Request, Response}; -use http_body::Body; -use hyper::{ - body::Incoming, - server::conn::{http1, http2}, - service::Service, -}; -use std::{error::Error as StdError, io::Cursor, marker::Unpin}; -use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; - -const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; - -/// Http1 or Http2 connection. -pub struct AutoConnection { - h1: http1::Builder, - h2: http2::Builder, -} - -impl AutoConnection { - /// Create a new AutoConn. - pub fn new() -> Self { - Self { - h1: http1::Builder::new(), - h2: http2::Builder::new(TokioExecutor::new()), - } - } - - /// Http1 configuration. - pub fn h1(&mut self) -> &mut http1::Builder { - &mut self.h1 - } - - /// Http2 configuration. - pub fn h2(&mut self) -> &mut http2::Builder { - &mut self.h2 - } - - /// Bind a connection together with a [`Service`]. - pub async fn serve_connection(&self, mut io: I, service: S) -> crate::Result<()> - where - S: Service, Response = Response> + Send, - S::Future: Send + 'static, - S::Error: Into>, - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - I: AsyncRead + AsyncWrite + Unpin + 'static, - { - let mut buf = Vec::new(); - - let protocol = loop { - if buf.len() < 24 { - io.read_buf(&mut buf).await?; - - let len = buf.len().min(H2_PREFACE.len()); - - if buf[0..len] != H2_PREFACE[0..len] { - break Protocol::H1; - } - } else { - break Protocol::H2; - } - }; - - let io = PrependAsyncRead::new(Cursor::new(buf), io); - - match protocol { - Protocol::H1 => self.h1.serve_connection(io, service).await?, - Protocol::H2 => self.h2.serve_connection(io, service).await?, - } - - Ok(()) - } -} - -enum Protocol { - H1, - H2, -} - -#[cfg(test)] -mod tests { - use super::AutoConnection; - use crate::rt::tokio_executor::TokioExecutor; - use http::{Request, Response}; - use http_body::Body; - use http_body_util::{BodyExt, Empty, Full}; - use hyper::{body, body::Bytes, client, service::service_fn}; - use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; - use tokio::net::{TcpListener, TcpStream}; - - const BODY: &'static [u8] = b"Hello, world!"; - - #[tokio::test] - async fn http1() { - let addr = start_server().await; - let mut sender = connect_h1(addr).await; - - let response = sender - .send_request(Request::new(Empty::::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - #[tokio::test] - async fn http2() { - let addr = start_server().await; - let mut sender = connect_h2(addr).await; - - let response = sender - .send_request(Request::new(Empty::::new())) - .await - .unwrap(); - - let body = response.into_body().collect().await.unwrap().to_bytes(); - - assert_eq!(body, BODY); - } - - async fn connect_h1(addr: SocketAddr) -> client::conn::http1::SendRequest - where - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - { - let stream = TcpStream::connect(addr).await.unwrap(); - let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest - where - B: Body + Send + 'static, - B::Data: Send, - B::Error: Into>, - { - let stream = TcpStream::connect(addr).await.unwrap(); - let (sender, connection) = client::conn::http2::Builder::new() - .executor(TokioExecutor::new()) - .handshake(stream) - .await - .unwrap(); - - tokio::spawn(connection); - - sender - } - - async fn start_server() -> SocketAddr { - let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); - let listener = TcpListener::bind(addr).await.unwrap(); - - let local_addr = listener.local_addr().unwrap(); - - tokio::spawn(async move { - loop { - let (stream, _) = listener.accept().await.unwrap(); - tokio::task::spawn(async move { - let _ = AutoConnection::new() - .serve_connection(stream, service_fn(hello)) - .await; - }); - } - }); - - local_addr - } - - async fn hello(_req: Request) -> Result>, Infallible> { - Ok(Response::new(Full::new(Bytes::from(BODY)))) - } -} diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs new file mode 100644 index 0000000..c5d689f --- /dev/null +++ b/src/server/conn/auto.rs @@ -0,0 +1,407 @@ +//! Http1 or Http2 connection. + +use crate::{common::rewind::Rewind, rt::tokio_executor::TokioExecutor, Result}; +use bytes::Bytes; +use http::{Request, Response}; +use http_body::Body; +use hyper::{ + body::Incoming, + rt::Timer, + server::conn::{http1, http2}, + service::Service, +}; +use std::{error::Error as StdError, marker::Unpin, time::Duration}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; + +const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; + +/// Http1 or Http2 connection builder. +pub struct Builder { + http1: http1::Builder, + http2: http2::Builder, +} + +impl Builder { + /// Create a new auto connection builder. + pub fn new() -> Self { + Self { + http1: http1::Builder::new(), + http2: http2::Builder::new(TokioExecutor::new()), + } + } + /// Set whether HTTP/1 connections should support half-closures. + /// + /// Clients can chose to shutdown their write-side while waiting + /// for the server to respond. Setting this to `true` will + /// prevent closing the connection immediately if `read` + /// detects an EOF in the middle of a request. + /// + /// Default is `false`. + pub fn http1_half_close(&mut self, val: bool) -> &mut Self { + self.http1.http1_half_close(val); + self + } + /// Enables or disables HTTP/1 keep-alive. + /// + /// Default is true. + pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { + self.http1.http1_keep_alive(val); + self + } + /// Set whether HTTP/1 connections will write header names as title case at + /// the socket level. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self { + self.http1.http1_title_case_headers(enabled); + self + } + /// Set whether to support preserving original header cases. + /// + /// Currently, this will record the original cases received, and store them + /// in a private extension on the `Request`. It will also look for and use + /// such an extension in any provided `Response`. + /// + /// Since the relevant extension is still private, there is no way to + /// interact with the original cases. The only effect this can have now is + /// to forward the cases in a proxy-like fashion. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self { + self.http1.http1_preserve_header_case(enabled); + self + } + /// Set a timeout for reading client request headers. If a client does not + /// transmit the entire header within this time, the connection is closed. + /// + /// Default is None. + pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { + self.http1.http1_header_read_timeout(read_timeout); + self + } + /// Set whether HTTP/1 connections should try to use vectored writes, + /// or always flatten into a single buffer. + /// + /// Note that setting this to false may mean more copies of body data, + /// but may also improve performance when an IO transport doesn't + /// support vectored writes well, such as most TLS implementations. + /// + /// Setting this to true will force hyper to use queued strategy + /// which may eliminate unnecessary cloning on some TLS backends + /// + /// Default is `auto`. In this mode hyper will try to guess which + /// mode to use + pub fn http1_writev(&mut self, val: bool) -> &mut Self { + self.http1.http1_writev(val); + self + } + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + /// + /// # Panics + /// + /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. + #[cfg(feature = "http1")] + #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] + pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { + self.http1.max_buf_size(max); + self + } + /// Aggregates flushes to better support pipelined responses. + /// + /// Experimental, may have bugs. + /// + /// Default is false. + pub fn http1_pipeline_flush(&mut self, enabled: bool) -> &mut Self { + self.http1.pipeline_flush(enabled); + self + } + /// Set the timer used in background tasks. + pub fn http1_timer(&mut self, timer: M) -> &mut Self + where + M: Timer + Send + Sync + 'static, + { + self.http1.timer(timer); + self + } + /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 + /// stream-level flow control. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + /// + /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { + self.http2.http2_initial_stream_window_size(sz); + self + } + /// Sets the max connection-level flow control for HTTP2. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_connection_window_size( + &mut self, + sz: impl Into>, + ) -> &mut Self { + self.http2.http2_initial_connection_window_size(sz); + self + } + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.http2.http2_adaptive_window(enabled); + self + } + /// Sets the maximum frame size to use for HTTP2. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { + self.http2.http2_max_frame_size(sz); + self + } + /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 + /// connections. + /// + /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. + /// + /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { + self.http2.http2_max_concurrent_streams(max); + self + } + /// Sets an interval for HTTP2 Ping frames should be sent to keep a + /// connection alive. + /// + /// Pass `None` to disable HTTP2 keep-alive. + /// + /// Default is currently disabled. + /// + /// # Cargo Feature + /// + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_interval( + &mut self, + interval: impl Into>, + ) -> &mut Self { + self.http2.http2_keep_alive_interval(interval); + self + } + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. + /// + /// Default is 20 seconds. + /// + /// # Cargo Feature + /// + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.http2.http2_keep_alive_timeout(timeout); + self + } + /// Set the maximum write buffer size for each HTTP/2 stream. + /// + /// Default is currently ~400KB, but may change. + /// + /// # Panics + /// + /// The value must be no larger than `u32::MAX`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { + self.http2.http2_max_send_buf_size(max); + self + } + /// Enables the [extended CONNECT protocol]. + /// + /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 + #[cfg(feature = "http2")] + pub fn http2_enable_connect_protocol(&mut self) -> &mut Self { + self.http2.http2_enable_connect_protocol(); + self + } + /// Sets the max size of received header frames. + /// + /// Default is currently ~16MB, but may change. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self { + self.http2.http2_max_header_list_size(max); + self + } + /// Set the timer used in background tasks. + pub fn http2_timer(&mut self, timer: M) -> &mut Self + where + M: Timer + Send + Sync + 'static, + { + self.http2.timer(timer); + self + } + + /// Bind a connection together with a [`Service`]. + pub async fn serve_connection(&self, mut io: I, service: S) -> Result<()> + where + S: Service, Response = Response> + Send, + S::Future: Send + 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + let mut buf = Vec::new(); + + let protocol = loop { + if buf.len() < 24 { + io.read_buf(&mut buf).await?; + + let len = buf.len().min(H2_PREFACE.len()); + + if buf[0..len] != H2_PREFACE[0..len] { + break Protocol::H1; + } + } else { + break Protocol::H2; + } + }; + + let io = Rewind::new_buffered(io, Bytes::from(buf)); + + match protocol { + Protocol::H1 => self.http1.serve_connection(io, service).await?, + Protocol::H2 => self.http2.serve_connection(io, service).await?, + } + + Ok(()) + } +} + +enum Protocol { + H1, + H2, +} + +#[cfg(test)] +mod tests { + use super::Builder; + use crate::rt::tokio_executor::TokioExecutor; + use http::{Request, Response}; + use http_body::Body; + use http_body_util::{BodyExt, Empty, Full}; + use hyper::{body, body::Bytes, client, service::service_fn}; + use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; + use tokio::net::{TcpListener, TcpStream}; + + const BODY: &'static [u8] = b"Hello, world!"; + + #[tokio::test] + async fn http1() { + let addr = start_server().await; + let mut sender = connect_h1(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + #[tokio::test] + async fn http2() { + let addr = start_server().await; + let mut sender = connect_h2(addr).await; + + let response = sender + .send_request(Request::new(Empty::::new())) + .await + .unwrap(); + + let body = response.into_body().collect().await.unwrap().to_bytes(); + + assert_eq!(body, BODY); + } + + async fn connect_h1(addr: SocketAddr) -> client::conn::http1::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest + where + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + { + let stream = TcpStream::connect(addr).await.unwrap(); + let (sender, connection) = client::conn::http2::Builder::new() + .executor(TokioExecutor::new()) + .handshake(stream) + .await + .unwrap(); + + tokio::spawn(connection); + + sender + } + + async fn start_server() -> SocketAddr { + let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + let listener = TcpListener::bind(addr).await.unwrap(); + + let local_addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + let (stream, _) = listener.accept().await.unwrap(); + tokio::task::spawn(async move { + let _ = Builder::new() + .serve_connection(stream, service_fn(hello)) + .await; + }); + } + }); + + local_addr + } + + async fn hello(_req: Request) -> Result>, Infallible> { + Ok(Response::new(Full::new(Bytes::from(BODY)))) + } +} diff --git a/src/server/conn/mod.rs b/src/server/conn/mod.rs new file mode 100644 index 0000000..df06f68 --- /dev/null +++ b/src/server/conn/mod.rs @@ -0,0 +1,3 @@ +//! Connection utilities. + +pub mod auto; diff --git a/src/server/mod.rs b/src/server/mod.rs index 53c2b91..7b4515c 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,5 +1,3 @@ //! Server utilities. -pub use conn::AutoConnection; - -mod conn; +pub mod conn; diff --git a/src/util/mod.rs b/src/util/mod.rs deleted file mode 100644 index af11c17..0000000 --- a/src/util/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub(crate) use prepend_async_read::PrependAsyncRead; - -mod prepend_async_read; diff --git a/src/util/prepend_async_read.rs b/src/util/prepend_async_read.rs deleted file mode 100644 index a7ad93a..0000000 --- a/src/util/prepend_async_read.rs +++ /dev/null @@ -1,83 +0,0 @@ -use crate::common::ready; -use pin_project_lite::pin_project; -use std::{ - pin::Pin, - task::{Context, Poll}, -}; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; - -pin_project! { - pub(crate) struct PrependAsyncRead { - #[pin] - first: T1, - #[pin] - second: T2, - state: State, - } -} - -impl PrependAsyncRead { - pub(crate) fn new(first: T1, second: T2) -> Self { - Self { - first, - second, - state: State::First, - } - } -} - -enum State { - First, - Second, -} - -impl AsyncRead for PrependAsyncRead { - fn poll_read( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut ReadBuf<'_>, - ) -> Poll> { - let mut this = self.project(); - - loop { - match &this.state { - State::First => { - let old_len = buf.filled().len(); - - match ready!(this.first.as_mut().poll_read(cx, buf)) { - Ok(()) => { - if buf.filled().len() == old_len { - *this.state = State::Second; - } else { - return Poll::Ready(Ok(())); - } - } - Err(e) => return Poll::Ready(Err(e)), - } - } - State::Second => return this.second.as_mut().poll_read(cx, buf), - } - } - } -} - -impl AsyncWrite for PrependAsyncRead { - fn poll_write( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - self.project().second.poll_write(cx, buf) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().second.poll_flush(cx) - } - - fn poll_shutdown( - self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { - self.project().second.poll_shutdown(cx) - } -} From d6aafb937c9c81bc8591ad56775bc37151a2c38d Mon Sep 17 00:00:00 2001 From: Programatik Date: Fri, 9 Dec 2022 09:28:10 +0300 Subject: [PATCH 05/10] fix miri CI --- src/server/conn/auto.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index c5d689f..e3a535f 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -319,6 +319,7 @@ mod tests { const BODY: &'static [u8] = b"Hello, world!"; + #[cfg(not(miri))] #[tokio::test] async fn http1() { let addr = start_server().await; @@ -334,6 +335,7 @@ mod tests { assert_eq!(body, BODY); } + #[cfg(not(miri))] #[tokio::test] async fn http2() { let addr = start_server().await; From 29b96517237d819435c08108106815e0f6fded32 Mon Sep 17 00:00:00 2001 From: Programatik Date: Thu, 5 Jan 2023 13:14:38 +0300 Subject: [PATCH 06/10] new builder style --- src/server/conn/auto.rs | 276 +++++++++++++++++++++++++--------------- 1 file changed, 176 insertions(+), 100 deletions(-) diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index e3a535f..d98afd6 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -29,6 +29,73 @@ impl Builder { http2: http2::Builder::new(TokioExecutor::new()), } } + + /// Http1 configuration. + pub fn http1(&mut self) -> Http1Builder<'_> { + Http1Builder { inner: self } + } + + /// Http2 configuration. + pub fn http2(&mut self) -> Http2Builder<'_> { + Http2Builder { inner: self } + } + + /// Bind a connection together with a [`Service`]. + pub async fn serve_connection(&self, mut io: I, service: S) -> Result<()> + where + S: Service, Response = Response> + Send, + S::Future: Send + 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + enum Protocol { + H1, + H2, + } + + let mut buf = Vec::new(); + + let protocol = loop { + if buf.len() < 24 { + io.read_buf(&mut buf).await?; + + let len = buf.len().min(H2_PREFACE.len()); + + if buf[0..len] != H2_PREFACE[0..len] { + break Protocol::H1; + } + } else { + break Protocol::H2; + } + }; + + let io = Rewind::new_buffered(io, Bytes::from(buf)); + + match protocol { + Protocol::H1 => self.http1.serve_connection(io, service).await?, + Protocol::H2 => self.http2.serve_connection(io, service).await?, + } + + Ok(()) + } +} + +/// Http1 part of builder. +pub struct Http1Builder<'a> { + inner: &'a mut Builder, +} + +impl Http1Builder<'_> { + /// Http2 configuration. + pub fn http2(&mut self) -> Http2Builder<'_> { + Http2Builder { + inner: &mut self.inner, + } + } + /// Set whether HTTP/1 connections should support half-closures. /// /// Clients can chose to shutdown their write-side while waiting @@ -37,27 +104,30 @@ impl Builder { /// detects an EOF in the middle of a request. /// /// Default is `false`. - pub fn http1_half_close(&mut self, val: bool) -> &mut Self { - self.http1.http1_half_close(val); + pub fn half_close(&mut self, val: bool) -> &mut Self { + self.inner.http1.http1_half_close(val); self } + /// Enables or disables HTTP/1 keep-alive. /// /// Default is true. - pub fn http1_keep_alive(&mut self, val: bool) -> &mut Self { - self.http1.http1_keep_alive(val); + pub fn keep_alive(&mut self, val: bool) -> &mut Self { + self.inner.http1.http1_keep_alive(val); self } + /// Set whether HTTP/1 connections will write header names as title case at /// the socket level. /// /// Note that this setting does not affect HTTP/2. /// /// Default is false. - pub fn http1_title_case_headers(&mut self, enabled: bool) -> &mut Self { - self.http1.http1_title_case_headers(enabled); + pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self { + self.inner.http1.http1_title_case_headers(enabled); self } + /// Set whether to support preserving original header cases. /// /// Currently, this will record the original cases received, and store them @@ -71,18 +141,20 @@ impl Builder { /// Note that this setting does not affect HTTP/2. /// /// Default is false. - pub fn http1_preserve_header_case(&mut self, enabled: bool) -> &mut Self { - self.http1.http1_preserve_header_case(enabled); + pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self { + self.inner.http1.http1_preserve_header_case(enabled); self } + /// Set a timeout for reading client request headers. If a client does not /// transmit the entire header within this time, the connection is closed. /// /// Default is None. - pub fn http1_header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.http1.http1_header_read_timeout(read_timeout); + pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { + self.inner.http1.http1_header_read_timeout(read_timeout); self } + /// Set whether HTTP/1 connections should try to use vectored writes, /// or always flatten into a single buffer. /// @@ -95,10 +167,11 @@ impl Builder { /// /// Default is `auto`. In this mode hyper will try to guess which /// mode to use - pub fn http1_writev(&mut self, val: bool) -> &mut Self { - self.http1.http1_writev(val); + pub fn writev(&mut self, val: bool) -> &mut Self { + self.inner.http1.http1_writev(val); self } + /// Set the maximum buffer size for the connection. /// /// Default is ~400kb. @@ -106,29 +179,58 @@ impl Builder { /// # Panics /// /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. - #[cfg(feature = "http1")] - #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] - pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { - self.http1.max_buf_size(max); + pub fn max_buf_size(&mut self, max: usize) -> &mut Self { + self.inner.http1.max_buf_size(max); self } + /// Aggregates flushes to better support pipelined responses. /// /// Experimental, may have bugs. /// /// Default is false. - pub fn http1_pipeline_flush(&mut self, enabled: bool) -> &mut Self { - self.http1.pipeline_flush(enabled); + pub fn pipeline_flush(&mut self, enabled: bool) -> &mut Self { + self.inner.http1.pipeline_flush(enabled); self } + /// Set the timer used in background tasks. - pub fn http1_timer(&mut self, timer: M) -> &mut Self + pub fn timer(&mut self, timer: M) -> &mut Self where M: Timer + Send + Sync + 'static, { - self.http1.timer(timer); + self.inner.http1.timer(timer); self } + + /// Bind a connection together with a [`Service`]. + pub async fn serve_connection(&self, io: I, service: S) -> Result<()> + where + S: Service, Response = Response> + Send, + S::Future: Send + 'static, + S::Error: Into>, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, + I: AsyncRead + AsyncWrite + Unpin + 'static, + { + self.inner.serve_connection(io, service).await + } +} + +/// Http2 part of builder. +pub struct Http2Builder<'a> { + inner: &'a mut Builder, +} + +impl Http2Builder<'_> { + /// Http1 configuration. + pub fn http1(&mut self) -> Http1Builder<'_> { + Http1Builder { + inner: &mut self.inner, + } + } + /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 /// stream-level flow control. /// @@ -137,60 +239,52 @@ impl Builder { /// If not set, hyper will use a default. /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.http2.http2_initial_stream_window_size(sz); + pub fn initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { + self.inner.http2.http2_initial_stream_window_size(sz); self } + /// Sets the max connection-level flow control for HTTP2. /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_initial_connection_window_size( - &mut self, - sz: impl Into>, - ) -> &mut Self { - self.http2.http2_initial_connection_window_size(sz); + pub fn initial_connection_window_size(&mut self, sz: impl Into>) -> &mut Self { + self.inner.http2.http2_initial_connection_window_size(sz); self } + /// Sets whether to use an adaptive flow control. /// /// Enabling this will override the limits set in /// `http2_initial_stream_window_size` and /// `http2_initial_connection_window_size`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { - self.http2.http2_adaptive_window(enabled); + pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.inner.http2.http2_adaptive_window(enabled); self } + /// Sets the maximum frame size to use for HTTP2. /// /// Passing `None` will do nothing. /// /// If not set, hyper will use a default. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - self.http2.http2_max_frame_size(sz); + pub fn max_frame_size(&mut self, sz: impl Into>) -> &mut Self { + self.inner.http2.http2_max_frame_size(sz); self } + /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2 /// connections. /// /// Default is no limit (`std::u32::MAX`). Passing `None` will do nothing. /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - self.http2.http2_max_concurrent_streams(max); + pub fn max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { + self.inner.http2.http2_max_concurrent_streams(max); self } + /// Sets an interval for HTTP2 Ping frames should be sent to keep a /// connection alive. /// @@ -200,15 +294,11 @@ impl Builder { /// /// # Cargo Feature /// - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_interval( - &mut self, - interval: impl Into>, - ) -> &mut Self { - self.http2.http2_keep_alive_interval(interval); + pub fn keep_alive_interval(&mut self, interval: impl Into>) -> &mut Self { + self.inner.http2.http2_keep_alive_interval(interval); self } + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. /// /// If the ping is not acknowledged within the timeout, the connection will @@ -218,12 +308,11 @@ impl Builder { /// /// # Cargo Feature /// - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.http2.http2_keep_alive_timeout(timeout); + pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.inner.http2.http2_keep_alive_timeout(timeout); self } + /// Set the maximum write buffer size for each HTTP/2 stream. /// /// Default is currently ~400KB, but may change. @@ -231,40 +320,38 @@ impl Builder { /// # Panics /// /// The value must be no larger than `u32::MAX`. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { - self.http2.http2_max_send_buf_size(max); + pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { + self.inner.http2.http2_max_send_buf_size(max); self } + /// Enables the [extended CONNECT protocol]. /// /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 - #[cfg(feature = "http2")] - pub fn http2_enable_connect_protocol(&mut self) -> &mut Self { - self.http2.http2_enable_connect_protocol(); + pub fn enable_connect_protocol(&mut self) -> &mut Self { + self.inner.http2.http2_enable_connect_protocol(); self } + /// Sets the max size of received header frames. /// /// Default is currently ~16MB, but may change. - #[cfg(feature = "http2")] - #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] - pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self { - self.http2.http2_max_header_list_size(max); + pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { + self.inner.http2.http2_max_header_list_size(max); self } + /// Set the timer used in background tasks. - pub fn http2_timer(&mut self, timer: M) -> &mut Self + pub fn timer(&mut self, timer: M) -> &mut Self where M: Timer + Send + Sync + 'static, { - self.http2.timer(timer); + self.inner.http2.timer(timer); self } /// Bind a connection together with a [`Service`]. - pub async fn serve_connection(&self, mut io: I, service: S) -> Result<()> + pub async fn serve_connection(&self, io: I, service: S) -> Result<()> where S: Service, Response = Response> + Send, S::Future: Send + 'static, @@ -274,42 +361,13 @@ impl Builder { B::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, { - let mut buf = Vec::new(); - - let protocol = loop { - if buf.len() < 24 { - io.read_buf(&mut buf).await?; - - let len = buf.len().min(H2_PREFACE.len()); - - if buf[0..len] != H2_PREFACE[0..len] { - break Protocol::H1; - } - } else { - break Protocol::H2; - } - }; - - let io = Rewind::new_buffered(io, Bytes::from(buf)); - - match protocol { - Protocol::H1 => self.http1.serve_connection(io, service).await?, - Protocol::H2 => self.http2.serve_connection(io, service).await?, - } - - Ok(()) + self.inner.serve_connection(io, service).await } } -enum Protocol { - H1, - H2, -} - #[cfg(test)] mod tests { - use super::Builder; - use crate::rt::tokio_executor::TokioExecutor; + use crate::{rt::tokio_executor::TokioExecutor, server::conn::auto}; use http::{Request, Response}; use http_body::Body; use http_body_util::{BodyExt, Empty, Full}; @@ -319,6 +377,24 @@ mod tests { const BODY: &'static [u8] = b"Hello, world!"; + #[test] + fn configuration() { + // One liner. + auto::Builder::new() + .http1() + .keep_alive(true) + .http2() + .keep_alive_interval(None); + // .serve_connection(io, service); + + // Using variable. + let mut builder = auto::Builder::new(); + + builder.http1().keep_alive(true); + builder.http2().keep_alive_interval(None); + // builder.serve_connection(io, service); + } + #[cfg(not(miri))] #[tokio::test] async fn http1() { @@ -393,7 +469,7 @@ mod tests { loop { let (stream, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { - let _ = Builder::new() + let _ = auto::Builder::new() .serve_connection(stream, service_fn(hello)) .await; }); From a72b9992f10c6131f04102df66ec1be8027e79b3 Mon Sep 17 00:00:00 2001 From: Programatik Date: Thu, 5 Jan 2023 13:44:51 +0300 Subject: [PATCH 07/10] arrange features --- Cargo.toml | 7 ++++--- src/common/rewind.rs | 1 + src/lib.rs | 2 +- src/server/conn/mod.rs | 1 + 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index db7995c..545ee7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = { version = "1.0.0-rc.1", features = ["server", "http1", "http2"] } +hyper = "1.0.0-rc.1" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" @@ -44,8 +44,9 @@ pnet_datalink = "0.27.2" [features] runtime = [] tcp = [] -http1 = [] -http2 = [] +http1 = ["hyper/http1"] +http2 = ["hyper/http2"] +auto = ["hyper/server", "http1", "http2"] # internal features used in CI __internal_happy_eyeballs_tests = [] diff --git a/src/common/rewind.rs b/src/common/rewind.rs index fa0c12d..18d8f58 100644 --- a/src/common/rewind.rs +++ b/src/common/rewind.rs @@ -25,6 +25,7 @@ impl Rewind { } } + #[allow(dead_code)] pub(crate) fn new_buffered(io: T, buf: Bytes) -> Self { Rewind { pre: Some(buf), diff --git a/src/lib.rs b/src/lib.rs index 02299cb..d00f2a4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,5 @@ #![deny(missing_docs)] -#![cfg_attr(docsrs, feature(doc_cfg))] +#![cfg_attr(docsrs, feature(doc_auto_cfg, doc_cfg))] //! hyper utilities pub use crate::error::{GenericError, Result}; diff --git a/src/server/conn/mod.rs b/src/server/conn/mod.rs index df06f68..70057c8 100644 --- a/src/server/conn/mod.rs +++ b/src/server/conn/mod.rs @@ -1,3 +1,4 @@ //! Connection utilities. +#[cfg(feature = "auto")] pub mod auto; From e05b091a93e1825aa480adb92fe298ef320aa696 Mon Sep 17 00:00:00 2001 From: Programatik Date: Tue, 24 Jan 2023 12:22:04 +0300 Subject: [PATCH 08/10] update dependencies --- Cargo.toml | 8 ++++---- src/server/conn/auto.rs | 32 ++++++++++++++++---------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 545ee7c..7ee129c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,11 +15,11 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = "1.0.0-rc.1" +hyper = "1.0.0-rc.2" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" -http-body = "1.0.0-rc1" +http-body = "1.0.0-rc.2" bytes = "1" # Necessary to overcome msrv check of rust 1.49, as 1.15.0 failed @@ -33,8 +33,8 @@ tower-service = "0.3" tower = { version = "0.4", features = ["util"] } [dev-dependencies] -hyper = { version = "1.0.0-rc.1", features = ["full"] } -http-body-util = "0.1.0-rc.1" +hyper = { version = "1.0.0-rc.2", features = ["full"] } +http-body-util = "0.1.0-rc.2" tokio = { version = "1", features = ["macros", "test-util"] } tokio-test = "0.4" diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index d98afd6..3c72f54 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -105,7 +105,7 @@ impl Http1Builder<'_> { /// /// Default is `false`. pub fn half_close(&mut self, val: bool) -> &mut Self { - self.inner.http1.http1_half_close(val); + self.inner.http1.half_close(val); self } @@ -113,7 +113,7 @@ impl Http1Builder<'_> { /// /// Default is true. pub fn keep_alive(&mut self, val: bool) -> &mut Self { - self.inner.http1.http1_keep_alive(val); + self.inner.http1.keep_alive(val); self } @@ -124,7 +124,7 @@ impl Http1Builder<'_> { /// /// Default is false. pub fn title_case_headers(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.http1_title_case_headers(enabled); + self.inner.http1.title_case_headers(enabled); self } @@ -142,7 +142,7 @@ impl Http1Builder<'_> { /// /// Default is false. pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Self { - self.inner.http1.http1_preserve_header_case(enabled); + self.inner.http1.preserve_header_case(enabled); self } @@ -151,7 +151,7 @@ impl Http1Builder<'_> { /// /// Default is None. pub fn header_read_timeout(&mut self, read_timeout: Duration) -> &mut Self { - self.inner.http1.http1_header_read_timeout(read_timeout); + self.inner.http1.header_read_timeout(read_timeout); self } @@ -168,7 +168,7 @@ impl Http1Builder<'_> { /// Default is `auto`. In this mode hyper will try to guess which /// mode to use pub fn writev(&mut self, val: bool) -> &mut Self { - self.inner.http1.http1_writev(val); + self.inner.http1.writev(val); self } @@ -240,7 +240,7 @@ impl Http2Builder<'_> { /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE pub fn initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.http2_initial_stream_window_size(sz); + self.inner.http2.initial_stream_window_size(sz); self } @@ -250,7 +250,7 @@ impl Http2Builder<'_> { /// /// If not set, hyper will use a default. pub fn initial_connection_window_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.http2_initial_connection_window_size(sz); + self.inner.http2.initial_connection_window_size(sz); self } @@ -260,7 +260,7 @@ impl Http2Builder<'_> { /// `http2_initial_stream_window_size` and /// `http2_initial_connection_window_size`. pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self { - self.inner.http2.http2_adaptive_window(enabled); + self.inner.http2.adaptive_window(enabled); self } @@ -270,7 +270,7 @@ impl Http2Builder<'_> { /// /// If not set, hyper will use a default. pub fn max_frame_size(&mut self, sz: impl Into>) -> &mut Self { - self.inner.http2.http2_max_frame_size(sz); + self.inner.http2.max_frame_size(sz); self } @@ -281,7 +281,7 @@ impl Http2Builder<'_> { /// /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS pub fn max_concurrent_streams(&mut self, max: impl Into>) -> &mut Self { - self.inner.http2.http2_max_concurrent_streams(max); + self.inner.http2.max_concurrent_streams(max); self } @@ -295,7 +295,7 @@ impl Http2Builder<'_> { /// # Cargo Feature /// pub fn keep_alive_interval(&mut self, interval: impl Into>) -> &mut Self { - self.inner.http2.http2_keep_alive_interval(interval); + self.inner.http2.keep_alive_interval(interval); self } @@ -309,7 +309,7 @@ impl Http2Builder<'_> { /// # Cargo Feature /// pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { - self.inner.http2.http2_keep_alive_timeout(timeout); + self.inner.http2.keep_alive_timeout(timeout); self } @@ -321,7 +321,7 @@ impl Http2Builder<'_> { /// /// The value must be no larger than `u32::MAX`. pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self { - self.inner.http2.http2_max_send_buf_size(max); + self.inner.http2.max_send_buf_size(max); self } @@ -329,7 +329,7 @@ impl Http2Builder<'_> { /// /// [extended CONNECT protocol]: https://datatracker.ietf.org/doc/html/rfc8441#section-4 pub fn enable_connect_protocol(&mut self) -> &mut Self { - self.inner.http2.http2_enable_connect_protocol(); + self.inner.http2.enable_connect_protocol(); self } @@ -337,7 +337,7 @@ impl Http2Builder<'_> { /// /// Default is currently ~16MB, but may change. pub fn max_header_list_size(&mut self, max: u32) -> &mut Self { - self.inner.http2.http2_max_header_list_size(max); + self.inner.http2.max_header_list_size(max); self } From 3bb1aa59e70c940fb71629038c3fa0b3f16401a8 Mon Sep 17 00:00:00 2001 From: Programatik Date: Sat, 25 Feb 2023 12:51:28 +0300 Subject: [PATCH 09/10] allow passing custom executor --- Cargo.toml | 2 +- src/server/conn/auto.rs | 60 ++++++++++++++++++++++++++--------------- 2 files changed, 39 insertions(+), 23 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 7ee129c..2d333e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = "1.0.0-rc.2" +hyper = "1.0.0-rc.3" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index 3c72f54..44279c4 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -1,12 +1,12 @@ //! Http1 or Http2 connection. -use crate::{common::rewind::Rewind, rt::tokio_executor::TokioExecutor, Result}; +use crate::{common::rewind::Rewind, Result}; use bytes::Bytes; use http::{Request, Response}; use http_body::Body; use hyper::{ body::Incoming, - rt::Timer, + rt::{bounds::Http2ConnExec, Timer}, server::conn::{http1, http2}, service::Service, }; @@ -16,27 +16,41 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; const H2_PREFACE: &[u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"; /// Http1 or Http2 connection builder. -pub struct Builder { +pub struct Builder { http1: http1::Builder, - http2: http2::Builder, + http2: http2::Builder, } -impl Builder { +impl Builder { /// Create a new auto connection builder. - pub fn new() -> Self { + /// + /// `executor` parameter should be a type that implements + /// [`Executor`](hyper::rt::Executor) trait. + /// + /// # Example + /// + /// ``` + /// use hyper_util::{ + /// rt::tokio_executor::TokioExecutor, + /// server::conn::auto, + /// }; + /// + /// auto::Builder::new(TokioExecutor::new()); + /// ``` + pub fn new(executor: E) -> Self { Self { http1: http1::Builder::new(), - http2: http2::Builder::new(TokioExecutor::new()), + http2: http2::Builder::new(executor), } } /// Http1 configuration. - pub fn http1(&mut self) -> Http1Builder<'_> { + pub fn http1(&mut self) -> Http1Builder<'_, E> { Http1Builder { inner: self } } /// Http2 configuration. - pub fn http2(&mut self) -> Http2Builder<'_> { + pub fn http2(&mut self) -> Http2Builder<'_, E> { Http2Builder { inner: self } } @@ -50,6 +64,7 @@ impl Builder { B::Data: Send, B::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, + E: Http2ConnExec, { enum Protocol { H1, @@ -84,13 +99,13 @@ impl Builder { } /// Http1 part of builder. -pub struct Http1Builder<'a> { - inner: &'a mut Builder, +pub struct Http1Builder<'a, E> { + inner: &'a mut Builder, } -impl Http1Builder<'_> { +impl Http1Builder<'_, E> { /// Http2 configuration. - pub fn http2(&mut self) -> Http2Builder<'_> { + pub fn http2(&mut self) -> Http2Builder<'_, E> { Http2Builder { inner: &mut self.inner, } @@ -213,19 +228,20 @@ impl Http1Builder<'_> { B::Data: Send, B::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, + E: Http2ConnExec, { self.inner.serve_connection(io, service).await } } /// Http2 part of builder. -pub struct Http2Builder<'a> { - inner: &'a mut Builder, +pub struct Http2Builder<'a, E> { + inner: &'a mut Builder, } -impl Http2Builder<'_> { +impl Http2Builder<'_, E> { /// Http1 configuration. - pub fn http1(&mut self) -> Http1Builder<'_> { + pub fn http1(&mut self) -> Http1Builder<'_, E> { Http1Builder { inner: &mut self.inner, } @@ -360,6 +376,7 @@ impl Http2Builder<'_> { B::Data: Send, B::Error: Into>, I: AsyncRead + AsyncWrite + Unpin + 'static, + E: Http2ConnExec, { self.inner.serve_connection(io, service).await } @@ -380,7 +397,7 @@ mod tests { #[test] fn configuration() { // One liner. - auto::Builder::new() + auto::Builder::new(TokioExecutor::new()) .http1() .keep_alive(true) .http2() @@ -388,7 +405,7 @@ mod tests { // .serve_connection(io, service); // Using variable. - let mut builder = auto::Builder::new(); + let mut builder = auto::Builder::new(TokioExecutor::new()); builder.http1().keep_alive(true); builder.http2().keep_alive_interval(None); @@ -448,8 +465,7 @@ mod tests { B::Error: Into>, { let stream = TcpStream::connect(addr).await.unwrap(); - let (sender, connection) = client::conn::http2::Builder::new() - .executor(TokioExecutor::new()) + let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new()) .handshake(stream) .await .unwrap(); @@ -469,7 +485,7 @@ mod tests { loop { let (stream, _) = listener.accept().await.unwrap(); tokio::task::spawn(async move { - let _ = auto::Builder::new() + let _ = auto::Builder::new(TokioExecutor::new()) .serve_connection(stream, service_fn(hello)) .await; }); From 2cda70928c9ceb93ec2d4870fc761cedfc327b6b Mon Sep 17 00:00:00 2001 From: Eray Karatay Date: Fri, 15 Sep 2023 10:40:06 +0300 Subject: [PATCH 10/10] fix merge conflicts --- src/server/conn/auto.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/src/server/conn/auto.rs b/src/server/conn/auto.rs index 005f1db..fe5f525 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto.rs @@ -1,6 +1,6 @@ //! Http1 or Http2 connection. -use crate::common::rewind::Rewind; +use crate::{common::rewind::Rewind, rt::TokioIo}; use bytes::Bytes; use http::{Request, Response}; use http_body::Body; @@ -89,7 +89,7 @@ impl Builder { } }; - let io = Rewind::new_buffered(io, Bytes::from(buf)); + let io = TokioIo::new(Rewind::new_buffered(io, Bytes::from(buf))); match protocol { Protocol::H1 => self.http1.serve_connection(io, service).await?, @@ -386,7 +386,10 @@ impl Http2Builder<'_, E> { #[cfg(test)] mod tests { - use crate::{rt::tokio_executor::TokioExecutor, server::conn::auto}; + use crate::{ + rt::{tokio_executor::TokioExecutor, TokioIo}, + server::conn::auto, + }; use http::{Request, Response}; use http_body::Body; use http_body_util::{BodyExt, Empty, Full}; @@ -394,7 +397,7 @@ mod tests { use std::{convert::Infallible, error::Error as StdError, net::SocketAddr}; use tokio::net::{TcpListener, TcpStream}; - const BODY: &'static [u8] = b"Hello, world!"; + const BODY: &[u8] = b"Hello, world!"; #[test] fn configuration() { @@ -452,7 +455,7 @@ mod tests { B::Data: Send, B::Error: Into>, { - let stream = TcpStream::connect(addr).await.unwrap(); + let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap()); let (sender, connection) = client::conn::http1::handshake(stream).await.unwrap(); tokio::spawn(connection); @@ -462,11 +465,11 @@ mod tests { async fn connect_h2(addr: SocketAddr) -> client::conn::http2::SendRequest where - B: Body + Send + 'static, + B: Body + Unpin + Send + 'static, B::Data: Send, B::Error: Into>, { - let stream = TcpStream::connect(addr).await.unwrap(); + let stream = TokioIo::new(TcpStream::connect(addr).await.unwrap()); let (sender, connection) = client::conn::http2::Builder::new(TokioExecutor::new()) .handshake(stream) .await