Skip to content

Commit

Permalink
feat: add support for WASI HTTP interface
Browse files Browse the repository at this point in the history
  • Loading branch information
ruslanti committed Jul 29, 2024
1 parent b45bff7 commit 5bcf13b
Show file tree
Hide file tree
Showing 15 changed files with 610 additions and 403 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@ jobs:
with:
token: ${{ secrets.GITHUB_TOKEN }}

- name: Release Build
run: cargo build --release --all-features
- name: Build
run: cargo build --all-features

- name: Unit Tests
run: cargo test --all --exclude http-backend

- name: Run Clippy
run: cargo clippy --all-targets --all-features
Expand Down
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,15 @@ tokio-util = { workspace = true }
wasmtime = { workspace = true }
wasmtime-wasi = { workspace = true }
smol_str = { workspace = true }
async-trait = {workspace = true}
clap = { version = "4.5", features = ["derive"] }
pretty_env_logger = "0.5"
runtime = { path = "crates/runtime", default-features = false }
http-service = { path = "crates/http-service" }
http-backend = { path = "crates/http-backend" }
hyper-tls = "0.6"
hyper-util = { version = "0.1", features = ["client", "client-legacy", "http1", "tokio"] }
http-body-util = "0.1"
shellflip = {workspace = true}
bytesize = "1.3.0"

1 change: 1 addition & 0 deletions crates/http-backend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pin-project = "1.1.3"
log = "0.4.20"
url = "2.5.0"
tower-service = "0.3.2"
smol_str = {workspace = true}

[dev-dependencies]
claims = "0.7"
Expand Down
103 changes: 62 additions & 41 deletions crates/http-backend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,15 @@ use std::time::Duration;

use anyhow::{anyhow, Error, Result};
use async_trait::async_trait;
use http::{uri::Scheme, HeaderMap, HeaderValue, Uri};
use http::{header, uri::Scheme, HeaderMap, HeaderName, Uri};
use http_body_util::{BodyExt, Full};
use hyper::body::Bytes;
use hyper::rt::ReadBufCursor;
use hyper_util::client::legacy::connect::{Connect, HttpConnector};
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use pin_project::pin_project;
use smol_str::{SmolStr, ToSmolStr};
use tokio::net::TcpStream;
use tower_service::Service;
use tracing::{debug, trace, warn};
Expand All @@ -24,9 +25,7 @@ use reactor::gcore::fastedge::{
http_client::Host,
};

const HOST_HEADER_NAME: &str = "host";

