diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6cdc8ef --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,40 @@ +name: CI +on: + push: + branches: + - master + + pull_request: + branches: + - master + +jobs: + test: + name: Run tests + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Setup Elixir and Erlang versions + uses: erlef/setup-beam@v1 + id: setup-elixir + with: + version-type: strict + version-file: .tool-versions + + - name: Restore the cache + uses: actions/cache@v3 + with: + path: | + deps + _build + dialyzer + key: | + ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash- + + - name: Run CI + run: | + mix ci diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml new file mode 100644 index 0000000..5b0aa53 --- /dev/null +++ b/.github/workflows/release.yml @@ -0,0 +1,42 @@ +name: Release + +on: + release: + types: [published] + +jobs: + release: + name: Release package + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Setup Elixir and Erlang versions + uses: erlef/setup-beam@v1 + id: setup-elixir + with: + version-type: strict + version-file: .tool-versions + + - name: Restore the cache + uses: actions/cache@v3 + with: + path: | + deps + _build + dialyzer + key: | + ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + restore-keys: | + ${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash- + + - name: Setup project + run: | + mix setup + + - name: Publish package + run: | + mix hex.publish --organization movableink --replace --yes + env: + HEX_API_KEY: ${{ secrets.HEX_API_KEY }} diff --git a/.gitignore b/.gitignore index 51c34f8..730cef4 100644 --- a/.gitignore +++ b/.gitignore @@ -25,4 +25,7 @@ data_buffer-*.tar # Ignore Elixir Language Server files /.elixir_ls -/bench \ No newline at end of file +# Dialyzer generated PLT files +/dialyzer + +/bench diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..15e28fa --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +erlang 26.2.1 +elixir 1.16.0-otp-26 diff --git a/lib/data_buffer.ex b/lib/data_buffer.ex index 226a032..7ea35b9 100644 --- a/lib/data_buffer.ex +++ b/lib/data_buffer.ex @@ -35,6 +35,18 @@ defmodule DataBuffer do end) end + @doc """ + Inserts all of the provided `data` into the provided `buffer` at once. + """ + @spec insert_batch(buffer :: DataBuffer.t(), data :: Enumerable.t(), timeout()) :: :ok + def insert_batch(buffer, data, timeout \\ 5_000) do + Telemetry.span(:insert, %{buffer: buffer}, fn -> + partition = PartitionPool.next(buffer) + result = Partition.insert_batch(partition, data, timeout) + {result, %{buffer: buffer, partition: partition}} + end) + end + @doc """ Performs a flush operation on the provided `buffer`. """ diff --git a/lib/data_buffer/flusher.ex b/lib/data_buffer/flusher.ex index 903c79a..683e938 100644 --- a/lib/data_buffer/flusher.ex +++ b/lib/data_buffer/flusher.ex @@ -27,9 +27,9 @@ defmodule DataBuffer.Flusher do @spec flush(DataBuffer.Partition.table(), atom(), keyword()) :: {:ok, any()} | {:error, any()} def flush(table, buffer, opts \\ []) do - meta = Keyword.get(opts, :meta) + opts = Keyword.take(opts, [:meta, :size]) data = handle_data(table) - buffer.handle_flush(data, meta) + buffer.handle_flush(data, opts) end ################################ @@ -47,6 +47,7 @@ defmodule DataBuffer.Flusher do {buffer, opts} = state ) do Telemetry.span(:flush, %{buffer: buffer, partition: partition, size: size}, fn -> + opts = Keyword.put(opts, :size, size) flush(table, buffer, opts) {:ok, %{buffer: buffer, partition: partition}} end) diff --git a/lib/data_buffer/partition.ex b/lib/data_buffer/partition.ex index 32a201f..e75d55e 100644 --- a/lib/data_buffer/partition.ex +++ b/lib/data_buffer/partition.ex @@ -85,6 +85,11 @@ defmodule DataBuffer.Partition do GenServer.call(partition, {:insert, data}, timeout) end + @spec insert_batch(partition(), Enumerable.t(), timeout()) :: :ok + def insert_batch(partition, data, timeout \\ 5_000) do + GenServer.call(partition, {:insert_batch, data}, timeout) + end + @spec size(partition(), timeout()) :: integer() def size(partition, timeout \\ 5_000) do GenServer.call(partition, :size, timeout) @@ -112,6 +117,11 @@ defmodule DataBuffer.Partition do {:reply, :ok, state} end + def handle_call({:insert_batch, data}, _from, %State{} = state) do + state = do_insert_batch(state, data) + {:reply, :ok, state} + end + def handle_call(:flush, _from, %State{} = state) do state = do_flush(state) {:reply, :ok, state} @@ -236,6 +246,30 @@ defmodule DataBuffer.Partition do %{state | size: size} end + defp do_insert_batch(%State{flusher: flusher, size: size, flush_size: flush_size} = state, data) + when is_pid(flusher) and is_full(size, flush_size) do + state + |> do_await_flush() + |> do_insert_batch(data) + end + + defp do_insert_batch(%State{size: size, flush_size: flush_size} = state, data) + when is_full(size, flush_size) do + state + |> do_flush() + |> do_insert_batch(data) + end + + defp do_insert_batch(%State{size: size, table: table} = state, data) do + {rows, size} = + Enum.map_reduce(data, size, fn data, size -> + {{size + 1, data}, size + 1} + end) + + :ets.insert(table, rows) + %{state | size: size} + end + defp do_scheduled_flush(state, flush_schedule_ref) do if state.flush_schedule_ref == flush_schedule_ref do do_flush(state) @@ -266,7 +300,8 @@ defmodule DataBuffer.Partition do end defp do_sync_flush(state) do - data = Flusher.flush(state.table, state.buffer, state.flush_opts) + opts = Keyword.put(state.flush_opts, :size, state.size) + data = Flusher.flush(state.table, state.buffer, opts) {data, do_prepare_flush(state)} end diff --git a/lib/data_buffer/telemetry.ex b/lib/data_buffer/telemetry.ex index 97f2b53..4a3d4ac 100644 --- a/lib/data_buffer/telemetry.ex +++ b/lib/data_buffer/telemetry.ex @@ -61,7 +61,7 @@ defmodule DataBuffer.Telemetry do """ @doc false - @spec span(atom(), map(), (() -> {any, map})) :: any() + @spec span(atom(), map(), (-> {any, map})) :: any() def span(name, meta, fun) do :telemetry.span([:data_buffer, name], meta, fun) end diff --git a/mix.exs b/mix.exs index c77a364..5b3c423 100644 --- a/mix.exs +++ b/mix.exs @@ -7,9 +7,12 @@ defmodule DataBuffer.MixProject do [ app: :data_buffer, version: @version, - elixir: "~> 1.9", + elixir: "~> 1.16", elixirc_paths: elixirc_paths(Mix.env()), + dialyzer: dialyzer(), start_permanent: Mix.env() == :prod, + aliases: aliases(), + preferred_cli_env: preferred_cli_env(), deps: deps(), description: description(), package: package(), @@ -28,6 +31,13 @@ defmodule DataBuffer.MixProject do defp elixirc_paths(:test), do: ["lib", "test/support"] defp elixirc_paths(_), do: ["lib"] + defp dialyzer do + [ + plt_file: {:no_warn, "dialyzer/dialyzer.plt"}, + plt_add_apps: [:ex_unit, :mix] + ] + end + defp description do """ DataBuffer provides an efficient way to buffer persistable data. @@ -51,13 +61,43 @@ defmodule DataBuffer.MixProject do ] end + # Aliases are shortcuts or tasks specific to the current project. + defp aliases do + [ + setup: [ + "local.hex --if-missing --force", + "local.rebar --if-missing --force", + "deps.get" + ], + ci: [ + "setup", + "compile --warnings-as-errors", + "format --check-formatted", + "credo --strict", + "test", + "dialyzer --format github", + "sobelow --config" + ] + ] + end + + # Specifies the preferred env for mix commands. + defp preferred_cli_env do + [ + ci: :test + ] + end + # Run "mix help deps" to learn about dependencies. defp deps do [ {:keyword_validator, "~> 2.0"}, {:telemetry, "~> 0.4"}, {:benchee, "~> 1.0", only: :dev}, - {:ex_doc, "~> 0.22", only: :dev, runtime: false} + {:ex_doc, "~> 0.22", only: :dev, runtime: false}, + {:credo, "~> 1.7.0", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.4.1", only: [:dev, :test], runtime: false}, + {:sobelow, "~> 0.13.0", only: [:dev, :test], runtime: false} ] end end diff --git a/mix.lock b/mix.lock index d9d5b83..b9bc809 100644 --- a/mix.lock +++ b/mix.lock @@ -1,11 +1,18 @@ %{ "benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"}, + "bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"}, + "credo": {:hex, :credo, "1.7.5", "643213503b1c766ec0496d828c90c424471ea54da77c8a168c725686377b9545", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f799e9b5cd1891577d8c773d245668aa74a2fcd15eb277f51a0131690ebfb3fd"}, "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"}, "earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"}, + "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, "ex_doc": {:hex, :ex_doc, "0.22.2", "03a2a58bdd2ba0d83d004507c4ee113b9c521956938298eba16e55cc4aba4a6c", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "cf60e1b3e2efe317095b6bb79651f83a2c1b3edcb4d319c421d7fcda8b3aff26"}, + "file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, "keyword_validator": {:hex, :keyword_validator, "2.0.1", "92ab90dc93ea9e049530eb0a79c8f074833942dfb93d25e6a946b71b70086b49", [:mix], [], "hexpm", "09715a32d458c6318d39c0a484e958ce20fff64d646188b5801c334179be9fc2"}, "makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"}, "makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"}, "nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"}, + "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, "telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"}, } diff --git a/test/data_buffer_test.exs b/test/data_buffer_test.exs index 17de398..23dd6dd 100644 --- a/test/data_buffer_test.exs +++ b/test/data_buffer_test.exs @@ -31,8 +31,8 @@ defmodule DataBufferTest do DataBuffer.insert(TestBuffer, "foo") DataBuffer.insert(TestBuffer, "foo") DataBuffer.insert(TestBuffer, "foo") - assert_receive {:data, ["foo"]} - assert_receive {:data, ["foo"]} + assert_receive {:data, ["foo"], _} + assert_receive {:data, ["foo"], _} end test "will flush after hitting max_size + max_size_jitter" do @@ -42,8 +42,8 @@ defmodule DataBufferTest do DataBuffer.insert(TestBuffer, "foo") DataBuffer.insert(TestBuffer, "foo") DataBuffer.insert(TestBuffer, "foo") - assert_receive {:data, ["foo"]} - assert_receive {:data, ["foo"]} + assert_receive {:data, ["foo"], _} + assert_receive {:data, ["foo"], _} end test "will insert with equal partition rotation" do @@ -61,6 +61,16 @@ defmodule DataBufferTest do end end + describe "insert_batch/2" do + test "will insert a batch of data into the buffer" do + start_buffer() + + assert [] = DataBuffer.dump(TestBuffer) + DataBuffer.insert(TestBuffer, ["foo", "bar", "baz"]) + assert [{_, ["foo", "bar", "baz"]}] = DataBuffer.dump(TestBuffer) + end + end + describe "flush/2" do test "flushes data from the buffer" do start_buffer() @@ -68,7 +78,7 @@ defmodule DataBufferTest do DataBuffer.insert(TestBuffer, "foo") DataBuffer.flush(TestBuffer) - assert_receive {:data, ["foo"]} + assert_receive {:data, ["foo"], _} end test "flushes duplicate data from the buffer" do @@ -93,12 +103,22 @@ defmodule DataBufferTest do DataBuffer.flush(TestBuffer) - assert_receive {:data, data} + assert_receive {:data, data, _} for x <- 0..500 do assert Enum.at(data, x) == x end end + + test "buffer size is included in flush opts" do + start_buffer() + + data = ["foo", "bar", "baz"] + DataBuffer.insert_batch(TestBuffer, data) + DataBuffer.flush(TestBuffer) + + assert_receive {:data, ^data, 3} + end end describe "sync_flush/2" do @@ -107,7 +127,7 @@ defmodule DataBufferTest do DataBuffer.insert(TestBuffer, "foo") - assert [{:data, ["foo"]}, {:data, []}] = DataBuffer.sync_flush(TestBuffer) + assert [{:data, ["foo"], _}, {:data, [], _}] = DataBuffer.sync_flush(TestBuffer) end end @@ -129,7 +149,7 @@ defmodule DataBufferTest do test "flushes data after flush_interval and flush_jitter" do start_buffer(flush_interval: 50, flush_jitter: 1) DataBuffer.insert(TestBuffer, "foo") - assert_receive {:data, ["foo"]}, 150 + assert_receive {:data, ["foo"], _}, 150 end test "will not perform a scheduled flush from a different schedule reference" do @@ -186,7 +206,7 @@ defmodule DataBufferTest do defp receive_all(partitions \\ partitions()) do for _ <- 1..partitions, reduce: [] do data -> - assert_receive {:data, new_data} + assert_receive {:data, new_data, _} data ++ new_data end end diff --git a/test/support/buffers.ex b/test/support/buffers.ex index a0e601c..6eef516 100644 --- a/test/support/buffers.ex +++ b/test/support/buffers.ex @@ -1,4 +1,6 @@ defmodule TestBuffer do + @moduledoc false + use DataBuffer def start_link(opts) do @@ -6,17 +8,22 @@ defmodule TestBuffer do end @impl DataBuffer - def handle_flush(data_stream, meta) do + def handle_flush(data_stream, opts) do + meta = Keyword.get(opts, :meta) + size = Keyword.get(opts, :size) + if Map.has_key?(meta, :sleep) do :timer.sleep(meta.sleep) end data = Enum.into(data_stream, []) - send(meta.pid, {:data, data}) + send(meta.pid, {:data, data, size}) end end defmodule TestErrorBuffer do + @moduledoc false + use DataBuffer def start_link(opts) do @@ -24,11 +31,14 @@ defmodule TestErrorBuffer do end @impl DataBuffer - def handle_flush(_data_stream, %{kind: :error}) do - raise RuntimeError, "boom" - end - - def handle_flush(_data_stream, %{kind: :exit}) do - exit("boom") + def handle_flush(_data_stream, opts) do + opts + |> Keyword.get(:meta, %{}) + |> Map.get(:kind) + |> case do + :error -> raise RuntimeError, "boom" + :exit -> exit("boom") + _ -> :ok + end end end diff --git a/test/support/helpers.ex b/test/support/helpers.ex index 51ddf07..bac3b86 100644 --- a/test/support/helpers.ex +++ b/test/support/helpers.ex @@ -1,4 +1,8 @@ defmodule DataBuffer.Helpers do + @moduledoc """ + Helper functions for `DataBuffer` unit tests. + """ + import ExUnit.Callbacks, only: [start_supervised: 1] @partitions 2