From 37321c2d1bdfeb3eb3d8c1662f6715c08e8c03df Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Fri, 22 Nov 2024 23:05:40 -0300 Subject: [PATCH] chore: reduce find coordinator version and add logs --- lib/klife/connection/message_versions.ex | 2 +- lib/klife/producer/producer.ex | 22 +++++++++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/klife/connection/message_versions.ex b/lib/klife/connection/message_versions.ex index 208e625..58853b0 100644 --- a/lib/klife/connection/message_versions.ex +++ b/lib/klife/connection/message_versions.ex @@ -47,7 +47,7 @@ defmodule Klife.Connection.MessageVersions do {M.Fetch, %{min: 4, max: 4, required_for: []}}, {M.ListOffsets, %{min: 2, max: 2, required_for: []}}, {M.AddPartitionsToTxn, %{min: 4, max: 4, required_for: [:txn_producer]}}, - {M.FindCoordinator, %{min: 4, max: 4, required_for: [:txn_producer]}}, + {M.FindCoordinator, %{min: 1, max: 1, required_for: [:txn_producer]}}, {M.EndTxn, %{min: 3, max: 3, required_for: [:txn_producer]}}, {M.SaslHandshake, %{min: 1, max: 1, required_for: [:sasl]}}, {M.SaslAuthenticate, %{min: 1, max: 1, required_for: [:sasl]}} diff --git a/lib/klife/producer/producer.ex b/lib/klife/producer/producer.ex index 6c0aa22..c4e3709 100644 --- a/lib/klife/producer/producer.ex +++ b/lib/klife/producer/producer.ex @@ -4,6 +4,8 @@ defmodule Klife.Producer do import Klife.ProcessRegistry, only: [via_tuple: 1, registry_lookup: 1] + require Logger + alias Klife.Record alias Klife.Producer.Batcher @@ -364,7 +366,14 @@ defmodule Klife.Producer do {:ok, %{content: %{error_code: 0, producer_id: producer_id, producer_epoch: p_epoch}}} -> {producer_id, p_epoch} - _data -> + {:ok, %{content: %{error_code: ec}}} -> + Logger.error( + "Error code #{ec} returned from broker for client #{state.client_name} on #{inspect(M.InitProducerId)} call" + ) + + :retry + + _ -> :retry end end @@ -384,14 +393,21 @@ defmodule Klife.Producer do defp maybe_find_coordinator(%__MODULE__{txn_id: txn_id} = state) do content = %{ key_type: 1, - coordinator_keys: [txn_id] + key: txn_id } fun = fn -> case Broker.send_message(M.FindCoordinator, state.client_name, :any, content) do - {:ok, %{content: %{coordinators: [%{error_code: 0, node_id: broker_id}]}}} -> + {:ok, %{content: %{error_code: 0, node_id: broker_id}}} -> broker_id + {:ok, %{content: %{error_code: ec}}} -> + Logger.error( + "Error code #{ec} returned from broker for client #{state.client_name} on #{inspect(M.FindCoordinator)} call" + ) + + :retry + _data -> :retry end