Skip to content

Commit

Permalink
chore: reduce find coordinator version and add logs
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 23, 2024
1 parent 5e70abe commit 37321c2
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
2 changes: 1 addition & 1 deletion lib/klife/connection/message_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ defmodule Klife.Connection.MessageVersions do
{M.Fetch, %{min: 4, max: 4, required_for: []}},
{M.ListOffsets, %{min: 2, max: 2, required_for: []}},
{M.AddPartitionsToTxn, %{min: 4, max: 4, required_for: [:txn_producer]}},
{M.FindCoordinator, %{min: 4, max: 4, required_for: [:txn_producer]}},
{M.FindCoordinator, %{min: 1, max: 1, required_for: [:txn_producer]}},
{M.EndTxn, %{min: 3, max: 3, required_for: [:txn_producer]}},
{M.SaslHandshake, %{min: 1, max: 1, required_for: [:sasl]}},
{M.SaslAuthenticate, %{min: 1, max: 1, required_for: [:sasl]}}
Expand Down
22 changes: 19 additions & 3 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ defmodule Klife.Producer do

import Klife.ProcessRegistry, only: [via_tuple: 1, registry_lookup: 1]

require Logger

alias Klife.Record

alias Klife.Producer.Batcher
Expand Down Expand Up @@ -364,7 +366,14 @@ defmodule Klife.Producer do
{:ok, %{content: %{error_code: 0, producer_id: producer_id, producer_epoch: p_epoch}}} ->
{producer_id, p_epoch}

_data ->
{:ok, %{content: %{error_code: ec}}} ->
Logger.error(
"Error code #{ec} returned from broker for client #{state.client_name} on #{inspect(M.InitProducerId)} call"
)

:retry

_ ->
:retry
end
end
Expand All @@ -384,14 +393,21 @@ defmodule Klife.Producer do
defp maybe_find_coordinator(%__MODULE__{txn_id: txn_id} = state) do
content = %{
key_type: 1,
coordinator_keys: [txn_id]
key: txn_id
}

fun = fn ->
case Broker.send_message(M.FindCoordinator, state.client_name, :any, content) do
{:ok, %{content: %{coordinators: [%{error_code: 0, node_id: broker_id}]}}} ->
{:ok, %{content: %{error_code: 0, node_id: broker_id}}} ->
broker_id

{:ok, %{content: %{error_code: ec}}} ->
Logger.error(
"Error code #{ec} returned from broker for client #{state.client_name} on #{inspect(M.FindCoordinator)} call"
)

:retry

_data ->
:retry
end
Expand Down

0 comments on commit 37321c2

Please sign in to comment.