Skip to content

Commit

Permalink
chore: rename dispatcher to batcher
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Apr 29, 2024
1 parent 97a5d41 commit 9a81c89
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 69 deletions.
24 changes: 12 additions & 12 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,19 +31,19 @@ config :klife,
client_id: "my_custom_client_id"
},
%{
name: :dispatcher_benchmark_producer_1,
name: :batcher_benchmark_producer_1,
client_id: "my_custom_client_id",
dispatchers_count: 1
batchers_count: 1
},
%{
name: :dispatcher_benchmark_producer_2,
name: :batcher_benchmark_producer_2,
client_id: "my_custom_client_id",
dispatchers_count: 2
batchers_count: 2
},
%{
name: :dispatcher_benchmark_producer_3,
name: :batcher_benchmark_producer_3,
client_id: "my_custom_client_id",
dispatchers_count: 3
batchers_count: 3
}
],
topics: [
Expand All @@ -58,20 +58,20 @@ config :klife,
producer: :my_batch_compressed_producer
},
%{
name: "dispatcher_benchmark_topic_1",
producer: :dispatcher_benchmark_producer_1,
name: "batcher_benchmark_topic_1",
producer: :batcher_benchmark_producer_1,
num_partitions: 30,
replication_factor: 2
},
%{
name: "dispatcher_benchmark_topic_2",
producer: :dispatcher_benchmark_producer_2,
name: "batcher_benchmark_topic_2",
producer: :batcher_benchmark_producer_2,
num_partitions: 30,
replication_factor: 2
},
%{
name: "dispatcher_benchmark_topic_3",
producer: :dispatcher_benchmark_producer_3,
name: "batcher_benchmark_topic_3",
producer: :batcher_benchmark_producer_3,
num_partitions: 30,
replication_factor: 2
},
Expand Down
22 changes: 11 additions & 11 deletions lib/klife/producer/dispatcher.ex → lib/klife/producer/batcher.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Klife.Producer.Dispatcher do
defmodule Klife.Producer.Batcher do
use GenServer
import Klife.ProcessRegistry

Expand All @@ -21,22 +21,22 @@ defmodule Klife.Producer.Dispatcher do
:in_flight_pool,
:next_send_msg_ref,
:batch_queue,
:dispatcher_id
:batcher_id
]

def start_link(args) do
pconfig = Keyword.fetch!(args, :producer_config)
cluster_name = pconfig.cluster_name
broker_id = Keyword.fetch!(args, :broker_id)
dispatcher_id = Keyword.fetch!(args, :id)
batcher_id = Keyword.fetch!(args, :id)

GenServer.start_link(__MODULE__, args,
name:
get_process_name(
cluster_name,
broker_id,
pconfig.producer_name,
dispatcher_id
batcher_id
)
)
end
Expand All @@ -48,7 +48,7 @@ defmodule Klife.Producer.Dispatcher do
idempotent? = args_map.producer_config.enable_idempotence
cluster_name = args_map.producer_config.cluster_name
broker_id = args_map.broker_id
dispatcher_id = args_map.id
batcher_id = args_map.id
producer_config = args_map.producer_config

producer_id =
Expand All @@ -74,7 +74,7 @@ defmodule Klife.Producer.Dispatcher do
base_sequences: %{},
producer_epochs: %{},
broker_id: broker_id,
dispatcher_id: dispatcher_id,
batcher_id: batcher_id,
producer_config: producer_config
}

Expand All @@ -100,10 +100,10 @@ defmodule Klife.Producer.Dispatcher do
cluster_name,
broker_id,
producer_name,
dispatcher_id
batcher_id
) do
cluster_name
|> get_process_name(broker_id, producer_name, dispatcher_id)
|> get_process_name(broker_id, producer_name, batcher_id)
|> GenServer.call({:produce_sync, record, topic, partition, estimate_record_size(record)})
end

Expand Down Expand Up @@ -338,7 +338,7 @@ defmodule Klife.Producer.Dispatcher do
# socket writes and move the response handler to other process.
{:ok, task_pid} =
Task.Supervisor.start_child(
via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name}),
via_tuple({Klife.Producer.BatcherTaskSupervisor, cluster_name}),
__MODULE__,
:do_dispatch_to_broker,
[
Expand Down Expand Up @@ -683,9 +683,9 @@ defmodule Klife.Producer.Dispatcher do
cluster_name,
broker_id,
producer_name,
dispatcher_id
batcher_id
) do
via_tuple({__MODULE__, cluster_name, broker_id, producer_name, dispatcher_id})
via_tuple({__MODULE__, cluster_name, broker_id, producer_name, batcher_id})
end

