diff --git a/.iex.exs b/.iex.exs index e69de29..044a65c 100644 --- a/.iex.exs +++ b/.iex.exs @@ -0,0 +1,4 @@ +:ok = Klife.Utils.create_topics() + +opts = [strategy: :one_for_one, name: Test.Supervisor] +{:ok, _} = Supervisor.start_link([MyClient], opts) diff --git a/README.md b/README.md index b65f03e..8633bcb 100644 --- a/README.md +++ b/README.md @@ -8,8 +8,6 @@ - SASL - Producer System - - Rename client to client - - Add default producer and partition as client option - Implement test helper functions (assert_produced) - Improve input errors handling - Accept more versions of the protocol diff --git a/config/config.exs b/config/config.exs index 806a119..782a4e3 100644 --- a/config/config.exs +++ b/config/config.exs @@ -1,6 +1,6 @@ import Config -config :klife, MyTestClient, +config :klife, MyClient, connection: [ bootstrap_servers: ["localhost:19092", "localhost:29092"], # bootstrap_servers: ["localhost:19093", "localhost:29093"], diff --git a/lib/klife/client.ex b/lib/klife/client.ex index b1020c2..8c9b76b 100644 --- a/lib/klife/client.ex +++ b/lib/klife/client.ex @@ -6,6 +6,7 @@ defmodule Klife.Client do @type list_of_records :: list(record) @default_producer_name :klife_default_producer + @default_partitioner Klife.Producer.DefaultPartitioner @default_txn_pool :klife_default_txn_pool @doc false @@ -20,6 +21,20 @@ defmodule Klife.Client do required: true, keys: Klife.Connection.Controller.get_opts() ], + default_producer: [ + type: :atom, + required: false, + default: @default_producer_name, + doc: + "Name of the producer to be used on produce API calls when a specific producer is not provided via configuration or option." + ], + default_partitioner: [ + type: :atom, + required: false, + default: @default_partitioner, + doc: + "Partitioner module to be used on produce API calls when a specific partitioner is not provided via configuration or option." + ], default_txn_pool: [ type: :atom, required: false, @@ -62,7 +77,7 @@ defmodule Klife.Client do When used it expects an `:otp_app` option that is the OTP application that has the client configuration. ```elixir - defmodule MyApp.MyTestClient do + defmodule MyApp.MyClient do use Klife.Client, otp_app: :my_app end ``` @@ -73,8 +88,8 @@ defmodule Klife.Client do > > - Define it as a proxy to a subset of the functions on `Klife` module, > using it's module's name as the `client_name` parameter. - > One example of this is the `MyTestClient.produce/2` that forwards - > both arguments to `Klife.produce/3` and inject `MyTestClient` as the + > One example of this is the `MyClient.produce/2` that forwards + > both arguments to `Klife.produce/3` and inject `MyClient` as the > second argument. > > - Define it as a supervisor by calling `use Supervisor` and implementing @@ -123,7 +138,7 @@ defmodule Klife.Client do def start(_type, _args) do children = [ # some other modules..., - MyApp.MyTestClient + MyApp.MyClient ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] @@ -160,7 +175,7 @@ defmodule Klife.Client do ```elixir rec = %Klife.Record{value: "some_val", topic: "my_topic"} - {:ok, %Klife.Record{offset: offset, partition: partition}} = MyTestClient.produce(rec) + {:ok, %Klife.Record{offset: offset, partition: partition}} = MyClient.produce(rec) ``` """ @@ -180,7 +195,7 @@ defmodule Klife.Client do ## Examples iex> rec = %Klife.Record{value: "my_val", topic: "my_topic"} - iex> {:ok, %Klife.Record{} = enriched_rec} = MyTestClient.produce(rec) + iex> {:ok, %Klife.Record{} = enriched_rec} = MyClient.produce(rec) iex> true = is_number(enriched_rec.offset) iex> true = is_number(enriched_rec.partition) @@ -217,38 +232,63 @@ defmodule Klife.Client do end defp default_txn_pool_key(), do: {__MODULE__, :default_txn_pool} + defp default_producer_key(), do: {__MODULE__, :default_producer} + defp default_partitioner_key(), do: {__MODULE__, :default_partitioner} + def get_default_txn_pool(), do: :persistent_term.get(default_txn_pool_key()) + def get_default_producer(), do: :persistent_term.get(default_producer_key()) + def get_default_partitioner(), do: :persistent_term.get(default_partitioner_key()) @doc false @impl Supervisor def init(_args) do config = Application.get_env(@otp_app, __MODULE__) - default_producer = [name: Klife.Client.default_producer_name()] - default_txn_pool = [name: Klife.Client.default_txn_pool_name()] + validated_opts = NimbleOptions.validate!(config, @input_opts) - enriched_config = - config - |> Keyword.update(:producers, [], fn l -> [default_producer | l] end) - |> Keyword.update(:txn_pools, [], fn l -> [default_txn_pool | l] end) + default_producer_name = validated_opts[:default_producer] + default_txn_pool_name = validated_opts[:default_txn_pool] + default_partitioner = validated_opts[:default_partitioner] - validated_opts = - enriched_config - |> NimbleOptions.validate!(@input_opts) + parsed_opts = + validated_opts + |> Keyword.update(:producers, [], fn l -> + default_producer = + NimbleOptions.validate!( + [name: default_producer_name], + Klife.Producer.get_opts() + ) + + Enum.uniq_by(l ++ [default_producer], fn p -> p[:name] end) + end) + |> Keyword.update(:txn_pools, [], fn l -> + default_txn_pool = + NimbleOptions.validate!( + [name: default_txn_pool_name], + Klife.TxnProducerPool.get_opts() + ) + + Enum.uniq_by(l ++ [default_txn_pool], fn p -> p[:name] end) + end) + |> Keyword.update!(:topics, fn l -> + Enum.map(l, fn topic -> + topic + |> Keyword.put_new(:default_producer, default_producer_name) + |> Keyword.put_new(:default_partitioner, default_partitioner) + end) + end) |> Keyword.update!(:producers, fn l -> Enum.map(l, &Map.new/1) end) |> Keyword.update!(:txn_pools, fn l -> Enum.map(l, &Map.new/1) end) |> Keyword.update!(:topics, fn l -> Enum.map(l, &Map.new/1) end) - client_name_map = %{} - conn_opts = - validated_opts + parsed_opts |> Keyword.fetch!(:connection) |> Map.new() |> Map.put(:client_name, __MODULE__) producer_opts = - validated_opts + parsed_opts |> Keyword.take([:producers, :txn_pools, :topics]) |> Keyword.merge(client_name: __MODULE__) |> Map.new() @@ -258,7 +298,9 @@ defmodule Klife.Client do {Klife.Producer.Supervisor, producer_opts} ] - :persistent_term.put(default_txn_pool_key(), validated_opts[:default_txn_pool]) + :persistent_term.put(default_txn_pool_key(), parsed_opts[:default_txn_pool]) + :persistent_term.put(default_producer_key(), parsed_opts[:default_producer]) + :persistent_term.put(default_partitioner_key(), parsed_opts[:default_partitioner]) Supervisor.init(children, strategy: :one_for_one) end diff --git a/lib/klife/producer/txn_producer_pool.ex b/lib/klife/producer/txn_producer_pool.ex index 4b755cc..562cadc 100644 --- a/lib/klife/producer/txn_producer_pool.ex +++ b/lib/klife/producer/txn_producer_pool.ex @@ -33,7 +33,7 @@ defmodule Klife.TxnProducerPool do txn_timeout_ms: [ type: :non_neg_integer, default: :timer.seconds(90), - docs: + doc: "The maximum amount of time, in milliseconds, that a transactional producer is allowed to remain open without either committing or aborting a transaction before it is considered expired" ] ] diff --git a/lib/klife/topic.ex b/lib/klife/topic.ex index 538acb3..a4d864e 100644 --- a/lib/klife/topic.ex +++ b/lib/klife/topic.ex @@ -8,13 +8,11 @@ defmodule Klife.Topic do default_producer: [ type: :atom, required: false, - default: :klife_default_producer, doc: "Define the default producer to be used on produce API calls." ], default_partitioner: [ type: :atom, required: false, - default: Klife.Producer.DefaultPartitioner, doc: "Define the default partitioner module to be used on produce API calls. Must implement `Klife.Behaviours.Partitioner`" ] diff --git a/lib/klife/utils.ex b/lib/klife/utils.ex index 6c44fdf..6ff25e9 100644 --- a/lib/klife/utils.ex +++ b/lib/klife/utils.ex @@ -46,7 +46,7 @@ defmodule Klife.Utils do end defp create_topics_call() do - client_opts = Application.fetch_env!(:klife, MyTestClient) + client_opts = Application.fetch_env!(:klife, MyClient) conn_defaults = Klife.Connection.Controller.get_opts() diff --git a/lib/mix/tasks/benchmark.ex b/lib/mix/tasks/benchmark.ex index 0f1381b..2d8c237 100644 --- a/lib/mix/tasks/benchmark.ex +++ b/lib/mix/tasks/benchmark.ex @@ -7,7 +7,7 @@ if Mix.env() in [:dev] do :ok = Klife.Utils.create_topics() opts = [strategy: :one_for_one, name: Benchmark.Supervisor] - {:ok, _} = Supervisor.start_link([MyTestClient], opts) + {:ok, _} = Supervisor.start_link([MyClient], opts) Process.sleep(1_000) apply(Mix.Tasks.Benchmark, :do_run_bench, args) @@ -118,9 +118,9 @@ if Mix.env() in [:dev] do rec1 = Enum.random(records_1) rec2 = Enum.random(records_2) - t0 = Task.async(fn -> MyTestClient.produce(rec0) end) - t1 = Task.async(fn -> MyTestClient.produce(rec1) end) - t2 = Task.async(fn -> MyTestClient.produce(rec2) end) + t0 = Task.async(fn -> MyClient.produce(rec0) end) + t1 = Task.async(fn -> MyClient.produce(rec1) end) + t2 = Task.async(fn -> MyClient.produce(rec2) end) [{:ok, _}, {:ok, _}, {:ok, _}] = Task.await_many([t0, t1, t2]) end, @@ -216,14 +216,14 @@ if Mix.env() in [:dev] do rec1 = Enum.random(records_1) rec2 = Enum.random(records_2) - [{:ok, _}, {:ok, _}, {:ok, _}] = MyTestClient.produce_batch([rec0, rec1, rec2]) + [{:ok, _}, {:ok, _}, {:ok, _}] = MyClient.produce_batch([rec0, rec1, rec2]) end, "produce_batch_txn" => fn -> rec0 = Enum.random(records_0) rec1 = Enum.random(records_1) rec2 = Enum.random(records_2) - {:ok, [_rec1, _rec2, _rec3]} = MyTestClient.produce_batch_txn([rec0, rec1, rec2]) + {:ok, [_rec1, _rec2, _rec3]} = MyClient.produce_batch_txn([rec0, rec1, rec2]) end }, time: 15, @@ -250,13 +250,13 @@ if Mix.env() in [:dev] do Benchee.run( %{ "klife" => fn -> - {:ok, _rec} = MyTestClient.produce(Enum.random(records_0)) + {:ok, _rec} = MyClient.produce(Enum.random(records_0)) end, "klife multi inflight" => fn -> - {:ok, _rec} = MyTestClient.produce(Enum.random(in_flight_records)) + {:ok, _rec} = MyClient.produce(Enum.random(in_flight_records)) end, "klife multi inflight linger" => fn -> - {:ok, _rec} = MyTestClient.produce(Enum.random(in_flight_linger_records)) + {:ok, _rec} = MyClient.produce(Enum.random(in_flight_linger_records)) end }, time: 15, @@ -295,7 +295,7 @@ if Mix.env() in [:dev] do Enum.map(tasks_recs_to_send, fn recs -> Task.async(fn -> Enum.map(recs, fn rec -> - {:ok, _rec} = MyTestClient.produce(rec) + {:ok, _rec} = MyClient.produce(rec) end) end) end) diff --git a/test/producer/producer_test.exs b/test/producer/producer_test.exs index 014dc48..490058b 100644 --- a/test/producer/producer_test.exs +++ b/test/producer/producer_test.exs @@ -29,13 +29,13 @@ defmodule Klife.ProducerTest do partition: partition } - {:ok, _} = MyTestClient.produce(rec, client: client) + {:ok, _} = MyClient.produce(rec, client: client) end defp now_unix(), do: DateTime.utc_now() |> DateTime.to_unix() setup_all do - :ok = TestUtils.wait_producer(MyTestClient) + :ok = TestUtils.wait_producer(MyClient) %{} end @@ -48,11 +48,11 @@ defmodule Klife.ProducerTest do partition: 1 } - assert {:ok, %Record{offset: offset} = resp_rec} = MyTestClient.produce(record) + assert {:ok, %Record{offset: offset} = resp_rec} = MyClient.produce(record) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyTestClient, record, offset) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, record.topic, 1, offset) + assert :ok = assert_offset(MyClient, record, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, record.topic, 1, offset) assert length(record_batch) == 1 end @@ -66,12 +66,12 @@ defmodule Klife.ProducerTest do } assert {:ok, %Record{} = rec} = - MyTestClient.produce(record, producer: :benchmark_producer) + MyClient.produce(record, producer: :benchmark_producer) - assert :ok = assert_offset(MyTestClient, record, rec.offset) + assert :ok = assert_offset(MyClient, record, rec.offset) record_batch = - TestUtils.get_record_batch_by_offset(MyTestClient, record.topic, record.partition, rec.offset) + TestUtils.get_record_batch_by_offset(MyClient, record.topic, record.partition, rec.offset) assert length(record_batch) == 1 end @@ -79,7 +79,7 @@ defmodule Klife.ProducerTest do test "produce message sync with batching" do topic = "test_batch_topic" - wait_batch_cycle(MyTestClient, topic, 1) + wait_batch_cycle(MyClient, topic, 1) rec_1 = %Record{ value: :rand.bytes(10), @@ -91,7 +91,7 @@ defmodule Klife.ProducerTest do task_1 = Task.async(fn -> - MyTestClient.produce(rec_1) + MyClient.produce(rec_1) end) rec_2 = %Record{ @@ -106,7 +106,7 @@ defmodule Klife.ProducerTest do task_2 = Task.async(fn -> - MyTestClient.produce(rec_2) + MyClient.produce(rec_2) end) rec_3 = %Record{ @@ -121,7 +121,7 @@ defmodule Klife.ProducerTest do task_3 = Task.async(fn -> - MyTestClient.produce(rec_3) + MyClient.produce(rec_3) end) assert [ @@ -134,13 +134,13 @@ defmodule Klife.ProducerTest do assert resp_rec2.offset - resp_rec1.offset == 1 assert resp_rec3.offset - resp_rec2.offset == 1 - assert :ok = assert_offset(MyTestClient, rec_1, resp_rec1.offset) - assert :ok = assert_offset(MyTestClient, rec_2, resp_rec2.offset) - assert :ok = assert_offset(MyTestClient, rec_3, resp_rec3.offset) + assert :ok = assert_offset(MyClient, rec_1, resp_rec1.offset) + assert :ok = assert_offset(MyClient, rec_2, resp_rec2.offset) + assert :ok = assert_offset(MyClient, rec_3, resp_rec3.offset) - batch_1 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, resp_rec1.offset) - batch_2 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, resp_rec2.offset) - batch_3 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, resp_rec3.offset) + batch_1 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, resp_rec1.offset) + batch_2 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, resp_rec2.offset) + batch_3 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, resp_rec3.offset) assert length(batch_1) == 3 assert batch_1 == batch_2 and batch_2 == batch_3 @@ -150,7 +150,7 @@ defmodule Klife.ProducerTest do topic = "test_compression_topic" partition = 1 - wait_batch_cycle(MyTestClient, topic, partition) + wait_batch_cycle(MyClient, topic, partition) rec_1 = %Record{ value: :rand.bytes(10), @@ -162,7 +162,7 @@ defmodule Klife.ProducerTest do task_1 = Task.async(fn -> - MyTestClient.produce(rec_1) + MyClient.produce(rec_1) end) rec_2 = %Record{ @@ -177,7 +177,7 @@ defmodule Klife.ProducerTest do task_2 = Task.async(fn -> - MyTestClient.produce(rec_2) + MyClient.produce(rec_2) end) rec_3 = %Record{ @@ -192,7 +192,7 @@ defmodule Klife.ProducerTest do task_3 = Task.async(fn -> - MyTestClient.produce(rec_3) + MyClient.produce(rec_3) end) assert [ @@ -205,19 +205,19 @@ defmodule Klife.ProducerTest do assert resp_rec2.offset - resp_rec1.offset == 1 assert resp_rec3.offset - resp_rec2.offset == 1 - assert :ok = assert_offset(MyTestClient, rec_1, resp_rec1.offset) - assert :ok = assert_offset(MyTestClient, rec_2, resp_rec2.offset) - assert :ok = assert_offset(MyTestClient, rec_3, resp_rec3.offset) + assert :ok = assert_offset(MyClient, rec_1, resp_rec1.offset) + assert :ok = assert_offset(MyClient, rec_2, resp_rec2.offset) + assert :ok = assert_offset(MyClient, rec_3, resp_rec3.offset) - batch_1 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, partition, resp_rec1.offset) - batch_2 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, partition, resp_rec2.offset) - batch_3 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, partition, resp_rec3.offset) + batch_1 = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, resp_rec1.offset) + batch_2 = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, resp_rec2.offset) + batch_3 = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, resp_rec3.offset) assert length(batch_1) == 3 assert batch_1 == batch_2 and batch_2 == batch_3 assert [%{attributes: attr}] = - TestUtils.get_partition_resp_records_by_offset(MyTestClient, topic, 1, resp_rec1.offset) + TestUtils.get_partition_resp_records_by_offset(MyClient, topic, 1, resp_rec1.offset) assert :snappy = KlifeProtocol.RecordBatch.decode_attributes(attr).compression end @@ -226,7 +226,7 @@ defmodule Klife.ProducerTest do test "is able to recover from client changes" do topic = "test_no_batch_topic" - :ok = TestUtils.wait_client(MyTestClient, 3) + :ok = TestUtils.wait_client(MyClient, 3) record = %Record{ value: :rand.bytes(10), @@ -236,19 +236,19 @@ defmodule Klife.ProducerTest do partition: 1 } - assert {:ok, %Record{} = resp_rec} = MyTestClient.produce(record) + assert {:ok, %Record{} = resp_rec} = MyClient.produce(record) - assert :ok = assert_offset(MyTestClient, record, resp_rec.offset) + assert :ok = assert_offset(MyClient, record, resp_rec.offset) %{broker_id: old_broker_id} = - ProdController.get_topics_partitions_metadata(MyTestClient, topic, 1) + ProdController.get_topics_partitions_metadata(MyClient, topic, 1) - {:ok, service_name} = TestUtils.stop_broker(MyTestClient, old_broker_id) + {:ok, service_name} = TestUtils.stop_broker(MyClient, old_broker_id) Process.sleep(50) %{broker_id: new_broker_id} = - ProdController.get_topics_partitions_metadata(MyTestClient, topic, 1) + ProdController.get_topics_partitions_metadata(MyClient, topic, 1) assert new_broker_id != old_broker_id @@ -260,11 +260,11 @@ defmodule Klife.ProducerTest do partition: 1 } - assert {:ok, %Record{} = resp_rec} = MyTestClient.produce(record) + assert {:ok, %Record{} = resp_rec} = MyClient.produce(record) - assert :ok = assert_offset(MyTestClient, record, resp_rec.offset) + assert :ok = assert_offset(MyClient, record, resp_rec.offset) - {:ok, _} = TestUtils.start_broker(service_name, MyTestClient) + {:ok, _} = TestUtils.start_broker(service_name, MyClient) end test "produce batch message sync no batching" do @@ -296,13 +296,13 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset1}}, {:ok, %Record{offset: offset2}}, {:ok, %Record{offset: offset3}} - ] = MyTestClient.produce_batch([rec1, rec2, rec3]) + ] = MyClient.produce_batch([rec1, rec2, rec3]) - assert :ok = assert_offset(MyTestClient, rec1, offset1) - assert :ok = assert_offset(MyTestClient, rec2, offset2) - assert :ok = assert_offset(MyTestClient, rec3, offset3) + assert :ok = assert_offset(MyClient, rec1, offset1) + assert :ok = assert_offset(MyClient, rec2, offset2) + assert :ok = assert_offset(MyClient, rec3, offset3) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, rec1.topic, 1, offset1) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, rec1.topic, 1, offset1) assert length(record_batch) == 3 end @@ -353,31 +353,31 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset3}}, {:ok, %Record{offset: offset4}}, {:ok, %Record{offset: offset5}} - ] = MyTestClient.produce_batch([rec1, rec2, rec3, rec4, rec5]) + ] = MyClient.produce_batch([rec1, rec2, rec3, rec4, rec5]) - assert :ok = assert_offset(MyTestClient, rec1, offset1) - assert :ok = assert_offset(MyTestClient, rec2, offset2) - assert :ok = assert_offset(MyTestClient, rec3, offset3) - assert :ok = assert_offset(MyTestClient, rec4, offset4) - assert :ok = assert_offset(MyTestClient, rec5, offset5) + assert :ok = assert_offset(MyClient, rec1, offset1) + assert :ok = assert_offset(MyClient, rec2, offset2) + assert :ok = assert_offset(MyClient, rec3, offset3) + assert :ok = assert_offset(MyClient, rec4, offset4) + assert :ok = assert_offset(MyClient, rec5, offset5) record_batch = - TestUtils.get_record_batch_by_offset(MyTestClient, rec1.topic, rec1.partition, offset1) + TestUtils.get_record_batch_by_offset(MyClient, rec1.topic, rec1.partition, offset1) assert length(record_batch) == 1 record_batch = - TestUtils.get_record_batch_by_offset(MyTestClient, rec2.topic, rec2.partition, offset2) + TestUtils.get_record_batch_by_offset(MyClient, rec2.topic, rec2.partition, offset2) assert length(record_batch) == 1 record_batch = - TestUtils.get_record_batch_by_offset(MyTestClient, rec3.topic, rec3.partition, offset3) + TestUtils.get_record_batch_by_offset(MyClient, rec3.topic, rec3.partition, offset3) assert length(record_batch) == 1 record_batch = - TestUtils.get_record_batch_by_offset(MyTestClient, rec4.topic, rec4.partition, offset4) + TestUtils.get_record_batch_by_offset(MyClient, rec4.topic, rec4.partition, offset4) assert length(record_batch) == 2 end @@ -385,7 +385,7 @@ defmodule Klife.ProducerTest do test "produce batch message sync with batching" do topic = "test_batch_topic" - wait_batch_cycle(MyTestClient, topic, 1) + wait_batch_cycle(MyClient, topic, 1) rec1_1 = %Record{ value: :rand.bytes(10), @@ -405,7 +405,7 @@ defmodule Klife.ProducerTest do task_1 = Task.async(fn -> - MyTestClient.produce_batch([rec1_1, rec1_2]) + MyClient.produce_batch([rec1_1, rec1_2]) end) rec2_1 = %Record{ @@ -428,7 +428,7 @@ defmodule Klife.ProducerTest do task_2 = Task.async(fn -> - MyTestClient.produce_batch([rec2_1, rec2_2]) + MyClient.produce_batch([rec2_1, rec2_2]) end) rec3_1 = %Record{ @@ -451,7 +451,7 @@ defmodule Klife.ProducerTest do task_3 = Task.async(fn -> - MyTestClient.produce_batch([rec3_1, rec3_2]) + MyClient.produce_batch([rec3_1, rec3_2]) end) assert [ @@ -467,23 +467,23 @@ defmodule Klife.ProducerTest do assert offset2_2 - offset1_2 == 1 assert offset3_2 - offset2_2 == 1 - assert :ok = assert_offset(MyTestClient, rec1_1, offset1_1) - assert :ok = assert_offset(MyTestClient, rec1_2, offset1_2) - assert :ok = assert_offset(MyTestClient, rec2_1, offset2_1) - assert :ok = assert_offset(MyTestClient, rec2_2, offset2_2) - assert :ok = assert_offset(MyTestClient, rec3_1, offset3_1) - assert :ok = assert_offset(MyTestClient, rec3_2, offset3_2) + assert :ok = assert_offset(MyClient, rec1_1, offset1_1) + assert :ok = assert_offset(MyClient, rec1_2, offset1_2) + assert :ok = assert_offset(MyClient, rec2_1, offset2_1) + assert :ok = assert_offset(MyClient, rec2_2, offset2_2) + assert :ok = assert_offset(MyClient, rec3_1, offset3_1) + assert :ok = assert_offset(MyClient, rec3_2, offset3_2) - batch_1 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, offset1_1) - batch_2 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, offset2_1) - batch_3 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 1, offset3_1) + batch_1 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, offset1_1) + batch_2 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, offset2_1) + batch_3 = TestUtils.get_record_batch_by_offset(MyClient, topic, 1, offset3_1) assert length(batch_1) == 3 assert batch_1 == batch_2 and batch_2 == batch_3 - batch_1 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 2, offset1_2) - batch_2 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 2, offset2_2) - batch_3 = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 2, offset3_2) + batch_1 = TestUtils.get_record_batch_by_offset(MyClient, topic, 2, offset1_2) + batch_2 = TestUtils.get_record_batch_by_offset(MyClient, topic, 2, offset2_2) + batch_3 = TestUtils.get_record_batch_by_offset(MyClient, topic, 2, offset3_2) assert length(batch_1) == 3 assert batch_1 == batch_2 and batch_2 == batch_3 @@ -502,12 +502,12 @@ defmodule Klife.ProducerTest do offset: offset, partition: partition, topic: topic - } = resp_rec} = MyTestClient.produce(record) + } = resp_rec} = MyClient.produce(record) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyTestClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, resp_rec, offset) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, topic, partition, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, partition, offset) assert length(record_batch) == 1 end @@ -524,12 +524,12 @@ defmodule Klife.ProducerTest do offset: offset, partition: 3, topic: topic - } = resp_rec} = MyTestClient.produce(record) + } = resp_rec} = MyClient.produce(record) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyTestClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, resp_rec, offset) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 3, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 3, offset) assert length(record_batch) == 1 end @@ -546,12 +546,12 @@ defmodule Klife.ProducerTest do offset: offset, partition: 4, topic: topic - } = resp_rec} = MyTestClient.produce(record, partitioner: Klife.TestCustomPartitioner) + } = resp_rec} = MyClient.produce(record, partitioner: Klife.TestCustomPartitioner) assert_resp_record(record, resp_rec) - assert :ok = assert_offset(MyTestClient, resp_rec, offset) + assert :ok = assert_offset(MyClient, resp_rec, offset) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, topic, 4, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, topic, 4, offset) assert length(record_batch) == 1 end @@ -565,19 +565,19 @@ defmodule Klife.ProducerTest do } base_ts = now_unix() - assert :ok = MyTestClient.produce(rec, async: true) + assert :ok = MyClient.produce(rec, async: true) Process.sleep(10) - offset = TestUtils.get_latest_offset(MyTestClient, rec.topic, rec.partition, base_ts) + offset = TestUtils.get_latest_offset(MyClient, rec.topic, rec.partition, base_ts) - assert :ok = assert_offset(MyTestClient, rec, offset) - record_batch = TestUtils.get_record_batch_by_offset(MyTestClient, rec.topic, 1, offset) + assert :ok = assert_offset(MyClient, rec, offset) + record_batch = TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, offset) assert length(record_batch) == 1 end test "producer epoch bump" do - client_name = MyTestClient + client_name = MyClient %{ "test_no_batch_topic" => [t1_data | _], @@ -623,7 +623,7 @@ defmodule Klife.ProducerTest do assert [ {:ok, %Record{}}, {:ok, %Record{}} - ] = MyTestClient.produce_batch([rec1, rec2]) + ] = MyClient.produce_batch([rec1, rec2]) tp_key = {t1_data.topic_name, t1_data.partition_idx} @@ -673,8 +673,8 @@ defmodule Klife.ProducerTest do |> Map.get(:epochs) |> Map.get(tp_key) - assert [{:ok, %Record{}}] = MyTestClient.produce_batch([rec1]) - assert [{:ok, %Record{}}] = MyTestClient.produce_batch([rec2]) + assert [{:ok, %Record{}}] = MyClient.produce_batch([rec1]) + assert [{:ok, %Record{}}] = MyClient.produce_batch([rec2]) end test "txn produce message - aborts" do @@ -735,8 +735,8 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset5}}, {:ok, %Record{offset: offset6}} ]} = - MyTestClient.transaction(fn -> - resp1 = MyTestClient.produce_batch([rec1, rec2, rec3]) + MyClient.transaction(fn -> + resp1 = MyClient.produce_batch([rec1, rec2, rec3]) assert [ {:ok, %Record{offset: offset1}}, @@ -744,16 +744,16 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset3}} ] = resp1 - assert :not_found = assert_offset(MyTestClient, rec1, offset1, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec1, offset1, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec1, offset1, isolation: :committed) + assert :ok = assert_offset(MyClient, rec1, offset1, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec2, offset2, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec2, offset2, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec2, offset2, isolation: :committed) + assert :ok = assert_offset(MyClient, rec2, offset2, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec3, offset3, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec3, offset3, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec3, offset3, isolation: :committed) + assert :ok = assert_offset(MyClient, rec3, offset3, isolation: :uncommitted) - resp2 = MyTestClient.produce_batch([rec4, rec5, rec6]) + resp2 = MyClient.produce_batch([rec4, rec5, rec6]) assert [ {:ok, %Record{offset: offset4}}, @@ -761,24 +761,24 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset6}} ] = resp2 - assert :not_found = assert_offset(MyTestClient, rec4, offset4, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec4, offset4, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec4, offset4, isolation: :committed) + assert :ok = assert_offset(MyClient, rec4, offset4, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec5, offset5, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec5, offset5, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec5, offset5, isolation: :committed) + assert :ok = assert_offset(MyClient, rec5, offset5, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec6, offset6, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec6, offset6, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec6, offset6, isolation: :committed) + assert :ok = assert_offset(MyClient, rec6, offset6, isolation: :uncommitted) {:error, resp1 ++ resp2} end) - assert_offset(MyTestClient, rec1, offset1, txn_status: :aborted) - assert_offset(MyTestClient, rec2, offset2, txn_status: :aborted) - assert_offset(MyTestClient, rec3, offset3, txn_status: :aborted) - assert_offset(MyTestClient, rec4, offset4, txn_status: :aborted) - assert_offset(MyTestClient, rec5, offset5, txn_status: :aborted) - assert_offset(MyTestClient, rec6, offset6, txn_status: :aborted) + assert_offset(MyClient, rec1, offset1, txn_status: :aborted) + assert_offset(MyClient, rec2, offset2, txn_status: :aborted) + assert_offset(MyClient, rec3, offset3, txn_status: :aborted) + assert_offset(MyClient, rec4, offset4, txn_status: :aborted) + assert_offset(MyClient, rec5, offset5, txn_status: :aborted) + assert_offset(MyClient, rec6, offset6, txn_status: :aborted) end test "txn produce message - commits" do @@ -839,8 +839,8 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset5}}, {:ok, %Record{offset: offset6}} ]} = - MyTestClient.transaction(fn -> - resp1 = MyTestClient.produce_batch([rec1, rec2, rec3]) + MyClient.transaction(fn -> + resp1 = MyClient.produce_batch([rec1, rec2, rec3]) assert [ {:ok, %Record{offset: offset1}}, @@ -848,16 +848,16 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset3}} ] = resp1 - assert :not_found = assert_offset(MyTestClient, rec1, offset1, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec1, offset1, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec1, offset1, isolation: :committed) + assert :ok = assert_offset(MyClient, rec1, offset1, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec2, offset2, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec2, offset2, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec2, offset2, isolation: :committed) + assert :ok = assert_offset(MyClient, rec2, offset2, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec3, offset3, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec3, offset3, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec3, offset3, isolation: :committed) + assert :ok = assert_offset(MyClient, rec3, offset3, isolation: :uncommitted) - resp2 = MyTestClient.produce_batch([rec4, rec5, rec6]) + resp2 = MyClient.produce_batch([rec4, rec5, rec6]) assert [ {:ok, %Record{offset: offset4}}, @@ -865,24 +865,24 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset6}} ] = resp2 - assert :not_found = assert_offset(MyTestClient, rec4, offset4, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec4, offset4, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec4, offset4, isolation: :committed) + assert :ok = assert_offset(MyClient, rec4, offset4, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec5, offset5, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec5, offset5, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec5, offset5, isolation: :committed) + assert :ok = assert_offset(MyClient, rec5, offset5, isolation: :uncommitted) - assert :not_found = assert_offset(MyTestClient, rec6, offset6, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec6, offset6, isolation: :uncommitted) + assert :not_found = assert_offset(MyClient, rec6, offset6, isolation: :committed) + assert :ok = assert_offset(MyClient, rec6, offset6, isolation: :uncommitted) {:ok, resp1 ++ resp2} end) - assert_offset(MyTestClient, rec1, offset1, txn_status: :committed) - assert_offset(MyTestClient, rec2, offset2, txn_status: :committed) - assert_offset(MyTestClient, rec3, offset3, txn_status: :committed) - assert_offset(MyTestClient, rec4, offset4, txn_status: :committed) - assert_offset(MyTestClient, rec5, offset5, txn_status: :committed) - assert_offset(MyTestClient, rec6, offset6, txn_status: :committed) + assert_offset(MyClient, rec1, offset1, txn_status: :committed) + assert_offset(MyClient, rec2, offset2, txn_status: :committed) + assert_offset(MyClient, rec3, offset3, txn_status: :committed) + assert_offset(MyClient, rec4, offset4, txn_status: :committed) + assert_offset(MyClient, rec5, offset5, txn_status: :committed) + assert_offset(MyClient, rec6, offset6, txn_status: :committed) end @tag capture_log: true @@ -940,9 +940,9 @@ defmodule Klife.ProducerTest do {:ok, %Record{offset: offset1}}, {:ok, %Record{offset: offset2}} ]} = - MyTestClient.transaction( + MyClient.transaction( fn -> - resp = MyTestClient.produce_batch([rec1, rec2]) + resp = MyClient.produce_batch([rec1, rec2]) assert [ {:ok, %Record{offset: offset1}}, @@ -950,27 +950,27 @@ defmodule Klife.ProducerTest do ] = resp assert :not_found = - assert_offset(MyTestClient, rec1, offset1, isolation: :committed) + assert_offset(MyClient, rec1, offset1, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec1, offset1, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec1, offset1, isolation: :uncommitted) assert :not_found = - assert_offset(MyTestClient, rec2, offset2, isolation: :committed) + assert_offset(MyClient, rec2, offset2, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec2, offset2, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec2, offset2, isolation: :uncommitted) {:ok, resp} end, pool_name: :my_test_pool_1 ) - assert_offset(MyTestClient, rec1, offset1, txn_status: :committed) - assert_offset(MyTestClient, rec2, offset2, txn_status: :committed) + assert_offset(MyClient, rec1, offset1, txn_status: :committed) + assert_offset(MyClient, rec2, offset2, txn_status: :committed) assert {:error, %RuntimeError{message: "crazy error"}} = - MyTestClient.transaction( + MyClient.transaction( fn -> - resp = MyTestClient.produce_batch([rec3, rec4, rec5]) + resp = MyClient.produce_batch([rec3, rec4, rec5]) assert [ {:ok, %Record{offset: offset3}}, @@ -979,19 +979,19 @@ defmodule Klife.ProducerTest do ] = resp assert :not_found = - assert_offset(MyTestClient, rec3, offset3, isolation: :committed) + assert_offset(MyClient, rec3, offset3, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec3, offset3, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec3, offset3, isolation: :uncommitted) assert :not_found = - assert_offset(MyTestClient, rec4, offset4, isolation: :committed) + assert_offset(MyClient, rec4, offset4, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec4, offset4, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec4, offset4, isolation: :uncommitted) assert :not_found = - assert_offset(MyTestClient, rec5, offset5, isolation: :committed) + assert_offset(MyClient, rec5, offset5, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec5, offset5, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec5, offset5, isolation: :uncommitted) Process.put(:raised_offsets, {offset3, offset4, offset5}) raise "crazy error" @@ -1000,28 +1000,28 @@ defmodule Klife.ProducerTest do ) {offset3, offset4, offset5} = Process.get(:raised_offsets) - assert_offset(MyTestClient, rec3, offset3, txn_status: :aborted) - assert_offset(MyTestClient, rec4, offset4, txn_status: :aborted) - assert_offset(MyTestClient, rec5, offset5, txn_status: :aborted) + assert_offset(MyClient, rec3, offset3, txn_status: :aborted) + assert_offset(MyClient, rec4, offset4, txn_status: :aborted) + assert_offset(MyClient, rec5, offset5, txn_status: :aborted) assert {:ok, {:ok, %Record{offset: offset6}}} = - MyTestClient.transaction( + MyClient.transaction( fn -> - resp = MyTestClient.produce(rec6) + resp = MyClient.produce(rec6) assert {:ok, %Record{offset: offset6}} = resp assert :not_found = - assert_offset(MyTestClient, rec6, offset6, isolation: :committed) + assert_offset(MyClient, rec6, offset6, isolation: :committed) - assert :ok = assert_offset(MyTestClient, rec6, offset6, isolation: :uncommitted) + assert :ok = assert_offset(MyClient, rec6, offset6, isolation: :uncommitted) {:ok, resp} end, pool_name: :my_test_pool_1 ) - assert_offset(MyTestClient, rec6, offset6, txn_status: :committed) + assert_offset(MyClient, rec6, offset6, txn_status: :committed) end # TODO: How to assert transactional behaviour here? @@ -1080,22 +1080,22 @@ defmodule Klife.ProducerTest do %Record{offset: offset2}, %Record{offset: offset3} ]} = - MyTestClient.produce_batch_txn([rec1, rec2, rec3]) + MyClient.produce_batch_txn([rec1, rec2, rec3]) - assert_offset(MyTestClient, rec1, offset1, txn_status: :committed) - assert_offset(MyTestClient, rec2, offset2, txn_status: :committed) - assert_offset(MyTestClient, rec3, offset3, txn_status: :committed) + assert_offset(MyClient, rec1, offset1, txn_status: :committed) + assert_offset(MyClient, rec2, offset2, txn_status: :committed) + assert_offset(MyClient, rec3, offset3, txn_status: :committed) assert {:ok, [ %Record{offset: offset4}, %Record{offset: offset5}, %Record{offset: offset6} - ]} = MyTestClient.produce_batch_txn([rec4, rec5, rec6]) + ]} = MyClient.produce_batch_txn([rec4, rec5, rec6]) - assert_offset(MyTestClient, rec4, offset4, txn_status: :committed) - assert_offset(MyTestClient, rec5, offset5, txn_status: :committed) - assert_offset(MyTestClient, rec6, offset6, txn_status: :committed) + assert_offset(MyClient, rec4, offset4, txn_status: :committed) + assert_offset(MyClient, rec5, offset5, txn_status: :committed) + assert_offset(MyClient, rec6, offset6, txn_status: :committed) rec7 = %Record{ value: :rand.bytes(10), @@ -1121,6 +1121,6 @@ defmodule Klife.ProducerTest do %Record{value: ^rec7_val, error_code: 55}, %Record{value: ^rec8_val, error_code: 3} ]} = - MyTestClient.produce_batch_txn([rec7, rec8]) + MyClient.produce_batch_txn([rec7, rec8]) end end diff --git a/test/support/my_test_client.ex b/test/support/my_client.ex similarity index 69% rename from test/support/my_test_client.ex rename to test/support/my_client.ex index 1bd3add..828d82d 100644 --- a/test/support/my_test_client.ex +++ b/test/support/my_client.ex @@ -1,4 +1,4 @@ -defmodule MyTestClient do +defmodule MyClient do @moduledoc false use Klife.Client, otp_app: :klife diff --git a/test/test_helper.exs b/test/test_helper.exs index 8fce7f8..4b03b91 100644 --- a/test/test_helper.exs +++ b/test/test_helper.exs @@ -3,6 +3,6 @@ :ok = Klife.Utils.create_topics() opts = [strategy: :one_for_one, name: Test.Supervisor] -{:ok, _} = Supervisor.start_link([MyTestClient], opts) +{:ok, _} = Supervisor.start_link([MyClient], opts) ExUnit.start()