Skip to content

Commit

Permalink
chore: minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Mar 28, 2024
1 parent 54ba383 commit 0c9b715
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 16 deletions.
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ config :klife,
ssl: false
# ssl_opts: [
# verify: :verify_peer,
# cacertfile: Path.relative("test/compose_files/truststore/ca.crt")
# cacertfile: Path.relative("test/compose_files/ssl/ca.crt")
# ]
]
],
Expand Down
23 changes: 12 additions & 11 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ defmodule Klife.Connection.Controller do
{:noreply, %__MODULE__{state | known_brokers: new_brokers_list}}

{:error, _reason} ->
Process.send(self(), :init_bootstrap_conn, [])
Process.send_after(self(), :init_bootstrap_conn, 1_000)
{:noreply, state}
end
end
Expand All @@ -101,23 +101,20 @@ defmodule Klife.Connection.Controller do
)
end)

:persistent_term.put(
{:known_brokers_ids, state.cluster_name},
Enum.map(state.known_brokers, &elem(&1, 0))
)
new_brokers =
(state.known_brokers ++ brokers_list)
|> Enum.map(&elem(&1, 0))
|> Enum.uniq()

:persistent_term.put({:known_brokers_ids, state.cluster_name}, new_brokers)

if from != nil, do: GenServer.reply(from, :ok)

{:noreply, state}
end

def handle_info({:remove_brokers, brokers_list}, %__MODULE__{} = state) do
:persistent_term.put(
{:known_brokers_ids, state.cluster_name},
Enum.map(state.known_brokers -- brokers_list, &elem(&1, 0))
)

Enum.map(brokers_list, fn {broker_id, _url} ->
Enum.each(brokers_list, fn {broker_id, _url} ->
case registry_lookup({Broker, broker_id, state.cluster_name}) do
[] ->
:ok
Expand All @@ -130,6 +127,10 @@ defmodule Klife.Connection.Controller do
end
end)

new_brokers = Enum.map(state.known_brokers -- brokers_list, &elem(&1, 0))

:persistent_term.put({:known_brokers_ids, state.cluster_name}, new_brokers)

{:noreply, state}
end

Expand Down
4 changes: 3 additions & 1 deletion lib/klife/connection/message_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Klife.Connection.MessageVersions do

defp do_setup_versions([], _, _), do: :ok

# TODO: Handle non required messages
defp do_setup_versions([{mod, client_data} | rest], cluster_map, cluster_name) do
api_key = apply(mod, :api_key, [])

Expand Down Expand Up @@ -42,7 +43,8 @@ defmodule Klife.Connection.MessageVersions do
{M.ApiVersions, %{min: 0, max: 0, should_raise?: true}},
{M.CreateTopics, %{min: 0, max: 0, should_raise?: false}},
{M.Metadata, %{min: 1, max: 1, should_raise?: true}},
{M.Produce, %{min: 0, max: 0, should_raise?: false}}
{M.Produce, %{min: 0, max: 0, should_raise?: false}},
{M.InitProducerId, %{min: 0, max: 0, should_raise?: false}}
]
end

Expand Down
22 changes: 19 additions & 3 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ defmodule Klife.Producer.Dispatcher do
new_state =
state
|> add_record(record, topic, partition, pid, rec_size)
|> schedule_dispatch(20)
|> schedule_dispatch(10)

{:reply, :ok, new_state}

Expand All @@ -113,7 +113,7 @@ defmodule Klife.Producer.Dispatcher do
state
|> add_record(record, topic, partition, pid, rec_size)
|> dispatch_to_broker(pool_idx)
|> schedule_dispatch(20)
|> schedule_dispatch(10)

{:reply, :ok, new_sate}
end
Expand Down Expand Up @@ -142,7 +142,7 @@ defmodule Klife.Producer.Dispatcher do
{:noreply, schedule_dispatch(new_state, linger_ms - (now - last_batch_sent_at))}

not in_flight_available? ->
{:noreply, schedule_dispatch(new_state, 20)}
{:noreply, schedule_dispatch(new_state, 10)}

should_reschedule? ->
new_state =
Expand All @@ -165,6 +165,22 @@ defmodule Klife.Producer.Dispatcher do
{:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}}
end

def handle_info({:broker_delivery_error, pool_idx, :timeout}, %__MODULE__{} = state) do
%__MODULE__{
in_flight_pool: in_flight_pool
} = state

Logger.error("""
Timeout error while produce to broker.
cluster: #{state.producer_config.cluster_name}
broker_id: #{state.broker_id}
producer_name: #{state.producer_config.name}
""")

{:noreply, %{state | in_flight_pool: List.replace_at(in_flight_pool, pool_idx, nil)}}
end

## State Operations

def reset_current_data(%__MODULE__{} = state) do
Expand Down
1 change: 1 addition & 0 deletions lib/klife/producer/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ defmodule Klife.Producer.Supervisor do
@impl true
def init(opts) do
cluster_name = Keyword.fetch!(opts, :cluster_name)

children = [
{Task.Supervisor, name: via_tuple({Klife.Producer.DispatcherTaskSupervisor, cluster_name})},
{Klife.Producer.ProducerSupervisor, opts},
Expand Down

0 comments on commit 0c9b715

Please sign in to comment.