Skip to content

Commit

Permalink
feat: Adding cli support for wasi-http
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslanti committed May 22, 2024
1 parent 806d89a commit a2518ee
Show file tree
Hide file tree
Showing 11 changed files with 316 additions and 145 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ authors = ["FastEdge Development Team"]
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["codec"] }
tracing = "0.1"
hyper = { version = "0.14", features = ["full"] }
http = "0.2.9"
hyper = { version = "1.3.1", features = ["full", ] }
http = "1.1.0"
async-trait = "0.1"
wasmtime = { version = "20.0" }
wasmtime-wasi = { version = "20.0" }
Expand All @@ -24,7 +24,6 @@ moka = { version = "0.12", features = ["sync"] }
smol_str = "0.2.1"
anyhow = "1.0"


[workspace.lints.rust]
unused_extern_crates = 'warn'
trivial_numeric_casts = 'warn'
Expand Down Expand Up @@ -57,3 +56,5 @@ runtime = { path = "crates/runtime", default-features = false }
http-service = { path = "crates/http-service" }
http-backend = { path = "crates/http-backend" }
hyper-tls = "0.5.0"
hyper-util = { version = "0.1.3", features = ["client", "client-legacy", "http1", "tokio"] }

3 changes: 3 additions & 0 deletions crates/http-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ anyhow = {workspace = true}
tracing = {workspace = true}
hyper = { workspace = true }
tokio = { workspace = true }
hyper-util = { version = "0.1.3", features = ["client", "client-legacy", "http1", "tokio"] }
http-body-util = "0.1.1"
pin-project = "1.1.3"
log = "0.4.20"
url = "2.5.0"
tower-service = "0.3.2"

[dev-dependencies]
claims = "0.7"
Expand Down
51 changes: 29 additions & 22 deletions crates/http-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ use std::time::Duration;
use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use http::{uri::Scheme, HeaderMap, HeaderValue, Uri};
use hyper::client::connect::Connect;
use hyper::{client::HttpConnector, service::Service, Client};
use http_body_util::Full;
use hyper::body::Bytes;
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use pin_project::pin_project;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tower_service::Service;
use tracing::{debug, trace, warn};