defp estimate_record_size(record) do
Expand Down
16 changes: 8 additions & 8 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ defmodule Klife.Producer.Controller do

:new

[{key, current_broker_id, _default_producer, _dispatcher_id}] ->
[{key, current_broker_id, _default_producer, _batcher_id}] ->
if current_broker_id != partition.leader_id do
:ets.update_element(
table_name,
Expand Down Expand Up @@ -197,15 +197,15 @@ defmodule Klife.Producer.Controller do
end

def get_topics_partitions_metadata(cluster_name, topic, partition) do
[{_key, broker_id, default_producer, dispatcher_id}] =
[{_key, broker_id, default_producer, batcher_id}] =
cluster_name
|> topics_partitions_metadata_table()
|> :ets.lookup({topic, partition})

%{
broker_id: broker_id,
producer_name: default_producer,
dispatcher_id: dispatcher_id
batcher_id: batcher_id
}
end

Expand All @@ -221,28 +221,28 @@ defmodule Klife.Producer.Controller do
|> :ets.lookup_element({topic, partition}, 3)
end

def get_dispatcher_id(cluster_name, topic, partition) do
def get_batcher_id(cluster_name, topic, partition) do
cluster_name
|> topics_partitions_metadata_table()
|> :ets.lookup_element({topic, partition}, 4)
end

def update_dispatcher_id(cluster_name, topic, partition, new_dispatcher_id) do
def update_batcher_id(cluster_name, topic, partition, new_batcher_id) do
cluster_name
|> topics_partitions_metadata_table()
|> :ets.update_element({topic, partition}, {4, new_dispatcher_id})
|> :ets.update_element({topic, partition}, {4, new_batcher_id})
end

def get_all_topics_partitions_metadata(cluster_name) do
cluster_name
|> topics_partitions_metadata_table()
|> :ets.tab2list()
|> Enum.map(fn {{topic_name, partition_idx}, leader_id, _default_producer, dispatcher_id} ->
|> Enum.map(fn {{topic_name, partition_idx}, leader_id, _default_producer, batcher_id} ->
%{
topic_name: topic_name,
partition_idx: partition_idx,
leader_id: leader_id,
dispatcher_id: dispatcher_id
batcher_id: batcher_id
}
end)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/klife/producer/dispatcher_supervisor.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Klife.Producer.DispatcherSupervisor do
defmodule Klife.Producer.BatcherSupervisor do
use DynamicSupervisor

import Klife.ProcessRegistry
Expand Down
56 changes: 28 additions & 28 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ defmodule Klife.Producer do

import Klife.ProcessRegistry

alias Klife.Producer.Dispatcher
alias Klife.Producer.DispatcherSupervisor
alias Klife.Producer.Batcher
alias Klife.Producer.BatcherSupervisor
alias Klife.Producer.Controller, as: ProducerController
alias Klife.Connection.Controller, as: ConnController

Expand All @@ -20,7 +20,7 @@ defmodule Klife.Producer do
request_timeout_ms: [type: :non_neg_integer, default: :timer.seconds(15)],
retry_backoff_ms: [type: :non_neg_integer, default: :timer.seconds(1)],
max_in_flight_requests: [type: :non_neg_integer, default: 1],
dispatchers_count: [type: :pos_integer, default: 1],
batchers_count: [type: :pos_integer, default: 1],
enable_idempotence: [type: :boolean, default: true],
compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none]
]
Expand Down Expand Up @@ -50,7 +50,7 @@ defmodule Klife.Producer do
filtered_args = Map.take(args_map, Map.keys(base))
state = Map.merge(base, filtered_args)

:ok = init_dispatchers(state)
:ok = init_batchers(state)

{:ok, state}
end
Expand All @@ -59,27 +59,27 @@ defmodule Klife.Producer do
%{
broker_id: broker_id,
producer_name: default_producer,
dispatcher_id: default_dispatcher_id
batcher_id: default_batcher_id
} = ProducerController.get_topics_partitions_metadata(cluster_name, topic, partition)

{producer_name, dispatcher_id} =
{producer_name, batcher_id} =
case Keyword.get(opts, :producer) do
nil ->
{default_producer, default_dispatcher_id}
{default_producer, default_batcher_id}

other_producer ->
{other_producer, get_dispatcher_id(cluster_name, other_producer, topic, partition)}
{other_producer, get_batcher_id(cluster_name, other_producer, topic, partition)}
end

