Skip to content

Commit

Permalink
fix: insert default producer when input is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Jun 17, 2024
1 parent 74f76c3 commit 63572ba
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 9 deletions.
6 changes: 4 additions & 2 deletions lib/klife/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ defmodule Klife.Client do
type: {:list, {:keyword_list, Klife.TxnProducerPool.get_opts()}},
type_doc: "List of `Klife.TxnProducerPool` configurations",
required: false,
default: [],
doc:
"List of configurations, each starting a pool of transactional producers for use with transactional api. A default pool is always created."
],
producers: [
type: {:list, {:keyword_list, Klife.Producer.get_opts()}},
type_doc: "List of `Klife.Producer` configurations",
required: false,
default: [],
doc:
"List of configurations, each starting a new producer for use with produce api. A default producer is always created."
],
Expand Down Expand Up @@ -252,7 +254,7 @@ defmodule Klife.Client do

parsed_opts =
validated_opts
|> Keyword.update(:producers, [], fn l ->
|> Keyword.update!(:producers, fn l ->
default_producer =
NimbleOptions.validate!(
[name: default_producer_name],
Expand All @@ -261,7 +263,7 @@ defmodule Klife.Client do

Enum.uniq_by(l ++ [default_producer], fn p -> p[:name] end)
end)
|> Keyword.update(:txn_pools, [], fn l ->
|> Keyword.update!(:txn_pools, fn l ->
default_txn_pool =
NimbleOptions.validate!(
[name: default_txn_pool_name],
Expand Down
1 change: 0 additions & 1 deletion lib/klife/producer/producer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,6 @@ defmodule Klife.Producer do
# main metadata ets table, therefore we need a way to
# find out it's value.
put_batcher_id(client_name, producer_name, t_name, p_idx, b_id)

if ProducerController.get_default_producer(client_name, t_name, p_idx) == producer_name do
ProducerController.update_batcher_id(client_name, t_name, p_idx, b_id)
end
Expand Down
10 changes: 7 additions & 3 deletions lib/klife/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ defmodule Klife.Test do

def assert_offset(
client,
%Record{topic: topic, partition: partition} = expected_record,
%Record{topic: topic} = expected_record,
offset,
opts \\ []
) do
partition = Keyword.get(opts, :partition, expected_record.partition)
iso_lvl = Keyword.get(opts, :isolation, :committed)
txn_status = Keyword.get(opts, :txn_status, :committed)

Expand All @@ -28,8 +29,11 @@ defmodule Klife.Test do
assert status == txn_status

Enum.each(Map.from_struct(expected_record), fn {k, v} ->
if k in [:value, :key, :headers] do
assert v == Map.get(stored_record, k)
case k do
:value -> assert v == stored_record.value
:headers -> assert (v || []) == stored_record.headers
:key -> assert v == stored_record.key
_ -> :noop
end
end)
end
Expand Down
6 changes: 3 additions & 3 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ defmodule Klife.ProducerTest do
} = resp_rec} = MyClient.produce(record)

assert_resp_record(record, resp_rec)
assert :ok = assert_offset(MyClient, resp_rec, offset)
assert :ok = assert_offset(MyClient, record, offset, partition: partition)

record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, offset)
assert length(record_batch) == 1
Expand All @@ -527,7 +527,7 @@ defmodule Klife.ProducerTest do
} = resp_rec} = MyClient.produce(record)

assert_resp_record(record, resp_rec)
assert :ok = assert_offset(MyClient, resp_rec, offset)
assert :ok = assert_offset(MyClient, record, offset, partition: 3)

record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 3, offset)
assert length(record_batch) == 1
Expand All @@ -549,7 +549,7 @@ defmodule Klife.ProducerTest do
} = resp_rec} = MyClient.produce(record, partitioner: Klife.TestCustomPartitioner)

assert_resp_record(record, resp_rec)
assert :ok = assert_offset(MyClient, resp_rec, offset)
assert :ok = assert_offset(MyClient, record, offset, partition: 4)

record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 4, offset)
assert length(record_batch) == 1
Expand Down

0 comments on commit 63572ba

Please sign in to comment.