diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d7bd76a..7254244 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -82,6 +82,11 @@ jobs: with: toolchain: ${{ matrix.rust }} + - name: Make sure log v0.4.18 is used + run: | + cargo update + cargo update -p log --precise 0.4.18 + - run: cargo check miri: diff --git a/Cargo.toml b/Cargo.toml index 57c85e8..475f731 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = "=1.0.0-rc.3" +hyper = "=1.0.0-rc.4" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" @@ -31,7 +31,7 @@ tower = { version = "0.4", features = ["make", "util"] } [dev-dependencies] bytes = "1" -http-body-util = "0.1.0-rc.2" +http-body-util = "0.1.0-rc.3" tokio = { version = "1", features = ["macros", "test-util"] } [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] diff --git a/src/client/connect/http.rs b/src/client/connect/http.rs index f5ccb99..089fb16 100644 --- a/src/client/connect/http.rs +++ b/src/client/connect/http.rs @@ -19,6 +19,7 @@ use tracing::{debug, trace, warn}; use super::dns::{self, resolve, GaiResolver, Resolve}; use super::{Connected, Connection}; +use crate::rt::TokioIo; /// A connector for the `http` scheme. /// @@ -335,7 +336,7 @@ where R: Resolve + Clone + Send + Sync + 'static, R::Future: Send, { - type Response = TcpStream; + type Response = TokioIo; type Error = ConnectError; type Future = HttpConnecting; @@ -402,7 +403,7 @@ impl HttpConnector where R: Resolve, { - async fn call_async(&mut self, dst: Uri) -> Result { + async fn call_async(&mut self, dst: Uri) -> Result, ConnectError> { let config = &self.config; let (host, port) = get_host_port(config, &dst)?; @@ -433,14 +434,16 @@ where warn!("tcp set_nodelay error: {}", e); } - Ok(sock) + Ok(TokioIo::new(sock)) } } -impl Connection for TcpStream { +impl Connection for TokioIo { fn connected(&self) -> Connected { let connected = Connected::new(); - if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) { + if let (Ok(remote_addr), Ok(local_addr)) = + (self.inner().peer_addr(), self.inner().local_addr()) + { connected.extra(HttpInfo { remote_addr, local_addr, @@ -478,7 +481,7 @@ pin_project! { } } -type ConnectResult = Result; +type ConnectResult = Result, ConnectError>; type BoxConnecting = Pin + Send>>; impl Future for HttpConnecting { diff --git a/src/client/connect/mod.rs b/src/client/connect/mod.rs index 492f80c..147ff3c 100644 --- a/src/client/connect/mod.rs +++ b/src/client/connect/mod.rs @@ -9,7 +9,7 @@ //! # Connectors //! //! A "connector" is a [`Service`][] that takes a [`Uri`][] destination, and -//! its `Response` is some type implementing [`AsyncRead`][], [`AsyncWrite`][], +//! its `Response` is some type implementing [`Read`][], [`Write`][], //! and [`Connection`][]. //! //! ## Custom Connectors @@ -59,8 +59,8 @@ //! [`HttpConnector`]: HttpConnector //! [`Service`]: tower::Service //! [`Uri`]: ::http::Uri -//! [`AsyncRead`]: tokio::io::AsyncRead -//! [`AsyncWrite`]: tokio::io::AsyncWrite +//! [`Read`]: hyper::rt::Read +//! [`Write`]: hyper::rt::Write //! [`Connection`]: Connection use std::fmt; @@ -248,7 +248,7 @@ pub(super) mod sealed { use std::marker::Unpin; use ::http::Uri; - use tokio::io::{AsyncRead, AsyncWrite}; + use hyper::rt::{Read, Write}; use super::Connection; @@ -272,7 +272,7 @@ pub(super) mod sealed { } pub trait ConnectSvc { - type Connection: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static; + type Connection: Read + Write + Connection + Unpin + Send + 'static; type Error: Into>; type Future: Future> + Unpin + Send + 'static; @@ -284,7 +284,7 @@ pub(super) mod sealed { S: tower_service::Service + Send + 'static, S::Error: Into>, S::Future: Unpin + Send, - T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + T: Read + Write + Connection + Unpin + Send + 'static, { type _Svc = S; @@ -298,7 +298,7 @@ pub(super) mod sealed { S: tower_service::Service + Send + 'static, S::Error: Into>, S::Future: Unpin + Send, - T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + T: Read + Write + Connection + Unpin + Send + 'static, { type Connection = T; type Error = S::Error; @@ -314,7 +314,7 @@ pub(super) mod sealed { S: tower_service::Service + Send, S::Error: Into>, S::Future: Unpin + Send, - T: AsyncRead + AsyncWrite + Connection + Unpin + Send + 'static, + T: Read + Write + Connection + Unpin + Send + 'static, { } diff --git a/src/client/legacy.rs b/src/client/legacy.rs index a1edd59..1d2dae8 100644 --- a/src/client/legacy.rs +++ b/src/client/legacy.rs @@ -37,7 +37,7 @@ pub struct Client { #[cfg(feature = "http1")] h1_builder: hyper::client::conn::http1::Builder, #[cfg(feature = "http2")] - h2_builder: hyper::client::conn::http2::Builder, + h2_builder: hyper::client::conn::http2::Builder, pool: pool::Pool, PoolKey>, } @@ -128,7 +128,7 @@ impl Client<(), ()> { impl Client where C: Connect + Clone + Send + Sync + 'static, - B: Body + Send + 'static, + B: Body + Send + 'static + Unpin, B::Data: Send, B::Error: Into>, { @@ -458,7 +458,8 @@ where fn connect_to( &self, pool_key: PoolKey, - ) -> impl Lazy, PoolKey>, Error>> + Unpin { + ) -> impl Lazy, PoolKey>, Error>> + Send + Unpin + { let executor = self.exec.clone(); let pool = self.pool.clone(); #[cfg(feature = "http1")] @@ -574,7 +575,7 @@ where impl tower_service::Service> for Client where C: Connect + Clone + Send + Sync + 'static, - B: Body + Send + 'static, + B: Body + Send + 'static + Unpin, B::Data: Send, B::Error: Into>, { @@ -594,7 +595,7 @@ where impl tower_service::Service> for &'_ Client where C: Connect + Clone + Send + Sync + 'static, - B: Body + Send + 'static, + B: Body + Send + 'static + Unpin, B::Data: Send, B::Error: Into>, { @@ -955,7 +956,7 @@ pub struct Builder { #[cfg(feature = "http1")] h1_builder: hyper::client::conn::http1::Builder, #[cfg(feature = "http2")] - h2_builder: hyper::client::conn::http2::Builder, + h2_builder: hyper::client::conn::http2::Builder, pool_config: pool::Config, } @@ -965,18 +966,18 @@ impl Builder { where E: hyper::rt::Executor + Send + Sync + Clone + 'static, { - let exec = Exec::new(executor.clone()); + let exec = Exec::new(executor); Self { client_config: Config { retry_canceled_requests: true, set_host: true, ver: Ver::Auto, }, - exec, + exec: exec.clone(), #[cfg(feature = "http1")] h1_builder: hyper::client::conn::http1::Builder::new(), #[cfg(feature = "http2")] - h2_builder: hyper::client::conn::http2::Builder::new(executor), + h2_builder: hyper::client::conn::http2::Builder::new(exec), pool_config: pool::Config { idle_timeout: Some(Duration::from_secs(90)), max_idle_per_host: std::usize::MAX, diff --git a/src/common/exec.rs b/src/common/exec.rs index 1cef09c..2f23dad 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -54,3 +54,12 @@ impl fmt::Debug for Exec { f.debug_struct("Exec").finish() } } + +impl hyper::rt::Executor for Exec +where + F: Future + Send + 'static, +{ + fn execute(&self, fut: F) { + Exec::execute(self, fut); + } +} diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 32f532a..b6c18cd 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -2,5 +2,7 @@ /// Implementation of [`hyper::rt::Executor`] that utilises [`tokio::spawn`]. pub mod tokio_executor; +mod tokio_io; pub use tokio_executor::TokioExecutor; +pub use tokio_io::TokioIo; diff --git a/src/rt/tokio_io.rs b/src/rt/tokio_io.rs new file mode 100644 index 0000000..344c099 --- /dev/null +++ b/src/rt/tokio_io.rs @@ -0,0 +1,163 @@ +#![allow(dead_code)] +//! Tokio IO integration for hyper +use std::{ + pin::Pin, + task::{Context, Poll}, +}; + +use pin_project_lite::pin_project; + +pin_project! { + /// A wrapping implementing hyper IO traits for a type that + /// implements Tokio's IO traits. + #[derive(Debug)] + pub struct TokioIo { + #[pin] + inner: T, + } +} + +impl TokioIo { + /// Wrap a type implementing Tokio's IO traits. + pub fn new(inner: T) -> Self { + Self { inner } + } + + /// Borrow the inner type. + pub fn inner(&self) -> &T { + &self.inner + } + + /// Consume this wrapper and get the inner type. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl hyper::rt::Read for TokioIo +where + T: tokio::io::AsyncRead, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: hyper::rt::ReadBufCursor<'_>, + ) -> Poll> { + let n = unsafe { + let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut()); + match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) { + Poll::Ready(Ok(())) => tbuf.filled().len(), + other => return other, + } + }; + + unsafe { + buf.advance(n); + } + Poll::Ready(Ok(())) + } +} + +impl hyper::rt::Write for TokioIo +where + T: tokio::io::AsyncWrite, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + tokio::io::AsyncWrite::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + tokio::io::AsyncWrite::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + tokio::io::AsyncWrite::poll_write_vectored(self.project().inner, cx, bufs) + } +} + +impl tokio::io::AsyncRead for TokioIo +where + T: hyper::rt::Read, +{ + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + tbuf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + //let init = tbuf.initialized().len(); + let filled = tbuf.filled().len(); + let sub_filled = unsafe { + let mut buf = hyper::rt::ReadBuf::uninit(tbuf.unfilled_mut()); + + match hyper::rt::Read::poll_read(self.project().inner, cx, buf.unfilled()) { + Poll::Ready(Ok(())) => buf.filled().len(), + other => return other, + } + }; + + let n_filled = filled + sub_filled; + // At least sub_filled bytes had to have been initialized. + let n_init = sub_filled; + unsafe { + tbuf.assume_init(n_init); + tbuf.set_filled(n_filled); + } + + Poll::Ready(Ok(())) + } +} + +impl tokio::io::AsyncWrite for TokioIo +where + T: hyper::rt::Write, +{ + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + hyper::rt::Write::poll_write(self.project().inner, cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + hyper::rt::Write::poll_flush(self.project().inner, cx) + } + + fn poll_shutdown( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + hyper::rt::Write::poll_shutdown(self.project().inner, cx) + } + + fn is_write_vectored(&self) -> bool { + hyper::rt::Write::is_write_vectored(&self.inner) + } + + fn poll_write_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + hyper::rt::Write::poll_write_vectored(self.project().inner, cx, bufs) + } +}