Skip to content

Commit

Permalink
chore(producer): add cluster changes test
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed May 18, 2024
1 parent 986f0bd commit 2ce28de
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 1 deletion.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
- Implement Producer
- Async api
- Batch produce
- Test cluster changes handling
- Improve test coverage
- Transaction api
- Accept more versions of the protocol
Expand Down
1 change: 1 addition & 0 deletions lib/klife/producer/dispatcher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ defmodule Klife.Producer.Dispatcher do
end
end

# TODO: Handle more specific codes
@delivery_success_codes [0, 46]
@delivery_discard_codes [18, 47]
def handle_info(
Expand Down
37 changes: 37 additions & 0 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ defmodule Klife.ProducerTest do
use ExUnit.Case

alias Klife.Producer
alias Klife.Producer.Controller, as: ProdController
alias Klife.Utils
alias Klife.TestUtils

defp assert_offset(expected_record, cluster, topic, partition, offset) do
stored_record = Utils.get_record_by_offset(cluster, topic, partition, offset)
Expand Down Expand Up @@ -183,4 +185,39 @@ defmodule Klife.ProducerTest do

assert :snappy = KlifeProtocol.RecordBatch.decode_attributes(attr).compression
end

test "is able to recover from cluster changes" do

Check failure on line 189 in test/producer/producer_test.exs

View workflow job for this annotation

GitHub Actions / test

test is able to recover from cluster changes (Klife.ProducerTest)
cluster = :my_test_cluster_1
topic = "my_no_batch_topic"

record = %{
value: :rand.bytes(10),
key: :rand.bytes(10),
headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}]
}

assert {:ok, offset} = Producer.produce_sync(record, topic, 1, cluster)

assert_offset(record, cluster, topic, 1, offset)

%{broker_id: old_broker_id} = ProdController.get_topics_partitions_metadata(cluster, topic, 1)

{:ok, service_name} = TestUtils.stop_broker(cluster, old_broker_id)

%{broker_id: new_broker_id} = ProdController.get_topics_partitions_metadata(cluster, topic, 1)

assert new_broker_id != old_broker_id

record = %{
value: :rand.bytes(10),
key: :rand.bytes(10),
headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}]
}

assert {:ok, offset} = Producer.produce_sync(record, topic, 1, cluster)

assert_offset(record, cluster, topic, 1, offset)

{:ok, _} = TestUtils.start_broker(service_name, cluster)
end
end

0 comments on commit 2ce28de

Please sign in to comment.