Skip to content

Commit

Permalink
chore: async benchmark
Browse files (browse the repository at this point in the history)
  • Loading branch information
oliveigah committed Nov 17, 2024
1 parent 3f6671b commit 4bfcc28
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 75 deletions.
4 changes: 4 additions & 0 deletions bechmark_results/16_async.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Client | Result | Compared to klife
brod | 926471 | x0.76
erlkaf | 573942 | x0.47
klife | 1223524 | x1.0
4 changes: 4 additions & 0 deletions bechmark_results/1_async.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Client | Result | Compared to klife
brod | 749305 | x0.71
erlkaf | 1431789 | x1.36
klife | 1053619 | x1.0
4 changes: 4 additions & 0 deletions bechmark_results/2_async.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Client | Result | Compared to klife
brod | 807898 | x0.54
erlkaf | 1271313 | x0.85
klife | 1489859 | x1.0
4 changes: 4 additions & 0 deletions bechmark_results/4_async.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Client | Result | Compared to klife
brod | 887220 | x0.57
erlkaf | 1264701 | x0.81
klife | 1569551 | x1.0
4 changes: 4 additions & 0 deletions bechmark_results/8_async.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Client | Result | Compared to klife
brod | 990580 | x0.64
erlkaf | 1209263 | x0.78
klife | 1545735 | x1.0
34 changes: 29 additions & 5 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ config :klife, MyClient,
producers: [
[
name: :benchmark_producer,
client_id: "my_custom_client_id",
client_id: "my_custom_client_id"
],
[
name: :async_benchmark_producer,
client_id: "my_custom_client_id",
batchers_count: 4
batchers_count: 32
],
[
name: :benchmark_producer_in_flight,
Expand Down Expand Up @@ -74,15 +74,39 @@ config :klife, MyClient,
default_producer: :benchmark_producer
],
[
name: "async_benchmark_topic_0",
name: "async_benchmark_topic_klife_0",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_klife_1",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_klife_2",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_erlkaf_0",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_erlkaf_1",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_erlkaf_2",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_brod_0",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_1",
name: "async_benchmark_topic_brod_1",
default_producer: :async_benchmark_producer
],
[
name: "async_benchmark_topic_2",
name: "async_benchmark_topic_brod_2",
default_producer: :async_benchmark_producer
],
[
Expand Down
2 changes: 1 addition & 1 deletion lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ if Mix.env() in [:dev] do
end

def do_run_bench("producer_async", parallel) do
AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"])
AsyncProducerBenchmark.run(["klife", "erlkaf", "brod"], String.to_integer(parallel))
end

def do_run_bench("producer_sync", parallel) do
Expand Down
148 changes: 79 additions & 69 deletions test/support/async_producer_benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,29 +5,31 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do

@number_of_records 5_000_000

def run(clients) do
sample_data = generate_data()

topics = [
List.first(sample_data.records_0).topic,
List.first(sample_data.records_1).topic,
List.first(sample_data.records_2).topic
]

records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2

client_results = Enum.map(clients, &run_benchmark(&1, topics, records))
def run(clients, parallel) do
client_results =
Enum.map(clients, fn client ->
Process.sleep(5000)
sample_data = generate_data(client)

topics = [
List.first(sample_data.records_0).topic,
List.first(sample_data.records_1).topic,
List.first(sample_data.records_2).topic
]

records = sample_data.records_0 ++ sample_data.records_1 ++ sample_data.records_2
run_benchmark(client, topics, records, parallel)
end)

results = Enum.zip(clients, client_results) |> Map.new()
IO.puts("Client | Result | Compared to klife")

Enum.each(results, fn {client, result} ->
IO.puts(
"#{client}\t| #{result} | x#{results_compared_to_klife(result, results)}"
)
IO.puts("#{client}\t| #{result} | x#{results_compared_to_klife(result, results)}")
end)
end

defp run_benchmark("erlkaf", topics, records) do
defp run_benchmark("erlkaf", topics, records, parallel) do
:erlkaf.start()

Check warning on line 33 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.15, 25)

:erlkaf.start/0 is undefined (module :erlkaf is not available or is yet to be defined)

Check warning on line 33 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.14, 24)

:erlkaf.start/0 is undefined (module :erlkaf is not available or is yet to be defined)

producer_config = [
Expand All @@ -40,70 +42,78 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do

:ok = :erlkaf.create_producer(:erlkaf_test_producer, producer_config)

Check warning on line 43 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.15, 25)

:erlkaf.create_producer/2 is undefined (module :erlkaf is not available or is yet to be defined)

Check warning on line 43 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.14, 24)

:erlkaf.create_producer/2 is undefined (module :erlkaf is not available or is yet to be defined)

Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
erlkaf_msg = Enum.random(records)