use reactor::gcore::fastedge::http::Headers;
Expand Down Expand Up @@ -47,7 +51,7 @@ pub struct FastEdgeConnector {

#[derive(Clone, Debug)]
pub struct Backend<C> {
client: Client<C>,
client: Client<C, Full<Bytes>>,
uri: Uri,
propagate_headers: HeaderList,
propagate_header_names: Vec<String>,
Expand Down Expand Up @@ -80,7 +84,7 @@ impl Builder {
where
C: Connect + Clone,
{
let client = hyper::Client::builder()
let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.set_host(false)
.pool_idle_timeout(Duration::from_secs(30))
.build(connector);
Expand Down Expand Up @@ -133,7 +137,7 @@ impl<C> Backend<C> {
Ok(())
}

fn make_request(&self, req: Request) -> Result<http::Request<hyper::Body>> {
fn make_request(&self, req: Request) -> Result<http::Request<Full<Bytes>>> {
trace!("strategy: {:?}", self.strategy);
let builder = match self.strategy {
BackendStrategy::Direct => {
Expand Down Expand Up @@ -240,7 +244,7 @@ impl<C> Backend<C> {
};
debug!("request builder: {:?}", builder);
let body = req.body.unwrap_or_default();
builder.body(hyper::Body::from(body)).map_err(Error::msg)
Ok(builder.body(Full::new(Bytes::from(body)))?)
}
}

Expand All @@ -260,9 +264,9 @@ where
let request = self.make_request(req)?;
let res = self.client.request(request).await?;

let status = res.status().as_u16();
let (parts, body) = res.into_parts();
let headers = if !parts.headers.is_empty() {
let _status = res.status().as_u16();
let (parts, _body) = res.into_parts();
let _headers = if !parts.headers.is_empty() {
Some(
parts
.headers
Expand All @@ -280,7 +284,7 @@ where
None
};

let body_bytes = hyper::body::to_bytes(body).await?;
/*let body_bytes = body.into();
let body = Some(body_bytes.to_vec());
trace!(?status, ?headers, len = body_bytes.len(), "reply");
Expand All @@ -289,7 +293,8 @@ where
status,
headers,
body,
}))
}))*/
unimplemented!("send request")
}
}

Expand Down Expand Up @@ -351,13 +356,13 @@ fn canonical_url(original_url: &Uri, canonical_host: &str, backend_path: &str) -
}

fn backend_headers(original_url: &Uri, original_host: String) -> HeaderList {
vec![("Fastedge-Hostname".to_string(), original_host),(
"Fastedge-Scheme".to_string(),
original_url
.scheme_str()
.unwrap_or( "http")
.to_string(),
)]
vec![
("Fastedge-Hostname".to_string(), original_host),
(
"Fastedge-Scheme".to_string(),
original_url.scheme_str().unwrap_or("http").to_string(),
),
]
}

impl FastEdgeConnector {
Expand Down Expand Up @@ -385,7 +390,9 @@ impl Service<Uri> for FastEdgeConnector {
Box::pin(async move {
let conn = connect_fut
.await
.map(|inner| Connection { inner })
.map(|inner| Connection {
inner: inner.into_inner(),
})
.map_err(Box::new)?;
Ok(conn)
})
Expand Down Expand Up @@ -430,9 +437,9 @@ impl AsyncWrite for Connection {
}
}

impl hyper::client::connect::Connection for Connection {
fn connected(&self) -> hyper::client::connect::Connected {
hyper::client::connect::Connected::new()
impl hyper_util::client::legacy::connect::Connection for Connection {
fn connected(&self) -> hyper_util::client::legacy::connect::Connected {
hyper_util::client::legacy::connect::Connected::new()
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/http-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ stats = ["clickhouse"]
anyhow = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
hyper = { workspace = true }
hyper = { version = "1", features = ["server", "http1"] }
http = { workspace = true }
wasmtime = { workspace = true }
wasmtime-wasi = { workspace = true }
Expand All @@ -38,6 +38,9 @@ serde = "1.0"
clickhouse = { version = "0.11.6", optional = true }
chrono = "0.4"
async-trait = "0.1"
wasmtime-wasi-http = "20.0.2"
hyper-util = "0.1.3"
http-body-util = "0.1.1"

[dev-dependencies]
claims = "0.7"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
mod wasi_http;

use std::collections::HashMap;
use std::time::{Duration, Instant};

use anyhow::{anyhow, bail, Context, Error, Result};
use async_trait::async_trait;
use bytesize::ByteSize;
use http::{HeaderMap, HeaderValue, Method, Request, Response};
use http::{HeaderMap, HeaderValue, Method};
use http_backend::Backend;
use hyper::Body;
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use smol_str::SmolStr;
use wasmtime_wasi::StdoutStream;

Expand All @@ -16,13 +19,18 @@ use runtime::{App, InstancePre, WasmEngine};

use crate::HttpState;

pub use wasi_http::WasiHttpExecutorImpl;

pub(crate) static X_REAL_IP: &str = "x-real-ip";
pub(crate) static TRACEPARENT: &str = "traceparent";
pub(crate) static X_CDN_REQUESTOR: &str = "x-cdn-requestor";

#[async_trait]
pub trait HttpExecutor {
async fn execute(&self, req: Request<Body>) -> Result<(Response<Body>, Duration, ByteSize)>;
async fn execute(
&self,
req: hyper::Request<Incoming>,
) -> Result<(hyper::Response<Full<Bytes>>, Duration, ByteSize)>;
}

pub trait ExecutorFactory<C> {
Expand All @@ -38,7 +46,7 @@ pub trait ExecutorFactory<C> {
/// Execute context used by ['HttpService']
#[derive(Clone)]
pub struct HttpExecutorImpl<C> {
instance_pre: InstancePre<HttpState<C>>,
instance_pre: InstancePre<HttpState>,
store_builder: StoreBuilder,
backend: Backend<C>,
}
Expand All @@ -48,7 +56,10 @@ impl<C> HttpExecutor for HttpExecutorImpl<C>
where
C: Clone + Send + Sync + 'static,
{
async fn execute(&self, req: Request<Body>) -> Result<(Response<Body>, Duration, ByteSize)> {
async fn execute(
&self,
req: hyper::Request<Incoming>,
) -> Result<(hyper::Response<Full<Bytes>>, Duration, ByteSize)> {
let start_ = Instant::now();
let response = self.execute_impl(req).await;
let elapsed = Instant::now().duration_since(start_);
Expand All @@ -61,7 +72,7 @@ where
C: Clone + Send + Sync + 'static,
{
pub fn new(
instance_pre: InstancePre<HttpState<C>>,
instance_pre: InstancePre<HttpState>,
store_builder: StoreBuilder,
backend: Backend<C>,
) -> Self {
Expand All @@ -72,7 +83,10 @@ where
}
}

async fn execute_impl(&self, req: Request<Body>) -> Result<(Response<Body>, ByteSize)> {
async fn execute_impl(
&self,
req: hyper::Request<Incoming>,
) -> Result<(hyper::Response<Full<Bytes>>, ByteSize)> {
let (parts, body) = req.into_parts();
let method = to_fastedge_http_method(&parts.method)?;

Expand All @@ -87,7 +101,7 @@ where
})
.collect::<Vec<(String, String)>>();

let body = hyper::body::to_bytes(body).await?;
let body = body.collect().await?.to_bytes();
let body = if body.is_empty() {
None
} else {
Expand All @@ -111,10 +125,7 @@ where
.propagate_headers(&parts.headers)
.context("propagate headers")?;

let state = HttpState {
wasi_nn,
http_backend,
};
let state = HttpState { wasi_nn };

let mut store = store_builder.build(state)?;

Expand Down Expand Up @@ -147,7 +158,7 @@ where
};
let used = ByteSize::b(store.memory_used() as u64);

let body = resp.body.map_or_else(Body::empty, Body::from);
let body = resp.body.map(|b| Full::from(b)).unwrap_or_default();

Check warning on line 161 in crates/http-service/src/executor/mod.rs

View workflow job for this annotation

GitHub Actions / clippy

redundant closure

warning: redundant closure --> crates/http-service/src/executor/mod.rs:161:34 | 161 | let body = resp.body.map(|b| Full::from(b)).unwrap_or_default(); | ^^^^^^^^^^^^^^^^^ help: replace the closure with the function itself: `Full::from` | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#redundant_closure = note: `#[warn(clippy::redundant_closure)]` on by default
builder.body(body).map(|r| (r, used)).map_err(Error::msg)
}

Expand Down
Loading

0 comments on commit a2518ee

Please sign in to comment.