-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmain.go
137 lines (117 loc) · 4.03 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package main
import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"time"
"github.com/jackc/pgconn"
"github.com/jackc/pglogrepl"
"github.com/jackc/pgproto3/v2"
"github.com/sirupsen/logrus"
)
const CONN = "postgres://postgres:password@localhost/postgres?replication=database"
const SLOT_NAME = "go_slot"
const OUTPUT_PLUGIN = "pgoutput"
var ACTOR = struct {
Relation string
Columns []string
}{}
// TODO: add migrations for smooth running
var migration = "CREATE TABLE IF NOT EXISTS gizmos(val int)"
func main() {
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
/*
notice the drop down from pgx to pgconn,
this is due to protocol differences when consuming logical replication changesets vs simple SQL execution
*/
conn, err := pgconn.Connect(ctx, CONN)
if err != nil {
panic(err)
}
defer conn.Close(ctx)
// 0. migrations
if _, err := conn.Exec(ctx, migration).ReadAll(); err != nil {
logrus.WithError(err).Fatal("failed to run migrations")
}
// 1. ensure publication exists
if _, err := conn.Exec(ctx, "DROP PUBLICATION IF EXISTS gizmo_pub").ReadAll(); err != nil {
logrus.WithError(err).Fatal("failed to drop publication error")
}
if _, err := conn.Exec(ctx, "CREATE PUBLICATION gizmo_pub FOR TABLE gizmos").ReadAll(); err != nil {
logrus.WithError(err).Fatal("failed to create publication")
}
// 2. create temproary replication slot server
if _, err = pglogrepl.CreateReplicationSlot(ctx, conn, SLOT_NAME, OUTPUT_PLUGIN, pglogrepl.CreateReplicationSlotOptions{Temporary: true}); err != nil {
logrus.WithError(err).Fatal("failed to create a replication slot")
}
var msgPointer pglogrepl.LSN
pluginArguments := []string{"proto_version '1'", "publication_names 'gizmo_pub'"}
// 3. establish connection
err = pglogrepl.StartReplication(ctx, conn, SLOT_NAME, msgPointer, pglogrepl.StartReplicationOptions{PluginArgs: pluginArguments})
if err != nil {
logrus.WithError(err).Fatal("failed to establish start replication")
}
var pingTime time.Time
for ctx.Err() != context.Canceled {
if time.Now().After(pingTime) {
if err = pglogrepl.SendStandbyStatusUpdate(ctx, conn, pglogrepl.StandbyStatusUpdate{WALWritePosition: msgPointer}); err != nil {
logrus.WithError(err).Fatal("failed to send standby update")
}
pingTime = time.Now().Add(10 * time.Second)
logrus.Debug("client: please standby")
}
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()
msg, err := conn.ReceiveMessage(ctx)
if pgconn.Timeout(err) {
continue
}
if err != nil {
logrus.WithError(err).Fatal("something went while listening for message")
}
switch msg := msg.(type) {
case *pgproto3.CopyData:
switch msg.Data[0] {
case pglogrepl.PrimaryKeepaliveMessageByteID:
logrus.Debug("server: confirmed standby")
case pglogrepl.XLogDataByteID:
walLog, err := pglogrepl.ParseXLogData(msg.Data[1:])
if err != nil {
logrus.WithError(err).Fatal("failed to parse logical WAL log:", err)
}
var msg pglogrepl.Message
if msg, err = pglogrepl.Parse(walLog.WALData); err != nil {
logrus.WithError(err).Fatalf("failed to parse logical replication message")
}
/*
simply logging here, but could easily push onto a message queue or something
*/
switch m := msg.(type) {
case *pglogrepl.RelationMessage:
ACTOR.Columns = []string{}
for _, col := range m.Columns {
ACTOR.Columns = append(ACTOR.Columns, col.Name)
}
ACTOR.Relation = m.RelationName
case *pglogrepl.InsertMessage:
var sb strings.Builder
sb.WriteString(fmt.Sprintf("INSERT %s(", ACTOR.Relation))
for i := 0; i < len(ACTOR.Columns); i++ {
sb.WriteString(fmt.Sprintf("%s: %s", ACTOR.Columns[i], string(m.Tuple.Columns[i].Data)))
}
sb.WriteString(")\n")
logrus.Info(sb.String())
case *pglogrepl.DeleteMessage:
logrus.Info("DELETE")
case *pglogrepl.TruncateMessage:
logrus.Info("ALL GONE (TRUNCATE)")
}
}
default:
logrus.Warnf("received unexpected message: %T\n", msg)
}
}
}