From 04a503eab1262e8b530f1095179876320d49e00d Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Sun, 19 May 2024 21:40:16 -0300 Subject: [PATCH] feat(batcher): do not send to broker on handle_call --- config/config.exs | 2 +- lib/klife/producer/batcher.ex | 60 ++++++++--------------------------- test/support/utils.ex | 4 +-- 3 files changed, 16 insertions(+), 50 deletions(-) diff --git a/config/config.exs b/config/config.exs index 5226066..19685ca 100644 --- a/config/config.exs +++ b/config/config.exs @@ -23,7 +23,7 @@ config :klife, %{ name: :benchmark_producer_in_flight, client_id: "my_custom_client_id", - max_in_flight_requests: 5 + max_in_flight_requests: 10 }, %{ name: :my_batch_compressed_producer, diff --git a/lib/klife/producer/batcher.ex b/lib/klife/producer/batcher.ex index db221da..2d720f5 100644 --- a/lib/klife/producer/batcher.ex +++ b/lib/klife/producer/batcher.ex @@ -133,52 +133,17 @@ defmodule Klife.Producer.Batcher do %{ producer_config: %{linger_ms: linger_ms, delivery_timeout_ms: delivery_timeout}, last_batch_sent_at: last_batch_sent_at, - in_flight_pool: in_flight_pool, - batch_queue: batch_queue + next_send_msg_ref: next_ref } = state now = System.monotonic_time(:millisecond) - pool_idx = Enum.find_index(in_flight_pool, &is_nil/1) on_time? = now - last_batch_sent_at >= linger_ms - in_flight_available? = is_number(pool_idx) - has_batch_on_queue? = not :queue.is_empty(batch_queue) - - cond do - not on_time? -> - new_state = add_record(state, record, topic, partition, pid, rec_size) - - if not :queue.is_empty(new_state.batch_queue) do - maybe_schedule_dispatch(state, 0) - end - - {:reply, {:ok, delivery_timeout}, new_state} - - not in_flight_available? -> - new_state = - state - |> add_record(record, topic, partition, pid, rec_size) - |> maybe_schedule_dispatch(10) - - {:reply, {:ok, delivery_timeout}, new_state} - - has_batch_on_queue? -> - new_sate = - state - |> add_record(record, topic, partition, pid, rec_size) - |> dispatch_to_broker(pool_idx) - |> maybe_schedule_dispatch(10) - - {:reply, {:ok, delivery_timeout}, new_sate} + new_state = add_record(state, record, topic, partition, pid, rec_size) - true -> - new_sate = - state - |> add_record(record, topic, partition, pid, rec_size) - |> dispatch_to_broker(pool_idx) - - {:reply, {:ok, delivery_timeout}, new_sate} - end + if on_time? and next_ref == nil, + do: {:reply, {:ok, delivery_timeout}, maybe_schedule_send(new_state, 0)}, + else: {:reply, {:ok, delivery_timeout}, new_state} end def handle_info(:send_to_broker, %__MODULE__{} = state) do @@ -200,19 +165,19 @@ defmodule Klife.Producer.Batcher do cond do not in_flight_available? -> - {:noreply, maybe_schedule_dispatch(new_state, 10)} + {:noreply, maybe_schedule_send(new_state, 1)} has_batch_on_queue? -> new_state = new_state |> dispatch_to_broker(pool_idx) - |> maybe_schedule_dispatch(10) + |> maybe_schedule_send(1) {:noreply, new_state} not on_time? -> new_state = - maybe_schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at)) + maybe_schedule_send(new_state, linger_ms - (now - last_batch_sent_at)) {:noreply, new_state} @@ -220,7 +185,7 @@ defmodule Klife.Producer.Batcher do new_state = new_state |> dispatch_to_broker(pool_idx) - |> maybe_schedule_dispatch(linger_ms) + |> maybe_schedule_send(linger_ms) {:noreply, new_state} @@ -315,7 +280,8 @@ defmodule Klife.Producer.Batcher do do: state |> move_current_data_to_batch_queue() - |> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size), + |> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size) + |> maybe_schedule_send(5), else: state |> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size) @@ -376,13 +342,13 @@ defmodule Klife.Producer.Batcher do end end - def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: nil} = state, time), + def maybe_schedule_send(%__MODULE__{next_send_msg_ref: nil} = state, time), do: %{ state | next_send_msg_ref: Process.send_after(self(), :send_to_broker, time) } - def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: ref} = state, time) + def maybe_schedule_send(%__MODULE__{next_send_msg_ref: ref} = state, time) when is_reference(ref) do if Process.read_timer(ref) > time do Process.cancel_timer(ref) diff --git a/test/support/utils.ex b/test/support/utils.ex index 2bda29d..d79802a 100644 --- a/test/support/utils.ex +++ b/test/support/utils.ex @@ -50,7 +50,7 @@ defmodule Klife.TestUtils do Process.sleep(10) result end) - |> Task.await(15_000) + |> Task.await(30_000) end def start_broker(service_name, cluster_name) do @@ -102,6 +102,6 @@ defmodule Klife.TestUtils do Process.sleep(10) result end) - |> Task.await(25_000) + |> Task.await(30_000) end end