diff --git a/config/config.exs b/config/config.exs index dc63f7f..7b793b8 100644 --- a/config/config.exs +++ b/config/config.exs @@ -20,6 +20,12 @@ config :klife, client_id: "my_custom_client_id", linger_ms: 1_500 }, + %{ + name: :my_batch_compressed_producer, + client_id: "my_custom_client_id", + linger_ms: 1_500, + compression_type: :snappy + }, %{ name: :benchmark_producer, client_id: "my_custom_client_id" @@ -47,6 +53,10 @@ config :klife, num_partitions: 30, replication_factor: 2 }, + %{ + name: "comression_topic", + producer: :my_batch_compressed_producer + }, %{ name: "dispatcher_benchmark_topic_1", producer: :dispatcher_benchmark_producer_1, diff --git a/lib/klife/producer/dispatcher.ex b/lib/klife/producer/dispatcher.ex index 5d14f02..7b36f77 100644 --- a/lib/klife/producer/dispatcher.ex +++ b/lib/klife/producer/dispatcher.ex @@ -5,10 +5,12 @@ defmodule Klife.Producer.Dispatcher do require Logger alias Klife.Producer alias Klife.Connection.Broker - alias KlifeProtocol.Messages + alias KlifeProtocol.Messages, as: M defstruct [ :producer_config, + :producer_id, + :base_sequences, :broker_id, :current_batch, :current_waiting_pids, @@ -41,6 +43,13 @@ defmodule Klife.Producer.Dispatcher do args_map = Map.new(args) max_in_flight = args_map.producer_config.max_in_flight_requests linger_ms = args_map.producer_config.linger_ms + idempotent? = args_map.producer_config.enable_idempotence + cluster_name = args_map.producer_config.cluster_name + + producer_id = + if idempotent?, + do: get_producer_id(cluster_name, args_map.broker_id), + else: nil next_send_msg_ref = if linger_ms > 0, @@ -55,7 +64,9 @@ defmodule Klife.Producer.Dispatcher do batch_queue: :queue.new(), last_batch_sent_at: System.monotonic_time(:millisecond), in_flight_pool: Enum.map(1..max_in_flight, fn _ -> nil end), - next_send_msg_ref: next_send_msg_ref + next_send_msg_ref: next_send_msg_ref, + producer_id: producer_id, + base_sequences: %{} } state = %__MODULE__{} = Map.merge(base, args_map) @@ -63,6 +74,18 @@ defmodule Klife.Producer.Dispatcher do {:ok, state} end + defp get_producer_id(cluster_name, broker_id) do + content = %{ + transactional_id: nil, + transaction_timeout_ms: 0 + } + + {:ok, %{content: %{error_code: 0, producer_id: producer_id}}} = + Broker.send_message(M.InitProducerId, cluster_name, broker_id, content) + + producer_id + end + def produce_sync( record, topic, @@ -233,10 +256,9 @@ defmodule Klife.Producer.Dispatcher do def add_record_to_current_data( %__MODULE__{ current_estimated_size: curr_size, - producer_config: pconfig, - current_batch: curr_batch, current_waiting_pids: curr_pids, - current_base_time: curr_base_time + current_base_time: curr_base_time, + base_sequences: base_sequences } = state, record, topic, @@ -244,12 +266,13 @@ defmodule Klife.Producer.Dispatcher do pid, estimated_size ) do - new_batch = add_record_to_batch(curr_batch, record, topic, partition, pconfig) + new_batch = add_record_to_current_batch(state, record, topic, partition) %{ state | current_batch: new_batch, current_waiting_pids: add_waiting_pid(curr_pids, new_batch, pid, topic, partition), + base_sequences: update_base_sequence(base_sequences, new_batch, topic, partition), current_base_time: curr_base_time || System.monotonic_time(:millisecond), current_estimated_size: curr_size + estimated_size } @@ -357,23 +380,62 @@ defmodule Klife.Producer.Dispatcher do ## PRIVATE FUNCTIONS - defp add_record_to_batch(current_batch, record, topic, partition, pconfig) do - case Map.get(current_batch, {topic, partition}) do + defp add_record_to_current_batch( + %__MODULE__{current_batch: batch} = state, + record, + topic, + partition + ) do + case Map.get(batch, {topic, partition}) do nil -> new_batch = - pconfig + state |> init_partition_data(topic, partition) - |> add_record_to_batch(record) + |> add_record_to_partition_data(record) - Map.put(current_batch, {topic, partition}, new_batch) + Map.put(batch, {topic, partition}, new_batch) partition_data -> - new_partition_data = add_record_to_batch(partition_data, record) - Map.replace!(current_batch, {topic, partition}, new_partition_data) + new_partition_data = add_record_to_partition_data(partition_data, record) + Map.replace!(batch, {topic, partition}, new_partition_data) end end - defp add_record_to_batch(batch, record) do + defp init_partition_data( + %__MODULE__{ + base_sequences: bs, + producer_id: p_id, + producer_config: %{enable_idempotence: idempotent?} = pconfig + } = _state, + topic, + partition + ) do + %{ + base_offset: 0, + partition_leader_epoch: -1, + magic: 2, + # TODO: Handle different attributes opts + attributes: get_attributes_byte(pconfig, []), + last_offset_delta: -1, + base_timestamp: nil, + max_timestamp: nil, + producer_id: p_id, + # TODO: Handle producer epoch + producer_epoch: -1, + base_sequence: if(idempotent?, do: Map.get(bs, {topic, partition}, 0) + 1, else: -1), + records: [], + records_length: 0 + } + end + + defp get_attributes_byte(%Producer{} = pconfig, _opts) do + [ + compression: pconfig.compression_type + ] + |> KlifeProtocol.RecordBatch.encode_attributes() + end + + defp add_record_to_partition_data(batch, record) do now = DateTime.to_unix(DateTime.utc_now()) new_offset_delta = batch.last_offset_delta + 1 @@ -398,22 +460,13 @@ defmodule Klife.Producer.Dispatcher do } end - defp init_partition_data(%Producer{} = _pconfig, _topic, _partition) do - # TODO: Use proper values here - %{ - base_offset: 0, - partition_leader_epoch: -1, - magic: 2, - attributes: 0, - last_offset_delta: -1, - base_timestamp: nil, - max_timestamp: nil, - producer_id: -1, - producer_epoch: -1, - base_sequence: -1, - records: [], - records_length: 0 - } + defp update_base_sequence(curr_base_sequences, new_batch, topic, partition) do + new_base_sequence = + new_batch + |> Map.fetch!({topic, partition}) + |> Map.fetch!(:base_sequence) + + Map.put(curr_base_sequences, {topic, partition}, new_base_sequence) end defp add_waiting_pid(waiting_pids, _new_batch, nil, _topic, _partition), do: waiting_pids @@ -465,7 +518,7 @@ defmodule Klife.Producer.Dispatcher do # Check if it is safe to send the batch if now + req_timeout - base_time < delivery_timeout - :timer.seconds(5) do {:ok, resp} = - Broker.send_message(Messages.Produce, cluster_name, broker_id, content, headers) + Broker.send_message(M.Produce, cluster_name, broker_id, content, headers) grouped_results = for %{name: topic_name, partition_responses: partition_resps} <- resp.content.responses, @@ -493,9 +546,9 @@ defmodule Klife.Producer.Dispatcher do _list -> # TODO: Handle specific errors by code, not just retry all not success - new_batch_to_send = - batch_to_send - |> Map.drop(Enum.map(success_list, fn {t, p, _, _} -> {t, p} end)) + # TODO: Handle out of order error for max_in_flight > 1 + success_keys = Enum.map(success_list, fn {t, p, _, _} -> {t, p} end) + new_batch_to_send = Map.drop(batch_to_send, success_keys) Process.sleep(retry_ms) diff --git a/lib/klife/producer/producer.ex b/lib/klife/producer/producer.ex index ed85959..28223ac 100644 --- a/lib/klife/producer/producer.ex +++ b/lib/klife/producer/producer.ex @@ -21,12 +21,8 @@ defmodule Klife.Producer do retry_backoff_ms: [type: :non_neg_integer, default: :timer.seconds(1)], max_in_flight_requests: [type: :non_neg_integer, default: 1], dispatchers_count: [type: :pos_integer, default: 1], - # Not implemented - max_retries: [type: :timeout, default: :infinity], - # Not implemented - compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none], - # Not implemented - enable_idempotence: [type: :boolean, default: true] + enable_idempotence: [type: :boolean, default: true], + compression_type: [type: {:in, [:none, :gzip, :snappy]}, default: :none] ] defstruct (Keyword.keys(@producer_options) -- [:name]) ++ [:producer_name] @@ -145,7 +141,7 @@ defmodule Klife.Producer do # Used when a record is produced by a non default producer # in this case the proper dispatcher_id won't be present at # main metadata ets table, therefore we need a way to - # find out it's value. + # find out it's value. put_dispatcher_id(cluster_name, producer_name, t_name, p_idx, d_id) if ProducerController.get_default_producer(cluster_name, t_name, p_idx) == producer_name do diff --git a/lib/klife/utils.ex b/lib/klife/utils.ex index a3fce1c..bac6ede 100644 --- a/lib/klife/utils.ex +++ b/lib/klife/utils.ex @@ -173,4 +173,42 @@ defmodule Klife.Utils do [%{records: records}] = partition_resp.records records end + + def get_partition_resp_records_by_offset(cluster_name, topic, partition, offset) do + content = %{ + replica_id: -1, + max_wait_ms: 1000, + min_bytes: 1, + max_bytes: 100_000, + isolation_level: 0, + topics: [ + %{ + topic: topic, + partitions: [ + %{ + partition: partition, + fetch_offset: offset, + # 1 guarantees that only the first record batch will + # be retrieved + partition_max_bytes: 1 + } + ] + } + ] + } + + broker = Klife.Producer.Controller.get_broker_id(cluster_name, topic, partition) + + {:ok, %{content: content}} = + Klife.Connection.Broker.send_message( + KlifeProtocol.Messages.Fetch, + cluster_name, + broker, + content + ) + + topic_resp = Enum.find(content.responses, &(&1.topic == topic)) + partition_resp = Enum.find(topic_resp.partitions, &(&1.partition_index == partition)) + partition_resp.records + end end diff --git a/test/producer/dispatcher_test.exs b/test/producer/dispatcher_test.exs index 4015ba9..77284ea 100644 --- a/test/producer/dispatcher_test.exs +++ b/test/producer/dispatcher_test.exs @@ -22,7 +22,6 @@ defmodule Klife.Producer.DispatcherTest do enable_idempotence: true, linger_ms: 10_000, max_in_flight_requests: 2, - max_retries: :infinity, producer_name: :my_batch_producer, request_timeout_ms: 15000, retry_backoff_ms: 1000 @@ -35,7 +34,9 @@ defmodule Klife.Producer.DispatcherTest do last_batch_sent_at: System.monotonic_time(:millisecond), in_flight_pool: [nil, nil], next_send_msg_ref: nil, - batch_queue: :queue.new() + batch_queue: :queue.new(), + base_sequences: %{}, + producer_id: 123 } assert {:reply, {:ok, 60000}, new_state} = diff --git a/test/producer/producer_test.exs b/test/producer/producer_test.exs index ac9d96a..16c2488 100644 --- a/test/producer/producer_test.exs +++ b/test/producer/producer_test.exs @@ -12,79 +12,153 @@ defmodule Klife.ProducerTest do end) end + defp wait_batch_cycle(cluster, topic) do + rec = %{ + value: "wait_cycle", + key: "wait_cycle", + headers: [] + } + + {:ok, _} = Producer.produce_sync(rec, topic, 1, cluster) + end + test "produce message sync no batch" do record = %{ - value: :rand.bytes(1_000), - key: :rand.bytes(1_000), - headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}] + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] } + cluster = :my_test_cluster_1 topic = "my_no_batch_topic" - assert {:ok, offset} = Producer.produce_sync(record, topic, 1, :my_test_cluster_1) + assert {:ok, offset} = Producer.produce_sync(record, topic, 1, cluster) - assert_offset(record, :my_test_cluster_1, topic, 1, offset) - record_batch = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset) + assert_offset(record, cluster, topic, 1, offset) + record_batch = Utils.get_record_batch_by_offset(cluster, topic, 1, offset) assert length(record_batch) == 1 end test "produce message sync using not default producer" do record = %{ - value: :rand.bytes(1_000), - key: :rand.bytes(1_000), - headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}] + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] } + cluster = :my_test_cluster_1 topic = "my_no_batch_topic" assert {:ok, offset} = - Producer.produce_sync(record, topic, 1, :my_test_cluster_1, - producer: :benchmark_producer - ) + Producer.produce_sync(record, topic, 1, cluster, producer: :benchmark_producer) - assert_offset(record, :my_test_cluster_1, topic, 1, offset) - record_batch = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset) + assert_offset(record, cluster, topic, 1, offset) + record_batch = Utils.get_record_batch_by_offset(cluster, topic, 1, offset) assert length(record_batch) == 1 end test "produce message sync with batch" do + cluster = :my_test_cluster_1 topic = "my_batch_topic" + wait_batch_cycle(cluster, topic) + + rec_1 = %{ + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] + } + + task_1 = + Task.async(fn -> + Producer.produce_sync(rec_1, topic, 1, cluster) + end) + + rec_2 = %{ + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] + } + + Process.sleep(5) + + task_2 = + Task.async(fn -> + Producer.produce_sync(rec_2, topic, 1, cluster) + end) + + rec_3 = %{ + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] + } + + Process.sleep(5) + + task_3 = + Task.async(fn -> + Producer.produce_sync(rec_3, topic, 1, cluster) + end) + + assert [{:ok, offset_1}, {:ok, offset_2}, {:ok, offset_3}] = + Task.await_many([task_1, task_2, task_3], 2_000) + + assert offset_2 - offset_1 == 1 + assert offset_3 - offset_2 == 1 + + assert_offset(rec_1, cluster, topic, 1, offset_1) + assert_offset(rec_2, cluster, topic, 1, offset_2) + assert_offset(rec_3, cluster, topic, 1, offset_3) + + batch_1 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_1) + batch_2 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_2) + batch_3 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_3) + + assert length(batch_1) == 3 + assert batch_1 == batch_2 and batch_2 == batch_3 + end + + test "produce message sync with batch and compression" do + cluster = :my_test_cluster_1 + topic = "comression_topic" + + wait_batch_cycle(cluster, topic) + rec_1 = %{ - value: :rand.bytes(1_000), - key: :rand.bytes(1_000), - headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}] + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] } task_1 = Task.async(fn -> - Producer.produce_sync(rec_1, topic, 1, :my_test_cluster_1) + Producer.produce_sync(rec_1, topic, 1, cluster) end) rec_2 = %{ - value: :rand.bytes(1_000), - key: :rand.bytes(1_000), - headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}] + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] } Process.sleep(5) task_2 = Task.async(fn -> - Producer.produce_sync(rec_2, topic, 1, :my_test_cluster_1) + Producer.produce_sync(rec_2, topic, 1, cluster) end) rec_3 = %{ - value: :rand.bytes(1_000), - key: :rand.bytes(1_000), - headers: [%{key: :rand.bytes(1_000), value: :rand.bytes(1_000)}] + value: :rand.bytes(10), + key: :rand.bytes(10), + headers: [%{key: :rand.bytes(10), value: :rand.bytes(10)}] } Process.sleep(5) task_3 = Task.async(fn -> - Producer.produce_sync(rec_3, topic, 1, :my_test_cluster_1) + Producer.produce_sync(rec_3, topic, 1, cluster) end) assert [{:ok, offset_1}, {:ok, offset_2}, {:ok, offset_3}] = @@ -93,15 +167,20 @@ defmodule Klife.ProducerTest do assert offset_2 - offset_1 == 1 assert offset_3 - offset_2 == 1 - assert_offset(rec_1, :my_test_cluster_1, topic, 1, offset_1) - assert_offset(rec_2, :my_test_cluster_1, topic, 1, offset_2) - assert_offset(rec_3, :my_test_cluster_1, topic, 1, offset_3) + assert_offset(rec_1, cluster, topic, 1, offset_1) + assert_offset(rec_2, cluster, topic, 1, offset_2) + assert_offset(rec_3, cluster, topic, 1, offset_3) - batch_1 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_1) - batch_2 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_2) - batch_3 = Utils.get_record_batch_by_offset(:my_test_cluster_1, topic, 1, offset_3) + batch_1 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_1) + batch_2 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_2) + batch_3 = Utils.get_record_batch_by_offset(cluster, topic, 1, offset_3) assert length(batch_1) == 3 assert batch_1 == batch_2 and batch_2 == batch_3 + + assert [%{attributes: attr}] = + Utils.get_partition_resp_records_by_offset(cluster, topic, 1, offset_1) + + assert :snappy = KlifeProtocol.RecordBatch.decode_attributes(attr).compression end end