Skip to content

Commit

Permalink
Merge pull request #77 from maxpert/fix-datetime
Browse files Browse the repository at this point in the history
Fixing datetime replication
  • Loading branch information
maxpert authored Aug 28, 2023
2 parents 59c3a75 + 8803efb commit be02273
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 17 deletions.
10 changes: 10 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package core

import "github.com/fxamacker/cbor/v2"

var CBORTags = cbor.NewTagSet()

type ReplicableEvent[T any] interface {
Wrap() (T, error)
Unwrap() (T, error)
}
2 changes: 1 addition & 1 deletion db/change_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ func (conn *SqliteStreamDB) publishChangeLog() {

err = conn.consumeChangeLogs(change.TableName, []*changeLogEntry{&logEntry})
if err != nil {
if err == ErrLogNotReadyToPublish || err == context.Canceled {
if errors.Is(err, ErrLogNotReadyToPublish) || errors.Is(err, context.Canceled) {
break
}

Expand Down
80 changes: 74 additions & 6 deletions db/change_log_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,23 @@ package db

import (
"hash/fnv"
"reflect"
"sort"
"sync"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/maxpert/marmot/core"
"github.com/rs/zerolog/log"
)

var tablePKColumnsCache = make(map[string][]string)
var tablePKColumnsLock = sync.RWMutex{}

type sensitiveTypeWrapper struct {
Time *time.Time `cbor:"1,keyasint,omitempty"`
}

type ChangeLogEvent struct {
Id int64
Type string
Expand All @@ -19,15 +27,50 @@ type ChangeLogEvent struct {
tableInfo []*ColumnInfo `cbor:"-"`
}

func (e *ChangeLogEvent) Marshal() ([]byte, error) {
return cbor.Marshal(e)
func init() {
err := core.CBORTags.Add(
cbor.TagOptions{
DecTag: cbor.DecTagRequired,
EncTag: cbor.EncTagRequired,
},
reflect.TypeOf(sensitiveTypeWrapper{}),
32,
)

log.Panic().Err(err)
}

func (s sensitiveTypeWrapper) GetValue() any {
// Right now only sensitive value is Time
return s.Time
}

func (e ChangeLogEvent) Wrap() (ChangeLogEvent, error) {
return e.prepare(), nil
}

func (e *ChangeLogEvent) Unmarshal(data []byte) error {
return cbor.Unmarshal(data, e)
func (e ChangeLogEvent) Unwrap() (ChangeLogEvent, error) {
ret := ChangeLogEvent{
Id: e.Id,
TableName: e.TableName,
Type: e.Type,
Row: map[string]any{},
tableInfo: e.tableInfo,
}

for k, v := range e.Row {
if st, ok := v.(sensitiveTypeWrapper); ok {
ret.Row[k] = st.GetValue()
continue
}

ret.Row[k] = v
}

return ret, nil
}

func (e *ChangeLogEvent) Hash() (uint64, error) {
func (e ChangeLogEvent) Hash() (uint64, error) {
hasher := fnv.New64()
enc := cbor.NewEncoder(hasher)
err := enc.StartIndefiniteArray()
Expand Down Expand Up @@ -56,7 +99,7 @@ func (e *ChangeLogEvent) Hash() (uint64, error) {
return hasher.Sum64(), nil
}

func (e *ChangeLogEvent) getSortedPKColumns() []string {
func (e ChangeLogEvent) getSortedPKColumns() []string {
tablePKColumnsLock.RLock()

if values, found := tablePKColumnsCache[e.TableName]; found {
Expand All @@ -79,3 +122,28 @@ func (e *ChangeLogEvent) getSortedPKColumns() []string {
tablePKColumnsCache[e.TableName] = pkColumns
return pkColumns
}

func (e ChangeLogEvent) prepare() ChangeLogEvent {
needsTransform := false
preparedRow := map[string]any{}
for k, v := range e.Row {
if t, ok := v.(time.Time); ok {
preparedRow[k] = sensitiveTypeWrapper{Time: &t}
needsTransform = true
} else {
preparedRow[k] = v
}
}

if !needsTransform {
return e
}

return ChangeLogEvent{
Id: e.Id,
Type: e.Type,
TableName: e.TableName,
Row: preparedRow,
tableInfo: e.tableInfo,
}
}
43 changes: 38 additions & 5 deletions logstream/replication_event.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,49 @@
package logstream

import "github.com/fxamacker/cbor/v2"
import (
"github.com/fxamacker/cbor/v2"
"github.com/maxpert/marmot/core"
)

type ReplicationEvent[T any] struct {
type ReplicationEvent[T core.ReplicableEvent[T]] struct {
FromNodeId uint64
Payload *T
Payload T
}

func (e *ReplicationEvent[T]) Marshal() ([]byte, error) {
return cbor.Marshal(e)
wrappedPayload, err := e.Payload.Wrap()
if err != nil {
return nil, err
}

ev := ReplicationEvent[T]{
FromNodeId: e.FromNodeId,
Payload: wrappedPayload,
}

em, err := cbor.EncOptions{}.EncModeWithTags(core.CBORTags)
if err != nil {
return nil, err
}

return em.Marshal(ev)
}

func (e *ReplicationEvent[T]) Unmarshal(data []byte) error {
return cbor.Unmarshal(data, e)
dm, err := cbor.DecOptions{}.DecModeWithTags(core.CBORTags)
if err != nil {
return nil
}

err = dm.Unmarshal(data, e)
if err != nil {
return err
}

e.Payload, err = e.Payload.Unwrap()
if err != nil {
return err
}

return nil
}
6 changes: 3 additions & 3 deletions logstream/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstream

import (
"context"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -178,8 +179,7 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
savedSeq := r.repState.get(streamName(shardID, r.compressionEnabled))
for sub.IsValid() {
msg, err := sub.NextMsg(5 * time.Second)

if err == nats.ErrTimeout {
if errors.Is(err, nats.ErrTimeout) {
continue
}

Expand All @@ -199,7 +199,7 @@ func (r *Replicator) Listen(shardID uint64, callback func(payload []byte) error)
err = r.invokeListener(callback, msg)
if err != nil {
msg.Nak()
if err == context.Canceled {
if errors.Is(err, context.Canceled) {
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions marmot.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func onChangeEvent(streamDB *db.SqliteStreamDB, ctxSt *utils.StateContext, event
return err
}

return streamDB.Replicate(ev.Payload)
return streamDB.Replicate(&ev.Payload)
}
}

Expand All @@ -206,7 +206,7 @@ func onTableChanged(r *logstream.Replicator, ctxSt *utils.StateContext, events E

ev := &logstream.ReplicationEvent[db.ChangeLogEvent]{
FromNodeId: nodeID,
Payload: event,
Payload: *event,
}

data, err := ev.Marshal()
Expand Down

0 comments on commit be02273

Please sign in to comment.