Skip to content

Commit

Permalink
chore: reduce flaky tests
Browse files Browse the repository at this point in the history
  • Loading branch information
oliveigah committed Nov 22, 2024
1 parent 9159024 commit d6d02c7
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 48 deletions.
2 changes: 1 addition & 1 deletion test/connection/system_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ defmodule Klife.Connection.SystemTest do
Enum.each(brokers_list_3, &check_broker_connection(client_name_3, &1))
end

@tag cluster_change: true, capture_log: true
@tag cluster_change: true, capture_log: true, timeout: 90_000
test "cluster changes events" do
config = [
connection: [
Expand Down
65 changes: 23 additions & 42 deletions test/producer/producer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ defmodule Klife.ProducerTest do
partition: 1
}

assert {:ok, %Record{} = rec} =
MyClient.produce(record, producer: :benchmark_producer)
assert {:ok, %Record{} = rec} = MyClient.produce(record, producer: :benchmark_producer)

assert :ok = TestUtils.assert_offset(MyClient, record, rec.offset)

Expand Down Expand Up @@ -139,8 +138,7 @@ defmodule Klife.ProducerTest do
{:ok, %Record{} = resp_rec1},
{:ok, %Record{} = resp_rec2},
{:ok, %Record{} = resp_rec3}
] =
Task.await_many([task_1, task_2, task_3], 2_000)
] = Task.await_many([task_1, task_2, task_3], 2_000)

assert resp_rec2.offset - resp_rec1.offset == 1
assert resp_rec3.offset - resp_rec2.offset == 1
Expand Down Expand Up @@ -210,8 +208,7 @@ defmodule Klife.ProducerTest do
{:ok, %Record{} = resp_rec1},
{:ok, %Record{} = resp_rec2},
{:ok, %Record{} = resp_rec3}
] =
Task.await_many([task_1, task_2, task_3], 2_000)
] = Task.await_many([task_1, task_2, task_3], 2_000)

assert resp_rec2.offset - resp_rec1.offset == 1
assert resp_rec3.offset - resp_rec2.offset == 1
Expand All @@ -233,7 +230,7 @@ defmodule Klife.ProducerTest do
assert :snappy = KlifeProtocol.RecordBatch.decode_attributes(attr).compression
end

@tag cluster_change: true, capture_log: true
@tag cluster_change: true, capture_log: true, timeout: 90_000
test "is able to recover from client changes" do
topic = "test_no_batch_topic"

Expand Down Expand Up @@ -467,8 +464,7 @@ defmodule Klife.ProducerTest do
[{:ok, %Record{offset: offset1_1}}, {:ok, %Record{offset: offset1_2}}],
[{:ok, %Record{offset: offset2_1}}, {:ok, %Record{offset: offset2_2}}],
[{:ok, %Record{offset: offset3_1}}, {:ok, %Record{offset: offset3_2}}]
] =
Task.await_many([task_1, task_2, task_3], 2_000)
] = Task.await_many([task_1, task_2, task_3], 2_000)

assert offset2_1 - offset1_1 == 1
assert offset3_1 - offset2_1 == 1
Expand Down Expand Up @@ -655,8 +651,7 @@ defmodule Klife.ProducerTest do
assert {:ok, new_rec} = resp
assert :ok = TestUtils.assert_offset(MyClient, rec, new_rec.offset)

record_batch =
TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, new_rec.offset)
record_batch = TestUtils.get_record_batch_by_offset(MyClient, rec.topic, 1, new_rec.offset)

assert length(record_batch) == 1
end
Expand Down Expand Up @@ -702,8 +697,7 @@ defmodule Klife.ProducerTest do
t1_data.batcher_id}
)

[{producer_pid, _}] =
registry_lookup({Klife.Producer, client_name, default_producer_name})
[{producer_pid, _}] = registry_lookup({Klife.Producer, client_name, default_producer_name})

assert [
{:ok, %Record{}},
Expand Down Expand Up @@ -876,12 +870,12 @@ defmodule Klife.ProducerTest do
{:error, resp1 ++ resp2}
end)

TestUtils.assert_offset(MyClient, rec1, offset1, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec2, offset2, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec3, offset3, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec4, offset4, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec5, offset5, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec6, offset6, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec1, offset1, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec2, offset2, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec3, offset3, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec4, offset4, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec5, offset5, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec6, offset6, txn_status: :aborted, isolation: :committed)
end

