From c45d6cee43eb5cef52df83fdcba89b0f33b2a014 Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Sat, 10 Aug 2024 12:36:55 -0300 Subject: [PATCH] feat: support non configured topics --- example/config/config.exs | 3 +-- guides/examples/client_configuration.md | 34 ++++++++++++++++++------- lib/klife/client.ex | 19 +++++++------- lib/klife/producer/controller.ex | 29 ++++++++------------- lib/klife/utils.ex | 12 +++++++++ test/producer/producer_test.exs | 17 +++++++++++++ 6 files changed, 74 insertions(+), 40 deletions(-) diff --git a/example/config/config.exs b/example/config/config.exs index 6765a25..492c9cd 100644 --- a/example/config/config.exs +++ b/example/config/config.exs @@ -4,5 +4,4 @@ config :example, Example.MySimplestClient, connection: [ bootstrap_servers: ["localhost:19092", "localhost:29092"], ssl: false - ], - topics: [[name: "my_topic_1"]] + ] diff --git a/guides/examples/client_configuration.md b/guides/examples/client_configuration.md index 799d555..8441d6f 100644 --- a/guides/examples/client_configuration.md +++ b/guides/examples/client_configuration.md @@ -9,11 +9,10 @@ Here are some client configuration examples. connection: [ bootstrap_servers: ["localhost:19092", "localhost:29092"], ssl: false - ], - topics: [[name: "my_topic_0"]] + ] ``` -This client will connect to brokers using non ssl connection and produce messages only to topic `my_topic` using the default producer and default partitioner. +This client will connect to brokers using non ssl connection and produce messages using the default producer and default partitioner. ## SSL and custom socket opts @@ -27,13 +26,12 @@ This client will connect to brokers using non ssl connection and produce message cacertfile: Path.relative("test/compose_files/ssl/ca.crt") ], socket_opts: [delay_send: true] - ], - topics: [[name: "my_topic_0"]] + ] ``` This client will connect to brokers using ssl connection, `connect_opts` and `socket_opts` are forwarded to erlang module `:ssl` in order to proper configure the socket. See the documentation for more details. -## Defining multiple producers +## Defining and using multiple producers ```elixir config :my_app, MyApp.Client, @@ -63,10 +61,10 @@ This client will connect to brokers using ssl connection, `connect_opts` and `so ] ``` -This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`. +This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`, messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer. -## Defining custom partitioner +## Defining and using custom partitioner First you need to implement a module following the `Klife.Behaviours.Partitioner` behaviour. @@ -100,7 +98,7 @@ Then, you need to use it on your configuration. ] ``` -On this client, the records produced without a specific partition will have a partition assigned using the `MyApp.MyCustomPartitioner` module. +On this client, the records produced to `my_topic_0` without a specific partition will have a partition assigned using the `MyApp.MyCustomPartitioner` module all other topics keep using the default partitioner. ## Defining multiple txn pools @@ -119,3 +117,21 @@ On this client, the records produced without a specific partition will have a pa ``` This client will have a total of 3 txn pools, the default one plus the other two defined in the configuration. You can see all the configuration options for the producers in `Klife.TxnProducerPool`. + +## Using custom default producer, partitioner and txn pool + +```elixir + config :my_app, MyApp.Client, + connection: [ + bootstrap_servers: ["localhost:19092", "localhost:29092"], + ssl: false + ], + default_producer: :my_custom_producer, + producers: [[name: :my_custom_producer, linger_ms: 1_000]], + default_partitioner: MyCustomPartitioner, + default_txn_pool: :my_txn_pool, + txn_pools: [[name: :my_txn_pool, base_txn_id: "my_custom_base_txn_id"]], + topics: [[name: "my_topic_0"]] +``` + +This cliente will have only one producer (`:my_custom_producer`) and txn pool (`:my_txn_pool`),and the default paritioner strategy will be `MyCustomPartitioner`. All this 3 configurations will be used in produce API calls to topics that does not have any override config defined in the `topics` configuration. \ No newline at end of file diff --git a/lib/klife/client.ex b/lib/klife/client.ex index f5aaab4..b693575 100644 --- a/lib/klife/client.ex +++ b/lib/klife/client.ex @@ -26,7 +26,7 @@ defmodule Klife.Client do required: false, default: @default_producer_name, doc: - "Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option." + "Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided a default producer will be started automatically." ], default_partitioner: [ type: :atom, @@ -40,7 +40,7 @@ defmodule Klife.Client do required: false, default: @default_txn_pool, doc: - "Name of the txn pool to be used on transactions when a `:pool_name` is not provided as an option." + "Name of the txn pool to be used on transactions when a `:pool_name` is not provided as an option. If not provided a default txn pool will be started automatically." ], txn_pools: [ type: {:list, {:keyword_list, Klife.TxnProducerPool.get_opts()}}, @@ -48,21 +48,21 @@ defmodule Klife.Client do required: false, default: [], doc: - "List of configurations, each starting a pool of transactional producers for use with transactional api. A default pool is always created." + "List of configurations, each starting a pool of transactional producers for use with transactional api." ], producers: [ type: {:list, {:keyword_list, Klife.Producer.get_opts()}}, type_doc: "List of `Klife.Producer` configurations", required: false, default: [], - doc: - "List of configurations, each starting a new producer for use with produce api. A default producer is always created." + doc: "List of configurations, each starting a new producer for use with produce api." ], topics: [ - type: {:list, {:non_empty_keyword_list, Klife.Topic.get_opts()}}, + type: {:list, {:keyword_list, Klife.Topic.get_opts()}}, type_doc: "List of `Klife.Topic` configurations", - required: true, - doc: "List of topics that will be managed by the client" + required: false, + doc: "List of topics that may have special configurations", + default: [] ] ] @@ -373,10 +373,9 @@ defmodule Klife.Client do [name: default_txn_pool_name], Klife.TxnProducerPool.get_opts() ) - Enum.uniq_by(l ++ [default_txn_pool], fn p -> p[:name] end) end) - |> Keyword.update!(:topics, fn l -> + |> Keyword.update(:topics, [], fn l -> Enum.map(l, fn topic -> topic |> Keyword.put_new(:default_producer, default_producer_name) diff --git a/lib/klife/producer/controller.ex b/lib/klife/producer/controller.ex index 34eff92..ef2fb2e 100644 --- a/lib/klife/producer/controller.ex +++ b/lib/klife/producer/controller.ex @@ -47,13 +47,6 @@ defmodule Klife.Producer.Controller do check_metadata_timer_ref: timer_ref } - Enum.each(topics_list, fn t -> - :persistent_term.put( - {__MODULE__, client_name, t.name}, - t.default_producer - ) - end) - :ets.new(topics_partitions_metadata_table(client_name), [ :set, :public, @@ -167,11 +160,9 @@ defmodule Klife.Producer.Controller do @impl true def handle_info( :check_metadata, - %__MODULE__{client_name: client_name, topics: topics} = state + %__MODULE__{client_name: client_name} = state ) do - content = %{ - topics: Enum.map(topics, fn t -> %{name: t.name} end) - } + content = %{topics: nil} case Broker.send_message(Messages.Metadata, client_name, :controller, content) do {:error, _} -> @@ -260,9 +251,6 @@ defmodule Klife.Producer.Controller do end) end - def get_producer_for_topic(client_name, topic), - do: :persistent_term.get({__MODULE__, client_name, topic}) - def get_partitioner_data(client_name, topic) do client_name |> partitioner_metadata_table() @@ -281,14 +269,15 @@ defmodule Klife.Producer.Controller do results = for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)), - config_topic = Enum.find(topics, &(&1.name == topic.name)), partition <- topic.partitions do case :ets.lookup(table_name, {topic.name, partition.partition_index}) do [] -> + config_topic = Enum.find(topics, %{}, &(&1.name == topic.name)) + :ets.insert(table_name, { {topic.name, partition.partition_index}, partition.leader_id, - config_topic.default_producer, + config_topic[:default_producer] || apply(client_name, :get_default_producer, []), # batcher_id will be defined on producer nil }) @@ -325,8 +314,9 @@ defmodule Klife.Producer.Controller do table_name = partitioner_metadata_table(client_name) - for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)), - config_topic = Enum.find(topics, &(&1.name == topic.name)) do + for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)) do + config_topic = Enum.find(topics, %{}, &(&1.name == topic.name)) + max_partition = topic.partitions |> Enum.map(& &1.partition_index) @@ -334,7 +324,8 @@ defmodule Klife.Producer.Controller do data = %{ max_partition: max_partition, - default_partitioner: config_topic.default_partitioner + default_partitioner: + config_topic[:default_partitioner] || apply(client_name, :get_default_partitioner, []) } case :ets.lookup(table_name, topic.name) do diff --git a/lib/klife/utils.ex b/lib/klife/utils.ex index 6ff25e9..8fb437a 100644 --- a/lib/klife/utils.ex +++ b/lib/klife/utils.ex @@ -88,6 +88,18 @@ defmodule Klife.Utils do } end) + non_configured_topics = [ + %{ + name: "non_configured_topic_1", + num_partitions: 10, + replication_factor: 2, + assignments: [], + configs: [] + } + ] + + topics_input = topics_input ++ non_configured_topics + :ok = %{ content: %{ diff --git a/test/producer/producer_test.exs b/test/producer/producer_test.exs index 1cdae83..633ceb5 100644 --- a/test/producer/producer_test.exs +++ b/test/producer/producer_test.exs @@ -56,6 +56,23 @@ defmodule Klife.ProducerTest do assert length(record_batch) == 1 end + test "produce message sync no batching - unconfigured topic" do + record = %Record{ + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}], + topic: "non_configured_topic_1", + partition: 1 + } + + assert {:ok, %Record{offset: offset} = resp_rec} = MyClient.produce(record) + + assert_resp_record(record, resp_rec) + assert :ok = assert_offset(MyClient, record, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, record.topic, 1, offset) + assert length(record_batch) == 1 + end + test "produce message sync using non default producer" do record = %Record{ value: :rand.bytes(10),