Skip to content

Commit

Permalink
Merge pull request #52 from commanded/bug/projector-concurrency
Browse files Browse the repository at this point in the history
Use an UPSERT SQL query to insert or update the projection version
  • Loading branch information
slashdotdash authored Jan 18, 2024
2 parents d77c087 + f20f1c8 commit bf0463c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 45 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,29 @@
# Changelog

## Next release

### Enhancements

- Use an UPSERT SQL query to insert or update the projection version ([#52](https://github.com/commanded/commanded-ecto-projections/pull/52)).

## 1.3.0

### Enhancements

- Fix Elixir 1.14 compilation warnings ([#45](https://github.com/commanded/commanded-ecto-projections/pull/45)).

---

## 1.2.1

### Enhancements

- Allow exceptions to be rescued by Commanded's event handler ([#37](https://github.com/commanded/commanded-ecto-projections/pull/37)).

## 1.2.0

### Enhancements

- Support runtime projector names ([#32](https://github.com/commanded/commanded-ecto-projections/pull/32)).
- Support `schema_prefix/2` function ([#33](https://github.com/commanded/commanded-ecto-projections/pull/33)).

Expand Down
2 changes: 1 addition & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ config :commanded_ecto_projections, Commanded.Projections.Repo,

config :ex_unit, capture_log: true

# Print only warnings and errors during test
# Print only warning and above log messages during tests
config :logger, :console, level: :warning, format: "[$level] $message\n"
63 changes: 28 additions & 35 deletions lib/projections/ecto.ex
Original file line number Diff line number Diff line change
Expand Up @@ -51,45 +51,47 @@ defmodule Commanded.Projections.Ecto do
use Ecto.Schema
use Commanded.Event.Handler, @handler_opts

import Ecto.Changeset
import Ecto.Query
import unquote(__MODULE__)

def update_projection(event, metadata, multi_fn) do
projection_name = Map.fetch!(metadata, :handler_name)
event_number = Map.fetch!(metadata, :event_number)

changeset =
%ProjectionVersion{projection_name: projection_name}
|> ProjectionVersion.changeset(%{last_seen_event_number: event_number})
projection_version = %ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: event_number
}

prefix = schema_prefix(event, metadata)

# Query to update an existing projection version with the last seen event number with
# a check to ensure that the event has not already been projected.
update_projection_version =
from(pv in ProjectionVersion,
where:
pv.projection_name == ^projection_name and pv.last_seen_event_number < ^event_number,
update: [set: [last_seen_event_number: ^event_number]]
)

multi =
Ecto.Multi.new()
|> Ecto.Multi.run(:verify_projection_version, fn repo, _changes ->
version =
case repo.get(ProjectionVersion, projection_name, prefix: prefix) do
nil ->
repo.insert!(
%ProjectionVersion{
projection_name: projection_name,
last_seen_event_number: 0
},
prefix: prefix
)

version ->
version
end

if version.last_seen_event_number < event_number do
{:ok, %{version: version}}
else
{:error, :already_seen_event}
|> Ecto.Multi.run(:track_projection_version, fn repo, _changes ->
try do
repo.insert(projection_version,
prefix: prefix,
on_conflict: update_projection_version,
conflict_target: [:projection_name]
)
rescue
exception in Ecto.StaleEntryError ->
# Attempted to insert a projection version for an already seen event
{:error, :already_seen_event}

exception ->
reraise exception, __STACKTRACE__
end
end)
|> Ecto.Multi.update(:projection_version, changeset, prefix: prefix)

with %Ecto.Multi{} = multi <- apply(multi_fn, [multi]),
{:ok, changes} <- transaction(multi) do
Expand All @@ -99,7 +101,7 @@ defmodule Commanded.Projections.Ecto do
:ok
end
else
{:error, :verify_projection_version, :already_seen_event, _changes} -> :ok
{:error, :track_projection_version, :already_seen_event, _changes} -> :ok
{:error, _stage, error, _changes} -> {:error, error}
{:error, _error} = reply -> reply
end
Expand Down Expand Up @@ -202,24 +204,15 @@ defmodule Commanded.Projections.Ecto do
quote do
defmodule ProjectionVersion do
@moduledoc false

use Ecto.Schema

import Ecto.Changeset

@primary_key {:projection_name, :string, []}

schema "projection_versions" do
field(:last_seen_event_number, :integer)

timestamps(type: :naive_datetime_usec)
end

@required_fields ~w(last_seen_event_number)a

def changeset(model, params \\ :invalid) do
cast(model, params, @required_fields)
end
end
end
end
Expand Down
6 changes: 3 additions & 3 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ defmodule Commanded.Projections.Ecto.Mixfile do
{:postgrex, ">= 0.0.0", only: :test},

# Optional dependencies
{:jason, "~> 1.3", optional: true},
{:jason, "~> 1.4", optional: true},

# Test & build tooling
{:dialyxir, "~> 1.2", only: [:dev, :test], runtime: false},
{:dialyxir, "~> 1.4", only: [:dev, :test], runtime: false},
{:ex_doc, ">= 0.0.0", only: :dev, runtime: false},
{:mix_test_watch, "~> 1.1", only: :dev, runtime: false},
{:mox, "~> 1.0", only: :test}
{:mox, "~> 1.1", only: :test}
]
end

Expand Down
58 changes: 58 additions & 0 deletions test/projections/ecto_projection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,64 @@ defmodule Commanded.Projections.EctoProjectionTest do
assert_seen_event("Projector", 3)
end

test "should prevent first event being projected more than once" do
tasks =
Enum.map(1..5, fn _index ->
Task.async(Projector, :handle, [
%AnEvent{name: "Event1"},
%{handler_name: "Projector", event_number: 1}
])
end)

results = Task.await_many(tasks)

assert Enum.uniq(results) == [:ok]

assert_projections(Projection, ["Event1"])
assert_seen_event("Projector", 1)
end

test "should prevent an event being projected more than once" do
Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1})
Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2})

tasks =
Enum.map(1..5, fn _index ->
Task.async(Projector, :handle, [
%AnEvent{name: "Event3"},
%{handler_name: "Projector", event_number: 3}
])
end)

results = Task.await_many(tasks)

assert Enum.uniq(results) == [:ok]

assert_projections(Projection, ["Event1", "Event2", "Event3"])
assert_seen_event("Projector", 3)
end

test "should prevent an event being projected more than once after an ignored event" do
Projector.handle(%AnEvent{name: "Event1"}, %{handler_name: "Projector", event_number: 1})
Projector.handle(%AnEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 2})
Projector.handle(%IgnoredEvent{name: "Event2"}, %{handler_name: "Projector", event_number: 3})

tasks =
Enum.map(1..5, fn _index ->
Task.async(Projector, :handle, [
%AnEvent{name: "Event4"},
%{handler_name: "Projector", event_number: 4}
])
end)

results = Task.await_many(tasks)

assert Enum.uniq(results) == [:ok]

assert_projections(Projection, ["Event1", "Event2", "Event4"])
assert_seen_event("Projector", 4)
end

test "should return an error on failure" do
assert {:error, :failure} ==
Projector.handle(%ErrorEvent{}, %{handler_name: "Projector", event_number: 1})
Expand Down
29 changes: 23 additions & 6 deletions test/projections/runtime_config_projector_test.exs
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
defmodule Commanded.Projections.RuntimeConfigProjectorTest do
use ExUnit.Case

import Commanded.Projections.ProjectionAssertions

alias Commanded.EventStore.Adapters.Mock, as: MockEventStore
alias Commanded.EventStore.RecordedEvent
alias Commanded.Projections.Events.AnEvent
alias Commanded.Projections.Projection
alias Commanded.Projections.Repo
alias Commanded.Projections.RuntimeConfigProjector
alias Commanded.Projections.{Projection, ProjectionAssertions, Repo, RuntimeConfigProjector}
alias Commanded.UUID

import Mox
import ProjectionAssertions

setup [:set_mox_global, :stub_event_store, :verify_on_exit!]

setup do
start_supervised!(TestApplication)
start_supervised!({TestApplication, event_store: [adapter: MockEventStore]})
Ecto.Adapters.SQL.Sandbox.checkout(Repo)
end

Expand Down Expand Up @@ -54,4 +56,19 @@ defmodule Commanded.Projections.RuntimeConfigProjectorTest do
defp send_events(projector, events) do
send(projector, {:events, events})
end

defp stub_event_store(_context) do
stub(MockEventStore, :ack_event, fn _adapter_meta, _pid, _event -> :ok end)

stub(MockEventStore, :child_spec, fn _application, _config ->
{:ok, [], %{}}
end)

stub(MockEventStore, :subscribe_to, fn
_event_store, :all, _handler_name, _handler, _subscribe_from, _opts ->
{:ok, self()}
end)

:ok
end
end

0 comments on commit bf0463c

Please sign in to comment.