An experiment with Postgres change-data-capture (CDC) into Pulsar via Pulsar IO's built-in Debezium Postgres source connector.
At a high level, we'll set up a flow that looks like the following:
Why go through all this trouble to set up such a system? Why not simply let your application directly write to both its database and Pulsar? In short, dual writes are susceptible to 1) race conditions and 2) partial failures. For a good explanation, see Martin Kleppmann's talk "Using logs to build a solid data infrastructure (or: why dual writes are a bad idea)".
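To make the partial-failure case concrete, here is a toy sketch (not from this repo; all names are made up) of an application dual-writing to its database and its message broker. The database write commits, the broker publish fails, and the two systems are left permanently inconsistent:

```python
# Toy illustration of the dual-write partial-failure problem.
db = []      # stand-in for the application's database
broker = []  # stand-in for the Pulsar topic

def publish(event, broker_up):
    if not broker_up:
        raise ConnectionError("broker unreachable")
    broker.append(event)

def handle_request(event, broker_up=True):
    db.append(event)               # write 1: committed to the database
    try:
        publish(event, broker_up)  # write 2: may fail independently
    except ConnectionError:
        pass                       # the DB write cannot be rolled back here

handle_request("order-1")                   # both writes succeed
handle_request("order-2", broker_up=False)  # only the DB write lands

print(db)      # ['order-1', 'order-2']
print(broker)  # ['order-1']  <- 'order-2' was never published
```

With CDC, by contrast, the database commit is the single source of truth, and the Pulsar message is derived from it asynchronously, so this divergence cannot occur.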
Run Postgres and Pulsar
docker-compose up
Start the Postgres source connector in the running Pulsar container
docker exec -it experiment-pulsar-connector_pulsar_1 /pulsar/bin/pulsar-admin source localrun --source-config-file /debezium-postgres-source-config.yaml
This leverages Pulsar IO's "local run" mode, which runs the connector as a process alongside the pulsar-admin invocation. In production deployments, you'd likely want to submit the connector to the cluster instead ("cluster mode", e.g. via pulsar-admin sources create).
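For reference, the source config file might look something like the following sketch, modeled on the Pulsar IO Debezium Postgres connector docs. The exact values (the .nar path, hostnames, and credentials) are assumptions and may differ from this repo's actual /debezium-postgres-source-config.yaml:

```yaml
tenant: "public"
namespace: "default"
name: "debezium-postgres-source"
topicName: "debezium-postgres-topic"
archive: "connectors/pulsar-io-debezium-postgres.nar"  # path/version: assumption
parallelism: 1

configs:
  # Debezium Postgres connector settings
  database.hostname: "db"          # assumption: the docker-compose service name
  database.port: "5432"
  database.user: "experiment"
  database.password: "experiment"  # assumption
  database.dbname: "experiment"
  database.server.name: "db"       # becomes the topic prefix: db.public.outbox
  schema.whitelist: "public"
  # Pulsar cluster used by the connector for offsets/history topics
  pulsar.service.url: "pulsar://127.0.0.1:6650"
```

Note how database.server.name, the schema, and the table name combine to form the topic consumed later (db.public.outbox).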
List topics
docker exec -it experiment-pulsar-connector_pulsar_1 /pulsar/bin/pulsar-admin topics list public/default
Consume the CDC topic corresponding to the Postgres table named outbox, in the db database, in the public schema.
docker exec -it experiment-pulsar-connector_pulsar_1 /pulsar/bin/pulsar-client consume -s "mysubscription" persistent://public/default/db.public.outbox -n 0
Start a psql cli session
docker exec -it experiment-pulsar-connector_db_1 psql experiment experiment
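The insert below assumes an outbox table roughly like the following. This is a sketch inferred from the consumed message shown later (the repo's actual schema, in particular the id/ts defaults and the payload column type, may differ):

```sql
CREATE TABLE outbox (
    id            uuid         NOT NULL DEFAULT gen_random_uuid(),  -- needs pgcrypto on Postgres < 13
    ts            timestamptz  NOT NULL DEFAULT now(),
    aggregatetype varchar(255) NOT NULL,
    aggregateid   varchar(255) NOT NULL,
    type          varchar(255) NOT NULL,
    payload       jsonb,   -- assumption: could also be text/varchar
    mybytecol     bytea,
    PRIMARY KEY (id)
);
```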
Insert some data into the database
-- Insert some data
INSERT INTO outbox (aggregatetype, aggregateid, type, payload, mybytecol) VALUES('myaggregatetype1', 1, 'mytype', '{"hello":"world 1"}', decode('013d7d16d7ad4fefb61bd95b765c8ceb', 'hex'));
Watch the Pulsar consumer print output to stdout. It'll look something like this (formatted with line breaks for improved readability):
----- got message -----
key:[eyJpZCI6ImJkZGVkZDE2LWJiOTQtNDExZi05MmE1LTI2ZDM1YjExNGZlMiJ9],
properties:[],
content:{
"before": null,
"after": {
"id": "bddedd16-bb94-411f-92a5-26d35b114fe2",
"ts": "2020-12-03T16:09:29.399905Z",
"aggregatetype": "myaggregatetype1",
"aggregateid": "1",
"type": "mytype",
"payload": "{\"hello\": \"world 1\"}",
"mybytecol": "AT19FtetT++2G9lbdlyM6w=="
},
"source": {
"version": "1.0.0.Final",
"connector": "postgresql",
"name": "db",
"ts_ms": 1607011769400,
"snapshot": "false",
"db": "experiment",
"schema": "public",
"table": "outbox",
"txId": 563,
"lsn": 23517120,
"xmin": null
},
"op": "c",
"ts_ms": 1607011769410
}
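Updates and deletes produce change events too, with op set to "u" and "d" respectively. (In the outbox pattern, rows are often deleted immediately after insertion, and downstream consumers simply ignore the delete events.) For example, back in the psql session:

```sql
-- Produces an event with "op": "u" ("before" may be null unless the
-- table's REPLICA IDENTITY is set to FULL)
UPDATE outbox SET payload = '{"hello":"world 1, updated"}' WHERE aggregateid = '1';

-- Produces an event with "op": "d"
DELETE FROM outbox WHERE aggregateid = '1';
```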
Some configuration is necessary to set up the replication slot on Postgres.
Configure the replication slot
postgresql.conf
# REPLICATION
wal_level = logical
max_wal_senders = 1
max_replication_slots = 1
(For this example repo, the step below is optional: it isn't necessary when running locally or when connecting as a superuser.) Configure the PostgreSQL server to allow replication between the server machine and the host on which the Debezium PostgreSQL connector runs.
/var/lib/postgresql/data/pg_hba.conf
local replication experiment trust
host replication experiment 127.0.0.1/32 trust
host replication experiment ::1/128 trust
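Once the connector is running, you can verify the replication setup from psql. These queries are standard Postgres, not specific to this repo:

```sql
-- Confirm logical decoding is enabled (expect: logical)
SHOW wal_level;

-- Confirm the connector created a replication slot and is streaming from it.
-- The plugin column depends on the connector's plugin.name setting
-- (e.g. decoderbufs or pgoutput).
SELECT slot_name, plugin, active FROM pg_replication_slots;
```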
Pulsar IO (Pulsar's analog to Kafka Connect) does not integrate message transformations like Kafka does via Kafka Connect Single Message Transformations (SMTs).
Pulsar IO's main focus seems to be connecting external systems via business-logic-agnostic source and sink adapters. Instead, Pulsar's built-in mechanism for message transformations is Pulsar Functions.
For example, there exists a Kafka Connect SMT for event outbox message routing which can be used out-of-the-box with Kafka. Unfortunately, there is not an out-of-the-box equivalent for Pulsar.
To achieve something similar in Pulsar, one might look to one of the following:
- Apply a custom routing transformation to the raw Pulsar IO Postgres source data that was sent to the topic. For example, via Pulsar Functions, a stream processor of your choice (e.g. Flink), or your own service running on your compute platform of choice. This type of approach is explored in experiment-event-outbox-router.
- Leverage Debezium Engine to implement your own Postgres CDC logical replication consumer.