Skip to content

Commit

Permalink
Merge pull request #12 from escobera/benchmark/test-async-producer
Browse files Browse the repository at this point in the history
Benchmark async producer
  • Loading branch information
oliveigah authored Nov 17, 2024
2 parents ab61197 + fba00d5 commit 069bc76
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 2 deletions.
19 changes: 18 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,12 @@ config :klife, MyClient,
producers: [
[
name: :benchmark_producer,
client_id: "my_custom_client_id"
client_id: "my_custom_client_id",
],
[
name: :async_benchmark_producer,
client_id: "my_custom_client_id",
batchers_count: 4
],
[
name: :benchmark_producer_in_flight,
Expand Down Expand Up @@ -68,6 +73,18 @@ config :klife, MyClient,
name: "benchmark_topic_2",
default_producer: :benchmark_producer
],
[
name: "async_benchmark_topic_0",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_1",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_2",
default_producer: :async_benchmark_producer
],
[
name: "benchmark_topic_in_flight",
default_producer: :benchmark_producer_in_flight
Expand Down
2 changes: 1 addition & 1 deletion lib/klife/testing.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ defmodule Klife.Testing do
end)
end

defp get_latest_offsets(leader_id, metas, client_name) do
def get_latest_offsets(leader_id, metas, client_name) do
content = %{
replica_id: -1,
isolation_level: 1,
Expand Down
7 changes: 7 additions & 0 deletions lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ if Mix.env() in [:dev] do
defmodule Mix.Tasks.Benchmark do
use Mix.Task

alias Klife.Producer.Controller, as: PController
alias Klife.TestUtils.AsyncProducerBenchmark

def run(args) do
Application.ensure_all_started(:klife)

Expand Down Expand Up @@ -78,6 +81,10 @@ if Mix.env() in [:dev] do
)
end

def do_run_bench("producer_async", parallel) do
AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"])
end

def do_run_bench("producer_sync", parallel) do
%{
records_0: records_0,
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ defmodule Klife.MixProject do
{:benchee, "~> 1.0", only: :dev, runtime: false},
{:kafka_ex, "~> 0.13", only: :dev},
{:brod, "~> 3.16", only: :dev},
{:erlkaf, "~> 2.1.6", only: :dev},
{:observer_cli, "~> 1.7", only: :dev}
]
end
Expand Down
6 changes: 6 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
"brod": {:hex, :brod, "3.17.0", "437daa5204a2175a3f6d01ee31152ca881539ca90acdf123d69835577f6133b1", [:rebar3], [{:kafka_protocol, "4.1.3", [hex: :kafka_protocol, repo: "hexpm", optional: false]}, {:snappyer, "1.2.9", [hex: :snappyer, repo: "hexpm", optional: false]}], "hexpm", "1bf5eb9d1bad1140f97b9d0c5a819ceb30414231cb7f5ad5d5e18201cfaf09f4"},
"connection": {:hex, :connection, "1.1.0", "ff2a49c4b75b6fb3e674bfc5536451607270aac754ffd1bdfe175abe4a6d7a68", [:mix], [], "hexpm", "722c1eb0a418fbe91ba7bd59a47e28008a189d47e37e0e7bb85585a016b2869c"},
"crc32cer": {:hex, :crc32cer, "0.1.8", "c6c2275c5fb60a95f4935d414f30b50ee9cfed494081c9b36ebb02edfc2f48db", [:rebar3], [], "hexpm", "251499085482920deb6c9b7aadabf9fb4c432f96add97ab42aee4501e5b6f591"},
"datum": {:hex, :datum, "4.6.1", "93b131203a60cfea9ffff6435a50dc24239f689dfebb76e6aecf6ce689efe8f4", [:rebar3], [], "hexpm", "e14340f8280fedb1731d5cd6e9f5aeaa14b880c51f0b3dc16c42c6671c167e4d"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"erlkaf": {:hex, :erlkaf, "2.1.6", "fb9aed863f09249dc549135391f5d173d1a1064bf222dc14a74c92fe3408cd60", [:rebar3], [{:esq, "2.0.6", [hex: :esq, repo: "hexpm", optional: false]}, {:jsone, "1.8.1", [hex: :jsone, repo: "hexpm", optional: false]}], "hexpm", "22ab3e870e78b6d16ae1ffc3fee0c155c9179fcd6e2f2a703398d6fb677ddff9"},
"esq": {:hex, :esq, "2.0.6", "9917e1a731c609b42624a4bb8594a25d537ea30e7b55d46cd46fa1b95e6db675", [:rebar3], [{:datum, "~> 4.6.1", [hex: :datum, repo: "hexpm", optional: false]}, {:pipe, "~> 2.0.1", [hex: :pipes, repo: "hexpm", optional: false]}, {:uid, "~> 1.3.4", [hex: :uid, repo: "hexpm", optional: false]}], "hexpm", "3b798da50c508fe93248dbbd64d3d2cb618cab5387e66515ab83cadf2b1abac1"},
"ex_doc": {:hex, :ex_doc, "0.34.0", "ab95e0775db3df71d30cf8d78728dd9261c355c81382bcd4cefdc74610bef13e", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "60734fb4c1353f270c3286df4a0d51e65a2c1d9fba66af3940847cc65a8066d7"},
"jsone": {:hex, :jsone, "1.8.1", "6bc74d3863d55d420077346da97c601711017a057f2fd1df65d6d65dd562fbab", [:rebar3], [], "hexpm", "c78918124148c51a7a84c678e39bbc6281f8cb582f1d88584628a98468e99738"},
"kafka_ex": {:hex, :kafka_ex, "0.13.0", "2bfaf3c81d4ee01ed2088cb09e46c070c245f60f5752ec7043f29e807f6679ec", [:mix], [{:kayrock, "~> 0.1.12", [hex: :kayrock, repo: "hexpm", optional: false]}], "hexpm", "8a806eee5cd8191f45870b2ef4b3f4f52c57d798039f2d3fc602ce47053db7b9"},
"kafka_protocol": {:hex, :kafka_protocol, "4.1.3", "362d85a898d4148a43dbabb10a30bb2d6ff32ba0097eb06981d11b34e2e0a9cd", [:rebar3], [{:crc32cer, "0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}], "hexpm", "28cf73001270d972524dd0fad4a59074f4441219f9cf237ad808a2ac1ec97487"},
"kayrock": {:hex, :kayrock, "0.1.15", "61ce03b65dd2236479357ca4162f43fe3a42923b39fbb6551a16d57cf2b93072", [:mix], [{:connection, "~> 1.1.0", [hex: :connection, repo: "hexpm", optional: false]}, {:crc32cer, "~> 0.1.8", [hex: :crc32cer, repo: "hexpm", optional: false]}, {:varint, "~> 1.2.0", [hex: :varint, repo: "hexpm", optional: false]}], "hexpm", "61d7b3579db68e61c26f316b9246e0231b878148bb1887adc59fecedcbc46c12"},
Expand All @@ -17,8 +21,10 @@
"nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"},
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"observer_cli": {:hex, :observer_cli, "1.7.4", "3c1bfb6d91bf68f6a3d15f46ae20da0f7740d363ee5bc041191ce8722a6c4fae", [:mix, :rebar3], [{:recon, "~> 2.5.1", [hex: :recon, repo: "hexpm", optional: false]}], "hexpm", "50de6d95d814f447458bd5d72666a74624eddb0ef98bdcee61a0153aae0865ff"},
"pipe": {:hex, :pipes, "2.0.1", "a2b56796c63690ed0e78bb77bb389af250bd70afa15a6869369dbdc11087d68f", [:rebar3], [], "hexpm", "623357a158e4c33ee589d4c735ddbab9c77a04e85159192e4d42f1dc97c60bd9"},
"recon": {:hex, :recon, "2.5.5", "c108a4c406fa301a529151a3bb53158cadc4064ec0c5f99b03ddb8c0e4281bdf", [:mix, :rebar3], [], "hexpm", "632a6f447df7ccc1a4a10bdcfce71514412b16660fe59deca0fcf0aa3c054404"},
"snappyer": {:hex, :snappyer, "1.2.9", "9cc58470798648ce34c662ca0aa6daae31367667714c9a543384430a3586e5d3", [:rebar3], [], "hexpm", "18d00ca218ae613416e6eecafe1078db86342a66f86277bd45c95f05bf1c8b29"},
"statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"},
"uid": {:hex, :uid, "1.3.4", "42e30e22908e8e2faa6227e9c261f1954cb540be3c5a139e112369ae6cc451fc", [:rebar3], [], "hexpm", "f8388ef93b16a5d5f9977e1fe814ae0acf5529b1e0ee5d7b18d23cb4c0f87eaa"},
"varint": {:hex, :varint, "1.2.0", "61bffd9dcc2d5242d59f75694506b4d4013bb103f6a23e34b94f89cebb0c1ab3", [:mix], [], "hexpm", "d94941ed8b9d1a5fdede9103a5e52035bd0aaf35081d44e67713a36799927e47"},
}
178 changes: 178 additions & 0 deletions test/support/async_producer_benchmark.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
defmodule Klife.TestUtils.AsyncProducerBenchmark do
require Logger

alias Klife.Producer.Controller, as: PController

@number_of_records 5_000_000

def run(clients) do
sample_data = generate_data()

topics = [
List.first(sample_data.records_0).topic,
List.first(sample_data.records_1).topic,
List.first(sample_data.records_2).topic
]

records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2

client_results = Enum.map(clients, &run_benchmark(&1, topics, records))

results = Enum.zip(clients, client_results) |> Map.new()
IO.puts("Client | Result | Compared to klife")
Enum.each(results, fn {client, result} ->
IO.puts(
"#{client}\t| #{result} | x#{results_compared_to_klife(result, results)}"
)
end)
end

defp run_benchmark("erlkaf", topics, records) do
:erlkaf.start()

producer_config = [
bootstrap_servers: "localhost:19092",
max_in_flight: 1,
enable_idempotence: true,
sticky_partitioning_linger_ms: 0,
batch_size: 512_000
]

:ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config)

Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
erlkaf_msg = Enum.random(records)

:erlkaf.produce(
:erlkaf_test_producer,
erlkaf_msg.topic,
erlkaf_msg.key,
erlkaf_msg.value
)
end)

