Skip to content

Commit

Permalink
*: lock the temp index key properly in DML (#41516)
Browse files Browse the repository at this point in the history
close #41515
  • Loading branch information
tangenta authored Feb 17, 2023
1 parent 0d8f8b0 commit bc95a4f
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 59 deletions.
35 changes: 22 additions & 13 deletions ddl/index_merge_tmp.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,6 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
oprStartTime := startTime
idxPrefix := w.table.IndexPrefix()
var lastKey kv.Key
isCommonHandle := w.table.Meta().IsCommonHandle
err := iterateSnapshotKeys(w.GetCtx().jobContext(taskRange.getJobID()), w.sessCtx.GetStore(), taskRange.priority, idxPrefix, txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
oprEndTime := time.Now()
Expand All @@ -259,10 +258,15 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
return false, nil
}

tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
if err != nil {
return false, err
}
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.index.Meta().Columns))
if err != nil {
return false, err
}

tempIdxVal = tempIdxVal.FilterOverwritten()

// Extract the operations on the original index and replay them later.
Expand All @@ -273,19 +277,9 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
continue
}

if elem.Handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns))
if err != nil {
return false, err
}
}

originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)
tablecodec.TempIndexKey2IndexKey(originIdxKey)

idxRecord := &temporaryIndexRecord{
handle: elem.Handle,
Expand Down Expand Up @@ -320,3 +314,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor
zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.tmpIdxRecords, nextKey.Next(), taskDone, errors.Trace(err)
}

func decodeTempIndexHandleFromIndexKV(indexKey kv.Key, tmpVal tablecodec.TempIndexValue, idxColLen int) (ret tablecodec.TempIndexValue, err error) {
for _, elem := range tmpVal {
if elem.Handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, idxColLen)
if err != nil {
return nil, err
}
}
}
return tmpVal, nil
}
2 changes: 1 addition & 1 deletion executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t
for _, r := range rows {
for _, uk := range r.uniqueKeys {
if val, found := values[string(uk.newKey)]; found {
if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp {
if tablecodec.IsTempIndexKey(uk.newKey) {
// If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue.
// Since this function is an optimization, we can skip prefetching the rows referenced by
// temp indexes.
Expand Down
2 changes: 1 addition & 1 deletion server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values,
return nil, err
}
idxData := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data}
tablecodec.IndexKey2TempIndexKey(idx.Meta().ID, encodedKey)
tablecodec.IndexKey2TempIndexKey(encodedKey)
data, err = t.GetMvccByEncodedKey(encodedKey)
if err != nil {
return nil, err
Expand Down
10 changes: 10 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,16 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
return true
}

if tablecodec.IsTempIndexKey(k) {
tmpVal, err := tablecodec.DecodeTempIndexValue(v)
if err != nil {
logutil.BgLogger().Warn("decode temp index value failed", zap.Error(err))
return false
}
current := tmpVal.Current()
return current.Handle != nil || tablecodec.IndexKVIsUnique(current.Value)
}

return tablecodec.IndexKVIsUnique(v)
}

Expand Down
18 changes: 9 additions & 9 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue
}
var tempIdxVal tablecodec.TempIndexValue
if len(value) > 0 && keyIsTempIdxKey {
tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle)
tempIdxVal, err = tablecodec.DecodeTempIndexValue(value)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
return indexKey, nil, TempIndexKeyTypeNone
case model.BackfillStateRunning:
// Write to the temporary index.
tablecodec.IndexKey2TempIndexKey(indexInfo.ID, indexKey)
tablecodec.IndexKey2TempIndexKey(indexKey)
if indexInfo.State == model.StateDeleteOnly {
return nil, indexKey, TempIndexKeyTypeDelete
}
Expand All @@ -507,7 +507,7 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem
// Double write
tmp := make([]byte, len(indexKey))
copy(tmp, indexKey)
tablecodec.IndexKey2TempIndexKey(indexInfo.ID, tmp)
tablecodec.IndexKey2TempIndexKey(tmp)
return indexKey, tmp, TempIndexKeyTypeMerge
}
}
Expand Down Expand Up @@ -542,8 +542,8 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV
// FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key.
func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool,
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp {
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon)
if tablecodec.IsTempIndexKey(key) {
return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, isCommon)
}
// The index key is not from temp index.
val, err := getKeyInTxn(ctx, txn, key)
Expand All @@ -558,14 +558,14 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool,
}

