Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Axum example to use axum 0.7 and hyper 1. #154

Merged
merged 2 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions examples/axum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ edition = "2021"
publish = false

[dependencies]
axum = "0.6"
hyper = { version = "0.14", features = ["full"] }
async-stream = "0.3"
axum = "0.7"
hyper = { version = "1", features = ["full"] }
hyper-util = { version = "0.1.2", features = ["full"] }
http-body-util = "0.1"
turmoil = { path = "../.." }
tracing = "0.1"
tracing-subscriber = "0.3"
tokio = "1"
tower = "0.4"
pin-project-lite = "0.2"
114 changes: 67 additions & 47 deletions examples/axum/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use axum::extract::Path;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
use axum::{body::Body, http::Request};
use hyper::server::accept::from_stream;
use hyper::{Client, Server, Uri};
use axum::{body::Body, extract::Path, http::Request, routing::get, Router};
use http_body_util::BodyExt as _;
use hyper_util::{client::legacy::Client, rt::TokioExecutor};
use std::net::{IpAddr, Ipv4Addr};
use tower::make::Shared;
use tracing::{info_span, Instrument};
use turmoil::{net, Builder};

Expand All @@ -29,15 +24,27 @@ fn main() {
sim.host("server", move || {
let router = router.clone();
async move {
Server::builder(from_stream(async_stream::stream! {
let listener = net::TcpListener::bind(addr).await?;
loop {
yield listener.accept().await.map(|(s, _)| s);
let listener = net::TcpListener::bind(addr).await?;
loop {
let (tcp_stream, _remote_addr) = listener.accept().await?;
let tcp_stream = hyper_util::rt::TokioIo::new(tcp_stream);

let hyper_service = hyper_util::service::TowerToHyperService::new(router.clone());

let result = hyper_util::server::conn::auto::Builder::new(
hyper_util::rt::TokioExecutor::new(),
)
.serve_connection_with_upgrades(tcp_stream, hyper_service)
.await;
if result.is_err() {
// This error only appears when the client doesn't send a request and
// terminate the connection.
//
// If client sends one request then terminate connection whenever, it doesn't
// appear.
break;
}
}))
.serve(Shared::new(router))
.await
.unwrap();
}

Ok(())
}
Expand All @@ -47,15 +54,15 @@ fn main() {
sim.client(
"client",
async move {
let client = Client::builder().build(connector::connector());
let client = Client::builder(TokioExecutor::new()).build(connector::connector());

let mut request = Request::new(Body::empty());
*request.uri_mut() = Uri::from_static("http://server:9999/greet/foo");
*request.uri_mut() = hyper::Uri::from_static("http://server:9999/greet/foo");
let res = client.request(request).await?;

let (parts, body) = res.into_parts();
let body = hyper::body::to_bytes(body).await?;
let res = Response::from_parts(parts, body);
let body = body.collect().await?.to_bytes();
let res = hyper::Response::from_parts(parts, body);

tracing::info!("Got response: {:?}", res);

Expand All @@ -68,68 +75,81 @@ fn main() {
}

mod connector {
use std::{future::Future, pin::Pin};

use hyper::{
client::connect::{Connected, Connection},
Uri,
};
use tokio::io::{AsyncRead, AsyncWrite};
use hyper::Uri;
use pin_project_lite::pin_project;
use std::{future::Future, io::Error, pin::Pin};
use tokio::io::AsyncWrite;
use tower::Service;
use turmoil::net::TcpStream;

type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, std::io::Error>> + Send>>;
type Fut = Pin<Box<dyn Future<Output = Result<TurmoilConnection, Error>> + Send>>;

pub fn connector(
) -> impl Service<Uri, Response = TurmoilConnection, Error = std::io::Error, Future = Fut> + Clone
{
) -> impl Service<Uri, Response = TurmoilConnection, Error = Error, Future = Fut> + Clone {
tower::service_fn(|uri: Uri| {
Box::pin(async move {
let conn = TcpStream::connect(uri.authority().unwrap().as_str()).await?;
Ok::<_, std::io::Error>(TurmoilConnection(conn))
Ok::<_, Error>(TurmoilConnection { fut: conn })
}) as Fut
})
}

pub struct TurmoilConnection(turmoil::net::TcpStream);
pin_project! {
pub struct TurmoilConnection{
#[pin]
fut: turmoil::net::TcpStream
}
}

impl AsyncRead for TurmoilConnection {
impl hyper::rt::Read for TurmoilConnection {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
Pin::new(&mut self.0).poll_read(cx, buf)
mut buf: hyper::rt::ReadBufCursor<'_>,
) -> std::task::Poll<Result<(), Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
let result = tokio::io::AsyncRead::poll_read(self.project().fut, cx, &mut tbuf);
match result {
std::task::Poll::Ready(Ok(())) => tbuf.filled().len(),
other => return other,
}
};

unsafe {
buf.advance(n);
}
std::task::Poll::Ready(Ok(()))
}
}

impl AsyncWrite for TurmoilConnection {
impl hyper::rt::Write for TurmoilConnection {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.0).poll_write(cx, buf)
) -> std::task::Poll<Result<usize, Error>> {
Pin::new(&mut self.fut).poll_write(cx, buf)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_flush(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_flush(cx)
}

fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.0).poll_shutdown(cx)
) -> std::task::Poll<Result<(), Error>> {
Pin::new(&mut self.fut).poll_shutdown(cx)
}
}

impl Connection for TurmoilConnection {
fn connected(&self) -> hyper::client::connect::Connected {
Connected::new()
impl hyper_util::client::legacy::connect::Connection for TurmoilConnection {
fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
hyper_util::client::legacy::connect::Connected::new()
}
}
}
Loading