From 308239d81c5488010c65e6d5eeccc8fc9d41e33c Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Mon, 17 Jun 2024 14:17:37 +0300 Subject: [PATCH 1/2] feat: adding region field to stats and minor string fields perf optimisations --- Cargo.toml | 4 +-- crates/http-service/src/lib.rs | 23 ++++--------- crates/runtime/Cargo.toml | 4 +-- crates/runtime/src/app.rs | 9 ++--- crates/runtime/src/util/stats.rs | 9 ++--- src/main.rs | 56 +++++++++++++++----------------- 6 files changed, 47 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9834493..44cb446 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["crates/*"] resolver = "2" [workspace.package] -version = "0.3.10" +version = "0.4.0" edition = "2021" publish = false authors = ["FastEdge Development Team"] @@ -21,7 +21,7 @@ wasi-common = { version = "20.0" } wasmtime-wasi-nn = { version = "20.0" } clap = { version = "4", features = ["derive"] } moka = { version = "0.12", features = ["sync"] } -smol_str = "0.2.1" +smol_str = { version = "0.2.1", features = ["serde"] } anyhow = "1.0" diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index e351fbe..485f3f4 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::net::SocketAddr; use std::sync::Arc; @@ -67,7 +66,7 @@ pub struct HttpState { } pub trait ContextHeaders { - fn append_headers(&self) -> impl Iterator, Cow)>; + fn append_headers(&self) -> impl Iterator; } impl Service for HttpService @@ -288,16 +287,6 @@ where } }; - #[cfg(feature = "stats")] - let pop = match request.headers().get("x-cdn-requestor") { - None => { - info!("missing pop info header"); - "unknown" - } - Some(v) => v.to_str().unwrap(), - } - .to_string(); - let start_ = std::time::Instant::now(); let response = match executor.execute(request).await { @@ -316,13 +305,13 @@ where app_id: cfg.app_id, client_id: cfg.client_id, timestamp: timestamp as u32, - app_name: app_name.to_string(), + app_name, status_code: response.status().as_u16() as u32, fail_reason: 0, // TODO: use AppResult billing_plan: cfg.plan.clone(), time_elapsed: time_elapsed.as_micros() as u64, memory_used: memory_used.as_u64(), - pop, + .. Default::default() }; self.context.write_stats(stat_row).await; } @@ -382,13 +371,13 @@ where app_id: cfg.app_id, client_id: cfg.client_id, timestamp: timestamp as u32, - app_name: app_name.to_string(), + app_name, status_code: status_code as u32, fail_reason: fail_reason as u32, billing_plan: cfg.plan.clone(), time_elapsed: time_elapsed.as_micros() as u64, memory_used: 0, - pop, + .. Default::default() }; self.context.write_stats(stat_row).await; } @@ -511,7 +500,7 @@ fn app_res_headers(app_cfg: App) -> HeaderMap { headers } -fn app_req_headers<'a>(geo: impl Iterator, Cow<'a, str>)>) -> HeaderMap { +fn app_req_headers<'a>(geo: impl Iterator) -> HeaderMap { let mut headers = HeaderMap::new(); for (key, value) in geo { trace!("append new request header {}={}", key, value); diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 4185f06..6d20009 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -21,8 +21,8 @@ wasi-common = { workspace = true } wasmtime-wasi-nn = { workspace = true } smol_str = { workspace = true } moka = { workspace = true } -wasmtime-environ = "19.0.2" -wit-component = "0.206.0" +wasmtime-environ = "20.0.2" +wit-component = "0.210.0" tracing = { workspace = true } bytesize = "1" http-backend = { path = "../http-backend" } diff --git a/crates/runtime/src/app.rs b/crates/runtime/src/app.rs index 723df52..b72eb13 100644 --- a/crates/runtime/src/app.rs +++ b/crates/runtime/src/app.rs @@ -4,6 +4,7 @@ use serde::{Deserialize, Deserializer}; use std::collections::HashMap; use std::fmt; use std::fmt::Formatter; +use smol_str::SmolStr; #[derive(Debug, Clone, PartialEq, Deserialize)] pub struct App { @@ -11,15 +12,15 @@ pub struct App { pub max_duration: u64, pub mem_limit: usize, #[serde(default)] - pub env: HashMap, + pub env: HashMap, #[serde(default)] - pub rsp_headers: HashMap, + pub rsp_headers: HashMap, #[serde(default)] pub log: Log, #[serde(default)] pub app_id: u64, pub client_id: u64, - pub plan: String, + pub plan: SmolStr, #[serde(default)] pub status: Status, #[serde(default)] @@ -72,7 +73,7 @@ impl<'de> Visitor<'de> for StatusVisitor { 2 => Ok(Status::Disabled), 3 | 4 => Ok(Status::RateLimited), 5 => Ok(Status::Suspended), - _ => Err(E::custom("not in range: [0..4]")), + _ => Err(E::custom("status not in range: [0..5]")), } } } diff --git a/crates/runtime/src/util/stats.rs b/crates/runtime/src/util/stats.rs index 58ca106..a0f1843 100644 --- a/crates/runtime/src/util/stats.rs +++ b/crates/runtime/src/util/stats.rs @@ -4,18 +4,19 @@ use clickhouse::Row; use serde::Serialize; #[cfg(feature = "stats")] -#[derive(Row, Debug, Serialize)] +#[derive(Row, Debug, Serialize, Default)] pub struct StatRow { pub app_id: u64, pub client_id: u64, pub timestamp: u32, - pub app_name: String, + pub app_name: smol_str::SmolStr, pub status_code: u32, pub fail_reason: u32, - pub billing_plan: String, + pub billing_plan: smol_str::SmolStr, pub time_elapsed: u64, pub memory_used: u64, - pub pop: String, + pub pop: smol_str::SmolStr, + pub region: smol_str::SmolStr } #[cfg(not(feature = "stats"))] diff --git a/src/main.rs b/src/main.rs index 188987d..5268b6f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,8 +11,7 @@ use runtime::{ componentize_if_necessary, App, ContextT, ExecutorCache, PreCompiledLoader, Router, WasiVersion, WasmConfig, WasmEngine, }; -use smol_str::SmolStr; -use std::borrow::Cow; +use smol_str::{SmolStr, ToSmolStr}; use std::collections::HashMap; use std::path::PathBuf; use tokio_util::sync::CancellationToken; @@ -44,13 +43,13 @@ struct HttpRunArgs { wasm: PathBuf, /// Environment variable list #[arg(long, value_parser = parse_key_value::< String, String >)] - envs: Option>, + envs: Option>, /// Add header from original request #[arg(long = "propagate-header", num_args = 0..)] propagate_headers: Option>, /// Execution context headers added to request #[arg(long, value_parser = parse_key_value::< String, String >)] - headers: Option>, + headers: Option>, /// Append sample Geo PoP headers #[arg(long, default_value = "false")] geo: bool, @@ -63,8 +62,8 @@ struct HttpRunArgs { } /// Test tool execution context -struct CliContext<'a> { - headers: HashMap, Cow<'a, str>>, +struct CliContext { + headers: HashMap, engine: Engine, app: Option, backend: Backend>, @@ -99,16 +98,15 @@ async fn main() -> anyhow::Result<()> { log: Default::default(), app_id: 0, client_id: 0, - plan: "cli".to_string(), + plan: SmolStr::new("cli"), status: Status::Enabled, debug_until: None, }; - let mut headers: HashMap, Cow> = run + let mut headers: HashMap = run .headers .unwrap_or_default() .into_iter() - .map(|(k, v)| (k.into(), v.into())) .collect(); append_headers(run.geo, &mut headers); @@ -144,32 +142,32 @@ async fn main() -> anyhow::Result<()> { /// Append to request headers: /// * server_name if it is missing /// * Gcore PoP sample Geo headers -fn append_headers(geo: bool, headers: &mut HashMap, Cow>) { +fn append_headers(geo: bool, headers: &mut HashMap) { if !headers .keys() .any(|k| "server_name".eq_ignore_ascii_case(k)) { headers.insert( - Cow::Borrowed("Server_name"), - Cow::Borrowed("test.localhost"), + "Server_name".to_smolstr(), + "test.localhost".to_smolstr(), ); } if geo { - headers.insert(Cow::Borrowed("PoP-Lat"), Cow::Borrowed("49.6113")); - headers.insert(Cow::Borrowed("PoP-Long"), Cow::Borrowed("6.1294")); - headers.insert(Cow::Borrowed("PoP-Reg"), Cow::Borrowed("LU")); - headers.insert(Cow::Borrowed("PoP-City"), Cow::Borrowed("Luxembourg")); - headers.insert(Cow::Borrowed("PoP-Continent"), Cow::Borrowed("EU")); - headers.insert(Cow::Borrowed("PoP-Country-Code"), Cow::Borrowed("AU")); + headers.insert("PoP-Lat".to_smolstr(),"49.6113".to_smolstr()); + headers.insert("PoP-Long".to_smolstr(), "6.1294".to_smolstr()); + headers.insert("PoP-Reg".to_smolstr(), "LU".to_smolstr()); + headers.insert("PoP-City".to_smolstr(), "Luxembourg".to_smolstr()); + headers.insert("PoP-Continent".to_smolstr(), "EU".to_smolstr()); + headers.insert("PoP-Country-Code".to_smolstr(), "AU".to_smolstr()); headers.insert( - Cow::Borrowed("PoP-Country-Name"), - Cow::Borrowed("Luxembourg"), + "PoP-Country-Name".to_smolstr(), + "Luxembourg".to_smolstr(), ); } } -impl PreCompiledLoader for CliContext<'_> { +impl PreCompiledLoader for CliContext { fn load_component(&self, _id: u64) -> anyhow::Result { let wasm_sample = componentize_if_necessary(&self.wasm_bytes)?; Component::new(&self.engine, wasm_sample) @@ -180,7 +178,7 @@ impl PreCompiledLoader for CliContext<'_> { } } -impl ContextT for CliContext<'_> { +impl ContextT for CliContext { type BackendConnector = HttpsConnector; fn make_logger(&self, _app_name: SmolStr, _wrk: &App) -> Logger { Logger::new(Console::default()) @@ -199,7 +197,7 @@ impl ContextT for CliContext<'_> { } } -impl ExecutorFactory>> for CliContext<'_> { +impl ExecutorFactory>> for CliContext { type Executor = HttpExecutorImpl>; fn get_executor( @@ -211,7 +209,7 @@ impl ExecutorFactory>> for CliContext<'_ let env = app .env .iter() - .collect::>(); + .collect::>(); let logger = self.make_logger(name, app); @@ -233,19 +231,19 @@ impl ExecutorFactory>> for CliContext<'_ } } -impl ExecutorCache for CliContext<'_> { +impl ExecutorCache for CliContext { fn invalidate(&self, _name: &str) { unreachable!() } } -impl ContextHeaders for CliContext<'_> { - fn append_headers(&self) -> impl Iterator, Cow)> { +impl ContextHeaders for CliContext { + fn append_headers(&self) -> impl Iterator { self.headers.iter().map(|(k, v)| (k.clone(), v.clone())) } } -impl Router for CliContext<'_> { +impl Router for CliContext { async fn lookup_by_name(&self, _name: &str) -> Option { self.app.to_owned() } @@ -255,7 +253,7 @@ impl Router for CliContext<'_> { } } -impl StatsWriter for CliContext<'_> { +impl StatsWriter for CliContext { async fn write_stats(&self, _stat: StatRow) {} } From 5567b58309d150e65c5ccbdfbfcaaf8e30179fb6 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Mon, 17 Jun 2024 14:51:53 +0300 Subject: [PATCH 2/2] fix: make tls as optional http-service feature --- crates/http-service/Cargo.toml | 9 +++++---- crates/http-service/src/lib.rs | 33 ++++++++++++++++++++++----------- src/main.rs | 3 +-- 3 files changed, 28 insertions(+), 17 deletions(-) diff --git a/crates/http-service/Cargo.toml b/crates/http-service/Cargo.toml index 0210089..bb2fb31 100644 --- a/crates/http-service/Cargo.toml +++ b/crates/http-service/Cargo.toml @@ -9,6 +9,7 @@ authors.workspace = true default = [] metrics = ["runtime/metrics"] stats = ["runtime/stats"] +tls = ["tokio-rustls", "rustls-pemfile", "hyper-rustls", "rustls"] [dependencies] anyhow = { workspace = true } @@ -22,10 +23,10 @@ wasmtime-wasi-nn = { workspace = true } wasi-common = { workspace = true } tracing = { workspace = true } smol_str = { workspace = true } -tokio-rustls = "0.24.1" -rustls-pemfile = "1.0.2" -hyper-rustls = "0.24.1" -rustls = "0.21.6" +tokio-rustls = { version = "0.24.1", optional = true} +rustls-pemfile = { version = "1.0.2" , optional = true} +hyper-rustls = { version = "0.24.1", optional = true } +rustls = { version = "0.21.6", optional = true } reactor = { path = "../reactor" } runtime = { path = "../runtime" } http-backend = { path = "../http-backend" } diff --git a/crates/http-service/src/lib.rs b/crates/http-service/src/lib.rs index 485f3f4..dc7233a 100644 --- a/crates/http-service/src/lib.rs +++ b/crates/http-service/src/lib.rs @@ -1,18 +1,16 @@ use std::net::SocketAddr; use std::sync::Arc; -use anyhow::{anyhow, bail, Context, Error, Result}; +use anyhow::{ bail, Context, Error, Result}; use http::header::{ACCESS_CONTROL_ALLOW_ORIGIN, CACHE_CONTROL}; use http::{HeaderMap, HeaderName, HeaderValue, Request, Response, StatusCode}; use hyper::client::connect::Connect; -use hyper::server::conn::{AddrIncoming, AddrStream}; +use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Server}; use smol_str::SmolStr; -use tokio_rustls::rustls; -use tokio_rustls::rustls::ServerConfig; use tokio_util::sync::CancellationToken; -use tracing::{debug, error_span, info, info_span, trace, warn, Instrument}; +use tracing::{debug, info, info_span, trace, warn, Instrument}; use wasi_common::I32Exit; use wasmtime::Trap; use wasmtime_wasi_nn::WasiNnCtx; @@ -29,11 +27,14 @@ use crate::executor::{ HttpExecutor}; use runtime::util::stats::StatRow; use runtime::util::stats::StatsWriter; +#[cfg(feature = "tls")] use crate::tls::{load_certs, load_private_key, TlsAcceptor, TlsStream}; -pub mod executor; +#[cfg(feature = "tls")] mod tls; +pub mod executor; + pub use crate::executor::ExecutorFactory; pub(crate) static TRACEPARENT: &str = "traceparent"; @@ -43,14 +44,18 @@ const FASTEDGE_OUT_OF_MEMORY: u16 = 531; const FASTEDGE_EXECUTION_TIMEOUT: u16 = 532; const FASTEDGE_EXECUTION_PANIC: u16 = 533; +#[cfg(feature = "tls")] +#[derive(Default)] pub struct HttpsConfig { pub ssl_certs: &'static str, pub ssl_pkey: &'static str, } +#[derive(Default)] pub struct HttpConfig { pub all_interfaces: bool, pub port: u16, + #[cfg(feature = "tls")] pub https: Option, pub cancel: CancellationToken, } @@ -99,6 +104,8 @@ where }; let listen_addr = (interface, config.port).into(); + + #[cfg(feature = "tls")] if let Some(https) = config.https { let tls = { // Load public certificate. @@ -110,7 +117,7 @@ where .with_safe_defaults() .with_no_client_auth() .with_single_cert(certs, key) - .map_err(|e| anyhow!(format!("{}", e)))?; + .map_err(|e| anyhow::anyhow!(format!("{}", e)))?; // Configure ALPN to accept HTTP/2, HTTP/1.1 in that order. cfg.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; Arc::new(cfg) @@ -120,6 +127,9 @@ where self.serve(listen_addr).await? }; + #[cfg(not(feature = "tls"))] + self.serve(listen_addr).await?; + Ok(()) } @@ -176,7 +186,8 @@ where Ok(()) } - async fn serve_tls(self, listen_addr: SocketAddr, tls: Arc) -> Result<()> { + #[cfg(feature = "tls")] + async fn serve_tls(self, listen_addr: SocketAddr, tls: Arc) -> Result<()> { let service = Arc::new(self); let make_service = make_service_fn(|_conn: &TlsStream| { let service = service.clone(); @@ -187,7 +198,7 @@ where async move { service .handle_request(req) - .instrument(error_span!("https_handler", request_id)) + .instrument(tracing::error_span!("https_handler", request_id)) .await } }); @@ -195,7 +206,7 @@ where } }); - let incoming = AddrIncoming::bind(&listen_addr) + let incoming = hyper::server::conn::AddrIncoming::bind(&listen_addr) .with_context(|| format!("Unable to bind on {}", listen_addr))?; info!("Listening on https://{}", listen_addr); Server::builder(TlsAcceptor::new(tls, incoming)) @@ -500,7 +511,7 @@ fn app_res_headers(app_cfg: App) -> HeaderMap { headers } -fn app_req_headers<'a>(geo: impl Iterator) -> HeaderMap { +fn app_req_headers(geo: impl Iterator) -> HeaderMap { let mut headers = HeaderMap::new(); for (key, value) in geo { trace!("append new request header {}={}", key, value); diff --git a/src/main.rs b/src/main.rs index 5268b6f..b0ddd58 100644 --- a/src/main.rs +++ b/src/main.rs @@ -123,8 +123,7 @@ async fn main() -> anyhow::Result<()> { let http = http.run(HttpConfig { all_interfaces: false, port: run.port, - https: None, - cancel: cancel.clone(), + cancel: cancel.clone() }); tokio::select! { _ = http => {