Skip to content

Commit

Permalink
fix insert check after lock (#19236)
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored Aug 19, 2020
1 parent 6446961 commit d88f03e
Show file tree
Hide file tree
Showing 9 changed files with 142 additions and 10 deletions.
2 changes: 2 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
PessimisticLockWaited: &seVars.StmtCtx.PessimisticLockWaited,
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
CheckKeyExists: seVars.StmtCtx.CheckKeyExists,
}
}

Expand Down Expand Up @@ -1554,6 +1555,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
sc.CheckKeyExists = make(map[string]struct{})
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
err = vars.SetSystemVar("warning_count", warnCount)
if err != nil {
Expand Down
11 changes: 9 additions & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ type PointGetExecutor struct {
done bool
lock bool
lockWaitTime int64
snapValExist bool
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -146,7 +147,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), e.lockWaitTime), key)
lockCtx := newLockCtx(e.ctx.GetSessionVars(), e.lockWaitTime)
lockCtx.PointGetLock = &e.snapValExist
return doLockKeys(ctx, e.ctx, lockCtx, key)
}
return nil
}
Expand Down Expand Up @@ -191,7 +194,11 @@ func (e *PointGetExecutor) get(key kv.Key) (val []byte, err error) {
}
// fallthrough to snapshot get.
}
return e.snapshot.Get(key)
snapVal, snapErr := e.snapshot.Get(key)
if snapErr == nil && len(snapVal) > 0 {
e.snapValExist = true
}
return snapVal, snapErr
}

func (e *PointGetExecutor) decodeRowValToChunk(rowVal []byte, chk *chunk.Chunk) error {
Expand Down
4 changes: 4 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (
Pessimistic
// SnapshotTS is defined to set snapshot ts.
SnapshotTS
// CheckExist map for key existence check.
CheckExists
)

// Priority value for transaction priority.
Expand Down Expand Up @@ -182,6 +184,8 @@ type LockCtx struct {
PessimisticLockWaited *int32
LockKeysDuration *time.Duration
LockKeysCount *int32
CheckKeyExists map[string]struct{}
PointGetLock *bool
}

// Client is used to send request to KV layer.
Expand Down
4 changes: 4 additions & 0 deletions kv/union_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ func (us *unionStore) Get(k Key) ([]byte, error) {
e, ok := us.opts.Get(PresumeKeyNotExistsError)
if ok && e != nil {
us.markLazyConditionPair(k, nil, e.(error))
if val, ok := us.opts.Get(CheckExists); ok {
checkExistMap := val.(map[string]struct{})
checkExistMap[string(k)] = struct{}{}
}
} else {
us.markLazyConditionPair(k, nil, ErrKeyExists)
}
Expand Down
91 changes: 91 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,3 +688,94 @@ func (s *testPessimisticSuite) TestPessimisticReadCommitted(c *C) {

tk1.MustExec("commit;")
}

func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk2.MustExec("use test_db")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));")
tk2.MustExec("insert into t1 values(1, 2, 3);")
tk2.MustExec("insert into t1 values(10, 20, 30);")

// Test insert after lock.
tk.MustExec("begin pessimistic")
err := tk.ExecToErr("update t1 set c2 = 20 where c1 = 1;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 where c2 = 2 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

// Test insert after insert.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("insert into t1 values(5, 6, 7)")
err = tk.ExecToErr("insert into t1 values(6, 6, 7);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30"))

// Test insert after delete.
tk.MustExec("begin pessimistic")
tk.MustExec("delete from t1 where c2 > 2")
tk.MustExec("insert into t1 values(10, 20, 500);")
err = tk.ExecToErr("insert into t1 values(20, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test range.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test select for update after dml.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 6, 7)")
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c1 = 6 for update")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(7, 6, 7)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(5, 8, 6)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c2 = 8 for update")
tk.MustExec("select * from t1 for update")
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500"))

// Test optimistic for update.
tk.MustExec("begin optimistic")
tk.MustQuery("select * from t1 where c1 = 1 for update").Check(testkit.Rows("1 2 3"))
tk.MustExec("insert into t1 values(10, 10, 10)")
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}
2 changes: 2 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type StatementContext struct {
planNormalized string
planDigest string
LockKeysCount int32
CheckKeyExists map[string]struct{} // mark the keys needs to check for existence for pessimistic locks.
}

