Skip to content

cpursley/walex

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

WalEx

Simple and reliable Postgres Change Data Capture (CDC) in Elixir.

Walex mascot

WalEx allows you to listen to change events on your Postgres tables then perform callback-like actions with the data via the Event DSL. For example:

  • Stream database changes to an external service
  • Send a user a welcome email after they create a new account
  • Augment an existing Postgres-backed application with business logic
  • Send events to third party services (analytics, CRM, webhooks, etc))
  • Update index / invalidate cache whenever a record is changed

You can learn more about CDC and what you can do with it here: Why capture changes?

Credit

This library borrows liberally from realtime from Supabase, which in turn draws heavily on cainophile.

Installation

If available in Hex, the package can be installed by adding walex to your list of dependencies in mix.exs:

def deps do
  [
    {:walex, "~> 4.4.0"}
  ]
end

PostgreSQL Configuration

Logical Replication

WalEx only supports PostgreSQL. To get started, you first need to configure PostgreSQL for logical replication:

ALTER SYSTEM SET wal_level = 'logical';

Docker Compose:

command: [ "postgres", "-c", "wal_level=logical" ]

Publication

When you change the wal_level variable, you'll need to restart your PostgreSQL server. Once you've restarted, go ahead and create a publication for the tables you want to receive changes for:

All tables:

CREATE PUBLICATION events FOR ALL TABLES;

Or just specific tables:

CREATE PUBLICATION events FOR TABLE user, todo;

Filter based on row conditions (Postgres v15+ only):

CREATE PUBLICATION user_event FOR TABLE user WHERE (active IS TRUE);

Replica Identity

WalEx supports all of the settings for REPLICA IDENTITY. Use FULL if you can use it, as it will make tracking differences easier as the old data will be sent alongside the new data. You'll need to set this for each table.

Specific tables:

ALTER TABLE user REPLICA IDENTITY FULL;
ALTER TABLE todo REPLICA IDENTITY FULL;

Also, be mindful of replication gotchas.

AWS RDS

Amazon (AWS) RDS Postgres allows you to configure logical replication.

When creating a new Postgres database on RDS, you'll need to set a Parameter Group with the following settings:

rds.logical_replication = 1
max_replication_slots = 5
max_slot_wal_keep_size = 2048

Elixir Configuration

Config

# config.exs

config :my_app, WalEx,
  hostname: "localhost",
  username: "postgres",
  password: "postgres",
  port: "5432",
  database: "postgres",
  publication: "events",
  subscriptions: ["user", "todo"],
  # WalEx assumes your module names match this pattern: MyApp.Events.User, MyApp.Events.ToDo, etc
  # but you can also specify custom modules like so:
  # modules: [MyApp.CustomModule.User, MyApp.OtherCustomModule.ToDo],
  name: MyApp

It is also possible to just define the URL configuration for the database

# config.exs

config :my_app, WalEx,
  url: "postgres://username:password@hostname:port/database"
  publication: "events",
  subscriptions: ["user", "todo"],
  name: MyApp

You can also dynamically update the config at runtime:

WalEx.Configs.add_config(MyApp, :subscriptions, ["new_subscriptions_1", "new_subscriptions_2"])
WalEx.Configs.remove_config(MyApp, :subscriptions, "subscriptions")
WalEx.Configs.replace_config(MyApp, :password, "new_password")

Application Supervisor

defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      {WalEx.Supervisor, Application.get_env(:my_app, WalEx)}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

Usage

Event

Returned change data is a List of %Event{} structs with changes. UPDATE event example where name field was changed):

[
  %Walex.Event{
    name: :user,
    type: :update,
    source: %WalEx.Event.Source{
      name: "WalEx",
      version: "3.8.0",
      db: "todos",
      schema: "public",
      table: "user",
      columns: %{
        id: "integer",
        name: "varchar",
        created_at: "timestamptz"
      }
    },
    new_record: %{
      id: 1234,
      name: "Chase Pursley",
      created_at: #DateTime<2023-08-18 14:09:05.988369-04:00 -04 Etc/UTC-4>
    },
    # we don't show old_record for update to reduce payload size
    # however, you can see any old values that changed under "changes"
    old_record: nil,
    changes: %{
      name: %{
        new_value: "Chase Pursley",
        old_value: "Chase"
      }
    },
    timestamp: ~U[2023-12-18 15:50:08.329504Z]
  }
]

Event Module

