feat(producer): properly handle idempotent producer
oliveigah committed Apr 8, 2024
1 parent e58d032 commit 23138cd
Showing 2 changed files with 142 additions and 38 deletions.
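At a high level, Kafka's idempotent producer stamps every record batch with a producer id, a producer epoch and a per-partition base sequence; the broker remembers the last sequence it accepted for each (producer id, partition) pair and drops retried batches it has already seen. As a rough orientation for the diff below, this sketch shows only the idempotence-relevant fields of a batch header — the field names mirror the batch map built in dispatcher.ex, the values are made up:

  # Illustrative only, not part of the diff:
  batch_header = %{
    producer_id: 42,      # obtained from the broker when the dispatcher starts
    producer_epoch: 1,    # bumped each time the dispatcher (re)initializes
    base_sequence: 0      # sequence number of the first record in this batch
  }
  # A retried batch with an already-accepted base_sequence is deduplicated
  # by the broker instead of being appended twice.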
100 changes: 79 additions & 21 deletions lib/klife/producer/controller.ex
@@ -12,7 +12,13 @@ defmodule Klife.Producer.Controller do

@default_producer %{name: :default_producer}

defstruct [:cluster_name, :producers, :topics]
defstruct [
:cluster_name,
:producers,
:topics,
:check_metadata_waiting_pids,
:check_metadata_timer_ref
]

def start_link(args) do
cluster_name = Keyword.fetch!(args, :cluster_name)
@@ -24,10 +30,14 @@
cluster_name = Keyword.fetch!(args, :cluster_name)
topics_list = Keyword.get(args, :topics, [])

timer_ref = Process.send_after(self(), :check_metadata, 0)

state = %__MODULE__{
cluster_name: cluster_name,
producers: [@default_producer] ++ Keyword.get(args, :producers, []),
topics: Enum.filter(topics_list, &Map.get(&1, :enable_produce, true))
topics: Enum.filter(topics_list, &Map.get(&1, :enable_produce, true)),
check_metadata_waiting_pids: [],
check_metadata_timer_ref: timer_ref
}

Enum.each(topics_list, fn t ->
@@ -46,8 +56,6 @@

Utils.wait_connection!(cluster_name)

send(self(), :check_metadata)

{:ok, state}
end

@@ -74,38 +82,88 @@
end

@impl true
def handle_info(:check_metadata, %__MODULE__{} = state) do
def handle_info(
:check_metadata,
%__MODULE__{cluster_name: cluster_name, topics: topics} = state
) do
content = %{
topics: Enum.map(state.topics, fn t -> %{name: t.name} end)
topics: Enum.map(topics, fn t -> %{name: t.name} end)
}

{:ok, %{content: resp}} =
Broker.send_message(Messages.Metadata, state.cluster_name, :controller, content)
Broker.send_message(Messages.Metadata, cluster_name, :controller, content)

table_name = topics_partitions_metadata_table(cluster_name)

results =
for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)),
config_topic = Enum.find(topics, &(&1.name == topic.name)),
partition <- topic.partitions do
case :ets.lookup(table_name, {topic.name, partition.partition_index}) do
[] ->
:ets.insert(table_name, {
{topic.name, partition.partition_index},
partition.leader_id,
config_topic[:producer] || @default_producer.name,
nil
})

:new

[{{_name, _index}, current_broker_id, _default_producer, _dispatcher_id}] ->
if current_broker_id != partition.leader_id,
do: :ets.update_element(table_name, {topic.name, partition.partition_index}, {2, partition.leader_id}),
else: :noop
end
end

for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)),
config_topic = Enum.find(state.topics, &(&1.name == topic.name)),
partition <- topic.partitions do
state.cluster_name
|> topics_partitions_metadata_table()
|> :ets.insert({
{topic.name, partition.partition_index},
partition.leader_id,
config_topic[:producer] || @default_producer.name,
nil
})
if Enum.any?(results, &(&1 == :new)) do
send(self(), :init_producers)
end

if Enum.any?(resp.topics, &(&1.error_code != 0)) do
Process.send_after(self(), :check_metadata, :timer.seconds(5))
new_ref = Process.send_after(self(), :check_metadata, :timer.seconds(1))
{:noreply, %{state | check_metadata_timer_ref: new_ref}}
else
send(self(), :init_producers)
state.check_metadata_waiting_pids
|> Enum.reverse()
|> Enum.each(&GenServer.reply(&1, :ok))

new_ref = Process.send_after(self(), :check_metadata, :timer.seconds(300))

{:noreply, %{state | check_metadata_timer_ref: new_ref, check_metadata_waiting_pids: []}}
end
end

{:noreply, state}
@impl true
def handle_call(:trigger_check_metadata, from, %__MODULE__{} = state) do
case state do
%__MODULE__{check_metadata_waiting_pids: []} ->
Process.cancel_timer(state.check_metadata_timer_ref)
new_ref = Process.send_after(self(), :check_metadata, 0)

{:noreply,
%__MODULE__{
state
| check_metadata_waiting_pids: [from],
check_metadata_timer_ref: new_ref
}}

%__MODULE__{} ->
{:noreply,
%__MODULE__{
state
| check_metadata_waiting_pids: [from | state.check_metadata_waiting_pids]
}}
end
end

# Public Interface

def trigger_metadata_verification(cluster_name) do
GenServer.call(via_tuple({__MODULE__, cluster_name}), :trigger_check_metadata)
end
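trigger_metadata_verification/1 together with the handle_call clause above is the standard GenServer deferred-reply pattern: the call stores `from` and returns {:noreply, ...}, and the caller is only answered from handle_info(:check_metadata, ...) via GenServer.reply/2 once the metadata has actually been refreshed. A minimal sketch of that pattern in isolation — the module, function and message names here are invented for illustration and are not part of Klife:

  defmodule DeferredReplyExample do
    use GenServer

    def start_link(_opts), do: GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
    def refresh, do: GenServer.call(__MODULE__, :refresh)

    @impl true
    def init(:ok), do: {:ok, %{waiting: []}}

    @impl true
    def handle_call(:refresh, from, state) do
      send(self(), :do_refresh)
      # do not reply yet; remember who asked
      {:noreply, %{state | waiting: [from | state.waiting]}}
    end

    @impl true
    def handle_info(:do_refresh, state) do
      # ... the actual refresh work would happen here ...
      Enum.each(state.waiting, &GenServer.reply(&1, :ok))
      {:noreply, %{state | waiting: []}}
    end
  end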

def get_topics_partitions_metadata(cluster_name, topic, partition) do
[{_key, broker_id, default_producer, dispatcher_id}] =
cluster_name
80 changes: 63 additions & 17 deletions lib/klife/producer/dispatcher.ex
@@ -10,6 +10,7 @@ defmodule Klife.Producer.Dispatcher do
defstruct [
:producer_config,
:producer_id,
:producer_epoch,
:base_sequences,
:broker_id,
:current_batch,
@@ -19,7 +20,8 @@
:last_batch_sent_at,
:in_flight_pool,
:next_send_msg_ref,
:batch_queue
:batch_queue,
:dispatcher_id
]

def start_link(args) do
@@ -45,6 +47,9 @@
linger_ms = args_map.producer_config.linger_ms
idempotent? = args_map.producer_config.enable_idempotence
cluster_name = args_map.producer_config.cluster_name
broker_id = args_map.broker_id
dispatcher_id = args_map.id
producer_config = args_map.producer_config

producer_id =
if idempotent?,
@@ -56,7 +61,7 @@
do: Process.send_after(self(), :send_to_broker, linger_ms),
else: nil

base = %__MODULE__{
state = %__MODULE__{
current_batch: %{},
current_waiting_pids: %{},
current_base_time: nil,
@@ -66,14 +71,32 @@
in_flight_pool: Enum.map(1..max_in_flight, fn _ -> nil end),
next_send_msg_ref: next_send_msg_ref,
producer_id: producer_id,
base_sequences: %{}
base_sequences: %{},
producer_epoch:
get_producer_epoch(cluster_name, broker_id, producer_config.producer_name, dispatcher_id),
broker_id: broker_id,
dispatcher_id: dispatcher_id,
producer_config: producer_config
}

state = %__MODULE__{} = Map.merge(base, args_map)

{:ok, state}
end

# TODO: Use a unique producer epoch for each topic partition
defp get_producer_epoch(cluster_name, broker_id, producer_name, dispatcher_id) do
key = {__MODULE__, cluster_name, broker_id, producer_name, dispatcher_id, :epoch}

case :persistent_term.get(key, nil) do
nil ->
counter = :atomics.new(1, [])
:persistent_term.put(key, counter)
:atomics.add_get(counter, 1, 1)

counter ->
:atomics.add_get(counter, 1, 1)
end
end
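Because the counter lives in :persistent_term (node-global) rather than in the dispatcher's own state, the epoch keeps increasing across dispatcher restarts for the same {cluster, broker, producer, dispatcher} key, so a restarted dispatcher presents itself to the broker with a higher epoch than its previous incarnation. A quick sketch of that behaviour — the key below is made up for illustration:

  key = {:epoch_demo, :my_cluster, 1001, :default_producer, 0}

  counter =
    case :persistent_term.get(key, nil) do
      nil ->
        ref = :atomics.new(1, [])
        :persistent_term.put(key, ref)
        ref

      ref ->
        ref
    end

  :atomics.add_get(counter, 1, 1) # => 1 on the first initialization
  :atomics.add_get(counter, 1, 1) # => 2, e.g. when a restarted dispatcher reuses the same key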

defp get_producer_id(cluster_name, broker_id) do
content = %{
transactional_id: nil,
@@ -222,7 +245,7 @@ defmodule Klife.Producer.Dispatcher do
cluster: #{state.producer_config.cluster_name}
broker_id: #{state.broker_id}
producer_name: #{state.producer_config.name}
producer_name: #{state.producer_config.producer_name}
""")

{:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}}
@@ -405,7 +428,8 @@
%__MODULE__{
base_sequences: bs,
producer_id: p_id,
producer_config: %{enable_idempotence: idempotent?} = pconfig
producer_config: %{enable_idempotence: idempotent?} = pconfig,
producer_epoch: p_epoch
} = _state,
topic,
partition
@@ -420,9 +444,8 @@
base_timestamp: nil,
max_timestamp: nil,
producer_id: p_id,
# TODO: Handle producer epoch
producer_epoch: -1,
base_sequence: if(idempotent?, do: Map.get(bs, {topic, partition}, 0) + 1, else: -1),
producer_epoch: p_epoch,
base_sequence: if(idempotent?, do: Map.get(bs, {topic, partition}, 0), else: -1),
records: [],
records_length: 0
}
@@ -461,12 +484,20 @@
end

defp update_base_sequence(curr_base_sequences, new_batch, topic, partition) do
new_base_sequence =
new_batch
|> Map.fetch!({topic, partition})
|> Map.fetch!(:base_sequence)
case Map.get(curr_base_sequences, {topic, partition}) do
nil ->
new_base_sequence =
new_batch
|> Map.fetch!({topic, partition})
|> Map.fetch!(:base_sequence)

Map.put(curr_base_sequences, {topic, partition}, new_base_sequence)
if new_base_sequence != -1,
do: Map.put(curr_base_sequences, {topic, partition}, new_base_sequence + 1),
else: curr_base_sequences

curr_base_seq ->
Map.replace!(curr_base_sequences, {topic, partition}, curr_base_seq + 1)
end
end
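Read together with init_new_batch/3 above: a new batch is stamped with the currently stored base sequence (0 for the very first batch), and — assuming update_base_sequence/4 runs once per appended record, which its one-increment-per-call shape suggests — the stored value ends up advanced by the number of records in the batch, so the next batch continues exactly where the previous one left off. A tiny stand-alone model of that bookkeeping, using a placeholder topic "t" and partition 0:

  bs = %{}                                # no sequence stored for the partition yet
  batch1_base = Map.get(bs, {"t", 0}, 0)  # => 0, the first batch starts at sequence 0

  # three records appended to batch 1:
  bs =
    Enum.reduce(1..3, bs, fn _record, acc ->
      Map.update(acc, {"t", 0}, batch1_base + 1, &(&1 + 1))
    end)

  batch2_base = Map.get(bs, {"t", 0}, 0)  # => 3, the next batch picks up where batch 1 ended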

defp add_waiting_pid(waiting_pids, _new_batch, nil, _topic, _partition), do: waiting_pids
@@ -501,7 +532,8 @@
cluster_name: cluster_name,
retry_backoff_ms: retry_ms,
client_id: client_id,
acks: acks
acks: acks,
producer_name: producer_name
} = pconfig

now = System.monotonic_time(:millisecond)
@@ -544,7 +576,21 @@
[] ->
send(callback_pid, {:broker_delivery_success, pool_idx})

_list ->
error_list ->
Enum.each(error_list, fn {topic, partition, error_code, _base_offset} ->
Logger.warning("""
Error while producing message. Retrying...
topic: #{topic}
partition: #{partition}
error_code: #{error_code}
cluster: #{cluster_name}
broker_id: #{broker_id}
producer_name: #{producer_name}
""")
end)

# TODO: Handle specific errors by code, not just retry all not success
# TODO: Handle out of order error for max_in_flight > 1
success_keys = Enum.map(success_list, fn {t, p, _, _} -> {t, p} end)
