Skip to content

Commit

Permalink
feat: dynamically disable features based on api versions negotiation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 23, 2024
1 parent 18a41f5 commit fc557be
Show file tree
Hide file tree
Showing 9 changed files with 157 additions and 58 deletions.
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ config :klife, MyClient,
# delay_send: true
# ]
],
# disabled_features: [:producer],
txn_pools: [
[name: :my_test_pool_1, pool_size: 1]
],
Expand Down
33 changes: 30 additions & 3 deletions lib/klife.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ defmodule Klife do
alias Klife.Producer
alias Klife.TxnProducerPool
alias Klife.Producer.Controller, as: PController
alias Klife.Connection.Controller, as: ConnController

@produce_opts [
producer: [
Expand Down Expand Up @@ -59,19 +60,39 @@ defmodule Klife do
def produce_batch([%Record{} | _] = records, client, opts \\ []) do
records = prepare_records(records, client, opts)

if TxnProducerPool.in_txn?(client),
do: TxnProducerPool.produce(records, client, opts),
else: Producer.produce(records, client, opts)
if TxnProducerPool.in_txn?(client) do
TxnProducerPool.produce(records, client, opts)
else
if ConnController.disabled_feature?(client, :producer) do
raise """
You have tried to call the Produce API, but the producer feature is disabled. Check logs for details.
"""
end

Producer.produce(records, client, opts)
end
end

@doc false
def produce_async(%Record{} = record, client, opts \\ []) do
if ConnController.disabled_feature?(client, :producer) do
raise """
You have tried to call the Produce API, but the producer feature is disabled. Check logs for details.
"""
end

prepared_rec = prepare_records(record, client, opts)
Producer.produce_async([prepared_rec], client, opts)
end

@doc false
def produce_batch_async([%Record{} | _] = records, client, opts \\ []) do
if ConnController.disabled_feature?(client, :producer) do
raise """
You have tried to call the Produce API, but the producer feature is disabled. Check logs for details.
"""
end

case opts[:callback] do
nil ->
records = prepare_records(records, client, opts)
Expand All @@ -98,6 +119,12 @@ defmodule Klife do

@doc false
def transaction(fun, client, opts \\ []) do
if ConnController.disabled_feature?(client, :txn_producer) do
raise """
You have tried to call the Transaction API, but the txn_producer feature is disabled. Check logs for details.
"""
end

TxnProducerPool.run_txn(client, get_txn_pool(client, opts), fun)
end

Expand Down
11 changes: 11 additions & 0 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ defmodule Klife.Client do
required: false,
doc: "List of topics that may have special configurations",
default: []
],
disabled_features: [
type: {:list, {:in, [:producer, :txn_producer]}},
type_doc: "List atoms representing a features to disable.",
required: false,
doc: "`:producer` disable producer feature. `:txn_producer` disables transactions.",
default: []
]
]

Expand Down Expand Up @@ -533,6 +540,10 @@ defmodule Klife.Client do
:persistent_term.put(default_producer_key, parsed_opts[:default_producer])
:persistent_term.put(default_partitioner_key, parsed_opts[:default_partitioner])

Enum.each(parsed_opts[:disabled_features], fn feature ->
Klife.Connection.Controller.disable_feature(feature, module)
end)

Supervisor.init(children, strategy: :one_for_one)
end
end
1 change: 0 additions & 1 deletion lib/klife/connection/broker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ defmodule Klife.Connection.Broker do
alias Klife.Connection.Controller
alias Klife.Connection.MessageVersions


@reconnect_delays_seconds [1, 1, 1, 1, 5, 5, 10]

