Skip to content

Commit

Permalink
Add support for bulk inserts and include size in flush opts (#4)
Browse files Browse the repository at this point in the history
  • Loading branch information
gdwoolbert3 authored Mar 6, 2024
1 parent 851522c commit 76141a8
Show file tree
Hide file tree
Showing 13 changed files with 240 additions and 24 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: CI
on:
push:
branches:
- master

pull_request:
branches:
- master

jobs:
test:
name: Run tests
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v3

- name: Setup Elixir and Erlang versions
uses: erlef/setup-beam@v1
id: setup-elixir
with:
version-type: strict
version-file: .tool-versions

- name: Restore the cache
uses: actions/cache@v3
with:
path: |
deps
_build
dialyzer
key: |
${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
restore-keys: |
${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-
- name: Run CI
run: |
mix ci
42 changes: 42 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
name: Release

on:
release:
types: [published]

jobs:
release:
name: Release package
runs-on: ubuntu-latest
steps:
- name: Checkout
uses: actions/checkout@v4

- name: Setup Elixir and Erlang versions
uses: erlef/setup-beam@v1
id: setup-elixir
with:
version-type: strict
version-file: .tool-versions

- name: Restore the cache
uses: actions/cache@v3
with:
path: |
deps
_build
dialyzer
key: |
${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }}
restore-keys: |
${{ runner.os }}-${{ steps.setup-elixir.outputs.elixir-version }}-${{ steps.setup-elixir.outputs.otp-version }}-mixlockhash-
- name: Setup project
run: |
mix setup
- name: Publish package
run: |
mix hex.publish --organization movableink --replace --yes
env:
HEX_API_KEY: ${{ secrets.HEX_API_KEY }}
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ data_buffer-*.tar
# Ignore Elixir Language Server files
/.elixir_ls

/bench
# Dialyzer generated PLT files
/dialyzer

/bench
2 changes: 2 additions & 0 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
erlang 26.2.1
elixir 1.16.0-otp-26
12 changes: 12 additions & 0 deletions lib/data_buffer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ defmodule DataBuffer do
end)
end

@doc """
Inserts all of the provided `data` into the provided `buffer` at once.
"""
@spec insert_batch(buffer :: DataBuffer.t(), data :: Enumerable.t(), timeout()) :: :ok
def insert_batch(buffer, data, timeout \\ 5_000) do
Telemetry.span(:insert, %{buffer: buffer}, fn ->
partition = PartitionPool.next(buffer)
result = Partition.insert_batch(partition, data, timeout)
{result, %{buffer: buffer, partition: partition}}
end)
end

@doc """
Performs a flush operation on the provided `buffer`.
"""
Expand Down
5 changes: 3 additions & 2 deletions lib/data_buffer/flusher.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ defmodule DataBuffer.Flusher do

@spec flush(DataBuffer.Partition.table(), atom(), keyword()) :: {:ok, any()} | {:error, any()}
def flush(table, buffer, opts \\ []) do
meta = Keyword.get(opts, :meta)
opts = Keyword.take(opts, [:meta, :size])
data = handle_data(table)
buffer.handle_flush(data, meta)
buffer.handle_flush(data, opts)
end

################################
Expand All @@ -47,6 +47,7 @@ defmodule DataBuffer.Flusher do
{buffer, opts} = state
) do
Telemetry.span(:flush, %{buffer: buffer, partition: partition, size: size}, fn ->
opts = Keyword.put(opts, :size, size)
flush(table, buffer, opts)
{:ok, %{buffer: buffer, partition: partition}}
end)
Expand Down
37 changes: 36 additions & 1 deletion lib/data_buffer/partition.ex
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ defmodule DataBuffer.Partition do
GenServer.call(partition, {:insert, data}, timeout)
end

@spec insert_batch(partition(), Enumerable.t(), timeout()) :: :ok
def insert_batch(partition, data, timeout \\ 5_000) do
GenServer.call(partition, {:insert_batch, data}, timeout)
end

@spec size(partition(), timeout()) :: integer()
def size(partition, timeout \\ 5_000) do
GenServer.call(partition, :size, timeout)
Expand Down Expand Up @@ -112,6 +117,11 @@ defmodule DataBuffer.Partition do
{:reply, :ok, state}
end

def handle_call({:insert_batch, data}, _from, %State{} = state) do
state = do_insert_batch(state, data)
{:reply, :ok, state}
end

def handle_call(:flush, _from, %State{} = state) do
state = do_flush(state)
{:reply, :ok, state}
Expand Down Expand Up @@ -236,6 +246,30 @@ defmodule DataBuffer.Partition do
%{state | size: size}
end

defp do_insert_batch(%State{flusher: flusher, size: size, flush_size: flush_size} = state, data)
when is_pid(flusher) and is_full(size, flush_size) do
state
|> do_await_flush()
|> do_insert_batch(data)
end

defp do_insert_batch(%State{size: size, flush_size: flush_size} = state, data)
when is_full(size, flush_size) do
state
|> do_flush()
|> do_insert_batch(data)
end

defp do_insert_batch(%State{size: size, table: table} = state, data) do
{rows, size} =
Enum.map_reduce(data, size, fn data, size ->
{{size + 1, data}, size + 1}
end)

