From 2bc478f87abb2ed2c5f9ff80f0f97e2ed073b6b4 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Thu, 23 Feb 2023 15:29:06 +0800 Subject: [PATCH] *: lock the temp index key properly in DML (#41516) (#41546) close pingcap/tidb#41515 --- ddl/index_merge_tmp.go | 35 +++++++++++++++--------- executor/insert.go | 2 +- server/http_handler.go | 2 +- session/txn.go | 10 +++++++ table/tables/index.go | 18 ++++++------- table/tables/mutation_checker.go | 6 ++--- tablecodec/tablecodec.go | 46 ++++++++++++++++---------------- tablecodec/tablecodec_test.go | 36 ++++++++++++++++++------- 8 files changed, 96 insertions(+), 59 deletions(-) diff --git a/ddl/index_merge_tmp.go b/ddl/index_merge_tmp.go index b7fca70b8ed34..0150f0fb42b4c 100644 --- a/ddl/index_merge_tmp.go +++ b/ddl/index_merge_tmp.go @@ -223,7 +223,6 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor oprStartTime := startTime idxPrefix := w.table.IndexPrefix() var lastKey kv.Key - isCommonHandle := w.table.Meta().IsCommonHandle err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(), taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) { oprEndTime := time.Now() @@ -240,10 +239,15 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor return false, nil } - tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle) + tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue) if err != nil { return false, err } + tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.index.Meta().Columns)) + if err != nil { + return false, err + } + tempIdxVal = tempIdxVal.FilterOverwritten() // Extract the operations on the original index and replay them later. @@ -254,19 +258,9 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor continue } - if elem.Handle == nil { - // If the handle is not found in the value of the temp index, it means - // 1) This is not a deletion marker, the handle is in the key or the origin value. - // 2) This is a deletion marker, but the handle is in the key of temp index. - elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, len(w.index.Meta().Columns)) - if err != nil { - return false, err - } - } - originIdxKey := make([]byte, len(indexKey)) copy(originIdxKey, indexKey) - tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey) + tablecodec.TempIndexKey2IndexKey(originIdxKey) idxRecord := &temporaryIndexRecord{ handle: elem.Handle, @@ -301,3 +295,18 @@ func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reor zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime))) return w.tmpIdxRecords, nextKey.Next(), taskDone, errors.Trace(err) } + +func decodeTempIndexHandleFromIndexKV(indexKey kv.Key, tmpVal tablecodec.TempIndexValue, idxColLen int) (ret tablecodec.TempIndexValue, err error) { + for _, elem := range tmpVal { + if elem.Handle == nil { + // If the handle is not found in the value of the temp index, it means + // 1) This is not a deletion marker, the handle is in the key or the origin value. + // 2) This is a deletion marker, but the handle is in the key of temp index. + elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, idxColLen) + if err != nil { + return nil, err + } + } + } + return tmpVal, nil +} diff --git a/executor/insert.go b/executor/insert.go index 83b486d5d1020..325e8df01e367 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -159,7 +159,7 @@ func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []t for _, r := range rows { for _, uk := range r.uniqueKeys { if val, found := values[string(uk.newKey)]; found { - if isTemp, _ := tablecodec.CheckTempIndexKey(uk.newKey); isTemp { + if tablecodec.IsTempIndexKey(uk.newKey) { // If it is a temp index, the value cannot be decoded by DecodeHandleInUniqueIndexValue. // Since this function is an optimization, we can skip prefetching the rows referenced by // temp indexes. diff --git a/server/http_handler.go b/server/http_handler.go index 2855186b9cdfa..39a7928baae94 100644 --- a/server/http_handler.go +++ b/server/http_handler.go @@ -228,7 +228,7 @@ func (t *tikvHandlerTool) getMvccByIdxValue(idx table.Index, values url.Values, return nil, err } idxData := &helper.MvccKV{Key: strings.ToUpper(hex.EncodeToString(encodedKey)), RegionID: regionID, Value: data} - tablecodec.IndexKey2TempIndexKey(idx.Meta().ID, encodedKey) + tablecodec.IndexKey2TempIndexKey(encodedKey) data, err = t.GetMvccByEncodedKey(encodedKey) if err != nil { return nil, err diff --git a/session/txn.go b/session/txn.go index 552f81e88fc77..ead377492c84a 100644 --- a/session/txn.go +++ b/session/txn.go @@ -541,6 +541,16 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { return true } + if tablecodec.IsTempIndexKey(k) { + tmpVal, err := tablecodec.DecodeTempIndexValue(v) + if err != nil { + logutil.BgLogger().Warn("decode temp index value failed", zap.Error(err)) + return false + } + current := tmpVal.Current() + return current.Handle != nil || tablecodec.IndexKVIsUnique(current.Value) + } + return tablecodec.IndexKVIsUnique(v) } diff --git a/table/tables/index.go b/table/tables/index.go index 0f856447464e0..4b10605535c8f 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -229,7 +229,7 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue } var tempIdxVal tablecodec.TempIndexValue if len(value) > 0 && keyIsTempIdxKey { - tempIdxVal, err = tablecodec.DecodeTempIndexValue(value, c.tblInfo.IsCommonHandle) + tempIdxVal, err = tablecodec.DecodeTempIndexValue(value) if err != nil { return nil, err } @@ -392,7 +392,7 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem return indexKey, nil, TempIndexKeyTypeNone case model.BackfillStateRunning: // Write to the temporary index. - tablecodec.IndexKey2TempIndexKey(indexInfo.ID, indexKey) + tablecodec.IndexKey2TempIndexKey(indexKey) if indexInfo.State == model.StateDeleteOnly { return nil, indexKey, TempIndexKeyTypeDelete } @@ -401,7 +401,7 @@ func GenTempIdxKeyByState(indexInfo *model.IndexInfo, indexKey kv.Key) (key, tem // Double write tmp := make([]byte, len(indexKey)) copy(tmp, indexKey) - tablecodec.IndexKey2TempIndexKey(indexInfo.ID, tmp) + tablecodec.IndexKey2TempIndexKey(tmp) return indexKey, tmp, TempIndexKeyTypeMerge } } @@ -432,8 +432,8 @@ func (c *index) Exist(sc *stmtctx.StatementContext, txn kv.Transaction, indexedV // FetchDuplicatedHandle is used to find the duplicated row's handle for a given unique index key. func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { - if isTemp, originIdxID := tablecodec.CheckTempIndexKey(key); isTemp { - return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, originIdxID, isCommon) + if tablecodec.IsTempIndexKey(key) { + return fetchDuplicatedHandleForTempIndexKey(ctx, key, distinct, txn, tableID, isCommon) } // The index key is not from temp index. val, err := getKeyInTxn(ctx, txn, key) @@ -448,14 +448,14 @@ func FetchDuplicatedHandle(ctx context.Context, key kv.Key, distinct bool, } func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, distinct bool, - txn kv.Transaction, tableID, idxID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { + txn kv.Transaction, tableID int64, isCommon bool) (foundKey bool, dupHandle kv.Handle, err error) { tempRawVal, err := getKeyInTxn(ctx, txn, tempKey) if err != nil { return false, nil, err } if tempRawVal == nil { originKey := tempKey.Clone() - tablecodec.TempIndexKey2IndexKey(idxID, originKey) + tablecodec.TempIndexKey2IndexKey(originKey) originVal, err := getKeyInTxn(ctx, txn, originKey) if err != nil || originVal == nil { return false, nil, err @@ -469,14 +469,14 @@ func fetchDuplicatedHandleForTempIndexKey(ctx context.Context, tempKey kv.Key, d } return false, nil, nil } - tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal, isCommon) + tempVal, err := tablecodec.DecodeTempIndexValue(tempRawVal) if err != nil { return false, nil, err } curElem := tempVal.Current() if curElem.Delete { originKey := tempKey.Clone() - tablecodec.TempIndexKey2IndexKey(idxID, originKey) + tablecodec.TempIndexKey2IndexKey(originKey) originVal, err := getKeyInTxn(ctx, txn, originKey) if err != nil || originVal == nil { return false, nil, err diff --git a/table/tables/mutation_checker.go b/table/tables/mutation_checker.go index 2229bfbb9d138..81225dcda1663 100644 --- a/table/tables/mutation_checker.go +++ b/table/tables/mutation_checker.go @@ -159,7 +159,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in continue } var tempIdxVal tablecodec.TempIndexValue - tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value, tblInfo.IsCommonHandle) + tempIdxVal, err = tablecodec.DecodeTempIndexValue(m.value) if err != nil { return err } @@ -171,7 +171,7 @@ func checkHandleConsistency(rowInsertion mutation, indexMutations []mutation, in continue } orgKey = append(orgKey, m.key...) - tablecodec.TempIndexKey2IndexKey(idxID, orgKey) + tablecodec.TempIndexKey2IndexKey(orgKey) indexHandle, err = tablecodec.DecodeIndexHandle(orgKey, value, len(indexInfo.Columns)) } else { indexHandle, err = tablecodec.DecodeIndexHandle(m.key, m.value, len(indexInfo.Columns)) @@ -227,7 +227,7 @@ func checkIndexKeys( // We never commit the untouched key values to the storage. Skip this check. continue } - tmpVal, err := tablecodec.DecodeTempIndexValue(m.value, t.Meta().IsCommonHandle) + tmpVal, err := tablecodec.DecodeTempIndexValue(m.value) if err != nil { return err } diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index a28db76980cfa..022caefaa9ec4 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -1038,6 +1038,9 @@ func IsUntouchedIndexKValue(k, v []byte) bool { return false } vLen := len(v) + if IsTempIndexKey(k) { + return vLen > 0 && v[vLen-1] == kv.UnCommitIndexKVFlag + } if vLen <= MaxOldEncodeValueLen { return (vLen == 1 || vLen == 9) && v[vLen-1] == kv.UnCommitIndexKVFlag } @@ -1132,29 +1135,27 @@ const TempIndexPrefix = 0x7fff000000000000 const IndexIDMask = 0xffffffffffff // IndexKey2TempIndexKey generates a temporary index key. -func IndexKey2TempIndexKey(indexID int64, key []byte) { - eid := codec.EncodeIntToCmpUint(TempIndexPrefix | indexID) +func IndexKey2TempIndexKey(key []byte) { + idxIDBytes := key[prefixLen : prefixLen+idLen] + idxID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(idxIDBytes)) + eid := codec.EncodeIntToCmpUint(TempIndexPrefix | idxID) binary.BigEndian.PutUint64(key[prefixLen:], eid) } // TempIndexKey2IndexKey generates an index key from temporary index key. -func TempIndexKey2IndexKey(originIdxID int64, tempIdxKey []byte) { - eid := codec.EncodeIntToCmpUint(originIdxID) +func TempIndexKey2IndexKey(tempIdxKey []byte) { + tmpIdxIDBytes := tempIdxKey[prefixLen : prefixLen+idLen] + tempIdxID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(tmpIdxIDBytes)) + eid := codec.EncodeIntToCmpUint(tempIdxID & IndexIDMask) binary.BigEndian.PutUint64(tempIdxKey[prefixLen:], eid) } -// CheckTempIndexKey checks whether the input key is for a temp index. -func CheckTempIndexKey(indexKey []byte) (isTemp bool, originIdxID int64) { - var ( - indexIDKey []byte - indexID int64 - tempIndexID int64 - ) - // Get encoded indexID from key, Add uint64 8 byte length. - indexIDKey = indexKey[prefixLen : prefixLen+8] - indexID = codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) - tempIndexID = int64(TempIndexPrefix) | indexID - return tempIndexID == indexID, indexID & IndexIDMask +// IsTempIndexKey checks whether the input key is for a temp index. +func IsTempIndexKey(indexKey []byte) (isTemp bool) { + indexIDKey := indexKey[prefixLen : prefixLen+8] + indexID := codec.DecodeCmpUintToInt(binary.BigEndian.Uint64(indexIDKey)) + tempIndexID := int64(TempIndexPrefix) | indexID + return tempIndexID == indexID } // TempIndexValueFlag is the flag of temporary index value. @@ -1287,14 +1288,14 @@ func (v *TempIndexValueElem) Encode(buf []byte) []byte { } // DecodeTempIndexValue decodes the temp index value. -func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, error) { +func DecodeTempIndexValue(value []byte) (TempIndexValue, error) { var ( values []*TempIndexValueElem err error ) for len(value) > 0 { v := &TempIndexValueElem{} - value, err = v.DecodeOne(value, isCommonHandle) + value, err = v.DecodeOne(value) if err != nil { return nil, err } @@ -1304,7 +1305,7 @@ func DecodeTempIndexValue(value []byte, isCommonHandle bool) (TempIndexValue, er } // DecodeOne decodes one temp index value element. -func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain []byte, err error) { +func (v *TempIndexValueElem) DecodeOne(b []byte) (remain []byte, err error) { flag := TempIndexValueFlag(b[0]) b = b[1:] switch flag { @@ -1316,7 +1317,6 @@ func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain [] v.KeyVer = b[0] b = b[1:] v.Distinct = true - v.Handle, err = DecodeHandleInUniqueIndexValue(v.Value, isCommonHandle) return b, err case TempIndexValueFlagNonDistinctNormal: v.Value = b[:len(b)-1] @@ -1325,10 +1325,10 @@ func (v *TempIndexValueElem) DecodeOne(b []byte, isCommonHandle bool) (remain [] case TempIndexValueFlagDeleted: hLen := (uint16(b[0]) << 8) + uint16(b[1]) b = b[2:] - if isCommonHandle { - v.Handle, _ = kv.NewCommonHandle(b[:hLen]) + if hLen == idLen { + v.Handle = decodeIntHandleInIndexValue(b[:idLen]) } else { - v.Handle = decodeIntHandleInIndexValue(b[:hLen]) + v.Handle, _ = kv.NewCommonHandle(b[:hLen]) } b = b[hLen:] v.KeyVer = b[0] diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go index adc4ccc78c13b..0e75b38741f72 100644 --- a/tablecodec/tablecodec_test.go +++ b/tablecodec/tablecodec_test.go @@ -15,6 +15,7 @@ package tablecodec import ( + "encoding/binary" "fmt" "math" "testing" @@ -588,6 +589,11 @@ func TestUntouchedIndexKValue(t *testing.T) { untouchedIndexKey := []byte("t00000001_i000000001") untouchedIndexValue := []byte{0, 0, 0, 0, 0, 0, 0, 1, 49} require.True(t, IsUntouchedIndexKValue(untouchedIndexKey, untouchedIndexValue)) + IndexKey2TempIndexKey(untouchedIndexKey) + require.True(t, IsUntouchedIndexKValue(untouchedIndexKey, untouchedIndexValue)) + elem := TempIndexValueElem{Handle: kv.IntHandle(1), Delete: true, Distinct: true} + tmpIdxVal := elem.Encode(nil) + require.False(t, IsUntouchedIndexKValue(untouchedIndexKey, tmpIdxVal)) } func TestTempIndexKey(t *testing.T) { @@ -597,14 +603,14 @@ func TestTempIndexKey(t *testing.T) { tableID := int64(4) indexID := int64(5) indexKey := EncodeIndexSeekKey(tableID, indexID, encodedValue) - IndexKey2TempIndexKey(indexID, indexKey) + IndexKey2TempIndexKey(indexKey) tid, iid, _, err := DecodeKeyHead(indexKey) require.NoError(t, err) require.Equal(t, tid, tableID) require.NotEqual(t, indexID, iid) require.Equal(t, indexID, iid&IndexIDMask) - TempIndexKey2IndexKey(indexID, indexKey) + TempIndexKey2IndexKey(indexKey) tid, iid, _, err = DecodeKeyHead(indexKey) require.NoError(t, err) require.Equal(t, tid, tableID) @@ -624,7 +630,7 @@ func TestTempIndexValueCodec(t *testing.T) { } val := tempIdxVal.Encode(nil) var newTempIdxVal TempIndexValueElem - remain, err := newTempIdxVal.DecodeOne(val, false) + remain, err := newTempIdxVal.DecodeOne(val) require.NoError(t, err) require.Equal(t, 0, len(remain)) require.EqualValues(t, tempIdxVal, newTempIdxVal) @@ -637,11 +643,12 @@ func TestTempIndexValueCodec(t *testing.T) { } newTempIdxVal = TempIndexValueElem{} val = tempIdxVal.Encode(nil) - remain, err = newTempIdxVal.DecodeOne(val, false) + remain, err = newTempIdxVal.DecodeOne(val) require.NoError(t, err) require.Equal(t, 0, len(remain)) - require.Equal(t, newTempIdxVal.Handle.IntValue(), int64(100)) - newTempIdxVal.Handle = nil + handle, err := DecodeHandleInUniqueIndexValue(newTempIdxVal.Value, false) + require.NoError(t, err) + require.Equal(t, handle.IntValue(), int64(100)) require.EqualValues(t, tempIdxVal, newTempIdxVal) tempIdxVal = TempIndexValueElem{ @@ -650,7 +657,7 @@ func TestTempIndexValueCodec(t *testing.T) { } newTempIdxVal = TempIndexValueElem{} val = tempIdxVal.Encode(nil) - remain, err = newTempIdxVal.DecodeOne(val, false) + remain, err = newTempIdxVal.DecodeOne(val) require.NoError(t, err) require.Equal(t, 0, len(remain)) require.EqualValues(t, tempIdxVal, newTempIdxVal) @@ -663,7 +670,7 @@ func TestTempIndexValueCodec(t *testing.T) { } newTempIdxVal = TempIndexValueElem{} val = tempIdxVal.Encode(nil) - remain, err = newTempIdxVal.DecodeOne(val, false) + remain, err = newTempIdxVal.DecodeOne(val) require.NoError(t, err) require.Equal(t, 0, len(remain)) require.EqualValues(t, tempIdxVal, newTempIdxVal) @@ -691,10 +698,21 @@ func TestTempIndexValueCodec(t *testing.T) { val = tempIdxVal2.Encode(val) val = tempIdxVal3.Encode(val) var result TempIndexValue - result, err = DecodeTempIndexValue(val, false) + result, err = DecodeTempIndexValue(val) require.NoError(t, err) require.Equal(t, 3, len(result)) + for i := 0; i < 3; i++ { + if result[i].Handle == nil { + uv := binary.BigEndian.Uint64(result[i].Value) + result[i].Handle = kv.IntHandle(int64(uv)) + } + } require.Equal(t, result[0].Handle.IntValue(), int64(100)) require.Equal(t, result[1].Handle.IntValue(), int64(100)) require.Equal(t, result[2].Handle.IntValue(), int64(101)) + + elem := TempIndexValueElem{Handle: kv.IntHandle(100), KeyVer: 'b', Delete: true, Distinct: true} + val = elem.Encode(nil) + isUnique := IndexKVIsUnique(val) + require.False(t, isUnique) }