Skip to content

Commit

Permalink
Fixing datetime replication
Browse files Browse the repository at this point in the history
Turns out CBOR by default serializes date time into unix timestamp and
drops the serialization information about it. This PR adds new struct
and enforces tag to be serialized and deserialized along with the
values.
  • Loading branch information
maxpert committed Aug 19, 2023
1 parent 59c3a75 commit 8803efb
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 17 deletions.
10 changes: 10 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package core

import "github.com/fxamacker/cbor/v2"

var CBORTags = cbor.NewTagSet()

type ReplicableEvent[T any] interface {
Wrap() (T, error)
Unwrap() (T, error)
}
2 changes: 1 addition & 1 deletion db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (conn *SqliteStreamDB) publishChangeLog() {

err = conn.consumeChangeLogs(change.TableName, []*changeLogEntry{&logEntry})
if err != nil {
if err == ErrLogNotReadyToPublish || err == context.Canceled {
if errors.Is(err, ErrLogNotReadyToPublish) || errors.Is(err, context.Canceled) {
break
}

Expand Down
80 changes: 74 additions & 6 deletions db/change_log_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ package db

import (
"hash/fnv"
"reflect"
"sort"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/maxpert/marmot/core"
"github.com/rs/zerolog/log"
)

var tablePKColumnsCache = make(map[string][]string)
var tablePKColumnsLock = sync.RWMutex{}

type sensitiveTypeWrapper struct {
Time *time.Time `cbor:"1,keyasint,omitempty"`
}

type ChangeLogEvent struct {
Id int64
Type string
Expand All @@ -19,15 +27,50 @@ type ChangeLogEvent struct {
tableInfo []*ColumnInfo `cbor:"-"`
}

func (e *ChangeLogEvent) Marshal() ([]byte, error) {
return cbor.Marshal(e)
func init() {
err := core.CBORTags.Add(
cbor.TagOptions{
DecTag: cbor.DecTagRequired,
EncTag: cbor.EncTagRequired,
},
reflect.TypeOf(sensitiveTypeWrapper{}),
32,
)

log.Panic().Err(err)
}

func (s sensitiveTypeWrapper) GetValue() any {
// Right now only sensitive value is Time
return s.Time
}

func (e ChangeLogEvent) Wrap() (ChangeLogEvent, error) {
return e.prepare(), nil
}

func (e *ChangeLogEvent) Unmarshal(data []byte) error {
return cbor.Unmarshal(data, e)
func (e ChangeLogEvent) Unwrap() (ChangeLogEvent, error) {
ret := ChangeLogEvent{
Id: e.Id,
TableName: e.TableName,
Type: e.Type,
Row: map[string]any{},
tableInfo: e.tableInfo,
}

for k, v := range e.Row {
if st, ok := v.(sensitiveTypeWrapper); ok {
ret.Row[k] = st.GetValue()
continue
}

ret.Row[k] = v
}

return ret, nil
}

func (e *ChangeLogEvent) Hash() (uint64, error) {
func (e ChangeLogEvent) Hash() (uint64, error) {
hasher := fnv.New64()
enc := cbor.NewEncoder(hasher)
err := enc.StartIndefiniteArray()
Expand Down Expand Up @@ -56,7 +99,7 @@ func (e *ChangeLogEvent) Hash() (uint64, error) {
return hasher.Sum64(), nil
}

func (e *ChangeLogEvent) getSortedPKColumns() []string {
func (e ChangeLogEvent) getSortedPKColumns() []string {
tablePKColumnsLock.RLock()

if values, found := tablePKColumnsCache[e.TableName]; found {
Expand All @@ -79,3 +122,28 @@ func (e *ChangeLogEvent) getSortedPKColumns() []string {
tablePKColumnsCache[e.TableName] = pkColumns
return pkColumns
}

func (e ChangeLogEvent) prepare() ChangeLogEvent {
needsTransform := false
preparedRow := map[string]any{}
for k, v := range e.Row {
if t, ok := v.(time.Time); ok {
preparedRow[k] = sensitiveTypeWrapper{Time: &t}
needsTransform = true
} else {
preparedRow[k] = v
}
}

if !needsTransform {
return e
}

return ChangeLogEvent{
Id: e.Id,
Type: e.Type,
TableName: e.TableName,
Row: preparedRow,
tableInfo: e.tableInfo,
}
}
43 changes: 38 additions & 5 deletions logstream/replication_event.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,49 @@
package logstream

import "github.com/fxamacker/cbor/v2"
import (
"github.com/fxamacker/cbor/v2"
"github.com/maxpert/marmot/core"
)

type ReplicationEvent[T any] struct {
type ReplicationEvent[T core.ReplicableEvent[T]] struct {
FromNodeId uint64
Payload *T
Payload T
}

func (e *ReplicationEvent[T]) Marshal() ([]byte, error) {
return cbor.Marshal(e)
wrappedPayload, err := e.Payload.Wrap()
if err != nil {
return nil, err
}

ev := ReplicationEvent[T]{
FromNodeId: e.FromNodeId,
Payload: wrappedPayload,
}

em, err := cbor.EncOptions{}.EncModeWithTags(core.CBORTags)
if err != nil {
return nil, err
}

return em.Marshal(ev)
}

func (e *ReplicationEvent[T]) Unmarshal(data []byte) error {
return cbor.Unmarshal(data, e)
dm, err := cbor.DecOptions{}.DecModeWithTags(core.CBORTags)
if err != nil {
return nil
}

err = dm.Unmarshal(data, e)
if err != nil {
return err
}

e.Payload, err = e.Payload.Unwrap()
if err != nil {
return err
}

return nil
}
6 changes: 3 additions & 3 deletions logstream/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstream

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -178,8 +179,7 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
savedSeq := r.repState.get(streamName(shardID, r.compressionEnabled))
for sub.IsValid() {
msg, err := sub.NextMsg(5 * time.Second)

if err == nats.ErrTimeout {
if errors.Is(err, nats.ErrTimeout) {
continue
}

Expand All @@ -199,7 +199,7 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
err = r.invokeListener(callback, msg)
if err != nil {
msg.Nak()
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func onChangeEvent(streamDB *db.SqliteStreamDB, ctxSt *utils.StateContext, event
return err
}

return streamDB.Replicate(ev.Payload)
return streamDB.Replicate(&ev.Payload)
}
}

Expand All @@ -206,7 +206,7 @@ func onTableChanged(r *logstream.Replicator, ctxSt *utils.StateContext, events E

ev := &logstream.ReplicationEvent[db.ChangeLogEvent]{
FromNodeId: nodeID,
Payload: event,
Payload: *event,
}

data, err := ev.Marshal()
Expand Down

0 comments on commit 8803efb

Please sign in to comment.