Skip to content

Commit

Permalink
feat: support non configured topics
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Aug 10, 2024
1 parent 19ec7d4 commit c45d6ce
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 40 deletions.
3 changes: 1 addition & 2 deletions example/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,4 @@ config :example, Example.MySimplestClient,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
topics: [[name: "my_topic_1"]]
]
34 changes: 25 additions & 9 deletions guides/examples/client_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ Here are some client configuration examples.
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
topics: [[name: "my_topic_0"]]
]
```

This client will connect to brokers using non ssl connection and produce messages only to topic `my_topic` using the default producer and default partitioner.
This client will connect to brokers using non ssl connection and produce messages using the default producer and default partitioner.

## SSL and custom socket opts

Expand All @@ -27,13 +26,12 @@ This client will connect to brokers using non ssl connection and produce message
cacertfile: Path.relative("test/compose_files/ssl/ca.crt")
],
socket_opts: [delay_send: true]
],
topics: [[name: "my_topic_0"]]
]
```

This client will connect to brokers using ssl connection, `connect_opts` and `socket_opts` are forwarded to erlang module `:ssl` in order to proper configure the socket. See the documentation for more details.

## Defining multiple producers
## Defining and using multiple producers

```elixir
config :my_app, MyApp.Client,
Expand Down Expand Up @@ -63,10 +61,10 @@ This client will connect to brokers using ssl connection, `connect_opts` and `so
]
```

This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`.
This client will have a total of 3 producers, the default one plus the other 2 defined in the configuration. You can see all the configuration options for the producers in `Klife.Producer`, messages produced to `my_topic_0` and `my_topic_1` will use `my_linger_ms_producer` and `my_custom_client_id_producer` respectively if no producer is set on opts. All other topics keep using the default producer.


## Defining custom partitioner
## Defining and using custom partitioner

First you need to implement a module following the `Klife.Behaviours.Partitioner` behaviour.

Expand Down Expand Up @@ -100,7 +98,7 @@ Then, you need to use it on your configuration.
]
```

On this client, the records produced without a specific partition will have a partition assigned using the `MyApp.MyCustomPartitioner` module.
On this client, the records produced to `my_topic_0` without a specific partition will have a partition assigned using the `MyApp.MyCustomPartitioner` module all other topics keep using the default partitioner.

## Defining multiple txn pools

Expand All @@ -119,3 +117,21 @@ On this client, the records produced without a specific partition will have a pa
```

This client will have a total of 3 txn pools, the default one plus the other two defined in the configuration. You can see all the configuration options for the producers in `Klife.TxnProducerPool`.

## Using custom default producer, partitioner and txn pool

```elixir
config :my_app, MyApp.Client,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
ssl: false
],
default_producer: :my_custom_producer,
producers: [[name: :my_custom_producer, linger_ms: 1_000]],
default_partitioner: MyCustomPartitioner,
default_txn_pool: :my_txn_pool,
txn_pools: [[name: :my_txn_pool, base_txn_id: "my_custom_base_txn_id"]],
topics: [[name: "my_topic_0"]]
```

This cliente will have only one producer (`:my_custom_producer`) and txn pool (`:my_txn_pool`),and the default paritioner strategy will be `MyCustomPartitioner`. All this 3 configurations will be used in produce API calls to topics that does not have any override config defined in the `topics` configuration.
19 changes: 9 additions & 10 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Klife.Client do
required: false,
default: @default_producer_name,
doc:
"Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option."
"Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option. If not provided a default producer will be started automatically."
],
default_partitioner: [
type: :atom,
Expand All @@ -40,29 +40,29 @@ defmodule Klife.Client do
required: false,
default: @default_txn_pool,
doc:
"Name of the txn pool to be used on transactions when a `:pool_name` is not provided as an option."
"Name of the txn pool to be used on transactions when a `:pool_name` is not provided as an option. If not provided a default txn pool will be started automatically."
],
txn_pools: [
type: {:list, {:keyword_list, Klife.TxnProducerPool.get_opts()}},
type_doc: "List of `Klife.TxnProducerPool` configurations",
required: false,
default: [],
doc:
"List of configurations, each starting a pool of transactional producers for use with transactional api. A default pool is always created."
"List of configurations, each starting a pool of transactional producers for use with transactional api."
],
producers: [
type: {:list, {:keyword_list, Klife.Producer.get_opts()}},
type_doc: "List of `Klife.Producer` configurations",
required: false,
default: [],
doc:
"List of configurations, each starting a new producer for use with produce api. A default producer is always created."
doc: "List of configurations, each starting a new producer for use with produce api."
],
topics: [
type: {:list, {:non_empty_keyword_list, Klife.Topic.get_opts()}},
type: {:list, {:keyword_list, Klife.Topic.get_opts()}},
type_doc: "List of `Klife.Topic` configurations",
required: true,
doc: "List of topics that will be managed by the client"
required: false,
doc: "List of topics that may have special configurations",
default: []
]
]

