Skip to content

Commit

Permalink
feat(batcher): do not send to broker on handle_call
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed May 20, 2024
1 parent f8db17f commit 04a503e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 50 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ config :klife,
%{
name: :benchmark_producer_in_flight,
client_id: "my_custom_client_id",
max_in_flight_requests: 5
max_in_flight_requests: 10
},
%{
name: :my_batch_compressed_producer,
Expand Down
60 changes: 13 additions & 47 deletions lib/klife/producer/batcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -133,52 +133,17 @@ defmodule Klife.Producer.Batcher do
%{
producer_config: %{linger_ms: linger_ms, delivery_timeout_ms: delivery_timeout},
last_batch_sent_at: last_batch_sent_at,
in_flight_pool: in_flight_pool,
batch_queue: batch_queue
next_send_msg_ref: next_ref
} = state

now = System.monotonic_time(:millisecond)
pool_idx = Enum.find_index(in_flight_pool, &is_nil/1)

on_time? = now - last_batch_sent_at >= linger_ms
in_flight_available? = is_number(pool_idx)
has_batch_on_queue? = not :queue.is_empty(batch_queue)

cond do
not on_time? ->
new_state = add_record(state, record, topic, partition, pid, rec_size)

if not :queue.is_empty(new_state.batch_queue) do
maybe_schedule_dispatch(state, 0)
end

{:reply, {:ok, delivery_timeout}, new_state}

not in_flight_available? ->
new_state =
state
|> add_record(record, topic, partition, pid, rec_size)
|> maybe_schedule_dispatch(10)

{:reply, {:ok, delivery_timeout}, new_state}

has_batch_on_queue? ->
new_sate =
state
|> add_record(record, topic, partition, pid, rec_size)
|> dispatch_to_broker(pool_idx)
|> maybe_schedule_dispatch(10)

{:reply, {:ok, delivery_timeout}, new_sate}
new_state = add_record(state, record, topic, partition, pid, rec_size)

true ->
new_sate =
state
|> add_record(record, topic, partition, pid, rec_size)
|> dispatch_to_broker(pool_idx)

{:reply, {:ok, delivery_timeout}, new_sate}
end
if on_time? and next_ref == nil,
do: {:reply, {:ok, delivery_timeout}, maybe_schedule_send(new_state, 0)},
else: {:reply, {:ok, delivery_timeout}, new_state}
end

def handle_info(:send_to_broker, %__MODULE__{} = state) do
Expand All @@ -200,27 +165,27 @@ defmodule Klife.Producer.Batcher do

cond do
not in_flight_available? ->
{:noreply, maybe_schedule_dispatch(new_state, 10)}
{:noreply, maybe_schedule_send(new_state, 1)}

has_batch_on_queue? ->
new_state =
new_state
|> dispatch_to_broker(pool_idx)
|> maybe_schedule_dispatch(10)
|> maybe_schedule_send(1)

{:noreply, new_state}

not on_time? ->
new_state =
maybe_schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at))
maybe_schedule_send(new_state, linger_ms - (now - last_batch_sent_at))

{:noreply, new_state}

is_periodic? ->
new_state =
new_state
|> dispatch_to_broker(pool_idx)
|> maybe_schedule_dispatch(linger_ms)
|> maybe_schedule_send(linger_ms)

{:noreply, new_state}

Expand Down Expand Up @@ -315,7 +280,8 @@ defmodule Klife.Producer.Batcher do
do:
state
|> move_current_data_to_batch_queue()
|> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size),
|> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size)
|> maybe_schedule_send(5),
else:
state
|> add_record_to_current_data(record, topic, partition, pid, rec_estimated_size)
Expand Down Expand Up @@ -376,13 +342,13 @@ defmodule Klife.Producer.Batcher do
end
end

def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: nil} = state, time),
def maybe_schedule_send(%__MODULE__{next_send_msg_ref: nil} = state, time),
do: %{
state
| next_send_msg_ref: Process.send_after(self(), :send_to_broker, time)
}

def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: ref} = state, time)
def maybe_schedule_send(%__MODULE__{next_send_msg_ref: ref} = state, time)
when is_reference(ref) do
if Process.read_timer(ref) > time do
Process.cancel_timer(ref)
Expand Down
4 changes: 2 additions & 2 deletions test/support/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ defmodule Klife.TestUtils do
Process.sleep(10)
result
end)
|> Task.await(15_000)
|> Task.await(30_000)
end

def start_broker(service_name, cluster_name) do
Expand Down Expand Up @@ -102,6 +102,6 @@ defmodule Klife.TestUtils do
Process.sleep(10)
result
end)
|> Task.await(25_000)
|> Task.await(30_000)
end
end

0 comments on commit 04a503e

Please sign in to comment.