:ets.insert(table, rows)
%{state | size: size}
end

defp do_scheduled_flush(state, flush_schedule_ref) do
if state.flush_schedule_ref == flush_schedule_ref do
do_flush(state)
Expand Down Expand Up @@ -266,7 +300,8 @@ defmodule DataBuffer.Partition do
end

defp do_sync_flush(state) do
data = Flusher.flush(state.table, state.buffer, state.flush_opts)
opts = Keyword.put(state.flush_opts, :size, state.size)
data = Flusher.flush(state.table, state.buffer, opts)
{data, do_prepare_flush(state)}
end

Expand Down
2 changes: 1 addition & 1 deletion lib/data_buffer/telemetry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ defmodule DataBuffer.Telemetry do
"""

@doc false
@spec span(atom(), map(), (() -> {any, map})) :: any()
@spec span(atom(), map(), (-> {any, map})) :: any()
def span(name, meta, fun) do
:telemetry.span([:data_buffer, name], meta, fun)
end
Expand Down
44 changes: 42 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ defmodule DataBuffer.MixProject do
[
app: :data_buffer,
version: @version,
elixir: "~> 1.9",
elixir: "~> 1.16",
elixirc_paths: elixirc_paths(Mix.env()),
dialyzer: dialyzer(),
start_permanent: Mix.env() == :prod,
aliases: aliases(),
preferred_cli_env: preferred_cli_env(),
deps: deps(),
description: description(),
package: package(),
Expand All @@ -28,6 +31,13 @@ defmodule DataBuffer.MixProject do
defp elixirc_paths(:test), do: ["lib", "test/support"]
defp elixirc_paths(_), do: ["lib"]

defp dialyzer do
[
plt_file: {:no_warn, "dialyzer/dialyzer.plt"},
plt_add_apps: [:ex_unit, :mix]
]
end

defp description do
"""
DataBuffer provides an efficient way to buffer persistable data.
Expand All @@ -51,13 +61,43 @@ defmodule DataBuffer.MixProject do
]
end

# Aliases are shortcuts or tasks specific to the current project.
defp aliases do
[
setup: [
"local.hex --if-missing --force",
"local.rebar --if-missing --force",
"deps.get"
],
ci: [
"setup",
"compile --warnings-as-errors",
"format --check-formatted",
"credo --strict",
"test",
"dialyzer --format github",
"sobelow --config"
]
]
end

# Specifies the preferred env for mix commands.
defp preferred_cli_env do
[
ci: :test
]
end

# Run "mix help deps" to learn about dependencies.
defp deps do
[
{:keyword_validator, "~> 2.0"},
{:telemetry, "~> 0.4"},
{:benchee, "~> 1.0", only: :dev},
{:ex_doc, "~> 0.22", only: :dev, runtime: false}
{:ex_doc, "~> 0.22", only: :dev, runtime: false},
{:credo, "~> 1.7.0", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4.1", only: [:dev, :test], runtime: false},
{:sobelow, "~> 0.13.0", only: [:dev, :test], runtime: false}
]
end
end
7 changes: 7 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
%{
"benchee": {:hex, :benchee, "1.0.1", "66b211f9bfd84bd97e6d1beaddf8fc2312aaabe192f776e8931cb0c16f53a521", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}], "hexpm", "3ad58ae787e9c7c94dd7ceda3b587ec2c64604563e049b2a0e8baafae832addb"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"credo": {:hex, :credo, "1.7.5", "643213503b1c766ec0496d828c90c424471ea54da77c8a168c725686377b9545", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "f799e9b5cd1891577d8c773d245668aa74a2fcd15eb277f51a0131690ebfb3fd"},
"deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"},
"dialyxir": {:hex, :dialyxir, "1.4.3", "edd0124f358f0b9e95bfe53a9fcf806d615d8f838e2202a9f430d59566b6b53b", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "bf2cfb75cd5c5006bec30141b131663299c661a864ec7fbbc72dfa557487a986"},
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.22.2", "03a2a58bdd2ba0d83d004507c4ee113b9c521956938298eba16e55cc4aba4a6c", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "cf60e1b3e2efe317095b6bb79651f83a2c1b3edcb4d319c421d7fcda8b3aff26"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"keyword_validator": {:hex, :keyword_validator, "2.0.1", "92ab90dc93ea9e049530eb0a79c8f074833942dfb93d25e6a946b71b70086b49", [:mix], [], "hexpm", "09715a32d458c6318d39c0a484e958ce20fff64d646188b5801c334179be9fc2"},
"makeup": {:hex, :makeup, "1.0.3", "e339e2f766d12e7260e6672dd4047405963c5ec99661abdc432e6ec67d29ef95", [:mix], [{:nimble_parsec, "~> 0.5", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "2e9b4996d11832947731f7608fed7ad2f9443011b3b479ae288011265cdd3dad"},
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
"nimble_parsec": {:hex, :nimble_parsec, "0.6.0", "32111b3bf39137144abd7ba1cce0914533b2d16ef35e8abc5ec8be6122944263", [:mix], [], "hexpm", "27eac315a94909d4dc68bc07a4a83e06c8379237c5ea528a9acff4ca1c873c52"},
"sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"},
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
}
Loading

0 comments on commit 76141a8

Please sign in to comment.