// GetNowTsCached getter for nowTs, if not set get now time and cache it
Expand Down Expand Up @@ -436,6 +437,7 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.Unlock()
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.CheckKeyExists = make(map[string]struct{})
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
35 changes: 27 additions & 8 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type tikvTxn struct {
commitTS uint64
valid bool
lockKeys [][]byte
lockedMap map[string]struct{}
lockedMap map[string]bool
mu sync.Mutex // For thread-safe LockKeys function.
dirty bool
setCnt int64
Expand All @@ -88,7 +88,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
return &tikvTxn{
snapshot: snapshot,
us: kv.NewUnionStore(snapshot),
lockedMap: map[string]struct{}{},
lockedMap: make(map[string]bool),
store: store,
startTS: startTS,
startTime: time.Now(),
Expand Down Expand Up @@ -225,6 +225,8 @@ func (txn *tikvTxn) SetOption(opt kv.Option, val interface{}) {
txn.snapshot.keyOnly = val.(bool)
case kv.SnapshotTS:
txn.snapshot.setSnapshotTS(val.(uint64))
case kv.CheckExists:
txn.us.SetOption(kv.CheckExists, val.(map[string]struct{}))
}
}

Expand Down Expand Up @@ -354,6 +356,8 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
// Exclude keys that are already locked.
var err error
keys := make([][]byte, 0, len(keysInput))
txn.mu.Lock()
defer txn.mu.Unlock()
defer func() {
if err == nil {
if lockCtx.PessimisticLockWaited != nil {
Expand All @@ -368,13 +372,26 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
*lockCtx.LockKeysCount += int32(len(keys))
}
}()
txn.mu.Lock()
for _, key := range keysInput {
if _, ok := txn.lockedMap[string(key)]; !ok {
// The value of lockedMap is only used by pessimistic transactions.
valueExist, locked := txn.lockedMap[string(key)]
_, checkKeyExists := lockCtx.CheckKeyExists[string(key)]
if !locked {
keys = append(keys, key)
} else if txn.IsPessimistic() {
if checkKeyExists && valueExist {
existErrInfo := txn.us.LookupConditionPair(key)
if existErrInfo == nil {
logutil.Logger(ctx).Error("key exist error not found",
zap.Uint64("connID", txn.committer.connID),
zap.Uint64("startTS", txn.startTS),
zap.ByteString("key", key))
return errors.Errorf("conn %d, existErr for key:%s should not be nil", txn.committer.connID, key)
}
return existErrInfo.Err()
}
}
}
txn.mu.Unlock()
if len(keys) == 0 {
return nil
}
Expand Down Expand Up @@ -437,13 +454,15 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
txn.committer.ttlManager.run(txn.committer, lockCtx.Killed)
}
}
txn.mu.Lock()
txn.lockKeys = append(txn.lockKeys, keys...)
for _, key := range keys {
txn.lockedMap[string(key)] = struct{}{}
if lockCtx.PointGetLock != nil {
txn.lockedMap[string(key)] = *lockCtx.PointGetLock
} else {
txn.lockedMap[string(key)] = true
}
}
txn.dirty = true
txn.mu.Unlock()
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,7 @@ func (t *tableCommon) addIndices(ctx sessionctx.Context, recordID int64, r []typ
}
dupKeyErr = kv.ErrKeyExists.FastGen("Duplicate entry '%s' for key '%s'", entryKey, v.Meta().Name)
txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr)
txn.SetOption(kv.CheckExists, ctx.GetSessionVars().StmtCtx.CheckKeyExists)
}
if dupHandle, err := v.Create(ctx, rm, indexVals, recordID, opt); err != nil {
if kv.ErrKeyExists.Equal(err) {
Expand Down Expand Up @@ -1091,6 +1092,7 @@ func CheckHandleExists(ctx sessionctx.Context, t table.Table, recordID int64, da
recordKey := t.RecordKey(recordID)
e := kv.ErrKeyExists.FastGen("Duplicate entry '%d' for key 'PRIMARY'", recordID)
txn.SetOption(kv.PresumeKeyNotExistsError, e)
txn.SetOption(kv.CheckExists, ctx.GetSessionVars().StmtCtx.CheckKeyExists)
defer txn.DelOption(kv.PresumeKeyNotExistsError)
_, err = txn.Get(recordKey)
if err == nil {
Expand Down
1 change: 1 addition & 0 deletions util/mock/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ func NewContext() *Context {
sctx.sessionVars.MaxChunkSize = 32
sctx.sessionVars.StmtCtx.TimeZone = time.UTC
sctx.sessionVars.GlobalVarsAccessor = variable.NewMockGlobalAccessor()
sctx.sessionVars.StmtCtx.CheckKeyExists = make(map[string]struct{})
if err := sctx.GetSessionVars().SetSystemVar(variable.MaxAllowedPacket, "67108864"); err != nil {
panic(err)
}
Expand Down

0 comments on commit d88f03e

Please sign in to comment.