Skip to content

Commit

Permalink
Add support for bulk inserts and include size in flush opts
Browse files Browse the repository at this point in the history
  • Loading branch information
gdwoolbert3 committed Mar 4, 2024
1 parent 851522c commit bad9c29
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 21 deletions.
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
erlang 26.2.1
elixir 1.16.0-otp-26
12 changes: 12 additions & 0 deletions lib/data_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ defmodule DataBuffer do
end)
end

@doc """
Inserts all of the provided `data` into the provided `buffer` at once.

The batch is handed to a single partition (chosen by round-robin via
`PartitionPool.next/1`) in one call. Emits an `:insert` telemetry span
around the operation.
"""
@spec insert_batch(buffer :: DataBuffer.t(), data :: Enumerable.t(), timeout()) :: :ok
def insert_batch(buffer, data, timeout \\ 5_000) do
  Telemetry.span(:insert, %{buffer: buffer}, fn ->
    target = PartitionPool.next(buffer)
    {Partition.insert_batch(target, data, timeout), %{buffer: buffer, partition: target}}
  end)
end

@doc """
Performs a flush operation on the provided `buffer`.
"""
Expand Down
5 changes: 3 additions & 2 deletions lib/data_buffer/flusher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ defmodule DataBuffer.Flusher do

@spec flush(DataBuffer.Partition.table(), atom(), keyword()) :: {:ok, any()} | {:error, any()}
def flush(table, buffer, opts \\ []) do
  # Only :meta and :size are part of the handle_flush/2 contract; drop any
  # other options the caller may have threaded through.
  opts = Keyword.take(opts, [:meta, :size])
  data = handle_data(table)
  buffer.handle_flush(data, opts)
end

################################
Expand All @@ -47,6 +47,7 @@ defmodule DataBuffer.Flusher do
{buffer, opts} = state
) do
Telemetry.span(:flush, %{buffer: buffer, partition: partition, size: size}, fn ->
opts = Keyword.put(opts, :size, size)
flush(table, buffer, opts)
{:ok, %{buffer: buffer, partition: partition}}
end)
Expand Down
37 changes: 36 additions & 1 deletion lib/data_buffer/partition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ defmodule DataBuffer.Partition do
GenServer.call(partition, {:insert, data}, timeout)
end

# Synchronously inserts every item in `data` into the partition in a single
# GenServer round trip.
@spec insert_batch(partition(), Enumerable.t(), timeout()) :: :ok
def insert_batch(partition, data, timeout \\ 5_000) do
  request = {:insert_batch, data}
  GenServer.call(partition, request, timeout)
end

@spec size(partition(), timeout()) :: integer()
def size(partition, timeout \\ 5_000) do
GenServer.call(partition, :size, timeout)
Expand Down Expand Up @@ -112,6 +117,11 @@ defmodule DataBuffer.Partition do
{:reply, :ok, state}
end

# Inserts a whole batch of items, replying :ok once the state is updated.
def handle_call({:insert_batch, data}, _from, %State{} = state) do
  {:reply, :ok, do_insert_batch(state, data)}
end

def handle_call(:flush, _from, %State{} = state) do
state = do_flush(state)
{:reply, :ok, state}
Expand Down Expand Up @@ -236,6 +246,30 @@ defmodule DataBuffer.Partition do
%{state | size: size}
end

# Partition is full and a flusher is already running: wait for it to finish,
# then retry the batch insert against the drained state.
defp do_insert_batch(%State{flusher: flusher, size: size, flush_size: flush_size} = state, data)
     when is_pid(flusher) and is_full(size, flush_size) do
  do_insert_batch(do_await_flush(state), data)
end

# Partition is full with no flusher in flight: flush first, then retry.
defp do_insert_batch(%State{size: size, flush_size: flush_size} = state, data)
     when is_full(size, flush_size) do
  do_insert_batch(do_flush(state), data)
end

# Room available: key each item with the next counter value and write the
# whole batch to ETS in one insert call.
defp do_insert_batch(%State{size: size, table: table} = state, data) do
  rows =
    data
    |> Enum.with_index(size + 1)
    |> Enum.map(fn {datum, key} -> {key, datum} end)

  :ets.insert(table, rows)
  %{state | size: size + length(rows)}
end

defp do_scheduled_flush(state, flush_schedule_ref) do
if state.flush_schedule_ref == flush_schedule_ref do
do_flush(state)
Expand Down Expand Up @@ -266,7 +300,8 @@ defmodule DataBuffer.Partition do
end

# Runs the flush in the calling process, tagging the flush opts with the
# current buffer size, and returns {flushed_data, reset_state}.
defp do_sync_flush(state) do
  flush_opts = Keyword.put(state.flush_opts, :size, state.size)
  flushed = Flusher.flush(state.table, state.buffer, flush_opts)
  {flushed, do_prepare_flush(state)}
