Skip to content

Commit

Permalink
feat: default producer and partitioner option
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 16, 2024
1 parent 738c025 commit 74f76c3
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 200 deletions.
4 changes: 4 additions & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
:ok = Klife.Utils.create_topics()

opts = [strategy: :one_for_one, name: Test.Supervisor]
{:ok, _} = Supervisor.start_link([MyClient], opts)
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
- SASL

- Producer System
- Rename client to client
- Add default producer and partition as client option
- Implement test helper functions (assert_produced)
- Improve input errors handling
- Accept more versions of the protocol
Expand Down
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Config

config :klife, MyTestClient,
config :klife, MyClient,
connection: [
bootstrap_servers: ["localhost:19092", "localhost:29092"],
# bootstrap_servers: ["localhost:19093", "localhost:29093"],
Expand Down
82 changes: 62 additions & 20 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Klife.Client do
@type list_of_records :: list(record)

@default_producer_name :klife_default_producer
@default_partitioner Klife.Producer.DefaultPartitioner
@default_txn_pool :klife_default_txn_pool

@doc false
Expand All @@ -20,6 +21,20 @@ defmodule Klife.Client do
required: true,
keys: Klife.Connection.Controller.get_opts()
],
default_producer: [
type: :atom,
required: false,
default: @default_producer_name,
doc:
"Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option."
],
default_partitioner: [
type: :atom,
required: false,
default: @default_partitioner,
doc:
"Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option."
],
default_txn_pool: [
type: :atom,
required: false,
Expand Down Expand Up @@ -62,7 +77,7 @@ defmodule Klife.Client do
When used it expects an `:otp_app` option that is the OTP application that has the client configuration.
```elixir
defmodule MyApp.MyTestClient do
defmodule MyApp.MyClient do
use Klife.Client, otp_app: :my_app
end
```
Expand All @@ -73,8 +88,8 @@ defmodule Klife.Client do
>
> - Define it as a proxy to a subset of the functions on `Klife` module,
> using it's module's name as the `client_name` parameter.
> One example of this is the `MyTestClient.produce/2` that forwards
> both arguments to `Klife.produce/3` and inject `MyTestClient` as the
> One example of this is the `MyClient.produce/2` that forwards
> both arguments to `Klife.produce/3` and inject `MyClient` as the
> second argument.
>
> - Define it as a supervisor by calling `use Supervisor` and implementing
Expand Down Expand Up @@ -123,7 +138,7 @@ defmodule Klife.Client do
def start(_type, _args) do
children = [
# some other modules...,
MyApp.MyTestClient
MyApp.MyClient
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Expand Down Expand Up @@ -160,7 +175,7 @@ defmodule Klife.Client do
```elixir
rec = %Klife.Record{value: "some_val", topic: "my_topic"}
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyTestClient.produce(rec)
{:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec)
```
"""
Expand All @@ -180,7 +195,7 @@ defmodule Klife.Client do
## Examples
iex> rec = %Klife.Record{value: "my_val", topic: "my_topic"}
iex> {:ok, %Klife.Record{} = enriched_rec} = MyTestClient.produce(rec)
iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec)
iex> true = is_number(enriched_rec.offset)
iex> true = is_number(enriched_rec.partition)
Expand Down Expand Up @@ -217,38 +232,63 @@ defmodule Klife.Client do
end

defp default_txn_pool_key(), do: {__MODULE__, :default_txn_pool}
defp default_producer_key(), do: {__MODULE__, :default_producer}
defp default_partitioner_key(), do: {__MODULE__, :default_partitioner}

def get_default_txn_pool(), do: :persistent_term.get(default_txn_pool_key())
def get_default_producer(), do: :persistent_term.get(default_producer_key())
def get_default_partitioner(), do: :persistent_term.get(default_partitioner_key())

@doc false
@impl Supervisor
def init(_args) do
config = Application.get_env(@otp_app, __MODULE__)

default_producer = [name: Klife.Client.default_producer_name()]
default_txn_pool = [name: Klife.Client.default_txn_pool_name()]
validated_opts = NimbleOptions.validate!(config, @input_opts)

enriched_config =
config
|> Keyword.update(:producers, [], fn l -> [default_producer | l] end)
|> Keyword.update(:txn_pools, [], fn l -> [default_txn_pool | l] end)
default_producer_name = validated_opts[:default_producer]
default_txn_pool_name = validated_opts[:default_txn_pool]
default_partitioner = validated_opts[:default_partitioner]

validated_opts =
enriched_config
|> NimbleOptions.validate!(@input_opts)
parsed_opts =
validated_opts
|> Keyword.update(:producers, [], fn l ->
default_producer =
NimbleOptions.validate!(
[name: default_producer_name],
Klife.Producer.get_opts()
)

Enum.uniq_by(l ++ [default_producer], fn p -> p[:name] end)
end)
|> Keyword.update(:txn_pools, [], fn l ->
default_txn_pool =
NimbleOptions.validate!(
[name: default_txn_pool_name],
Klife.TxnProducerPool.get_opts()
)

Enum.uniq_by(l ++ [default_txn_pool], fn p -> p[:name] end)
end)
|> Keyword.update!(:topics, fn l ->
Enum.map(l, fn topic ->
topic
|> Keyword.put_new(:default_producer, default_producer_name)
|> Keyword.put_new(:default_partitioner, default_partitioner)
end)
end)
|> Keyword.update!(:producers, fn l -> Enum.map(l, &Map.new/1) end)
|> Keyword.update!(:txn_pools, fn l -> Enum.map(l, &Map.new/1) end)
|> Keyword.update!(:topics, fn l -> Enum.map(l, &Map.new/1) end)

client_name_map = %{}

conn_opts =
validated_opts
parsed_opts
|> Keyword.fetch!(:connection)
|> Map.new()
|> Map.put(:client_name, __MODULE__)

producer_opts =
validated_opts
parsed_opts
|> Keyword.take([:producers, :txn_pools, :topics])
|> Keyword.merge(client_name: __MODULE__)
|> Map.new()
Expand All @@ -258,7 +298,9 @@ defmodule Klife.Client do
{Klife.Producer.Supervisor, producer_opts}
]

:persistent_term.put(default_txn_pool_key(), validated_opts[:default_txn_pool])
:persistent_term.put(default_txn_pool_key(), parsed_opts[:default_txn_pool])
:persistent_term.put(default_producer_key(), parsed_opts[:default_producer])
:persistent_term.put(default_partitioner_key(), parsed_opts[:default_partitioner])

Supervisor.init(children, strategy: :one_for_one)
end
Expand Down
2 changes: 1 addition & 1 deletion lib/klife/producer/txn_producer_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule Klife.TxnProducerPool do
txn_timeout_ms: [
type: :non_neg_integer,
default: :timer.seconds(90),
docs:
doc:
"The maximum amount of time, in milliseconds, that a transactional producer is allowed to remain open without either committing or aborting a transaction before it is considered expired"
]
]
Expand Down
2 changes: 0 additions & 2 deletions lib/klife/topic.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ defmodule Klife.Topic do
default_producer: [
type: :atom,
required: false,
default: :klife_default_producer,
doc: "Define the default producer to be used on produce API calls."
],
default_partitioner: [
type: :atom,
required: false,
default: Klife.Producer.DefaultPartitioner,
doc:
"Define the default partitioner module to be used on produce API calls. Must implement `Klife.Behaviours.Partitioner`"
]
Expand Down
2 changes: 1 addition & 1 deletion lib/klife/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ defmodule Klife.Utils do
end

defp create_topics_call() do
client_opts = Application.fetch_env!(:klife, MyTestClient)
client_opts = Application.fetch_env!(:klife, MyClient)

conn_defaults =
Klife.Connection.Controller.get_opts()
Expand Down
20 changes: 10 additions & 10 deletions lib/mix/tasks/benchmark.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ if Mix.env() in [:dev] do

:ok = Klife.Utils.create_topics()
opts = [strategy: :one_for_one, name: Benchmark.Supervisor]
{:ok, _} = Supervisor.start_link([MyTestClient], opts)
{:ok, _} = Supervisor.start_link([MyClient], opts)

Process.sleep(1_000)
apply(Mix.Tasks.Benchmark, :do_run_bench, args)
Expand Down Expand Up @@ -118,9 +118,9 @@ if Mix.env() in [:dev] do
rec1 = Enum.random(records_1)
rec2 = Enum.random(records_2)

t0 = Task.async(fn -> MyTestClient.produce(rec0) end)
t1 = Task.async(fn -> MyTestClient.produce(rec1) end)
t2 = Task.async(fn -> MyTestClient.produce(rec2) end)
t0 = Task.async(fn -> MyClient.produce(rec0) end)
t1 = Task.async(fn -> MyClient.produce(rec1) end)
t2 = Task.async(fn -> MyClient.produce(rec2) end)

[{:ok, _}, {:ok, _}, {:ok, _}] = Task.await_many([t0, t1, t2])
end,
Expand Down Expand Up @@ -216,14 +216,14 @@ if Mix.env() in [:dev] do
rec1 = Enum.random(records_1)
rec2 = Enum.random(records_2)

[{:ok, _}, {:ok, _}, {:ok, _}] = MyTestClient.produce_batch([rec0, rec1, rec2])
[{:ok, _}, {:ok, _}, {:ok, _}] = MyClient.produce_batch([rec0, rec1, rec2])
end,
"produce_batch_txn" => fn ->
rec0 = Enum.random(records_0)
rec1 = Enum.random(records_1)
rec2 = Enum.random(records_2)

{:ok, [_rec1, _rec2, _rec3]} = MyTestClient.produce_batch_txn([rec0, rec1, rec2])
{:ok, [_rec1, _rec2, _rec3]} = MyClient.produce_batch_txn([rec0, rec1, rec2])
end
},
time: 15,
Expand All @@ -250,13 +250,13 @@ if Mix.env() in [:dev] do
Benchee.run(
%{
"klife" => fn ->
{:ok, _rec} = MyTestClient.produce(Enum.random(records_0))
{:ok, _rec} = MyClient.produce(Enum.random(records_0))
end,
"klife multi inflight" => fn ->
{:ok, _rec} = MyTestClient.produce(Enum.random(in_flight_records))
{:ok, _rec} = MyClient.produce(Enum.random(in_flight_records))
end,
"klife multi inflight linger" => fn ->
{:ok, _rec} = MyTestClient.produce(Enum.random(in_flight_linger_records))
{:ok, _rec} = MyClient.produce(Enum.random(in_flight_linger_records))
end
},
time: 15,
Expand Down Expand Up @@ -295,7 +295,7 @@ if Mix.env() in [:dev] do
Enum.map(tasks_recs_to_send, fn recs ->
Task.async(fn ->
Enum.map(recs, fn rec ->
{:ok, _rec} = MyTestClient.produce(rec)
{:ok, _rec} = MyClient.produce(rec)
end)
end)
end)
Expand Down
Loading

0 comments on commit 74f76c3

Please sign in to comment.