From bb991fe6d8f8b9f6c22d91607277d31cdf9360ae Mon Sep 17 00:00:00 2001 From: Hubert Gruszecki Date: Fri, 3 Jan 2025 20:45:29 +0100 Subject: [PATCH] Refactor benchmark output handling and add JSON/HTML support This commit refactors the benchmark output handling by replacing CSV and TOML file generation with JSON and HTML outputs. The `output_directory` argument is replaced with `output`, and the results are now serialized into a JSON file. Additionally, HTML plots are generated using the `charming` library, providing a visual representation of throughput and latency over time. - Removed CSV and TOML dependencies and related code. - Added `charming` and `serde_json` dependencies for JSON and HTML output. - Updated scripts to reflect changes in output handling. - Improved directory handling in performance scripts. --- Cargo.lock | 135 +++++-- Cargo.toml | 11 +- bench/Cargo.toml | 10 +- bench/report/Cargo.toml | 18 + bench/report/src/lib.rs | 95 +++++ bench/report/src/plotting/chart.rs | 229 +++++++++++ bench/report/src/plotting/chart_kind.rs | 10 + bench/report/src/plotting/mod.rs | 3 + bench/report/src/plotting/text/mod.rs | 2 + bench/report/src/plotting/text/subtext.rs | 109 ++++++ bench/report/src/plotting/text/title.rs | 21 + bench/report/src/prints.rs | 75 ++++ bench/report/src/types/actor_kind.rs | 21 + bench/report/src/types/benchmark_kind.rs | 32 ++ bench/report/src/types/group_metrics.rs | 11 + bench/report/src/types/group_metrics_kind.rs | 26 ++ .../report/src/types/group_metrics_summary.rs | 30 ++ bench/report/src/types/hardware.rs | 35 ++ bench/report/src/types/individual_metrics.rs | 12 + .../src/types/individual_metrics_summary.rs | 34 ++ bench/report/src/types/mod.rs | 12 + bench/report/src/types/params.rs | 36 ++ bench/report/src/types/report.rs | 39 ++ bench/report/src/types/time_series.rs | 40 ++ bench/report/src/types/transport.rs | 16 + bench/report/src/utils.rs | 8 + bench/src/{ => actors}/consumer.rs | 98 ++--- bench/src/actors/mod.rs | 2 + bench/src/{ => actors}/producer.rs | 103 ++--- bench/src/analytics/metrics/group.rs | 116 ++++++ bench/src/analytics/metrics/individual.rs | 136 +++++++ bench/src/analytics/metrics/mod.rs | 2 + bench/src/analytics/mod.rs | 4 + bench/src/{statistics => analytics}/record.rs | 5 +- bench/src/analytics/report_builder.rs | 80 ++++ bench/src/analytics/time_series/calculator.rs | 76 ++++ .../time_series/calculators/latency.rs | 60 +++ .../analytics/time_series/calculators/mod.rs | 16 + .../time_series/calculators/throughput.rs | 125 ++++++ bench/src/analytics/time_series/mod.rs | 3 + .../analytics/time_series/processors/mod.rs | 8 + .../time_series/processors/moving_average.rs | 43 +++ bench/src/args/common.rs | 361 +++++++++++++++++- bench/src/args/defaults.rs | 3 + bench/src/args/kind.rs | 3 +- bench/src/args/mod.rs | 3 +- bench/src/args/simple.rs | 13 - bench/src/benchmark_params.rs | 54 --- bench/src/benchmark_result.rs | 93 ----- bench/src/benchmark_runner.rs | 64 ---- bench/src/benchmarks/benchmark.rs | 11 +- .../benchmarks/consumer_group_benchmark.rs | 32 +- bench/src/benchmarks/poll_benchmark.rs | 26 +- .../src/benchmarks/send_and_poll_benchmark.rs | 45 +-- bench/src/benchmarks/send_benchmark.rs | 24 +- bench/src/main.rs | 104 ++++- bench/src/plot.rs | 94 +++++ bench/src/runner.rs | 91 +++++ bench/src/statistics/actor_statistics.rs | 117 ------ bench/src/statistics/aggregate_statistics.rs | 92 ----- bench/src/statistics/mod.rs | 3 - bench/src/{ => utils}/client_factory.rs | 0 bench/src/utils/mod.rs | 17 + bench/src/{ => utils}/server_starter.rs | 2 +- integration/Cargo.toml | 1 + .../run-standard-performance-suite.sh | 54 ++- scripts/performance/utils.sh | 56 ++- scripts/profile.sh | 10 +- scripts/utils.sh | 30 +- 69 files changed, 2582 insertions(+), 768 deletions(-) create mode 100644 bench/report/Cargo.toml create mode 100644 bench/report/src/lib.rs create mode 100644 bench/report/src/plotting/chart.rs create mode 100644 bench/report/src/plotting/chart_kind.rs create mode 100644 bench/report/src/plotting/mod.rs create mode 100644 bench/report/src/plotting/text/mod.rs create mode 100644 bench/report/src/plotting/text/subtext.rs create mode 100644 bench/report/src/plotting/text/title.rs create mode 100644 bench/report/src/prints.rs create mode 100644 bench/report/src/types/actor_kind.rs create mode 100644 bench/report/src/types/benchmark_kind.rs create mode 100644 bench/report/src/types/group_metrics.rs create mode 100644 bench/report/src/types/group_metrics_kind.rs create mode 100644 bench/report/src/types/group_metrics_summary.rs create mode 100644 bench/report/src/types/hardware.rs create mode 100644 bench/report/src/types/individual_metrics.rs create mode 100644 bench/report/src/types/individual_metrics_summary.rs create mode 100644 bench/report/src/types/mod.rs create mode 100644 bench/report/src/types/params.rs create mode 100644 bench/report/src/types/report.rs create mode 100644 bench/report/src/types/time_series.rs create mode 100644 bench/report/src/types/transport.rs create mode 100644 bench/report/src/utils.rs rename bench/src/{ => actors}/consumer.rs (79%) create mode 100644 bench/src/actors/mod.rs rename bench/src/{ => actors}/producer.rs (68%) create mode 100644 bench/src/analytics/metrics/group.rs create mode 100644 bench/src/analytics/metrics/individual.rs create mode 100644 bench/src/analytics/metrics/mod.rs create mode 100644 bench/src/analytics/mod.rs rename bench/src/{statistics => analytics}/record.rs (67%) create mode 100644 bench/src/analytics/report_builder.rs create mode 100644 bench/src/analytics/time_series/calculator.rs create mode 100644 bench/src/analytics/time_series/calculators/latency.rs create mode 100644 bench/src/analytics/time_series/calculators/mod.rs create mode 100644 bench/src/analytics/time_series/calculators/throughput.rs create mode 100644 bench/src/analytics/time_series/mod.rs create mode 100644 bench/src/analytics/time_series/processors/mod.rs create mode 100644 bench/src/analytics/time_series/processors/moving_average.rs delete mode 100644 bench/src/args/simple.rs delete mode 100644 bench/src/benchmark_params.rs delete mode 100644 bench/src/benchmark_result.rs delete mode 100644 bench/src/benchmark_runner.rs create mode 100644 bench/src/plot.rs create mode 100644 bench/src/runner.rs delete mode 100644 bench/src/statistics/actor_statistics.rs delete mode 100644 bench/src/statistics/aggregate_statistics.rs delete mode 100644 bench/src/statistics/mod.rs rename bench/src/{ => utils}/client_factory.rs (100%) create mode 100644 bench/src/utils/mod.rs rename bench/src/{ => utils}/server_starter.rs (98%) diff --git a/Cargo.lock b/Cargo.lock index b0e00ed4e..f7a22855e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -568,22 +568,22 @@ name = "bench" version = "0.1.2" dependencies = [ "async-trait", + "charming", + "chrono", "clap", - "colored", - "csv", - "derive-new", - "derive_more", "figlet-rs", "futures", - "human_format", "iggy", + "iggy-benchmark-report", "integration", "nonzero_lit", "serde", "tokio", "toml", "tracing", + "tracing-appender", "tracing-subscriber", + "uuid", ] [[package]] @@ -856,6 +856,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "charming" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88f802d7b8011655a1162e04e6e6849bb57baae3c0ea7026c64ba280d80d1d77" +dependencies = [ + "handlebars", + "serde", + "serde_json", +] + [[package]] name = "chrono" version = "0.4.39" @@ -1201,27 +1212,6 @@ dependencies = [ "typenum", ] -[[package]] -name = "csv" -version = "1.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" -dependencies = [ - "csv-core", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "csv-core" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" -dependencies = [ - "memchr", -] - [[package]] name = "ctor" version = "0.2.9" @@ -1893,6 +1883,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "4.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "faa67bab9ff362228eb3d00bd024a4965d8231bbb7921167f0cfa66c6626b225" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror 1.0.69", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2025,12 +2029,6 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" -[[package]] -name = "human_format" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c3b1f728c459d27b12448862017b96ad4767b1ec2ec5e6434e99f1577f085b8" - [[package]] name = "humantime" version = "2.1.0" @@ -2377,6 +2375,22 @@ dependencies = [ "webpki-roots", ] +[[package]] +name = "iggy-benchmark-report" +version = "0.1.0" +dependencies = [ + "byte-unit", + "charming", + "colored", + "derive-new", + "derive_more", + "serde", + "serde_json", + "sysinfo", + "tracing", + "uuid", +] + [[package]] name = "iggy-cli" version = "0.8.6" @@ -2484,6 +2498,7 @@ dependencies = [ "futures", "humantime", "iggy", + "iggy-benchmark-report", "keyring", "lazy_static", "libc", @@ -3353,6 +3368,51 @@ version = "2.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" +[[package]] +name = "pest" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b7cafe60d6cf8e62e1b9b2ea516a089c008945bb5a275416789e7db0bc199dc" +dependencies = [ + "memchr", + "thiserror 2.0.11", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "816518421cfc6887a0d62bf441b6ffb4536fcc926395a69e1a85852d4363f57e" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d1396fd3a870fc7838768d171b4616d5c91f6cc25e377b673d714567d99377b" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.96", +] + +[[package]] +name = "pest_meta" +version = "2.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1e58089ea25d717bfd31fb534e4f3afcc2cc569c70de3e239778991ea3b7dea" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "pin-project" version = "1.1.8" @@ -5180,6 +5240,12 @@ version = "1.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42ff0bf0c66b8238c6f3b578df37d0b7848e55df8577b3f74f92a69acceeb825" +[[package]] +name = "ucd-trie" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2896d95c02a80c6d6a5d6e953d479f5ddf2dfdb6a244441010e373ac0fb88971" + [[package]] name = "ulid" version = "1.1.4" @@ -5276,12 +5342,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.12.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "744018581f9a3454a9e15beb8a33b017183f1e7c0cd170232a2d1453b23a51c4" +checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" dependencies = [ "getrandom", "rand", + "serde", "zerocopy 0.8.14", ] diff --git a/Cargo.toml b/Cargo.toml index 1debf518c..48db2e133 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,16 @@ codegen-units = 1 [workspace] resolver = "2" -members = ["bench", "cli", "examples", "integration", "sdk", "server", "tools"] +members = [ + "bench", + "bench/report", + "cli", + "examples", + "integration", + "sdk", + "server", + "tools" +] [workspace.metadata.cargo-machete] ignored = ["openssl"] diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 7b2e02a7d..230c360c7 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -6,22 +6,22 @@ license = "Apache-2.0" [dependencies] async-trait = "0.1.85" +charming = "0.4.0" +chrono = "0.4.31" clap = { version = "4.5.26", features = ["derive"] } -colored = "3.0.0" -csv = "1.3.1" -derive-new = "0.7.0" -derive_more = "1.0.0" figlet-rs = "0.1.5" futures = "0.3.31" -human_format = "1.1.0" iggy = { path = "../sdk" } +iggy-benchmark-report = { path = "report" } integration = { path = "../integration" } nonzero_lit = "0.1.2" serde = { version = "1.0.217", features = ["derive"] } tokio = { version = "1.43.0", features = ["full"] } toml = "0.8.19" tracing = { version = "0.1.41" } +tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3.19", features = ["fmt", "env-filter"] } +uuid = { version = "1.12.1", features = ["serde"] } [[bin]] name = "iggy-bench" diff --git a/bench/report/Cargo.toml b/bench/report/Cargo.toml new file mode 100644 index 000000000..5764db0ca --- /dev/null +++ b/bench/report/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "iggy-benchmark-report" +version = "0.1.0" +edition = "2021" +description = "Benchmark report and chart generation library for iggy-bench binary and iggy-benchmarks-dashboard web app" +license = "Apache-2.0" + +[dependencies] +byte-unit = "5.1.6" +charming = "0.4.0" +colored = "3.0.0" +derive-new = "0.7.0" +derive_more = { version = "1.0.0", features = ["full"] } +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +sysinfo = "0.33.1" +tracing = "0.1" +uuid = { version = "1.12.1", features = ["serde"] } diff --git a/bench/report/src/lib.rs b/bench/report/src/lib.rs new file mode 100644 index 000000000..0dd071c21 --- /dev/null +++ b/bench/report/src/lib.rs @@ -0,0 +1,95 @@ +pub mod plotting; +pub mod types; + +mod prints; +mod utils; + +use crate::report::BenchmarkReport; +use actor_kind::ActorKind; +use charming::Chart; +use group_metrics_kind::GroupMetricsKind; +use plotting::chart::IggyChart; +use plotting::chart_kind::ChartKind; + +pub use types::*; + +pub fn create_throughput_chart(report: &BenchmarkReport, dark: bool) -> Chart { + let title = report.title(ChartKind::Throughput); + + let mut chart = IggyChart::new(&title, &report.subtext(), dark) + .with_time_x_axis() + .with_dual_y_axis("Throughput [MB/s]", "Throughput [msg/s]"); + + // Add group metrics series + for metrics in &report.group_metrics { + // Skip aggregate metrics in charts + if metrics.summary.kind == GroupMetricsKind::ProducersAndConsumers { + continue; + } + + chart = chart.add_dual_time_line_series( + &format!("All {}s [MB/s]", metrics.summary.kind.actor()), + metrics.throughput_mb_ts.as_charming_points(), + None, + 1.0, + 0, + ); + chart = chart.add_dual_time_line_series( + &format!("All {}s [msg/s]", metrics.summary.kind.actor()), + metrics.throughput_msg_ts.as_charming_points(), + None, + 1.0, + 1, + ); + } + + // Add individual metrics series + for metrics in &report.individual_metrics { + let actor_type = match metrics.summary.actor_kind { + ActorKind::Producer => "Producer", + ActorKind::Consumer => "Consumer", + }; + + chart = chart.add_dual_time_line_series( + &format!("{} {} [MB/s]", actor_type, metrics.summary.actor_id), + metrics.throughput_mb_ts.as_charming_points(), + None, + 0.4, + 0, + ); + chart = chart.add_dual_time_line_series( + &format!("{} {} [msg/s]", actor_type, metrics.summary.actor_id), + metrics.throughput_msg_ts.as_charming_points(), + None, + 0.4, + 1, + ); + } + + chart.inner +} + +pub fn create_latency_chart(report: &BenchmarkReport, dark: bool) -> Chart { + let title = report.title(ChartKind::Latency); + + let mut chart = IggyChart::new(&title, &report.subtext(), dark) + .with_time_x_axis() + .with_y_axis("Latency [ms]"); + + // Add individual metrics series + for metrics in &report.individual_metrics { + let actor_type = match metrics.summary.actor_kind { + ActorKind::Producer => "Producer", + ActorKind::Consumer => "Consumer", + }; + + chart = chart.add_time_series( + &format!("{} {} [Latency]", actor_type, metrics.summary.actor_id), + metrics.latency_ts.as_charming_points(), + None, + 0.5, + ); + } + + chart.inner +} diff --git a/bench/report/src/plotting/chart.rs b/bench/report/src/plotting/chart.rs new file mode 100644 index 000000000..d34b80ed3 --- /dev/null +++ b/bench/report/src/plotting/chart.rs @@ -0,0 +1,229 @@ +use charming::{ + component::{ + Axis, DataView, DataZoom, DataZoomType, Feature, Grid, Legend, LegendSelectedMode, + LegendType, Restore, SaveAsImage, Title, Toolbox, ToolboxDataZoom, + }, + element::{ + AxisLabel, AxisPointer, AxisPointerType, AxisType, Emphasis, ItemStyle, LineStyle, + NameLocation, Orient, SplitLine, Symbol, TextAlign, TextStyle, Tooltip, + }, + series::Line, + Chart, +}; + +pub struct IggyChart { + pub inner: Chart, +} + +impl IggyChart { + /// Create a new `IggyChart` with default tooltip, legend, grid, and toolbox. + pub fn new(title: &str, subtext: &str, dark: bool) -> Self { + let mut chart = Chart::new() + .title( + Title::new() + .text(title) + .text_align(TextAlign::Center) + .subtext(subtext) + .text_style(TextStyle::new().font_size(24).font_weight("bold")) + .subtext_style(TextStyle::new().font_size(14).line_height(20)) + .left("50%") + .top("1%"), + ) + .tooltip(Tooltip::new().axis_pointer(AxisPointer::new().type_(AxisPointerType::Cross))) + .legend( + Legend::new() + .show(true) + .right("2%") + .top("middle") + .orient(Orient::Vertical) + .selected_mode(LegendSelectedMode::Multiple) + .text_style(TextStyle::new().font_size(12)) + .padding(10) + .item_gap(10) + .item_width(25) + .item_height(14) + .type_(LegendType::Scroll), + ) + .grid(Grid::new().left("5%").right("19%").top("16%").bottom("8%")) + .data_zoom( + DataZoom::new() + .show(true) + .type_(DataZoomType::Slider) + .bottom("2%") + .start(0) + .end(100), + ) + .toolbox( + Toolbox::new().feature( + Feature::new() + .data_zoom(ToolboxDataZoom::new()) + .data_view(DataView::new()) + .restore(Restore::new()) + .save_as_image(SaveAsImage::new()), + ), + ); + + if dark { + chart = chart.background_color("#242424"); + } + + Self { inner: chart } + } + + /// Configure the X axis (time axis). + pub fn with_time_x_axis(mut self) -> Self { + self.inner = self.inner.x_axis( + Axis::new() + .type_(AxisType::Value) + .name_location(NameLocation::End) + .name_gap(15) + .axis_label(AxisLabel::new().formatter("{value} s")) + .split_line(SplitLine::new().show(true)), + ); + self + } + + /// Configure the X axis (category axis). + pub fn with_category_x_axis(mut self, axis_label: &str, categories: Vec) -> Self { + self.inner = self.inner.x_axis( + Axis::new() + .type_(AxisType::Category) + .name(axis_label) + .name_location(NameLocation::End) + .name_gap(15) + .data(categories) + .axis_label(AxisLabel::new().formatter("{value} s")) + .split_line(SplitLine::new().show(true)), + ); + self + } + + /// Configure a Y axis for e.g. throughput in msg/s or MB/s. + pub fn with_y_axis(mut self, axis_label: &str) -> Self { + self.inner = self.inner.y_axis( + Axis::new() + .type_(AxisType::Value) + .name(axis_label) + .name_location(NameLocation::End) + .name_gap(15) + .position("left") + .axis_label(AxisLabel::new()) + .split_line(SplitLine::new().show(true)), + ); + self + } + + /// Configure dual Y axes for e.g. throughput in MB/s and msg/s. + pub fn with_dual_y_axis(mut self, y1_label: &str, y2_label: &str) -> Self { + // Configure left Y axis (MB/s) + self.inner = self.inner.y_axis( + Axis::new() + .type_(AxisType::Value) + .name(y1_label) + .name_location(NameLocation::End) + .name_gap(15) + .position("left") + .axis_label(AxisLabel::new()) + .split_line(SplitLine::new().show(true)), + ); + // Configure right Y axis (messages/s) + self.inner = self.inner.y_axis( + Axis::new() + .type_(AxisType::Value) + .name(y2_label) + .name_location(NameLocation::End) + .name_gap(15) + .position("right") + .axis_label(AxisLabel::new()) + .split_line(SplitLine::new().show(true)), + ); + self + } + + pub fn add_series(mut self, name: &str, data: Vec, symbol: Symbol, color: &str) -> Self { + let line = Line::new() + .name(name) + .data(data) + .symbol(symbol) + .symbol_size(8.0) + .line_style(LineStyle::new().width(3.0)) + .item_style(ItemStyle::new().color(color)); + + self.inner = self.inner.series(line); + self + } + + pub fn add_dual_series( + mut self, + name: &str, + data: Vec, + symbol: Symbol, + color: &str, + y_axis_index: usize, + ) -> Self { + let line = Line::new() + .name(name) + .data(data) + .symbol(symbol) + .symbol_size(8.0) + .line_style(LineStyle::new().width(3.0)) + .item_style(ItemStyle::new().color(color)) + .y_axis_index(y_axis_index as f64); + + self.inner = self.inner.series(line); + self + } + + /// Add a new line series to the chart. + /// + /// `name` is displayed in the legend. `points` is a list of `[x, y]` pairs. + /// Use `color` if you want a custom color (e.g. `#FF0000`), otherwise pass `None`. + /// `opacity` controls the line opacity (use e.g. 1.0 for solid line, 0.3 for translucent). + pub fn add_time_series( + mut self, + name: &str, + points: Vec>, + color: Option<&str>, + opacity: f64, + ) -> Self { + let mut line = Line::new() + .name(name) + .data(points) + .show_symbol(false) + .emphasis(Emphasis::new()) + .line_style(LineStyle::new().width(2).opacity(opacity)); + + if let Some(color) = color { + line = line.item_style(ItemStyle::new().color(color)); + } + + self.inner = self.inner.series(line); + self + } + + /// Add a new line series to the chart with specified Y axis. + /// y_axis_index: 0 for left axis, 1 for right axis + pub fn add_dual_time_line_series( + mut self, + name: &str, + points: Vec>, + color: Option<&str>, + opacity: f64, + y_axis_index: usize, + ) -> Self { + let mut line = Line::new() + .name(name) + .data(points) + .show_symbol(false) + .emphasis(Emphasis::new()) + .line_style(LineStyle::new().width(2).opacity(opacity)) + .y_axis_index(y_axis_index as f64); + + if let Some(color) = color { + line = line.item_style(ItemStyle::new().color(color)); + } + + self.inner = self.inner.series(line); + self + } +} diff --git a/bench/report/src/plotting/chart_kind.rs b/bench/report/src/plotting/chart_kind.rs new file mode 100644 index 000000000..966d2a591 --- /dev/null +++ b/bench/report/src/plotting/chart_kind.rs @@ -0,0 +1,10 @@ +use derive_more::derive::Display; + +#[derive(Debug, Display)] +pub enum ChartKind { + #[display("Throughput")] + Throughput, + #[display("Latency")] + Latency, + // Trend TODO +} diff --git a/bench/report/src/plotting/mod.rs b/bench/report/src/plotting/mod.rs new file mode 100644 index 000000000..be59f16eb --- /dev/null +++ b/bench/report/src/plotting/mod.rs @@ -0,0 +1,3 @@ +pub mod chart; +pub mod chart_kind; +pub mod text; diff --git a/bench/report/src/plotting/text/mod.rs b/bench/report/src/plotting/text/mod.rs new file mode 100644 index 000000000..e848b04f9 --- /dev/null +++ b/bench/report/src/plotting/text/mod.rs @@ -0,0 +1,2 @@ +pub mod subtext; +pub mod title; diff --git a/bench/report/src/plotting/text/subtext.rs b/bench/report/src/plotting/text/subtext.rs new file mode 100644 index 000000000..1ebc7a746 --- /dev/null +++ b/bench/report/src/plotting/text/subtext.rs @@ -0,0 +1,109 @@ +use crate::{ + benchmark_kind::BenchmarkKind, group_metrics::BenchmarkGroupMetrics, + group_metrics_kind::GroupMetricsKind, report::BenchmarkReport, +}; +use byte_unit::{Byte, UnitType}; + +impl BenchmarkReport { + pub fn subtext(&self) -> String { + let params = self.format_params(); + let mut stats = Vec::new(); + + // First add latency stats + let latency_stats = self.format_latency_stats(); + if !latency_stats.is_empty() { + stats.push(latency_stats); + } + + // Then add throughput stats + let throughput_stats = self.format_throughput_stats(); + if !throughput_stats.is_empty() { + stats.push(throughput_stats); + } + + // For SendAndPoll tests, add total throughput as last line + if self.params.benchmark_kind == BenchmarkKind::SendAndPoll { + if let Some(total) = self + .group_metrics + .iter() + .find(|s| s.summary.kind == GroupMetricsKind::ProducersAndConsumers) + { + stats.push(format!( + "Total System Throughput: {:.2} MB/s, {:.0} msg/s", + total.summary.total_throughput_megabytes_per_second, + total.summary.total_throughput_messages_per_second + )); + } + } + + format!("{}\n{}", params, stats.join("\n")) + } + + fn format_params(&self) -> String { + let total_bytes = Byte::from_u64( + self.individual_metrics + .iter() + .map(|s| s.summary.total_bytes) + .sum(), + ); + let user_bytes = Byte::from_u64( + self.individual_metrics + .iter() + .map(|s| s.summary.total_user_data_bytes) + .sum(), + ); + format!( + "{}, {} msg/batch, {} batches, {} bytes/msg, {:.2} user bytes, {:.2} in total", + self.params.format_actors_info(), + self.params.messages_per_batch, + self.params.message_batches, + self.params.message_size, + user_bytes.get_appropriate_unit(UnitType::Decimal), + total_bytes.get_appropriate_unit(UnitType::Decimal) + ) + } + + fn format_latency_stats(&self) -> String { + self.group_metrics + .iter() + .filter(|s| s.summary.kind != GroupMetricsKind::ProducersAndConsumers) // Skip total summary + .map(|summary| summary.format_latency()) + .collect::>() + .join("\n") + } + + fn format_throughput_stats(&self) -> String { + self.group_metrics + .iter() + .filter(|s| s.summary.kind != GroupMetricsKind::ProducersAndConsumers) // Skip total summary as it will be added separately + .map(|summary| summary.format_throughput_per_actor_kind()) + .collect::>() + .join("\n") + } +} + +impl BenchmarkGroupMetrics { + fn format_throughput_per_actor_kind(&self) -> String { + format!( + "{} Throughput • Total: {:.2} MB/s, {:.0} msg/s • Avg Per {}: {:.2} MB/s, {:.0} msg/s", + self.summary.kind, + self.summary.total_throughput_megabytes_per_second, + self.summary.total_throughput_messages_per_second, + self.summary.kind.actor(), + self.summary.average_throughput_megabytes_per_second, + self.summary.average_throughput_messages_per_second, + ) + } + + fn format_latency(&self) -> String { + format!( + "{} Latency • Avg: {:.2} ms • Med: {:.2} ms • P95: {:.2} ms • P99: {:.2} ms • P999: {:.2} ms", + self.summary.kind, + self.summary.average_latency_ms, + self.summary.average_median_latency_ms, + self.summary.average_p95_latency_ms, + self.summary.average_p99_latency_ms, + self.summary.average_p999_latency_ms + ) + } +} diff --git a/bench/report/src/plotting/text/title.rs b/bench/report/src/plotting/text/title.rs new file mode 100644 index 000000000..4023ba509 --- /dev/null +++ b/bench/report/src/plotting/text/title.rs @@ -0,0 +1,21 @@ +use crate::{ + benchmark_kind::BenchmarkKind, plotting::chart_kind::ChartKind, report::BenchmarkReport, +}; + +/// Returns a title for a benchmark report +impl BenchmarkReport { + pub fn title(&self, kind: ChartKind) -> String { + let kind_str = match self.params.benchmark_kind { + BenchmarkKind::Send => "Send", + BenchmarkKind::Poll => "Poll", + BenchmarkKind::SendAndPoll => "Send and Poll", + BenchmarkKind::ConsumerGroupPoll => "Consumer Group Poll", + }; + + if let Some(remark) = &self.params.remark { + format!("{} - {} Benchmark ({})", kind, kind_str, remark) + } else { + format!("{} - {} Benchmark", kind, kind_str) + } + } +} diff --git a/bench/report/src/prints.rs b/bench/report/src/prints.rs new file mode 100644 index 000000000..2f47556d5 --- /dev/null +++ b/bench/report/src/prints.rs @@ -0,0 +1,75 @@ +use colored::{Color, ColoredString, Colorize}; +use tracing::info; + +use crate::{ + group_metrics::BenchmarkGroupMetrics, group_metrics_kind::GroupMetricsKind, + report::BenchmarkReport, +}; + +impl BenchmarkReport { + pub fn print_summary(&self) { + let kind = self.params.benchmark_kind; + let total_messages = self.params.messages_per_batch * self.params.message_batches; + let total_size_bytes = total_messages * self.params.message_size; + let streams = self.params.streams; + let messages_per_batch = self.params.messages_per_batch; + let message_batches = self.params.message_batches; + let message_size = self.params.message_size; + let producers = self.params.producers; + let consumers = self.params.consumers; + println!(); + let params_print = format!("Benchmark: {}, total messages: {}, total size: {} bytes, {} streams, {} messages per batch, {} batches, {} bytes per message, {} producers, {} consumers\n", + kind, + total_messages, + total_size_bytes, + streams, + messages_per_batch, + message_batches, + message_size, + producers, + consumers, + ).blue(); + + info!("{}", params_print); + + self.group_metrics + .iter() + .for_each(|s| info!("{}\n", s.formatted_string())); + } +} + +impl BenchmarkGroupMetrics { + pub fn formatted_string(&self) -> ColoredString { + let (prefix, color) = match self.summary.kind { + GroupMetricsKind::Producers => ("Producers Results", Color::Green), + GroupMetricsKind::Consumers => ("Consumers Results", Color::Green), + GroupMetricsKind::ProducersAndConsumers => ("Aggregate Results", Color::Red), + }; + + let actor = self.summary.kind.actor(); + + let total_mb = format!("{:.2}", self.summary.total_throughput_megabytes_per_second); + let total_msg = format!("{:.0}", self.summary.total_throughput_messages_per_second); + let avg_mb = format!( + "{:.2}", + self.summary.average_throughput_megabytes_per_second + ); + + let p50 = format!("{:.2}", self.summary.average_p50_latency_ms); + let p90 = format!("{:.2}", self.summary.average_p90_latency_ms); + let p95 = format!("{:.2}", self.summary.average_p95_latency_ms); + let p99 = format!("{:.2}", self.summary.average_p99_latency_ms); + let p999 = format!("{:.2}", self.summary.average_p999_latency_ms); + let avg = format!("{:.2}", self.summary.average_latency_ms); + let median = format!("{:.2}", self.summary.average_median_latency_ms); + + format!( + "{}: Total throughput: {} MB/s, {} messages/s, average throughput per {}: {} MB/s, \ + p50 latency: {} ms, p90 latency: {} ms, p95 latency: {} ms, \ + p99 latency: {} ms, p999 latency: {} ms, average latency: {} ms, \ + median latency: {} ms", + prefix, total_mb, total_msg, actor, avg_mb, p50, p90, p95, p99, p999, avg, median, + ) + .color(color) + } +} diff --git a/bench/report/src/types/actor_kind.rs b/bench/report/src/types/actor_kind.rs new file mode 100644 index 000000000..b7d80d5f7 --- /dev/null +++ b/bench/report/src/types/actor_kind.rs @@ -0,0 +1,21 @@ +use derive_more::derive::Display; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, Display)] +pub enum ActorKind { + #[display("Producer")] + #[serde(rename = "producer")] + Producer, + #[display("Consumer")] + #[serde(rename = "consumer")] + Consumer, +} + +impl ActorKind { + pub fn plural(&self) -> &str { + match self { + ActorKind::Producer => "Producers", + ActorKind::Consumer => "Consumers", + } + } +} diff --git a/bench/report/src/types/benchmark_kind.rs b/bench/report/src/types/benchmark_kind.rs new file mode 100644 index 000000000..9a95722da --- /dev/null +++ b/bench/report/src/types/benchmark_kind.rs @@ -0,0 +1,32 @@ +use derive_more::Display; +use serde::{Deserialize, Serialize}; + +#[derive( + Debug, + Clone, + Copy, + PartialEq, + Eq, + Hash, + Display, + Serialize, + Deserialize, + Default, + PartialOrd, + Ord, +)] +pub enum BenchmarkKind { + #[default] + #[display("Send")] + #[serde(rename = "send")] + Send, + #[display("Poll")] + #[serde(rename = "poll")] + Poll, + #[display("Send And Poll")] + #[serde(rename = "send_and_poll")] + SendAndPoll, + #[display("Consumer Group Poll")] + #[serde(rename = "consumer_group_poll")] + ConsumerGroupPoll, +} diff --git a/bench/report/src/types/group_metrics.rs b/bench/report/src/types/group_metrics.rs new file mode 100644 index 000000000..22090059f --- /dev/null +++ b/bench/report/src/types/group_metrics.rs @@ -0,0 +1,11 @@ +use super::{group_metrics_summary::BenchmarkGroupMetricsSummary, time_series::TimeSeries}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Clone, PartialEq, Deserialize)] +pub struct BenchmarkGroupMetrics { + pub summary: BenchmarkGroupMetricsSummary, + pub throughput_mb_ts: TimeSeries, + pub throughput_msg_ts: TimeSeries, + // There is no sense in aggregating latencies + // pub latency_ts: TimeSeries, +} diff --git a/bench/report/src/types/group_metrics_kind.rs b/bench/report/src/types/group_metrics_kind.rs new file mode 100644 index 000000000..ca88530d7 --- /dev/null +++ b/bench/report/src/types/group_metrics_kind.rs @@ -0,0 +1,26 @@ +use derive_more::derive::Display; +use serde::{Deserialize, Serialize}; + +/// The kind of group metrics to be displayed +#[derive(Debug, Copy, Clone, Eq, Hash, PartialEq, Serialize, Deserialize, Display)] +pub enum GroupMetricsKind { + #[display("Producers")] + #[serde(rename = "producers")] + Producers, + #[display("Consumers")] + #[serde(rename = "consumers")] + Consumers, + #[display("Producers and Consumers")] + #[serde(rename = "producers_and_consumers")] + ProducersAndConsumers, +} + +impl GroupMetricsKind { + pub fn actor(&self) -> &str { + match self { + GroupMetricsKind::Producers => "Producer", + GroupMetricsKind::Consumers => "Consumer", + GroupMetricsKind::ProducersAndConsumers => "Actor", + } + } +} diff --git a/bench/report/src/types/group_metrics_summary.rs b/bench/report/src/types/group_metrics_summary.rs new file mode 100644 index 000000000..882ae4a1d --- /dev/null +++ b/bench/report/src/types/group_metrics_summary.rs @@ -0,0 +1,30 @@ +use super::group_metrics_kind::GroupMetricsKind; +use crate::utils::round_float; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Clone, PartialEq, Deserialize)] +pub struct BenchmarkGroupMetricsSummary { + pub kind: GroupMetricsKind, + #[serde(serialize_with = "round_float")] + pub total_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] + pub total_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] + pub average_p50_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p90_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p95_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p99_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_p999_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub average_median_latency_ms: f64, +} diff --git a/bench/report/src/types/hardware.rs b/bench/report/src/types/hardware.rs new file mode 100644 index 000000000..b7a47b9f6 --- /dev/null +++ b/bench/report/src/types/hardware.rs @@ -0,0 +1,35 @@ +use serde::{Deserialize, Serialize}; +use sysinfo::System; + +#[derive(Debug, Serialize, Deserialize, Clone, derive_new::new, PartialEq, Default)] +pub struct BenchmarkHardware { + pub identifier: Option, + pub cpu_name: String, + pub cpu_cores: usize, + pub total_memory_mb: u64, + pub os_name: String, + pub os_version: String, +} + +impl BenchmarkHardware { + pub fn get_system_info_with_identifier(identifier: Option) -> Self { + let mut sys = System::new(); + sys.refresh_all(); + + let cpu = sys + .cpus() + .first() + .map(|cpu| (cpu.brand().to_string(), cpu.frequency())) + .unwrap_or_else(|| (String::from("unknown"), 0)); + + Self { + identifier, + cpu_name: cpu.0, + cpu_cores: sys.cpus().len(), + total_memory_mb: sys.total_memory() / 1024 / 1024, + os_name: sysinfo::System::name().unwrap_or_else(|| String::from("unknown")), + os_version: sysinfo::System::kernel_version() + .unwrap_or_else(|| String::from("unknown")), + } + } +} diff --git a/bench/report/src/types/individual_metrics.rs b/bench/report/src/types/individual_metrics.rs new file mode 100644 index 000000000..f0580e889 --- /dev/null +++ b/bench/report/src/types/individual_metrics.rs @@ -0,0 +1,12 @@ +use super::{ + individual_metrics_summary::BenchmarkIndividualMetricsSummary, time_series::TimeSeries, +}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, PartialEq, Deserialize)] +pub struct BenchmarkIndividualMetrics { + pub summary: BenchmarkIndividualMetricsSummary, + pub throughput_mb_ts: TimeSeries, + pub throughput_msg_ts: TimeSeries, + pub latency_ts: TimeSeries, +} diff --git a/bench/report/src/types/individual_metrics_summary.rs b/bench/report/src/types/individual_metrics_summary.rs new file mode 100644 index 000000000..3cdfa6565 --- /dev/null +++ b/bench/report/src/types/individual_metrics_summary.rs @@ -0,0 +1,34 @@ +use super::actor_kind::ActorKind; +use crate::benchmark_kind::BenchmarkKind; +use crate::utils::round_float; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, PartialEq, Deserialize)] +pub struct BenchmarkIndividualMetricsSummary { + pub benchmark_kind: BenchmarkKind, + pub actor_kind: ActorKind, + pub actor_id: u32, + #[serde(serialize_with = "round_float")] + pub total_time_secs: f64, + pub total_user_data_bytes: u64, + pub total_bytes: u64, + pub total_messages: u64, + #[serde(serialize_with = "round_float")] + pub throughput_megabytes_per_second: f64, + #[serde(serialize_with = "round_float")] + pub throughput_messages_per_second: f64, + #[serde(serialize_with = "round_float")] + pub p50_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub p90_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub p95_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub p99_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub p999_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub avg_latency_ms: f64, + #[serde(serialize_with = "round_float")] + pub median_latency_ms: f64, +} diff --git a/bench/report/src/types/mod.rs b/bench/report/src/types/mod.rs new file mode 100644 index 000000000..35d2923de --- /dev/null +++ b/bench/report/src/types/mod.rs @@ -0,0 +1,12 @@ +pub mod actor_kind; +pub mod benchmark_kind; +pub mod group_metrics; +pub mod group_metrics_kind; +pub mod group_metrics_summary; +pub mod hardware; +pub mod individual_metrics; +pub mod individual_metrics_summary; +pub mod params; +pub mod report; +pub mod time_series; +pub mod transport; diff --git a/bench/report/src/types/params.rs b/bench/report/src/types/params.rs new file mode 100644 index 000000000..5b7bb9e0e --- /dev/null +++ b/bench/report/src/types/params.rs @@ -0,0 +1,36 @@ +use super::{benchmark_kind::BenchmarkKind, transport::BenchmarkTransport}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +pub struct BenchmarkParams { + pub benchmark_kind: BenchmarkKind, + pub transport: BenchmarkTransport, + pub server_address: String, + pub remark: Option, + pub extra_info: Option, + pub gitref: Option, + pub gitref_date: Option, + pub messages_per_batch: u32, + pub message_batches: u32, + pub message_size: u32, + pub producers: u32, + pub consumers: u32, + pub streams: u32, + pub partitions: u32, + pub number_of_consumer_groups: u32, + pub disable_parallel_consumers: bool, + pub disable_parallel_producers: bool, + pub pretty_name: String, + pub bench_command: String, + pub params_identifier: String, +} + +impl BenchmarkParams { + pub fn format_actors_info(&self) -> String { + match (self.producers, self.consumers) { + (0, c) => format!("{} consumers", c), + (p, 0) => format!("{} producers", p), + (p, c) => format!("{} producers/{} consumers", p, c), + } + } +} diff --git a/bench/report/src/types/report.rs b/bench/report/src/types/report.rs new file mode 100644 index 000000000..0d8343514 --- /dev/null +++ b/bench/report/src/types/report.rs @@ -0,0 +1,39 @@ +use crate::group_metrics::BenchmarkGroupMetrics; +use crate::individual_metrics::BenchmarkIndividualMetrics; +use crate::types::hardware::BenchmarkHardware; +use crate::types::params::BenchmarkParams; +use serde::{Deserialize, Serialize}; +use std::path::Path; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Default)] +pub struct BenchmarkReport { + /// Benchmark unique identifier + pub uuid: Uuid, + + /// Timestamp when the benchmark was finished + pub timestamp: String, + + /// Benchmark hardware + pub hardware: BenchmarkHardware, + + /// Benchmark parameters + pub params: BenchmarkParams, + + /// Benchmark metrics for all actors of same type + pub group_metrics: Vec, + + /// Benchmark summaries per actor (producer/consumer) + pub individual_metrics: Vec, +} + +impl BenchmarkReport { + pub fn dump_to_json(&self, output_dir: &str) { + // Create the output directory + std::fs::create_dir_all(output_dir).expect("Failed to create output directory"); + + let report_path = Path::new(output_dir).join("report.json"); + let report_json = serde_json::to_string(self).unwrap(); + std::fs::write(report_path, report_json).expect("Failed to write report to file"); + } +} diff --git a/bench/report/src/types/time_series.rs b/bench/report/src/types/time_series.rs new file mode 100644 index 000000000..abf64d728 --- /dev/null +++ b/bench/report/src/types/time_series.rs @@ -0,0 +1,40 @@ +use serde::{Deserialize, Serialize}; + +/// A point in time series data +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] +pub struct TimePoint { + pub time_s: f64, + pub value: f64, +} + +impl TimePoint { + pub fn new(time_s: f64, value: f64) -> Self { + Self { time_s, value } + } +} + +/// Time series data with associated metadata +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)] +pub struct TimeSeries { + pub points: Vec, + #[serde(skip)] + pub kind: TimeSeriesKind, +} + +/// Types of time series data we can calculate +#[derive(Debug, Clone, Copy, Default, PartialEq)] +pub enum TimeSeriesKind { + #[default] + ThroughputMB, + ThroughputMsg, + Latency, +} + +impl TimeSeries { + pub fn as_charming_points(&self) -> Vec> { + self.points + .iter() + .map(|p| vec![p.time_s, p.value]) + .collect() + } +} diff --git a/bench/report/src/types/transport.rs b/bench/report/src/types/transport.rs new file mode 100644 index 000000000..47225d3e0 --- /dev/null +++ b/bench/report/src/types/transport.rs @@ -0,0 +1,16 @@ +use derive_more::derive::Display; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Display, Default, Hash)] +pub enum BenchmarkTransport { + #[default] + #[display("TCP")] + #[serde(rename = "tcp")] + Tcp, + #[display("HTTP")] + #[serde(rename = "http")] + Http, + #[display("QUIC")] + #[serde(rename = "quic")] + Quic, +} diff --git a/bench/report/src/utils.rs b/bench/report/src/utils.rs new file mode 100644 index 000000000..0c124d041 --- /dev/null +++ b/bench/report/src/utils.rs @@ -0,0 +1,8 @@ +use serde::Serializer; + +pub(crate) fn round_float(value: &f64, serializer: S) -> Result +where + S: Serializer, +{ + serializer.serialize_f64((value * 1000.0).round() / 1000.0) +} diff --git a/bench/src/consumer.rs b/bench/src/actors/consumer.rs similarity index 79% rename from bench/src/consumer.rs rename to bench/src/actors/consumer.rs index d6cb86726..30b25551d 100644 --- a/bench/src/consumer.rs +++ b/bench/src/actors/consumer.rs @@ -1,7 +1,5 @@ -use crate::args::simple::BenchmarkKind; -use crate::benchmark_result::BenchmarkResult; -use crate::statistics::actor_statistics::BenchmarkActorStatistics; -use crate::statistics::record::BenchmarkRecord; +use crate::analytics::metrics::individual::from_records; +use crate::analytics::record::BenchmarkRecord; use iggy::client::{ConsumerGroupClient, MessageClient}; use iggy::clients::client::IggyClient; use iggy::consumer::Consumer as IggyConsumer; @@ -10,6 +8,9 @@ use iggy::messages::poll_messages::PollingStrategy; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::duration::IggyDuration; use iggy::utils::sizeable::Sizeable; +use iggy_benchmark_report::actor_kind::ActorKind; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; +use iggy_benchmark_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::{login_root, ClientFactory}; use std::sync::Arc; use std::time::Duration; @@ -24,7 +25,8 @@ pub struct Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, + sampling_time: IggyDuration, + moving_average_window: u32, } impl Consumer { @@ -37,7 +39,8 @@ impl Consumer { messages_per_batch: u32, message_batches: u32, warmup_time: IggyDuration, - output_directory: Option, + sampling_time: IggyDuration, + moving_average_window: u32, ) -> Self { Self { client_factory, @@ -47,11 +50,12 @@ impl Consumer { messages_per_batch, message_batches, warmup_time, - output_directory, + sampling_time, + moving_average_window, } } - pub async fn run(&self) -> Result { + pub async fn run(&self) -> Result { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let message_batches = self.message_batches as u64; @@ -212,56 +216,34 @@ impl Consumer { total_bytes += IggyByteSize::from(batch_size_total_bytes); current_iteration += 1; let message_batches = current_iteration; - records.push(BenchmarkRecord::new( - start_timestamp.elapsed().as_micros() as u64, - latency.as_micros() as u64, - received_messages, + records.push(BenchmarkRecord { + elapsed_time_us: start_timestamp.elapsed().as_micros() as u64, + latency_us: latency.as_micros() as u64, + messages: received_messages, message_batches, - total_user_data_bytes.as_bytes_u64(), - total_bytes.as_bytes_u64(), - )); + user_data_bytes: total_user_data_bytes.as_bytes_u64(), + total_bytes: total_bytes.as_bytes_u64(), + }); } - let statistics = BenchmarkActorStatistics::from_records(&records); - - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let output_file = format!( - "{}/raw_data/consumer_{}_data.csv", - output_directory, self.consumer_id - ); - info!( - "Consumer #{} → writing the results to {}...", - self.consumer_id, output_file - ); - let mut writer = csv::Writer::from_path(output_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/consumer_{}_summary.toml", - output_directory, self.consumer_id - ); - statistics.dump_to_toml(&summary_file); - } + let metrics = from_records( + records, + BenchmarkKind::Poll, + ActorKind::Consumer, + self.consumer_id, + self.sampling_time, + self.moving_average_window, + ); Self::log_consumer_statistics( self.consumer_id, total_messages, message_batches as u32, messages_per_batch, - &statistics, + &metrics, ); - Ok(BenchmarkResult { - kind: BenchmarkKind::Poll, - statistics, - }) + Ok(metrics) } pub fn log_consumer_statistics( @@ -269,7 +251,7 @@ impl Consumer { total_messages: u64, message_batches: u32, messages_per_batch: u32, - stats: &BenchmarkActorStatistics, + stats: &BenchmarkIndividualMetrics, ) { info!( "Consumer #{} → polled {} messages, {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ @@ -279,16 +261,16 @@ impl Consumer { total_messages, message_batches, messages_per_batch, - stats.total_time_secs, - IggyByteSize::from(stats.total_user_data_bytes), - stats.throughput_megabytes_per_second, - stats.p50_latency_ms, - stats.p90_latency_ms, - stats.p95_latency_ms, - stats.p99_latency_ms, - stats.p999_latency_ms, - stats.avg_latency_ms, - stats.median_latency_ms + stats.summary.total_time_secs, + IggyByteSize::from(stats.summary.total_user_data_bytes), + stats.summary.throughput_megabytes_per_second, + stats.summary.p50_latency_ms, + stats.summary.p90_latency_ms, + stats.summary.p95_latency_ms, + stats.summary.p99_latency_ms, + stats.summary.p999_latency_ms, + stats.summary.avg_latency_ms, + stats.summary.median_latency_ms ); } } diff --git a/bench/src/actors/mod.rs b/bench/src/actors/mod.rs new file mode 100644 index 000000000..934046ab4 --- /dev/null +++ b/bench/src/actors/mod.rs @@ -0,0 +1,2 @@ +pub mod consumer; +pub mod producer; diff --git a/bench/src/producer.rs b/bench/src/actors/producer.rs similarity index 68% rename from bench/src/producer.rs rename to bench/src/actors/producer.rs index 7cc699d96..e4cc214d5 100644 --- a/bench/src/producer.rs +++ b/bench/src/actors/producer.rs @@ -1,7 +1,5 @@ -use crate::args::simple::BenchmarkKind; -use crate::benchmark_result::BenchmarkResult; -use crate::statistics::actor_statistics::BenchmarkActorStatistics; -use crate::statistics::record::BenchmarkRecord; +use crate::analytics::metrics::individual::from_records; +use crate::analytics::record::BenchmarkRecord; use iggy::client::MessageClient; use iggy::clients::client::IggyClient; use iggy::error::IggyError; @@ -9,6 +7,9 @@ use iggy::messages::send_messages::{Message, Partitioning}; use iggy::utils::byte_size::IggyByteSize; use iggy::utils::duration::IggyDuration; use iggy::utils::sizeable::Sizeable; +use iggy_benchmark_report::actor_kind::ActorKind; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; +use iggy_benchmark_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::{login_root, ClientFactory}; use std::str::FromStr; use std::sync::Arc; @@ -25,7 +26,8 @@ pub struct Producer { messages_per_batch: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, + sampling_time: IggyDuration, + moving_average_window: u32, } impl Producer { @@ -39,7 +41,8 @@ impl Producer { message_batches: u32, message_size: u32, warmup_time: IggyDuration, - output_directory: Option, + sampling_time: IggyDuration, + moving_average_window: u32, ) -> Self { Producer { client_factory, @@ -50,11 +53,12 @@ impl Producer { message_batches, message_size, warmup_time, - output_directory, + sampling_time, + moving_average_window, } } - pub async fn run(&self) -> Result { + pub async fn run(&self) -> Result { let topic_id: u32 = 1; let default_partition_id: u32 = 1; let partitions_count = self.partitions_count; @@ -111,7 +115,7 @@ impl Producer { let start_timestamp = Instant::now(); let mut latencies: Vec = Vec::with_capacity(message_batches as usize); - let mut records: Vec = Vec::with_capacity(message_batches as usize); + let mut records = Vec::with_capacity(message_batches as usize); for i in 1..=message_batches { let before_send = Instant::now(); client @@ -121,60 +125,37 @@ impl Producer { let messages_processed = (i * messages_per_batch) as u64; let batches_processed = i as u64; - let total_user_data_bytes = batches_processed * batch_user_data_bytes; + let user_data_bytes = batches_processed * batch_user_data_bytes; let total_bytes = batches_processed * batch_total_bytes; latencies.push(latency); - records.push(BenchmarkRecord::new( - start_timestamp.elapsed().as_micros() as u64, - latency.as_micros() as u64, - messages_processed, - batches_processed, - total_user_data_bytes, + records.push(BenchmarkRecord { + elapsed_time_us: start_timestamp.elapsed().as_micros() as u64, + latency_us: latency.as_micros() as u64, + messages: messages_processed, + message_batches: batches_processed, + user_data_bytes, total_bytes, - )); - } - let statistics = BenchmarkActorStatistics::from_records(&records); - - if let Some(output_directory) = &self.output_directory { - std::fs::create_dir_all(format!("{}/raw_data", output_directory)).unwrap(); - - // Dump raw data to file - let results_file = format!( - "{}/raw_data/producer_{}_data.csv", - output_directory, self.producer_id - ); - info!( - "Producer #{} → writing the results to {}...", - self.producer_id, results_file - ); - - let mut writer = csv::Writer::from_path(results_file).unwrap(); - for sample in records { - writer.serialize(sample).unwrap(); - } - writer.flush().unwrap(); - - // Dump summary to file - let summary_file = format!( - "{}/raw_data/producer_{}_summary.toml", - output_directory, self.producer_id - ); - statistics.dump_to_toml(&summary_file); + }); } + let metrics = from_records( + records, + BenchmarkKind::Send, + ActorKind::Producer, + self.producer_id, + self.sampling_time, + self.moving_average_window, + ); Self::log_producer_statistics( self.producer_id, total_messages, message_batches, messages_per_batch, - &statistics, + &metrics, ); - Ok(BenchmarkResult { - kind: BenchmarkKind::Send, - statistics, - }) + Ok(metrics) } fn create_payload(size: u32) -> String { @@ -192,7 +173,7 @@ impl Producer { total_messages: u64, message_batches: u32, messages_per_batch: u32, - stats: &BenchmarkActorStatistics, + stats: &BenchmarkIndividualMetrics, ) { info!( "Producer #{} → sent {} messages in {} batches of {} messages in {:.2} s, total size: {}, average throughput: {:.2} MB/s, \ @@ -202,16 +183,16 @@ impl Producer { total_messages, message_batches, messages_per_batch, - stats.total_time_secs, - IggyByteSize::from(stats.total_bytes), - stats.throughput_megabytes_per_second, - stats.p50_latency_ms, - stats.p90_latency_ms, - stats.p95_latency_ms, - stats.p99_latency_ms, - stats.p999_latency_ms, - stats.avg_latency_ms, - stats.median_latency_ms + stats.summary.total_time_secs, + IggyByteSize::from(stats.summary.total_user_data_bytes), + stats.summary.throughput_megabytes_per_second, + stats.summary.p50_latency_ms, + stats.summary.p90_latency_ms, + stats.summary.p95_latency_ms, + stats.summary.p99_latency_ms, + stats.summary.p999_latency_ms, + stats.summary.avg_latency_ms, + stats.summary.median_latency_ms ); } } diff --git a/bench/src/analytics/metrics/group.rs b/bench/src/analytics/metrics/group.rs new file mode 100644 index 000000000..375f2feb0 --- /dev/null +++ b/bench/src/analytics/metrics/group.rs @@ -0,0 +1,116 @@ +use crate::analytics::time_series::{ + calculator::TimeSeriesCalculator, + processors::{moving_average::MovingAverageProcessor, TimeSeriesProcessor}, +}; +use iggy_benchmark_report::{ + actor_kind::ActorKind, group_metrics::BenchmarkGroupMetrics, + group_metrics_kind::GroupMetricsKind, group_metrics_summary::BenchmarkGroupMetricsSummary, + individual_metrics::BenchmarkIndividualMetrics, +}; + +pub fn from_producers_and_consumers_statistics( + producers_stats: &[BenchmarkIndividualMetrics], + consumers_stats: &[BenchmarkIndividualMetrics], + moving_average_window: u32, +) -> Option { + let mut summary = from_individual_metrics( + &[producers_stats, consumers_stats].concat(), + moving_average_window, + )?; + summary.summary.kind = GroupMetricsKind::ProducersAndConsumers; + Some(summary) +} + +pub fn from_individual_metrics( + stats: &[BenchmarkIndividualMetrics], + moving_average_window: u32, +) -> Option { + if stats.is_empty() { + return None; + } + let count = stats.len() as f64; + + // Compute aggregate throughput MB/s + let total_throughput_megabytes_per_second: f64 = stats + .iter() + .map(|r| r.summary.throughput_megabytes_per_second) + .sum(); + + // Compute aggregate throughput messages/s + let total_throughput_messages_per_second: f64 = stats + .iter() + .map(|r| r.summary.throughput_messages_per_second) + .sum(); + + // Compute average throughput MB/s + let average_throughput_megabytes_per_second = total_throughput_megabytes_per_second / count; + + // Compute average throughput messages/s + let average_throughput_messages_per_second = total_throughput_messages_per_second / count; + + // Compute average latencies + let average_p50_latency_ms = + stats.iter().map(|r| r.summary.p50_latency_ms).sum::() / count; + let average_p90_latency_ms = + stats.iter().map(|r| r.summary.p90_latency_ms).sum::() / count; + let average_p95_latency_ms = + stats.iter().map(|r| r.summary.p95_latency_ms).sum::() / count; + let average_p99_latency_ms = + stats.iter().map(|r| r.summary.p99_latency_ms).sum::() / count; + let average_p999_latency_ms: f64 = + stats.iter().map(|r| r.summary.p999_latency_ms).sum::() / count; + let average_avg_latency_ms = + stats.iter().map(|r| r.summary.avg_latency_ms).sum::() / count; + let average_median_latency_ms = stats + .iter() + .map(|r| r.summary.median_latency_ms) + .sum::() + / count; + + let kind = match stats.iter().next().unwrap().summary.actor_kind { + ActorKind::Producer => GroupMetricsKind::Producers, + ActorKind::Consumer => GroupMetricsKind::Consumers, + }; + + let calculator = TimeSeriesCalculator::new(); + + let mut throughput_mb_ts = calculator.aggregate( + stats + .iter() + .map(|r| r.throughput_mb_ts.clone()) + .collect::>() + .as_slice(), + ); + let mut throughput_msg_ts = calculator.aggregate( + stats + .iter() + .map(|r| r.throughput_msg_ts.clone()) + .collect::>() + .as_slice(), + ); + + let sma = MovingAverageProcessor::new(moving_average_window as usize); + throughput_mb_ts = sma.process(&throughput_mb_ts); + throughput_msg_ts = sma.process(&throughput_msg_ts); + + let summary = BenchmarkGroupMetricsSummary { + kind, + total_throughput_megabytes_per_second, + total_throughput_messages_per_second, + average_throughput_megabytes_per_second, + average_throughput_messages_per_second, + average_p50_latency_ms, + average_p90_latency_ms, + average_p95_latency_ms, + average_p99_latency_ms, + average_p999_latency_ms, + average_latency_ms: average_avg_latency_ms, + average_median_latency_ms, + }; + + Some(BenchmarkGroupMetrics { + summary, + throughput_mb_ts, + throughput_msg_ts, + }) +} diff --git a/bench/src/analytics/metrics/individual.rs b/bench/src/analytics/metrics/individual.rs new file mode 100644 index 000000000..0b4305cc6 --- /dev/null +++ b/bench/src/analytics/metrics/individual.rs @@ -0,0 +1,136 @@ +use crate::analytics::record::BenchmarkRecord; +use crate::analytics::time_series::calculator::TimeSeriesCalculator; +use crate::analytics::time_series::processors::moving_average::MovingAverageProcessor; +use crate::analytics::time_series::processors::TimeSeriesProcessor; +use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::actor_kind::ActorKind; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; +use iggy_benchmark_report::individual_metrics::BenchmarkIndividualMetrics; +use iggy_benchmark_report::individual_metrics_summary::BenchmarkIndividualMetricsSummary; +use iggy_benchmark_report::time_series::TimeSeries; + +pub fn from_records( + records: Vec, + benchmark_kind: BenchmarkKind, + actor_kind: ActorKind, + actor_id: u32, + sampling_time: IggyDuration, + moving_average_window: u32, +) -> BenchmarkIndividualMetrics { + if records.is_empty() { + return BenchmarkIndividualMetrics { + summary: BenchmarkIndividualMetricsSummary { + benchmark_kind, + actor_kind, + actor_id, + total_time_secs: 0.0, + total_user_data_bytes: 0, + total_bytes: 0, + total_messages: 0, + throughput_megabytes_per_second: 0.0, + throughput_messages_per_second: 0.0, + p50_latency_ms: 0.0, + p90_latency_ms: 0.0, + p95_latency_ms: 0.0, + p99_latency_ms: 0.0, + p999_latency_ms: 0.0, + avg_latency_ms: 0.0, + median_latency_ms: 0.0, + }, + throughput_mb_ts: TimeSeries::default(), + throughput_msg_ts: TimeSeries::default(), + latency_ts: TimeSeries::default(), + }; + } + + let total_time_secs = records.last().unwrap().elapsed_time_us as f64 / 1_000_000.0; + + let total_user_data_bytes: u64 = records.iter().last().unwrap().user_data_bytes; + let total_bytes: u64 = records.iter().last().unwrap().total_bytes; + let total_messages: u64 = records.iter().last().unwrap().messages; + + let throughput_megabytes_per_second = if total_time_secs > 0.0 { + (total_bytes as f64) / 1_000_000.0 / total_time_secs + } else { + 0.0 + }; + + let throughput_messages_per_second = if total_time_secs > 0.0 { + (total_messages as f64) / total_time_secs + } else { + 0.0 + }; + + let mut latencies_ms: Vec = records + .iter() + .map(|r| r.latency_us as f64 / 1_000.0) + .collect(); + latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap()); + + let p50_latency_ms = calculate_percentile(&latencies_ms, 50.0); + let p90_latency_ms = calculate_percentile(&latencies_ms, 90.0); + let p95_latency_ms = calculate_percentile(&latencies_ms, 95.0); + let p99_latency_ms = calculate_percentile(&latencies_ms, 99.0); + let p999_latency_ms = calculate_percentile(&latencies_ms, 99.9); + + let avg_latency_ms = latencies_ms.iter().sum::() / latencies_ms.len() as f64; + let len = latencies_ms.len() / 2; + let median_latency_ms = if latencies_ms.len() % 2 == 0 { + (latencies_ms[len - 1] + latencies_ms[len]) / 2.0 + } else { + latencies_ms[len] + }; + + let calculator = TimeSeriesCalculator::new(); + + // Calculate throughput time series + let throughput_mb_ts = calculator.throughput_mb(&records, sampling_time); + let throughput_msg_ts = calculator.throughput_msg(&records, sampling_time); + + let sma = MovingAverageProcessor::new(moving_average_window as usize); + let throughput_mb_ts = sma.process(&throughput_mb_ts); + let throughput_msg_ts = sma.process(&throughput_msg_ts); + + let latency_ts = calculator.latency(&records, sampling_time); + + BenchmarkIndividualMetrics { + summary: BenchmarkIndividualMetricsSummary { + benchmark_kind, + actor_kind, + actor_id, + total_time_secs, + total_user_data_bytes, + total_bytes, + total_messages, + throughput_megabytes_per_second, + throughput_messages_per_second, + p50_latency_ms, + p90_latency_ms, + p95_latency_ms, + p99_latency_ms, + p999_latency_ms, + avg_latency_ms, + median_latency_ms, + }, + throughput_mb_ts, + throughput_msg_ts, + latency_ts, + } +} + +fn calculate_percentile(sorted_data: &[f64], percentile: f64) -> f64 { + if sorted_data.is_empty() { + return 0.0; + } + + let rank = percentile / 100.0 * (sorted_data.len() - 1) as f64; + let lower = rank.floor() as usize; + let upper = rank.ceil() as usize; + + if upper >= sorted_data.len() { + return sorted_data[sorted_data.len() - 1]; + } + + let weight = rank - lower as f64; + sorted_data[lower] * (1.0 - weight) + sorted_data[upper] * weight +} diff --git a/bench/src/analytics/metrics/mod.rs b/bench/src/analytics/metrics/mod.rs new file mode 100644 index 000000000..3007c2009 --- /dev/null +++ b/bench/src/analytics/metrics/mod.rs @@ -0,0 +1,2 @@ +pub mod group; +pub mod individual; diff --git a/bench/src/analytics/mod.rs b/bench/src/analytics/mod.rs new file mode 100644 index 000000000..efd05bf08 --- /dev/null +++ b/bench/src/analytics/mod.rs @@ -0,0 +1,4 @@ +pub mod metrics; +pub mod record; +pub mod report_builder; +pub mod time_series; diff --git a/bench/src/statistics/record.rs b/bench/src/analytics/record.rs similarity index 67% rename from bench/src/statistics/record.rs rename to bench/src/analytics/record.rs index f143dbc26..7ca5ff6cc 100644 --- a/bench/src/statistics/record.rs +++ b/bench/src/analytics/record.rs @@ -1,7 +1,4 @@ -use derive_new::new; -use serde::Serialize; - -#[derive(Debug, Clone, PartialEq, Serialize, new)] +#[derive(Debug, Clone, PartialEq)] pub struct BenchmarkRecord { pub elapsed_time_us: u64, pub latency_us: u64, diff --git a/bench/src/analytics/report_builder.rs b/bench/src/analytics/report_builder.rs new file mode 100644 index 000000000..eba10f8a2 --- /dev/null +++ b/bench/src/analytics/report_builder.rs @@ -0,0 +1,80 @@ +use super::metrics::group::{from_individual_metrics, from_producers_and_consumers_statistics}; +use chrono::{DateTime, Utc}; +use iggy::utils::timestamp::IggyTimestamp; +use iggy_benchmark_report::{ + actor_kind::ActorKind, benchmark_kind::BenchmarkKind, hardware::BenchmarkHardware, + individual_metrics::BenchmarkIndividualMetrics, params::BenchmarkParams, + report::BenchmarkReport, +}; + +pub struct BenchmarkReportBuilder; + +impl BenchmarkReportBuilder { + pub fn build( + hardware: BenchmarkHardware, + params: BenchmarkParams, + mut individual_metrics: Vec, + moving_average_window: u32, + ) -> BenchmarkReport { + let uuid = uuid::Uuid::new_v4(); + + let timestamp = + DateTime::::from_timestamp_micros(IggyTimestamp::now().as_micros() as i64) + .map(|dt| dt.to_rfc3339()) + .unwrap_or_else(|| String::from("unknown")); + + let mut group_metrics = Vec::new(); + + // Sort metrics by actor type and ID + individual_metrics.sort_by_key(|m| (m.summary.actor_kind, m.summary.actor_id)); + + // Split metrics by actor type + let producer_metrics: Vec = individual_metrics + .iter() + .filter(|m| m.summary.actor_kind == ActorKind::Producer) + .cloned() + .collect(); + let consumer_metrics: Vec = individual_metrics + .iter() + .filter(|m| m.summary.actor_kind == ActorKind::Consumer) + .cloned() + .collect(); + + if !producer_metrics.is_empty() { + if let Some(metrics) = from_individual_metrics(&producer_metrics, moving_average_window) + { + group_metrics.push(metrics); + } + } + + if !consumer_metrics.is_empty() { + if let Some(metrics) = from_individual_metrics(&consumer_metrics, moving_average_window) + { + group_metrics.push(metrics); + } + } + + // Only add aggregate metrics for send and poll benchmark + if params.benchmark_kind == BenchmarkKind::SendAndPoll + && !producer_metrics.is_empty() + && !consumer_metrics.is_empty() + { + if let Some(metrics) = from_producers_and_consumers_statistics( + &producer_metrics, + &consumer_metrics, + moving_average_window, + ) { + group_metrics.push(metrics); + } + } + + BenchmarkReport { + uuid, + timestamp, + hardware, + params, + group_metrics, + individual_metrics, + } + } +} diff --git a/bench/src/analytics/time_series/calculator.rs b/bench/src/analytics/time_series/calculator.rs new file mode 100644 index 000000000..fede66def --- /dev/null +++ b/bench/src/analytics/time_series/calculator.rs @@ -0,0 +1,76 @@ +use super::calculators::{ + LatencyTimeSeriesCalculator, MBThroughputCalculator, MessageThroughputCalculator, + ThroughputTimeSeriesCalculator, TimeSeriesCalculation, +}; +use crate::analytics::record::BenchmarkRecord; +use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind}; +use tracing::warn; + +/// Calculate time series data from benchmark records +pub struct TimeSeriesCalculator; + +impl TimeSeriesCalculator { + pub fn new() -> Self { + Self + } + + pub fn throughput_mb( + &self, + records: &[BenchmarkRecord], + bucket_size: IggyDuration, + ) -> TimeSeries { + let calculator = ThroughputTimeSeriesCalculator::new(MBThroughputCalculator); + calculator.calculate(records, bucket_size) + } + + pub fn throughput_msg( + &self, + records: &[BenchmarkRecord], + bucket_size: IggyDuration, + ) -> TimeSeries { + let calculator = ThroughputTimeSeriesCalculator::new(MessageThroughputCalculator); + calculator.calculate(records, bucket_size) + } + + pub fn latency(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries { + let calculator = LatencyTimeSeriesCalculator; + calculator.calculate(records, bucket_size) + } + + pub fn aggregate(&self, series: &[TimeSeries]) -> TimeSeries { + if series.is_empty() { + warn!("Attempting to aggregate empty series"); + return TimeSeries { + points: Vec::new(), + kind: TimeSeriesKind::default(), + }; + } + + let kind = series[0].kind; + let mut all_times = series + .iter() + .flat_map(|s| s.points.iter().map(|p: &TimePoint| p.time_s)) + .collect::>(); + all_times.sort_by(|a, b| a.partial_cmp(b).unwrap()); + all_times.dedup(); + + let points = all_times + .into_iter() + .map(|time| { + let sum: f64 = series + .iter() + .filter_map(|s| { + s.points + .iter() + .find(|p| (p.time_s - time).abs() < f64::EPSILON) + .map(|p| p.value) + }) + .sum(); + TimePoint::new(time, sum) + }) + .collect(); + + TimeSeries { points, kind } + } +} diff --git a/bench/src/analytics/time_series/calculators/latency.rs b/bench/src/analytics/time_series/calculators/latency.rs new file mode 100644 index 000000000..91c922093 --- /dev/null +++ b/bench/src/analytics/time_series/calculators/latency.rs @@ -0,0 +1,60 @@ +use super::TimeSeriesCalculation; +use crate::analytics::record::BenchmarkRecord; +use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind}; +use tracing::warn; + +/// Calculator for latency time series +pub struct LatencyTimeSeriesCalculator; + +impl TimeSeriesCalculation for LatencyTimeSeriesCalculator { + fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries { + if records.len() < 2 { + warn!("Not enough records to calculate latency"); + return TimeSeries { + points: Vec::new(), + kind: TimeSeriesKind::Latency, + }; + } + + let bucket_size_us = bucket_size.as_micros(); + + let max_time_us = records.iter().map(|r| r.elapsed_time_us).max().unwrap(); + let num_buckets = max_time_us.div_ceil(bucket_size_us); + let mut total_latency_per_bucket = vec![0u64; num_buckets as usize]; + let mut message_count_per_bucket = vec![0u64; num_buckets as usize]; + + for window in records.windows(2) { + let (prev, current) = (&window[0], &window[1]); + let bucket_index = current.elapsed_time_us / bucket_size_us; + if bucket_index >= num_buckets { + continue; + } + + let delta_messages = current.messages.saturating_sub(prev.messages); + if delta_messages == 0 { + continue; + } + + let delta_latency = current.latency_us.saturating_sub(prev.latency_us); + total_latency_per_bucket[bucket_index as usize] += delta_latency; + message_count_per_bucket[bucket_index as usize] += delta_messages; + } + + let points = (0..num_buckets) + .filter(|&i| message_count_per_bucket[i as usize] > 0) + .map(|i| { + let time_s = (i * bucket_size_us) as f64 / 1_000_000.0; + let avg_latency_us = total_latency_per_bucket[i as usize] as f64 + / message_count_per_bucket[i as usize] as f64; + let rounded_avg_latency_us = (avg_latency_us * 1000.0).round() / 1000.0; + TimePoint::new(time_s, rounded_avg_latency_us) + }) + .collect(); + + TimeSeries { + points, + kind: TimeSeriesKind::Latency, + } + } +} diff --git a/bench/src/analytics/time_series/calculators/mod.rs b/bench/src/analytics/time_series/calculators/mod.rs new file mode 100644 index 000000000..992ae1438 --- /dev/null +++ b/bench/src/analytics/time_series/calculators/mod.rs @@ -0,0 +1,16 @@ +mod latency; +mod throughput; + +use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::time_series::TimeSeries; +pub use latency::LatencyTimeSeriesCalculator; +pub use throughput::{ + MBThroughputCalculator, MessageThroughputCalculator, ThroughputTimeSeriesCalculator, +}; + +use crate::analytics::record::BenchmarkRecord; + +/// Common functionality for time series calculations +pub trait TimeSeriesCalculation { + fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries; +} diff --git a/bench/src/analytics/time_series/calculators/throughput.rs b/bench/src/analytics/time_series/calculators/throughput.rs new file mode 100644 index 000000000..4c73a6a52 --- /dev/null +++ b/bench/src/analytics/time_series/calculators/throughput.rs @@ -0,0 +1,125 @@ +use super::TimeSeriesCalculation; +use crate::analytics::record::BenchmarkRecord; +use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::time_series::{TimePoint, TimeSeries, TimeSeriesKind}; + +/// Common functionality for throughput calculations +pub trait ThroughputCalculation { + fn get_delta_value(&self, current: &BenchmarkRecord, prev: &BenchmarkRecord) -> u64; + fn calculate_throughput(&self, value: u64, bucket_size_us: u64) -> f64; + fn kind(&self) -> TimeSeriesKind; +} + +/// Calculator for MB/s throughput +pub struct MBThroughputCalculator; + +impl ThroughputCalculation for MBThroughputCalculator { + fn get_delta_value(&self, current: &BenchmarkRecord, prev: &BenchmarkRecord) -> u64 { + current.user_data_bytes.saturating_sub(prev.user_data_bytes) + } + + fn calculate_throughput(&self, bytes: u64, bucket_size_us: u64) -> f64 { + (bytes as f64) / 1_000_000.0 / (bucket_size_us as f64 / 1_000_000.0) + } + + fn kind(&self) -> TimeSeriesKind { + TimeSeriesKind::ThroughputMB + } +} + +/// Calculator for messages/s throughput +pub struct MessageThroughputCalculator; + +impl ThroughputCalculation for MessageThroughputCalculator { + fn get_delta_value(&self, current: &BenchmarkRecord, prev: &BenchmarkRecord) -> u64 { + current.messages.saturating_sub(prev.messages) + } + + fn calculate_throughput(&self, messages: u64, bucket_size_us: u64) -> f64 { + (messages as f64) / (bucket_size_us as f64 / 1_000_000.0) + } + + fn kind(&self) -> TimeSeriesKind { + TimeSeriesKind::ThroughputMsg + } +} + +/// Generic throughput calculator that works with different throughput metrics +pub struct ThroughputTimeSeriesCalculator { + calculator: T, +} + +impl ThroughputTimeSeriesCalculator { + pub fn new(calculator: T) -> Self { + Self { calculator } + } +} + +impl TimeSeriesCalculation for ThroughputTimeSeriesCalculator { + fn calculate(&self, records: &[BenchmarkRecord], bucket_size: IggyDuration) -> TimeSeries { + if records.len() < 2 { + return TimeSeries { + points: Vec::new(), + kind: self.calculator.kind(), + }; + } + let bucket_size_us = bucket_size.as_micros(); + + let max_time_us = records.iter().map(|r| r.elapsed_time_us).max().unwrap_or(0); + let num_buckets = max_time_us.div_ceil(bucket_size_us); + let mut values_per_bucket = vec![0u64; num_buckets as usize]; + + for window in records.windows(2) { + let (prev, current) = (&window[0], &window[1]); + let delta_time_us = current.elapsed_time_us.saturating_sub(prev.elapsed_time_us); + if delta_time_us == 0 { + continue; + } + + let delta_value = self.calculator.get_delta_value(current, prev); + let value_per_us = delta_value as f64 / delta_time_us as f64; + + let mut remaining_time_us = delta_time_us; + let mut current_time_us = prev.elapsed_time_us; + + while remaining_time_us > 0 { + let bucket_index = current_time_us / bucket_size_us; + if bucket_index >= num_buckets { + break; + } + + let bucket_start_us = bucket_index * bucket_size_us; + let bucket_end_us = bucket_start_us + bucket_size_us; + let overlap_start_us = current_time_us.max(bucket_start_us); + let overlap_end_us = (current_time_us + remaining_time_us).min(bucket_end_us); + let overlap_us = overlap_end_us.saturating_sub(overlap_start_us); + + if overlap_us > 0 { + let allocated_value = (value_per_us * overlap_us as f64).round() as u64; + values_per_bucket[bucket_index as usize] += allocated_value; + } + + let allocated_time_us = overlap_end_us.saturating_sub(current_time_us); + remaining_time_us = remaining_time_us.saturating_sub(allocated_time_us); + current_time_us = overlap_end_us; + } + } + + let points = values_per_bucket + .iter() + .enumerate() + .filter(|(_, &value)| value > 0) + .map(|(i, &value)| { + let time_s = (i as u64 * bucket_size_us) as f64 / 1_000_000.0; + let throughput = self.calculator.calculate_throughput(value, bucket_size_us); + let rounded_throughput = (throughput * 1000.0).round() / 1000.0; + TimePoint::new(time_s, rounded_throughput) + }) + .collect(); + + TimeSeries { + points, + kind: self.calculator.kind(), + } + } +} diff --git a/bench/src/analytics/time_series/mod.rs b/bench/src/analytics/time_series/mod.rs new file mode 100644 index 000000000..aa437b0bd --- /dev/null +++ b/bench/src/analytics/time_series/mod.rs @@ -0,0 +1,3 @@ +pub mod calculator; +pub mod calculators; +pub mod processors; diff --git a/bench/src/analytics/time_series/processors/mod.rs b/bench/src/analytics/time_series/processors/mod.rs new file mode 100644 index 000000000..4e97d027f --- /dev/null +++ b/bench/src/analytics/time_series/processors/mod.rs @@ -0,0 +1,8 @@ +use iggy_benchmark_report::time_series::TimeSeries; + +pub mod moving_average; + +/// Process time series data +pub trait TimeSeriesProcessor { + fn process(&self, data: &TimeSeries) -> TimeSeries; +} diff --git a/bench/src/analytics/time_series/processors/moving_average.rs b/bench/src/analytics/time_series/processors/moving_average.rs new file mode 100644 index 000000000..64a088b04 --- /dev/null +++ b/bench/src/analytics/time_series/processors/moving_average.rs @@ -0,0 +1,43 @@ +use super::TimeSeriesProcessor; +use iggy_benchmark_report::time_series::{TimePoint, TimeSeries}; +use std::collections::VecDeque; +use tracing::warn; + +/// Moving average processor +pub struct MovingAverageProcessor { + window_size: usize, +} + +impl MovingAverageProcessor { + pub fn new(window_size: usize) -> Self { + Self { window_size } + } +} + +impl TimeSeriesProcessor for MovingAverageProcessor { + fn process(&self, data: &TimeSeries) -> TimeSeries { + if data.points.is_empty() { + warn!("Attempting to process empty series"); + return data.clone(); + } + + let mut window: VecDeque = VecDeque::with_capacity(self.window_size); + let mut points = Vec::with_capacity(data.points.len()); + + for point in &data.points { + window.push_back(point.value); + if window.len() > self.window_size { + window.pop_front(); + } + + let avg = window.iter().sum::() / window.len() as f64; + let rounded_avg = (avg * 1000.0).round() / 1000.0; + points.push(TimePoint::new(point.time_s, rounded_avg)); + } + + TimeSeries { + points, + kind: data.kind, + } + } +} diff --git a/bench/src/args/common.rs b/bench/src/args/common.rs index 887cdfd07..b4cca36f5 100644 --- a/bench/src/args/common.rs +++ b/bench/src/args/common.rs @@ -4,10 +4,14 @@ use super::{defaults::*, transport::BenchmarkTransportCommand}; use clap::error::ErrorKind; use clap::{CommandFactory, Parser}; use iggy::utils::duration::IggyDuration; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; +use iggy_benchmark_report::params::BenchmarkParams; +use iggy_benchmark_report::transport::BenchmarkTransport; use integration::test_server::Transport; use std::net::SocketAddr; use std::path::Path; use std::str::FromStr; +use tracing::info; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -36,16 +40,41 @@ pub struct IggyBenchArgs { #[arg(long, short = 'w', default_value_t = IggyDuration::from_str(DEFAULT_WARMUP_TIME).unwrap())] pub warmup_time: IggyDuration, + /// Sampling time for metrics collection. It is also used as bucket size for time series calculations. + #[arg(long, default_value_t = IggyDuration::from_str(DEFAULT_SAMPLING_TIME).unwrap(), value_parser = IggyDuration::from_str)] + pub sampling_time: IggyDuration, + + /// Window size for moving average calculations in time series data + #[arg(long, default_value_t = DEFAULT_MOVING_AVERAGE_WINDOW)] + pub moving_average_window: u32, + /// Skip server start #[arg(long, short = 'k', default_value_t = DEFAULT_SKIP_SERVER_START)] pub skip_server_start: bool, - /// Output directory, in which the benchmark results will be stored as `csv` and `toml` files. - /// Sample from the benchmark will be stored in a `csv` file on per-actor manner - each - /// producer/consumer will have its own file. - /// Actor summary, benchmark summary and parameters will be stored in a TOML file. - #[arg(long, short = 'o', default_value = None)] - pub output_directory: Option, + /// Output directory path for storing benchmark results + #[arg(long)] + pub output_dir: Option, + + /// Identifier for the benchmark run (e.g., machine name) + #[arg(long)] + pub identifier: Option, + + /// Additional remark for the benchmark (e.g., no-cache) + #[arg(long)] + pub remark: Option, + + /// Extra information for future use + #[arg(long)] + pub extra_info: Option, + + /// Git reference (commit hash, branch or tag) used for note in the benchmark results + #[arg(long)] + pub gitref: Option, + + /// Git reference date used for note in the benchmark results, preferably merge date of the commit + #[arg(long)] + pub gitref_date: Option, } fn validate_server_executable_path(v: &str) -> Result { @@ -90,6 +119,21 @@ impl IggyBenchArgs { .exit(); } + if self.output_dir.is_none() + && (self.gitref.is_some() + || self.identifier.is_some() + || self.remark.is_some() + || self.extra_info.is_some() + || self.gitref_date.is_some()) + { + IggyBenchArgs::command() + .error( + ErrorKind::ArgumentConflict, + "--git-ref, --git-ref-date, --identifier, --remark, --extra-info can only be used with --output-dir", + ) + .exit(); + } + self.benchmark_kind.inner().validate() } @@ -141,7 +185,308 @@ impl IggyBenchArgs { self.warmup_time } - pub fn output_directory(&self) -> Option { - self.output_directory.clone() + pub fn sampling_time(&self) -> IggyDuration { + self.sampling_time + } + + pub fn moving_average_window(&self) -> u32 { + self.moving_average_window + } + + pub fn output_dir(&self) -> Option { + self.output_dir.clone() + } + + pub fn identifier(&self) -> Option { + self.identifier.clone() + } + + pub fn remark(&self) -> Option { + self.remark.clone() + } + + pub fn extra_info(&self) -> Option { + self.extra_info.clone() + } + + pub fn gitref(&self) -> Option { + self.gitref.clone() + } + + pub fn gitref_date(&self) -> Option { + self.gitref_date.clone() + } + + /// Generates the output directory name based on benchmark parameters. + pub fn generate_dir_name(&self) -> String { + let benchmark_kind = match &self.benchmark_kind { + BenchmarkKindCommand::Send(_) => "send", + BenchmarkKindCommand::Poll(_) => "poll", + BenchmarkKindCommand::SendAndPoll(_) => "send_and_poll", + BenchmarkKindCommand::ConsumerGroupPoll(_) => "consumer_group_poll", + BenchmarkKindCommand::Examples => unreachable!(), + }; + + let transport = match self.transport_command() { + BenchmarkTransportCommand::Tcp(_) => "tcp", + BenchmarkTransportCommand::Quic(_) => "quic", + BenchmarkTransportCommand::Http(_) => "http", + }; + + let mut parts = vec![ + benchmark_kind.to_string(), + match benchmark_kind { + "send" => self.producers().to_string(), + _ => self.consumers().to_string(), + }, + self.message_size().to_string(), + self.messages_per_batch().to_string(), + self.message_batches().to_string(), + transport.to_string(), + ]; + + if let Some(remark) = &self.remark { + parts.push(remark.to_string()); + } + + if let Some(gitref) = &self.gitref { + parts.push(gitref.to_string()); + } + + if let Some(identifier) = &self.identifier { + parts.push(identifier.to_string()); + } + + parts.join("_") + } + + /// Generates a human-readable pretty name for the benchmark + pub fn generate_pretty_name(&self) -> String { + let consumer_or_producer = match &self.benchmark_kind { + BenchmarkKindCommand::Send(_) => format!("{} producers", self.producers()), + _ => format!("{} consumers", self.consumers()), + }; + + let mut name = format!( + "{}, {}B msgs, {} msgs/batch", + consumer_or_producer, + self.message_size(), + self.messages_per_batch(), + ); + + if let Some(remark) = &self.remark { + name.push_str(&format!(" ({})", remark)); + } + + name + } +} + +fn recreate_bench_command(args: &IggyBenchArgs) -> String { + let mut parts = Vec::new(); + + // If using localhost, add env vars + let server_address = args.server_address(); + let is_localhost = server_address + .split(':') + .next() + .map(|host| host == "localhost" || host == "127.0.0.1") + .unwrap_or(false); + + if is_localhost { + // Get all env vars starting with IGGY_ + let iggy_vars: Vec<_> = std::env::vars() + .filter(|(k, _)| k.starts_with("IGGY_")) + .collect(); + + if !iggy_vars.is_empty() { + info!("Found env vars starting with IGGY_: {:?}", iggy_vars); + parts.extend(iggy_vars.into_iter().map(|(k, v)| format!("{}={}", k, v))); + } + } + + parts.push("iggy-bench".to_string()); + + // Add optional global args + if let Some(ref remark) = args.remark() { + parts.push(format!("--remark \'{}\'", remark)); + } + + if let Some(ref output_dir) = args.output_dir() { + parts.push(format!("--output-dir \'{}\'", output_dir)); + } + + // Add warmup time if not default + if args.warmup_time().to_string() != DEFAULT_WARMUP_TIME { + parts.push(format!("--warmup-time {}", args.warmup_time())); + } + + // Add benchmark kind + let kind_str = match args.benchmark_kind.as_simple_kind() { + BenchmarkKind::Send => "send", + BenchmarkKind::Poll => "poll", + BenchmarkKind::SendAndPoll => "send-and-poll", + BenchmarkKind::ConsumerGroupPoll => "consumer-group-poll", + }; + parts.push(kind_str.to_string()); + + // Add benchmark params, skipping defaults + let producers = args.producers(); + let consumers = args.consumers(); + + match args.benchmark_kind.as_simple_kind() { + BenchmarkKind::Send => { + if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() { + parts.push(format!("--producers {}", producers)); + } + } + BenchmarkKind::Poll | BenchmarkKind::ConsumerGroupPoll => { + if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() { + parts.push(format!("--consumers {}", consumers)); + } + } + BenchmarkKind::SendAndPoll => { + if producers != DEFAULT_NUMBER_OF_PRODUCERS.get() { + parts.push(format!("--producers {}", producers)); + } + if consumers != DEFAULT_NUMBER_OF_CONSUMERS.get() { + parts.push(format!("--consumers {}", consumers)); + } + } + } + + let messages_per_batch = args.messages_per_batch(); + if messages_per_batch != DEFAULT_MESSAGES_PER_BATCH.get() { + parts.push(format!("--messages-per-batch {}", messages_per_batch)); + } + + let message_batches = args.message_batches(); + if message_batches != DEFAULT_MESSAGE_BATCHES.get() { + parts.push(format!("--message-batches {}", message_batches)); + } + + let message_size = args.message_size(); + if message_size != DEFAULT_MESSAGE_SIZE.get() { + parts.push(format!("--message-size {}", message_size)); + } + + let streams = args.number_of_streams(); + let default_streams = match args.benchmark_kind.as_simple_kind() { + BenchmarkKind::ConsumerGroupPoll => DEFAULT_NUMBER_OF_STREAMS_CONSUMER_GROUP.get(), + _ => DEFAULT_NUMBER_OF_STREAMS.get(), + }; + if streams != default_streams { + parts.push(format!("--streams {}", streams)); + } + + let partitions = args.number_of_partitions(); + if partitions != DEFAULT_NUMBER_OF_PARTITIONS.get() { + parts.push(format!("--partitions {}", partitions)); + } + + // Add transport and server address, skipping if default + let transport = args.transport().to_string().to_lowercase(); + parts.push(transport.clone()); + + let default_address = match transport.as_str() { + "tcp" => DEFAULT_TCP_SERVER_ADDRESS, + "quic" => DEFAULT_QUIC_SERVER_ADDRESS, + "http" => DEFAULT_HTTP_SERVER_ADDRESS, + _ => "", + }; + if server_address != default_address { + parts.push(format!("--server-address {}", server_address)); + } + + // Add optional flags, skipping defaults + if args.disable_parallel_producer_streams() != DEFAULT_DISABLE_PARALLEL_PRODUCER_STREAMS { + parts.push("--disable-parallel-producer-streams".to_string()); + } + if args.disable_parallel_consumer_streams() != DEFAULT_DISABLE_PARALLEL_CONSUMER_STREAMS { + parts.push("--disable-parallel-consumer-streams".to_string()); + } + + // Only add consumer groups for consumer group poll + let consumer_groups = args.number_of_consumer_groups(); + if args.benchmark_kind.as_simple_kind() == BenchmarkKind::ConsumerGroupPoll + && consumer_groups != DEFAULT_NUMBER_OF_CONSUMER_GROUPS.get() + { + parts.push(format!("--consumer-groups {}", consumer_groups)); + } + + parts.join(" ") +} + +impl From<&IggyBenchArgs> for BenchmarkParams { + fn from(args: &IggyBenchArgs) -> Self { + let benchmark_kind = args.benchmark_kind.as_simple_kind(); + + // Ugly conversion but let it stay here to not have dependencies on other modules + let transport = match args.transport() { + Transport::Tcp => BenchmarkTransport::Tcp, + Transport::Quic => BenchmarkTransport::Quic, + Transport::Http => BenchmarkTransport::Http, + }; + let server_address = args.server_address().to_string(); + let remark = args.remark(); + let extra_info = args.extra_info(); + let gitref = args.gitref(); + let gitref_date = args.gitref_date(); + let messages_per_batch = args.messages_per_batch(); + let message_batches = args.message_batches(); + let message_size = args.message_size(); + let producers = args.producers(); + let consumers = args.consumers(); + let streams = args.number_of_streams(); + let partitions = args.number_of_partitions(); + let number_of_consumer_groups = args.number_of_consumer_groups(); + let disable_parallel_consumers = args.disable_parallel_consumer_streams(); + let disable_parallel_producers = args.disable_parallel_producer_streams(); + let pretty_name = args.generate_pretty_name(); + let bench_command = recreate_bench_command(args); + + let remark_for_identifier = remark + .clone() + .unwrap_or("no_remark".to_string()) + .replace(' ', "_"); + + let params_identifier = vec![ + benchmark_kind.to_string(), + transport.to_string(), + remark_for_identifier, + messages_per_batch.to_string(), + message_batches.to_string(), + message_size.to_string(), + producers.to_string(), + consumers.to_string(), + streams.to_string(), + partitions.to_string(), + number_of_consumer_groups.to_string(), + ]; + + let params_identifier = params_identifier.join("_"); + + BenchmarkParams { + benchmark_kind, + transport, + server_address, + remark, + extra_info, + gitref, + gitref_date, + messages_per_batch, + message_batches, + message_size, + producers, + consumers, + streams, + partitions, + number_of_consumer_groups, + disable_parallel_consumers, + disable_parallel_producers, + pretty_name, + bench_command, + params_identifier, + } } } diff --git a/bench/src/args/defaults.rs b/bench/src/args/defaults.rs index 6489ca6ef..10d565d65 100644 --- a/bench/src/args/defaults.rs +++ b/bench/src/args/defaults.rs @@ -34,3 +34,6 @@ pub const DEFAULT_SERVER_STDOUT_VISIBILITY: bool = false; pub const DEFAULT_WARMUP_TIME: &str = "1 s"; pub const DEFAULT_SKIP_SERVER_START: bool = false; + +pub const DEFAULT_SAMPLING_TIME: &str = "10 ms"; +pub const DEFAULT_MOVING_AVERAGE_WINDOW: u32 = 20; diff --git a/bench/src/args/kind.rs b/bench/src/args/kind.rs index ebd95de21..7c0229b3c 100644 --- a/bench/src/args/kind.rs +++ b/bench/src/args/kind.rs @@ -1,9 +1,10 @@ +use super::common::IggyBenchArgs; use super::defaults::*; use super::examples::print_examples; use super::props::BenchmarkKindProps; use super::transport::BenchmarkTransportCommand; -use super::{common::IggyBenchArgs, simple::BenchmarkKind}; use clap::{error::ErrorKind, CommandFactory, Parser, Subcommand}; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; use serde::Serialize; use std::num::NonZeroU32; diff --git a/bench/src/args/mod.rs b/bench/src/args/mod.rs index 7a0368e6e..0746d6661 100644 --- a/bench/src/args/mod.rs +++ b/bench/src/args/mod.rs @@ -1,8 +1,7 @@ pub mod common; +pub mod defaults; pub mod kind; -pub mod simple; -mod defaults; mod examples; mod props; mod transport; diff --git a/bench/src/args/simple.rs b/bench/src/args/simple.rs deleted file mode 100644 index c004ef203..000000000 --- a/bench/src/args/simple.rs +++ /dev/null @@ -1,13 +0,0 @@ -use derive_more::Display; - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Display)] -pub enum BenchmarkKind { - #[display("send messages")] - Send, - #[display("poll messages")] - Poll, - #[display("send and poll messages")] - SendAndPoll, - #[display("consumer group poll")] - ConsumerGroupPoll, -} diff --git a/bench/src/benchmark_params.rs b/bench/src/benchmark_params.rs deleted file mode 100644 index d12be63a8..000000000 --- a/bench/src/benchmark_params.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::io::Write; - -use crate::args::common::IggyBenchArgs; -use iggy::utils::timestamp::IggyTimestamp; -use serde::Serialize; - -#[derive(Debug, Serialize)] -pub struct BenchmarkParams { - timestamp_micros: i64, - benchmark_name: String, - transport: String, - messages_per_batch: u32, - message_batches: u32, - message_size: u32, - producers: u32, - consumers: u32, - streams: u32, - partitions: u32, - number_of_consumer_groups: u32, - disable_parallel_consumers: bool, - disable_parallel_producers: bool, -} - -impl BenchmarkParams { - pub fn dump_to_toml(&self, output_directory: &str) { - let output_file = format!("{}/params.toml", output_directory); - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(output_file).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } -} - -impl From<&IggyBenchArgs> for BenchmarkParams { - fn from(args: &IggyBenchArgs) -> Self { - BenchmarkParams { - timestamp_micros: IggyTimestamp::now().as_micros() as i64, - benchmark_name: args.benchmark_kind.as_simple_kind().to_string(), - transport: args.transport().to_string(), - messages_per_batch: args.messages_per_batch(), - message_batches: args.message_batches(), - message_size: args.message_size(), - producers: args.producers(), - consumers: args.consumers(), - streams: args.number_of_streams(), - partitions: args.number_of_partitions(), - number_of_consumer_groups: args.number_of_consumer_groups(), - disable_parallel_consumers: args.disable_parallel_consumer_streams(), - disable_parallel_producers: args.disable_parallel_producer_streams(), - } - } -} diff --git a/bench/src/benchmark_result.rs b/bench/src/benchmark_result.rs deleted file mode 100644 index a3cf2a6ed..000000000 --- a/bench/src/benchmark_result.rs +++ /dev/null @@ -1,93 +0,0 @@ -use crate::args::simple::BenchmarkKind; -use crate::statistics::actor_statistics::BenchmarkActorStatistics; -use crate::statistics::aggregate_statistics::BenchmarkAggregateStatistics; -use std::collections::HashSet; -use std::fmt::{Display, Formatter}; - -#[derive(Debug, Clone, PartialEq)] -pub struct BenchmarkResult { - pub kind: BenchmarkKind, - pub statistics: BenchmarkActorStatistics, -} - -#[derive(Debug, Clone)] -struct ImpossibleBenchmarkKind; - -pub struct BenchmarkResults { - pub results: Vec, -} - -impl BenchmarkResults { - fn get_test_type(&self) -> Result { - let result_kinds = self - .results - .iter() - .map(|r| r.kind) - .collect::>(); - match ( - result_kinds.contains(&BenchmarkKind::Poll), - result_kinds.contains(&BenchmarkKind::Send), - ) { - (true, true) => Ok(BenchmarkKind::SendAndPoll), - (true, false) => Ok(BenchmarkKind::Poll), - (false, true) => Ok(BenchmarkKind::Send), - (false, false) => Err(ImpossibleBenchmarkKind), - } - } - - fn calculate_statistics(&self, predicate: F) -> Option - where - F: Fn(&BenchmarkResult) -> bool, - { - let records: Vec<_> = self - .results - .iter() - .filter(|&result| predicate(result)) - .map(|result| result.statistics.clone()) - .collect(); - BenchmarkAggregateStatistics::from_actors_statistics(&records) - } - - pub fn dump_to_toml(&self, output_directory: &str) { - let producer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Send); - if let Some(producer_statics) = producer_statics { - let file_path = format!("{}/producers_summary.toml", output_directory); - - producer_statics.dump_to_toml(&file_path); - } - - let consumer_statics = self.calculate_statistics(|x| x.kind == BenchmarkKind::Poll); - if let Some(consumer_statics) = consumer_statics { - let file_path = format!("{}/consumers_summary.toml", output_directory); - - consumer_statics.dump_to_toml(&file_path); - } - } -} - -impl Display for BenchmarkResults { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - if let Ok(test_type) = self.get_test_type() { - if test_type == BenchmarkKind::SendAndPoll { - let producer_statics = self - .calculate_statistics(|x| x.kind == BenchmarkKind::Send) - .unwrap(); - let consumer_statics = self - .calculate_statistics(|x| x.kind == BenchmarkKind::Poll) - .unwrap(); - - let producer_info = producer_statics.formatted_string("Producer"); - let consumer_info = consumer_statics.formatted_string("Consumer"); - - writeln!(f, "{}, {}", producer_info, consumer_info)?; - } - } - - let results = self.calculate_statistics(|x| { - x.kind == BenchmarkKind::Send || x.kind == BenchmarkKind::Poll - }); - - let summary_info = results.unwrap().formatted_string("Results"); - writeln!(f, "{}", summary_info) - } -} diff --git a/bench/src/benchmark_runner.rs b/bench/src/benchmark_runner.rs deleted file mode 100644 index f476d615f..000000000 --- a/bench/src/benchmark_runner.rs +++ /dev/null @@ -1,64 +0,0 @@ -use crate::args::common::IggyBenchArgs; -use crate::benchmark_params::BenchmarkParams; -use crate::benchmark_result::BenchmarkResults; -use crate::benchmarks::benchmark::Benchmarkable; -use crate::server_starter::start_server_if_needed; -use futures::future::select_all; -use iggy::error::IggyError; -use integration::test_server::TestServer; -use std::time::Duration; -use tokio::time::sleep; -use tracing::info; - -pub struct BenchmarkRunner { - args: Option, - test_server: Option, -} - -impl BenchmarkRunner { - pub fn new(args: IggyBenchArgs) -> Self { - Self { - args: Some(args), - test_server: None, - } - } - - pub async fn run(&mut self) -> Result<(), IggyError> { - let mut args = self.args.take().unwrap(); - self.test_server = start_server_if_needed(&mut args).await; - - let transport = args.transport(); - let server_addr = args.server_address(); - info!("Starting to benchmark: {transport} with server: {server_addr}",); - - let mut benchmark: Box = args.into(); - let mut join_handles = benchmark.run().await?; - - let mut results = Vec::new(); - - while !join_handles.is_empty() { - let (result, _index, remaining) = select_all(join_handles).await; - join_handles = remaining; - - match result { - Ok(r) => results.push(r), - Err(e) => return Err(e), - } - } - - // Sleep just to see result prints after all the join handles are done and tcp connections are closed - sleep(Duration::from_millis(10)).await; - - let results = BenchmarkResults { results }; - benchmark.display_settings(); - info!("{results}"); - - if let Some(output_directory) = benchmark.args().output_directory() { - results.dump_to_toml(&output_directory); - let params = BenchmarkParams::from(benchmark.args()); - params.dump_to_toml(&output_directory); - } - - Ok(()) - } -} diff --git a/bench/src/benchmarks/benchmark.rs b/bench/src/benchmarks/benchmark.rs index c9ae2bfa8..67e6da034 100644 --- a/bench/src/benchmarks/benchmark.rs +++ b/bench/src/benchmarks/benchmark.rs @@ -2,11 +2,7 @@ use super::{ consumer_group_benchmark::ConsumerGroupBenchmark, poll_benchmark::PollMessagesBenchmark, send_and_poll_benchmark::SendAndPollMessagesBenchmark, send_benchmark::SendMessagesBenchmark, }; -use crate::{ - args::{common::IggyBenchArgs, simple::BenchmarkKind}, - benchmark_result::BenchmarkResult, - client_factory::create_client_factory, -}; +use crate::{args::common::IggyBenchArgs, utils::client_factory::create_client_factory}; use async_trait::async_trait; use futures::Future; use iggy::client::{StreamClient, TopicClient}; @@ -15,12 +11,14 @@ use iggy::compression::compression_algorithm::CompressionAlgorithm; use iggy::error::IggyError; use iggy::utils::expiry::IggyExpiry; use iggy::utils::topic_size::MaxTopicSize; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; +use iggy_benchmark_report::individual_metrics::BenchmarkIndividualMetrics; use integration::test_server::{login_root, ClientFactory}; use std::{pin::Pin, sync::Arc}; use tracing::info; pub type BenchmarkFutures = Result< - Vec> + Send>>>, + Vec> + Send>>>, IggyError, >; @@ -52,7 +50,6 @@ pub trait Benchmarkable { fn kind(&self) -> BenchmarkKind; fn args(&self) -> &IggyBenchArgs; fn client_factory(&self) -> &Arc; - fn display_settings(&self); /// Below methods have common implementation for all benchmarks. /// Initializes the streams and topics for the benchmark. diff --git a/bench/src/benchmarks/consumer_group_benchmark.rs b/bench/src/benchmarks/consumer_group_benchmark.rs index bbbd3db68..9a47d47e1 100644 --- a/bench/src/benchmarks/consumer_group_benchmark.rs +++ b/bench/src/benchmarks/consumer_group_benchmark.rs @@ -1,13 +1,11 @@ use crate::{ - args::{common::IggyBenchArgs, simple::BenchmarkKind}, + actors::consumer::Consumer, + args::common::IggyBenchArgs, benchmarks::{CONSUMER_GROUP_BASE_ID, CONSUMER_GROUP_NAME_PREFIX}, - consumer::Consumer, }; use async_trait::async_trait; -use iggy::{ - client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError, - utils::byte_size::IggyByteSize, -}; +use iggy::{client::ConsumerGroupClient, clients::client::IggyClient, error::IggyError}; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; use integration::test_server::{login_root, ClientFactory}; use std::sync::Arc; use tracing::{error, info}; @@ -82,13 +80,11 @@ impl Benchmarkable for ConsumerGroupBenchmark { let message_batches = self.args.message_batches(); let warmup_time = self.args.warmup_time(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((consumers) as usize)); - let output_directory = self.args.output_directory(); for consumer_id in 1..=consumers { let consumer_group_id = start_consumer_group_id + 1 + (consumer_id % consumer_groups_count); let stream_id = start_stream_id + 1 + (consumer_id % consumer_groups_count); - let output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), @@ -98,7 +94,8 @@ impl Benchmarkable for ConsumerGroupBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, + self.args.sampling_time(), + self.args.moving_average_window(), ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); @@ -121,21 +118,4 @@ impl Benchmarkable for ConsumerGroupBenchmark { fn client_factory(&self) -> &Arc { &self.client_factory } - - fn display_settings(&self) { - let total_messages = self.total_messages(); - let total_size_bytes = total_messages * self.args().message_size() as u64; - // TODO(numinex) - add more details about consumer groups. - info!( - "\x1B[32mBenchmark: {}, total messages: {}, processed: {}, {} streams, {} messages per batch, {} batches, {} bytes per message, {} consumers\x1B[0m", - self.kind(), - total_messages, - IggyByteSize::from(total_size_bytes), - self.args().number_of_streams(), - self.args().messages_per_batch(), - self.args().message_batches(), - self.args().message_size(), - self.args().consumers(), - ); - } } diff --git a/bench/src/benchmarks/poll_benchmark.rs b/bench/src/benchmarks/poll_benchmark.rs index d74be2fc9..751f825c3 100644 --- a/bench/src/benchmarks/poll_benchmark.rs +++ b/bench/src/benchmarks/poll_benchmark.rs @@ -1,9 +1,8 @@ use super::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::actors::consumer::Consumer; use crate::args::common::IggyBenchArgs; -use crate::args::simple::BenchmarkKind; -use crate::consumer::Consumer; use async_trait::async_trait; -use iggy::utils::byte_size::IggyByteSize; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; use integration::test_server::ClientFactory; use std::sync::Arc; use tracing::info; @@ -30,7 +29,6 @@ impl Benchmarkable for PollMessagesBenchmark { info!("Creating {} client(s)...", clients_count); let messages_per_batch = self.args.messages_per_batch(); let message_batches = self.args.message_batches(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity(clients_count as usize)); for client_id in 1..=clients_count { @@ -46,7 +44,6 @@ impl Benchmarkable for PollMessagesBenchmark { false => start_stream_id + 1, }; let warmup_time = args.warmup_time(); - let output_directory = output_directory.clone(); let consumer = Consumer::new( client_factory, @@ -56,7 +53,8 @@ impl Benchmarkable for PollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, + args.sampling_time(), + args.moving_average_window(), ); let future = Box::pin(async move { consumer.run().await }); @@ -77,20 +75,4 @@ impl Benchmarkable for PollMessagesBenchmark { fn client_factory(&self) -> &Arc { &self.client_factory } - - fn display_settings(&self) { - let total_messages = self.total_messages(); - let total_size_bytes = total_messages * self.args().message_size() as u64; - info!( - "\x1B[32mBenchmark: {}, total messages: {}, processed: {}, {} streams, {} messages per batch, {} batches, {} bytes per message, {} consumers\x1B[0m", - self.kind(), - total_messages, - IggyByteSize::from(total_size_bytes), - self.args().number_of_streams(), - self.args().messages_per_batch(), - self.args().message_batches(), - self.args().message_size(), - self.args().consumers(), - ); - } } diff --git a/bench/src/benchmarks/send_and_poll_benchmark.rs b/bench/src/benchmarks/send_and_poll_benchmark.rs index a73f83104..332a006b7 100644 --- a/bench/src/benchmarks/send_and_poll_benchmark.rs +++ b/bench/src/benchmarks/send_and_poll_benchmark.rs @@ -1,14 +1,10 @@ use super::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::actors::consumer::Consumer; +use crate::actors::producer::Producer; use crate::args::common::IggyBenchArgs; -use crate::args::simple::BenchmarkKind; -use crate::consumer::Consumer; -use crate::producer::Producer; use async_trait::async_trait; -use colored::Colorize; -use human_format::Formatter; -use iggy::utils::byte_size::IggyByteSize; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; use integration::test_server::ClientFactory; -use std::fmt::Display; use std::sync::Arc; use tracing::info; @@ -26,28 +22,6 @@ impl SendAndPollMessagesBenchmark { } } -impl Display for SendAndPollMessagesBenchmark { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - let total_messages = self.total_messages(); - let processed = IggyByteSize::from(total_messages * self.args().message_size() as u64); - let total_messages_human_readable = Formatter::new().format(total_messages as f64); - let info = format!("Benchmark: {}, transport: {}, total messages: {}, processed: {}, {} streams, {} messages per batch, {} batches, {} bytes per message, {} producers, {} consumers", - self.kind(), - self.args().transport(), - total_messages_human_readable, - processed, - self.args().number_of_streams(), - self.args().messages_per_batch(), - self.args().message_batches(), - self.args().message_size(), - self.args().producers(), - self.args().consumers(), - ).green(); - - writeln!(f, "{}", info) - } -} - #[async_trait] impl Benchmarkable for SendAndPollMessagesBenchmark { async fn run(&mut self) -> BenchmarkFutures { @@ -62,7 +36,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { let message_size = self.args.message_size(); let partitions_count = self.args.number_of_partitions(); let warmup_time = self.args.warmup_time(); - let output_directory = self.args.output_directory(); let mut futures: BenchmarkFutures = Ok(Vec::with_capacity((producers + consumers) as usize)); @@ -71,7 +44,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + producer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let producer = Producer::new( self.client_factory.clone(), producer_id, @@ -81,7 +53,8 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, + self.args.sampling_time(), + self.args.moving_average_window(), ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); @@ -92,7 +65,6 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { true => start_stream_id + consumer_id, false => start_stream_id + 1, }; - let output_directory = output_directory.clone(); let consumer = Consumer::new( self.client_factory.clone(), consumer_id, @@ -101,7 +73,8 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { messages_per_batch, message_batches, warmup_time, - output_directory, + self.args.sampling_time(), + self.args.moving_average_window(), ); let future = Box::pin(async move { consumer.run().await }); futures.as_mut().unwrap().push(future); @@ -124,8 +97,4 @@ impl Benchmarkable for SendAndPollMessagesBenchmark { fn client_factory(&self) -> &Arc { &self.client_factory } - - fn display_settings(&self) { - info!("{}", self.to_string()); - } } diff --git a/bench/src/benchmarks/send_benchmark.rs b/bench/src/benchmarks/send_benchmark.rs index d16861e92..6b7ffeaeb 100644 --- a/bench/src/benchmarks/send_benchmark.rs +++ b/bench/src/benchmarks/send_benchmark.rs @@ -1,8 +1,8 @@ use super::benchmark::{BenchmarkFutures, Benchmarkable}; +use crate::actors::producer::Producer; use crate::args::common::IggyBenchArgs; -use crate::args::simple::BenchmarkKind; -use crate::producer::Producer; use async_trait::async_trait; +use iggy_benchmark_report::benchmark_kind::BenchmarkKind; use integration::test_server::ClientFactory; use std::sync::Arc; use tracing::info; @@ -42,7 +42,6 @@ impl Benchmarkable for SendMessagesBenchmark { let args = args.clone(); let start_stream_id = args.start_stream_id(); let client_factory = client_factory.clone(); - let output_directory = args.output_directory.clone(); let parallel_producer_streams = !args.disable_parallel_producer_streams(); let stream_id = match parallel_producer_streams { @@ -59,7 +58,8 @@ impl Benchmarkable for SendMessagesBenchmark { message_batches, message_size, warmup_time, - output_directory, + args.sampling_time(), + args.moving_average_window(), ); let future = Box::pin(async move { producer.run().await }); futures.as_mut().unwrap().push(future); @@ -86,20 +86,4 @@ impl Benchmarkable for SendMessagesBenchmark { fn client_factory(&self) -> &Arc { &self.client_factory } - - fn display_settings(&self) { - let total_messages = self.total_messages(); - let total_size_bytes = total_messages * self.args().message_size() as u64; - info!( - "\x1B[32mBenchmark: {}, total messages: {}, total size: {} bytes, {} streams, {} messages per batch, {} batches, {} bytes per message, {} producers\x1B[0m", - self.kind(), - total_messages, - total_size_bytes, - self.args().number_of_streams(), - self.args().messages_per_batch(), - self.args().message_batches(), - self.args().message_size(), - self.args().producers(), - ); - } } diff --git a/bench/src/main.rs b/bench/src/main.rs index f4ce76de4..2de96c4be 100644 --- a/bench/src/main.rs +++ b/bench/src/main.rs @@ -1,35 +1,71 @@ +mod actors; +mod analytics; mod args; -mod benchmark_params; -mod benchmark_result; -mod benchmark_runner; mod benchmarks; -mod client_factory; -mod consumer; -mod producer; -mod server_starter; -mod statistics; +mod plot; +mod runner; +mod utils; -use crate::{args::common::IggyBenchArgs, benchmark_runner::BenchmarkRunner}; +use crate::{args::common::IggyBenchArgs, runner::BenchmarkRunner}; use clap::Parser; use figlet_rs::FIGfont; use iggy::error::IggyError; +use std::fs; +use std::path::Path; use tracing::{error, info}; -use tracing_subscriber::EnvFilter; +use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use utils::copy_directory_contents; #[tokio::main] async fn main() -> Result<(), IggyError> { let standard_font = FIGfont::standard().unwrap(); let figure = standard_font.convert("Iggy Bench"); println!("{}", figure.unwrap()); - tracing_subscriber::fmt::Subscriber::builder() - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO"))) - .with_ansi(true) - .init(); let args = IggyBenchArgs::parse(); args.validate(); + // Store output_dir before moving args + let output_dir = args.output_dir(); + let benchmark_dir = output_dir.as_ref().map(|dir| { + let dir_path = Path::new(dir); + if !dir_path.exists() { + fs::create_dir_all(dir_path).unwrap(); + } + dir_path.join(args.generate_dir_name()) + }); + + // Configure logging + let env_filter = EnvFilter::try_from_default_env().unwrap_or(EnvFilter::new("INFO")); + let stdout_layer = fmt::layer().with_ansi(true); + + // If output directory is specified, also log to file + if let Some(ref benchmark_dir) = benchmark_dir { + // Create output directory if it doesn't exist + fs::create_dir_all(benchmark_dir).unwrap(); + let file_appender = tracing_appender::rolling::never(benchmark_dir, "bench.log"); + let file_layer = fmt::layer().with_ansi(false).with_writer(file_appender); + + tracing_subscriber::registry() + .with(env_filter) + .with(stdout_layer) + .with(file_layer) + .init(); + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(stdout_layer) + .init(); + } + let mut benchmark_runner = BenchmarkRunner::new(args); + // Store server address before running benchmark since args will be taken + let server_addr = benchmark_runner + .args + .as_ref() + .unwrap() + .server_address() + .to_string(); info!("Starting the benchmarks..."); let ctrl_c = tokio::signal::ctrl_c(); @@ -38,13 +74,51 @@ async fn main() -> Result<(), IggyError> { tokio::select! { _ = ctrl_c => { info!("Received Ctrl-C, exiting..."); + // Clean up unfinished benchmark directory on manual interruption + if let Some(ref benchmark_dir) = benchmark_dir { + info!("Cleaning up unfinished benchmark directory..."); + if let Err(e) = std::fs::remove_dir_all(benchmark_dir) { + error!("Failed to clean up benchmark directory: {}", e); + } + } } result = benchmark_future => { if let Err(e) = result { error!("Benchmark failed with error: {:?}", e); return Err(e); } - info!("Finished the benchmarks"); + + // Copy server logs and config if output directory is specified and + // (server was started by benchmark OR (running on localhost and local_data exists)) + if let Some(ref benchmark_dir) = benchmark_dir { + let is_localhost = server_addr + .split(':') + .next() + .map(|host| host == "localhost" || host == "127.0.0.1") + .unwrap_or(false); + + let local_data = Path::new("local_data"); + let should_copy = benchmark_runner.test_server.is_some() || + (is_localhost && local_data.exists()); + + if should_copy { + // Copy server logs + let logs_dir = local_data.join("logs"); + if logs_dir.exists() { + if let Err(e) = copy_directory_contents(&logs_dir, benchmark_dir) { + error!("Failed to copy server logs: {}", e); + } + } + + // Copy server config + let runtime_dir = local_data.join("runtime"); + if runtime_dir.exists() { + if let Err(e) = copy_directory_contents(&runtime_dir, benchmark_dir) { + error!("Failed to copy server configuration: {}", e); + } + } + } + } } } diff --git a/bench/src/plot.rs b/bench/src/plot.rs new file mode 100644 index 000000000..c26574474 --- /dev/null +++ b/bench/src/plot.rs @@ -0,0 +1,94 @@ +use charming::theme::Theme; +use charming::{Chart, HtmlRenderer}; +use iggy::utils::byte_size::IggyByteSize; +use iggy_benchmark_report::report::BenchmarkReport; +use std::path::Path; +use std::time::Instant; +use tracing::info; + +pub enum ChartType { + Throughput, + Latency, +} + +impl ChartType { + fn name(&self) -> &'static str { + match self { + ChartType::Throughput => "throughput", + ChartType::Latency => "latency", + } + } + + fn create_chart(&self) -> fn(&BenchmarkReport, bool) -> Chart { + match self { + ChartType::Throughput => iggy_benchmark_report::create_throughput_chart, + ChartType::Latency => iggy_benchmark_report::create_latency_chart, + } + } + + fn get_samples(&self, report: &BenchmarkReport) -> usize { + match self { + ChartType::Throughput => report + .individual_metrics + .iter() + .map(|m| m.throughput_mb_ts.points.len()) + .sum(), + ChartType::Latency => report + .individual_metrics + .iter() + .filter(|m| !m.latency_ts.points.is_empty()) + .map(|m| m.latency_ts.points.len()) + .sum(), + } + } +} + +pub fn plot_chart( + report: &BenchmarkReport, + output_directory: &str, + chart_type: ChartType, +) -> std::io::Result<()> { + let data_processing_start = Instant::now(); + let chart = (chart_type.create_chart())(report, true); // Use dark theme by default + let data_processing_time = data_processing_start.elapsed(); + + let chart_render_start = Instant::now(); + let chart_path = format!("{}/{}.html", output_directory, chart_type.name()); + save_chart(&chart, chart_type.name(), output_directory, 1600, 1200)?; + let chart_render_time = chart_render_start.elapsed(); + + let total_samples = chart_type.get_samples(report); + let report_path = format!("{}/report.json", output_directory); + let report_size = IggyByteSize::from(std::fs::metadata(&report_path)?.len()); + + info!( + "Generated {} plot at: {} ({} samples, report.json size: {}, data processing: {:.2?}, chart render: {:.2?})", + chart_type.name(), + chart_path, + total_samples, + report_size, + data_processing_time, + chart_render_time + ); + Ok(()) +} + +fn save_chart( + chart: &Chart, + file_name: &str, + output_directory: &str, + width: u64, + height: u64, +) -> std::io::Result<()> { + let parent = Path::new(output_directory).parent().unwrap(); + std::fs::create_dir_all(parent)?; + let full_output_path = Path::new(output_directory).join(format!("{}.html", file_name)); + + let mut renderer = HtmlRenderer::new(file_name, width, height).theme(Theme::Dark); + renderer.save(chart, &full_output_path).map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + format!("Failed to save HTML plot: {}", e), + ) + }) +} diff --git a/bench/src/runner.rs b/bench/src/runner.rs new file mode 100644 index 000000000..eb8b21119 --- /dev/null +++ b/bench/src/runner.rs @@ -0,0 +1,91 @@ +use crate::analytics::report_builder::BenchmarkReportBuilder; +use crate::args::common::IggyBenchArgs; +use crate::benchmarks::benchmark::Benchmarkable; +use crate::plot::{plot_chart, ChartType}; +use crate::utils::server_starter::start_server_if_needed; +use futures::future::select_all; +use iggy::error::IggyError; +use iggy_benchmark_report::hardware::BenchmarkHardware; +use iggy_benchmark_report::params::BenchmarkParams; +use integration::test_server::TestServer; +use std::path::Path; +use std::time::Duration; +use tokio::time::sleep; +use tracing::{error, info}; + +pub struct BenchmarkRunner { + pub args: Option, + pub test_server: Option, +} + +impl BenchmarkRunner { + pub fn new(args: IggyBenchArgs) -> Self { + Self { + args: Some(args), + test_server: None, + } + } + + pub async fn run(&mut self) -> Result<(), IggyError> { + let mut args = self.args.take().unwrap(); + self.test_server = start_server_if_needed(&mut args).await; + + let transport = args.transport(); + let server_addr = args.server_address(); + info!("Starting to benchmark: {transport} with server: {server_addr}",); + + let mut benchmark: Box = args.into(); + let mut join_handles = benchmark.run().await?; + + let mut individual_metrics = Vec::new(); + + while !join_handles.is_empty() { + let (result, _index, remaining) = select_all(join_handles).await; + join_handles = remaining; + + match result { + Ok(r) => individual_metrics.push(r), + Err(e) => return Err(e), + } + } + + let params = BenchmarkParams::from(benchmark.args()); + let hardware = + BenchmarkHardware::get_system_info_with_identifier(benchmark.args().identifier()); + let report = BenchmarkReportBuilder::build( + hardware, + params, + individual_metrics, + benchmark.args().moving_average_window(), + ); + + // Sleep just to see result prints after all tasks are joined (they print per-actor results) + sleep(Duration::from_millis(10)).await; + + report.print_summary(); + + if let Some(output_dir) = benchmark.args().output_dir() { + // Generate the full output path using the directory name generator + let dir_name = benchmark.args().generate_dir_name(); + let full_output_path = Path::new(&output_dir) + .join(dir_name) + .to_string_lossy() + .to_string(); + + // Dump the report to JSON + report.dump_to_json(&full_output_path); + + // Generate the plots + plot_chart(&report, &full_output_path, ChartType::Throughput).map_err(|e| { + error!("Failed to generate plots: {e}"); + IggyError::CannotWriteToFile + })?; + plot_chart(&report, &full_output_path, ChartType::Latency).map_err(|e| { + error!("Failed to generate plots: {e}"); + IggyError::CannotWriteToFile + })?; + } + + Ok(()) + } +} diff --git a/bench/src/statistics/actor_statistics.rs b/bench/src/statistics/actor_statistics.rs deleted file mode 100644 index 546807e66..000000000 --- a/bench/src/statistics/actor_statistics.rs +++ /dev/null @@ -1,117 +0,0 @@ -use super::record::BenchmarkRecord; -use serde::Serialize; - -#[derive(Debug, Serialize, Clone, PartialEq)] -pub struct BenchmarkActorStatistics { - pub total_time_secs: f64, - pub total_user_data_bytes: u64, - pub total_bytes: u64, - pub total_messages: u64, - pub throughput_megabytes_per_second: f64, - pub throughput_messages_per_second: f64, - pub p50_latency_ms: f64, - pub p90_latency_ms: f64, - pub p95_latency_ms: f64, - pub p99_latency_ms: f64, - pub p999_latency_ms: f64, - pub avg_latency_ms: f64, - pub median_latency_ms: f64, -} - -impl BenchmarkActorStatistics { - pub fn from_records(records: &[BenchmarkRecord]) -> Self { - if records.is_empty() { - return BenchmarkActorStatistics { - total_time_secs: 0.0, - total_user_data_bytes: 0, - total_bytes: 0, - total_messages: 0, - throughput_megabytes_per_second: 0.0, - throughput_messages_per_second: 0.0, - p50_latency_ms: 0.0, - p90_latency_ms: 0.0, - p95_latency_ms: 0.0, - p99_latency_ms: 0.0, - p999_latency_ms: 0.0, - avg_latency_ms: 0.0, - median_latency_ms: 0.0, - }; - } - - let total_time_secs = records.last().unwrap().elapsed_time_us as f64 / 1_000_000.0; - - let total_user_data_bytes: u64 = records.iter().last().unwrap().user_data_bytes; - let total_bytes: u64 = records.iter().last().unwrap().total_bytes; - let total_messages: u64 = records.iter().last().unwrap().messages; - - let throughput_megabytes_per_second = if total_time_secs > 0.0 { - (total_bytes as f64) / 1_000_000.0 / total_time_secs - } else { - 0.0 - }; - - let throughput_messages_per_second = if total_time_secs > 0.0 { - (total_messages as f64) / total_time_secs - } else { - 0.0 - }; - - let mut latencies_ms: Vec = records - .iter() - .map(|r| r.latency_us as f64 / 1_000.0) - .collect(); - latencies_ms.sort_by(|a, b| a.partial_cmp(b).unwrap()); - - let p50_latency_ms = calculate_percentile(&latencies_ms, 50.0); - let p90_latency_ms = calculate_percentile(&latencies_ms, 90.0); - let p95_latency_ms = calculate_percentile(&latencies_ms, 95.0); - let p99_latency_ms = calculate_percentile(&latencies_ms, 99.0); - let p999_latency_ms = calculate_percentile(&latencies_ms, 99.9); - - let avg_latency_ms = latencies_ms.iter().sum::() / latencies_ms.len() as f64; - let len = latencies_ms.len() / 2; - let median_latency_ms = if latencies_ms.len() % 2 == 0 { - (latencies_ms[len - 1] + latencies_ms[len]) / 2.0 - } else { - latencies_ms[len] - }; - - BenchmarkActorStatistics { - total_time_secs, - total_user_data_bytes, - total_bytes, - total_messages, - throughput_megabytes_per_second, - throughput_messages_per_second, - p50_latency_ms, - p90_latency_ms, - p95_latency_ms, - p99_latency_ms, - p999_latency_ms, - avg_latency_ms, - median_latency_ms, - } - } - - pub fn dump_to_toml(&self, file_name: &str) { - let toml_str = toml::to_string(self).unwrap(); - std::fs::write(file_name, toml_str).unwrap(); - } -} - -fn calculate_percentile(sorted_data: &[f64], percentile: f64) -> f64 { - if sorted_data.is_empty() { - return 0.0; - } - - let rank = percentile / 100.0 * (sorted_data.len() - 1) as f64; - let lower = rank.floor() as usize; - let upper = rank.ceil() as usize; - - if upper >= sorted_data.len() { - return sorted_data[sorted_data.len() - 1]; - } - - let weight = rank - lower as f64; - sorted_data[lower] * (1.0 - weight) + sorted_data[upper] * weight -} diff --git a/bench/src/statistics/aggregate_statistics.rs b/bench/src/statistics/aggregate_statistics.rs deleted file mode 100644 index 414f3c22c..000000000 --- a/bench/src/statistics/aggregate_statistics.rs +++ /dev/null @@ -1,92 +0,0 @@ -use std::io::Write; - -use super::actor_statistics::BenchmarkActorStatistics; -use colored::{ColoredString, Colorize}; -use serde::Serialize; - -#[derive(Debug, Serialize, Clone, PartialEq)] -pub struct BenchmarkAggregateStatistics { - pub total_throughput_megabytes_per_second: f64, - pub total_throughput_messages_per_second: f64, - pub average_throughput_megabytes_per_second: f64, - pub average_throughput_messages_per_second: f64, - pub average_p50_latency_ms: f64, - pub average_p90_latency_ms: f64, - pub average_p95_latency_ms: f64, - pub average_p99_latency_ms: f64, - pub average_p999_latency_ms: f64, - pub average_avg_latency_ms: f64, - pub average_median_latency_ms: f64, -} - -impl BenchmarkAggregateStatistics { - pub fn from_actors_statistics(stats: &[BenchmarkActorStatistics]) -> Option { - if stats.is_empty() { - return None; - } - let count = stats.len() as f64; - - // Compute total throughput - let total_throughput_megabytes_per_second: f64 = stats - .iter() - .map(|r| r.throughput_megabytes_per_second) - .sum(); - - let total_throughput_messages_per_second: f64 = - stats.iter().map(|r| r.throughput_messages_per_second).sum(); - - // Compute average throughput - let average_throughput_megabytes_per_second = total_throughput_megabytes_per_second / count; - - let average_throughput_messages_per_second = total_throughput_messages_per_second / count; - - // Compute average latencies - let average_p50_latency_ms = stats.iter().map(|r| r.p50_latency_ms).sum::() / count; - let average_p90_latency_ms = stats.iter().map(|r| r.p90_latency_ms).sum::() / count; - let average_p95_latency_ms = stats.iter().map(|r| r.p95_latency_ms).sum::() / count; - let average_p99_latency_ms = stats.iter().map(|r| r.p99_latency_ms).sum::() / count; - let average_p999_latency_ms = stats.iter().map(|r| r.p999_latency_ms).sum::() / count; - let average_avg_latency_ms = stats.iter().map(|r| r.avg_latency_ms).sum::() / count; - let average_median_latency_ms = - stats.iter().map(|r| r.median_latency_ms).sum::() / count; - - Some(BenchmarkAggregateStatistics { - total_throughput_megabytes_per_second, - total_throughput_messages_per_second, - average_throughput_megabytes_per_second, - average_throughput_messages_per_second, - average_p50_latency_ms, - average_p90_latency_ms, - average_p95_latency_ms, - average_p99_latency_ms, - average_p999_latency_ms, - average_avg_latency_ms, - average_median_latency_ms, - }) - } - - pub fn formatted_string(&self, prefix: &str) -> ColoredString { - format!( - "{prefix}: Total throughput: {:.2} MB/s, {:.0} messages/s, average throughput: {:.2} MB/s, average p50 latency: {:.2} ms, average p90 latency: {:.2} ms, average p95 latency: {:.2} ms, average p99 latency: {:.2} ms, average p999 latency: {:.2} ms, average latency: {:.2} ms, median latency: {:.2} ms", - self.total_throughput_megabytes_per_second, - self.total_throughput_messages_per_second, - self.average_throughput_megabytes_per_second, - self.average_p50_latency_ms, - self.average_p90_latency_ms, - self.average_p95_latency_ms, - self.average_p99_latency_ms, - self.average_p999_latency_ms, - self.average_avg_latency_ms, - self.average_median_latency_ms - ).green() - } - - pub fn dump_to_toml(&self, file_name: &str) { - let toml_str = toml::to_string(self).unwrap(); - Write::write_all( - &mut std::fs::File::create(file_name).unwrap(), - toml_str.as_bytes(), - ) - .unwrap(); - } -} diff --git a/bench/src/statistics/mod.rs b/bench/src/statistics/mod.rs deleted file mode 100644 index 066a6754d..000000000 --- a/bench/src/statistics/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -pub mod actor_statistics; -pub mod aggregate_statistics; -pub mod record; diff --git a/bench/src/client_factory.rs b/bench/src/utils/client_factory.rs similarity index 100% rename from bench/src/client_factory.rs rename to bench/src/utils/client_factory.rs diff --git a/bench/src/utils/mod.rs b/bench/src/utils/mod.rs new file mode 100644 index 000000000..94e740132 --- /dev/null +++ b/bench/src/utils/mod.rs @@ -0,0 +1,17 @@ +use std::{fs, path::Path}; + +pub mod client_factory; +pub mod server_starter; + +pub fn copy_directory_contents(source_dir: &Path, target_dir: &Path) -> std::io::Result<()> { + for entry in fs::read_dir(source_dir)? { + let entry = entry?; + let path = entry.path(); + if path.is_file() { + let file_name = path.file_name().unwrap(); + let target_path = target_dir.join(file_name); + fs::copy(&path, &target_path)?; + } + } + Ok(()) +} diff --git a/bench/src/server_starter.rs b/bench/src/utils/server_starter.rs similarity index 98% rename from bench/src/server_starter.rs rename to bench/src/utils/server_starter.rs index 3aa6bc5e2..98bbd17f8 100644 --- a/bench/src/server_starter.rs +++ b/bench/src/utils/server_starter.rs @@ -25,7 +25,7 @@ pub async fn start_server_if_needed(args: &mut IggyBenchArgs) -> Option { let args_http_address = args.server_address().parse::().unwrap(); diff --git a/integration/Cargo.toml b/integration/Cargo.toml index 2bbd657a5..fc2ad1c51 100644 --- a/integration/Cargo.toml +++ b/integration/Cargo.toml @@ -16,6 +16,7 @@ env_logger = "0.11.6" futures = "0.3.31" humantime = "2.1.0" iggy = { path = "../sdk", features = ["iggy-cli"] } +iggy-benchmark-report = { path = "../bench/report" } keyring = "3.6.1" lazy_static = "1.5.0" libc = "0.2.169" diff --git a/scripts/performance/run-standard-performance-suite.sh b/scripts/performance/run-standard-performance-suite.sh index 2ea8372aa..f8a317429 100755 --- a/scripts/performance/run-standard-performance-suite.sh +++ b/scripts/performance/run-standard-performance-suite.sh @@ -27,27 +27,32 @@ echo "Building project..." cargo build --release # Create a directory for the performance results -(mkdir performance_results || true) &> /dev/null +(mkdir -p performance_results || true) &> /dev/null # Construct standard performance suites, each should process 8 GB of data -STANDARD_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp) # 8 producers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp -STANDARD_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp) # 8 consumers, 8 streams, 1000 byte messages, 1000 messages per batch, 1000 message batches, tcp -SMALL_BATCH_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 100 10000 tcp) # 8 producers, 8 streams, 1000 byte messages, 100 messages per batch, 10000 message batches, tcp -SMALL_BATCH_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 100 10000 tcp) # 8 consumers, 8 streams, 1000 byte messages, 100 messages per batch, 10000 message batches, tcp - -# SMALL_BATCH_SMALL_MSG_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 20 100 500000 tcp) # Uncomment and adjust if needed -# SMALL_BATCH_SMALL_MSG_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 20 100 500000 tcp) # Uncomment and adjust if needed -# SINGLE_MESSAGE_BATCH_SMALL_MSG_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 20 1 50000000 tcp) # Uncomment and adjust if needed -# SINGLE_MESSAGE_BATCH_SMALL_MSG_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 20 1 50000000 tcp) # Uncomment and adjust if needed +LARGE_BATCH_FORCED_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp "forced_cache") # 8GB data, 1KB messages, 1000 msgs/batch with forced cache +LARGE_BATCH_FORCED_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp "forced_cache") # 8GB data, 1KB messages, 1000 msgs/batch with forced cache +LARGE_BATCH_NO_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp "no_cache") # 8GB data, 1KB messages, 1000 msgs/batch with disabled cache +LARGE_BATCH_NO_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp "no_cache") # 8GB data, 1KB messages, 1000 msgs/batch with disabled cache +LARGE_BATCH_NO_WAIT_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 1000 1000 tcp "no_wait") # 8GB data, 1KB messages, 1000 msgs/batch with no_wait config +LARGE_BATCH_NO_WAIT_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 1000 1000 tcp "no_wait") # 8GB data, 1KB messages, 1000 msgs/batch with no_wait config +SMALL_BATCH_FORCED_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 100 10000 tcp "forced_cache") # 8GB data, 1KB messages, 100 msgs/batch with forced cache +SMALL_BATCH_FORCED_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 100 10000 tcp "forced_cache") # 8GB data, 1KB messages, 100 msgs/batch with forced cache +SMALL_BATCH_NO_CACHE_SEND=$(construct_bench_command "$IGGY_BENCH_CMD" "send" 8 1000 100 10000 tcp "no_cache") # 8GB data, 1KB messages, 100 msgs/batch, no cache +SMALL_BATCH_NO_CACHE_POLL=$(construct_bench_command "$IGGY_BENCH_CMD" "poll" 8 1000 100 10000 tcp "no_cache") # 8GB data, 1KB messages, 100 msgs/batch, no cache # Make an array of the suites SUITES=( - "${STANDARD_SEND}" - "${STANDARD_POLL}" - "${SMALL_BATCH_SEND}" - "${SMALL_BATCH_POLL}" - # "${SMALL_BATCH_SMALL_MSG_SEND}" - # "${SMALL_BATCH_SMALL_MSG_POLL}" + "${LARGE_BATCH_FORCED_CACHE_SEND}" + "${LARGE_BATCH_FORCED_CACHE_POLL}" + "${LARGE_BATCH_NO_CACHE_SEND}" + "${LARGE_BATCH_NO_CACHE_POLL}" + "${LARGE_BATCH_NO_WAIT_SEND}" + "${LARGE_BATCH_NO_WAIT_POLL}" + "${SMALL_BATCH_FORCED_CACHE_SEND}" + "${SMALL_BATCH_FORCED_CACHE_POLL}" + "${SMALL_BATCH_NO_CACHE_SEND}" + "${SMALL_BATCH_NO_CACHE_POLL}" ) # Run the suites, iterate over two elements at a time @@ -59,9 +64,20 @@ for (( i=0; i<${#SUITES[@]} ; i+=2 )) ; do echo "Cleaning old local_data..." rm -rf local_data || true - # Start iggy-server - echo "Starting iggy-server..." - target/release/iggy-server &> /dev/null & + # Start iggy-server with appropriate configuration + if [[ "$SEND_BENCH" == *"_forced_cache_"* ]] || [[ "$POLL_BENCH" == *"_forced_cache_"* ]]; then + echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_SIZE=\"9GB\" target/release/iggy-server" + IGGY_SYSTEM_CACHE_SIZE="9GB" target/release/iggy-server &> /dev/null & + elif [[ "$SEND_BENCH" == *"_no_cache_"* ]] || [[ "$POLL_BENCH" == *"_no_cache_"* ]]; then + echo "Starting iggy-server with command: IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server" + IGGY_SYSTEM_CACHE_ENABLED=false target/release/iggy-server &> /dev/null & + elif [[ "$SEND_BENCH" == *"_no_wait_"* ]] || [[ "$POLL_BENCH" == *"_no_wait_"* ]]; then + echo "Starting iggy-server with command: IGGY_SYSTEM_SEGMENT_SERVER_CONFIRMATION=no_wait target/release/iggy-server" + IGGY_SYSTEM_SEGMENT_SERVER_CONFIRMATION=no_wait target/release/iggy-server &> /dev/null & + else + echo "Starting iggy-server with command: target/release/iggy-server" + target/release/iggy-server &> /dev/null & + fi IGGY_SERVER_PID=$! sleep 2 diff --git a/scripts/performance/utils.sh b/scripts/performance/utils.sh index 88c6a3195..f4fa2fee9 100755 --- a/scripts/performance/utils.sh +++ b/scripts/performance/utils.sh @@ -50,6 +50,43 @@ function get_git_iggy_server_tag_or_sha1() { fi } +# Function to get the commit date (last modified date) for HEAD +function get_git_commit_date() { + local dir="$1" + + if [ -d "$dir" ]; then + pushd "$dir" > /dev/null || { + echo "Error: Failed to enter directory '$dir'." >&2 + exit 1 + } + + if git rev-parse --git-dir > /dev/null 2>&1; then + # Get the committer date (last modified) in ISO 8601 format + local commit_date + commit_date=$(git show -s --format=%cI HEAD 2>/dev/null || echo "") + + popd > /dev/null || { + echo "Error: Failed to return from directory '$dir'." >&2 + exit 1 + } + + if [ -n "$commit_date" ]; then + echo "$commit_date" + else + echo "Error: Could not get commit date for HEAD." >&2 + return 1 + fi + else + echo "Error: Directory '$dir' is not a git repository." >&2 + popd > /dev/null || exit 1 + return 1 + fi + else + echo "Error: Directory '$dir' does not exist." >&2 + return 1 + fi +} + # Function to construct a bench command (send or poll) function construct_bench_command() { local bench_command=$1 @@ -59,6 +96,7 @@ function construct_bench_command() { local messages_per_batch=$5 local message_batches=$6 local protocol=$7 + local remark=${8:-""} # Validate the type if [[ "$type" != "send" && "$type" != "poll" ]]; then @@ -75,15 +113,21 @@ function construct_bench_command() { local streams=${count} - local superdir - superdir="performance_results/$(get_git_iggy_server_tag_or_sha1 .)" || { echo "Failed to get git commit or tag."; exit 1; } - rm -rf "$superdir" || true - mkdir -p "$superdir" || { echo "Failed to create directory '$superdir'."; exit 1; } - local output_directory="${superdir}/${type}_${count}${type:0:1}_${message_size}_${messages_per_batch}_${message_batches}_${protocol}" + local commit_hash + commit_hash=$(get_git_iggy_server_tag_or_sha1 .) || { echo "Failed to get git commit or tag."; exit 1; } + local commit_date + commit_date=$(get_git_commit_date .) || { echo "Failed to get git commit date."; exit 1; } + local hostname + hostname=$(hostname) echo "$bench_command \ $COMMON_ARGS \ - --output-directory $output_directory \ + --output-dir performance_results \ + --identifier ${hostname} \ + --remark ${remark} \ + --extra-info \"\" \ + --gitref \"${commit_hash}\" \ + --gitref-date \"${commit_date}\" \ ${type} \ --${role} ${count} \ --streams ${streams} \ diff --git a/scripts/profile.sh b/scripts/profile.sh index eef83d6fe..65dc9c118 100755 --- a/scripts/profile.sh +++ b/scripts/profile.sh @@ -42,7 +42,15 @@ if [[ "$PROFILING_MODE" == "io" ]]; then fi fi -# Check system settings for perf +# Detect OS +OS="$(uname -s)" +if [ "$OS" = "Darwin" ]; then + echo "Error: Profiling with perf and flamegraph is currently only supported on Linux systems." + echo "For macOS profiling, consider using Instruments.app or DTrace." + exit 1 +fi + +# Linux-specific profiling setup paranoid=$(cat /proc/sys/kernel/perf_event_paranoid) restrict=$(cat /proc/sys/kernel/kptr_restrict) diff --git a/scripts/utils.sh b/scripts/utils.sh index 0dbbbe87b..18a660f17 100755 --- a/scripts/utils.sh +++ b/scripts/utils.sh @@ -9,6 +9,30 @@ function get_git_info() { echo "${git_branch}_${git_info}" } +# OS detection +OS="$(uname -s)" + +# Function to check if process exists - OS specific implementations +function process_exists() { + local pid=$1 + if [ "$OS" = "Darwin" ]; then + ps -p "${pid}" > /dev/null + else + [[ -e /proc/${pid} ]] + fi +} + +# Function to send signal to process - OS specific implementations +function send_signal_to_pid() { + local pid=$1 + local signal=$2 + if [ "$OS" = "Darwin" ]; then + kill "-${signal}" "${pid}" + else + kill -s "${signal}" "${pid}" + fi +} + # Function to wait for a process to exit function wait_for_process() { local process_name=$1 @@ -23,7 +47,7 @@ function wait_for_process() { pids=$(pgrep "${process_name}") || true if [[ -n "${pids}" ]]; then for pid in ${pids}; do - if [[ -e /proc/${pid} ]]; then + if process_exists "${pid}"; then sleep 0.1 continue_outer_loop=true break @@ -47,8 +71,8 @@ function send_signal() { if [[ -n "${pids}" ]]; then for pid in ${pids}; do - if [[ -e /proc/${pid} ]]; then - kill -s "${signal}" "${pid}" + if process_exists "${pid}"; then + send_signal_to_pid "${pid}" "${signal}" fi done fi