Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor benchmark output handling and add JSON/HTML support #1416

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 78 additions & 22 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion bench/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ edition = "2021"

[dependencies]
async-trait = "0.1.83"
charming = "0.4.0"
clap = { version = "4.5.23", features = ["derive"] }
colored = "2.2.0"
csv = "1.3.1"
derive-new = "0.7.0"
derive_more = "1.0.0"
figlet-rs = "0.1.5"
Expand All @@ -17,6 +17,7 @@ iggy = { path = "../sdk" }
integration = { path = "../integration" }
nonzero_lit = "0.1.2"
serde = { version = "1.0.217", features = ["derive"] }
serde_json = "1.0.114"
tokio = { version = "1.42.0", features = ["full"] }
toml = "0.8.19"
tracing = { version = "0.1.41" }
Expand Down
13 changes: 5 additions & 8 deletions bench/src/args/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ pub struct IggyBenchArgs {
#[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)]
pub skip_server_start: bool,

/// Output directory, in which the benchmark results will be stored as `csv` and `toml` files.
/// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each
/// producer/consumer will have its own file.
/// Actor summary, benchmark summary and parameters will be stored in a TOML file.
#[arg(long, short = 'o', default_value = None)]
pub output_directory: Option<String>,
/// Output directory path for the benchmark results
#[arg(long, short)]
pub output: Option<String>,
}

