Skip to content

Commit

Permalink
add option for bounded direct reads
Browse files Browse the repository at this point in the history
  • Loading branch information
rwynn committed Oct 17, 2019
1 parent d992aab commit d7134e6
Showing 1 changed file with 53 additions and 16 deletions.
69 changes: 53 additions & 16 deletions gtm.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@ package gtm
import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/serialx/hashring"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
"log"
"net"
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/pkg/errors"
"github.com/serialx/hashring"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/x/bsonx"
)

var opCodes = [...]string{"c", "i", "u", "d"}
Expand Down Expand Up @@ -94,6 +95,7 @@ type Options struct {
DirectReadSplitMax int32
DirectReadConcur int
DirectReadNoTimeout bool
DirectReadBounded bool
Unmarshal DataUnmarshaller
Pipe PipelineBuilder
PipeAllowDisk bool
Expand Down Expand Up @@ -215,6 +217,7 @@ type CollectionSegment struct {
splitKey string
splits []map[string]interface{}
subSegments []*CollectionSegment
maxIncl bool
}

func (cs *CollectionSegment) shrinkTo(next interface{}) {
Expand All @@ -227,7 +230,11 @@ func (cs *CollectionSegment) toSelector() bson.M {
doc["$gte"] = cs.min
}
if cs.max != nil {
doc["$lt"] = cs.max
if cs.maxIncl {
doc["$lte"] = cs.max
} else {
doc["$lt"] = cs.max
}
}
if len(doc) > 0 {
sel[cs.splitKey] = doc
Expand Down Expand Up @@ -261,21 +268,35 @@ func (cs *CollectionSegment) divide() {
cs.subSegments = append(cs.subSegments, ns)
}

func (cs *CollectionSegment) init(c *mongo.Collection) (err error) {
func (cs *CollectionSegment) setMin(ctx context.Context, c *mongo.Collection) (err error) {
opts := &options.FindOneOptions{}
opts.SetSort(bson.M{cs.splitKey: 1})
opts.SetProjection(bson.M{cs.splitKey: 1})
doc := make(map[string]interface{})
if err = c.FindOne(context.Background(), nil, opts).Decode(&doc); err != nil {
return
if err = c.FindOne(ctx, nil, opts).Decode(&doc); err == nil {
cs.min = doc[cs.splitKey]
}
cs.min = doc[cs.splitKey]
opts = &options.FindOneOptions{}
return
}

func (cs *CollectionSegment) setMax(ctx context.Context, c *mongo.Collection) (err error) {
opts := &options.FindOneOptions{}
opts.SetSort(bson.M{cs.splitKey: -1})
doc = make(map[string]interface{})
if err = c.FindOne(context.Background(), nil, opts).Decode(&doc); err != nil {
opts.SetProjection(bson.M{cs.splitKey: 1})
doc := make(map[string]interface{})
if err = c.FindOne(ctx, nil, opts).Decode(&doc); err == nil {
cs.max = doc[cs.splitKey]
}
return
}

func (cs *CollectionSegment) init(c *mongo.Collection) (err error) {
if err = cs.setMin(context.Background(), c); err != nil {
return
}
if err = cs.setMax(context.Background(), c); err != nil {
return
}
cs.max = doc[cs.splitKey]
return
}

Expand Down Expand Up @@ -1065,6 +1086,21 @@ func DirectReadSegment(ctx *OpCtx, client *mongo.Client, ns string, o *Options,
}
}
c := client.Database(n.database).Collection(n.collection)
if o.DirectReadBounded {
if seg.min == nil {
if err = seg.setMin(task.ctx, c); err != nil {
ctx.ErrC <- errors.Wrap(err, "Error finding min of bounded direct read")
return
}
}
if seg.max == nil {
seg.maxIncl = true
if err = seg.setMax(task.ctx, c); err != nil {
ctx.ErrC <- errors.Wrap(err, "Error finding max of bounded direct read")
return
}
}
}
sel := seg.toSelector()
var cursor *mongo.Cursor
if o.Pipe != nil {
Expand Down Expand Up @@ -1520,6 +1556,7 @@ func DefaultOptions() *Options {
DirectReadSplitMax: 9,
DirectReadConcur: 0,
DirectReadNoTimeout: false,
DirectReadBounded: false,
Unmarshal: nil,
Log: log.New(os.Stdout, "INFO ", log.Flags()),
}
Expand Down

0 comments on commit d7134e6

Please sign in to comment.