:erlkaf.produce(
:erlkaf_test_producer,
erlkaf_msg.topic,
erlkaf_msg.key,
erlkaf_msg.value
)
tasks =
Enum.map(1..parallel, fn _ ->
Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
erlkaf_msg = Enum.random(records)

:erlkaf.produce(

Check warning on line 51 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.15, 25)

:erlkaf.produce/4 is undefined (module :erlkaf is not available or is yet to be defined)

Check warning on line 51 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.14, 24)

:erlkaf.produce/4 is undefined (module :erlkaf is not available or is yet to be defined)
:erlkaf_test_producer,
erlkaf_msg.topic,
erlkaf_msg.key,
erlkaf_msg.value
)
end)

:ok
end)
end)

:ok
end)

result = measurement_collector(topics)

:erlkaf.stop()
Enum.each(tasks, fn {:ok, pid} -> Process.exit(pid, :kill) end)

result
end

defp run_benchmark("klife", topics, records) do
{:ok, client_pid} =
Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
klife_msg = Enum.random(records)
MyClient.produce_async(klife_msg)
defp run_benchmark("klife", topics, records, parallel) do
tasks =
Enum.map(1..parallel, fn _ ->
Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
klife_msg = Enum.random(records)
MyClient.produce_async(klife_msg)
end)
end)
end)

result = measurement_collector(topics)

Process.exit(client_pid, :kill)
Enum.each(tasks, fn {:ok, pid} -> Process.exit(pid, :kill) end)

result
end

defp run_benchmark("brod", topics, records) do
Task.async(fn ->
Enum.map(1..@number_of_records, fn _i ->
brod_msg = Enum.random(records)

:brod.produce(
:kafka_client,
brod_msg.topic,
brod_msg.partition,
brod_msg.key,
brod_msg.value
)
defp run_benchmark("brod", topics, records, parallel) do
tasks =
Enum.map(1..parallel, fn _ ->
Task.start(fn ->
Enum.map(1..@number_of_records, fn _i ->
brod_msg = Enum.random(records)

:brod.produce(

Check warning on line 95 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.15, 25)

:brod.produce/5 is undefined (module :brod is not available or is yet to be defined)

Check warning on line 95 in test/support/async_producer_benchmark.ex

View workflow job for this annotation

GitHub Actions / test (1.14, 24)

:brod.produce/5 is undefined (module :brod is not available or is yet to be defined)
:kafka_client,
brod_msg.topic,
brod_msg.partition,
brod_msg.key,
brod_msg.value
)
end)
end)
end)
end)

result = measurement_collector(topics)

:brod.stop()
Enum.each(tasks, fn {:ok, pid} -> Process.exit(pid, :kill) end)

result
end

defp measurement_collector(topics) do
starting_offset = get_total_offsets(topics)

Process.sleep(10000)
Process.sleep(10_000)

get_total_offsets(topics) - starting_offset
end
Expand All @@ -113,24 +123,24 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do
defp get_offset_by_topic(topics) do
metas = PController.get_all_topics_partitions_metadata(MyClient)

data_by_topic =
metas
|> Enum.group_by(fn m -> m.leader_id end)
|> Enum.flat_map(fn {leader_id, metas} ->
Klife.Testing.get_latest_offsets(leader_id, metas, MyClient)
end)
|> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end)
|> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end)
|> Enum.map(fn {k, v} ->
{k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()}
end)
|> Map.new()
metas
|> Enum.group_by(fn m -> m.leader_id end)
|> Enum.flat_map(fn {leader_id, metas} ->
Klife.Testing.get_latest_offsets(leader_id, metas, MyClient)
end)
|> Enum.filter(fn {topic, _pdata} -> Enum.member?(topics, topic) end)
|> Enum.group_by(fn {topic, _pdata} -> topic end, fn {_topic, pdata} -> pdata end)
|> Enum.map(fn {k, v} ->
{k, List.flatten(v) |> Enum.map(fn {_p, offset} -> offset end) |> Enum.sum()}
end)
|> Map.new()
end

defp generate_data() do
topic0 = "async_benchmark_topic_0"
topic1 = "async_benchmark_topic_1"
topic2 = "async_benchmark_topic_2"
defp generate_data(client) do
[topic0, topic1, topic2] =
Enum.map(0..2, fn i ->
"async_benchmark_topic_#{client}_#{i}"
end)

max_partition = 30

Expand Down Expand Up @@ -173,6 +183,6 @@ defmodule Klife.TestUtils.AsyncProducerBenchmark do
end

defp results_compared_to_klife(result, results) do
result / Map.get(results, "klife") |> Float.round(2)
(result / Map.get(results, "klife")) |> Float.round(2)
end
end

0 comments on commit 4bfcc28

Please sign in to comment.