Skip to content

Commit

Permalink
chore: accept more produce versions and do not require addPartitionsTxn
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 21, 2024
1 parent 21fb4a6 commit 9159024
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions lib/klife/connection/message_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ defmodule Klife.Connection.MessageVersions do

alias KlifeProtocol.Messages, as: M

require Logger

def get(client_name, mod), do: :persistent_term.get({:api_version, mod, client_name})

def setup_versions(server_data, client_name),
do: do_setup_versions(client_versions(), server_data, client_name)
def setup_versions(server_data, client_name) do
do_setup_versions(client_versions(), server_data, client_name)
end

defp do_setup_versions([], _, _), do: :ok

Expand All @@ -16,36 +19,58 @@ defmodule Klife.Connection.MessageVersions do

server_data = Map.get(server_map, api_key, :not_found)

not_found_on_broker? = server_data == :not_found

if not_found_on_broker?,
do: raise("Could not find required message #{inspect(mod)} for client #{client_name}")

common_version = min(server_data.max, client_data.max)
found_on_broker? = server_data != :not_found
raise? = Map.get(client_data, :raise?, true)

invalid_common_version? =
common_version < server_data.min or common_version < client_data.min
with true <- found_on_broker?,
common_version <- min(server_data.max, client_data.max),
false <- common_version < server_data.min or common_version < client_data.min do
:ok = set_api_version(client_name, mod, common_version)
do_setup_versions(rest, server_map, client_name)
else
_err ->
if raise? do
raise(
"Could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
else
warning_for_message(mod, api_key, client_name)
do_setup_versions(rest, server_map, client_name)
end
end
end

cond do
not invalid_common_version? ->
:ok = set_api_version(client_name, mod, common_version)
do_setup_versions(rest, server_map, client_name)
defp warning_for_message(
mod,
api_key,
client_name
)
when mod in [M.AddPartitionsToTxn] do
Logger.warning(
"Transactions will not work because could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
end

invalid_common_version? ->
raise "Could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}. "
end
defp warning_for_message(
mod,
api_key,
client_name
) do
Logger.warning(
"Some features may not work because could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
end

defp client_versions do
[
{M.ApiVersions, %{min: 0, max: 0}},
{M.CreateTopics, %{min: 0, max: 0}},
{M.Metadata, %{min: 1, max: 1}},
{M.Produce, %{min: 9, max: 9}},
{M.Produce, %{min: 3, max: 9}},
{M.InitProducerId, %{min: 0, max: 0}},
{M.Fetch, %{min: 4, max: 4}},
{M.ListOffsets, %{min: 2, max: 2}},
{M.AddPartitionsToTxn, %{min: 4, max: 4}},
{M.AddPartitionsToTxn, %{min: 4, max: 4, raise?: false}},
{M.FindCoordinator, %{min: 4, max: 4}},
{M.EndTxn, %{min: 3, max: 3}},
{M.SaslHandshake, %{min: 1, max: 1}},
Expand Down

0 comments on commit 9159024

Please sign in to comment.