test "txn produce message - commits" do
Expand Down Expand Up @@ -1074,17 +1068,13 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec1, offset1, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec1, offset1,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec1, offset1, isolation: :uncommitted)

assert :not_found =
TestUtils.assert_offset(MyClient, rec2, offset2, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec2, offset2,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec2, offset2, isolation: :uncommitted)

{:ok, resp}
end,
Expand All @@ -1109,25 +1099,19 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec3, offset3, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec3, offset3,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec3, offset3, isolation: :uncommitted)

assert :not_found =
TestUtils.assert_offset(MyClient, rec4, offset4, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec4, offset4,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec4, offset4, isolation: :uncommitted)

assert :not_found =
TestUtils.assert_offset(MyClient, rec5, offset5, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec5, offset5,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec5, offset5, isolation: :uncommitted)

Process.put(:raised_offsets, {offset3, offset4, offset5})
raise "crazy error"
Expand All @@ -1136,9 +1120,9 @@ defmodule Klife.ProducerTest do
)

{offset3, offset4, offset5} = Process.get(:raised_offsets)
TestUtils.assert_offset(MyClient, rec3, offset3, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec4, offset4, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec5, offset5, txn_status: :aborted)
TestUtils.assert_offset(MyClient, rec3, offset3, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec4, offset4, txn_status: :aborted, isolation: :committed)
TestUtils.assert_offset(MyClient, rec5, offset5, txn_status: :aborted, isolation: :committed)

assert {:ok, {:ok, %Record{offset: offset6}}} =
MyClient.transaction(
Expand All @@ -1151,9 +1135,7 @@ defmodule Klife.ProducerTest do
TestUtils.assert_offset(MyClient, rec6, offset6, isolation: :committed)

assert :ok =
TestUtils.assert_offset(MyClient, rec6, offset6,
isolation: :uncommitted
)
TestUtils.assert_offset(MyClient, rec6, offset6, isolation: :uncommitted)

{:ok, resp}
end,
Expand Down Expand Up @@ -1218,8 +1200,7 @@ defmodule Klife.ProducerTest do
%Record{offset: offset1},
%Record{offset: offset2},
%Record{offset: offset3}
]} =
MyClient.produce_batch_txn([rec1, rec2, rec3])
]} = MyClient.produce_batch_txn([rec1, rec2, rec3])

TestUtils.assert_offset(MyClient, rec1, offset1, txn_status: :committed)
TestUtils.assert_offset(MyClient, rec2, offset2, txn_status: :committed)
Expand Down
12 changes: 7 additions & 5 deletions test/support/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Klife.TestUtils do
# change. One way to avoid this, would be having pubsub
# events related to the producer system but it does not
# exists yet.
|> tap(fn _ -> Process.sleep(:timer.seconds(10)) end)
|> tap(fn _ -> Process.sleep(:timer.seconds(30)) end)
end

defp do_stop_broker(client_name, broker_id) do
Expand Down Expand Up @@ -83,7 +83,7 @@ defmodule Klife.TestUtils do
# change. One way to avoid this, would be having pubsub
# events related to the producer system but it does not
# exists yet.
|> tap(fn _ -> Process.sleep(:timer.seconds(10)) end)
|> tap(fn _ -> Process.sleep(:timer.seconds(30)) end)
end

defp do_start_broker(service_name, client_name) do
Expand Down Expand Up @@ -258,7 +258,7 @@ defmodule Klife.TestUtils do
opts \\ []
) do
partition = Keyword.get(opts, :partition, expected_record.partition)
iso_lvl = Keyword.get(opts, :isolation, :committed)
iso_lvl = Keyword.get(opts, :isolation, :uncommitted)
txn_status = Keyword.get(opts, :txn_status, :committed)

client
Expand All @@ -268,8 +268,6 @@ defmodule Klife.TestUtils do
:not_found

{stored_record, status} ->
assert status == txn_status

Enum.each(Map.from_struct(expected_record), fn {k, v} ->
case k do
:value -> assert v == stored_record.value
Expand All @@ -278,6 +276,10 @@ defmodule Klife.TestUtils do
_ -> :noop
end
end)

assert status == txn_status

:ok
end
end

Expand Down

0 comments on commit d6d02c7

Please sign in to comment.