type HeaderList = Vec<(String, String)>;
type HeaderNameList = Vec<SmolStr>;

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackendStrategy {
Expand All @@ -53,15 +52,15 @@ pub struct FastEdgeConnector {
pub struct Backend<C> {
client: Client<C, Full<Bytes>>,
uri: Uri,
propagate_headers: HeaderList,
propagate_header_names: Vec<String>,
propagate_headers: HeaderMap,
propagate_header_names: HeaderNameList,
max_sub_requests: usize,
strategy: BackendStrategy,
}

pub struct Builder {
uri: Uri,
propagate_header_names: Vec<String>,
propagate_header_names: HeaderNameList,
max_sub_requests: usize,
strategy: BackendStrategy,
}
Expand All @@ -71,7 +70,7 @@ impl Builder {
self.uri = uri;
self
}
pub fn propagate_headers_names(&mut self, propagate: Vec<String>) -> &mut Self {
pub fn propagate_headers_names(&mut self, propagate: HeaderNameList) -> &mut Self {
self.propagate_header_names = propagate;
self
}
Expand All @@ -92,7 +91,7 @@ impl Builder {
Backend {
client,
uri: self.uri.to_owned(),
propagate_headers: vec![],
propagate_headers: HeaderMap::new(),
propagate_header_names: self.propagate_header_names.to_owned(),
max_sub_requests: self.max_sub_requests,
strategy: self.strategy,
Expand All @@ -110,43 +109,57 @@ impl<C> Backend<C> {
}
}

pub fn uri(&self) -> &Uri {
&self.uri
pub fn uri(&self) -> Uri {
self.uri.to_owned()
}

pub fn propagate_header_names(&self) -> Vec<SmolStr> {
self.propagate_header_names.to_owned()
}

/// Propagate filtered headers from original requests
pub fn propagate_headers(&mut self, headers: &HeaderMap<HeaderValue>) -> Result<()> {
pub fn propagate_headers(&mut self, headers: HeaderMap) -> Result<()> {
self.propagate_headers.clear();

if self.strategy == BackendStrategy::FastEdge {
let server_name = headers
.get("Server_Name")
.get("server_name")
.and_then(|v| v.to_str().ok())
.ok_or(anyhow!("header Server_name is missing"))?;
self.propagate_headers
.push(("Host".to_string(), be_base_domain(server_name)));
self.propagate_headers.insert(
HeaderName::from_static("host"),
be_base_domain(server_name).parse()?,
);
}

for header_name in self.propagate_header_names.iter() {
if let Some(value) = headers.get(header_name).and_then(|v| v.to_str().ok()) {
trace!("add original request header: {}={}", header_name, value);
self.propagate_headers
.push((header_name.to_string(), value.to_string()));
let headers = headers.into_iter().filter(|(k, _)| {
if let Some(name) = k {
self.propagate_header_names.contains(&name.to_smolstr())
} else {
false
}
}
});
self.propagate_headers.extend(headers);

Ok(())
}

fn propagate_headers_vec(&self) -> Vec<(String, String)> {
self.propagate_headers
.iter()
.filter_map(|(k, v)| v.to_str().ok().map(|v| (k.to_string(), v.to_string())))
.collect::<Vec<(String, String)>>()
}

fn make_request(&self, req: Request) -> Result<http::Request<Full<Bytes>>> {
trace!("strategy: {:?}", self.strategy);
let builder = match self.strategy {
BackendStrategy::Direct => {
let mut headers = req.headers.into_iter().collect::<Vec<(String, String)>>();
headers.extend(self.propagate_headers.clone());
headers.extend(self.propagate_headers_vec());
// CLI has to set Host header from URL, if it is not set already by the request
if !headers
.iter()
.any(|(k, _)| k.eq_ignore_ascii_case(HOST_HEADER_NAME))
.any(|(k, _)| k.eq_ignore_ascii_case(header::HOST.as_str()))
{
if let Ok(uri) = req.uri.parse::<Uri>() {
if let Some(host) = uri.authority().map(|a| {
Expand All @@ -156,7 +169,7 @@ impl<C> Backend<C> {
a.host().to_string()
}
}) {
headers.push((HOST_HEADER_NAME.to_string(), host))
headers.push((header::HOST.as_str().to_string(), host))
}
}
}
Expand Down Expand Up @@ -220,8 +233,12 @@ impl<C> Backend<C> {
})
.collect::<Vec<(String, String)>>();

headers.extend(backend_headers(&original_url, original_host));
headers.extend(self.propagate_headers.clone());
headers.push(("fastedge-hostname".to_string(), original_host));
headers.push((
"fastedge-scheme".to_string(),
original_url.scheme_str().unwrap_or("http").to_string(),
));
headers.extend(self.propagate_headers_vec());

let host = canonical_host_name(&headers, &original_url)?;
let url = canonical_url(&original_url, &host, self.uri.path())?;
Expand Down Expand Up @@ -354,16 +371,6 @@ fn canonical_url(original_url: &Uri, canonical_host: &str, backend_path: &str) -
.map_err(Error::msg)
}

fn backend_headers(original_url: &Uri, original_host: String) -> HeaderList {
vec![
("Fastedge-Hostname".to_string(), original_host),
(
"Fastedge-Scheme".to_string(),
original_url.scheme_str().unwrap_or("http").to_string(),
),
]
}

impl FastEdgeConnector {
pub fn new(backend: Uri) -> Self {
let mut inner = HttpConnector::new();
Expand Down Expand Up @@ -399,7 +406,11 @@ impl Service<Uri> for FastEdgeConnector {
}

impl hyper::rt::Read for Connection {
fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, mut buf: ReadBufCursor<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
mut buf: ReadBufCursor<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
let n = unsafe {
let mut tbuf = tokio::io::ReadBuf::uninit(buf.as_mut());
match tokio::io::AsyncRead::poll_read(self.project().inner, cx, &mut tbuf) {
Expand All @@ -416,15 +427,25 @@ impl hyper::rt::Read for Connection {
}

impl hyper::rt::Write for Connection {
fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<std::result::Result<usize, std::io::Error>> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<std::result::Result<usize, std::io::Error>> {
tokio::io::AsyncWrite::poll_write(self.project().inner, cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_flush(self.project().inner, cx)
}

fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<std::result::Result<(), std::io::Error>> {
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<std::result::Result<(), std::io::Error>> {
tokio::io::AsyncWrite::poll_shutdown(self.project().inner, cx)
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/http-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ reactor = { path = "../reactor" }
runtime = { path = "../runtime" }
http-backend = { path = "../http-backend" }
nanoid = "0.4"
bytesize = "1.2.0"
bytesize = "1.3.0"
futures = "0.3.30"
once_cell = "1.19"
prometheus = { version = "0.13.3", features = ["process"], optional = true }
Expand All @@ -40,6 +40,7 @@ hyper-util = "0.1"
http-body-util = "0.1"
shellflip = {workspace = true}
bytes = "1.6"
uri = "0.4"

[dev-dependencies]
claims = "0.7"
Expand Down
Loading

0 comments on commit 5bcf13b

Please sign in to comment.