From 031ad9cb9588daec85c447dbd23d34a870eb6228 Mon Sep 17 00:00:00 2001 From: Piotr Sarnacki Date: Sat, 23 Mar 2024 10:31:00 +0100 Subject: [PATCH] Implement single-binary mode Running in a distributed mode is great for bigger tests, but sometimes you just want to test something locally. This commit adds a `run` subcommand that can run a wasm scenario directly: crows run foo.wasm --- Cargo.lock | 1 + README.md | 31 ++ cli/Cargo.toml | 5 +- cli/src/commands/mod.rs | 2 + cli/src/commands/run.rs | 28 ++ cli/src/commands/start.rs | 340 +--------------- cli/src/lib.rs | 1 + cli/src/main.rs | 13 +- cli/src/output.rs | 382 ++++++++++++++++++ coordinator/src/main.rs | 2 +- rust-example/src/lib.rs | 27 +- utils/src/lib.rs | 48 +++ .../src/executors/constant_arrival_rate.rs | 10 +- {worker => wasm}/src/executors/mod.rs | 4 +- wasm/src/lib.rs | 115 +++--- worker/src/main.rs | 44 +- 16 files changed, 608 insertions(+), 445 deletions(-) create mode 100644 cli/src/commands/run.rs create mode 100644 cli/src/lib.rs create mode 100644 cli/src/output.rs rename {worker => wasm}/src/executors/constant_arrival_rate.rs (83%) rename {worker => wasm}/src/executors/mod.rs (92%) diff --git a/Cargo.lock b/Cargo.lock index 2b186b4..b30920e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -607,6 +607,7 @@ dependencies = [ "crossterm", "crows-service", "crows-utils", + "crows-wasm", "futures 0.3.28", "serde", "serde_json", diff --git a/README.md b/README.md index 8be49cf..822b1b3 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,32 @@ Crows is a distributed load and stress testing runner. The tests can be written in any language that can compile to WASM given the bindings for the library are available. At the moment the bindings are available only for Rust, but once the ABIs are stable it should be relatively straightforward to add more languages. +### Showcase + +A sample scenario written in Rust looks like this: + +``` +#[config] +fn config() -> ExecutorConfig { + let config = ConstantArrivalRateConfig { + duration: Duration::from_secs(5), + rate: 10, + allocated_vus: 10, + ..Default::default() + }; + ExecutorConfig::ConstantArrivalRate(config) +} + +#[export_name = "scenario"] +pub fn scenario() { + http_request( + "https://google.com".into(), GET, HashMap::new(), "".into(), + ); +} +``` + +It will send 10 requests per second to google.com. For information on how to compile and run it please go to the [Usage section](#usage) + ### State of the project This project is at a "working proof of concept" stage. It has solid foundation as I put a lot of thought into figuring out the best way to implement it, but there are a lot of details that are either missing or are knowingly implemented in a not optimal/best way. I've started thinking about the project about 3 years ago and I've started the current iteration about 1.5 years ago and I decided I have to finish it as soon as possible or it will end up where most of my personal projects - aiming for perfection, half finished and never released. @@ -38,6 +64,10 @@ Crows is not production ready, so it misses some of the features, but my persona * plugin system (probably based on WASM) to allow extending the tool without necessarily extending the source * a web interface * a nice TUI (Terminal UI) + +### Installing + +... ### Usage @@ -153,6 +183,7 @@ I don't have a very precise plan on where I want Crows to go, but some loose tho 8. For simplicity I use Reqwest HTTP library on the host, which has an internal pool of clients. 
I haven't given it too much thought yet, but I think that I would prefer to have a greater control over how HTTP connections are handled. For example I think it makes most sense if a scenario uses dedicated connections and is generally not reused between scenarios. It could be nice to allow sharing a connection between scenarios for performance, which is something for example `wrk` does by default between requests, but it should be configurable and in general we should have more granular control on how it works 9. Look closer into serialization formats, both for the communication between components and communication between WASM modules and the host. I would ideally want something with ability do define a schema and make backwards compatible changes to the schema. 10. More performant memory management. For example at the moment sending an HTTP request means serializing the entire request in the WASM module and then deserializing it on the host side. If a stress test requires sending big files it's generally a waste of CPU cycles as we could quite easily serialize only the headers and url, write body to the WASM memory directly and then send the body straight from the memory rather than copy it anywhere. Another example is that most of the time RPC services and clients could use references. +11. At the moment there is only one stage possible to be executed - running the scenario itself. I want to also add some kind of a "prepare" step which would run before the scenario itself and which would prepare the data, but I need to think a bit more on how exactly it should work. ### License diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 055322d..45e7cf2 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -6,8 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -crows-utils = { path = "../utils" } -crows-service = { path = "../service" } clap = { version = "4.4.5", features = ["derive"] } crossterm = "0.27" @@ -19,3 +17,6 @@ tokio-util.workspace = true serde.workspace = true uuid.workspace = true anyhow.workspace = true +crows-utils.workspace = true +crows-service.workspace = true +crows-wasm.workspace = true diff --git a/cli/src/commands/mod.rs b/cli/src/commands/mod.rs index 3d82729..abbd42a 100644 --- a/cli/src/commands/mod.rs +++ b/cli/src/commands/mod.rs @@ -1,3 +1,5 @@ mod start; +mod run; pub use start::start; +pub use run::run; diff --git a/cli/src/commands/run.rs b/cli/src/commands/run.rs new file mode 100644 index 0000000..e078101 --- /dev/null +++ b/cli/src/commands/run.rs @@ -0,0 +1,28 @@ +use std::path::PathBuf; + +use crows_cli::output::{drive_progress, LocalProgressFetcher}; +use crows_utils::services::RunId; +use crows_wasm::{fetch_config, run_scenario}; + +pub async fn run(path: &PathBuf) -> anyhow::Result<()> { + let scenario = std::fs::read(path).unwrap(); + let (runtime, info_handle) = + crows_wasm::Runtime::new(&scenario).expect("Could not create a runtime"); + let (instance, _, mut store) = runtime + .new_instance() + .await + .expect("Could not create an instance"); + let config = fetch_config(instance, &mut store) + .await + .expect("Config not found in the module"); + + run_scenario(runtime, scenario, config).await; + + let mut client = LocalProgressFetcher::new(info_handle, "worker".to_string()); + + drive_progress(&mut client, &RunId::new(), vec!["worker".to_string()]) + .await + .expect("Error while running the scenario"); + + Ok(()) +} diff --git a/cli/src/commands/start.rs 
b/cli/src/commands/start.rs index a9c0e2f..3a3c5f0 100644 --- a/cli/src/commands/start.rs +++ b/cli/src/commands/start.rs @@ -1,27 +1,8 @@ -use crows_utils::services::{CoordinatorClient, IterationInfo, RequestInfo}; - -use std::collections::HashMap; -use std::io::Stdout; -use std::io::{stdout, Write}; -use std::time::Duration; - -use anyhow::anyhow; -use crossterm::{ - cursor::{self, MoveTo, MoveToNextLine, MoveUp, RestorePosition, SavePosition}, - execute, - style::Print, - terminal::{self, Clear, ClearType, ScrollUp}, -}; - -#[derive(Default)] -struct WorkerState { - active_instances: isize, - capacity: isize, - done: bool, -} +use crows_cli::output::drive_progress; +use crows_utils::services::CoordinatorClient; pub async fn start( - coordinator: &CoordinatorClient, + coordinator: &mut CoordinatorClient, name: &str, workers_number: &usize, ) -> anyhow::Result<()> { @@ -32,319 +13,10 @@ pub async fn start( .unwrap(); worker_names.sort(); - println!("Worker names: {worker_names:?}"); - - let mut stdout = stdout(); - - let progress_lines = worker_names.len() as u16; - - let mut all_request_stats: Vec = Vec::new(); - let mut all_iteration_stats: Vec = Vec::new(); - let mut worker_states: HashMap = HashMap::new(); - let mut bars = HashMap::new(); - - for name in worker_names { - worker_states.insert(name.clone(), Default::default()); - - bars.insert( - name.clone(), - BarData { - worker_name: name.clone(), - left: Duration::from_secs(1), - ..Default::default() - }, - ); - } - - loop { - let mut lines = Vec::new(); - let result = coordinator.get_run_status(run_id.clone()).await.unwrap(); - - if worker_states.values().all(|s| s.done) { - break; - } - - for (worker_name, run_info) in result.unwrap().iter() { - let state = worker_states - .get_mut(worker_name) - .ok_or(anyhow!("Couldn't findt the worker"))?; - state.active_instances += run_info.active_instances_delta; - state.capacity += run_info.capacity_delta; - - all_request_stats.extend(run_info.request_stats.clone()); - all_iteration_stats.extend(run_info.iteration_stats.clone()); - - for log_line in &run_info.stdout { - lines.push(format!( - "[INFO][{worker_name}] {}", - String::from_utf8_lossy(log_line) - )); - } - for log_line in &run_info.stderr { - lines.push(format!( - "[ERROR][{worker_name}] {}", - String::from_utf8_lossy(log_line) - )); - } - - if run_info.done { - state.done = true; - } - - let bar = bars - .get_mut(worker_name) - .ok_or(anyhow!("Couldn't find bar data for worker {worker_name}"))?; - bar.active_vus = state.active_instances as usize; - bar.all_vus = state.capacity as usize; - if let Some(duration) = run_info.elapsed { - bar.duration = duration; - } - if let Some(left) = run_info.left { - bar.left = left; - } - bar.done = state.done; - } - - print(&mut stdout, progress_lines, lines, &bars, false).unwrap(); - tokio::time::sleep(Duration::from_millis(250)).await; - } - - let request_summary = calculate_summary(&all_request_stats); - let iteration_summary = calculate_summary(&all_iteration_stats); - - let mut lines = Vec::new(); - lines.push(format!("\n\nSummary:\n")); - lines.push(format!( - "http_req_duration..........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n", - format_duration(request_summary.avg), - format_duration(request_summary.min), - format_duration(request_summary.med), - format_duration(request_summary.max), - format_duration(request_summary.p90), - format_duration(request_summary.p95) - )); - lines.push(format!( - "http_req_failed............: {:.2}%\t✓ {}\t✗ {}\n", - request_summary.fail_rate, 
request_summary.success_count, request_summary.fail_count - )); - lines.push(format!( - "http_reqs..................: {}\n", - request_summary.total - )); - lines.push(format!( - "iteration_duration.........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n", - format_duration(iteration_summary.avg), - format_duration(iteration_summary.min), - format_duration(iteration_summary.med), - format_duration(iteration_summary.max), - format_duration(iteration_summary.p90), - format_duration(iteration_summary.p95) - )); - lines.push(format!( - "iterations.................: {}\n", - iteration_summary.total - )); - lines.push(format!("\n\n")); - - print(&mut stdout, progress_lines, lines, &bars, true)?; - - Ok(()) -} - -trait LatencyInfo { - fn latency(&self) -> f64; - fn successful(&self) -> bool; -} - -impl LatencyInfo for RequestInfo { - fn latency(&self) -> f64 { - self.latency.as_secs_f64() - } - - fn successful(&self) -> bool { - self.successful - } -} - -impl LatencyInfo for IterationInfo { - fn latency(&self) -> f64 { - self.latency.as_secs_f64() - } - - fn successful(&self) -> bool { - true - } -} - -fn calculate_summary(latencies: &Vec) -> SummaryStats -where - T: LatencyInfo, -{ - let mut latencies_sorted: Vec = latencies.iter().map(|l| l.latency()).collect(); - latencies_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap()); - let fail_count = latencies.iter().filter(|l| !l.successful()).count(); - let success_count = latencies.iter().filter(|l| l.successful()).count(); - let fail_rate = fail_count as f64 / latencies.len() as f64; - - SummaryStats { - avg: Duration::from_secs_f64(calculate_avg(&latencies_sorted)), - min: Duration::from_secs_f64(calculate_min(&latencies_sorted)), - max: Duration::from_secs_f64(calculate_max(&latencies_sorted)), - med: Duration::from_secs_f64(calculate_median(&latencies_sorted)), - p90: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 90.0)), - p95: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 95.0)), - total: latencies.len(), - fail_rate, - success_count, - fail_count, - } -} - -#[derive(Default)] -struct SummaryStats { - avg: Duration, - min: Duration, - med: Duration, - max: Duration, - p90: Duration, - p95: Duration, - fail_rate: f64, - success_count: usize, - fail_count: usize, - total: usize, -} - -fn calculate_avg(latencies: &[f64]) -> f64 { - latencies.iter().sum::() / latencies.len() as f64 -} - -fn calculate_min(latencies: &[f64]) -> f64 { - *latencies - .iter() - .min_by(|a, b| a.partial_cmp(b).unwrap()) - .unwrap() -} - -fn calculate_max(latencies: &[f64]) -> f64 { - *latencies - .iter() - .max_by(|a, b| a.partial_cmp(b).unwrap()) - .unwrap() -} - -fn calculate_percentile(latencies: &[f64], percentile: f64) -> f64 { - let idx = (percentile / 100.0 * latencies.len() as f64).ceil() as usize - 1; - latencies[idx] -} - -fn calculate_median(latencies: &[f64]) -> f64 { - let mid = latencies.len() / 2; - if latencies.len() % 2 == 0 { - (latencies[mid - 1] + latencies[mid]) / 2.0 - } else { - latencies[mid] - } -} - -fn format_duration(duration: Duration) -> String { - let secs = duration.as_secs(); - let total_millis = duration.as_millis(); - let total_micros = duration.as_micros(); - let nanos = duration.subsec_nanos(); - - if secs > 0 { - format!("{:.2}s", secs as f64 + nanos as f64 / 1_000_000_000.0) - } else if total_millis > 0 { - format!( - "{:.2}ms", - total_millis as f64 + (nanos % 1_000_000) as f64 / 1_000_000.0 - ) - } else if total_micros > 0 { - format!( - "{:.2}µs", - total_micros as f64 + (nanos % 1_000) 
as f64 / 1_000.0 - ) - } else { - format!("{}ns", nanos) - } -} - -fn print( - stdout: &mut Stdout, - progress_lines: u16, - lines: Vec, - bars: &HashMap, - last: bool, -) -> anyhow::Result<()> { - let (_, height) = terminal::size()?; - - execute!( - stdout, - MoveTo(0, height - progress_lines as u16), - Clear(ClearType::FromCursorDown), - )?; - - execute!(stdout, RestorePosition)?; - - for line in lines { - execute!(stdout, Print(line),)?; - } - - let (_, y) = cursor::position()?; - if y > height - progress_lines { - let n = y - (height - progress_lines); - execute!(stdout, ScrollUp(n), MoveUp(n))?; - } - execute!(stdout, SavePosition)?; - - execute!( - stdout, - MoveTo(0, height - progress_lines as u16 + 1), - Clear(ClearType::FromCursorDown), - MoveUp(1), - )?; - - for (_, bar) in bars { - if bar.done { - execute!( - stdout, - Print(format!("{}: Done", bar.worker_name,)), - MoveToNextLine(1), - )?; - } else { - let progress_percentage = bar.duration.as_secs_f64() - / (bar.duration.as_secs_f64() + bar.left.as_secs_f64()) - * 100 as f64; - execute!( - stdout, - Print(format!( - "{}: [{: <25}] {:.2}% ({}/{})", - bar.worker_name, - "*".repeat((progress_percentage as usize) / 4), - progress_percentage, - bar.active_vus, - bar.all_vus, - )), - MoveToNextLine(1), - )?; - } - } - if last { - execute!(stdout, Print("\n"),)?; - } - - stdout.flush()?; + drive_progress(coordinator, &run_id, worker_names) + .await + .expect("Error while running a scenario"); Ok(()) } - -#[derive(Default)] -struct BarData { - worker_name: String, - active_vus: usize, - all_vus: usize, - duration: Duration, - left: Duration, - done: bool, -} diff --git a/cli/src/lib.rs b/cli/src/lib.rs new file mode 100644 index 0000000..1da76e1 --- /dev/null +++ b/cli/src/lib.rs @@ -0,0 +1 @@ +pub mod output; diff --git a/cli/src/main.rs b/cli/src/main.rs index aa0f91c..5e7e611 100644 --- a/cli/src/main.rs +++ b/cli/src/main.rs @@ -4,6 +4,8 @@ use crows_utils::services::connect_to_coordinator; use crows_utils::services::Client; use clap::{Parser, Subcommand}; +use crows_wasm::fetch_config; +use crows_wasm::run_scenario; mod commands; @@ -34,6 +36,10 @@ enum Commands { #[arg(short, long)] workers_number: usize, }, + Run { + #[arg()] + path: PathBuf, + }, Workers { #[command(subcommand)] command: Option, @@ -50,7 +56,7 @@ enum WorkersCommands { pub async fn main() -> anyhow::Result<()> { let cli = Cli::parse(); let service = ClientService {}; - let coordinator = connect_to_coordinator("127.0.0.1:8282", service) + let mut coordinator = connect_to_coordinator("127.0.0.1:8282", service) .await .unwrap(); @@ -67,7 +73,7 @@ pub async fn main() -> anyhow::Result<()> { name, workers_number, }) => { - commands::start(&coordinator, name, workers_number).await?; + commands::start(&mut coordinator, name, workers_number).await?; } Some(Commands::Workers { command }) => match &command { Some(WorkersCommands::List) => { @@ -83,6 +89,9 @@ pub async fn main() -> anyhow::Result<()> { } None => {} }, + Some(Commands::Run { path }) => { + commands::run(path).await.expect("An error while running a scenario"); + }, None => {} } diff --git a/cli/src/output.rs b/cli/src/output.rs new file mode 100644 index 0000000..6bdd806 --- /dev/null +++ b/cli/src/output.rs @@ -0,0 +1,382 @@ +use crows_utils::services::{CoordinatorClient, IterationInfo, RequestInfo, RunId, RunInfo}; +use crows_utils::{process_info_handle, InfoHandle}; + +use std::collections::HashMap; +use std::io::Stdout; +use std::io::{stdout, Write}; +use std::time::Duration; + +use anyhow::anyhow; 
+use crossterm::{ + cursor::{self, MoveTo, MoveToNextLine, MoveUp}, + execute, + style::Print, + terminal::{self, Clear, ClearType, ScrollUp}, +}; + +#[derive(Default)] +pub struct SummaryStats { + pub avg: Duration, + pub min: Duration, + pub med: Duration, + pub max: Duration, + pub p90: Duration, + pub p95: Duration, + pub fail_rate: f64, + pub success_count: usize, + pub fail_count: usize, + pub total: usize, +} + +#[derive(Default)] +pub struct WorkerState { + pub active_instances: isize, + pub capacity: isize, + pub done: bool, +} + +#[derive(Default)] +pub struct BarData { + pub worker_name: String, + pub active_vus: usize, + pub all_vus: usize, + pub duration: Duration, + pub left: Duration, + pub done: bool, +} + +pub trait LatencyInfo { + fn latency(&self) -> f64; + fn successful(&self) -> bool; +} + +impl LatencyInfo for RequestInfo { + fn latency(&self) -> f64 { + self.latency.as_secs_f64() + } + + fn successful(&self) -> bool { + self.successful + } +} + +impl LatencyInfo for IterationInfo { + fn latency(&self) -> f64 { + self.latency.as_secs_f64() + } + + fn successful(&self) -> bool { + true + } +} + +pub fn print( + stdout: &mut Stdout, + progress_lines: u16, + lines: Vec, + bars: &HashMap, + last: bool, +) -> anyhow::Result<()> { + let (_, height) = terminal::size()?; + + execute!( + stdout, + MoveTo(0, height - progress_lines as u16), + Clear(ClearType::FromCursorDown), + MoveUp(1), + )?; + + for line in lines { + let (_, y) = cursor::position()?; + execute!(stdout, Print(line))?; + let (_, new_y) = cursor::position()?; + let n = new_y - y; + execute!(stdout, ScrollUp(n), MoveUp(n))?; + } + + execute!( + stdout, + MoveTo(0, height - progress_lines as u16 + 1), + Clear(ClearType::FromCursorDown), + MoveUp(1), + )?; + + for (_, bar) in bars { + if bar.done { + execute!( + stdout, + Print(format!("{}: Done", bar.worker_name,)), + MoveToNextLine(1), + )?; + } else { + let progress_percentage = bar.duration.as_secs_f64() + / (bar.duration.as_secs_f64() + bar.left.as_secs_f64()) + * 100 as f64; + execute!( + stdout, + Print(format!( + "{}: [{: <25}] {:.2}% ({}/{})", + bar.worker_name, + "*".repeat((progress_percentage as usize) / 4), + progress_percentage, + bar.active_vus, + bar.all_vus, + )), + MoveToNextLine(1), + )?; + } + } + if last { + execute!(stdout, Print("\n"),)?; + } + + stdout.flush()?; + + Ok(()) +} + +pub fn format_duration(duration: Duration) -> String { + let secs = duration.as_secs(); + let total_millis = duration.as_millis(); + let total_micros = duration.as_micros(); + let nanos = duration.subsec_nanos(); + + if secs > 0 { + format!("{:.2}s", secs as f64 + nanos as f64 / 1_000_000_000.0) + } else if total_millis > 0 { + format!( + "{:.2}ms", + total_millis as f64 + (nanos % 1_000_000) as f64 / 1_000_000.0 + ) + } else if total_micros > 0 { + format!( + "{:.2}µs", + total_micros as f64 + (nanos % 1_000) as f64 / 1_000.0 + ) + } else { + format!("{}ns", nanos) + } +} + +fn calculate_avg(latencies: &[f64]) -> f64 { + latencies.iter().sum::() / latencies.len() as f64 +} + +fn calculate_min(latencies: &[f64]) -> f64 { + *latencies + .iter() + .min_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap() +} + +fn calculate_max(latencies: &[f64]) -> f64 { + *latencies + .iter() + .max_by(|a, b| a.partial_cmp(b).unwrap()) + .unwrap() +} + +fn calculate_percentile(latencies: &[f64], percentile: f64) -> f64 { + let idx = (percentile / 100.0 * latencies.len() as f64).ceil() as usize - 1; + latencies[idx] +} + +fn calculate_median(latencies: &[f64]) -> f64 { + let mid = 
latencies.len() / 2;
+    if latencies.len() % 2 == 0 {
+        (latencies[mid - 1] + latencies[mid]) / 2.0
+    } else {
+        latencies[mid]
+    }
+}
+
+pub fn calculate_summary<T>(latencies: &Vec<T>) -> SummaryStats
+where
+    T: LatencyInfo,
+{
+    let mut latencies_sorted: Vec<f64> = latencies.iter().map(|l| l.latency()).collect();
+    latencies_sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
+
+    let fail_count = latencies.iter().filter(|l| !l.successful()).count();
+    let success_count = latencies.iter().filter(|l| l.successful()).count();
+    let fail_rate = fail_count as f64 / latencies.len() as f64;
+
+    SummaryStats {
+        avg: Duration::from_secs_f64(calculate_avg(&latencies_sorted)),
+        min: Duration::from_secs_f64(calculate_min(&latencies_sorted)),
+        max: Duration::from_secs_f64(calculate_max(&latencies_sorted)),
+        med: Duration::from_secs_f64(calculate_median(&latencies_sorted)),
+        p90: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 90.0)),
+        p95: Duration::from_secs_f64(calculate_percentile(&latencies_sorted, 95.0)),
+        total: latencies.len(),
+        fail_rate,
+        success_count,
+        fail_count,
+    }
+}
+
+pub trait ProgressFetcher {
+    #[allow(async_fn_in_trait)]
+    async fn get_run_status(
+        &mut self,
+        id: RunId,
+    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>>;
+}
+
+impl ProgressFetcher for CoordinatorClient {
+    async fn get_run_status(
+        &mut self,
+        id: RunId,
+    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
+        CoordinatorClient::get_run_status(self, id).await
+    }
+}
+
+pub struct LocalProgressFetcher {
+    info_handle: InfoHandle,
+    worker_name: String,
+}
+
+impl LocalProgressFetcher {
+    pub fn new(info_handle: InfoHandle, worker_name: String) -> Self {
+        Self {
+            info_handle,
+            worker_name,
+        }
+    }
+}
+
+impl ProgressFetcher for LocalProgressFetcher {
+    async fn get_run_status(
+        &mut self,
+        _: RunId,
+    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
+        let run_info = process_info_handle(&mut self.info_handle).await;
+        Ok(Some(vec![(self.worker_name.clone(), run_info)].into_iter().collect()))
+    }
+}
+
+pub async fn drive_progress<T>(
+    client: &mut T,
+    run_id: &RunId,
+    worker_names: Vec<String>,
+) -> anyhow::Result<()>
+where
+    T: ProgressFetcher,
+{
+    let mut stdout = stdout();
+
+    let progress_lines = worker_names.len() as u16;
+
+    let mut all_request_stats: Vec<RequestInfo> = Vec::new();
+    let mut all_iteration_stats: Vec<IterationInfo> = Vec::new();
+    let mut worker_states: HashMap<String, WorkerState> = HashMap::new();
+    let mut bars = HashMap::new();
+
+    for name in worker_names {
+        worker_states.insert(name.clone(), Default::default());
+
+        bars.insert(
+            name.clone(),
+            BarData {
+                worker_name: name.clone(),
+                left: Duration::from_secs(1),
+                ..Default::default()
+            },
+        );
+    }
+
+    loop {
+        let mut lines = Vec::new();
+        let result = client.get_run_status(run_id.clone()).await.unwrap();
+
+        if worker_states.values().all(|s| s.done) {
+            break;
+        }
+
+        for (worker_name, run_info) in result.unwrap().iter() {
+            let state = worker_states
+                .get_mut(worker_name)
+                .ok_or(anyhow!("Couldn't find the worker"))?;
+            state.active_instances += run_info.active_instances_delta;
+            state.capacity += run_info.capacity_delta;
+
+            all_request_stats.extend(run_info.request_stats.clone());
+            all_iteration_stats.extend(run_info.iteration_stats.clone());
+
+            for log_line in &run_info.stdout {
+                lines.push(format!(
+                    "[INFO][{worker_name}] {}",
+                    String::from_utf8_lossy(log_line)
+                ));
+            }
+            for log_line in &run_info.stderr {
+                lines.push(format!(
+                    "[ERROR][{worker_name}] {}",
+                    String::from_utf8_lossy(log_line)
+                ));
+            }
+
+            if run_info.done {
+                state.done = true;
+            }
+
+            let bar = bars
+                .get_mut(worker_name)
+                .ok_or(anyhow!("Couldn't find bar 
data for worker {worker_name}"))?; + bar.active_vus = state.active_instances as usize; + bar.all_vus = state.capacity as usize; + if let Some(duration) = run_info.elapsed { + bar.duration = duration; + } + if let Some(left) = run_info.left { + bar.left = left; + } + bar.done = state.done; + } + + print(&mut stdout, progress_lines, lines, &bars, false).unwrap(); + tokio::time::sleep(Duration::from_millis(250)).await; + } + + let request_summary = calculate_summary(&all_request_stats); + let iteration_summary = calculate_summary(&all_iteration_stats); + + let mut lines = Vec::new(); + lines.push(format!("\n\nSummary:\n")); + lines.push(format!( + "http_req_duration..........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n", + format_duration(request_summary.avg), + format_duration(request_summary.min), + format_duration(request_summary.med), + format_duration(request_summary.max), + format_duration(request_summary.p90), + format_duration(request_summary.p95) + )); + lines.push(format!( + "http_req_failed............: {:.2}%\t✓ {}\t✗ {}\n", + request_summary.fail_rate, request_summary.success_count, request_summary.fail_count + )); + lines.push(format!( + "http_reqs..................: {}\n", + request_summary.total + )); + lines.push(format!( + "iteration_duration.........: avg={}\tmin={}\tmed={}\tmax={}\tp(90)={}\tp(95)={}\n", + format_duration(iteration_summary.avg), + format_duration(iteration_summary.min), + format_duration(iteration_summary.med), + format_duration(iteration_summary.max), + format_duration(iteration_summary.p90), + format_duration(iteration_summary.p95) + )); + lines.push(format!( + "iterations.................: {}\n", + iteration_summary.total + )); + lines.push(format!("\n\n")); + + print(&mut stdout, progress_lines, lines, &bars, true)?; + + Ok(()) +} diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 7ec43f7..7228291 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -94,7 +94,7 @@ impl Coordinator for CoordinatorService { let (runtime, _) = crows_wasm::Runtime::new(&scenario) .map_err(|err| CoordinatorError::FailedToCreateRuntime(err.to_string()))?; - let (instance, _, mut store) = Instance::new(&runtime.environment, &runtime.module) + let (instance, _, mut store) = runtime.new_instance() .await .map_err(|_| CoordinatorError::FailedToCompileModule)?; let config = fetch_config(instance, &mut store) diff --git a/rust-example/src/lib.rs b/rust-example/src/lib.rs index aee0638..90f52a6 100644 --- a/rust-example/src/lib.rs +++ b/rust-example/src/lib.rs @@ -9,35 +9,18 @@ use std::time::Duration; fn config() -> ExecutorConfig { let config = ConstantArrivalRateConfig { duration: Duration::from_secs(5), - rate: 1, - time_unit: Duration::from_secs(1), - allocated_vus: 1, + rate: 10, + allocated_vus: 10, ..Default::default() }; ExecutorConfig::ConstantArrivalRate(config) } #[export_name = "test"] -pub fn test() { +pub fn scenario() { let i: usize = rand::random(); println!("log line from a worker, random number: {i}"); - let response = http_request( - "http://127.0.0.1:8080/".into(), - GET, - HashMap::new(), - "".into(), + http_request( + "https://google.com".into(), GET, HashMap::new(), "".into(), ); - let response = http_request( - "http://127.0.0.1:8080/".into(), - GET, - HashMap::new(), - "".into(), - ); - let response = http_request( - "http://127.0.0.1:8080/".into(), - GET, - HashMap::new(), - "".into(), - ); - let i: usize = rand::random(); } diff --git a/utils/src/lib.rs b/utils/src/lib.rs index 675a21f..870edab 100644 --- 
a/utils/src/lib.rs +++ b/utils/src/lib.rs @@ -1,12 +1,14 @@ use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use futures::prelude::*; use futures::TryStreamExt; +use services::{RunInfo, RequestInfo, IterationInfo}; use std::future::Future; use tokio::net::{TcpListener, TcpStream, ToSocketAddrs}; use tokio::sync::RwLock; @@ -377,3 +379,49 @@ pub trait Service: Send + Sync { message: Self::Request, ) -> Pin + Send + '_>>; } + +pub async fn process_info_handle(handle: &mut InfoHandle) -> RunInfo { + let mut run_info: RunInfo = Default::default(); + run_info.done = false; + + while let Ok(update) = handle.receiver.try_recv() { + match update { + InfoMessage::Stderr(buf) => run_info.stderr.push(buf), + InfoMessage::Stdout(buf) => run_info.stdout.push(buf), + InfoMessage::RequestInfo(info) => run_info.request_stats.push(info), + InfoMessage::IterationInfo(info) => run_info.iteration_stats.push(info), + InfoMessage::InstanceCheckedOut => run_info.active_instances_delta += 1, + InfoMessage::InstanceReserved => run_info.capacity_delta += 1, + InfoMessage::InstanceCheckedIn => run_info.active_instances_delta -= 1, + InfoMessage::TimingUpdate((elapsed, left)) => { + run_info.elapsed = Some(elapsed); + run_info.left = Some(left); + } + InfoMessage::Done => run_info.done = true, + } + } + + run_info +} + +// TODO: I don't like that name, I think it should be changed +pub enum InfoMessage { + Stderr(Vec), + Stdout(Vec), + RequestInfo(RequestInfo), + IterationInfo(IterationInfo), + // TODO: I'm not sure if shoving any kind of update here is a good idea, + // but at the moment it's the easiest way to pass data back to the client, + // so I'm going with it. I'd like to revisit it in the future, though and + // consider alternatives + InstanceCheckedOut, + InstanceReserved, + InstanceCheckedIn, + // elapsed, left + TimingUpdate((Duration, Duration)), + Done, +} + +pub struct InfoHandle { + pub receiver: UnboundedReceiver, +} diff --git a/worker/src/executors/constant_arrival_rate.rs b/wasm/src/executors/constant_arrival_rate.rs similarity index 83% rename from worker/src/executors/constant_arrival_rate.rs rename to wasm/src/executors/constant_arrival_rate.rs index 897991e..360a2b5 100644 --- a/worker/src/executors/constant_arrival_rate.rs +++ b/wasm/src/executors/constant_arrival_rate.rs @@ -4,7 +4,7 @@ use std::time::{Duration, Instant}; // but I also don't like the idea of having to depend on the worker in WASM // modules use crows_shared::ConstantArrivalRateConfig; -use crows_wasm::{Runtime, InfoMessage}; +use crate::{Runtime, InfoMessage}; use super::Executor; pub struct ConstantArrivalRateExecutor { @@ -41,12 +41,16 @@ impl Executor for ConstantArrivalRateExecutor { // TODO: wait for all of the allocated instances finish, ie. 
implement // "graceful stop" if instant.elapsed() > self.config.duration { - self.runtime.send_update(InfoMessage::Done); + if let Err(err) = self.runtime.send_update(InfoMessage::Done) { + eprintln!("Got an error when sending an update: {err:?}"); + } return Ok(()); } if last_time_update.elapsed() > update_duration { - self.runtime.send_update(InfoMessage::TimingUpdate((instant.elapsed(), self.config.duration.checked_sub(instant.elapsed()).unwrap()))); + if let Err(err) = self.runtime.send_update(InfoMessage::TimingUpdate((instant.elapsed(), self.config.duration.checked_sub(instant.elapsed()).unwrap()))) { + eprintln!("Got an error when sending an update: {err:?}"); + } last_time_update = Instant::now(); } } diff --git a/worker/src/executors/mod.rs b/wasm/src/executors/mod.rs similarity index 92% rename from worker/src/executors/mod.rs rename to wasm/src/executors/mod.rs index df044d1..96f8003 100644 --- a/worker/src/executors/mod.rs +++ b/wasm/src/executors/mod.rs @@ -1,10 +1,12 @@ mod constant_arrival_rate; use constant_arrival_rate::ConstantArrivalRateExecutor; use crows_shared::Config; -use crows_wasm::Runtime; +use crate::Runtime; pub trait Executor { + #[allow(async_fn_in_trait)] async fn prepare(&mut self) -> anyhow::Result<()>; + #[allow(async_fn_in_trait)] async fn run(&mut self) -> anyhow::Result<()>; } diff --git a/wasm/src/lib.rs b/wasm/src/lib.rs index 9ca5e89..a0a22f0 100644 --- a/wasm/src/lib.rs +++ b/wasm/src/lib.rs @@ -1,6 +1,7 @@ use anyhow::anyhow; use crows_bindings::{HTTPError, HTTPMethod, HTTPRequest, HTTPResponse}; -use crows_utils::services::{RunId, RequestInfo, IterationInfo}; +use crows_utils::{InfoHandle, InfoMessage}; +use crows_utils::services::{IterationInfo, RequestInfo, RunId}; use futures::Future; use reqwest::header::{HeaderName, HeaderValue}; use reqwest::{Body, Request, Url}; @@ -22,6 +23,11 @@ use wasi_common::WasiFile; use wasmtime::{Caller, Engine, Linker, Memory, MemoryType, Module, Store}; use wasmtime_wasi::{StdoutStream, StreamResult}; +pub mod executors; + +use crows_shared::Config; +use executors::Executors; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("the module with a given name couldn't be found")] @@ -55,7 +61,12 @@ impl InstanceHandle { inner.sender.send(RuntimeMessage::RunTest(sender))?; receiver.await?; let latency = instant.elapsed(); - inner.runtime.write().await.info_sender.send(InfoMessage::IterationInfo(IterationInfo { latency }))?; + inner + .runtime + .write() + .await + .info_sender + .send(InfoMessage::IterationInfo(IterationInfo { latency }))?; Ok(()) } } @@ -80,9 +91,11 @@ impl Drop for InstanceHandle { if let Some(inner) = self.inner.take() { tokio::spawn(async move { let mut runtime = inner.runtime.write().await; - runtime.checkin_instance(InstanceHandle { - inner: Some(inner.clone()), - }).await; + runtime + .checkin_instance(InstanceHandle { + inner: Some(inner.clone()), + }) + .await; }); } } @@ -104,21 +117,30 @@ impl Runtime { pub fn new(content: &Vec) -> anyhow::Result<(Self, InfoHandle)> { let environment = Environment::new()?; let module = Module::from_binary(&environment.engine, content)?; - + let (info_sender, info_receiver) = unbounded_channel(); - let info_handle = InfoHandle { receiver: info_receiver }; + let info_handle = InfoHandle { + receiver: info_receiver, + }; + + Ok(( + Self { + module, + environment, + inner: Arc::new(RwLock::new(RuntimeInner { + instances: VecDeque::new(), + info_sender: info_sender.clone(), + })), + info_sender, + length: 0, + }, + info_handle, + )) + } - Ok((Self { 
- module, - environment, - inner: Arc::new(RwLock::new(RuntimeInner { - instances: VecDeque::new(), - info_sender: info_sender.clone(), - })), - info_sender, - length: 0, - }, info_handle)) + pub async fn new_instance(&self) -> anyhow::Result<(Instance, InfoHandle, Store)> { + Instance::new(&self.environment, &self.module).await } pub async fn reserve_instance(&mut self) -> anyhow::Result<()> { @@ -129,7 +151,8 @@ impl Runtime { }; let handle = InstanceHandle { inner: Some(inner) }; - let (instance, mut info_handle, store) = Instance::new(&self.environment, &self.module).await?; + let (instance, mut info_handle, store) = + Instance::new(&self.environment, &self.module).await?; let info_sender = self.info_sender.clone(); tokio::spawn(async move { @@ -171,7 +194,11 @@ impl Runtime { } pub async fn checkin_instance(&self, instance_handle: InstanceHandle) { - self.inner.write().await.checkin_instance(instance_handle).await; + self.inner + .write() + .await + .checkin_instance(instance_handle) + .await; } pub async fn checkout_or_create_instance(&mut self) -> anyhow::Result { @@ -203,7 +230,7 @@ pub struct WasiHostCtx { memory: Option, buffers: slab::Slab>, client: reqwest::Client, - request_info_sender: UnboundedSender + request_info_sender: UnboundedSender, } fn create_return_value(status: u8, length: u32, ptr: u32) -> u64 { @@ -440,33 +467,14 @@ pub fn get_memory(caller: &mut Caller<'_, T>) -> anyhow::Result { Ok(caller.get_export("memory").unwrap().into_memory().unwrap()) } -// TODO: I don't like that name, I think it should be changed -pub enum InfoMessage { - Stderr(Vec), - Stdout(Vec), - RequestInfo(RequestInfo), - IterationInfo(IterationInfo), - // TODO: I'm not sure if shoving any kind of update here is a good idea, - // but at the moment it's the easiest way to pass data back to the client, - // so I'm going with it. 
I'd like to revisit it in the future, though and - // consider alternatives - InstanceCheckedOut, - InstanceReserved, - InstanceCheckedIn, - // elapsed, left - TimingUpdate((Duration, Duration)), - Done, -} - -pub struct InfoHandle { - pub receiver: UnboundedReceiver -} - impl Instance { - pub fn new_store(engine: &Engine) -> anyhow::Result<(wasmtime::Store, InfoHandle)> { + pub fn new_store( + engine: &Engine, + ) -> anyhow::Result<(wasmtime::Store, InfoHandle)> { let (stdout_sender, mut stdout_receiver) = tokio::sync::mpsc::unbounded_channel(); let (stderr_sender, mut stderr_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (request_info_sender, mut request_info_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (request_info_sender, mut request_info_receiver) = + tokio::sync::mpsc::unbounded_channel(); let (info_sender, info_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -482,7 +490,7 @@ impl Instance { }); let info_handle = InfoHandle { - receiver: info_receiver + receiver: info_receiver, }; let stdout = RemoteIo { @@ -620,3 +628,20 @@ impl StdoutStream for RemoteIo { impl wasmtime_wasi::Subscribe for RemoteIo { async fn ready(&mut self) {} } + +pub async fn run_scenario( + runtime: Runtime, + scenario: Vec, + config: Config, +) { + let mut executor = Executors::create_executor(config, runtime).await; + + tokio::spawn(async move { + // TODO: prepare should be an entirely separate step and coordinator should wait for + // prepare from all of the workers + executor.prepare().await; + executor.run().await; + }); +} + + diff --git a/worker/src/main.rs b/worker/src/main.rs index 796ee96..0bac01f 100644 --- a/worker/src/main.rs +++ b/worker/src/main.rs @@ -1,5 +1,5 @@ -use crows_wasm::{InfoHandle, Runtime, InfoMessage}; -use executors::Executors; +use crows_utils::{process_info_handle, InfoHandle}; +use crows_wasm::{run_scenario, Runtime}; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use tokio::sync::RwLock; @@ -7,12 +7,10 @@ use tokio::time::sleep; use uuid::Uuid; use crows_utils::services::{ - connect_to_worker_to_coordinator, RunId, Worker, WorkerData, WorkerError, - WorkerToCoordinatorClient, RunInfo, + connect_to_worker_to_coordinator, RunId, RunInfo, Worker, WorkerData, WorkerError, + WorkerToCoordinatorClient, }; -mod executors; - type ScenariosList = Arc>>>; type RunsList = Arc>>; @@ -50,14 +48,7 @@ impl Worker for WorkerService { let (runtime, info_handle) = Runtime::new(&scenario) .map_err(|err| WorkerError::CouldNotCreateRuntime(err.to_string()))?; - let mut executor = Executors::create_executor(config, runtime).await; - - tokio::spawn(async move { - // TODO: prepare should be an entirely separate step and coordinator should wait for - // prepare from all of the workers - executor.prepare().await; - executor.run().await; - }); + run_scenario(runtime, scenario, config).await; self.runs.write().await.insert(id, info_handle); @@ -72,29 +63,12 @@ impl Worker for WorkerService { } async fn get_run_status(&self, _: WorkerToCoordinatorClient, id: RunId) -> RunInfo { - let mut run_info: RunInfo = Default::default(); - run_info.done = false; - if let Some(handle) = self.runs.write().await.get_mut(&id) { - while let Ok(update) = handle.receiver.try_recv() { - match update { - InfoMessage::Stderr(buf) => run_info.stderr.push(buf), - InfoMessage::Stdout(buf) => run_info.stdout.push(buf), - InfoMessage::RequestInfo(info) => run_info.request_stats.push(info), - InfoMessage::IterationInfo(info) => run_info.iteration_stats.push(info), - 
InfoMessage::InstanceCheckedOut => run_info.active_instances_delta += 1, - InfoMessage::InstanceReserved => run_info.capacity_delta += 1, - InfoMessage::InstanceCheckedIn => run_info.active_instances_delta -= 1, - InfoMessage::TimingUpdate((elapsed, left)) => { - run_info.elapsed = Some(elapsed); - run_info.left = Some(left); - }, - crows_wasm::InfoMessage::Done => run_info.done = true, - } - } + process_info_handle(handle).await + } else { + // TODO: this should really be just None + Default::default() } - - run_info } }
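
The key seam in this change is the `ProgressFetcher` trait in `cli/src/output.rs`: `crows start` drives the progress UI through `CoordinatorClient`, which forwards to the coordinator's `get_run_status` RPC, while the new `crows run` drives the very same `drive_progress` loop through `LocalProgressFetcher`, which drains the in-process `InfoHandle` via `process_info_handle`. As a rough sketch of how another progress source could plug into that seam, the snippet below implements the trait for a hypothetical replay type. `ReplayProgress` and its field are invented for illustration, the generic parameters are reconstructed from how `drive_progress` consumes the result, and the sketch assumes `RunInfo` keeps public fields and a `Default` impl, as `cli/src/output.rs` already relies on.

```
// Sketch only: ReplayProgress is a hypothetical progress source, not part of
// this patch. The generic parameters are reconstructed from how
// drive_progress consumes the returned map.
use std::collections::HashMap;

use crows_cli::output::ProgressFetcher;
use crows_utils::services::{RunId, RunInfo};

/// Replays pre-recorded per-worker reports, e.g. to exercise the progress UI
/// without starting a worker or a coordinator.
struct ReplayProgress {
    /// Oldest report first; the final report should set `done: true` for
    /// every worker so that `drive_progress` can leave its polling loop.
    reports: Vec<HashMap<String, RunInfo>>,
}

impl ProgressFetcher for ReplayProgress {
    async fn get_run_status(
        &mut self,
        _id: RunId,
    ) -> anyhow::Result<Option<HashMap<String, RunInfo>>> {
        // drive_progress unwraps the Option it gets back, so always answer
        // with Some; once the recording is exhausted, report "nothing new".
        let report = if self.reports.is_empty() {
            HashMap::new()
        } else {
            self.reports.remove(0)
        };
        Ok(Some(report))
    }
}
```

Handing such a value to `drive_progress(&mut fetcher, &RunId::new(), vec!["worker".to_string()])` would render the same progress bars and summary as a real run, which is exactly how `commands::run` wires up `LocalProgressFetcher`; the replayed reports just need to include at least one request and iteration sample before the final `done` report, since the summary math divides by the number of recorded samples.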