Skip to content

Commit

Permalink
fix(Dgraph): update reverse index when updating single UID predicates. (
Browse files Browse the repository at this point in the history
#6748)

Fixes the test OverwriteUidPredicatesReverse added in the previous PR #6746.

When updating a single uid predicate with a reverse index, the existing entry in the
reverse index should be deleted first.

Fixes DGRAPH-1738

(cherry picked from commit 5b70fe8)

* Remove context arguments.

* Optimize computing reverse reindexing (#4755)

We used to compute reverse count indexes twice while reindexing.
First, while computing reverse edges, and then later while explicitly
computing reverse count index. Now, we use a different function
while reindexing reverse edges and do not compute the reverse
count indexes there any more.

* fix(Dgraph): update reverse index when updating single UID predicates. (#5868)

When updating a single uid predicate with a reverse index, the existing entry in the
reverse index should be deleted first.

Fixes DGRAPH-1738

Co-authored-by: Martin Martinez Rivera <martinmr@dgraph.io>
Co-authored-by: Aman Mangal <aman@dgraph.io>
  • Loading branch information
3 people authored Oct 27, 2020
1 parent 2ccb101 commit 30baf66
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 19 deletions.
68 changes: 68 additions & 0 deletions dgraph/cmd/alpha/reindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,71 @@ func TestReindexLang(t *testing.T) {
}
}`, res)
}

// TestReindexReverseCount verifies that adding @count @reverse to an existing
// [uid] predicate rebuilds the reverse count index correctly, and that the
// index stays correct for mutations applied after the reindex.
func TestReindexReverseCount(t *testing.T) {
	require.NoError(t, dropAll())
	require.NoError(t, alterSchema(`value: [uid] .`))

	// Seed data before any reverse/count index exists on <value>.
	setNquads := `{
	set {
		<1> <value> <4> .
		<1> <value> <5> .
		<1> <value> <6> .
		<1> <value> <7> .
		<1> <value> <8> .
		<2> <value> <4> .
		<2> <value> <5> .
		<2> <value> <6> .
		<3> <value> <5> .
		<3> <value> <6> .
	}
}`
	_, err := mutationWithTs(setNquads, "application/rdf", false, true, 0)
	require.NoError(t, err)

	// Trigger a reindex by adding @count @reverse to the schema.
	require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))

	// Uids 0x5 and 0x6 each have exactly three incoming <value> edges.
	countQuery := `{
	q(func: eq(count(~value), "3")) {
		uid
	}
}`
	resp, _, err := queryWithTs(countQuery, "application/graphql+-", "", 0)
	require.NoError(t, err)
	require.JSONEq(t, `{
	"data": {
		"q": [
			{
				"uid": "0x5"
			},
			{
				"uid": "0x6"
			}
		]
	}
}`, resp)

	// Mutate after the reindex: 0x4 gains a third incoming edge, so the
	// count index must now include it as well.
	extraNquad := `{ set { <9> <value> <4> . }}`
	_, err = mutationWithTs(extraNquad, "application/rdf", false, true, 0)
	require.NoError(t, err)

	resp, _, err = queryWithTs(countQuery, "application/graphql+-", "", 0)
	require.NoError(t, err)
	require.JSONEq(t, `{
	"data": {
		"q": [
			{
				"uid": "0x4"
			},
			{
				"uid": "0x5"
			},
			{
				"uid": "0x6"
			}
		]
	}
}`, resp)
}
95 changes: 76 additions & 19 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,30 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
}

// addReverseMutation writes the inverted form of edge t into the reverse
// posting list keyed by (t.Attr, t.ValueId), swapping subject and object.
// It records one edge in the stats and returns any error from fetching or
// mutating the posting list. Note: this variant does not update any count
// index; callers needing that use addReverseAndCountMutation instead.
func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) error {
	revKey := x.ReverseKey(t.Attr, t.ValueId)
	revList, err := txn.GetFromDelta(revKey)
	if err != nil {
		return err
	}
	x.AssertTrue(revList != nil)

	// Build a fresh edge rather than mutating t in place — the caller
	// still owns t and may keep using it.
	reversed := &pb.DirectedEdge{
		Entity:  t.ValueId,
		ValueId: t.Entity,
		Attr:    t.Attr,
		Op:      t.Op,
		Facets:  t.Facets,
	}
	if err := revList.addMutation(ctx, txn, reversed); err != nil {
		return err
	}
	ostats.Record(ctx, x.NumEdges.M(1))
	return nil
}

func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEdge) error {
key := x.ReverseKey(t.Attr, t.ValueId)
hasCountIndex := schema.State().HasCount(t.Attr)

Expand All @@ -190,8 +214,37 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro
if err != nil {
return err
}
if plist == nil {
return errors.Errorf("nil posting list for reverse key %s", hex.Dump(key))
}

// For single uid predicates, updating the reverse index requires that the existing
// entries for this key in the index are removed.
pred, ok := schema.State().Get(t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
t.Op == pb.DirectedEdge_SET && t.ValueId != 0
if isSingleUidUpdate {
dataKey := x.DataKey(t.Attr, t.Entity)
dataList, err := getFn(dataKey)
if err != nil {
return errors.Wrapf(err, "cannot find single uid list to update with key %s",
hex.Dump(dataKey))
}
err = dataList.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
delEdge := &pb.DirectedEdge{
Entity: t.Entity,
ValueId: p.Uid,
Attr: t.Attr,
Op: pb.DirectedEdge_DEL,
}
return txn.addReverseAndCountMutation(ctx, delEdge)
})
if err != nil {
return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s",
hex.Dump(dataKey))
}
}

x.AssertTrue(plist != nil)
// We must create a copy here.
edge := &pb.DirectedEdge{
Entity: t.ValueId,
Expand All @@ -212,6 +265,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro
return err
}
}

return nil
}

Expand All @@ -233,7 +287,7 @@ func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge,
case isReversed:
// Delete reverse edge for each posting.
delEdge.ValueId = p.Uid
return txn.addReverseMutation(ctx, delEdge)
return txn.addReverseAndCountMutation(ctx, delEdge)
case isIndexed:
// Delete index edge of each posting.
val := types.Val{
Expand Down Expand Up @@ -284,7 +338,6 @@ func (txn *Txn) addCountMutation(ctx context.Context, t *pb.DirectedEdge, count
}
ostats.Record(ctx, x.NumEdges.M(1))
return nil

}

func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
Expand All @@ -293,9 +346,11 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
Attr: params.attr,
Op: pb.DirectedEdge_DEL,
}
if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore),
params.reverse); err != nil {
return err
if params.countBefore > 0 {
if err := txn.addCountMutation(ctx, &edge, uint32(params.countBefore),
params.reverse); err != nil {
return err
}
}

if params.countAfter > 0 {
Expand Down Expand Up @@ -393,6 +448,15 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,

doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr)
hasCountIndex := schema.State().HasCount(edge.Attr)

// Add reverse mutation irrespective of hasMutated, server crash can happen after
// mutation is synced and before reverse edge is synced
if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) {
if err := txn.addReverseAndCountMutation(ctx, edge); err != nil {
return err
}
}

val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge)
if err != nil {
return err
Expand Down Expand Up @@ -430,13 +494,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
}
}
}
// Add reverse mutation irrespective of hasMutated, server crash can happen after
// mutation is synced and before reverse edge is synced
if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) {
if err := txn.addReverseMutation(ctx, edge); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -531,13 +588,11 @@ func (r *rebuilder) Run(ctx context.Context) error {
dbOpts := badger.DefaultOptions(tmpIndexDir).
WithSyncWrites(false).
WithNumVersionsToKeep(math.MaxInt64).
WithLogger(&x.ToGlog{}).
WithCompression(options.None).
WithLogRotatesToFlush(10).
WithBlockCacheSize(50) // TODO(Aman): Disable cache altogether

// TODO(Ibrahim): Remove this once badger is updated.
dbOpts.ZSTDCompressionLevel = 1

tmpDB, err := badger.OpenManaged(dbOpts)
if err != nil {
return errors.Wrap(err, "error opening temp badger for reindexing")
Expand All @@ -555,9 +610,6 @@ func (r *rebuilder) Run(ctx context.Context) error {
// Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that
// WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter
// could be replaced with WriteBatch in the code
// Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that
// WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter
// could be replaced with WriteBatch in the code.
tmpWriter := NewTxnWriter(tmpDB)
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
Expand All @@ -580,6 +632,9 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, errors.Wrapf(err, "error reading posting list from disk")
}

// We are using different transactions in each call to KeyToList function. This could
// be a problem for computing reverse count indexes if deltas for same key are added
// in different transactions. Such a case doesn't occur for now.
txn := NewTxn(r.startTs)
if err := r.fn(pk.Uid, l, txn); err != nil {
return nil, err
Expand Down Expand Up @@ -986,6 +1041,8 @@ func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error {
edge.Label = pp.Label

for {
// we only need to build reverse index here.
// We will update the reverse count index separately.
err := txn.addReverseMutation(ctx, &edge)
switch err {
case ErrRetry:
Expand Down

0 comments on commit 30baf66

Please sign in to comment.