This repository has been archived by the owner on May 27, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlistener.go
72 lines (59 loc) · 1.48 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package mutantdb
import (
"bytes"
"context"
"encoding/json"
"strings"
"github.com/jackc/pgx/v4"
)
type Listener[T any] struct {
conn *pgx.Conn
entityType *Type[T]
listening bool
chunks map[string]*bytes.Buffer
}
func NewListener[T any](conn *pgx.Conn, t *Type[T]) *Listener[T] {
return &Listener[T]{
conn: conn,
entityType: t,
chunks: make(map[string]*bytes.Buffer),
}
}
func (l *Listener[T]) WaitForProjection(ctx context.Context) (Projection[T], error) {
p := Projection[T]{}
if !l.listening {
chanName := l.entityType.Name() + "_projections"
if _, err := l.conn.Exec(ctx, "listen $1", chanName); err != nil {
return p, err
}
l.listening = true
}
for {
// if ctx is done, err will be non nil
n, err := l.conn.WaitForNotification(ctx)
if err != nil {
return p, err
}
// characters before first pipe are notification id
// characters between first and second pipe are chunk counter
// characters after second pipe are up to 4000 bytes of json
// payload is complete if counter is EOF
pipe1 := strings.Index(n.Payload, "|")
pipe2 := strings.Index(n.Payload[pipe1+1:], "|") + pipe1 + 1
id := n.Payload[:pipe1]
counter := n.Payload[pipe1+1 : pipe2]
chunk := n.Payload[pipe2+1:]
buf, ok := l.chunks[id]
if !ok {
buf = bytes.NewBuffer([]byte{})
l.chunks[id] = buf
}
if counter != "EOF" {
buf.WriteString(chunk)
continue
}
delete(l.chunks, id)
err = json.Unmarshal(buf.Bytes(), &p)
return p, err
}
}