:ok
end)

result = measurement_collector(topics)

:erlkaf.stop()

result
end

defp run_benchmark("klife", topics, records) do
{:ok, client_pid} =
Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
klife_msg = Enum.random(records)
MyClient.produce_async(klife_msg)
end)
end)

result = measurement_collector(topics)

Process.exit(client_pid, :kill)

result
end

defp run_benchmark("brod", topics, records) do
Task.async(fn ->
Enum.map(1..@number_of_records, fn _i ->
brod_msg = Enum.random(records)

:brod.produce(
:kafka_client,
brod_msg.topic,
brod_msg.partition,
brod_msg.key,
brod_msg.value
)
end)
end)

result = measurement_collector(topics)

:brod.stop()

result
end

defp measurement_collector(topics) do
starting_offset = get_total_offsets(topics)

Process.sleep(10000)

get_total_offsets(topics) - starting_offset
end

defp get_total_offsets(topics), do: get_offset_by_topic(topics) |> Map.values() |> Enum.sum()

defp get_offset_by_topic(topics) do
metas = PController.get_all_topics_partitions_metadata(MyClient)

data_by_topic =
metas
|> Enum.group_by(fn m -> m.leader_id end)
|> Enum.flat_map(fn {leader_id, metas} ->
Klife.Testing.get_latest_offsets(leader_id, metas, MyClient)
end)
|> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end)
|> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end)
|> Enum.map(fn {k, v} ->
{k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()}
end)
|> Map.new()
end

defp generate_data() do
topic0 = "async_benchmark_topic_0"
topic1 = "async_benchmark_topic_1"
topic2 = "async_benchmark_topic_2"

max_partition = 30

records_0 =
Enum.map(0..(max_partition - 1), fn p ->
%Klife.Record{
value: :rand.bytes(1_000),
key: :rand.bytes(50),
topic: topic0,
partition: p
}
end)

records_1 =
Enum.map(0..(max_partition - 1), fn p ->
%Klife.Record{
value: :rand.bytes(1_000),
key: :rand.bytes(50),
topic: topic1,
partition: p
}
end)

records_2 =
Enum.map(0..(max_partition - 1), fn p ->
%Klife.Record{
value: :rand.bytes(1_000),
key: :rand.bytes(50),
topic: topic2,
partition: p
}
end)

%{
records_0: records_0,
records_1: records_1,
records_2: records_2,
max_partition: max_partition
}
end

defp results_compared_to_klife(result, results) do
result / Map.get(results, "klife") |> Float.round(2)
end
end

0 comments on commit 069bc76

Please sign in to comment.