diff --git a/Cargo.lock b/Cargo.lock index b918a67b..5aea89bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -764,6 +764,7 @@ dependencies = [ "deterministic-wasi-ctx", "is-terminal", "predicates", + "rmp-serde", "rust-embed", "serde", "serde_json", @@ -1460,6 +1461,28 @@ version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08c74e62047bb2de4ff487b251e4a92e24f48745648451635cec7d591162d9f" +[[package]] +name = "rmp" +version = "0.8.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f9860a6cc38ed1da53456442089b4dfa35e7cedaa326df63017af88385e6b20" +dependencies = [ + "byteorder", + "num-traits", + "paste", +] + +[[package]] +name = "rmp-serde" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + [[package]] name = "rust-embed" version = "8.3.0" diff --git a/Cargo.toml b/Cargo.toml index 39f5660e..5e98b513 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ serde_json = "1.0" colored = "2.1" serde = "1.0" rust-embed = "8.3.0" +rmp-serde = "1.1" is-terminal = "0.4.12" wasmprof = "0.3.0" diff --git a/src/engine.rs b/src/engine.rs index 5dceac7d..5235f0a1 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -51,12 +51,22 @@ fn import_modules( }); } -pub fn run( - function_path: PathBuf, - input: Vec, - export: &str, - profile_opts: Option<&ProfileOpts>, -) -> Result { +#[derive(Default)] +pub struct FunctionRunParams<'a> { + pub function_path: PathBuf, + pub input: Vec, + pub export: &'a str, + pub profile_opts: Option<&'a ProfileOpts>, +} + +pub fn run(params: FunctionRunParams) -> Result { + let FunctionRunParams { + function_path, + input, + export, + profile_opts, + } = params; + let engine = Engine::new( Config::new() .wasm_multi_memory(true) @@ -141,8 +151,7 @@ pub fn run( .try_into_inner() .expect("Log stream reference still exists"); - logs.append(error_logs.as_bytes()) - .expect("Couldn't append error logs"); + logs.append(error_logs.as_bytes()); let raw_output = output_stream .try_into_inner() @@ -189,24 +198,24 @@ mod tests { #[test] fn test_js_function() { let input = include_bytes!("../tests/fixtures/input/js_function_input.json").to_vec(); - let function_run_result = run( - Path::new("tests/fixtures/build/js_function.wasm").to_path_buf(), + let function_run_result = run(FunctionRunParams { + function_path: Path::new("tests/fixtures/build/js_function.wasm").to_path_buf(), input, - DEFAULT_EXPORT, - None, - ); + export: DEFAULT_EXPORT, + ..Default::default() + }); assert!(function_run_result.is_ok()); } #[test] fn test_exit_code_zero() { - let function_run_result = run( - Path::new("tests/fixtures/build/exit_code.wasm").to_path_buf(), - json!({ "code": 0 }).to_string().into(), - DEFAULT_EXPORT, - None, - ) + let function_run_result = run(FunctionRunParams { + function_path: Path::new("tests/fixtures/build/exit_code.wasm").to_path_buf(), + input: json!({ "code": 0 }).to_string().into(), + export: DEFAULT_EXPORT, + ..Default::default() + }) .unwrap(); assert_eq!(function_run_result.logs, ""); @@ -214,12 +223,12 @@ mod tests { #[test] fn test_exit_code_one() { - let function_run_result = run( - Path::new("tests/fixtures/build/exit_code.wasm").to_path_buf(), - json!({ "code": 1 }).to_string().into(), - DEFAULT_EXPORT, - None, - ) + let function_run_result = run(FunctionRunParams { + function_path: Path::new("tests/fixtures/build/exit_code.wasm").to_path_buf(), + input: json!({ "code": 1 }).to_string().into(), + export: DEFAULT_EXPORT, + ..Default::default() + }) .unwrap(); assert_eq!(function_run_result.logs, "module exited with code: 1"); @@ -227,12 +236,12 @@ mod tests { #[test] fn test_linear_memory_usage_in_kb() { - let function_run_result = run( - Path::new("tests/fixtures/build/linear_memory.wasm").to_path_buf(), - "{}".as_bytes().to_vec(), - DEFAULT_EXPORT, - None, - ) + let function_run_result = run(FunctionRunParams { + function_path: Path::new("tests/fixtures/build/linear_memory.wasm").to_path_buf(), + input: "{}".as_bytes().to_vec(), + export: DEFAULT_EXPORT, + ..Default::default() + }) .unwrap(); assert_eq!(function_run_result.memory_usage, 12800); // 200 * 64KiB pages @@ -241,29 +250,35 @@ mod tests { #[test] fn test_logs_truncation() { let input = "{}".as_bytes().to_vec(); - let function_run_result = run( - Path::new("tests/fixtures/build/log_truncation_function.wasm").to_path_buf(), + let function_run_result = run(FunctionRunParams { + function_path: Path::new("tests/fixtures/build/log_truncation_function.wasm") + .to_path_buf(), input, - DEFAULT_EXPORT, - None, - ) + export: DEFAULT_EXPORT, + ..Default::default() + }) .unwrap(); - assert!(function_run_result - .logs - .contains(&"...[TRUNCATED]".red().to_string())); + assert!( + function_run_result.to_string().contains( + &"Logs would be truncated in production, length 6000 > 1000 limit" + .red() + .to_string() + ), + "Expected logs to be truncated, but were: {function_run_result}" + ); } #[test] fn test_file_size_in_kb() { let file_path = Path::new("tests/fixtures/build/exit_code.wasm"); - let function_run_result = run( - file_path.to_path_buf(), - json!({ "code": 0 }).to_string().into(), - DEFAULT_EXPORT, - None, - ) + let function_run_result = run(FunctionRunParams { + function_path: file_path.to_path_buf(), + input: json!({ "code": 0 }).to_string().into(), + export: DEFAULT_EXPORT, + ..Default::default() + }) .unwrap(); assert_eq!( diff --git a/src/function_run_result.rs b/src/function_run_result.rs index 96b3b902..684716be 100644 --- a/src/function_run_result.rs +++ b/src/function_run_result.rs @@ -2,6 +2,8 @@ use colored::Colorize; use serde::{Deserialize, Serialize}; use std::fmt; +const FUNCTION_LOG_LIMIT: usize = 1_000; + #[derive(Serialize, Deserialize, Clone, Debug)] pub struct InvalidOutput { pub error: String, @@ -87,6 +89,17 @@ impl fmt::Display for FunctionRunResult { self.logs )?; + let logs_length = self.logs.len(); + if logs_length > FUNCTION_LOG_LIMIT { + writeln!( + formatter, + "{}\n\n", + &format!( + "Logs would be truncated in production, length {logs_length} > {FUNCTION_LOG_LIMIT} limit", + ).red() + )?; + } + match &self.output { FunctionOutput::JsonOutput(json_output) => { writeln!( diff --git a/src/logs.rs b/src/logs.rs index 8099b475..02db2586 100644 --- a/src/logs.rs +++ b/src/logs.rs @@ -1,25 +1,18 @@ use core::fmt; use std::io; -use colored::Colorize; - -const MAX_BOUNDED_LOG_BYTESIZE: usize = 1000; - #[derive(Debug)] pub struct LogStream { logs: Vec, - capacity: usize, // in bytes current_bytesize: usize, } impl Default for LogStream { fn default() -> Self { - let capacity = MAX_BOUNDED_LOG_BYTESIZE; let logs = Vec::new(); let current_bytesize = 0; Self { logs, - capacity, current_bytesize, } } @@ -36,7 +29,7 @@ impl fmt::Display for LogStream { impl io::Write for LogStream { fn write(&mut self, buf: &[u8]) -> io::Result { - self.append(buf) + Ok(self.append(buf)) } fn flush(&mut self) -> io::Result<()> { @@ -45,46 +38,18 @@ impl io::Write for LogStream { } impl LogStream { - #[must_use] - pub fn with_capacity(capacity: usize) -> Self { - Self { - capacity, - ..Default::default() - } - } - - /// Append a buffer to the log stream and truncates when hitting the capacity. - /// We return the input buffer size regardless of whether we truncated or not to avoid a panic. + /// Append a buffer to the log stream. + /// /// # Arguments /// * `buf` - the buffer to append - /// # Returns - /// * `Ok(usize)` - the number of bytes in the buffer that was passed in - /// * `Err(io::Error)` - if the buffer is empty - /// # Errors - /// * `io::Error` - if the buffer is empty - pub fn append(&mut self, buf: &[u8]) -> io::Result { - if self.current_bytesize > self.capacity { - return Ok(buf.len()); - } - - if buf.is_empty() { - return Ok(0); - } - + pub fn append(&mut self, buf: &[u8]) -> usize { let log = String::from_utf8_lossy(buf); - let (truncated, log) = - truncate_to_char_boundary(&log, self.capacity - self.current_bytesize); - let mut log = log.to_string(); - if truncated { - log.push_str("...[TRUNCATED]".red().to_string().as_str()); - } - let size = log.len(); + let log_length = log.len(); + self.current_bytesize += log_length; + self.logs.push(log.into()); - self.current_bytesize += size; - self.logs.push(log); - - Ok(buf.len()) + log_length } #[must_use] @@ -98,61 +63,25 @@ impl LogStream { } } -// truncate `&str` to length at most equal to `max` -// return `true` if it were truncated, and the new str. -fn truncate_to_char_boundary(s: &str, mut max: usize) -> (bool, &str) { - if max >= s.len() { - (false, s) - } else { - while !s.is_char_boundary(max) { - max -= 1; - } - (true, &s[..max]) - } -} - #[cfg(test)] mod tests { use super::*; #[test] fn test_bounded_log() { - let mut bounded_log = LogStream::with_capacity(15); + let mut bounded_log = LogStream::default(); let log = b"hello world"; - bounded_log.append(log).unwrap(); + bounded_log.append(log); assert_eq!(Some("hello world"), bounded_log.last_message()); } - #[test] - fn test_bounded_log_when_truncated() { - let mut bounded_log = LogStream::with_capacity(10); - let log = b"hello world"; - bounded_log.append(log).unwrap(); - let truncation_message = "...[TRUNCATED]".red().to_string(); - assert_eq!( - Some(format!("hello worl{}", truncation_message).as_str()), - bounded_log.last_message() - ); - } - - #[test] - fn test_bounded_log_when_truncated_nearest_valid_utf8() { - let mut bounded_log = LogStream::with_capacity(15); - bounded_log.append("✌️✌️✌️".as_bytes()).unwrap(); // ✌️ is 6 bytes, ✌ is 3; - let truncation_message = "...[TRUNCATED]".red().to_string(); - assert_eq!( - Some(format!("✌\u{fe0f}✌\u{fe0f}✌{}", truncation_message).as_str()), - bounded_log.last_message() - ); - } - #[test] fn test_display() { - let mut logs = LogStream::with_capacity(10); + let mut logs = LogStream::default(); assert_eq!(String::new(), logs.to_string()); - logs.append(b"hello").unwrap(); - logs.append(b"world").unwrap(); + logs.append(b"hello"); + logs.append(b"world"); assert_eq!("helloworld", logs.to_string()); } diff --git a/src/main.rs b/src/main.rs index 12ca3124..425af2d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,13 +5,24 @@ use std::{ }; use anyhow::{anyhow, Result}; -use clap::Parser; -use function_runner::engine::{run, ProfileOpts}; +use clap::{Parser, ValueEnum}; +use function_runner::engine::{run, FunctionRunParams, ProfileOpts}; use is_terminal::IsTerminal; const PROFILE_DEFAULT_INTERVAL: u32 = 500_000; // every 5us +/// Supported input flavors +#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)] +enum Codec { + /// JSON input, must be valid JSON + Json, + /// Raw input, no validation, passed as-is + Raw, + /// JSON input, will be converted to MessagePack, must be valid JSON + JsonToMessagepack, +} + /// Simple Function runner which takes JSON as a convenience. #[derive(Parser, Debug)] #[clap(version)] @@ -43,9 +54,12 @@ struct Opts { #[clap(long)] profile_out: Option, - #[clap(long)] /// How many samples per seconds. Defaults to 500_000 (every 5us). + #[clap(long)] profile_frequency: Option, + + #[clap(short = 'c', long, value_enum, default_value = "json")] + codec: Codec, } impl Opts { @@ -94,16 +108,29 @@ fn main() -> Result<()> { let mut buffer = Vec::new(); input.read_to_end(&mut buffer)?; - let _ = serde_json::from_slice::(&buffer) - .map_err(|e| anyhow!("Invalid input JSON: {}", e))?; + + let buffer = match opts.codec { + Codec::Json => { + let _ = serde_json::from_slice::(&buffer) + .map_err(|e| anyhow!("Invalid input JSON: {}", e))?; + buffer + } + Codec::Raw => buffer, + Codec::JsonToMessagepack => { + let json: serde_json::Value = serde_json::from_slice(&buffer) + .map_err(|e| anyhow!("Invalid input JSON: {}", e))?; + rmp_serde::to_vec(&json) + .map_err(|e| anyhow!("Couldn't convert JSON to MessagePack: {}", e))? + } + }; let profile_opts = opts.profile_opts(); - let function_run_result = run( - opts.function, - buffer, - opts.export.as_ref(), - profile_opts.as_ref(), - )?; + let function_run_result = run(FunctionRunParams { + function_path: opts.function, + input: buffer, + export: opts.export.as_ref(), + profile_opts: profile_opts.as_ref(), + })?; if opts.json { println!("{}", function_run_result.to_json());