From 63572ba3db76708ae5a4384c06633a55b4580cd7 Mon Sep 17 00:00:00 2001 From: Gabriel Oliveira Date: Sun, 16 Jun 2024 21:24:49 -0300 Subject: [PATCH] fix: insert default producer when input is empty --- lib/klife/client.ex | 6 ++++-- lib/klife/producer/producer.ex | 1 - lib/klife/test.ex | 10 +++++++--- test/producer/producer_test.exs | 6 +++--- 4 files changed, 14 insertions(+), 9 deletions(-) diff --git a/lib/klife/client.ex b/lib/klife/client.ex index 8c9b76b..38509ee 100644 --- a/lib/klife/client.ex +++ b/lib/klife/client.ex @@ -46,6 +46,7 @@ defmodule Klife.Client do type: {:list, {:keyword_list, Klife.TxnProducerPool.get_opts()}}, type_doc: "List of `Klife.TxnProducerPool` configurations", required: false, + default: [], doc: "List of configurations, each starting a pool of transactional producers for use with transactional api. A default pool is always created." ], @@ -53,6 +54,7 @@ defmodule Klife.Client do type: {:list, {:keyword_list, Klife.Producer.get_opts()}}, type_doc: "List of `Klife.Producer` configurations", required: false, + default: [], doc: "List of configurations, each starting a new producer for use with produce api. A default producer is always created." ], @@ -252,7 +254,7 @@ defmodule Klife.Client do parsed_opts = validated_opts - |> Keyword.update(:producers, [], fn l -> + |> Keyword.update!(:producers, fn l -> default_producer = NimbleOptions.validate!( [name: default_producer_name], @@ -261,7 +263,7 @@ defmodule Klife.Client do Enum.uniq_by(l ++ [default_producer], fn p -> p[:name] end) end) - |> Keyword.update(:txn_pools, [], fn l -> + |> Keyword.update!(:txn_pools, fn l -> default_txn_pool = NimbleOptions.validate!( [name: default_txn_pool_name], diff --git a/lib/klife/producer/producer.ex b/lib/klife/producer/producer.ex index 39bfc29..07e0ee3 100644 --- a/lib/klife/producer/producer.ex +++ b/lib/klife/producer/producer.ex @@ -410,7 +410,6 @@ defmodule Klife.Producer do # main metadata ets table, therefore we need a way to # find out it's value. put_batcher_id(client_name, producer_name, t_name, p_idx, b_id) - if ProducerController.get_default_producer(client_name, t_name, p_idx) == producer_name do ProducerController.update_batcher_id(client_name, t_name, p_idx, b_id) end diff --git a/lib/klife/test.ex b/lib/klife/test.ex index 61dd807..5ea548d 100644 --- a/lib/klife/test.ex +++ b/lib/klife/test.ex @@ -11,10 +11,11 @@ defmodule Klife.Test do def assert_offset( client, - %Record{topic: topic, partition: partition} = expected_record, + %Record{topic: topic} = expected_record, offset, opts \\ [] ) do + partition = Keyword.get(opts, :partition, expected_record.partition) iso_lvl = Keyword.get(opts, :isolation, :committed) txn_status = Keyword.get(opts, :txn_status, :committed) @@ -28,8 +29,11 @@ defmodule Klife.Test do assert status == txn_status Enum.each(Map.from_struct(expected_record), fn {k, v} -> - if k in [:value, :key, :headers] do - assert v == Map.get(stored_record, k) + case k do + :value -> assert v == stored_record.value + :headers -> assert (v || []) == stored_record.headers + :key -> assert v == stored_record.key + _ -> :noop end end) end diff --git a/test/producer/producer_test.exs b/test/producer/producer_test.exs index 490058b..1cdae83 100644 --- a/test/producer/producer_test.exs +++ b/test/producer/producer_test.exs @@ -505,7 +505,7 @@ defmodule Klife.ProducerTest do } = resp_rec} = MyClient.produce(record) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, record, offset, partition: partition) record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, offset) assert length(record_batch) == 1 @@ -527,7 +527,7 @@ defmodule Klife.ProducerTest do } = resp_rec} = MyClient.produce(record) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, record, offset, partition: 3) record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 3, offset) assert length(record_batch) == 1 @@ -549,7 +549,7 @@ defmodule Klife.ProducerTest do } = resp_rec} = MyClient.produce(record, partitioner: Klife.TestCustomPartitioner) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, record, offset, partition: 4) record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 4, offset) assert length(record_batch) == 1