Expand Down Expand Up @@ -373,10 +373,9 @@ defmodule Klife.Client do
[name: default_txn_pool_name],
Klife.TxnProducerPool.get_opts()
)

Enum.uniq_by(l ++ [default_txn_pool], fn p -> p[:name] end)
end)
|> Keyword.update!(:topics, fn l ->
|> Keyword.update(:topics, [], fn l ->
Enum.map(l, fn topic ->
topic
|> Keyword.put_new(:default_producer, default_producer_name)
Expand Down
29 changes: 10 additions & 19 deletions lib/klife/producer/controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,6 @@ defmodule Klife.Producer.Controller do
check_metadata_timer_ref: timer_ref
}

Enum.each(topics_list, fn t ->
:persistent_term.put(
{__MODULE__, client_name, t.name},
t.default_producer
)
end)

:ets.new(topics_partitions_metadata_table(client_name), [
:set,
:public,
Expand Down Expand Up @@ -167,11 +160,9 @@ defmodule Klife.Producer.Controller do
@impl true
def handle_info(
:check_metadata,
%__MODULE__{client_name: client_name, topics: topics} = state
%__MODULE__{client_name: client_name} = state
) do
content = %{
topics: Enum.map(topics, fn t -> %{name: t.name} end)
}
content = %{topics: nil}

case Broker.send_message(Messages.Metadata, client_name, :controller, content) do
{:error, _} ->
Expand Down Expand Up @@ -260,9 +251,6 @@ defmodule Klife.Producer.Controller do
end)
end

def get_producer_for_topic(client_name, topic),
do: :persistent_term.get({__MODULE__, client_name, topic})

def get_partitioner_data(client_name, topic) do
client_name
|> partitioner_metadata_table()
Expand All @@ -281,14 +269,15 @@ defmodule Klife.Producer.Controller do

results =
for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)),
config_topic = Enum.find(topics, &(&1.name == topic.name)),
partition <- topic.partitions do
case :ets.lookup(table_name, {topic.name, partition.partition_index}) do
[] ->
config_topic = Enum.find(topics, %{}, &(&1.name == topic.name))

:ets.insert(table_name, {
{topic.name, partition.partition_index},
partition.leader_id,
config_topic.default_producer,
config_topic[:default_producer] || apply(client_name, :get_default_producer, []),
# batcher_id will be defined on producer
nil
})
Expand Down Expand Up @@ -325,16 +314,18 @@ defmodule Klife.Producer.Controller do

table_name = partitioner_metadata_table(client_name)

for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)),
config_topic = Enum.find(topics, &(&1.name == topic.name)) do
for topic <- Enum.filter(resp.topics, &(&1.error_code == 0)) do
config_topic = Enum.find(topics, %{}, &(&1.name == topic.name))

max_partition =
topic.partitions
|> Enum.map(& &1.partition_index)
|> Enum.max()

data = %{
max_partition: max_partition,
default_partitioner: config_topic.default_partitioner
default_partitioner:
config_topic[:default_partitioner] || apply(client_name, :get_default_partitioner, [])
}

case :ets.lookup(table_name, topic.name) do
Expand Down
12 changes: 12 additions & 0 deletions lib/klife/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,18 @@ defmodule Klife.Utils do
}
end)

non_configured_topics = [
%{
name: "non_configured_topic_1",
num_partitions: 10,
replication_factor: 2,
assignments: [],
configs: []
}
]

topics_input = topics_input ++ non_configured_topics

:ok =
%{
content: %{
Expand Down
17 changes: 17 additions & 0 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ defmodule Klife.ProducerTest do
assert length(record_batch) == 1
end

test "produce message sync no batching - unconfigured topic" do
record = %Record{
value: :rand.bytes(10),
key: :rand.bytes(10),
headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}],
topic: "non_configured_topic_1",
partition: 1
}

assert {:ok, %Record{offset: offset} = resp_rec} = MyClient.produce(record)

assert_resp_record(record, resp_rec)
assert :ok = assert_offset(MyClient, record, offset)
record_batch = TestUtils.get_record_batch_by_offset(MyClient, record.topic, 1, offset)
assert length(record_batch) == 1
end

test "produce message sync using non default producer" do
record = %Record{
value: :rand.bytes(10),
Expand Down

0 comments on commit c45d6ce

Please sign in to comment.