From 164d926aec059c0620ef43d600b157b833efa7ee Mon Sep 17 00:00:00 2001 From: Anthony Ramine <123095+nox@users.noreply.github.com> Date: Wed, 4 Sep 2024 18:33:21 +0200 Subject: [PATCH] Introduce hyper_util::server::conn::auto::upgrade::downcast (#147) This fixes . --- Cargo.toml | 2 +- src/common/rewind.rs | 4 +- src/server/conn/{auto.rs => auto/mod.rs} | 6 +++ src/server/conn/auto/upgrade.rs | 66 ++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 3 deletions(-) rename src/server/conn/{auto.rs => auto/mod.rs} (99%) create mode 100644 src/server/conn/auto/upgrade.rs diff --git a/Cargo.toml b/Cargo.toml index 7d2ab33..2a4a5df 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ hyper = "1.4.0" futures-util = { version = "0.3.16", default-features = false } http = "1.0" http-body = "1.0.0" -bytes = "1" +bytes = "1.7.1" pin-project-lite = "0.2.4" futures-channel = { version = "0.3", optional = true } socket2 = { version = "0.5", optional = true, features = ["all"] } diff --git a/src/common/rewind.rs b/src/common/rewind.rs index c75464e..87e0f64 100644 --- a/src/common/rewind.rs +++ b/src/common/rewind.rs @@ -11,8 +11,8 @@ use std::{ /// Combine a buffer with an IO, rewinding reads to use the buffer. #[derive(Debug)] pub(crate) struct Rewind { - pre: Option, - inner: T, + pub(crate) pre: Option, + pub(crate) inner: T, } impl Rewind { diff --git a/src/server/conn/auto.rs b/src/server/conn/auto/mod.rs similarity index 99% rename from src/server/conn/auto.rs rename to src/server/conn/auto/mod.rs index 1351a80..afe5a39 100644 --- a/src/server/conn/auto.rs +++ b/src/server/conn/auto/mod.rs @@ -1,5 +1,7 @@ //! Http1 or Http2 connection. +pub mod upgrade; + use futures_util::ready; use hyper::service::HttpService; use std::future::Future; @@ -163,6 +165,10 @@ impl Builder { /// Bind a connection together with a [`Service`], with the ability to /// handle HTTP upgrades. This requires that the IO object implements /// `Send`. + /// + /// Note that if you ever want to use [`hyper::upgrade::Upgraded::downcast`] + /// with this crate, you'll need to use [`hyper_util::server::conn::auto::upgrade::downcast`] + /// instead. See the documentation of the latter to understand why. pub fn serve_connection_with_upgrades( &self, io: I, diff --git a/src/server/conn/auto/upgrade.rs b/src/server/conn/auto/upgrade.rs new file mode 100644 index 0000000..f4063bc --- /dev/null +++ b/src/server/conn/auto/upgrade.rs @@ -0,0 +1,66 @@ +//! Upgrade utilities. + +use bytes::{Bytes, BytesMut}; +use hyper::{ + rt::{Read, Write}, + upgrade::Upgraded, +}; + +use crate::common::rewind::Rewind; + +/// Tries to downcast the internal trait object to the type passed. +/// +/// On success, returns the downcasted parts. On error, returns the Upgraded back. +/// This is a kludge to work around the fact that the machinery provided by +/// [`hyper_util::server::con::auto`] wraps the inner `T` with a private type +/// that is not reachable from outside the crate. +/// +/// This kludge will be removed when this machinery is added back to the main +/// `hyper` code. +pub fn downcast(upgraded: Upgraded) -> Result, Upgraded> +where + T: Read + Write + Unpin + 'static, +{ + let hyper::upgrade::Parts { + io: rewind, + mut read_buf, + .. + } = upgraded.downcast::>()?; + + if let Some(pre) = rewind.pre { + read_buf = if read_buf.is_empty() { + pre + } else { + let mut buf = BytesMut::from(read_buf); + + buf.extend_from_slice(&pre); + + buf.freeze() + }; + } + + Ok(Parts { + io: rewind.inner, + read_buf, + }) +} + +/// The deconstructed parts of an [`Upgraded`] type. +/// +/// Includes the original IO type, and a read buffer of bytes that the +/// HTTP state machine may have already read before completing an upgrade. +#[derive(Debug)] +#[non_exhaustive] +pub struct Parts { + /// The original IO object used before the upgrade. + pub io: T, + /// A buffer of bytes that have been read but not processed as HTTP. + /// + /// For instance, if the `Connection` is used for an HTTP upgrade request, + /// it is possible the server sent back the first bytes of the new protocol + /// along with the response upgrade. + /// + /// You will want to check for any existing bytes if you plan to continue + /// communicating on the IO object. + pub read_buf: Bytes, +}