{:ok, delivery_timeout_ms} =
Dispatcher.produce_sync(
Batcher.produce_sync(
record,
topic,
partition,
cluster_name,
broker_id,
producer_name,
dispatcher_id
batcher_id
)

# TODO: Should we handle cluster change errors here by retrying after a cluster check?
Expand All @@ -95,22 +95,22 @@ defmodule Klife.Producer do
end
end

defp init_dispatchers(%__MODULE__{} = state) do
defp init_batchers(%__MODULE__{} = state) do
known_brokers = ConnController.get_known_brokers(state.cluster_name)
dispatchers_per_broker = state.dispatchers_count
:ok = do_init_dispatchers(state, known_brokers, dispatchers_per_broker)
:ok = update_topic_partition_metadata(state, dispatchers_per_broker)
batchers_per_broker = state.batchers_count
:ok = do_init_batchers(state, known_brokers, batchers_per_broker)
:ok = update_topic_partition_metadata(state, batchers_per_broker)

:ok
end

defp do_init_dispatchers(state, known_brokers, dispatchers_per_broker) do
defp do_init_batchers(state, known_brokers, batchers_per_broker) do
for broker_id <- known_brokers,
dispatcher_id <- 0..(dispatchers_per_broker - 1) do
batcher_id <- 0..(batchers_per_broker - 1) do
result =
DynamicSupervisor.start_child(
via_tuple({DispatcherSupervisor, state.cluster_name}),
{Dispatcher, [{:broker_id, broker_id}, {:id, dispatcher_id}, {:producer_config, state}]}
via_tuple({BatcherSupervisor, state.cluster_name}),
{Batcher, [{:broker_id, broker_id}, {:id, batcher_id}, {:producer_config, state}]}
)

case result do
Expand All @@ -122,7 +122,7 @@ defmodule Klife.Producer do
:ok
end

defp update_topic_partition_metadata(%__MODULE__{} = state, dispatchers_per_broker) do
defp update_topic_partition_metadata(%__MODULE__{} = state, batchers_per_broker) do
%__MODULE__{
cluster_name: cluster_name,
producer_name: producer_name
Expand All @@ -136,33 +136,33 @@ defmodule Klife.Producer do
|> Enum.with_index()
|> Enum.map(fn {val, idx} ->
dipsatcher_id =
if dispatchers_per_broker > 1, do: rem(idx, dispatchers_per_broker), else: 0
if batchers_per_broker > 1, do: rem(idx, batchers_per_broker), else: 0

Map.put(val, :dispatcher_id, dipsatcher_id)
Map.put(val, :batcher_id, dipsatcher_id)
end)
end)
|> List.flatten()
|> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, dispatcher_id: d_id} ->
|> Enum.each(fn %{topic_name: t_name, partition_idx: p_idx, batcher_id: d_id} ->
# Used when a record is produced by a non default producer
# in this case the proper dispatcher_id won't be present at
# in this case the proper batcher_id won't be present at
# main metadata ets table, therefore we need a way to
# find out it's value.
put_dispatcher_id(cluster_name, producer_name, t_name, p_idx, d_id)
put_batcher_id(cluster_name, producer_name, t_name, p_idx, d_id)

if ProducerController.get_default_producer(cluster_name, t_name, p_idx) == producer_name do
ProducerController.update_dispatcher_id(cluster_name, t_name, p_idx, d_id)
ProducerController.update_batcher_id(cluster_name, t_name, p_idx, d_id)
end
end)
end

defp put_dispatcher_id(cluster_name, producer_name, topic, partition, dispatcher_id) do
defp put_batcher_id(cluster_name, producer_name, topic, partition, batcher_id) do
:persistent_term.put(
{__MODULE__, cluster_name, producer_name, topic, partition},
dispatcher_id
batcher_id
)
end

defp get_dispatcher_id(cluster_name, producer_name, topic, partition) do
defp get_batcher_id(cluster_name, producer_name, topic, partition) do
:persistent_term.get({__MODULE__, cluster_name, producer_name, topic, partition})
end
end
4 changes: 2 additions & 2 deletions lib/klife/producer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ defmodule Klife.Producer.Supervisor do
cluster_name = Keyword.fetch!(opts, :cluster_name)

children = [
{Task.Supervisor, name: via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name})},
{Task.Supervisor, name: via_tuple({Klife.Producer.BatcherTaskSupervisor, cluster_name})},
{Klife.Producer.ProducerSupervisor, opts},
{Klife.Producer.DispatcherSupervisor, opts},
{Klife.Producer.BatcherSupervisor, opts},
{Klife.Producer.Controller, opts}
]

Expand Down
Loading

0 comments on commit 9a81c89

Please sign in to comment.