Skip to content

Commit

Permalink
fix(Dgraph): update reverse index when updating single UID predicates. (
Browse files Browse the repository at this point in the history
#5868) (#6006)

When updating a single uid predicate with a reverse index, the existing entry in the
reverse index should be deleted first.

Fixes DGRAPH-1738

(cherry picked from commit 5b70fe8)
  • Loading branch information
martinmr authored Aug 28, 2020
1 parent 802a2f2 commit ec19c6d
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 8 deletions.
48 changes: 40 additions & 8 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,37 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
if err != nil {
return err
}
if plist == nil {
return errors.Errorf("nil posting list for reverse key %s", hex.Dump(key))
}

// For single uid predicates, updating the reverse index requires that the existing
// entries for this key in the index are removed.
pred, ok := schema.State().Get(ctx, t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
t.Op == pb.DirectedEdge_SET && t.ValueId != 0
if isSingleUidUpdate {
dataKey := x.DataKey(t.Attr, t.Entity)
dataList, err := getFn(dataKey)
if err != nil {
return errors.Wrapf(err, "cannot find single uid list to update with key %s",
hex.Dump(dataKey))
}
err = dataList.Iterate(txn.StartTs, 0, func(p *pb.Posting) error {
delEdge := &pb.DirectedEdge{
Entity: t.Entity,
ValueId: p.Uid,
Attr: t.Attr,
Op: pb.DirectedEdge_DEL,
}
return txn.addReverseAndCountMutation(ctx, delEdge)
})
if err != nil {
return errors.Wrapf(err, "cannot remove existing reverse index entries for key %s",
hex.Dump(dataKey))
}
}

x.AssertTrue(plist != nil)
// We must create a copy here.
edge := &pb.DirectedEdge{
Entity: t.ValueId,
Expand All @@ -234,6 +263,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
return err
}
}

return nil
}

Expand Down Expand Up @@ -438,6 +468,15 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,

doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr)
hasCountIndex := schema.State().HasCount(ctx, edge.Attr)

// Add reverse mutation irrespective of hasMutated, server crash can happen after
// mutation is synced and before reverse edge is synced
if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) {
if err := txn.addReverseAndCountMutation(ctx, edge); err != nil {
return err
}
}

val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge)
if err != nil {
return err
Expand Down Expand Up @@ -475,13 +514,6 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
}
}
}
// Add reverse mutation irrespective of hasMutated, server crash can happen after
// mutation is synced and before reverse edge is synced
if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) {
if err := txn.addReverseAndCountMutation(ctx, edge); err != nil {
return err
}
}
return nil
}

Expand Down
88 changes: 88 additions & 0 deletions systest/mutations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func TestSystem(t *testing.T) {
t.Run("count index delete on non list predicate", wrap(CountIndexNonlistPredicateDelete))
t.Run("Reverse count index delete", wrap(ReverseCountIndexDelete))
t.Run("overwrite uid predicates", wrap(OverwriteUidPredicates))
t.Run("overwrite uid predicates reverse index", wrap(OverwriteUidPredicatesReverse))
t.Run("delete and query same txn", wrap(DeleteAndQuerySameTxn))
}

Expand Down Expand Up @@ -2187,6 +2188,93 @@ func OverwriteUidPredicates(t *testing.T, c *dgo.Dgraph) {
string(resp.GetJson()))
}

func OverwriteUidPredicatesReverse(t *testing.T, c *dgo.Dgraph) {
ctx := context.Background()
op := &api.Operation{DropAll: true}
require.NoError(t, c.Alter(ctx, op))

op = &api.Operation{
Schema: `
best_friend: uid @reverse .
name: string @index(exact) .`,
}
err := c.Alter(ctx, op)
require.NoError(t, err)

txn := c.NewTxn()
_, err = txn.Mutate(context.Background(), &api.Mutation{
CommitNow: true,
SetNquads: []byte(`
_:alice <name> "Alice" .
_:bob <name> "Bob" .
_:alice <best_friend> _:bob .`),
})
require.NoError(t, err)

q := `{
me(func: eq(name, Alice)) {
name
best_friend {
name
}
}
}`
resp, err := c.NewReadOnlyTxn().Query(ctx, q)
require.NoError(t, err)
testutil.CompareJSON(t, `{"me":[{"name":"Alice","best_friend": {"name": "Bob"}}]}`,
string(resp.GetJson()))

reverseQuery := `{
reverse(func: has(~best_friend)) {
name
~best_friend {
name
}
}}`
resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery)
require.NoError(t, err)
testutil.CompareJSON(t, `{"reverse":[{"name":"Bob","~best_friend": [{"name": "Alice"}]}]}`,
string(resp.GetJson()))

upsertQuery := `query { alice as var(func: eq(name, Alice)) }`
upsertMutation := &api.Mutation{
SetNquads: []byte(`
_:carol <name> "Carol" .
uid(alice) <best_friend> _:carol .`),
}
req := &api.Request{
Query: upsertQuery,
Mutations: []*api.Mutation{upsertMutation},
CommitNow: true,
}
_, err = c.NewTxn().Do(ctx, req)
require.NoError(t, err)

resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery)
require.NoError(t, err)
testutil.CompareJSON(t, `{"reverse":[{"name":"Carol","~best_friend": [{"name": "Alice"}]}]}`,
string(resp.GetJson()))

// Delete the triples and verify the reverse edge is gone.
upsertMutation = &api.Mutation{
DelNquads: []byte(`
uid(alice) <best_friend> * .`),
}
req = &api.Request{
Query: upsertQuery,
Mutations: []*api.Mutation{upsertMutation},
CommitNow: true,
}
_, err = c.NewTxn().Do(ctx, req)
require.NoError(t, err)

resp, err = c.NewReadOnlyTxn().Query(ctx, reverseQuery)
require.NoError(t, err)
testutil.CompareJSON(t, `{"reverse":[]}`,
string(resp.GetJson()))

}

func DeleteAndQuerySameTxn(t *testing.T, c *dgo.Dgraph) {
// Set the schema.
ctx := context.Background()
Expand Down

0 comments on commit ec19c6d

Please sign in to comment.