diff --git a/Cargo.lock b/Cargo.lock index 519659919..dc7edf765 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -567,9 +567,9 @@ name = "bench" version = "0.1.2" dependencies = [ "async-trait", + "charming", "clap", "colored", - "csv", "derive-new", "derive_more", "figlet-rs", @@ -579,6 +579,7 @@ dependencies = [ "integration", "nonzero_lit", "serde", + "serde_json", "tokio", "toml", "tracing", @@ -855,6 +856,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "charming" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88f802d7b8011655a1162e04e6e6849bb57baae3c0ea7026c64ba280d80d1d77" +dependencies = [ + "handlebars", + "serde", + "serde_json", +] + [[package]] name = "chrono" version = "0.4.39" @@ -1192,27 +1204,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" -dependencies = [ - "csv-core", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" -dependencies = [ - "memchr", -] - [[package]] name = "ctor" version = "0.2.9" @@ -1872,6 +1863,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -3306,6 +3311,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" +dependencies = [ + "memchr", + "thiserror 2.0.9", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "816518421cfc6887a0d62bf441b6ffb4536fcc926395a69e1a85852d4363f57e" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d1396fd3a870fc7838768d171b4616d5c91f6cc25e377b673d714567d99377b" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.93", +] + +[[package]] +name = "pest_meta" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e58089ea25d717bfd31fb534e4f3afcc2cc569c70de3e239778991ea3b7dea" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.7" @@ -5163,6 +5213,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "ulid" version = "1.1.3" diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 36a0cee32..099572fd6 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -5,9 +5,9 @@ edition = "2021" [dependencies] async-trait = "0.1.83" +charming = "0.4.0" clap = { version = "4.5.23", features = ["derive"] } colored = "2.2.0" -csv = "1.3.1" derive-new = "0.7.0" derive_more = "1.0.0" figlet-rs = "0.1.5" @@ -17,6 +17,7 @@ iggy = { path = "../sdk" } integration = { path = "../integration" } nonzero_lit = "0.1.2" serde = { version = "1.0.217", features = ["derive"] } +serde_json = "1.0.114" tokio = { version = "1.42.0", features = ["full"] } toml = "0.8.19" tracing = { version = "0.1.41" } diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs index 887cdfd07..8c1efdcbe 100644 --- a/bench/src/args/common.rs +++ b/bench/src/args/common.rs @@ -40,12 +40,9 @@ pub struct IggyBenchArgs { #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)] pub skip_server_start: bool, - /// Output directory, in which the benchmark results will be stored as `csv` and `toml` files. - /// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each - /// producer/consumer will have its own file. - /// Actor summary, benchmark summary and parameters will be stored in a TOML file. - #[arg(long, short = 'o', default_value = None)] - pub output_directory: Option, + /// Output directory path for the benchmark results + #[arg(long, short)] + pub output: Option, } fn validate_server_executable_path(v: &str) -> Result { @@ -141,7 +138,7 @@ impl IggyBenchArgs { self.warmup_time } - pub fn output_directory(&self) -> Option { - self.output_directory.clone() + pub fn output(&self) -> Option { + self.output.clone() } } diff --git a/bench/src/args/simple.rs b/bench/src/args/simple.rs index c004ef203..c0eff46fd 100644 --- a/bench/src/args/simple.rs +++ b/bench/src/args/simple.rs @@ -1,6 +1,7 @@ use derive_more::Display; +use serde::Serialize; -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)] pub enum BenchmarkKind { #[display("send messages")] Send, diff --git a/bench/src/benchmark_params.rs b/bench/src/benchmark_params.rs index d12be63a8..897e2c792 100644 --- a/bench/src/benchmark_params.rs +++ b/bench/src/benchmark_params.rs @@ -1,36 +1,22 @@ -use std::io::Write; - use crate::args::common::IggyBenchArgs; use iggy::utils::timestamp::IggyTimestamp; use serde::Serialize; #[derive(Debug, Serialize)] pub struct BenchmarkParams { - timestamp_micros: i64, - benchmark_name: String, - transport: String, - messages_per_batch: u32, - message_batches: u32, - message_size: u32, - producers: u32, - consumers: u32, - streams: u32, - partitions: u32, - number_of_consumer_groups: u32, - disable_parallel_consumers: bool, - disable_parallel_producers: bool, -} - -impl BenchmarkParams { - pub fn dump_to_toml(&self, output_directory: &str) { - let output_file = format!("{}/params.toml", output_directory); - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(output_file).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } + pub timestamp_micros: i64, + pub benchmark_name: String, + pub transport: String, + pub messages_per_batch: u32, + pub message_batches: u32, + pub message_size: u32, + pub producers: u32, + pub consumers: u32, + pub streams: u32, + pub partitions: u32, + pub number_of_consumer_groups: u32, + pub disable_parallel_consumers: bool, + pub disable_parallel_producers: bool, } impl From<&IggyBenchArgs> for BenchmarkParams { diff --git a/bench/src/benchmark_result.rs b/bench/src/benchmark_result.rs index a3cf2a6ed..0c3a0d689 100644 --- a/bench/src/benchmark_result.rs +++ b/bench/src/benchmark_result.rs @@ -1,8 +1,14 @@ use crate::args::simple::BenchmarkKind; +use crate::benchmark_params::BenchmarkParams; +use crate::plotting::generate_plots; use crate::statistics::actor_statistics::BenchmarkActorStatistics; use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics; +use crate::statistics::record::BenchmarkRecord; +use serde::Serialize; use std::collections::HashSet; use std::fmt::{Display, Formatter}; +use std::fs; +use std::path; #[derive(Debug, Clone, PartialEq)] pub struct BenchmarkResult { @@ -48,23 +54,80 @@ impl BenchmarkResults { BenchmarkAggregateStatistics::from_actors_statistics(&records) } - pub fn dump_to_toml(&self, output_directory: &str) { - let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send); - if let Some(producer_statics) = producer_statics { - let file_path = format!("{}/producers_summary.toml", output_directory); + pub fn dump_to_json(&self, output_dir: &str, params: BenchmarkParams) { + let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send); - producer_statics.dump_to_toml(&file_path); - } + // Get overall statistics for all producers and consumers + let overall_stats = self.calculate_statistics(|x| { + x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll + }); - let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll); - if let Some(consumer_statics) = consumer_statics { - let file_path = format!("{}/consumers_summary.toml", output_directory); + // Get first producer statistics and raw data + let (first_producer_stats, first_producer_raw_data) = + if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll { + if let Some(first_producer) = + self.results.iter().find(|x| x.kind == BenchmarkKind::Send) + { + ( + Some(first_producer.statistics.clone()), + Some(first_producer.statistics.raw_data.clone()), + ) + } else { + (None, None) + } + } else { + (None, None) + }; - consumer_statics.dump_to_toml(&file_path); - } + // Get first consumer statistics and raw data + let (first_consumer_stats, first_consumer_raw_data) = + if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll { + if let Some(first_consumer) = + self.results.iter().find(|x| x.kind == BenchmarkKind::Poll) + { + ( + Some(first_consumer.statistics.clone()), + Some(first_consumer.statistics.raw_data.clone()), + ) + } else { + (None, None) + } + } else { + (None, None) + }; + + let output = BenchmarkOutput { + params, + overall_statistics: overall_stats, + first_producer_statistics: first_producer_stats, + first_consumer_statistics: first_consumer_stats, + first_producer_raw_data, + first_consumer_raw_data, + }; + + // Create the output directory + std::fs::create_dir_all(output_dir).expect("Failed to create output directory"); + + // Write JSON to data.json in the output directory + let json_path = path::Path::new(output_dir).join("data.json"); + let json = serde_json::to_string_pretty(&output).expect("Failed to serialize to JSON"); + fs::write(json_path, json).expect("Failed to write JSON file"); + + // Generate plots in the same directory + generate_plots(&output, output_dir).expect("Failed to generate plots"); } } +#[derive(Debug, Serialize)] +pub struct BenchmarkOutput { + pub params: BenchmarkParams, + pub overall_statistics: Option, + pub first_producer_statistics: Option, + pub first_consumer_statistics: Option, + pub first_producer_raw_data: Option>, + pub first_consumer_raw_data: Option>, +} + impl Display for BenchmarkResults { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { if let Ok(test_type) = self.get_test_type() { diff --git a/bench/src/benchmark_runner.rs b/bench/src/benchmark_runner.rs index f476d615f..4713bf480 100644 --- a/bench/src/benchmark_runner.rs +++ b/bench/src/benchmark_runner.rs @@ -53,10 +53,9 @@ impl BenchmarkRunner { benchmark.display_settings(); info!("{results}"); - if let Some(output_directory) = benchmark.args().output_directory() { - results.dump_to_toml(&output_directory); + if let Some(output_file) = benchmark.args().output() { let params = BenchmarkParams::from(benchmark.args()); - params.dump_to_toml(&output_directory); + results.dump_to_json(&output_file, params); } Ok(()) diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs index bbbd3db68..2b1ad6455 100644 --- a/bench/src/benchmarks/consumer_group_benchmark.rs +++ b/bench/src/benchmarks/consumer_group_benchmark.rs @@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark { let message_batches = self.args.message_batches(); let warmup_time = self.args.warmup_time(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); - let output_directory = self.args.output_directory(); for consumer_id in 1..=consumers { let consumer_group_id = start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count); - let output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), @@ -98,7 +96,6 @@ impl Benchmarkable for ConsumerGroupBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs index d74be2fc9..da48b837b 100644 --- a/bench/src/benchmarks/poll_benchmark.rs +++ b/bench/src/benchmarks/poll_benchmark.rs @@ -30,7 +30,6 @@ impl Benchmarkable for PollMessagesBenchmark { info!("Creating {} client(s)...", clients_count); let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize)); for client_id in 1..=clients_count { @@ -46,7 +45,6 @@ impl Benchmarkable for PollMessagesBenchmark { false => start_stream_id + 1, }; let warmup_time = args.warmup_time(); - let output_directory = output_directory.clone(); let consumer = Consumer::new( client_factory, @@ -56,7 +54,6 @@ impl Benchmarkable for PollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs index a73f83104..5bf75fbfc 100644 --- a/bench/src/benchmarks/send_and_poll_benchmark.rs +++ b/bench/src/benchmarks/send_and_poll_benchmark.rs @@ -62,7 +62,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { let message_size = self.args.message_size(); let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((producers + consumers) as usize)); @@ -71,7 +70,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + producer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let producer = Producer::new( self.client_factory.clone(), producer_id, @@ -81,7 +79,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); @@ -92,7 +89,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + consumer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), consumer_id, @@ -101,7 +97,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs index d16861e92..de29fc875 100644 --- a/bench/src/benchmarks/send_benchmark.rs +++ b/bench/src/benchmarks/send_benchmark.rs @@ -42,7 +42,6 @@ impl Benchmarkable for SendMessagesBenchmark { let args = args.clone(); let start_stream_id = args.start_stream_id(); let client_factory = client_factory.clone(); - let output_directory = args.output_directory.clone(); let parallel_producer_streams = !args.disable_parallel_producer_streams(); let stream_id = match parallel_producer_streams { @@ -59,7 +58,6 @@ impl Benchmarkable for SendMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs index d6cb86726..d4f616cee 100644 --- a/bench/src/consumer.rs +++ b/bench/src/consumer.rs @@ -24,7 +24,6 @@ pub struct Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, } impl Consumer { @@ -37,7 +36,6 @@ impl Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, ) -> Self { Self { client_factory, @@ -47,7 +45,6 @@ impl Consumer { messages_per_batch, message_batches, warmup_time, - output_directory, } } @@ -224,32 +221,6 @@ impl Consumer { let statistics = BenchmarkActorStatistics::from_records(&records); - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let output_file = format!( - "{}/raw_data/consumer_{}_data.csv", - output_directory, self.consumer_id - ); - info!( - "Consumer #{} → writing the results to {}...", - self.consumer_id, output_file - ); - let mut writer = csv::Writer::from_path(output_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/consumer_{}_summary.toml", - output_directory, self.consumer_id - ); - statistics.dump_to_toml(&summary_file); - } - Self::log_consumer_statistics( self.consumer_id, total_messages, diff --git a/bench/src/main.rs b/bench/src/main.rs index f4ce76de4..1bb201a67 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -5,6 +5,7 @@ mod benchmark_runner; mod benchmarks; mod client_factory; mod consumer; +mod plotting; mod producer; mod server_starter; mod statistics; diff --git a/bench/src/plotting.rs b/bench/src/plotting.rs new file mode 100644 index 000000000..f9cf37bb0 --- /dev/null +++ b/bench/src/plotting.rs @@ -0,0 +1,384 @@ +use crate::statistics::record::BenchmarkRecord; +use crate::{ + benchmark_result::BenchmarkOutput, statistics::actor_statistics::BenchmarkActorStatistics, +}; +use charming::{ + component::{Axis, Grid, Title}, + element::{ + AxisLabel, AxisPointer, AxisPointerType, Formatter, NameLocation, Symbol, Tooltip, Trigger, + }, + series::Line, + Chart, HtmlRenderer, +}; +use std::path::Path; +use tracing::info; + +fn format_throughput_stats(stats: &BenchmarkActorStatistics) -> String { + format!( + "Average throughput: {:.2} msg/s", + stats.throughput_messages_per_second, + ) +} + +fn format_latency_stats(stats: &BenchmarkActorStatistics) -> String { + format!( + "Average: {:.2} ms, Median: {:.2} ms, P99: {:.2} ms, P999: {:.2} ms", + stats.avg_latency_ms, stats.median_latency_ms, stats.p99_latency_ms, stats.p999_latency_ms, + ) +} + +fn format_throughput_mb_stats(stats: &BenchmarkActorStatistics) -> String { + format!( + "Average throughput: {:.2} MB/s", + stats.throughput_megabytes_per_second, + ) +} + +pub fn generate_plots(output: &BenchmarkOutput, output_dir: &str) -> std::io::Result<()> { + let actors_info = match (output.params.producers, output.params.consumers) { + (0, c) => format!("{} consumers", c), + (p, 0) => format!("{} producers", p), + (p, c) => format!("{} producers/{} consumers", p, c), + }; + + let subtext = format!( + "{}, {} msg/batch, {} batches, {} bytes/msg", + actors_info, + output.params.messages_per_batch, + output.params.message_batches, + output.params.message_size + ); + + if let Some(producer_data) = &output.first_producer_raw_data { + if let Some(producer_stats) = &output.first_producer_statistics { + plot_throughput_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_throughput") + .to_string_lossy(), + &subtext, + )?; + plot_throughput_mb_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_throughput_mb") + .to_string_lossy(), + &subtext, + )?; + plot_latency_over_time( + producer_data, + producer_stats, + "Producer", + &Path::new(output_dir) + .join("producer_latency") + .to_string_lossy(), + &subtext, + )?; + } + } + + if let Some(consumer_data) = &output.first_consumer_raw_data { + if let Some(consumer_stats) = &output.first_consumer_statistics { + plot_throughput_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_throughput") + .to_string_lossy(), + &subtext, + )?; + plot_throughput_mb_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_throughput_mb") + .to_string_lossy(), + &subtext, + )?; + plot_latency_over_time( + consumer_data, + consumer_stats, + "Consumer", + &Path::new(output_dir) + .join("consumer_latency") + .to_string_lossy(), + &subtext, + )?; + } + } + + Ok(()) +} + +fn plot_throughput_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + // Calculate throughput per second + let points: Vec<_> = data + .windows(2) + .map(|w| { + let time_diff = (w[1].elapsed_time_us - w[0].elapsed_time_us) as f64 / 1_000_000.0; + let messages_diff = (w[1].messages - w[0].messages) as f64; + let throughput = messages_diff / time_diff; + let time_point = w[1].elapsed_time_us as f64 / 1_000_000.0; + vec![time_point, throughput] + }) + .collect(); + + let title = format!("{} Throughput Over Time", title_prefix); + let stats_text = format_throughput_stats(stats); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Item) + .formatter(Formatter::String(String::from( + "Time: {b} s
Throughput: {c} msg/s", + ))), + ) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .padding(5) + .item_gap(5) + .left("center") + .top(5), + ) + .grid( + Grid::new() + .left("10%") + .right("15%") + .top("10%") + .bottom("10%"), + ) + .x_axis( + Axis::new() + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")), + ) + .y_axis( + Axis::new() + .name("Messages per Second") + .name_location(NameLocation::Middle) + .name_gap(50) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} msg/s")), + ) + .series( + Line::new() + .name("Throughput") + .symbol(Symbol::None) + .data(points), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating throughput HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} + +fn plot_latency_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + let points: Vec<_> = data + .iter() + .map(|r| { + let time_point = r.elapsed_time_us as f64 / 1_000_000.0; // to seconds + let latency = r.latency_us as f64 / 1_000.0; // to milliseconds + vec![time_point, latency] + }) + .collect(); + + let title = format!("{} Latency Over Time", title_prefix); + let stats_text = format_latency_stats(stats); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Item) + .formatter(Formatter::String(String::from( + "Time: {b} s
Latency: {c} ms", + ))), + ) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .padding(5) + .item_gap(5) + .left("center") + .top(5), + ) + .grid( + Grid::new() + .left("10%") + .right("15%") + .top("10%") + .bottom("10%"), + ) + .x_axis( + Axis::new() + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")), + ) + .y_axis( + Axis::new() + .name("Latency (ms)") + .name_location(NameLocation::Middle) + .name_gap(50) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} ms")), + ) + .series( + Line::new() + .name("Latency") + .symbol(Symbol::None) + .data(points), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating latency HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} + +fn plot_throughput_mb_over_time( + data: &[BenchmarkRecord], + stats: &BenchmarkActorStatistics, + title_prefix: &str, + output_path: &str, + subtext: &str, +) -> std::io::Result<()> { + // Calculate throughput per second in MB/s + let points: Vec<_> = data + .windows(2) + .map(|w| { + let time_diff = (w[1].elapsed_time_us - w[0].elapsed_time_us) as f64 / 1_000_000.0; + let bytes_diff = (w[1].total_bytes - w[0].total_bytes) as f64; + let throughput = bytes_diff / (1024.0 * 1024.0) / time_diff; // Convert bytes/s to MB/s + let time_point = w[1].elapsed_time_us as f64 / 1_000_000.0; + vec![time_point, throughput] + }) + .collect(); + + let title = format!("{} Throughput Over Time", title_prefix); + let stats_text = format_throughput_mb_stats(stats); + let full_subtext = format!("{}\n{}", subtext, stats_text); + + let chart = Chart::new() + .tooltip( + Tooltip::new() + .trigger(Trigger::Item) + .formatter(Formatter::String(String::from( + "Time: {b} s
Throughput: {c} MB/s", + ))), + ) + .axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross)) + .title( + Title::new() + .text(title.clone()) + .subtext(full_subtext) + .padding(5) + .item_gap(5) + .left("center") + .top(5), + ) + .grid( + Grid::new() + .left("10%") + .right("15%") + .top("10%") + .bottom("10%"), + ) + .x_axis( + Axis::new() + .name("Time (seconds)") + .name_location(NameLocation::Center) + .name_gap(35) + .axis_label(AxisLabel::new().formatter("{value} s")), + ) + .y_axis( + Axis::new() + .name("Megabytes per Second") + .name_location(NameLocation::Middle) + .name_gap(50) + .position("left") + .axis_label(AxisLabel::new().formatter("{value} MB/s")), + ) + .series( + Line::new() + .name("Throughput") + .symbol(Symbol::None) + .data(points), + ); + + // Save as HTML + let mut renderer = HtmlRenderer::new(title, 1600, 1200); + let html_path = format!("{}.html", output_path); + info!("Generating throughput MB/s HTML plot: {}", html_path); + + // Create parent directories if they don't exist + if let Some(parent) = Path::new(&html_path).parent() { + std::fs::create_dir_all(parent)?; + } + + renderer.save(&chart, &html_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + })?; + + Ok(()) +} diff --git a/bench/src/producer.rs b/bench/src/producer.rs index 7cc699d96..1800b076d 100644 --- a/bench/src/producer.rs +++ b/bench/src/producer.rs @@ -25,7 +25,6 @@ pub struct Producer { messages_per_batch: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, } impl Producer { @@ -39,7 +38,6 @@ impl Producer { message_batches: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, ) -> Self { Producer { client_factory, @@ -50,7 +48,6 @@ impl Producer { message_batches, message_size, warmup_time, - output_directory, } } @@ -136,33 +133,6 @@ impl Producer { } let statistics = BenchmarkActorStatistics::from_records(&records); - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let results_file = format!( - "{}/raw_data/producer_{}_data.csv", - output_directory, self.producer_id - ); - info!( - "Producer #{} → writing the results to {}...", - self.producer_id, results_file - ); - - let mut writer = csv::Writer::from_path(results_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/producer_{}_summary.toml", - output_directory, self.producer_id - ); - statistics.dump_to_toml(&summary_file); - } - Self::log_producer_statistics( self.producer_id, total_messages, diff --git a/bench/src/statistics/actor_statistics.rs b/bench/src/statistics/actor_statistics.rs index 546807e66..d991b93d4 100644 --- a/bench/src/statistics/actor_statistics.rs +++ b/bench/src/statistics/actor_statistics.rs @@ -16,6 +16,8 @@ pub struct BenchmarkActorStatistics { pub p999_latency_ms: f64, pub avg_latency_ms: f64, pub median_latency_ms: f64, + #[serde(skip_serializing)] + pub raw_data: Vec, } impl BenchmarkActorStatistics { @@ -35,6 +37,7 @@ impl BenchmarkActorStatistics { p999_latency_ms: 0.0, avg_latency_ms: 0.0, median_latency_ms: 0.0, + raw_data: Vec::new(), }; } @@ -90,6 +93,7 @@ impl BenchmarkActorStatistics { p999_latency_ms, avg_latency_ms, median_latency_ms, + raw_data: records.to_vec(), } } diff --git a/bench/src/statistics/aggregate_statistics.rs b/bench/src/statistics/aggregate_statistics.rs index 414f3c22c..dc3e5f029 100644 --- a/bench/src/statistics/aggregate_statistics.rs +++ b/bench/src/statistics/aggregate_statistics.rs @@ -1,5 +1,3 @@ -use std::io::Write; - use super::actor_statistics::BenchmarkActorStatistics; use colored::{ColoredString, Colorize}; use serde::Serialize; @@ -80,13 +78,4 @@ impl BenchmarkAggregateStatistics { self.average_median_latency_ms ).green() } - - pub fn dump_to_toml(&self, file_name: &str) { - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(file_name).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } } diff --git a/scripts/performance/run-standard-performance-suite.sh b/scripts/performance/run-standard-performance-suite.sh index 2ea8372aa..53babed39 100755 --- a/scripts/performance/run-standard-performance-suite.sh +++ b/scripts/performance/run-standard-performance-suite.sh @@ -27,7 +27,7 @@ echo "Building project..." cargo build --release # Create a directory for the performance results -(mkdir performance_results || true) &> /dev/null +(mkdir -p performance_results || true) &> /dev/null # Construct standard performance suites, each should process 8 GB of data STANDARD_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp) # 8 producers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp diff --git a/scripts/performance/utils.sh b/scripts/performance/utils.sh index 88c6a3195..dbfb0cb2e 100755 --- a/scripts/performance/utils.sh +++ b/scripts/performance/utils.sh @@ -75,15 +75,13 @@ function construct_bench_command() { local streams=${count} - local superdir - superdir="performance_results/$(get_git_iggy_server_tag_or_sha1 .)" || { echo "Failed to get git commit or tag."; exit 1; } - rm -rf "$superdir" || true - mkdir -p "$superdir" || { echo "Failed to create directory '$superdir'."; exit 1; } - local output_directory="${superdir}/${type}_${count}${type:0:1}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}" + local commit_hash + commit_hash=$(get_git_iggy_server_tag_or_sha1 .) || { echo "Failed to get git commit or tag."; exit 1; } + local output_file="performance_results/${type}_${count}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}_${commit_hash}" echo "$bench_command \ $COMMON_ARGS \ - --output-directory $output_directory \ + --output $output_file \ ${type} \ --${role} ${count} \ --streams ${streams} \