From 09898adaf0e12fc46b38881cb5d1b8b9c081de4e Mon Sep 17 00:00:00 2001
From: Hubert Gruszecki
Date: Fri, 3 Jan 2025 16:50:49 +0100
Subject: [PATCH] Refactor benchmark output handling and remove CSV dependency

This commit replaces the CSV and TOML benchmark outputs with a single JSON
file. The `csv` and `csv-core` dependencies are removed, and the
`output_directory` argument is replaced by a single `output` file path
argument. Benchmark results are now serialized to JSON, which simplifies
handling and storing the data. The performance scripts have been updated to
reflect these changes.
---
 Cargo.lock                                    | 23 +-----
 bench/Cargo.toml                              |  2 +-
 bench/src/args/common.rs                      | 13 ++--
 bench/src/args/simple.rs                      |  3 +-
 bench/src/benchmark_params.rs                 | 16 +---
 bench/src/benchmark_result.rs                 | 77 ++++++++++++++++---
 bench/src/benchmark_runner.rs                 |  5 +-
 .../benchmarks/consumer_group_benchmark.rs    |  3 -
 bench/src/benchmarks/poll_benchmark.rs        |  3 -
 .../src/benchmarks/send_and_poll_benchmark.rs |  5 --
 bench/src/benchmarks/send_benchmark.rs        |  2 -
 bench/src/consumer.rs                         | 29 -------
 bench/src/producer.rs                         | 30 --------
 bench/src/statistics/actor_statistics.rs      |  4 +
 bench/src/statistics/aggregate_statistics.rs  | 10 ---
 .../run-standard-performance-suite.sh         |  4 +-
 scripts/performance/utils.sh                  | 10 +--
 17 files changed, 89 insertions(+), 150 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 519659919..84500d9e6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -569,7 +569,6 @@ dependencies = [
  "async-trait",
  "clap",
  "colored",
- "csv",
  "derive-new",
  "derive_more",
  "figlet-rs",
@@ -579,6 +578,7 @@ dependencies = [
  "integration",
  "nonzero_lit",
  "serde",
+ "serde_json",
  "tokio",
  "toml",
  "tracing",
@@ -1192,27 +1192,6 @@ dependencies = [
  "typenum",
 ]
 
-[[package]]
-name = "csv"
-version = "1.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf"
-dependencies = [
- "csv-core",
- "itoa",
- "ryu",
- "serde",
-]
-
-[[package]]
-name = "csv-core"
-version = "0.1.11"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
-dependencies = [
- "memchr",
-]
-
 [[package]]
 name = "ctor"
 version = "0.2.9"
diff --git a/bench/Cargo.toml b/bench/Cargo.toml
index 36a0cee32..27fe8badb 100644
--- a/bench/Cargo.toml
+++ b/bench/Cargo.toml
@@ -7,7 +7,6 @@ edition = "2021"
 async-trait = "0.1.83"
 clap = { version = "4.5.23", features = ["derive"] }
 colored = "2.2.0"
-csv = "1.3.1"
 derive-new = "0.7.0"
 derive_more = "1.0.0"
 figlet-rs = "0.1.5"
@@ -17,6 +16,7 @@ iggy = { path = "../sdk" }
 integration = { path = "../integration" }
 nonzero_lit = "0.1.2"
 serde = { version = "1.0.217", features = ["derive"] }
+serde_json = "1.0.114"
 tokio = { version = "1.42.0", features = ["full"] }
 toml = "0.8.19"
 tracing = { version = "0.1.41" }
diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs
index 887cdfd07..398d13cac 100644
--- a/bench/src/args/common.rs
+++ b/bench/src/args/common.rs
@@ -40,12 +40,9 @@ pub struct IggyBenchArgs {
     #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
     pub skip_server_start: bool,
 
-    /// Output directory, in which the benchmark results will be stored as `csv` and `toml` files.
-    /// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each
-    /// producer/consumer will have its own file.
-    /// Actor summary, benchmark summary and parameters will be stored in a TOML file.
-    #[arg(long, short = 'o', default_value = None)]
-    pub output_directory: Option<String>,
+    /// Output file path for the benchmark results
+    #[arg(long, short)]
+    pub output: Option<String>,
 }
 
 fn validate_server_executable_path(v: &str) -> Result {
@@ -141,7 +138,7 @@ impl IggyBenchArgs {
         self.warmup_time
     }
 
-    pub fn output_directory(&self) -> Option<String> {
-        self.output_directory.clone()
+    pub fn output(&self) -> Option<String> {
+        self.output.clone()
     }
 }
diff --git a/bench/src/args/simple.rs b/bench/src/args/simple.rs
index c004ef203..c0eff46fd 100644
--- a/bench/src/args/simple.rs
+++ b/bench/src/args/simple.rs
@@ -1,6 +1,7 @@
 use derive_more::Display;
+use serde::Serialize;
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)]
 pub enum BenchmarkKind {
     #[display("send messages")]
     Send,
diff --git a/bench/src/benchmark_params.rs b/bench/src/benchmark_params.rs
index d12be63a8..ab81edaba 100644
--- a/bench/src/benchmark_params.rs
+++ b/bench/src/benchmark_params.rs
@@ -1,5 +1,3 @@
-use std::io::Write;
-
 use crate::args::common::IggyBenchArgs;
 use iggy::utils::timestamp::IggyTimestamp;
 use serde::Serialize;
@@ -9,6 +7,7 @@ pub struct BenchmarkParams {
     timestamp_micros: i64,
     benchmark_name: String,
     transport: String,
+    server_address: String,
     messages_per_batch: u32,
     message_batches: u32,
     message_size: u32,
@@ -21,24 +20,13 @@ pub struct BenchmarkParams {
     disable_parallel_producers: bool,
 }
 
-impl BenchmarkParams {
-    pub fn dump_to_toml(&self, output_directory: &str) {
-        let output_file = format!("{}/params.toml", output_directory);
-        let toml_str = toml::to_string(self).unwrap();
-        Write::write_all(
-            &mut std::fs::File::create(output_file).unwrap(),
-            toml_str.as_bytes(),
-        )
-        .unwrap();
-    }
-}
-
 impl From<&IggyBenchArgs> for BenchmarkParams {
     fn from(args: &IggyBenchArgs) -> Self {
         BenchmarkParams {
             timestamp_micros: IggyTimestamp::now().as_micros() as i64,
             benchmark_name: args.benchmark_kind.as_simple_kind().to_string(),
             transport: args.transport().to_string(),
+            server_address: args.server_address().to_string(),
             messages_per_batch: args.messages_per_batch(),
             message_batches: args.message_batches(),
             message_size: args.message_size(),
diff --git a/bench/src/benchmark_result.rs b/bench/src/benchmark_result.rs
index a3cf2a6ed..ca5d4f8cf 100644
--- a/bench/src/benchmark_result.rs
+++ b/bench/src/benchmark_result.rs
@@ -1,8 +1,13 @@
 use crate::args::simple::BenchmarkKind;
+use crate::benchmark_params::BenchmarkParams;
 use crate::statistics::actor_statistics::BenchmarkActorStatistics;
 use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics;
+use crate::statistics::record::BenchmarkRecord;
+use serde::Serialize;
 use std::collections::HashSet;
 use std::fmt::{Display, Formatter};
+use std::fs;
+use tracing::info;
 
 #[derive(Debug, Clone, PartialEq)]
 pub struct BenchmarkResult {
@@ -48,23 +53,73 @@ impl BenchmarkResults {
         BenchmarkAggregateStatistics::from_actors_statistics(&records)
     }
 
-    pub fn dump_to_toml(&self, output_directory: &str) {
-        let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send);
-        if let Some(producer_statics) = producer_statics {
-            let file_path = format!("{}/producers_summary.toml", output_directory);
+    pub fn dump_to_json(&self, output_file: &str, params: BenchmarkParams) {
+        let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send);
 
-            producer_statics.dump_to_toml(&file_path);
-        }
+        // Get overall statistics for all producers and consumers
+        let overall_stats = self.calculate_statistics(|x| {
+            x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll
+        });
 
-        let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll);
-        if let Some(consumer_statics) = consumer_statics {
-            let file_path = format!("{}/consumers_summary.toml", output_directory);
+        // Get first producer statistics and raw data
+        let (first_producer_stats, first_producer_raw_data) =
+            if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll {
+                if let Some(first_producer) =
+                    self.results.iter().find(|x| x.kind == BenchmarkKind::Send)
+                {
+                    (
+                        Some(first_producer.statistics.clone()),
+                        Some(first_producer.statistics.raw_data.clone()),
+                    )
+                } else {
+                    (None, None)
+                }
+            } else {
+                (None, None)
+            };
 
-            consumer_statics.dump_to_toml(&file_path);
-        }
+        // Get first consumer statistics and raw data
+        let (first_consumer_stats, first_consumer_raw_data) =
+            if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll {
+                if let Some(first_consumer) =
+                    self.results.iter().find(|x| x.kind == BenchmarkKind::Poll)
+                {
+                    (
+                        Some(first_consumer.statistics.clone()),
+                        Some(first_consumer.statistics.raw_data.clone()),
+                    )
+                } else {
+                    (None, None)
+                }
+            } else {
+                (None, None)
+            };
+
+        let output = BenchmarkOutput {
+            params,
+            overall_statistics: overall_stats,
+            first_producer_statistics: first_producer_stats,
+            first_consumer_statistics: first_consumer_stats,
+            first_producer_raw_data,
+            first_consumer_raw_data,
+        };
+
+        let json_str = serde_json::to_string_pretty(&output).unwrap();
+        info!("Writing results to: {}", output_file);
+        fs::write(output_file, json_str).unwrap();
     }
 }
 
+#[derive(Debug, Serialize)]
+pub struct BenchmarkOutput {
+    params: BenchmarkParams,
+    overall_statistics: Option<BenchmarkAggregateStatistics>,
+    first_producer_statistics: Option<BenchmarkActorStatistics>,
+    first_consumer_statistics: Option<BenchmarkActorStatistics>,
+    first_producer_raw_data: Option<Vec<BenchmarkRecord>>,
+    first_consumer_raw_data: Option<Vec<BenchmarkRecord>>,
+}
+
 impl Display for BenchmarkResults {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         if let Ok(test_type) = self.get_test_type() {
diff --git a/bench/src/benchmark_runner.rs b/bench/src/benchmark_runner.rs
index f476d615f..4713bf480 100644
--- a/bench/src/benchmark_runner.rs
+++ b/bench/src/benchmark_runner.rs
@@ -53,10 +53,9 @@ impl BenchmarkRunner {
         benchmark.display_settings();
         info!("{results}");
 
-        if let Some(output_directory) = benchmark.args().output_directory() {
-            results.dump_to_toml(&output_directory);
+        if let Some(output_file) = benchmark.args().output() {
             let params = BenchmarkParams::from(benchmark.args());
-            params.dump_to_toml(&output_directory);
+            results.dump_to_json(&output_file, params);
         }
 
         Ok(())
diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs
index bbbd3db68..2b1ad6455 100644
--- a/bench/src/benchmarks/consumer_group_benchmark.rs
+++ b/bench/src/benchmarks/consumer_group_benchmark.rs
@@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark {
         let message_batches = self.args.message_batches();
         let warmup_time = self.args.warmup_time();
         let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize));
-        let output_directory = self.args.output_directory();
 
         for consumer_id in 1..=consumers {
            let consumer_group_id =
                start_consumer_group_id + 1 + (consumer_id % consumer_groups_count);
            let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count);
-           let output_directory = output_directory.clone();
 
            let consumer = Consumer::new(
                self.client_factory.clone(),
@@ -98,7 +96,6 @@ impl Benchmarkable for ConsumerGroupBenchmark {
                messages_per_batch,
                message_batches,
                warmup_time,
-               output_directory,
            );
            let future = Box::pin(async move { consumer.run().await });
            futures.as_mut().unwrap().push(future);
diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs
index d74be2fc9..da48b837b 100644
--- a/bench/src/benchmarks/poll_benchmark.rs
+++ b/bench/src/benchmarks/poll_benchmark.rs
@@ -30,7 +30,6 @@ impl Benchmarkable for PollMessagesBenchmark {
         info!("Creating {} client(s)...", clients_count);
         let messages_per_batch = self.args.messages_per_batch();
         let message_batches = self.args.message_batches();
-        let output_directory = self.args.output_directory();
         let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize));
 
         for client_id in 1..=clients_count {
@@ -46,7 +45,6 @@ impl Benchmarkable for PollMessagesBenchmark {
                false => start_stream_id + 1,
            };
            let warmup_time = args.warmup_time();
-           let output_directory = output_directory.clone();
 
            let consumer = Consumer::new(
                client_factory,
@@ -56,7 +54,6 @@ impl Benchmarkable for PollMessagesBenchmark {
                messages_per_batch,
                message_batches,
                warmup_time,
-               output_directory,
            );
 
            let future = Box::pin(async move { consumer.run().await });
diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs
index a73f83104..5bf75fbfc 100644
--- a/bench/src/benchmarks/send_and_poll_benchmark.rs
+++ b/bench/src/benchmarks/send_and_poll_benchmark.rs
@@ -62,7 +62,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
         let message_size = self.args.message_size();
         let partitions_count = self.args.number_of_partitions();
         let warmup_time = self.args.warmup_time();
-        let output_directory = self.args.output_directory();
         let mut futures: BenchmarkFutures =
             Ok(Vec::with_capacity((producers + consumers) as usize));
 
@@ -71,7 +70,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
                true => start_stream_id + producer_id,
                false => start_stream_id + 1,
            };
-           let output_directory = output_directory.clone();
            let producer = Producer::new(
                self.client_factory.clone(),
                producer_id,
@@ -81,7 +79,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
                message_batches,
                message_size,
                warmup_time,
-               output_directory,
            );
            let future = Box::pin(async move { producer.run().await });
            futures.as_mut().unwrap().push(future);
@@ -92,7 +89,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
                true => start_stream_id + consumer_id,
                false => start_stream_id + 1,
            };
-           let output_directory = output_directory.clone();
            let consumer = Consumer::new(
                self.client_factory.clone(),
                consumer_id,
@@ -101,7 +97,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark {
                messages_per_batch,
                message_batches,
                warmup_time,
-               output_directory,
            );
            let future = Box::pin(async move { consumer.run().await });
            futures.as_mut().unwrap().push(future);
diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs
index d16861e92..de29fc875 100644
--- a/bench/src/benchmarks/send_benchmark.rs
+++ b/bench/src/benchmarks/send_benchmark.rs
@@ -42,7 +42,6 @@ impl Benchmarkable for SendMessagesBenchmark {
            let args = args.clone();
            let start_stream_id = args.start_stream_id();
            let client_factory = client_factory.clone();
-           let output_directory = args.output_directory.clone();
 
            let parallel_producer_streams = !args.disable_parallel_producer_streams();
            let stream_id = match parallel_producer_streams {
@@ -59,7 +58,6 @@ impl Benchmarkable for SendMessagesBenchmark {
                message_batches,
                message_size,
                warmup_time,
-               output_directory,
            );
            let future = Box::pin(async move { producer.run().await });
            futures.as_mut().unwrap().push(future);
diff --git a/bench/src/consumer.rs b/bench/src/consumer.rs
index d6cb86726..d4f616cee 100644
--- a/bench/src/consumer.rs
+++ b/bench/src/consumer.rs
@@ -24,7 +24,6 @@ pub struct Consumer {
     messages_per_batch: u32,
     message_batches: u32,
     warmup_time: IggyDuration,
-    output_directory: Option<String>,
 }
 
 impl Consumer {
@@ -37,7 +36,6 @@ impl Consumer {
         messages_per_batch: u32,
         message_batches: u32,
         warmup_time: IggyDuration,
-        output_directory: Option<String>,
     ) -> Self {
         Self {
             client_factory,
@@ -47,7 +45,6 @@ impl Consumer {
             messages_per_batch,
             message_batches,
             warmup_time,
-            output_directory,
         }
     }
 
@@ -224,32 +221,6 @@ impl Consumer {
 
         let statistics = BenchmarkActorStatistics::from_records(&records);
 
-        if let Some(output_directory) = &self.output_directory {
-            std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap();
-
-            // Dump raw data to file
-            let output_file = format!(
-                "{}/raw_data/consumer_{}_data.csv",
-                output_directory, self.consumer_id
-            );
-            info!(
-                "Consumer #{} → writing the results to {}...",
-                self.consumer_id, output_file
-            );
-            let mut writer = csv::Writer::from_path(output_file).unwrap();
-            for sample in records {
-                writer.serialize(sample).unwrap();
-            }
-            writer.flush().unwrap();
-
-            // Dump summary to file
-            let summary_file = format!(
-                "{}/raw_data/consumer_{}_summary.toml",
-                output_directory, self.consumer_id
-            );
-            statistics.dump_to_toml(&summary_file);
-        }
-
         Self::log_consumer_statistics(
             self.consumer_id,
             total_messages,
diff --git a/bench/src/producer.rs b/bench/src/producer.rs
index 7cc699d96..1800b076d 100644
--- a/bench/src/producer.rs
+++ b/bench/src/producer.rs
@@ -25,7 +25,6 @@ pub struct Producer {
     messages_per_batch: u32,
     message_size: u32,
     warmup_time: IggyDuration,
-    output_directory: Option<String>,
 }
 
 impl Producer {
@@ -39,7 +38,6 @@ impl Producer {
         message_batches: u32,
         message_size: u32,
         warmup_time: IggyDuration,
-        output_directory: Option<String>,
     ) -> Self {
         Producer {
             client_factory,
@@ -50,7 +48,6 @@ impl Producer {
             message_batches,
             message_size,
             warmup_time,
-            output_directory,
         }
     }
 
@@ -136,33 +133,6 @@ impl Producer {
         }
 
         let statistics = BenchmarkActorStatistics::from_records(&records);
-        if let Some(output_directory) = &self.output_directory {
-            std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap();
-
-            // Dump raw data to file
-            let results_file = format!(
-                "{}/raw_data/producer_{}_data.csv",
-                output_directory, self.producer_id
-            );
-            info!(
-                "Producer #{} → writing the results to {}...",
-                self.producer_id, results_file
-            );
-
-            let mut writer = csv::Writer::from_path(results_file).unwrap();
-            for sample in records {
-                writer.serialize(sample).unwrap();
-            }
-            writer.flush().unwrap();
-
-            // Dump summary to file
-            let summary_file = format!(
-                "{}/raw_data/producer_{}_summary.toml",
-                output_directory, self.producer_id
-            );
-            statistics.dump_to_toml(&summary_file);
-        }
-
         Self::log_producer_statistics(
             self.producer_id,
             total_messages,
diff --git a/bench/src/statistics/actor_statistics.rs b/bench/src/statistics/actor_statistics.rs
index 546807e66..d991b93d4 100644
--- a/bench/src/statistics/actor_statistics.rs
+++ b/bench/src/statistics/actor_statistics.rs
@@ -16,6 +16,8 @@ pub struct BenchmarkActorStatistics {
     pub p999_latency_ms: f64,
     pub avg_latency_ms: f64,
     pub median_latency_ms: f64,
+    #[serde(skip_serializing)]
+    pub raw_data: Vec<BenchmarkRecord>,
 }
 
 impl BenchmarkActorStatistics {
@@ -35,6 +37,7 @@ impl BenchmarkActorStatistics {
             p999_latency_ms: 0.0,
             avg_latency_ms: 0.0,
             median_latency_ms: 0.0,
+            raw_data: Vec::new(),
         };
     }
 
@@ -90,6 +93,7 @@ impl BenchmarkActorStatistics {
             p999_latency_ms,
             avg_latency_ms,
             median_latency_ms,
+            raw_data: records.to_vec(),
         }
     }
 
diff --git a/bench/src/statistics/aggregate_statistics.rs b/bench/src/statistics/aggregate_statistics.rs
index 414f3c22c..64054c3cb 100644
--- a/bench/src/statistics/aggregate_statistics.rs
+++ b/bench/src/statistics/aggregate_statistics.rs
@@ -1,4 +1,3 @@
-use std::io::Write;
 
 use super::actor_statistics::BenchmarkActorStatistics;
 use colored::{ColoredString, Colorize};
@@ -80,13 +79,4 @@ impl BenchmarkAggregateStatistics {
             self.average_median_latency_ms
         ).green()
     }
-
-    pub fn dump_to_toml(&self, file_name: &str) {
-        let toml_str = toml::to_string(self).unwrap();
-        Write::write_all(
-            &mut std::fs::File::create(file_name).unwrap(),
-            toml_str.as_bytes(),
-        )
-        .unwrap();
-    }
 }
diff --git a/scripts/performance/run-standard-performance-suite.sh b/scripts/performance/run-standard-performance-suite.sh
index 2ea8372aa..b6ab49a31 100755
--- a/scripts/performance/run-standard-performance-suite.sh
+++ b/scripts/performance/run-standard-performance-suite.sh
@@ -26,8 +26,8 @@ trap on_exit_bench EXIT
 echo "Building project..."
 cargo build --release
 
-# Create a directory for the performance results
-(mkdir performance_results || true) &> /dev/null
+# Create a directory for the performance results and make it the working directory
+(mkdir -p performance_results || true) &> /dev/null
 
 # Construct standard performance suites, each should process 8 GB of data
 STANDARD_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp)    # 8 producers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp
diff --git a/scripts/performance/utils.sh b/scripts/performance/utils.sh
index 88c6a3195..7c18b8ab3 100755
--- a/scripts/performance/utils.sh
+++ b/scripts/performance/utils.sh
@@ -75,15 +75,13 @@ function construct_bench_command() {
 
     local streams=${count}
 
-    local superdir
-    superdir="performance_results/$(get_git_iggy_server_tag_or_sha1 .)" || { echo "Failed to get git commit or tag."; exit 1; }
-    rm -rf "$superdir" || true
-    mkdir -p "$superdir" || { echo "Failed to create directory '$superdir'."; exit 1; }
-    local output_directory="${superdir}/${type}_${count}${type:0:1}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}"
+    local commit_hash
+    commit_hash=$(get_git_iggy_server_tag_or_sha1 .) || { echo "Failed to get git commit or tag."; exit 1; }
+    local output_file="${type}_${count}${type:0:1}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}_${commit_hash}.json"
 
     echo "$bench_command \
     $COMMON_ARGS \
-    --output-directory $output_directory \
+    --output $output_file \
     ${type} \
    --${role} ${count} \
    --streams ${streams} \
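
Example usage of the new single-file output (an illustrative sketch, not part of the patch: "$IGGY_BENCH_CMD" and $COMMON_ARGS are the variables already used by the scripts above, the producer/stream counts mirror the suite comments, and the jq paths simply follow the field names of the new BenchmarkOutput and BenchmarkParams structs):

    # Illustrative only: run one "send" benchmark and write all results to a single
    # JSON file via the new --output flag (replaces the old --output-directory).
    "$IGGY_BENCH_CMD" $COMMON_ARGS --output send_results.json send --producers 8 --streams 8

    # Inspect the serialized BenchmarkOutput; field names come from the structs in this patch.
    jq '.params.benchmark_name, .params.transport, .overall_statistics' send_results.json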