end

Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ defmodule DataBuffer.MixProject do
[
app: :data_buffer,
version: @version,
elixir: "~> 1.9",
elixir: "~> 1.16",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
deps: deps(),
Expand Down
38 changes: 29 additions & 9 deletions test/data_buffer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ defmodule DataBufferTest do
DataBuffer.insert(TestBuffer, "foo")
DataBuffer.insert(TestBuffer, "foo")
DataBuffer.insert(TestBuffer, "foo")
assert_receive {:data, ["foo"]}
assert_receive {:data, ["foo"]}
assert_receive {:data, ["foo"], _}
assert_receive {:data, ["foo"], _}
end

test "will flush after hitting max_size + max_size_jitter" do
Expand All @@ -42,8 +42,8 @@ defmodule DataBufferTest do
DataBuffer.insert(TestBuffer, "foo")
DataBuffer.insert(TestBuffer, "foo")
DataBuffer.insert(TestBuffer, "foo")
assert_receive {:data, ["foo"]}
assert_receive {:data, ["foo"]}
assert_receive {:data, ["foo"], _}
assert_receive {:data, ["foo"], _}
end

test "will insert with equal partition rotation" do
Expand All @@ -61,14 +61,24 @@ defmodule DataBufferTest do
end
end

describe "insert_batch/2" do
  test "will insert a batch of data into the buffer" do
    start_buffer()

    assert [] = DataBuffer.dump(TestBuffer)
    # Exercise the function under test. `insert/2` would store the whole
    # list as a single item instead of three separate items.
    DataBuffer.insert_batch(TestBuffer, ["foo", "bar", "baz"])
    assert [{_, ["foo", "bar", "baz"]}] = DataBuffer.dump(TestBuffer)
  end
end

describe "flush/2" do
test "flushes data from the buffer" do
  start_buffer()

  DataBuffer.insert(TestBuffer, "foo")
  DataBuffer.flush(TestBuffer)

  # Flush messages carry {tag, data, size}; only one flush occurred.
  assert_receive {:data, ["foo"], _}
end

test "flushes duplicate data from the buffer" do
Expand All @@ -93,12 +103,22 @@ defmodule DataBufferTest do

DataBuffer.flush(TestBuffer)

assert_receive {:data, data}
assert_receive {:data, data, _}

for x <- 0..500 do
assert Enum.at(data, x) == x
end
end

test "buffer size is included in flush opts" do
  start_buffer()

  batch = ["foo", "bar", "baz"]
  DataBuffer.insert_batch(TestBuffer, batch)
  DataBuffer.flush(TestBuffer)

  # The third element of the flush message is the :size flush opt.
  assert_receive {:data, ^batch, 3}
end
end

describe "sync_flush/2" do
Expand All @@ -107,7 +127,7 @@ defmodule DataBufferTest do

DataBuffer.insert(TestBuffer, "foo")

assert [{:data, ["foo"]}, {:data, []}] = DataBuffer.sync_flush(TestBuffer)
assert [{:data, ["foo"], _}, {:data, [], _}] = DataBuffer.sync_flush(TestBuffer)
end
end

Expand All @@ -129,7 +149,7 @@ defmodule DataBufferTest do
test "flushes data after flush_interval and flush_jitter" do
  start_buffer(flush_interval: 50, flush_jitter: 1)
  DataBuffer.insert(TestBuffer, "foo")
  # 150ms allows for the 50ms interval plus jitter and scheduling slack.
  assert_receive {:data, ["foo"], _}, 150
end

test "will not perform a scheduled flush from a different schedule reference" do
Expand Down Expand Up @@ -186,7 +206,7 @@ defmodule DataBufferTest do
# Collects one flush message per partition and concatenates the payloads.
# Flush messages carry {tag, data, size}; the size element is irrelevant here.
defp receive_all(partitions \\ partitions()) do
  for _ <- 1..partitions, reduce: [] do
    data ->
      assert_receive {:data, new_data, _}
      data ++ new_data
  end
end
Expand Down
22 changes: 14 additions & 8 deletions test/support/buffers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ defmodule TestBuffer do
end

@impl DataBuffer
def handle_flush(data_stream, opts) do
  # opts is the keyword list assembled by the flusher: :meta is the
  # user-supplied metadata map, :size the number of buffered items.
  meta = Keyword.get(opts, :meta)
  size = Keyword.get(opts, :size)

  # Optional artificial delay used by tests that exercise slow flushes.
  if Map.has_key?(meta, :sleep) do
    :timer.sleep(meta.sleep)
  end

  data = Enum.into(data_stream, [])
  send(meta.pid, {:data, data, size})
end
end

Expand All @@ -24,11 +27,14 @@ defmodule TestErrorBuffer do
end

@impl DataBuffer
def handle_flush(_data_stream, opts) do
  # The :kind key of the :meta map selects a failure mode so tests can
  # exercise the buffer's crash handling. Defaults to a clean :ok.
  meta = Keyword.get(opts, :meta, %{})

  case Map.get(meta, :kind) do
    :error -> raise RuntimeError, "boom"
    :exit -> exit("boom")
    _ -> :ok
  end
end
end

0 comments on commit bad9c29

Please sign in to comment.