diff --git a/executor/executor.go b/executor/executor.go
index 669a67ac0f2ab..6a5ad18e1488b 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -955,7 +955,6 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
 		LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
 		LockKeysCount:    &seVars.StmtCtx.LockKeysCount,
 		LockExpired:      &seVars.TxnCtx.LockExpire,
-		CheckKeyExists:   seVars.StmtCtx.CheckKeyExists,
 	}
 }
 
@@ -1653,7 +1652,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
 	}
 	sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
-	sc.CheckKeyExists = make(map[string]struct{})
 	errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
 	vars.SysErrorCount = errCount
 	vars.SysWarningCount = warnCount
diff --git a/kv/kv.go b/kv/kv.go
index c066e20ac1e2d..3b2780652c6fb 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -160,16 +160,28 @@ type RetrieverMutator interface {
 	Mutator
 }
 
+// MemBufferIterator is an Iterator with KeyFlags related functions.
+type MemBufferIterator interface {
+	Iterator
+	HasValue() bool
+	Flags() KeyFlags
+}
+
 // MemBuffer is an in-memory kv collection, can be used to buffer write operations.
 type MemBuffer interface {
 	RetrieverMutator
 
 	// GetFlags returns the latest flags associated with key.
 	GetFlags(Key) (KeyFlags, error)
+	// IterWithFlags returns a MemBufferIterator.
+	IterWithFlags(k Key, upperBound Key) MemBufferIterator
+	// IterReverseWithFlags returns a reversed MemBufferIterator.
+	IterReverseWithFlags(k Key) MemBufferIterator
 	// SetWithFlags put key-value into the last active staging buffer with the given KeyFlags.
 	SetWithFlags(Key, []byte, ...FlagsOp) error
 	// UpdateFlags update the flags associated with key.
 	UpdateFlags(Key, ...FlagsOp)
+
 	// Reset reset the MemBuffer to initial states.
 	Reset()
 	// DiscardValues releases the memory used by all values.
@@ -186,7 +198,7 @@ type MemBuffer interface {
 	// If the changes are not published by `Release`, they will be discarded.
 	Cleanup(StagingHandle)
 	// InspectStage used to inspect the value updates in the given stage.
-	InspectStage(handle StagingHandle, f func(Key, KeyFlags, []byte))
+	InspectStage(StagingHandle, func(Key, KeyFlags, []byte))
 
 	// Size returns sum of keys and values length.
 	Size() int
@@ -256,7 +268,6 @@ type LockCtx struct {
 	Values         map[string]ReturnedValue
 	ValuesLock     sync.Mutex
 	LockExpired    *uint32
-	CheckKeyExists map[string]struct{}
 }
 
 // ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
diff --git a/kv/memdb.go b/kv/memdb.go
index f4c480bfd2bab..b9fa2c3968d39 100644
--- a/kv/memdb.go
+++ b/kv/memdb.go
@@ -23,10 +23,12 @@ import (
 
 const (
 	flagPresumeKNE KeyFlags = 1 << iota
-	flagPessimisticLock
+	flagKeyLocked
+	flagKeyLockedValExist
+	flagNeedCheckExists
 	flagNoNeedCommit
 
-	persistentFlags = flagPessimisticLock
+	persistentFlags = flagKeyLocked | flagKeyLockedValExist
 	// bit 1 => red, bit 0 => black
 	nodeColorBit  uint8 = 0x80
 	nodeFlagsMask       = ^nodeColorBit
@@ -35,14 +37,24 @@ const (
 // KeyFlags are metadata associated with key
 type KeyFlags uint8
 
-// HasPresumeKeyNotExists retruns whether the associated key use lazy check.
+// HasPresumeKeyNotExists returns whether the associated key uses lazy check.
 func (f KeyFlags) HasPresumeKeyNotExists() bool {
 	return f&flagPresumeKNE != 0
 }
 
-// HasPessimisticLock retruns whether the associated key has acquired pessimistic lock.
-func (f KeyFlags) HasPessimisticLock() bool {
-	return f&flagPessimisticLock != 0
+// HasLocked returns whether the associated key has acquired a pessimistic lock.
+func (f KeyFlags) HasLocked() bool {
+	return f&flagKeyLocked != 0
+}
+
+// HasLockedValueExists returns whether the value existed when the key was locked.
+func (f KeyFlags) HasLockedValueExists() bool {
+	return f&flagKeyLockedValExist != 0
+}
+
+// HasNeedCheckExists returns whether the key needs an existence check when it is locked.
+func (f KeyFlags) HasNeedCheckExists() bool {
+	return f&flagNeedCheckExists != 0
 }
 
 // HasNoNeedCommit returns whether the key should be used in 2pc commit phase.
@@ -51,17 +63,24 @@ func (f KeyFlags) HasNoNeedCommit() bool {
 }
 
 // FlagsOp describes KeyFlags modify operation.
-type FlagsOp uint8
+type FlagsOp uint16
 
 const (
 	// SetPresumeKeyNotExists marks the existence of the associated key is checked lazily.
+	// Implies KeyFlags.HasNeedCheckExists() == true.
 	SetPresumeKeyNotExists FlagsOp = 1 << iota
 	// DelPresumeKeyNotExists reverts SetPresumeKeyNotExists.
 	DelPresumeKeyNotExists
-	// SetPessimisticLock marks the associated key has acquired pessimistic lock.
-	SetPessimisticLock
-	// DelPessimisticLock reverts SetPessimisticLock.
-	DelPessimisticLock
+	// SetKeyLocked marks that the associated key has acquired a lock.
+	SetKeyLocked
+	// DelKeyLocked reverts SetKeyLocked.
+	DelKeyLocked
+	// SetKeyLockedValueExists marks that the value exists when the key is locked in Transaction.LockKeys.
+	SetKeyLockedValueExists
+	// SetKeyLockedValueNotExists marks that the value does not exist when the key is locked in Transaction.LockKeys.
+	SetKeyLockedValueNotExists
+	// DelNeedCheckExists marks that the key no longer needs an existence check in Transaction.LockKeys.
+	DelNeedCheckExists
 	// SetNoNeedCommit marks the key shouldn't be used in 2pc commit phase.
 	SetNoNeedCommit
 )
@@ -70,13 +89,19 @@ func applyFlagsOps(origin KeyFlags, ops ...FlagsOp) KeyFlags {
 	for _, op := range ops {
 		switch op {
 		case SetPresumeKeyNotExists:
-			origin |= flagPresumeKNE
+			origin |= flagPresumeKNE | flagNeedCheckExists
 		case DelPresumeKeyNotExists:
-			origin &= ^flagPresumeKNE
-		case SetPessimisticLock:
-			origin |= flagPessimisticLock
-		case DelPessimisticLock:
-			origin &= ^flagPessimisticLock
+			origin &= ^(flagPresumeKNE | flagNeedCheckExists)
+		case SetKeyLocked:
+			origin |= flagKeyLocked
+		case DelKeyLocked:
+			origin &= ^flagKeyLocked
+		case SetKeyLockedValueExists:
+			origin |= flagKeyLockedValExist
+		case DelNeedCheckExists:
+			origin &= ^flagNeedCheckExists
+		case SetKeyLockedValueNotExists:
+			origin &= ^flagKeyLockedValExist
 		case SetNoNeedCommit:
 			origin |= flagNoNeedCommit
 		}
@@ -92,7 +117,7 @@ var tombstone = []byte{}
 // The value map is rollbackable, that means you can use the `Staging`, `Release` and `Cleanup` API to safely modify KVs.
 //
 // The flags map is not rollbackable. There are two types of flag, persistent and non-persistent.
-// When discading a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
+// When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
 // If there are persistent flags associated with key, we will keep this key in node without value.
 type memdb struct {
 	root memdbArenaAddr
@@ -126,7 +151,7 @@ func (db *memdb) Staging() StagingHandle {
 
 func (db *memdb) Release(h StagingHandle) {
 	if int(h) != len(db.stages) {
-		// This should never happens in production environmen.
+		// This should never happen in a production environment.
 		// Use panic to make debug easier.
 		panic("cannot release staging buffer")
 	}
@@ -144,7 +169,7 @@ func (db *memdb) Cleanup(h StagingHandle) {
 		return
 	}
 	if int(h) < len(db.stages) {
-		// This should never happens in production environmen.
+		// This should never happen in a production environment.
 		// Use panic to make debug easier.
 		panic("cannot cleanup staging buffer")
 	}
@@ -186,7 +211,7 @@ func (db *memdb) Get(_ context.Context, key Key) ([]byte, error) {
 		panic("vlog is resetted")
 	}
 
-	x := db.tranverse(key, false)
+	x := db.traverse(key, false)
 	if x.isNull() {
 		return nil, ErrNotExist
 	}
@@ -198,7 +223,7 @@
 }
 
 func (db *memdb) GetFlags(key Key) (KeyFlags, error) {
-	x := db.tranverse(key, false)
+	x := db.traverse(key, false)
 	if x.isNull() {
 		return 0, ErrNotExist
 	}
@@ -254,14 +279,13 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
 	if len(db.stages) == 0 {
 		db.dirty = true
 	}
-	x := db.tranverse(key, true)
-	if x.vptr.isNull() && value != nil {
-		db.size += len(key)
-		db.count++
-	}
+	x := db.traverse(key, true)
 
 	if len(ops) != 0 {
 		flags := applyFlagsOps(x.getKeyFlags(), ops...)
+		if flags&persistentFlags != 0 {
+			db.dirty = true
+		}
 		x.setKeyFlags(flags)
 	}
 
@@ -299,9 +323,9 @@ func (db *memdb) setValue(x memdbNodeAddr, value []byte) {
 	db.size = db.size - len(oldVal) + len(value)
 }
 
-// tranverse search for and if not found and insert is true, will add a new node in.
+// traverse search for and if not found and insert is true, will add a new node in.
 // Returns a pointer to the new node, or the node found.
-func (db *memdb) tranverse(key Key, insert bool) memdbNodeAddr {
+func (db *memdb) traverse(key Key, insert bool) memdbNodeAddr {
 	x := db.getRoot()
 	y := memdbNodeAddr{nil, nullAddr}
 	found := false
@@ -437,7 +461,8 @@ func (db *memdb) leftRotate(x memdbNodeAddr) {
 	// If B is not null, set it's parent to be X
 	if !y.left.isNull() {
-		y.getLeft(db).up = x.addr
+		left := y.getLeft(db)
+		left.up = x.addr
 	}
 
 	// Set Y's parent to be what X's parent was
@@ -470,7 +495,8 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {
 	// If B is not null, set it's parent to be Y
 	if !x.right.isNull() {
-		x.getRight(db).up = y.addr
+		right := x.getRight(db)
+		right.up = y.addr
 	}
 
 	// Set X's parent to be what Y's parent was
@@ -498,6 +524,9 @@ func (db *memdb) rightRotate(y memdbNodeAddr) {
 func (db *memdb) deleteNode(z memdbNodeAddr) {
 	var x, y memdbNodeAddr
+	db.count--
+	db.size -= int(z.klen)
+
 	if z.left.isNull() || z.right.isNull() {
 		y = z
 	} else {
@@ -551,10 +580,12 @@ func (db *memdb) replaceNode(old memdbNodeAddr, new memdbNodeAddr) {
 	}
 	new.up = old.up
-	old.getLeft(db).up = new.addr
+	left := old.getLeft(db)
+	left.up = new.addr
 	new.left = old.left
-	old.getRight(db).up = new.addr
+	right := old.getRight(db)
+	right.up = new.addr
 	new.right = old.right
 	if old.isBlack() {
@@ -693,6 +724,8 @@ func (db *memdb) getRoot() memdbNodeAddr {
 }
 
 func (db *memdb) allocNode(key Key) memdbNodeAddr {
+	db.size += len(key)
+	db.count++
 	x, xn := db.allocator.allocNode(key)
 	return memdbNodeAddr{xn, x}
 }
diff --git a/kv/memdb_arena.go b/kv/memdb_arena.go
index 596a1b26116a2..2c2e8af4a262a 100644
--- a/kv/memdb_arena.go
+++ b/kv/memdb_arena.go
@@ -288,8 +288,6 @@ func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
 		db.size -= int(hdr.valueLen)
 		// oldValue.isNull() == true means this is a newly added value.
 		if hdr.oldValue.isNull() {
-			db.count--
-			db.size -= int(node.klen)
 			// If there are no flags associated with this key, we need to delete this node.
 			keptFlags := node.getKeyFlags() & persistentFlags
 			if keptFlags == 0 {
diff --git a/kv/memdb_iterator.go b/kv/memdb_iterator.go
index c0246d097838a..2a1f6d6b3541e 100644
--- a/kv/memdb_iterator.go
+++ b/kv/memdb_iterator.go
@@ -44,6 +44,28 @@ func (db *memdb) IterReverse(k Key) (Iterator, error) {
 	return i, nil
 }
 
+func (db *memdb) IterWithFlags(k Key, upperBound Key) MemBufferIterator {
+	i := &memdbIterator{
+		db:           db,
+		start:        k,
+		end:          upperBound,
+		includeFlags: true,
+	}
+	i.init()
+	return i
+}
+
+func (db *memdb) IterReverseWithFlags(k Key) MemBufferIterator {
+	i := &memdbIterator{
+		db:           db,
+		end:          k,
+		reverse:      true,
+		includeFlags: true,
+	}
+	i.init()
+	return i
+}
+
 func (i *memdbIterator) init() {
 	if i.reverse {
 		if len(i.end) == 0 {
@@ -72,6 +94,14 @@ func (i *memdbIterator) Valid() bool {
 	return !i.curr.isNull()
 }
 
+func (i *memdbIterator) Flags() KeyFlags {
+	return i.curr.getKeyFlags()
+}
+
+func (i *memdbIterator) HasValue() bool {
+	return !i.isFlagsOnly()
+}
+
 func (i *memdbIterator) Key() Key {
 	return i.curr.getKey()
 }
diff --git a/kv/memdb_test.go b/kv/memdb_test.go
index a8c0884e97c2b..d0ad12c426095 100644
--- a/kv/memdb_test.go
+++ b/kv/memdb_test.go
@@ -45,12 +45,11 @@ type testMemDBSuite struct{}
 
 // DeleteKey is used in test to verify the `deleteNode` used in `vlog.revertToCheckpoint`.
 func (db *memdb) DeleteKey(key []byte) {
-	x := db.tranverse(key, false)
+	x := db.traverse(key, false)
 	if x.isNull() {
 		return
 	}
-	db.count--
-	db.size -= (len(db.vlog.getValue(x.vptr)) + int(x.klen))
+	db.size -= len(db.vlog.getValue(x.vptr))
 	db.deleteNode(x)
 }
 
@@ -452,7 +451,7 @@ func (s *testMemDBSuite) TestDirty(c *C) {
 	// persistent flags will make memdb dirty.
 	db = newMemDB()
 	h = db.Staging()
-	db.SetWithFlags([]byte{1}, []byte{1}, SetPessimisticLock)
+	db.SetWithFlags([]byte{1}, []byte{1}, SetKeyLocked)
 	db.Cleanup(h)
 	c.Assert(db.Dirty(), IsTrue)
 
@@ -472,7 +471,7 @@ func (s *testMemDBSuite) TestFlags(c *C) {
 		var buf [4]byte
 		binary.BigEndian.PutUint32(buf[:], i)
 		if i%2 == 0 {
-			db.SetWithFlags(buf[:], buf[:], SetPresumeKeyNotExists, SetPessimisticLock)
+			db.SetWithFlags(buf[:], buf[:], SetPresumeKeyNotExists, SetKeyLocked)
 		} else {
 			db.SetWithFlags(buf[:], buf[:], SetPresumeKeyNotExists)
 		}
@@ -487,15 +486,15 @@
 		flags, err := db.GetFlags(buf[:])
 		if i%2 == 0 {
 			c.Assert(err, IsNil)
-			c.Assert(flags.HasPessimisticLock(), IsTrue)
+			c.Assert(flags.HasLocked(), IsTrue)
 			c.Assert(flags.HasPresumeKeyNotExists(), IsFalse)
 		} else {
 			c.Assert(err, NotNil)
 		}
 	}
-	c.Assert(db.Len(), Equals, 0)
-	c.Assert(db.Size(), Equals, 0)
+	c.Assert(db.Len(), Equals, 5000)
+	c.Assert(db.Size(), Equals, 20000)
 
 	it1, _ := db.Iter(nil, nil)
 	it := it1.(*memdbIterator)
@@ -512,7 +511,7 @@
 	for i := uint32(0); i < cnt; i++ {
 		var buf [4]byte
 		binary.BigEndian.PutUint32(buf[:], i)
-		db.UpdateFlags(buf[:], DelPessimisticLock)
+		db.UpdateFlags(buf[:], DelKeyLocked)
 	}
 	for i := uint32(0); i < cnt; i++ {
 		var buf [4]byte
 		binary.BigEndian.PutUint32(buf[:], i)
@@ -523,7 +522,7 @@
 		// UpdateFlags will create missing node.
 		flags, err := db.GetFlags(buf[:])
 		c.Assert(err, IsNil)
-		c.Assert(flags.HasPessimisticLock(), IsFalse)
+		c.Assert(flags.HasLocked(), IsFalse)
 	}
 }
diff --git a/session/txn.go b/session/txn.go
index 5346a7056d35c..7e13baa19d487 100644
--- a/session/txn.go
+++ b/session/txn.go
@@ -297,7 +297,6 @@ func (st *TxnState) KeysNeedToLock() ([]kv.Key, error) {
 		if !keyNeedToLock(k, v) {
 			return
 		}
-		// If the key is already locked, it will be deduplicated in LockKeys method later.
 		keys = append(keys, k)
 	})
 	return keys, nil
diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go
index 1162f592c0848..6498180875cb8 100644
--- a/sessionctx/stmtctx/stmtctx.go
+++ b/sessionctx/stmtctx/stmtctx.go
@@ -152,9 +152,8 @@ type StatementContext struct {
 	LockKeysDuration  int64
 	LockKeysCount     int32
 	TblInfo2UnionScan map[*model.TableInfo]bool
-	TaskID            uint64              // unique ID for an execution of a statement
-	TaskMapBakTS      uint64              // counter for
-	CheckKeyExists    map[string]struct{} // mark the keys needs to check for existence for pessimistic locks.
+	TaskID            uint64 // unique ID for an execution of a statement
+	TaskMapBakTS      uint64 // counter for
 }
 
 // StmtHints are SessionVars related sql hints.
@@ -486,7 +485,6 @@ func (sc *StatementContext) ResetForRetry() {
 	sc.TableIDs = sc.TableIDs[:0]
 	sc.IndexNames = sc.IndexNames[:0]
 	sc.TaskID = AllocateTaskID()
-	sc.CheckKeyExists = make(map[string]struct{})
 }
 
 // MergeExecDetails merges a single region execution details into self, used to print
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index c6222603faec4..85753f0c9b4c5 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -17,7 +17,6 @@ import (
 	"bytes"
 	"context"
 	"math"
-	"sort"
 	"strings"
 	"sync"
 	"sync/atomic"
@@ -223,92 +222,68 @@ func (c *twoPhaseCommitter) initKeysAndMutations() error {
 	txn := c.txn
 	memBuf := txn.GetMemBuffer()
-	sizeHint := len(txn.lockKeys) + txn.us.GetMemBuffer().Len()
+	sizeHint := txn.us.GetMemBuffer().Len()
 	mutations := NewCommiterMutations(sizeHint)
 	c.isPessimistic = txn.IsPessimistic()
 
-	// Merge ordered lockKeys and pairs in the memBuffer into the mutations array
-	sort.Slice(txn.lockKeys, func(i, j int) bool {
-		return bytes.Compare(txn.lockKeys[i], txn.lockKeys[j]) < 0
-	})
-	lockIdx := 0
-	err := kv.WalkMemBuffer(txn.us.GetMemBuffer(), func(k kv.Key, v []byte) error {
-		var (
-			op                pb.Op
-			value             []byte
-			isPessimisticLock bool
-		)
-		if len(v) > 0 {
-			if tablecodec.IsUntouchedIndexKValue(k, v) {
-				return nil
-			}
-			op = pb.Op_Put
-			if txn.us.HasPresumeKeyNotExists(k) {
-				op = pb.Op_Insert
+	var err error
+	for it := memBuf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
+		_ = err
+		key := it.Key()
+		flags := it.Flags()
+		var value []byte
+		var op pb.Op
+
+		if !it.HasValue() {
+			if !flags.HasLocked() {
+				continue
 			}
-			value = v
-			putCnt++
+			op = pb.Op_Lock
+			lockCnt++
 		} else {
-			if !txn.IsPessimistic() && txn.us.HasPresumeKeyNotExists(k) {
-				// delete-your-writes keys in optimistic txn need check not exists in prewrite-phase
-				// due to `Op_CheckNotExists` doesn't prewrite lock, so mark those keys should not be used in commit-phase.
-				op = pb.Op_CheckNotExists
-				checkCnt++
-				memBuf.UpdateFlags(k, kv.SetNoNeedCommit)
+			value = it.Value()
+			if len(value) > 0 {
+				if tablecodec.IsUntouchedIndexKValue(key, value) {
+					continue
+				}
+				op = pb.Op_Put
+				if flags.HasPresumeKeyNotExists() {
+					op = pb.Op_Insert
+				}
+				putCnt++
 			} else {
-				// normal delete keys in optimistic txn can be delete without not exists checking
-				// delete-your-writes keys in pessimistic txn can ensure must be no exists so can directly delete them
-				op = pb.Op_Del
-				delCnt++
+				if !txn.IsPessimistic() && flags.HasPresumeKeyNotExists() {
+					// delete-your-writes keys in an optimistic txn need a not-exists check in the prewrite phase;
+					// `Op_CheckNotExists` does not leave a prewrite lock, so mark these keys as unused in the commit phase.
+					op = pb.Op_CheckNotExists
+					checkCnt++
+					memBuf.UpdateFlags(key, kv.SetNoNeedCommit)
+				} else {
+					// normal delete keys in an optimistic txn can be deleted without the not-exists check
+					// delete-your-writes keys in a pessimistic txn are guaranteed not to exist, so they can be deleted directly
+					op = pb.Op_Del
+					delCnt++
+				}
 			}
 		}
-		for lockIdx < len(txn.lockKeys) {
-			lockKey := txn.lockKeys[lockIdx]
-			ord := bytes.Compare(lockKey, k)
-			if ord == 0 {
-				isPessimisticLock = c.isPessimistic
-				lockIdx++
-				break
-			} else if ord > 0 {
-				break
-			} else {
-				mutations.Push(pb.Op_Lock, lockKey, nil, c.isPessimistic)
-				lockCnt++
-				size += len(lockKey)
-				lockIdx++
-			}
+
+		var isPessimistic bool
+		if flags.HasLocked() {
+			isPessimistic = c.isPessimistic
 		}
-		mutations.Push(op, k, value, isPessimisticLock)
-		entrySize := len(k) + len(v)
-		if uint64(entrySize) > kv.TxnEntrySizeLimit {
-			return kv.ErrEntryTooLarge.GenWithStackByArgs(kv.TxnEntrySizeLimit, entrySize)
+		mutations.Push(op, key, value, isPessimistic)
+		size += len(key) + len(value)
+
+		if len(c.primaryKey) == 0 && op != pb.Op_CheckNotExists {
+			c.primaryKey = key
 		}
-		size += entrySize
-		return nil
-	})
-	if err != nil {
-		return errors.Trace(err)
-	}
-	// add the remaining locks to mutations and keys
-	for _, lockKey := range txn.lockKeys[lockIdx:] {
-		mutations.Push(pb.Op_Lock, lockKey, nil, c.isPessimistic)
-		lockCnt++
-		size += len(lockKey)
 	}
+
 	if mutations.len() == 0 {
 		return nil
 	}
 	c.txnSize = size
-	if len(c.primaryKey) == 0 {
-		for i, op := range mutations.ops {
-			if op != pb.Op_CheckNotExists {
-				c.primaryKey = mutations.keys[i]
-				break
-			}
-		}
-	}
-
 	if size > int(kv.TxnTotalSizeLimit) {
 		return kv.ErrTxnTooLarge.GenWithStackByArgs(size)
 	}
diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go
index 4858964ccf1a5..09a75a34b211c 100644
--- a/store/tikv/2pc_test.go
+++ b/store/tikv/2pc_test.go
@@ -639,7 +639,7 @@ func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) {
 	lockCtx = &kv.LockCtx{ForUpdateTS: 100, WaitStartTime: time.Now()}
 	err = txn.LockKeys(context.Background(), lockCtx, kv.Key("abc"), kv.Key("def"))
 	c.Assert(err, IsNil)
-	c.Assert(txn.lockKeys, HasLen, 2)
+	c.Assert(txn.collectLockedKeys(), HasLen, 2)
 }
 
 func (s *testCommitterSuite) TestPessimisticTTL(c *C) {
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index 45ecd38b2b060..1d2cc7a21d157 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -61,12 +61,11 @@ type tikvTxn struct {
 	startTS   uint64
 	startTime time.Time // Monotonic timestamp for recording txn time consuming.
 	commitTS  uint64
-	lockKeys  [][]byte
-	lockedMap map[string]bool
 	mu        sync.Mutex // For thread-safe LockKeys function.
 	setCnt    int64
 	vars      *kv.Variables
 	committer *twoPhaseCommitter
+	lockedCnt int
 
 	// For data consistency check.
 	// assertions[:confirmed] is the assertion of current transaction.
@@ -100,7 +99,6 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin
 	return &tikvTxn{
 		snapshot:  snapshot,
 		us:        kv.NewUnionStore(snapshot),
-		lockedMap: make(map[string]bool),
 		store:     store,
 		startTS:   startTS,
 		startTime: time.Now(),
@@ -322,10 +320,25 @@ func (txn *tikvTxn) Rollback() error {
 }
 
 func (txn *tikvTxn) rollbackPessimisticLocks() error {
-	if len(txn.lockKeys) == 0 {
+	if txn.lockedCnt == 0 {
 		return nil
 	}
-	return txn.committer.pessimisticRollbackMutations(NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars), CommitterMutations{keys: txn.lockKeys})
+	bo := NewBackofferWithVars(context.Background(), cleanupMaxBackoff, txn.vars)
+	keys := txn.collectLockedKeys()
+	return txn.committer.pessimisticRollbackMutations(bo, CommitterMutations{keys: keys})
+}
+
+func (txn *tikvTxn) collectLockedKeys() [][]byte {
+	keys := make([][]byte, 0, txn.lockedCnt)
+	buf := txn.GetMemBuffer()
+	var err error
+	for it := buf.IterWithFlags(nil, nil); it.Valid(); err = it.Next() {
+		_ = err
+		if it.Flags().HasLocked() {
+			keys = append(keys, it.Key())
+		}
+	}
+	return keys
 }
 
 // lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock
@@ -349,10 +362,15 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
 			*lockCtx.LockKeysCount += int32(len(keys))
 		}
 	}()
+	memBuf := txn.us.GetMemBuffer()
 	for _, key := range keysInput {
 		// The value of lockedMap is only used by pessimistic transactions.
-		valueExist, locked := txn.lockedMap[string(key)]
-		_, checkKeyExists := lockCtx.CheckKeyExists[string(key)]
+		var valueExist, locked, checkKeyExists bool
+		if flags, err := memBuf.GetFlags(key); err == nil {
+			locked = flags.HasLocked()
+			valueExist = flags.HasLockedValueExists()
+			checkKeyExists = flags.HasNeedCheckExists()
+		}
 		if !locked {
 			keys = append(keys, key)
 		} else if txn.IsPessimistic() {
@@ -394,7 +412,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
 		txn.committer.forUpdateTS = lockCtx.ForUpdateTS
 		// If the number of keys greater than 1, it can be on different region,
 		// concurrently execute on multiple regions may lead to deadlock.
-		txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1
+		txn.committer.isFirstLock = txn.lockedCnt == 0 && len(keys) == 1
 		err = txn.committer.pessimisticLockMutations(bo, lockCtx, CommitterMutations{keys: keys})
 		if lockCtx.Killed != nil {
 			// If the kill signal is received during waiting for pessimisticLock,
@@ -404,7 +422,9 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
 		}
 		if err != nil {
 			for _, key := range keys {
-				txn.us.UnmarkPresumeKeyNotExists(key)
+				if txn.us.HasPresumeKeyNotExists(key) {
+					txn.us.UnmarkPresumeKeyNotExists(key)
+				}
 			}
 			keyMayBeLocked := terror.ErrorNotEqual(kv.ErrWriteConflict, err) && terror.ErrorNotEqual(kv.ErrKeyExists, err)
 			// If there is only 1 key and lock fails, no need to do pessimistic rollback.
@@ -431,19 +451,19 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
 			txn.committer.ttlManager.run(txn.committer, lockCtx)
 		}
 	}
-	txn.lockKeys = append(txn.lockKeys, keys...)
 	for _, key := range keys {
+		valExists := kv.SetKeyLockedValueExists
 		// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
 		// For other lock modes, the locked key values always exist.
 		if lockCtx.ReturnValues {
 			val, _ := lockCtx.Values[string(key)]
-			valExists := len(val.Value) > 0
-			txn.lockedMap[string(key)] = valExists
-		} else {
-			txn.lockedMap[string(key)] = true
+			if len(val.Value) == 0 {
+				valExists = kv.SetKeyLockedValueNotExists
+			}
 		}
+		memBuf.UpdateFlags(key, kv.SetKeyLocked, kv.DelNeedCheckExists, valExists)
 	}
-	txn.dirty = true
+	txn.lockedCnt += len(keys)
 	return nil
 }
@@ -495,7 +515,7 @@ func hashInKeys(deadlockKeyHash uint64, keys [][]byte) bool {
 }
 
 func (txn *tikvTxn) IsReadOnly() bool {
-	return !txn.dirty && !txn.us.GetMemBuffer().Dirty()
+	return !txn.us.GetMemBuffer().Dirty()
 }
 
 func (txn *tikvTxn) StartTS() uint64 {
diff --git a/table/tables/index.go b/table/tables/index.go
index 6ba83f65c4a1c..01c919f1754a7 100644
--- a/table/tables/index.go
+++ b/table/tables/index.go
@@ -284,7 +284,6 @@ func (c *index) Create(sctx sessionctx.Context, us kv.UnionStore, indexedValues
 	if err != nil || len(value) == 0 {
 		if sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil {
 			err = us.GetMemBuffer().SetWithFlags(key, idxVal, kv.SetPresumeKeyNotExists)
-			sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
 		} else {
 			err = us.GetMemBuffer().Set(key, idxVal)
 		}
diff --git a/table/tables/tables.go b/table/tables/tables.go
index 1b6632fb209b6..c705b5f83cc50 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -699,7 +699,6 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
 	if setPresume {
 		err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
-		sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
 	} else {
 		err = memBuffer.Set(key, value)
 	}
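Not part of the patch: a minimal usage sketch of the KeyFlags API this diff introduces, showing how lock bookkeeping moves from tikvTxn.lockKeys/lockedMap into the mem-buffer flags. It is written as if it sat beside memdb_test.go inside package kv (so it can call the unexported newMemDB the way the tests do); the helper names collectLocked and example are illustrative only, and collectLocked simply mirrors tikvTxn.collectLockedKeys above.

// Illustrative sketch, not part of the patch.
package kv

// collectLocked returns every key whose flags say it has been locked,
// mirroring tikvTxn.collectLockedKeys in store/tikv/txn.go.
func collectLocked(buf MemBuffer) [][]byte {
	var keys [][]byte
	for it := buf.IterWithFlags(nil, nil); it.Valid(); it.Next() {
		if it.Flags().HasLocked() {
			keys = append(keys, it.Key())
		}
	}
	return keys
}

// example shows the write path: SetWithFlags marks a lazily checked key, and
// UpdateFlags later records the lock plus whether the value existed, which is
// what LockKeys now does instead of updating the removed lockedMap.
func example() [][]byte {
	db := newMemDB() // unexported constructor, used the same way as in memdb_test.go
	_ = db.SetWithFlags(Key("k1"), []byte("v1"), SetPresumeKeyNotExists)
	db.UpdateFlags(Key("k1"), SetKeyLocked, DelNeedCheckExists, SetKeyLockedValueExists)
	return collectLocked(db)
}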