If your app is named MyApp and you have a subscription called :user (which represents a database table), WalEx assumes you have a module called MyApp.Events.User that uses WalEx Event. But you can also define any custom module, just be sure to add it to the modules config.

Note that the result of events is a list. This is because WalEx returns a List of transactions for a particular table when there's a change event. Often times this will just contain one result, but it could be many (for example, if you use database triggers to update a column after an insert).

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  # any subscribed event
  on_event(:all, fn events ->
    IO.inspect(events: events)
  end)

  # any user event
  on_event(:user, fn users ->
    IO.inspect(on_event: users)
    # do something with users data
  end)

  # any user insert event
  on_insert(:user, fn users ->
    IO.inspect(on_insert: users)
  end)

  on_update(:user, fn users ->
    IO.inspect(on_update: users)
  end)

  on_delete(:user, fn users ->
    IO.inspect(on_delete: users)
  end)
Filters

A common scenario is where you want to "unsubscribe" from specific records (for example, temporarily for a migration or data fix). One way to accomplish this is to have a column with a value like event_subscribe: false. Then you can ignore specific events by specifying their key and value to unwatched_records.

Another scenario is you might not care when just certain fields change. For example, maybe a database trigger sets updated_at after a record is updated. Or a count changes, or several do that you don't need to react to. In this case, you can ignore the event change by adding them to unwatched_fields.

Additional filter helpers available in the WalEx.TransactionFilter module.

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{
    unwatched_records: %{event_subscribe: false},
    unwatched_fields: ~w(event_id updated_at todos_count)a
  }

  on_insert(:user, @filters, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is filtered
  end)
end
Functions

You can also provide a list of functions (as atoms) to be applied to each Event (after optional filters are applied). Each function is run as an async Task on each event. The functions must be defined in the current module and take a single event argument. Use with caution!

defmodule MyApp.Events.User do
  use WalEx.Event, name: MyApp

  @filters %{unwatched_records: %{event_subscribe: false}}
  @functions ~w(send_welcome_email add_to_crm clear_cache)a

  on_insert(:user, @filters, @functions, fn users ->
    IO.inspect(on_insert: users)
    # resulting users data is first filtered then functions are applied
  end)

  def send_welcome_email(user) do
    # logic for sending welcome email to new user
  end

  def add_to_crm(user) do
  # logic for adding user to crm system
  end

  def clear_cache(user) do
  # logic for clearing user cache
  end
end

Advanced usages

Durable slot

By default WalEx will create a temporary replication slot in Postgres.

This means that if the connection between WalEx and Postgres gets interrupted (crash / disconnection / etc.), the replication slot will get dropped by Postgres. This makes using WalEx safer as there is not risk of filling up the disk of the Postgres writer instance in case of downtime.

The downside being that this event-loss is more than likely. If this is a no-go, WalEx also supports durable replication.

# config.exs

config :my_app, WalEx,
  # ...
  durable_slot: true,
  slot_name: my_app_replication_slot

Only a single process can be connected to a durable slot at once, in case the slot is already used WalEx.Supervisor will fail to start with a RuntimeError.

Be warned that there are many additional potential gotchas (a detailed guide is planned).

Event middleware / Back-pressure

WalEx receives events from Postgres in WalEx.Replication.Server and then cast those to WalEx.Replication.Publisher. It's then WalEx.Replication.Publisher that is responsible to join these events together and process them.

In the event where you'd expect Postgres to overwhelm WalEx and potentially cause OOMs, WalEx provides a config option that should help you implement back-pressure.

As it's a quite advanced use, with many strong requirements, it's recommended instead to increase the amount of RAM of your instance.

Never the less, if it's not an option or would like to control the consumption rate of events, WalEx provide the following configuration option:

config :my_app, WalEx,
  # ...
  message_middleware: fn message, app_name -> ... end

message_middleware allows you to define the way WalEx.Replication.Server and WalEx.Replication.Publisher communicate.

If for instance you'd like to store these events to disk before processing them you would need to:

  • provide a message_middleware callback. It should serialize messages and store them to disk
  • add a supervised strictly-ordered disk consumer. On each event it would call one of:
    • WalEx.Replication.Publisher.process_message_async(message, app_name)
    • WalEx.Replication.Publisher.process_message_sync(message, app_name)

Any back-pressure implementation needs to guarantee:

  • exact message ordering
  • exactly-once-delivery
  • that each running walex has an isolated back-pressure system (for instance one queue per instance)

Test

You'll need a local Postgres instance running

MIX_ENV=test mix walex.setup
MIX_ENV=test mix test