fn validate_server_executable_path(v: &str) -> Result<String, String> {
Expand Down Expand Up @@ -141,7 +138,7 @@ impl IggyBenchArgs {
self.warmup_time
}

pub fn output_directory(&self) -> Option<String> {
self.output_directory.clone()
pub fn output(&self) -> Option<String> {
self.output.clone()
}
}
3 changes: 2 additions & 1 deletion bench/src/args/simple.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use derive_more::Display;
use serde::Serialize;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)]
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display, Serialize)]
pub enum BenchmarkKind {
#[display("send messages")]
Send,
Expand Down
40 changes: 13 additions & 27 deletions bench/src/benchmark_params.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,22 @@
use std::io::Write;

use crate::args::common::IggyBenchArgs;
use iggy::utils::timestamp::IggyTimestamp;
use serde::Serialize;

#[derive(Debug, Serialize)]
pub struct BenchmarkParams {
timestamp_micros: i64,
benchmark_name: String,
transport: String,
messages_per_batch: u32,
message_batches: u32,
message_size: u32,
producers: u32,
consumers: u32,
streams: u32,
partitions: u32,
number_of_consumer_groups: u32,
disable_parallel_consumers: bool,
disable_parallel_producers: bool,
}

impl BenchmarkParams {
pub fn dump_to_toml(&self, output_directory: &str) {
let output_file = format!("{}/params.toml", output_directory);
let toml_str = toml::to_string(self).unwrap();
Write::write_all(
&mut std::fs::File::create(output_file).unwrap(),
toml_str.as_bytes(),
)
.unwrap();
}
pub timestamp_micros: i64,
pub benchmark_name: String,
pub transport: String,
pub messages_per_batch: u32,
pub message_batches: u32,
pub message_size: u32,
pub producers: u32,
pub consumers: u32,
pub streams: u32,
pub partitions: u32,
pub number_of_consumer_groups: u32,
pub disable_parallel_consumers: bool,
pub disable_parallel_producers: bool,
}

impl From<&IggyBenchArgs> for BenchmarkParams {
Expand Down
85 changes: 74 additions & 11 deletions bench/src/benchmark_result.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
use crate::args::simple::BenchmarkKind;
use crate::benchmark_params::BenchmarkParams;
use crate::plotting::generate_plots;
use crate::statistics::actor_statistics::BenchmarkActorStatistics;
use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics;
use crate::statistics::record::BenchmarkRecord;
use serde::Serialize;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::fs;
use std::path;

#[derive(Debug, Clone, PartialEq)]
pub struct BenchmarkResult {
Expand Down Expand Up @@ -48,23 +54,80 @@ impl BenchmarkResults {
BenchmarkAggregateStatistics::from_actors_statistics(&records)
}

pub fn dump_to_toml(&self, output_directory: &str) {
let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send);
if let Some(producer_statics) = producer_statics {
let file_path = format!("{}/producers_summary.toml", output_directory);
pub fn dump_to_json(&self, output_dir: &str, params: BenchmarkParams) {
let test_type = self.get_test_type().unwrap_or(BenchmarkKind::Send);

producer_statics.dump_to_toml(&file_path);
}
// Get overall statistics for all producers and consumers
let overall_stats = self.calculate_statistics(|x| {
x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll
});

let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll);
if let Some(consumer_statics) = consumer_statics {
let file_path = format!("{}/consumers_summary.toml", output_directory);
// Get first producer statistics and raw data
let (first_producer_stats, first_producer_raw_data) =
if test_type == BenchmarkKind::Send || test_type == BenchmarkKind::SendAndPoll {
if let Some(first_producer) =
self.results.iter().find(|x| x.kind == BenchmarkKind::Send)
{
(
Some(first_producer.statistics.clone()),
Some(first_producer.statistics.raw_data.clone()),
)
} else {
(None, None)
}
} else {
(None, None)
};

consumer_statics.dump_to_toml(&file_path);
}
// Get first consumer statistics and raw data
let (first_consumer_stats, first_consumer_raw_data) =
if test_type == BenchmarkKind::Poll || test_type == BenchmarkKind::SendAndPoll {
if let Some(first_consumer) =
self.results.iter().find(|x| x.kind == BenchmarkKind::Poll)
{
(
Some(first_consumer.statistics.clone()),
Some(first_consumer.statistics.raw_data.clone()),
)
} else {
(None, None)
}
} else {
(None, None)
};

let output = BenchmarkOutput {
params,
overall_statistics: overall_stats,
first_producer_statistics: first_producer_stats,
first_consumer_statistics: first_consumer_stats,
first_producer_raw_data,
first_consumer_raw_data,
};

// Create the output directory
std::fs::create_dir_all(output_dir).expect("Failed to create output directory");

// Write JSON to data.json in the output directory
let json_path = path::Path::new(output_dir).join("data.json");
let json = serde_json::to_string_pretty(&output).expect("Failed to serialize to JSON");
fs::write(json_path, json).expect("Failed to write JSON file");

// Generate plots in the same directory
generate_plots(&output, output_dir).expect("Failed to generate plots");
}
}

#[derive(Debug, Serialize)]
pub struct BenchmarkOutput {
pub params: BenchmarkParams,
pub overall_statistics: Option<BenchmarkAggregateStatistics>,
pub first_producer_statistics: Option<BenchmarkActorStatistics>,
pub first_consumer_statistics: Option<BenchmarkActorStatistics>,
pub first_producer_raw_data: Option<Vec<BenchmarkRecord>>,
pub first_consumer_raw_data: Option<Vec<BenchmarkRecord>>,
}

impl Display for BenchmarkResults {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
if let Ok(test_type) = self.get_test_type() {
Expand Down
5 changes: 2 additions & 3 deletions bench/src/benchmark_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ impl BenchmarkRunner {
benchmark.display_settings();
info!("{results}");

if let Some(output_directory) = benchmark.args().output_directory() {
results.dump_to_toml(&output_directory);
if let Some(output_file) = benchmark.args().output() {
let params = BenchmarkParams::from(benchmark.args());
params.dump_to_toml(&output_directory);
results.dump_to_json(&output_file, params);
}

Ok(())
Expand Down
3 changes: 0 additions & 3 deletions bench/src/benchmarks/consumer_group_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,11 @@ impl Benchmarkable for ConsumerGroupBenchmark {
let message_batches = self.args.message_batches();
let warmup_time = self.args.warmup_time();
let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize));
let output_directory = self.args.output_directory();

for consumer_id in 1..=consumers {
let consumer_group_id =
start_consumer_group_id + 1 + (consumer_id % consumer_groups_count);
let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count);
let output_directory = output_directory.clone();

let consumer = Consumer::new(
self.client_factory.clone(),
Expand All @@ -98,7 +96,6 @@ impl Benchmarkable for ConsumerGroupBenchmark {
messages_per_batch,
message_batches,
warmup_time,
output_directory,
);
let future = Box::pin(async move { consumer.run().await });
futures.as_mut().unwrap().push(future);
Expand Down
Loading
Loading