Refactor benchmark output handling and add JSON/HTML support
This commit refactors the benchmark output handling by replacing CSV and TOML
file generation with JSON and HTML outputs. The `output_directory` argument is
replaced with `output`, and the results are now serialized into a JSON file.
Additionally, HTML plots are generated using the `charming` library, providing
a visual representation of throughput and latency over time.

- Removed CSV and TOML dependencies and related code.
- Added `charming` and `serde_json` dependencies for JSON and HTML output.
- Updated scripts to reflect changes in output handling.
- Improved directory handling in performance scripts.
hubcio committed Jan 3, 2025
1 parent 8b3a72d commit 5363705
Showing 19 changed files with 570 additions and 163 deletions.
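The commit message notes that HTML plots are produced with the `charming` library, but the new plotting module (bench/src/plotting.rs) is not rendered in this view. As a rough sketch of the kind of charming code this implies (the function name, chart options, and sample data below are assumptions, not code from this commit):

use charming::{component::Axis, element::AxisType, series::Line, Chart, HtmlRenderer};

// Illustrative only: render one throughput series to a standalone HTML file.
fn save_throughput_plot(timestamps: Vec<String>, mb_per_second: Vec<f64>) {
    let chart = Chart::new()
        .x_axis(Axis::new().type_(AxisType::Category).data(timestamps))
        .y_axis(Axis::new().type_(AxisType::Value))
        .series(Line::new().data(mb_per_second));

    // charming's HtmlRenderer writes a self-contained HTML page embedding the chart.
    let mut renderer = HtmlRenderer::new("Throughput over time", 1000, 600);
    renderer
        .save(&chart, "throughput.html")
        .expect("Failed to save HTML plot");
}

A latency-over-time chart would follow the same pattern with a different data series.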
100 changes: 78 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion bench/Cargo.toml
@@ -5,9 +5,9 @@ edition = "2021"

 [dependencies]
 async-trait = "0.1.83"
+charming = "0.4.0"
 clap = { version = "4.5.23", features = ["derive"] }
 colored = "2.2.0"
-csv = "1.3.1"
 derive-new = "0.7.0"
 derive_more = "1.0.0"
 figlet-rs = "0.1.5"
@@ -17,6 +17,7 @@ iggy = { path = "../sdk" }
 integration = { path = "../integration" }
 nonzero_lit = "0.1.2"
 serde = { version = "1.0.217", features = ["derive"] }
+serde_json = "1.0.114"
 tokio = { version = "1.42.0", features = ["full"] }
 toml = "0.8.19"
 tracing = { version = "0.1.41" }
13 changes: 5 additions & 8 deletions bench/src/args/common.rs
@@ -40,12 +40,9 @@ pub struct IggyBenchArgs {
     #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
     pub skip_server_start: bool,

-    /// Output directory, in which the benchmark results will be stored as `csv` and `toml` files.
-    /// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each
-    /// producer/consumer will have its own file.
-    /// Actor summary, benchmark summary and parameters will be stored in a TOML file.
-    #[arg(long, short = 'o', default_value = None)]
-    pub output_directory: Option<String>,
+    /// Output directory path for the benchmark results
+    #[arg(long, short)]
+    pub output: Option<String>,
 }

 fn validate_server_executable_path(v: &str) -> Result<String, String> {
@@ -141,7 +138,7 @@ impl IggyBenchArgs {
         self.warmup_time
     }

-    pub fn output_directory(&self) -> Option<String> {
-        self.output_directory.clone()
+    pub fn output(&self) -> Option<String> {
+        self.output.clone()
     }
 }
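For clarity on the argument change above: with clap's derive API, `#[arg(long, short)]` infers both flag names from the field, so the option is exposed as `--output` with the same `-o` shorthand that the old `output_directory` field declared explicitly. A minimal, self-contained sketch (the `Args` struct and `main` are illustrative stand-ins, not part of `IggyBenchArgs`; it assumes clap with the `derive` feature, as in bench/Cargo.toml):

use clap::Parser;

#[derive(Parser, Debug)]
struct Args {
    /// Output directory path for the benchmark results
    #[arg(long, short)]
    output: Option<String>,
}

fn main() {
    // Parses `--output <DIR>` or `-o <DIR>`; both names are derived from the field name.
    let args = Args::parse();
    println!("output: {:?}", args.output);
}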
3 changes: 2 additions & 1 deletion bench/src/args/simple.rs
@@ -1,6 +1,7 @@
 use derive_more::Display;
+use serde::Serialize;

-#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)]
 pub enum BenchmarkKind {
     #[display("send messages")]
     Send,
40 changes: 13 additions & 27 deletions bench/src/benchmark_params.rs
@@ -1,36 +1,22 @@
-use std::io::Write;
-
 use crate::args::common::IggyBenchArgs;
 use iggy::utils::timestamp::IggyTimestamp;
 use serde::Serialize;

 #[derive(Debug, Serialize)]
 pub struct BenchmarkParams {
-    timestamp_micros: i64,
-    benchmark_name: String,
-    transport: String,
-    messages_per_batch: u32,
-    message_batches: u32,
-    message_size: u32,
-    producers: u32,
-    consumers: u32,
-    streams: u32,
-    partitions: u32,
-    number_of_consumer_groups: u32,
-    disable_parallel_consumers: bool,
-    disable_parallel_producers: bool,
-}
-
-impl BenchmarkParams {
-    pub fn dump_to_toml(&self, output_directory: &str) {
-        let output_file = format!("{}/params.toml", output_directory);
-        let toml_str = toml::to_string(self).unwrap();
-        Write::write_all(
-            &mut std::fs::File::create(output_file).unwrap(),
-            toml_str.as_bytes(),
-        )
-        .unwrap();
-    }
+    pub timestamp_micros: i64,
+    pub benchmark_name: String,
+    pub transport: String,
+    pub messages_per_batch: u32,
+    pub message_batches: u32,
+    pub message_size: u32,
+    pub producers: u32,
+    pub consumers: u32,
+    pub streams: u32,
+    pub partitions: u32,
+    pub number_of_consumer_groups: u32,
+    pub disable_parallel_consumers: bool,
+    pub disable_parallel_producers: bool,
 }

 impl From<&IggyBenchArgs> for BenchmarkParams {
85 changes: 74 additions & 11 deletions bench/src/benchmark_result.rs
@@ -1,8 +1,14 @@
 use crate::args::simple::BenchmarkKind;
+use crate::benchmark_params::BenchmarkParams;
+use crate::plotting::generate_plots;
 use crate::statistics::actor_statistics::BenchmarkActorStatistics;
 use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics;
+use crate::statistics::record::BenchmarkRecord;
+use serde::Serialize;
 use std::collections::HashSet;
 use std::fmt::{Display, Formatter};
+use std::fs;
+use std::path;

 #[derive(Debug, Clone, PartialEq)]
 pub struct BenchmarkResult {
@@ -48,23 +54,80 @@ impl BenchmarkResults {
         BenchmarkAggregateStatistics::from_actors_statistics(&records)
     }

-    pub fn dump_to_toml(&self, output_directory: &str) {
-        let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send);
-        if let Some(producer_statics) = producer_statics {
-            let file_path = format!("{}/producers_summary.toml", output_directory);
+    pub fn dump_to_json(&self, output_dir: &str, params: BenchmarkParams) {
+        let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send);

-            producer_statics.dump_to_toml(&file_path);
-        }
+        // Get overall statistics for all producers and consumers
+        let overall_stats = self.calculate_statistics(|x| {
+            x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll
+        });

-        let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll);
-        if let Some(consumer_statics) = consumer_statics {
-            let file_path = format!("{}/consumers_summary.toml", output_directory);
+        // Get first producer statistics and raw data
+        let (first_producer_stats, first_producer_raw_data) =
+            if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll {
+                if let Some(first_producer) =
+                    self.results.iter().find(|x| x.kind == BenchmarkKind::Send)
+                {
+                    (
+                        Some(first_producer.statistics.clone()),
+                        Some(first_producer.statistics.raw_data.clone()),
+                    )
+                } else {
+                    (None, None)
+                }
+            } else {
+                (None, None)
+            };

-            consumer_statics.dump_to_toml(&file_path);
-        }
+        // Get first consumer statistics and raw data
+        let (first_consumer_stats, first_consumer_raw_data) =
+            if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll {
+                if let Some(first_consumer) =
+                    self.results.iter().find(|x| x.kind == BenchmarkKind::Poll)
+                {
+                    (
+                        Some(first_consumer.statistics.clone()),
+                        Some(first_consumer.statistics.raw_data.clone()),
+                    )
+                } else {
+                    (None, None)
+                }
+            } else {
+                (None, None)
+            };
+
+        let output = BenchmarkOutput {
+            params,
+            overall_statistics: overall_stats,
+            first_producer_statistics: first_producer_stats,
+            first_consumer_statistics: first_consumer_stats,
+            first_producer_raw_data,
+            first_consumer_raw_data,
+        };
+
+        // Create the output directory
+        std::fs::create_dir_all(output_dir).expect("Failed to create output directory");
+
+        // Write JSON to data.json in the output directory
+        let json_path = path::Path::new(output_dir).join("data.json");
+        let json = serde_json::to_string_pretty(&output).expect("Failed to serialize to JSON");
+        fs::write(json_path, json).expect("Failed to write JSON file");
+
+        // Generate plots in the same directory
+        generate_plots(&output, output_dir).expect("Failed to generate plots");
     }
 }

+#[derive(Debug, Serialize)]
+pub struct BenchmarkOutput {
+    pub params: BenchmarkParams,
+    pub overall_statistics: Option<BenchmarkAggregateStatistics>,
+    pub first_producer_statistics: Option<BenchmarkActorStatistics>,
+    pub first_consumer_statistics: Option<BenchmarkActorStatistics>,
+    pub first_producer_raw_data: Option<Vec<BenchmarkRecord>>,
+    pub first_consumer_raw_data: Option<Vec<BenchmarkRecord>>,
+}
+
 impl Display for BenchmarkResults {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         if let Ok(test_type) = self.get_test_type() {
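Since the results of a run are now a single data.json file, downstream tooling can read it generically with serde_json. A minimal sketch (the results directory name is hypothetical; only the top-level keys of the `BenchmarkOutput` struct shown above are relied on):

use std::fs;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // The directory is whatever was passed via `--output`; this path is an assumption.
    let raw = fs::read_to_string("performance_results/data.json")?;
    let data: serde_json::Value = serde_json::from_str(&raw)?;

    // Top-level keys mirror BenchmarkOutput: params, overall_statistics,
    // first_producer_statistics, first_consumer_statistics and the raw data arrays.
    let name = &data["params"]["benchmark_name"];
    let transport = &data["params"]["transport"];
    println!("{name} over {transport}");
    println!("overall: {}", data["overall_statistics"]);
    Ok(())
}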
5 changes: 2 additions & 3 deletions bench/src/benchmark_runner.rs
@@ -53,10 +53,9 @@ impl BenchmarkRunner {
         benchmark.display_settings();
         info!("{results}");

-        if let Some(output_directory) = benchmark.args().output_directory() {
-            results.dump_to_toml(&output_directory);
+        if let Some(output_file) = benchmark.args().output() {
             let params = BenchmarkParams::from(benchmark.args());
-            params.dump_to_toml(&output_directory);
+            results.dump_to_json(&output_file, params);
         }

         Ok(())
3 changes: 0 additions & 3 deletions bench/src/benchmarks/consumer_group_benchmark.rs
@@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark {
         let message_batches = self.args.message_batches();
         let warmup_time = self.args.warmup_time();
         let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize));
-        let output_directory = self.args.output_directory();

         for consumer_id in 1..=consumers {
             let consumer_group_id =
                 start_consumer_group_id + 1 + (consumer_id % consumer_groups_count);
             let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count);
-            let output_directory = output_directory.clone();

             let consumer = Consumer::new(
                 self.client_factory.clone(),
@@ -98,7 +96,6 @@
                 messages_per_batch,
                 message_batches,
                 warmup_time,
-                output_directory,
             );
             let future = Box::pin(async move { consumer.run().await });
             futures.as_mut().unwrap().push(future);
(Diffs for the remaining changed files are not rendered in this view.)