func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool,
txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) {
tempRawVal, err := getKeyInTxn(ctx, txn, tempKey)
if err != nil {
return false, nil, err
}
if tempRawVal == nil {
originKey := tempKey.Clone()
tablecodec.TempIndexKey2IndexKey(idxID, originKey)
tablecodec.TempIndexKey2IndexKey(originKey)
originVal, err := getKeyInTxn(ctx, txn, originKey)
if err != nil || originVal == nil {
return false, nil, err
Expand All @@ -579,14 +579,14 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d
}
return false, nil, nil
}
tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon)
tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal)
if err != nil {
return false, nil, err
}
curElem := tempVal.Current()
if curElem.Delete {
originKey := tempKey.Clone()
tablecodec.TempIndexKey2IndexKey(idxID, originKey)
tablecodec.TempIndexKey2IndexKey(originKey)
originVal, err := getKeyInTxn(ctx, txn, originKey)
if err != nil || originVal == nil {
return false, nil, err
Expand Down
6 changes: 3 additions & 3 deletions table/tables/mutation_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in
continue
}
var tempIdxVal tablecodec.TempIndexValue
tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle)
tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value)
if err != nil {
return err
}
Expand All @@ -171,7 +171,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in
continue
}
orgKey = append(orgKey, m.key...)
tablecodec.TempIndexKey2IndexKey(idxID, orgKey)
tablecodec.TempIndexKey2IndexKey(orgKey)
indexHandle, err = tablecodec.DecodeIndexHandle(orgKey, value, len(indexInfo.Columns))
} else {
indexHandle, err = tablecodec.DecodeIndexHandle(m.key, m.value, len(indexInfo.Columns))
Expand Down Expand Up @@ -227,7 +227,7 @@ func checkIndexKeys(
// We never commit the untouched key values to the storage. Skip this check.
continue
}
tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle)
tmpVal, err := tablecodec.DecodeTempIndexValue(m.value)
if err != nil {
return err
}
Expand Down
46 changes: 23 additions & 23 deletions tablecodec/tablecodec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,6 +1058,9 @@ func IsUntouchedIndexKValue(k, v []byte) bool {
return false
}
vLen := len(v)
if IsTempIndexKey(k) {
return vLen > 0 && v[vLen-1] == kv.UnCommitIndexKVFlag
}
if vLen <= MaxOldEncodeValueLen {
return (vLen == 1 || vLen == 9) && v[vLen-1] == kv.UnCommitIndexKVFlag
}
Expand Down Expand Up @@ -1152,29 +1155,27 @@ const TempIndexPrefix = 0x7fff000000000000
const IndexIDMask = 0xffffffffffff

// IndexKey2TempIndexKey generates a temporary index key.
func IndexKey2TempIndexKey(indexID int64, key []byte) {
eid := codec.EncodeIntToCmpUint(TempIndexPrefix | indexID)
func IndexKey2TempIndexKey(key []byte) {
idxIDBytes := key[prefixLen : prefixLen+idLen]
idxID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(idxIDBytes))
eid := codec.EncodeIntToCmpUint(TempIndexPrefix | idxID)
binary.BigEndian.PutUint64(key[prefixLen:], eid)
}

// TempIndexKey2IndexKey generates an index key from temporary index key.
func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) {
eid := codec.EncodeIntToCmpUint(originIdxID)
func TempIndexKey2IndexKey(tempIdxKey []byte) {
tmpIdxIDBytes := tempIdxKey[prefixLen : prefixLen+idLen]
tempIdxID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(tmpIdxIDBytes))
eid := codec.EncodeIntToCmpUint(tempIdxID & IndexIDMask)
binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid)
}

// CheckTempIndexKey checks whether the input key is for a temp index.
func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) {
var (
indexIDKey []byte
indexID int64
tempIndexID int64
)
// Get encoded indexID from key, Add uint64 8 byte length.
indexIDKey = indexKey[prefixLen : prefixLen+8]
indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey))
tempIndexID = int64(TempIndexPrefix) | indexID
return tempIndexID == indexID, indexID & IndexIDMask
// IsTempIndexKey checks whether the input key is for a temp index.
func IsTempIndexKey(indexKey []byte) (isTemp bool) {
indexIDKey := indexKey[prefixLen : prefixLen+8]
indexID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey))
tempIndexID := int64(TempIndexPrefix) | indexID
return tempIndexID == indexID
}

// TempIndexValueFlag is the flag of temporary index value.
Expand Down Expand Up @@ -1307,14 +1308,14 @@ func (v *TempIndexValueElem) Encode(buf []byte) []byte {
}

// DecodeTempIndexValue decodes the temp index value.
func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) {
func DecodeTempIndexValue(value []byte) (TempIndexValue, error) {
var (
values []*TempIndexValueElem
err error
)
for len(value) > 0 {
v := &TempIndexValueElem{}
value, err = v.DecodeOne(value, isCommonHandle)
value, err = v.DecodeOne(value)
if err != nil {
return nil, err
}
Expand All @@ -1324,7 +1325,7 @@ func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, er
}

// DecodeOne decodes one temp index value element.
func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) {
func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) {
flag := TempIndexValueFlag(b[0])
b = b[1:]
switch flag {
Expand All @@ -1336,7 +1337,6 @@ func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []
v.KeyVer = b[0]
b = b[1:]
v.Distinct = true
v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle)
return b, err
case TempIndexValueFlagNonDistinctNormal:
v.Value = b[:len(b)-1]
Expand All @@ -1345,10 +1345,10 @@ func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []
case TempIndexValueFlagDeleted:
hLen := (uint16(b[0]) << 8) + uint16(b[1])
b = b[2:]
if isCommonHandle {
v.Handle, _ = kv.NewCommonHandle(b[:hLen])
if hLen == idLen {
v.Handle = decodeIntHandleInIndexValue(b[:idLen])
} else {
v.Handle = decodeIntHandleInIndexValue(b[:hLen])
v.Handle, _ = kv.NewCommonHandle(b[:hLen])
}
b = b[hLen:]
v.KeyVer = b[0]
Expand Down
Loading

0 comments on commit bc95a4f

Please sign in to comment.