diff --git a/gtm.go b/gtm.go index 6057467..f2eee62 100644 --- a/gtm.go +++ b/gtm.go @@ -77,6 +77,7 @@ const ( type Options struct { After TimestampGenerator + Token ResumeTokenGenenerator Filter OpFilter NamespaceFilter OpFilter OpLogDisabled bool @@ -102,6 +103,11 @@ type Options struct { Log *log.Logger } +type OpResumeToken struct { + StreamID string + ResumeToken interface{} +} + type Op struct { Id interface{} `json:"_id"` Operation string `json:"operation"` @@ -110,7 +116,8 @@ type Op struct { Timestamp primitive.Timestamp `json:"timestamp"` Source QuerySource `json:"source"` Doc interface{} `json:"doc,omitempty"` - UpdateDescription map[string]interface{} `json:"updateDescription,omitempty` + UpdateDescription map[string]interface{} `json:"updateDescription,omitempty"` + ResumeToken OpResumeToken `json:"-"` } type ReplStatus struct { @@ -313,6 +320,8 @@ type OpFilter func(*Op) bool type ShardInsertHandler func(*ShardInfo) (*mongo.Client, error) +type ResumeTokenGenenerator func(*mongo.Client, string, *Options) (interface{}, error) + type TimestampGenerator func(*mongo.Client, *Options) (primitive.Timestamp, error) type DataUnmarshaller func(namespace string, data []byte) (interface{}, error) @@ -1251,10 +1260,14 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options n := &N{} n.parseForChanges(ns) ctx.log.Printf("Watching changes on %s", n.desc()) + var tokenMode bool var pipeline []interface{} - var startAt *primitive.Timestamp - var resumeAfter interface{} - if o.After != nil { + var startAt *primitive.Timestamp = nil + var resumeAfter interface{} = nil + if o.Token != nil { + tokenMode = true + resumeAfter, _ = o.Token(client, ns, o) + } else if o.After != nil { if pos, err := o.After(client, o); err == nil { if pos.T > 0 { startAt = &pos @@ -1320,12 +1333,17 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options resumeAfter = changeDoc.Id startAt = nil oper := changeDoc.mapOperation() + token := OpResumeToken{ + StreamID: ns, + ResumeToken: resumeAfter, + } if changeDoc.isDrop() { op := &Op{ - Operation: oper, - Namespace: changeDoc.mapNs(), - Source: OplogQuerySource, - Timestamp: changeDoc.mapTimestamp(), + Operation: oper, + Namespace: changeDoc.mapNs(), + Source: OplogQuerySource, + Timestamp: changeDoc.mapTimestamp(), + ResumeToken: token, } op.Data = map[string]interface{}{"drop": changeDoc.Namespace.Collection} if op.matchesNsFilter(o) { @@ -1333,10 +1351,11 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options } } else if changeDoc.isDropDatabase() { op := &Op{ - Operation: oper, - Namespace: changeDoc.mapNs(), - Source: OplogQuerySource, - Timestamp: changeDoc.mapTimestamp(), + Operation: oper, + Namespace: changeDoc.mapNs(), + Source: OplogQuerySource, + Timestamp: changeDoc.mapTimestamp(), + ResumeToken: token, } op.Data = map[string]interface{}{"dropDatabase": changeDoc.Namespace.Database} if op.matchesNsFilter(o) { @@ -1349,11 +1368,12 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options time.Sleep(time.Duration(5) * time.Second) } else if oper != "" { op := &Op{ - Id: changeDoc.docId(), - Operation: oper, - Namespace: changeDoc.mapNs(), - Source: OplogQuerySource, - Timestamp: changeDoc.mapTimestamp(), + Id: changeDoc.docId(), + Operation: oper, + Namespace: changeDoc.mapNs(), + Source: OplogQuerySource, + Timestamp: changeDoc.mapTimestamp(), + ResumeToken: token, } if op.matchesNsFilter(o) { if changeDoc.hasUpdate() { @@ -1372,7 +1392,11 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options select { case ts := <-ctx.seekC: resumeAfter = nil - startAt = &ts + if tokenMode { + resumeAfter, _ = o.Token(client, ns, o) + } else { + startAt = &ts + } next = false case <-ctx.pauseC: stream.Close(context.Background()) @@ -1381,7 +1405,11 @@ func ConsumeChangeStream(ctx *OpCtx, client *mongo.Client, ns string, o *Options select { case ts := <-ctx.seekC: resumeAfter = nil - startAt = &ts + if tokenMode { + resumeAfter, _ = o.Token(client, ns, o) + } else { + startAt = &ts + } default: break } @@ -1580,6 +1608,7 @@ func OpFilterForOrdering(ordering OrderingGuarantee, workers []string, worker st func DefaultOptions() *Options { return &Options{ After: LastOpTimestamp, + Token: nil, Filter: nil, NamespaceFilter: nil, OpLogDatabaseName: "local",