Skip to content

Commit

Permalink
*: support recover multi-valued index (#41181)
Browse files Browse the repository at this point in the history
close #41180
  • Loading branch information
xiongjiwei authored Feb 9, 2023
1 parent 5cb8418 commit 9f5cc51
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 13 deletions.
42 changes: 29 additions & 13 deletions executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (e *RecoverIndexExec) fetchRecoverRows(ctx context.Context, srcResult dists
}
e.idxValsBufs[result.scanRowCount] = idxVals
rsData := tables.TryGetHandleRestoredDataWrapper(e.table.Meta(), plannercore.GetCommonHandleDatum(e.handleCols, row), nil, e.index.Meta())
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: false})
e.recoverRows = append(e.recoverRows, recoverRows{handle: handle, idxVals: idxVals, rsData: rsData, skip: true})
result.scanRowCount++
result.currentHandle = handle
}
Expand Down Expand Up @@ -438,16 +438,31 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
}
e.batchKeys = e.batchKeys[:0]
sc := e.ctx.GetSessionVars().StmtCtx
distinctFlags := make([]bool, len(rows))
distinctFlags := make([]bool, 0, len(rows))
rowIdx := make([]int, 0, len(rows))
cnt := 0
for i, row := range rows {
idxKey, distinct, err := e.index.GenIndexKey(sc, row.idxVals, row.handle, e.idxKeyBufs[i])
if err != nil {
return err
}
e.idxKeyBufs[i] = idxKey
iter := e.index.GenIndexKVIter(sc, row.idxVals, row.handle, nil)
for iter.Valid() {
var buf []byte
if cnt < len(e.idxKeyBufs) {
buf = e.idxKeyBufs[cnt]
}
key, _, distinct, err := iter.Next(buf)
if err != nil {
return err
}
if cnt < len(e.idxKeyBufs) {
e.idxKeyBufs[cnt] = key
} else {
e.idxKeyBufs = append(e.idxKeyBufs, key)
}

e.batchKeys = append(e.batchKeys, idxKey)
distinctFlags[i] = distinct
cnt++
e.batchKeys = append(e.batchKeys, key)
distinctFlags = append(distinctFlags, distinct)
rowIdx = append(rowIdx, i)
}
}

values, err := txn.BatchGet(context.Background(), e.batchKeys)
Expand All @@ -460,21 +475,22 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows)
// 3. non-unique-key is duplicate, skip it.
isCommonHandle := e.table.Meta().IsCommonHandle
for i, key := range e.batchKeys {
if val, found := values[string(key)]; found {
val, found := values[string(key)]
if found {
if distinctFlags[i] {
handle, err1 := tablecodec.DecodeHandleInUniqueIndexValue(val, isCommonHandle)
if err1 != nil {
return err1
}

if handle.Compare(rows[i].handle) != 0 {
if handle.Compare(rows[rowIdx[i]].handle) != 0 {
logutil.BgLogger().Warn("recover index: the constraint of unique index is broken, handle in index is not equal to handle in table",
zap.String("index", e.index.Meta().Name.O), zap.ByteString("indexKey", key),
zap.Stringer("handleInTable", rows[i].handle), zap.Stringer("handleInIndex", handle))
zap.Stringer("handleInTable", rows[rowIdx[i]].handle), zap.Stringer("handleInIndex", handle))
}
}
rows[i].skip = true
}
rows[rowIdx[i]].skip = found && rows[rowIdx[i]].skip
}
return nil
}
Expand Down
43 changes: 43 additions & 0 deletions executor/admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,49 @@ func TestAdminRecoverIndex(t *testing.T) {
tk.MustExec("admin recover index admin_test i1;")
}

func TestAdminRecoverMVIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(pk int primary key, a json, index idx((cast(a as signed array))))")
tk.MustExec("insert into t values (0, '[0,1,2]')")
tk.MustExec("insert into t values (1, '[1,2,3]')")
tk.MustExec("insert into t values (2, '[2,3,4]')")
tk.MustExec("insert into t values (3, '[3,4,5]')")
tk.MustExec("insert into t values (4, '[4,5,6]')")
tk.MustExec("admin check table t")

ctx := mock.NewContext()
ctx.Store = store
is := domain.InfoSchema()
dbName := model.NewCIStr("test")
tblName := model.NewCIStr("t")
tbl, err := is.TableByName(dbName, tblName)
require.NoError(t, err)
tblInfo := tbl.Meta()
idxInfo := tblInfo.Indices[0]
tk.Session().GetSessionVars().IndexLookupSize = 3
tk.Session().GetSessionVars().MaxChunkSize = 3

cpIdx := idxInfo.Clone()
cpIdx.MVIndex = false
indexOpr := tables.NewIndex(tblInfo.ID, tblInfo, cpIdx)

txn, err := store.Begin()
require.NoError(t, err)
err = indexOpr.Delete(ctx.GetSessionVars().StmtCtx, txn, types.MakeDatums(2), kv.IntHandle(1))
require.NoError(t, err)
err = txn.Commit(context.Background())
require.NoError(t, err)
err = tk.ExecToErr("admin check table t")
require.Error(t, err)
r := tk.MustQuery("admin recover index t idx")
r.Check(testkit.Rows("1 5"))
tk.MustExec("admin check table t")
}

func TestAdminCleanupMVIndex(t *testing.T) {
store, domain := testkit.CreateMockStoreAndDomain(t)

Expand Down

0 comments on commit 9f5cc51

Please sign in to comment.