fix(restore): fix incr restore and normal restore for vector predicates (#9078)
shivaji-dgraph committed May 8, 2024
1 parent 8563a54 commit 5451b77
Showing 3 changed files with 270 additions and 0 deletions.
237 changes: 237 additions & 0 deletions systest/vector/backup_test.go
@@ -0,0 +1,237 @@
//go:build !oss && integration

/*
* Copyright 2023 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
	"context"
	"fmt"
	"slices"
	"strings"
	"testing"
	"time"

	"github.com/dgraph-io/dgo/v230/protos/api"
	"github.com/dgraph-io/dgraph/dgraphtest"
	"github.com/dgraph-io/dgraph/x"
	"github.com/stretchr/testify/require"
)

func TestVectorIncrBackupRestore(t *testing.T) {
	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
	c, err := dgraphtest.NewLocalCluster(conf)
	require.NoError(t, err)
	defer func() { c.Cleanup(t.Failed()) }()
	require.NoError(t, c.Start())

	gc, cleanup, err := c.Client()
	require.NoError(t, err)
	defer cleanup()
	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))

	hc, err := c.HTTPClient()
	require.NoError(t, err)
	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
		dgraphtest.DefaultPassword, x.GalaxyNamespace))

	require.NoError(t, gc.SetupSchema(testSchema))

	numVectors := 500
	pred := "project_discription_v"
	allVectors := make([][][]float32, 0, 5)
	allRdfs := make([]string, 0, 5)
	for i := 1; i <= 5; i++ {
		rdfs, vectors := dgraphtest.GenerateRandomVectors(numVectors*(i-1), numVectors*i, 1, pred)
		allVectors = append(allVectors, vectors)
		allRdfs = append(allRdfs, rdfs)
		mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
		_, err := gc.Mutate(mu)
		require.NoError(t, err)

		t.Logf("taking backup #%v\n", i)
		require.NoError(t, hc.Backup(c, i == 1, dgraphtest.DefaultBackupDir))
	}

	for i := 1; i <= 5; i++ {
		t.Logf("restoring backup #%v\n", i)

		// Backup #1 is full; every later backup is incremental, so restore
		// one step of the chain at a time.
		incrFrom := i - 1
		require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", incrFrom, i))
		require.NoError(t, dgraphtest.WaitForRestore(c))

		query := `{
			vector(func: has(project_discription_v)) {
				count(uid)
			}
		}`
		result, err := gc.Query(query)
		require.NoError(t, err)
		require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%v}]}`, numVectors*i), string(result.GetJson()))

		// Collect the vectors that should exist after restoring backup #i,
		// i.e. the batches inserted before backups 1..i. The index variable
		// must not shadow the outer i, or the guard is vacuously true.
		var allSpredVec [][]float32
		for j, vecArr := range allVectors {
			if j < i {
				allSpredVec = append(allSpredVec, vecArr...)
			}
		}

		for p, vector := range allVectors[i-1] {
			triple := strings.Split(allRdfs[i-1], "\n")[p]
			uid := strings.Split(triple, " ")[0]
			queriedVector, err := gc.QuerySingleVectorsUsingUid(uid, pred)
			require.NoError(t, err)
			require.Equal(t, allVectors[i-1][p], queriedVector[0])

			similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, numVectors)
			require.NoError(t, err)
			require.GreaterOrEqual(t, len(similarVectors), 10)
			for _, similarVector := range similarVectors {
				require.Contains(t, allSpredVec, similarVector)
			}
		}
	}
}

func TestVectorBackupRestore(t *testing.T) {
	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
	c, err := dgraphtest.NewLocalCluster(conf)
	require.NoError(t, err)
	defer func() { c.Cleanup(t.Failed()) }()
	require.NoError(t, c.Start())

	gc, cleanup, err := c.Client()
	require.NoError(t, err)
	defer cleanup()
	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))

	hc, err := c.HTTPClient()
	require.NoError(t, err)
	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
		dgraphtest.DefaultPassword, x.GalaxyNamespace))

	require.NoError(t, gc.SetupSchema(testSchema))

	numVectors := 1000
	pred := "project_discription_v"
	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 10, pred)

	mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
	_, err = gc.Mutate(mu)
	require.NoError(t, err)

	t.Log("taking backup \n")
	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

	t.Log("restoring backup \n")
	require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
	require.NoError(t, dgraphtest.WaitForRestore(c))

	testVectorQuery(t, gc, vectors, rdfs, pred, numVectors)
}

func TestVectorBackupRestoreDropIndex(t *testing.T) {
	// setup cluster
	conf := dgraphtest.NewClusterConfig().WithNumAlphas(1).WithNumZeros(1).WithReplicas(1).WithACL(time.Hour)
	c, err := dgraphtest.NewLocalCluster(conf)
	require.NoError(t, err)
	defer func() { c.Cleanup(t.Failed()) }()
	require.NoError(t, c.Start())

	gc, cleanup, err := c.Client()
	require.NoError(t, err)
	defer cleanup()
	require.NoError(t, gc.LoginIntoNamespace(context.Background(),
		dgraphtest.DefaultUser, dgraphtest.DefaultPassword, x.GalaxyNamespace))

	hc, err := c.HTTPClient()
	require.NoError(t, err)
	require.NoError(t, hc.LoginIntoNamespace(dgraphtest.DefaultUser,
		dgraphtest.DefaultPassword, x.GalaxyNamespace))

	// add vector predicate + index
	require.NoError(t, gc.SetupSchema(testSchema))

	// add data to the vector predicate
	numVectors := 3
	pred := "project_discription_v"
	rdfs, vectors := dgraphtest.GenerateRandomVectors(0, numVectors, 1, pred)
	mu := &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
	_, err = gc.Mutate(mu)
	require.NoError(t, err)

	t.Log("taking full backup \n")
	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

	// drop index
	require.NoError(t, gc.SetupSchema(testSchemaWithoutIndex))

	// add more data to the vector predicate
	rdfs, vectors2 := dgraphtest.GenerateRandomVectors(3, numVectors+3, 1, pred)
	mu = &api.Mutation{SetNquads: []byte(rdfs), CommitNow: true}
	_, err = gc.Mutate(mu)
	require.NoError(t, err)

	// delete some entries, dropping the matching vectors from the expected set
	mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[1]), CommitNow: true}
	_, err = gc.Mutate(mu)
	require.NoError(t, err)
	vectors2 = slices.Delete(vectors2, 1, 2)

	mu = &api.Mutation{DelNquads: []byte(strings.Split(rdfs, "\n")[0]), CommitNow: true}
	_, err = gc.Mutate(mu)
	require.NoError(t, err)
	vectors2 = slices.Delete(vectors2, 0, 1)

	t.Log("taking first incr backup \n")
	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

	// add index
	require.NoError(t, gc.SetupSchema(testSchema))

	t.Log("taking second incr backup \n")
	require.NoError(t, hc.Backup(c, false, dgraphtest.DefaultBackupDir))

	// restore backup
	t.Log("restoring backup \n")
	require.NoError(t, hc.Restore(c, dgraphtest.DefaultBackupDir, "", 0, 0))
	require.NoError(t, dgraphtest.WaitForRestore(c))

	// 3 original + 3 added - 2 deleted = 4 vectors should survive the restore
	query := `{
		vectors(func: has(project_discription_v)) {
			count(uid)
		}
	}`
	resp, err := gc.Query(query)
	require.NoError(t, err)
	require.JSONEq(t, `{"vectors":[{"count":4}]}`, string(resp.GetJson()))

	allVec := append(vectors, vectors2...)
	for _, vector := range allVec {
		similarVectors, err := gc.QueryMultipleVectorsUsingSimilarTo(vector, pred, 4)
		require.NoError(t, err)
		for _, similarVector := range similarVectors {
			require.Contains(t, allVec, similarVector)
		}
	}
}
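
The fixtures referenced above — testSchema, testSchemaWithoutIndex, and the testVectorQuery helper — are defined elsewhere in this test package and are not part of this diff. As a rough sketch only (the schema strings and helper body below are illustrative assumptions, not the repository's actual definitions), they could look like:

// Hypothetical stand-ins for fixtures defined elsewhere in the package.
const testSchema = `project_discription_v: float32vector @index(hnsw(metric: "euclidian")) .`
const testSchemaWithoutIndex = `project_discription_v: float32vector .`

// testVectorQuery (sketch): read each inserted vector back by uid and compare.
func testVectorQuery(t *testing.T, gc *dgraphtest.GrpcClient, vectors [][]float32,
	rdfs, pred string, numVectors int) {
	for i := 0; i < numVectors; i++ {
		uid := strings.Split(strings.Split(rdfs, "\n")[i], " ")[0]
		got, err := gc.QuerySingleVectorsUsingUid(uid, pred)
		require.NoError(t, err)
		require.Equal(t, vectors[i], got[0])
	}
}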
23 changes: 23 additions & 0 deletions worker/backup_ee.go
@@ -37,6 +37,7 @@ import (
	"github.com/dgraph-io/dgraph/ee/enc"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/tok/hnsw"
	"github.com/dgraph-io/dgraph/x"
	"github.com/dgraph-io/ristretto/z"
)
@@ -194,6 +195,28 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
		for pred := range group.Tablets {
			predMap[gid] = append(predMap[gid], pred)
		}
	}

	// See if any of the predicates are vector predicates and add the supporting
	// vector predicates to the backup request.
	vecPredMap := make(map[uint32][]string)
	for gid, preds := range predMap {
		schema, err := GetSchemaOverNetwork(ctx, &pb.SchemaRequest{Predicates: preds})
		if err != nil {
			return err
		}

		for _, pred := range schema {
			if pred.Type == "float32vector" && len(pred.IndexSpecs) != 0 {
				// Accumulate into vecPredMap (not predMap) so the supporting
				// predicates are collected separately and merged once below.
				vecPredMap[gid] = append(vecPredMap[gid], pred.Predicate+hnsw.VecEntry,
					pred.Predicate+hnsw.VecKeyword, pred.Predicate+hnsw.VecDead)
			}
		}
	}

	for gid, preds := range vecPredMap {
		predMap[gid] = append(predMap[gid], preds...)
	}

	glog.Infof(
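Functionally, this backup-side change expands every indexed float32vector predicate into its HNSW supporting predicates before the backup is dispatched, so the index's internal data travels with the backup. A minimal sketch of that expansion, using only the hnsw suffix constants imported above (supportingVectorPreds is an illustrative helper name, not code from this diff):

// supportingVectorPreds lists the internal HNSW predicates that must be
// backed up alongside an indexed float32vector predicate.
func supportingVectorPreds(pred string) []string {
	return []string{pred + hnsw.VecEntry, pred + hnsw.VecKeyword, pred + hnsw.VecDead}
}

Collecting the results into a separate vecPredMap and merging after the loop keeps the schema lookup, which ranges over predMap, from ever seeing the synthetic supporting predicates.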
10 changes: 10 additions & 0 deletions worker/restore_map.go
@@ -27,6 +27,7 @@ import (
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"
@@ -44,6 +45,7 @@ import (
	"github.com/dgraph-io/dgraph/ee/enc"
	"github.com/dgraph-io/dgraph/posting"
	"github.com/dgraph-io/dgraph/protos/pb"
	"github.com/dgraph-io/dgraph/tok/hnsw"
	"github.com/dgraph-io/dgraph/x"
	"github.com/dgraph-io/ristretto/z"
)
@@ -470,6 +472,7 @@ func (m *mapper) processReqCh(ctx context.Context) error {
		}
		return nil
	}

	// We changed the format of predicate in 2103 and 2105. SchemaUpdate and TypeUpdate have
	// predicate stored within them, so they also need to be updated accordingly.
	switch in.version {
@@ -488,6 +491,13 @@
	default:
		// For manifest versions >= 2105, do nothing.
	}

	// If the predicate is a vector indexing predicate, skip further processing;
	// currently we don't store vector supporting predicates in the schema.
	if strings.HasSuffix(parsedKey.Attr, hnsw.VecEntry) || strings.HasSuffix(parsedKey.Attr, hnsw.VecKeyword) ||
		strings.HasSuffix(parsedKey.Attr, hnsw.VecDead) {
		return nil
	}

	// Reset the StreamId to prevent ordering issues while writing to stream writer.
	kv.StreamId = 0
	// Schema and type keys are not stored in an intermediate format so their
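The restore-side guard is the mirror image of the backup-side expansion: any schema or type key whose attribute ends in one of the HNSW suffixes is skipped, since those supporting predicates carry index internals rather than user-declared schema. The same check, factored into a small predicate for illustration (isVectorSupportingPred is an assumed name, not code from this diff):

// isVectorSupportingPred reports whether attr is one of the internal
// HNSW supporting predicates that back a float32vector index.
func isVectorSupportingPred(attr string) bool {
	for _, suffix := range []string{hnsw.VecEntry, hnsw.VecKeyword, hnsw.VecDead} {
		if strings.HasSuffix(attr, suffix) {
			return true
		}
	}
	return false
}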
