Skip to content

Commit

Permalink
Add the ability to resume streams from tokens. Resuming from timestamps
Browse files Browse the repository at this point in the history
is not possible before MongoDB 4.0.  Resuming from tokens should work
for MongoDB 3.6+.
  • Loading branch information
rwynn committed Nov 19, 2019
1 parent 32a9d23 commit 081995b
Showing 1 changed file with 48 additions and 19 deletions.
67 changes: 48 additions & 19 deletions gtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ const (

type Options struct {
After TimestampGenerator
Token ResumeTokenGenenerator
Filter OpFilter
NamespaceFilter OpFilter
OpLogDisabled bool
Expand All @@ -102,6 +103,11 @@ type Options struct {
Log *log.Logger
}

type OpResumeToken struct {
StreamID string
ResumeToken interface{}
}

type Op struct {
Id interface{} `json:"_id"`
Operation string `json:"operation"`
Expand All @@ -110,7 +116,8 @@ type Op struct {
Timestamp primitive.Timestamp `json:"timestamp"`
Source QuerySource `json:"source"`
Doc interface{} `json:"doc,omitempty"`
UpdateDescription map[string]interface{} `json:"updateDescription,omitempty`
UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"`
ResumeToken OpResumeToken `json:"-"`
}

type ReplStatus struct {
Expand Down Expand Up @@ -313,6 +320,8 @@ type OpFilter func(*Op) bool

type ShardInsertHandler func(*ShardInfo) (*mongo.Client, error)

type ResumeTokenGenenerator func(*mongo.Client, string, *Options) (interface{}, error)

type TimestampGenerator func(*mongo.Client, *Options) (primitive.Timestamp, error)

type DataUnmarshaller func(namespace string, data []byte) (interface{}, error)
Expand Down Expand Up @@ -1251,10 +1260,14 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options
n := &N{}
n.parseForChanges(ns)
ctx.log.Printf("Watching changes on %s", n.desc())
var tokenMode bool
var pipeline []interface{}
var startAt *primitive.Timestamp
var resumeAfter interface{}
if o.After != nil {
var startAt *primitive.Timestamp = nil
var resumeAfter interface{} = nil
if o.Token != nil {
tokenMode = true
resumeAfter, _ = o.Token(client, ns, o)
} else if o.After != nil {
if pos, err := o.After(client, o); err == nil {
if pos.T > 0 {
startAt = &pos
Expand Down Expand Up @@ -1320,23 +1333,29 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options
resumeAfter = changeDoc.Id
startAt = nil
oper := changeDoc.mapOperation()
token := OpResumeToken{
StreamID: ns,
ResumeToken: resumeAfter,
}
if changeDoc.isDrop() {
op := &Op{
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
ResumeToken: token,
}
op.Data = map[string]interface{}{"drop": changeDoc.Namespace.Collection}
if op.matchesNsFilter(o) {
ctx.OpC <- op
}
} else if changeDoc.isDropDatabase() {
op := &Op{
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
ResumeToken: token,
}
op.Data = map[string]interface{}{"dropDatabase": changeDoc.Namespace.Database}
if op.matchesNsFilter(o) {
Expand All @@ -1349,11 +1368,12 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options
time.Sleep(time.Duration(5) * time.Second)
} else if oper != "" {
op := &Op{
Id: changeDoc.docId(),
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
Id: changeDoc.docId(),
Operation: oper,
Namespace: changeDoc.mapNs(),
Source: OplogQuerySource,
Timestamp: changeDoc.mapTimestamp(),
ResumeToken: token,
}
if op.matchesNsFilter(o) {
if changeDoc.hasUpdate() {
Expand All @@ -1372,7 +1392,11 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options
select {
case ts := <-ctx.seekC:
resumeAfter = nil
startAt = &ts
if tokenMode {
resumeAfter, _ = o.Token(client, ns, o)
} else {
startAt = &ts
}
next = false
case <-ctx.pauseC:
stream.Close(context.Background())
Expand All @@ -1381,7 +1405,11 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options
select {
case ts := <-ctx.seekC:
resumeAfter = nil
startAt = &ts
if tokenMode {
resumeAfter, _ = o.Token(client, ns, o)
} else {
startAt = &ts
}
default:
break
}
Expand Down Expand Up @@ -1580,6 +1608,7 @@ func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker st
func DefaultOptions() *Options {
return &Options{
After: LastOpTimestamp,
Token: nil,
Filter: nil,
NamespaceFilter: nil,
OpLogDatabaseName: "local",
Expand Down

0 comments on commit 081995b

Please sign in to comment.