defstruct [
Expand Down
59 changes: 59 additions & 0 deletions lib/klife/connection/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule Klife.Connection.Controller do

import Klife.ProcessRegistry, only: [via_tuple: 1, registry_lookup: 1]

require Logger

alias Klife.PubSub

alias KlifeProtocol.Messages
Expand Down Expand Up @@ -300,6 +302,63 @@ defmodule Klife.Connection.Controller do
end
end

def disable_feature(:connection, client_name) do
raise "Could not agree on a required message for the connection system on client #{inspect(client_name)} . Check logs for details."
end

def disable_feature(:producer = feature, client_name) do
Logger.warning("""
Producer feature will be disabled.
This may happen because of:
- API version negotiation failures
- Client configuration
""")

:persistent_term.put({__MODULE__, client_name, feature, :disabled?}, true)
end

def disable_feature(:txn_producer = feature, client_name) do
Logger.warning("""
Transactions feature will be disabled.
This may happen because of:
- API version negotiation failures
- Client configuration
""")

:persistent_term.put({__MODULE__, client_name, feature, :disabled?}, true)
end

def disable_feature(:producer_idempotence = feature, client_name) do
Logger.warning("""
Producer idempotence feature will be disabled .
This may happen because of:
- API version negotiation failures
""")

:persistent_term.put({__MODULE__, client_name, feature, :disabled?}, true)
end

def disable_feature(:sasl = feature, client_name) do
Logger.warning("""
SASL feature will be disabled.
This may happen because of:
- API version negotiation failures
""")

:persistent_term.put({__MODULE__, client_name, feature, :disabled?}, true)
end

def disabled_feature?(client_name, feature),
do: :persistent_term.get({__MODULE__, client_name, feature, :disabled?}, false)

## PRIVATE FUNCTIONS

defp handle_brokers(to_start, to_remove, %__MODULE__{} = state) do
Expand Down
68 changes: 22 additions & 46 deletions lib/klife/connection/message_versions.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Klife.Connection.MessageVersions do
@moduledoc false

alias KlifeProtocol.Messages, as: M
alias Klife.Connection.Controller

require Logger

Expand All @@ -17,64 +18,39 @@ defmodule Klife.Connection.MessageVersions do
defp do_setup_versions([{mod, client_data} | rest], server_map, client_name) do
api_key = mod.api_key()

server_data = Map.get(server_map, api_key, :not_found)

found_on_broker? = server_data != :not_found
raise? = Map.get(client_data, :raise?, true)

with true <- found_on_broker?,
with server_data = %{} <- Map.get(server_map, api_key, :not_found),
common_version <- min(server_data.max, client_data.max),
false <- common_version < server_data.min or common_version < client_data.min do
:ok = set_api_version(client_name, mod, common_version)
do_setup_versions(rest, server_map, client_name)
else
_err ->
if raise? do
raise(
"Could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
else
warning_for_message(mod, api_key, client_name)
do_setup_versions(rest, server_map, client_name)
end
end
end
Logger.warning(
"Some features may be disabled because could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)

defp warning_for_message(
mod,
api_key,
client_name
)
when mod in [M.AddPartitionsToTxn] do
Logger.warning(
"Transactions will not work because could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
end
Enum.each(client_data.required_for, fn feature ->
Controller.disable_feature(feature, client_name)
end)

defp warning_for_message(
mod,
api_key,
client_name
) do
Logger.warning(
"Some features may not work because could not agree on API version for #{inspect(mod)} api_key #{api_key} for client #{client_name}."
)
do_setup_versions(rest, server_map, client_name)
end
end

defp client_versions do
[
{M.ApiVersions, %{min: 0, max: 0}},
{M.CreateTopics, %{min: 0, max: 0}},
{M.Metadata, %{min: 1, max: 1}},
{M.Produce, %{min: 3, max: 9}},
{M.InitProducerId, %{min: 0, max: 0}},
{M.Fetch, %{min: 4, max: 4}},
{M.ListOffsets, %{min: 2, max: 2}},
{M.AddPartitionsToTxn, %{min: 4, max: 4, raise?: false}},
{M.FindCoordinator, %{min: 4, max: 4}},
{M.EndTxn, %{min: 3, max: 3}},
{M.SaslHandshake, %{min: 1, max: 1}},
{M.SaslAuthenticate, %{min: 1, max: 1}}
{M.ApiVersions, %{min: 0, max: 0, required_for: [:connection]}},
{M.CreateTopics, %{min: 0, max: 0, required_for: []}},
{M.Metadata, %{min: 1, max: 1, required_for: [:connection]}},
{M.Produce, %{min: 3, max: 9, required_for: [:producer, :txn_producer]}},
{M.InitProducerId, %{min: 0, max: 0, required_for: [:producer_idempotence]}},
{M.Fetch, %{min: 4, max: 4, required_for: []}},
{M.ListOffsets, %{min: 2, max: 2, required_for: []}},
{M.AddPartitionsToTxn, %{min: 4, max: 4, required_for: [:txn_producer]}},
{M.FindCoordinator, %{min: 4, max: 4, required_for: [:txn_producer]}},
{M.EndTxn, %{min: 3, max: 3, required_for: [:txn_producer]}},
{M.SaslHandshake, %{min: 1, max: 1, required_for: [:sasl]}},
{M.SaslAuthenticate, %{min: 1, max: 1, required_for: [:sasl]}}
]
end

Expand Down
11 changes: 9 additions & 2 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,15 @@ defmodule Klife.Producer.Controller do
end

if any_new? do
:ok = handle_producers(state)
:ok = handle_txn_producers(state)
:ok =
if ConnController.disabled_feature?(client_name, :producer),
do: :ok,
else: handle_producers(state)

:ok =
if ConnController.disabled_feature?(client_name, :txn_producer),
do: :ok,
else: handle_txn_producers(state)
end

:ok
Expand Down
7 changes: 7 additions & 0 deletions lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ defmodule Klife.Producer do
do: %{state | coordinator_id: maybe_find_coordinator(state)}

defp set_producer_id(%__MODULE__{enable_idempotence: true} = state) do
if ConnController.disabled_feature?(state.client_name, :producer_idempotence) do
raise """
Producer idempotence feature is disabled but producer #{state.name} has idempotence enabled.
Please check for API versions problems or disable idempotence on this producer.
"""
end

broker = maybe_find_coordinator(state)

content = %{
Expand Down
24 changes: 18 additions & 6 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1068,13 +1068,17 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec1, offset1, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec1, offset1, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec1, offset1,
isolation: :uncommitted
)

assert :not_found =
TestUtils.assert_offset(MyClient, rec2, offset2, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec2, offset2, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec2, offset2,
isolation: :uncommitted
)

{:ok, resp}
end,
Expand All @@ -1099,19 +1103,25 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec3, offset3, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec3, offset3, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec3, offset3,
isolation: :uncommitted
)

assert :not_found =
TestUtils.assert_offset(MyClient, rec4, offset4, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec4, offset4, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec4, offset4,
isolation: :uncommitted
)

assert :not_found =
TestUtils.assert_offset(MyClient, rec5, offset5, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec5, offset5, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec5, offset5,
isolation: :uncommitted
)

Process.put(:raised_offsets, {offset3, offset4, offset5})
raise "crazy error"
Expand All @@ -1135,7 +1145,9 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec6, offset6, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec6, offset6, isolation: :uncommitted)
TestUtils.assert_offset(MyClient, rec6, offset6,
isolation: :uncommitted
)

{:ok, resp}
end,
Expand Down

0 comments on commit fc557be

Please sign in to comment.