diff --git a/.formatter.exs b/.formatter.exs new file mode 100644 index 0000000..d2cda26 --- /dev/null +++ b/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..4d3e7d3 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,61 @@ +name: CI + +on: + push: + branches: + - main + + pull_request: + branches: + - main + +jobs: + test: + name: Run tests + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Determine the Elixir version + run: echo "ELIXIR_VERSION=$(grep -h elixir .tool-versions | awk '{ print $2 }' | awk -F - '{print $1}')" >> $GITHUB_ENV + + - name: Determine the OTP version + run: echo "OTP_VERSION=$(grep -h erlang .tool-versions | awk '{ print $2 }')" >> $GITHUB_ENV + + - name: Setup Elixir and Erlang versions + uses: erlef/setup-elixir@v1 + with: + otp-version: ${{ env.OTP_VERSION }} + elixir-version: ${{ env.ELIXIR_VERSION }} + + - name: Restore the deps cache + uses: actions/cache@v3 + id: deps-cache + with: + path: deps + key: ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-deps-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + restore-keys: | + ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-deps- + + - name: Restore the _build cache + uses: actions/cache@v3 + id: build-cache + with: + path: _build + key: ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-build-mixlockhash-${{ hashFiles(format('{0}{1}', github.workspace, '/mix.lock')) }} + restore-keys: | + ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-build- + + - name: Restore the dialyzer cache + uses: actions/cache@v3 + id: dialyzer-cache + with: + path: dialyzer + key: ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-dialyzer + restore-keys: | + ${{ runner.os }}-${{ env.ELIXIR_VERSION }}-${{ env.OTP_VERSION }}-${{ env.MIX_ENV }}-dialyzer + + - name: Run CI + run: | + mix ci diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..efc0d6f --- /dev/null +++ b/.gitignore @@ -0,0 +1,38 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +sauron-*.tar + +# Temporary files, for example, from tests. 
+/tmp/ + +# Ignore the env file used for dev and test +/.envrc + +# Ignore Elixir Language Server files +.elixir_ls + +# Dialyzer generated PLT files +/dialyzer + +# Local .terraform directories +**/.terraform/* diff --git a/.tool-versions b/.tool-versions new file mode 100644 index 0000000..12be720 --- /dev/null +++ b/.tool-versions @@ -0,0 +1,2 @@ +erlang 25.1.2 +elixir 1.14.2-otp-25 diff --git a/README.md b/README.md index 67e7420..91cd8a3 100644 --- a/README.md +++ b/README.md @@ -1 +1,13 @@ -#ExBuffer +# Buffer + +## Installation + +This package can be installed by adding `:buffer` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:buffer, git: "https://github.com/coherentpath/buffer.git", tag: "0.1.0"} + ] +end +``` diff --git a/lib/buffer.ex b/lib/buffer.ex new file mode 100644 index 0000000..a08c5c0 --- /dev/null +++ b/lib/buffer.ex @@ -0,0 +1,289 @@ +defmodule Buffer do + @moduledoc """ + A simple data buffer that can be added directly to a supervision tree. + """ + + alias Buffer.{Server, Stream} + + @supervisor_fields [:name, :partitioner, :partitions] + + ################################ + # Public API + ################################ + + @doc false + @spec child_spec(keyword()) :: map() + def child_spec(opts) do + %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}} + end + + @doc """ + Starts a `Buffer` process linked to the current process. + + ## Options + + * `:flush_callback` - The function invoked to flush a buffer. Must have an arity of 2 + where the first arg is a list of items and the second arg is a keyword list of flush + opts. This option is required. + + * `:name` - The name of the buffer. Must be an atom or a `:via` tuple. This option is + required. + + * `:buffer_timeout` - The maximum time (in ms) allowed between flushes of the buffer. + Defaults to `:infinity` + + * `:flush_meta` - Any term to be included in the flush opts under the `:meta` key. + + * `:jitter_rate` - The rate at which limits are jittered between partitions. Limits are not + jittered by default. + + * `:max_length` - The maximum number of items allowed in the buffer before being flushed. + By default, this limit is `:infinity`. + + * `:max_size` - The maximum size (in bytes) of the buffer before being flushed. By default, + this limit is `:infinity`. + + * `:partitioner` - The method by which items are inserted into different partitions. The + options are `:rotating` and `:random` and the former is the default. + + * `:partitions` - The number of buffer partitions. + + * `:size_callback` - The function invoked to determine item size. Must have an arity of 1 + where the only arg is an inserted item and must return a non-negative integer representing + the size of the item. Default size callback is `byte_size` (predicated by `term_to_binary` + if applicable). + """ + @spec start_link(keyword()) :: Supervisor.on_start() + def start_link(opts) do + opts = Keyword.put_new(opts, :name, __MODULE__) + + with {:ok, partitions} <- validate_partitions(opts), + {:ok, partitioner} <- validate_partitioner(opts), + {:ok, _} = result <- do_start_link(opts) do + partitioner = build_partitioner(partitions, partitioner) + name = Keyword.get(opts, :name) + put_buffer(name, partitioner, partitions) + result + end + end + + @doc """ + Lazily chunks an enumerable based on `Buffer` flush conditions. + + ## Options + + * `:max_length` - The maximum number of items in a chunk. By default, this limit is `:infinity`. 
+ + * `:max_size` - The maximum size (in bytes) of the items in a chunk. By default, this limit is + `:infinity`. + + * `:size_callback` - The function invoked to determine the size of an item. Default size callback + is `byte_size` (predicated by `term_to_binary` if applicable). + """ + @spec chunk(Enumerable.t(), keyword()) :: {:ok, Enumerable.t()} | {:error, atom()} + defdelegate chunk(enum, opts \\ []), to: Stream + + @doc """ + Lazily chunks an enumerable based on `Buffer` flush conditions and raises an `ArgumentError` + with invalid options. + + For information on options, see `chunk/2`. + """ + @spec chunk!(Enumerable.t(), keyword()) :: Enumerable.t() + defdelegate chunk!(enum, opts \\ []), to: Stream + + @doc """ + Dumps the contents of the given `Buffer` to a list, bypassing a flush + callback and resetting the buffer. + + ## Options + + * `:partition` - The specific partition to dump. Defaults to `:all`. + """ + @spec dump(GenServer.server(), keyword()) :: {:ok, list()} | {:error, atom()} + def dump(buffer, opts \\ []) do + with {:ok, {_, parts}} <- fetch_buffer(buffer), + {:ok, part} <- validate_partition(opts, parts) do + case part do + :all -> {:ok, Enum.reduce(1..parts, [], &(&2 ++ do_dump_part(buffer, &1 - 1)))} + part -> {:ok, do_dump_part(buffer, part)} + end + end + end + + @doc """ + Flushes the given `Buffer`, regardless of whether or not the flush conditions + have been met. + + ## Options + + * `:async` - Whether or not the flush will be async. Defaults to `true`. + + * `:partition` - The specific partition to flush. Defaults to `:all`. + """ + @spec flush(GenServer.server(), keyword()) :: :ok | {:error, atom()} + def flush(buffer, opts \\ []) do + with {:ok, {_, parts}} <- fetch_buffer(buffer), + {:ok, part} <- validate_partition(opts, parts) do + case part do + :all -> Enum.each(1..parts, &do_flush_part(buffer, &1 - 1, opts)) + part -> do_flush_part(buffer, part, opts) + end + end + end + + @doc """ + Returns information about the given `Buffer`. + + ## Options + + * `:partition` - The specific partition to return info for. Defaults to `:all`. + """ + @spec info(GenServer.server(), keyword()) :: {:ok, list()} | {:error, atom()} + def info(buffer, opts \\ []) do + with {:ok, {_, parts}} <- fetch_buffer(buffer), + {:ok, part} <- validate_partition(opts, parts) do + case part do + :all -> {:ok, Enum.map(1..parts, &do_info_part(buffer, &1 - 1))} + part -> {:ok, [do_info_part(buffer, part)]} + end + end + end + + @doc """ + Inserts the given item into the given `Buffer`. + """ + @spec insert(GenServer.server(), term()) :: :ok | {:error, atom()} + def insert(buffer, item) do + with {:ok, {partitioner, _}} <- fetch_buffer(buffer) do + do_insert(buffer, partitioner, item) + end + end + + @doc """ + Inserts a batch of items into the given `Buffer`. + + ## Options + + * `:safe_flush` - Whether or not to flush immediately after exceeding a buffer limit. + Defaults to `true`. If set to `false`, all items in the batch will be inserted + regardless of flush conditions being met. Afterwards, if a limit has been exceeded, + the buffer will be flushed async. 
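+
+  ## Example
+
+  A minimal sketch (assuming a buffer named `MyBuffer` has already been started):
+
+      Buffer.insert_batch(MyBuffer, ["foo", "bar", "baz"], safe_flush: false)
+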
+ """ + @spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: :ok | {:error, atom()} + def insert_batch(buffer, items, opts \\ []) do + with {:ok, {partitioner, _}} <- fetch_buffer(buffer) do + do_insert_batch(buffer, partitioner, items, opts) + end + end + + ################################ + # Private API + ################################ + + defguardp is_valid_part(part, parts) when part == :all or (part >= 0 and part < parts) + + defp validate_partitions(opts) do + case Keyword.get(opts, :partitions, 1) do + parts when is_integer(parts) and parts > 0 -> {:ok, parts} + _ -> {:error, :invalid_partitions} + end + end + + defp validate_partitioner(opts) do + case Keyword.get(opts, :partitioner, :rotating) do + partitioner when partitioner in [:random, :rotating] -> {:ok, partitioner} + _ -> {:error, :invalid_partitioner} + end + end + + defp validate_partition(opts, partitions) do + case Keyword.get(opts, :partition, :all) do + part when is_valid_part(part, partitions) -> {:ok, part} + _ -> {:error, :invalid_partition} + end + end + + defp do_start_link(opts) do + {sup_opts, buffer_opts} = Keyword.split(opts, @supervisor_fields) + with_args = fn [opts], part -> [Keyword.put(opts, :partition, part)] end + child_spec = {Server, buffer_opts} + + sup_opts + |> Keyword.merge(with_arguments: with_args, child_spec: child_spec) + |> PartitionSupervisor.start_link() + end + + defp build_partitioner(1, _), do: fn -> 0 end + + defp build_partitioner(partitions, :random) do + fn -> :rand.uniform(partitions) - 1 end + end + + defp build_partitioner(partitions, :rotating) do + atomics_ref = :atomics.new(1, []) + + fn -> + case :atomics.add_get(atomics_ref, 1, 1) do + part when part > partitions -> + :atomics.put(atomics_ref, 1, 0) + 0 + + part -> + part - 1 + end + end + end + + defp put_buffer(buffer, partitioner, partitions) do + buffer + |> build_key() + |> :persistent_term.put({partitioner, partitions}) + end + + defp fetch_buffer(buffer) do + buffer + |> build_key() + |> :persistent_term.get(nil) + |> case do + nil -> {:error, :not_found} + buffer -> {:ok, buffer} + end + end + + defp build_key(buffer), do: {__MODULE__, buffer} + + defp do_dump_part(buffer, partition) do + buffer + |> buffer_partition_name(partition) + |> Server.dump() + end + + defp do_flush_part(buffer, partition, opts) do + buffer + |> buffer_partition_name(partition) + |> Server.flush(opts) + end + + defp do_info_part(buffer, partition) do + buffer + |> buffer_partition_name(partition) + |> Server.info() + end + + defp do_insert(buffer, partitioner, item) do + buffer + |> buffer_partition_name(partitioner.()) + |> Server.insert(item) + end + + defp do_insert_batch(buffer, partitioner, items, opts) do + buffer + |> buffer_partition_name(partitioner.()) + |> Server.insert_batch(items, opts) + end + + defp buffer_partition_name(buffer, partition) do + {:via, PartitionSupervisor, {buffer, partition}} + end +end diff --git a/lib/buffer/server.ex b/lib/buffer/server.ex new file mode 100644 index 0000000..bf2aff8 --- /dev/null +++ b/lib/buffer/server.ex @@ -0,0 +1,215 @@ +defmodule Buffer.Server do + @moduledoc false + + use GenServer + + alias Buffer.State + + @server_fields [ + :buffer_timeout, + :flush_callback, + :flush_meta, + :jitter_rate, + :max_length, + :max_size, + :partition, + :size_callback + ] + + ################################ + # Public API + ################################ + + @doc false + @spec start_link(keyword()) :: GenServer.on_start() + def start_link(opts) do + {opts, server_opts} = 
Keyword.split(opts, @server_fields) + GenServer.start_link(__MODULE__, opts, server_opts) + end + + @doc false + @spec dump(GenServer.server()) :: list() + def dump(buffer), do: GenServer.call(buffer, :dump) + + @doc false + @spec flush(GenServer.server(), keyword()) :: :ok + def flush(buffer, opts \\ []) do + if Keyword.get(opts, :async, true) do + GenServer.call(buffer, :async_flush) + else + GenServer.call(buffer, :sync_flush) + end + end + + @doc false + @spec info(GenServer.server()) :: map() + def info(buffer), do: GenServer.call(buffer, :info) + + @doc false + @spec insert(GenServer.server(), term()) :: :ok + def insert(buffer, item), do: GenServer.call(buffer, {:insert, item}) + + @doc false + @spec insert_batch(GenServer.server(), Enumerable.t(), keyword()) :: :ok + def insert_batch(buffer, items, opts \\ []) do + if Keyword.get(opts, :safe_flush, true) do + GenServer.call(buffer, {:safe_insert_batch, items}) + else + GenServer.call(buffer, {:unsafe_insert_batch, items}) + end + end + + ################################ + # GenServer Callbacks + ################################ + + @doc false + @impl GenServer + @spec init(keyword()) :: {:ok, State.t(), {:continue, :refresh}} | {:stop, atom()} + def init(opts) do + Process.flag(:trap_exit, true) + + case init_buffer(opts) do + {:ok, buffer} -> {:ok, buffer, {:continue, :refresh}} + {:error, reason} -> {:stop, reason} + end + end + + @doc false + @impl GenServer + @spec handle_call(term(), GenServer.from(), State.t()) :: + {:reply, term(), State.t()} + | {:reply, term(), State.t(), {:continue, :flush | :refresh}} + def handle_call(:async_flush, _from, buffer) do + {:reply, :ok, buffer, {:continue, :flush}} + end + + def handle_call(:dump, _from, buffer) do + {:reply, State.items(buffer), buffer, {:continue, :refresh}} + end + + def handle_call(:info, _from, buffer), do: {:reply, build_info(buffer), buffer} + + def handle_call({:insert, item}, _from, buffer) do + case State.insert(buffer, item) do + {:flush, buffer} -> {:reply, :ok, buffer, {:continue, :flush}} + {:cont, buffer} -> {:reply, :ok, buffer} + end + end + + def handle_call({:safe_insert_batch, items}, _from, buffer) do + {:reply, :ok, do_safe_insert_batch(buffer, items)} + end + + def handle_call({:unsafe_insert_batch, items}, _from, buffer) do + case do_unsafe_insert_batch(buffer, items) do + {:flush, buffer} -> {:reply, :ok, buffer, {:continue, :flush}} + {:cont, buffer} -> {:reply, :ok, buffer} + end + end + + def handle_call(:sync_flush, _from, buffer) do + do_flush(buffer) + {:reply, :ok, buffer, {:continue, :refresh}} + end + + @doc false + @impl GenServer + @spec handle_continue(term(), State.t()) :: + {:noreply, State.t()} | {:noreply, State.t(), {:continue, :refresh}} + def handle_continue(:flush, buffer) do + do_flush(buffer) + {:noreply, buffer, {:continue, :refresh}} + end + + def handle_continue(:refresh, buffer), do: {:noreply, refresh(buffer)} + + @doc false + @impl GenServer + @spec handle_info(term(), State.t()) :: + {:noreply, State.t()} | {:noreply, State.t(), {:continue, :flush}} + def handle_info({:timeout, timer, :flush}, buffer) when timer == buffer.timer do + {:noreply, buffer, {:continue, :flush}} + end + + def handle_info(_, buffer), do: {:noreply, buffer} + + @doc false + @impl GenServer + @spec terminate(term(), State.t()) :: term() + def terminate(_, buffer), do: do_flush(buffer) + + ################################ + # Private API + ################################ + + defp init_buffer(opts) do + case Keyword.get(opts, :flush_callback) do + 
nil -> {:error, :invalid_callback} + _ -> State.new(opts) + end + end + + defp build_info(buffer) do + %{ + length: buffer.length, + max_length: buffer.max_length, + max_size: buffer.max_size, + next_flush: get_next_flush(buffer), + partition: buffer.partition, + size: buffer.size, + timeout: buffer.timeout + } + end + + defp do_safe_insert_batch(buffer, items) do + Enum.reduce(items, buffer, fn item, acc -> + case State.insert(acc, item) do + {:flush, acc} -> + do_flush(acc) + refresh(acc) + + {:cont, acc} -> + acc + end + end) + end + + defp do_unsafe_insert_batch(buffer, items) do + Enum.reduce(items, {:cont, buffer}, fn item, acc -> + {_, buffer} = acc + State.insert(buffer, item) + end) + end + + defp refresh(%State{timeout: :infinity} = buffer), do: State.refresh(buffer) + + defp refresh(buffer) do + cancel_upcoming_flush(buffer) + timer = schedule_next_flush(buffer) + State.refresh(buffer, timer) + end + + defp cancel_upcoming_flush(%State{timer: nil}), do: :ok + defp cancel_upcoming_flush(buffer), do: Process.cancel_timer(buffer.timer) + + defp schedule_next_flush(buffer) do + # We use `:erlang.start_timer/3` to include the timer ref in the message. This is necessary + # for handling race conditions resulting from multiple simultaneous flush conditions. + :erlang.start_timer(buffer.timeout, self(), :flush) + end + + defp get_next_flush(%State{timer: nil}), do: nil + + defp get_next_flush(buffer) do + with false <- Process.read_timer(buffer.timer), do: nil + end + + defp do_flush(buffer) do + opts = [length: buffer.length, meta: buffer.flush_meta, size: buffer.size] + + buffer + |> State.items() + |> buffer.flush_callback.(opts) + end +end diff --git a/lib/buffer/state.ex b/lib/buffer/state.ex new file mode 100644 index 0000000..943f420 --- /dev/null +++ b/lib/buffer/state.ex @@ -0,0 +1,132 @@ +defmodule Buffer.State do + @moduledoc false + + defstruct [ + :flush_callback, + :flush_meta, + :max_length, + :max_size, + :partition, + :size_callback, + :timeout, + buffer: [], + length: 0, + size: 0, + timer: nil + ] + + @type t :: %__MODULE__{} + + ################################ + # Public API + ################################ + + @doc false + @spec insert(t(), term()) :: {:flush, t()} | {:cont, t()} + def insert(buffer, item) do + buffer = %{ + buffer + | buffer: [item | buffer.buffer], + length: buffer.length + 1, + size: buffer.size + buffer.size_callback.(item) + } + + if flush?(buffer), do: {:flush, buffer}, else: {:cont, buffer} + end + + @doc false + @spec items(t()) :: list() + def items(buffer), do: Enum.reverse(buffer.buffer) + + @doc false + @spec new(keyword()) :: {:ok, t()} | {:error, atom()} + def new(opts) do + with {:ok, jitter} <- get_jitter(opts), + {:ok, flush_callback} <- get_flush_callback(opts), + {:ok, size_callback} <- get_size_callback(opts), + {:ok, max_length} <- get_max_length(opts, jitter), + {:ok, max_size} <- get_max_size(opts, jitter), + {:ok, timeout} <- get_timeout(opts, jitter) do + buffer = %__MODULE__{ + flush_callback: flush_callback, + flush_meta: Keyword.get(opts, :flush_meta), + max_length: max_length, + max_size: max_size, + partition: Keyword.get(opts, :partition, 0), + size_callback: size_callback, + timeout: timeout + } + + {:ok, buffer} + end + end + + @doc false + @spec refresh(t(), reference() | nil) :: t() + def refresh(buffer, timer \\ nil) do + %{buffer | buffer: [], length: 0, size: 0, timer: timer} + end + + ################################ + # Private API + ################################ + + defp get_jitter(opts) do + case 
Keyword.get(opts, :jitter_rate, 0.0) do + jitter when jitter < 0 or jitter > 1 -> {:error, :invalid_jitter} + jitter -> {:ok, jitter} + end + end + + defp get_flush_callback(opts) do + case Keyword.get(opts, :flush_callback) do + nil -> {:ok, nil} + callback -> validate_callback(callback, 2) + end + end + + defp get_size_callback(opts) do + opts + |> Keyword.get(:size_callback, &item_size/1) + |> validate_callback(1) + end + + defp get_max_length(opts, jitter) do + validate_limit(Keyword.get(opts, :max_length, :infinity), jitter) + end + + defp get_max_size(opts, jitter) do + validate_limit(Keyword.get(opts, :max_size, :infinity), jitter) + end + + defp get_timeout(opts, jitter) do + validate_limit(Keyword.get(opts, :buffer_timeout, :infinity), jitter) + end + + defp validate_callback(fun, arity) when is_function(fun, arity), do: {:ok, fun} + defp validate_callback(_, _), do: {:error, :invalid_callback} + + defp validate_limit(:infinity, _), do: {:ok, :infinity} + defp validate_limit(_, jitter) when jitter < 0 or jitter > 1, do: {:error, :invalid_jitter} + + defp validate_limit(limit, jitter) when is_integer(limit) and limit >= 0 do + {:ok, round(limit * (1 - jitter * :rand.uniform()))} + end + + defp validate_limit(_, _), do: {:error, :invalid_limit} + + defp item_size(item) when is_bitstring(item), do: byte_size(item) + + defp item_size(item) do + item + |> :erlang.term_to_binary() + |> byte_size() + end + + defp flush?(buffer) do + exceeds?(buffer.length, buffer.max_length) or exceeds?(buffer.size, buffer.max_size) + end + + defp exceeds?(_, :infinity), do: false + defp exceeds?(num, max), do: num >= max +end diff --git a/lib/buffer/stream.ex b/lib/buffer/stream.ex new file mode 100644 index 0000000..69c8dbe --- /dev/null +++ b/lib/buffer/stream.ex @@ -0,0 +1,45 @@ +defmodule Buffer.Stream do + @moduledoc false + + alias Buffer.State + + @fields [:max_length, :max_size, :size_callback] + + ################################ + # Public API + ################################ + + @doc false + @spec chunk(Enumerable.t(), keyword()) :: {:ok, Enumerable.t()} | {:error, atom()} + def chunk(enum, opts \\ []) do + opts = Keyword.take(opts, @fields) + + with {:ok, buffer} <- State.new(opts) do + {:ok, Stream.chunk_while(enum, buffer, &chunk_fun(&2, &1), &after_fun/1)} + end + end + + @doc false + @spec chunk!(Enumerable.t(), keyword()) :: Enumerable.t() + def chunk!(enum, opts \\ []) do + case chunk(enum, opts) do + {:ok, stream} -> stream + {:error, reason} -> raise(ArgumentError, to_message(reason)) + end + end + + ################################ + # Private API + ################################ + + defp chunk_fun(buffer, item) do + with {:flush, buffer} <- State.insert(buffer, item) do + {:cont, State.items(buffer), State.refresh(buffer)} + end + end + + defp after_fun(%State{buffer: []} = buffer), do: {:cont, buffer} + defp after_fun(buffer), do: {:cont, State.items(buffer), State.refresh(buffer)} + + defp to_message(reason), do: String.replace(to_string(reason), "_", " ") +end diff --git a/mix.exs b/mix.exs new file mode 100644 index 0000000..4432ffa --- /dev/null +++ b/mix.exs @@ -0,0 +1,75 @@ +defmodule Buffer.MixProject do + use Mix.Project + + @version "0.1.0" + + def project do + [ + app: :buffer, + version: @version, + elixir: "~> 1.14", + elixirc_paths: elixirc_paths(Mix.env()), + dialyzer: dialyzer(), + start_permanent: Mix.env() == :prod, + name: "Buffer", + aliases: aliases(), + preferred_cli_env: preferred_cli_env(), + deps: deps() + ] + end + + # Run "mix help compile.app" to 
learn about applications. + def application do + [ + extra_applications: [:logger] + ] + end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] + + defp dialyzer do + [ + plt_file: {:no_warn, "dialyzer/dialyzer.plt"}, + plt_add_apps: [:ex_unit, :mix] + ] + end + + # Aliases are shortcuts or tasks specific to the current project. + defp aliases do + [ + setup: [ + "local.hex --if-missing --force", + "local.rebar --if-missing --force", + "deps.get" + ], + ci: [ + "setup", + "compile --warnings-as-errors", + "format --check-formatted", + "credo --strict", + "test", + "dialyzer --format github", + "sobelow --config" + ] + ] + end + + # Specifies the preferred env for mix commands. + defp preferred_cli_env do + [ + ci: :test + ] + end + + # Run "mix help deps" to learn about dependencies. + defp deps do + [ + {:benchee, "~> 1.1.0", only: :dev, runtime: false}, + {:ex_doc, "~> 0.30.8", only: :dev, runtime: false}, + {:credo, "~> 1.7.0", only: [:dev, :test], runtime: false}, + {:dialyxir, "~> 1.4.1", only: [:dev, :test], runtime: false}, + {:sobelow, "~> 0.13.0", only: [:dev, :test], runtime: false} + ] + end +end diff --git a/mix.lock b/mix.lock new file mode 100644 index 0000000..14362c9 --- /dev/null +++ b/mix.lock @@ -0,0 +1,18 @@ +%{ + "benchee": {:hex, :benchee, "1.1.0", "f3a43817209a92a1fade36ef36b86e1052627fd8934a8b937ac9ab3a76c43062", [:mix], [{:deep_merge, "~> 1.0", [hex: :deep_merge, repo: "hexpm", optional: false]}, {:statistex, "~> 1.0", [hex: :statistex, repo: "hexpm", optional: false]}], "hexpm", "7da57d545003165a012b587077f6ba90b89210fd88074ce3c60ce239eb5e6d93"}, + "bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"}, + "credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"}, + "deep_merge": {:hex, :deep_merge, "1.0.0", "b4aa1a0d1acac393bdf38b2291af38cb1d4a52806cf7a4906f718e1feb5ee961", [:mix], [], "hexpm", "ce708e5f094b9cd4e8f2be4f00d2f4250c4095be93f8cd6d018c753894885430"}, + "dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.38", "b42252eddf63bda05554ba8be93a1262dc0920c721f1aaf989f5de0f73a2e367", [:mix], [], "hexpm", "2cd0907795aaef0c7e8442e376633c5b3bd6edc8dbbdc539b22f095501c1cdb6"}, + "erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"}, + "ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", 
"d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"}, + "file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"}, + "jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"}, + "makeup": {:hex, :makeup, "1.1.1", "fa0bc768698053b2b3869fa8a62616501ff9d11a562f3ce39580d60860c3a55e", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "5dc62fbdd0de44de194898b6710692490be74baa02d9d108bc29f007783b0b48"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.4.0", "51f9b613ea62cfa97b25ccc2c1b4216e81df970acd8e16e8d1bdc58fef21370d", [:mix], [], "hexpm", "9c565862810fb383e9838c1dd2d7d2c437b3d13b267414ba6af33e50d2d1cf28"}, + "sobelow": {:hex, :sobelow, "0.13.0", "218afe9075904793f5c64b8837cc356e493d88fddde126a463839351870b8d1e", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "cd6e9026b85fc35d7529da14f95e85a078d9dd1907a9097b3ba6ac7ebbe34a0d"}, + "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, +} diff --git a/test/buffer_test.exs b/test/buffer_test.exs new file mode 100644 index 0000000..28ccd03 --- /dev/null +++ b/test/buffer_test.exs @@ -0,0 +1,414 @@ +defmodule BufferTest do + use ExUnit.Case, async: true + + import Buffer.Helpers + + describe "start_link/1" do + test "will start an unpartitioned Buffer" do + assert start_ex_buffer() == {:ok, Buffer} + end + + test "will correctly name an unpartitioned Buffer" do + opts = [name: :ex_buffer] + + assert start_ex_buffer(opts) == {:ok, :ex_buffer} + end + + test "will start a partitioned Buffer" do + opts = [partitions: 2] + + assert start_ex_buffer(opts) == {:ok, Buffer} + end + + test "will correctly name a partitioned Buffer" do + opts = [name: :ex_buffer, partitions: 2] + + assert start_ex_buffer(opts) == {:ok, :ex_buffer} + end + + test "will jitter the limits of an Buffer" do + opts = [jitter_rate: 0.05, max_size: 10_000, partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert {:ok, [%{max_size: limit_1}, %{max_size: limit_2}]} = Buffer.info(buffer) + assert limit_1 != limit_2 + end + + test "will not start with an invalid flush callback" do + opts = [flush_callback: nil] + + assert start_ex_buffer(opts) == {:error, :invalid_callback} + end + + test "will not start with an invalid size callback" do + opts = [size_callback: fn _, _ -> :ok end] + + assert start_ex_buffer(opts) == {:error, 
:invalid_callback} + end + + test "will not start with an invalid limit" do + opts = [buffer_timeout: -5] + + assert start_ex_buffer(opts) == {:error, :invalid_limit} + end + + test "will not start with an invalid partition count" do + opts = [partitions: -2] + + assert start_ex_buffer(opts) == {:error, :invalid_partitions} + end + + test "will not start with an invalid partitioner" do + opts = [partitioner: :fake_partitioner] + + assert start_ex_buffer(opts) == {:error, :invalid_partitioner} + end + + test "will not start with an invalid jitter rate" do + opts = [jitter_rate: 3.14] + + assert start_ex_buffer(opts) == {:error, :invalid_jitter} + end + + test "will flush an Buffer on termination" do + assert {:ok, buffer} = start_ex_buffer() + assert seed_buffer(buffer) == :ok + assert PartitionSupervisor.stop(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + end + + describe "chunk/2" do + test "will correctly chunk an enumerable" do + opts = [max_length: 3, max_size: 10] + enum = ["foo", "bar", "baz", "foobar", "barbaz", "foobarbaz"] + + assert {:ok, enum} = Buffer.chunk(enum, opts) + assert Enum.into(enum, []) == [["foo", "bar", "baz"], ["foobar", "barbaz"], ["foobarbaz"]] + end + + test "will correctly chunk an enumerable with a size callback" do + opts = [max_size: 8, size_callback: &(byte_size(&1) + 1)] + enum = ["foo", "bar", "baz"] + + assert {:ok, enum} = Buffer.chunk(enum, opts) + assert Enum.into(enum, []) == [["foo", "bar"], ["baz"]] + end + + test "will return an error with an invalid callback" do + opts = [size_callback: fn -> :ok end] + enum = ["foo", "bar", "baz"] + + assert Buffer.chunk(enum, opts) == {:error, :invalid_callback} + end + + test "will return an error with an invalid limit" do + opts = [max_length: -5] + + assert Buffer.chunk(["foo", "bar", "baz"], opts) == {:error, :invalid_limit} + end + end + + describe "chunk!/2" do + test "will correctly chunk an enumerable" do + opts = [max_length: 3, max_size: 10] + enum = ["foo", "bar", "baz", "foobar", "barbaz", "foobarbaz"] + enum = Buffer.chunk!(enum, opts) + + assert Enum.into(enum, []) == [["foo", "bar", "baz"], ["foobar", "barbaz"], ["foobarbaz"]] + end + + test "will correctly chunk an enumerable with a size callback" do + opts = [max_size: 8, size_callback: &(byte_size(&1) + 1)] + enum = ["foo", "bar", "baz"] + enum = Buffer.chunk!(enum, opts) + + assert Enum.into(enum, []) == [["foo", "bar"], ["baz"]] + end + + test "will raise an error with an invalid callback" do + opts = [size_callback: fn -> :ok end] + enum = ["foo", "bar", "baz"] + fun = fn -> Buffer.chunk!(enum, opts) end + + assert_raise ArgumentError, "invalid callback", fun + end + + test "will raise an error with an invalid limit" do + opts = [max_length: -5] + enum = ["foo", "bar", "baz"] + fun = fn -> Buffer.chunk!(enum, opts) end + + assert_raise ArgumentError, "invalid limit", fun + end + end + + describe "dump/2" do + test "will dump an unpartitioned Buffer" do + assert {:ok, buffer} = start_ex_buffer() + assert seed_buffer(buffer) == :ok + assert Buffer.dump(buffer) == {:ok, ["foo", "bar", "baz"]} + assert {:ok, [%{length: 0}]} = Buffer.info(buffer) + end + + test "will dump a partitioned Buffer" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert Buffer.dump(buffer) == {:ok, ["foo", "baz", "bar"]} + assert {:ok, [%{length: 0}, %{length: 0}]} = Buffer.info(buffer) + end + + test "will dump a specific Buffer partition" do + opts = [partitions: 2] + + 
assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert Buffer.dump(buffer, partition: 0) == {:ok, ["foo", "baz"]} + assert {:ok, [%{length: 0}]} = Buffer.info(buffer, partition: 0) + end + + test "will return an error with an invalid buffer" do + assert Buffer.dump(:fake_buffer) == {:error, :not_found} + end + + test "will return an error with an invalid partition" do + assert {:ok, buffer} = start_ex_buffer() + assert Buffer.dump(buffer, partition: -1) == {:error, :invalid_partition} + end + end + + describe "flush/2" do + test "will flush an unpartitioned Buffer" do + assert {:ok, buffer} = start_ex_buffer() + assert seed_buffer(buffer) == :ok + assert Buffer.flush(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush a partitioned Buffer" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert Buffer.flush(buffer) == :ok + assert_receive {^buffer, ["foo", "baz"], _} + assert_receive {^buffer, ["bar"], _} + end + + test "will flush a specific Buffer partition" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert Buffer.flush(buffer, partition: 0) == :ok + assert_receive {^buffer, ["foo", "baz"], _} + refute_receive _ + end + + test "will synchronously flush an Buffer" do + assert {:ok, buffer} = start_ex_buffer() + assert seed_buffer(buffer) == :ok + assert Buffer.flush(buffer, async: false) == :ok + assert_received {^buffer, ["foo", "bar", "baz"], _} + end + + test "will include flush meta" do + opts = [flush_meta: "meta"] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert Buffer.flush(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], flush_opts} + assert Keyword.get(flush_opts, :meta) == "meta" + end + + test "will return an error with an invalid buffer" do + assert Buffer.flush(:fake_buffer) == {:error, :not_found} + end + + test "will return an error with an invalid partition" do + assert {:ok, buffer} = start_ex_buffer() + assert Buffer.flush(buffer, partition: -1) == {:error, :invalid_partition} + end + end + + describe "info/2" do + test "will return info for an unpartitioned Buffer" do + assert {:ok, buffer} = start_ex_buffer() + assert seed_buffer(buffer) == :ok + assert {:ok, [%{length: 3}]} = Buffer.info(buffer) + end + + test "will return info for a partitioned Buffer" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert {:ok, [%{length: 2}, %{length: 1}]} = Buffer.info(buffer) + end + + test "will return info for a specific Buffer partition" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert {:ok, [%{length: 2}]} = Buffer.info(buffer, partition: 0) + end + + test "will return info for an Buffer with a size callback" do + opts = [size_callback: &(byte_size(&1) + 1)] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert {:ok, [%{size: 12}]} = Buffer.info(buffer) + end + + test "will include next flush when applicable" do + opts = [buffer_timeout: 1_000] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert {:ok, [%{next_flush: next_flush}]} = Buffer.info(buffer) + refute is_nil(next_flush) + end + + test "will return an error with an invalid buffer" do + assert Buffer.info(:fake_buffer) == 
{:error, :not_found} + end + + test "will return an error with an invalid partition" do + assert {:ok, buffer} = start_ex_buffer() + assert Buffer.info(buffer, partition: -1) == {:error, :invalid_partition} + end + end + + describe "insert/2" do + test "will insert items into an unpartitioned Buffer" do + assert {:ok, buffer} = start_ex_buffer() + assert Buffer.insert(buffer, "foo") == :ok + assert Buffer.dump(buffer) == {:ok, ["foo"]} + end + + test "will insert items into a partitioned Buffer" do + opts = [partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert Buffer.insert(buffer, "foo") == :ok + assert Buffer.insert(buffer, "bar") == :ok + assert Buffer.dump(buffer, partition: 0) == {:ok, ["foo"]} + assert Buffer.dump(buffer, partition: 1) == {:ok, ["bar"]} + end + + test "will insert items into a partitioned Buffer with a random partitioner" do + opts = [partitioner: :random, partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + end + + test "will flush an Buffer based on a length condition" do + opts = [max_length: 3] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush an Buffer based on a size condition" do + opts = [max_size: 9] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush an Buffer with a size callback based on a size condition" do + opts = [max_size: 12, size_callback: &(byte_size(&1) + 1)] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush an Buffer based on a time condition" do + opts = [buffer_timeout: 50] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + + :timer.sleep(50) + + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush an Buffer once the first condition is met" do + opts = [max_length: 5, max_size: 9] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + + test "will flush a Buffer partitions independently" do + opts = [max_length: 2, partitions: 2] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "baz"], _} + assert {:ok, [%{length: 1}]} = Buffer.info(buffer, partition: 1) + end + + test "will include flush meta when flushed" do + opts = [flush_meta: "meta", max_length: 3] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert seed_buffer(buffer) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], flush_opts} + assert Keyword.get(flush_opts, :meta) == "meta" + end + + test "will return an error with an invalid buffer" do + assert Buffer.insert(:fake_buffer, "foo") == {:error, :not_found} + end + end + + describe "insert_batch/3" do + test "will insert a batch of items into an unpartitioned Buffer" do + items = ["foo", "bar", "baz"] + + assert {:ok, buffer} = start_ex_buffer() + assert Buffer.insert_batch(buffer, items) == :ok + assert Buffer.dump(buffer) == {:ok, ["foo", "bar", "baz"]} + end + + test "will insert a batch of items into a partitioned Buffer" do + opts = [partitions: 2] + items = ["foo", "bar", "baz"] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert Buffer.insert_batch(buffer, items) == :ok + assert 
Buffer.dump(buffer, partition: 0) == {:ok, ["foo", "bar", "baz"]} + end + + test "will flush an Buffer while inserting a batch of items" do + opts = [max_length: 2] + items = ["foo", "bar", "baz"] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert Buffer.insert_batch(buffer, items) == :ok + assert_receive {^buffer, ["foo", "bar"], _} + assert Buffer.dump(buffer) == {:ok, ["baz"]} + end + + test "will flush an Buffer unsafely" do + opts = [max_length: 2] + items = ["foo", "bar", "baz"] + + assert {:ok, buffer} = start_ex_buffer(opts) + assert Buffer.insert_batch(buffer, items, safe_flush: false) == :ok + assert_receive {^buffer, ["foo", "bar", "baz"], _} + end + end +end diff --git a/test/support/helpers.ex b/test/support/helpers.ex new file mode 100644 index 0000000..47f0c2a --- /dev/null +++ b/test/support/helpers.ex @@ -0,0 +1,45 @@ +defmodule Buffer.Helpers do + @moduledoc false + + import ExUnit.Callbacks, only: [start_supervised: 1] + + ################################ + # Public API + ################################ + + @doc false + @spec seed_buffer(GenServer.server()) :: :ok + def seed_buffer(buffer) do + Buffer.insert(buffer, "foo") + Buffer.insert(buffer, "bar") + Buffer.insert(buffer, "baz") + end + + @doc false + @spec start_ex_buffer(keyword()) :: {:ok, GenServer.name()} | {:error, atom()} + def start_ex_buffer(opts \\ []) do + name = Keyword.get(opts, :name, Buffer) + opts = Keyword.put_new(opts, :flush_callback, flush_callback(name)) + + case start_supervised({Buffer, opts}) do + {:ok, pid} -> {:ok, process_name(pid)} + {:error, {{_, {_, _, reason}}, _}} -> {:error, reason} + {:error, {reason, _}} -> {:error, reason} + end + end + + ################################ + # Private API + ################################ + + defp flush_callback(name) do + destination = self() + fn data, opts -> send(destination, {name, data, opts}) end + end + + defp process_name(pid) do + pid + |> Process.info() + |> Keyword.get(:registered_name) + end +end diff --git a/test/test_helper.exs b/test/test_helper.exs new file mode 100644 index 0000000..3b6ecf7 --- /dev/null +++ b/test/test_helper.exs @@ -0,0 +1,2 @@ +ExUnit.configure(exclude: [:skip]) +ExUnit.start()
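
For reference, a minimal end-to-end sketch of the API added in this diff. The `MyApp.Application` module, the `MyApp.Buffer` name, and the inline flush callback are illustrative only, not part of the changeset; any two-arity callback accepted by `:flush_callback` works.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Buffer.child_spec/1 delegates to Buffer.start_link/1, so the usual
      # {module, opts} child tuple works under a supervisor.
      {Buffer,
       name: MyApp.Buffer,
       partitions: 2,
       max_length: 100,
       buffer_timeout: 5_000,
       flush_callback: fn items, opts ->
         # opts carries :length, :size, and :meta for the partition being flushed.
         IO.puts("flushing #{opts[:length]} items (#{opts[:size]} bytes)")
         IO.inspect(items)
       end}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end

# Elsewhere in the application:
:ok = Buffer.insert(MyApp.Buffer, "event")
:ok = Buffer.insert_batch(MyApp.Buffer, ["a", "b", "c"])
:ok = Buffer.flush(MyApp.Buffer, async: false)
```

The stream helper needs no running process; `Buffer.chunk!/2` splits an enumerable using the same limit options:

```elixir
["foo", "bar", "baz", "foobar"]
|> Buffer.chunk!(max_length: 2)
|> Enum.to_list()
#=> [["foo", "bar"], ["baz", "foobar"]]
```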