Skip to content

Commit

Permalink
feat(dispatcher): implement instant deliver of completed batches
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Apr 3, 2024
1 parent c03ac29 commit b2a31ec
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 18 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ config :klife,
%{
name: :my_batch_producer,
client_id: "my_custom_client_id",
linger_ms: 1_000
linger_ms: 1_500
},
%{
name: :benchmark_producer,
Expand Down
61 changes: 44 additions & 17 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,32 +93,37 @@ defmodule Klife.Producer.Dispatcher do
pool_idx = Enum.find_index(in_flight_pool, &is_nil/1)

on_time? = now - last_batch_sent_at >= linger_ms
request_in_flight_available? = is_number(pool_idx)
batch_queue_is_empty? = :queue.is_empty(batch_queue)
in_flight_available? = is_number(pool_idx)
has_batch_on_queue? = not :queue.is_empty(batch_queue)

cond do
not on_time? ->
new_state = add_record(state, record, topic, partition, pid, rec_size)

if not :queue.is_empty(new_state.batch_queue) do
maybe_schedule_dispatch(state, 0)
end

{:reply, {:ok, delivery_timeout}, new_state}

not request_in_flight_available? ->
not in_flight_available? ->
new_state =
state
|> add_record(record, topic, partition, pid, rec_size)
|> schedule_dispatch(10)
|> maybe_schedule_dispatch(10)

{:reply, {:ok, delivery_timeout}, new_state}

not batch_queue_is_empty? ->
has_batch_on_queue? ->
new_sate =
state
|> add_record(record, topic, partition, pid, rec_size)
|> dispatch_to_broker(pool_idx)
|> schedule_dispatch(10)
|> maybe_schedule_dispatch(10)

{:reply, {:ok, delivery_timeout}, new_sate}

batch_queue_is_empty? ->
true ->
new_sate =
state
|> add_record(record, topic, partition, pid, rec_size)
Expand All @@ -142,26 +147,36 @@ defmodule Klife.Producer.Dispatcher do
on_time? = now - last_batch_sent_at >= linger_ms
in_flight_available? = is_number(pool_idx)
has_batch_on_queue? = not :queue.is_empty(batch_queue)
should_reschedule? = linger_ms > 0 or has_batch_on_queue?

is_periodic? = linger_ms > 0
new_state = %{state | next_send_msg_ref: nil}

cond do
not in_flight_available? ->
{:noreply, maybe_schedule_dispatch(new_state, 10)}

has_batch_on_queue? ->
new_state =
new_state
|> dispatch_to_broker(pool_idx)
|> maybe_schedule_dispatch(10)

{:noreply, new_state}

not on_time? ->
{:noreply, schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at))}
new_state =
maybe_schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at))

not in_flight_available? ->
{:noreply, schedule_dispatch(new_state, 10)}
{:noreply, new_state}

should_reschedule? ->
is_periodic? ->
new_state =
new_state
|> dispatch_to_broker(pool_idx)
|> schedule_dispatch(if has_batch_on_queue?, do: 0, else: linger_ms)
|> maybe_schedule_dispatch(linger_ms)

{:noreply, new_state}

not should_reschedule? ->
true ->
{:noreply, dispatch_to_broker(new_state, pool_idx)}
end
end
Expand Down Expand Up @@ -320,13 +335,25 @@ defmodule Klife.Producer.Dispatcher do
end
end

def schedule_dispatch(%__MODULE__{next_send_msg_ref: nil} = state, time),
def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: nil} = state, time),
do: %{
state
| next_send_msg_ref: Process.send_after(self(), :send_to_broker, time)
}

def schedule_dispatch(%__MODULE__{} = state, _), do: state
def maybe_schedule_dispatch(%__MODULE__{next_send_msg_ref: ref} = state, time)
when is_reference(ref) do
if Process.read_timer(ref) > time do
Process.cancel_timer(ref)

%{
state
| next_send_msg_ref: Process.send_after(self(), :send_to_broker, time)
}
else
state
end
end

## PRIVATE FUNCTIONS

Expand Down

0 comments on commit b2a31ec

Please sign in to comment.