Skip to content

Commit

Permalink
fix pessimistic check (#19004)
Browse files Browse the repository at this point in the history
Co-authored-by: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
  • Loading branch information
cfzjywxk and ti-srebot authored Aug 6, 2020
1 parent 931ff98 commit 574540a
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 7 deletions.
2 changes: 2 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx {
LockKeysDuration: &seVars.StmtCtx.LockKeysDuration,
LockKeysCount: &seVars.StmtCtx.LockKeysCount,
LockExpired: &seVars.TxnCtx.LockExpire,
CheckKeyExists: seVars.StmtCtx.CheckKeyExists,
}
}

Expand Down Expand Up @@ -1652,6 +1653,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
}

sc.TblInfo2UnionScan = make(map[*model.TableInfo]bool)
sc.CheckKeyExists = make(map[string]struct{})
errCount, warnCount := vars.StmtCtx.NumErrorWarnings()
vars.SysErrorCount = errCount
vars.SysWarningCount = warnCount
Expand Down
1 change: 1 addition & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ type LockCtx struct {
Values map[string]ReturnedValue
ValuesLock sync.Mutex
LockExpired *uint32
CheckKeyExists map[string]struct{}
}

// ReturnedValue pairs the Value and AlreadyLocked flag for PessimisticLock return values result.
Expand Down
182 changes: 182 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1498,3 +1498,185 @@ func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) {
tk.MustExec("commit")
tk.MustExec("admin check table t")
}

func (s *testPessimisticSuite) TestInsertDupKeyAfterLock(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk2.MustExec("use test_db")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));")
tk2.MustExec("insert into t1 values(1, 2, 3);")
tk2.MustExec("insert into t1 values(10, 20, 30);")

// Test insert after lock.
tk.MustExec("begin pessimistic")
err := tk.ExecToErr("update t1 set c2 = 20 where c1 = 1;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 where c2 = 2 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

// Test insert after insert.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("insert into t1 values(5, 6, 7)")
err = tk.ExecToErr("insert into t1 values(6, 6, 7);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30"))

// Test insert after delete.
tk.MustExec("begin pessimistic")
tk.MustExec("delete from t1 where c2 > 2")
tk.MustExec("insert into t1 values(10, 20, 500);")
err = tk.ExecToErr("insert into t1 values(20, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test range.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test select for update after dml.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 6, 7)")
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c1 = 6 for update")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(7, 6, 7)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(5, 8, 6)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("select * from t1 where c1 = 5 for update")
tk.MustExec("select * from t1 where c2 = 8 for update")
tk.MustExec("select * from t1 for update")
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500"))

// Test optimistic for update.
tk.MustExec("begin optimistic")
tk.MustQuery("select * from t1 where c1 = 1 for update").Check(testkit.Rows("1 2 3"))
tk.MustExec("insert into t1 values(10, 10, 10)")
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}

func (s *testPessimisticSuite) TestInsertDupKeyAfterLockBatchPointGet(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop database if exists test_db")
tk.MustExec("create database test_db")
tk.MustExec("use test_db")
tk2.MustExec("use test_db")
tk2.MustExec("drop table if exists t1")
tk2.MustExec("create table t1(c1 int primary key, c2 int, c3 int, unique key uk(c2));")
tk2.MustExec("insert into t1 values(1, 2, 3);")
tk2.MustExec("insert into t1 values(10, 20, 30);")

// Test insert after lock.
tk.MustExec("begin pessimistic")
err := tk.ExecToErr("update t1 set c2 = 20 where c1 in (1);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

tk.MustExec("begin pessimistic")
tk.MustExec("select * from t1 where c2 in (2) for update")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 30"))

// Test insert after insert.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("insert into t1 values(5, 6, 7)")
err = tk.ExecToErr("insert into t1 values(6, 6, 7);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 30"))

// Test insert after delete.
tk.MustExec("begin pessimistic")
tk.MustExec("delete from t1 where c2 > 2")
tk.MustExec("insert into t1 values(10, 20, 500);")
err = tk.ExecToErr("insert into t1 values(20, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 20, 30);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test range.
tk.MustExec("begin pessimistic")
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 5;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("update t1 set c2 = 20 where c1 >= 1 and c1 < 50;")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(1, 15, 300);")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "10 20 500"))

// Test select for update after dml.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t1 values(5, 6, 7)")
tk.MustExec("select * from t1 where c1 in (5, 6) for update")
tk.MustExec("select * from t1 where c1 = 6 for update")
tk.MustExec("select * from t1 for update")
err = tk.ExecToErr("insert into t1 values(7, 6, 7)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
err = tk.ExecToErr("insert into t1 values(5, 8, 6)")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
tk.MustExec("select * from t1 where c2 = 8 for update")
tk.MustExec("select * from t1 where c1 in (5, 8) for update")
tk.MustExec("select * from t1 for update")
tk.MustExec("commit")
tk2.MustQuery("select * from t1").Check(testkit.Rows("1 2 3", "5 6 7", "10 20 500"))

// Test optimistic for update.
tk.MustExec("begin optimistic")
tk.MustQuery("select * from t1 where c1 in (1) for update").Check(testkit.Rows("1 2 3"))
tk.MustExec("insert into t1 values(10, 10, 10)")
err = tk.ExecToErr("commit")
c.Assert(terror.ErrorEqual(err, kv.ErrKeyExists), IsTrue)
}
6 changes: 4 additions & 2 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,9 @@ type StatementContext struct {
LockKeysDuration int64
LockKeysCount int32
TblInfo2UnionScan map[*model.TableInfo]bool
TaskID uint64 // unique ID for an execution of a statement
TaskMapBakTS uint64 // counter for
TaskID uint64 // unique ID for an execution of a statement
TaskMapBakTS uint64 // counter for
CheckKeyExists map[string]struct{} // mark the keys needs to check for existence for pessimistic locks.
}

// StmtHints are SessionVars related sql hints.
Expand Down Expand Up @@ -485,6 +486,7 @@ func (sc *StatementContext) ResetForRetry() {
sc.TableIDs = sc.TableIDs[:0]
sc.IndexNames = sc.IndexNames[:0]
sc.TaskID = AllocateTaskID()
sc.CheckKeyExists = make(map[string]struct{})
}

// MergeExecDetails merges a single region execution details into self, used to print
Expand Down
26 changes: 21 additions & 5 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type tikvTxn struct {
startTime time.Time // Monotonic timestamp for recording txn time consuming.
commitTS uint64
lockKeys [][]byte
lockedMap map[string]struct{}
lockedMap map[string]bool
mu sync.Mutex // For thread-safe LockKeys function.
setCnt int64
vars *kv.Variables
Expand Down Expand Up @@ -100,7 +100,7 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uin
return &tikvTxn{
snapshot: snapshot,
us: kv.NewUnionStore(snapshot),
lockedMap: map[string]struct{}{},
lockedMap: make(map[string]bool),
store: store,
startTS: startTS,
startTime: time.Now(),
Expand Down Expand Up @@ -350,9 +350,17 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
}
}()
for _, key := range keysInput {
if _, ok := txn.lockedMap[string(key)]; !ok {
// The value of lockedMap is only used by pessimistic transactions.
valueExist, locked := txn.lockedMap[string(key)]
_, checkKeyExists := lockCtx.CheckKeyExists[string(key)]
if !locked {
keys = append(keys, key)
} else if lockCtx.ReturnValues {
} else if txn.IsPessimistic() {
if checkKeyExists && valueExist {
return txn.committer.extractKeyExistsErr(key)
}
}
if lockCtx.ReturnValues && locked {
// An already locked key can not return values, we add an entry to let the caller get the value
// in other ways.
lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true}
Expand Down Expand Up @@ -425,7 +433,15 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput
}
txn.lockKeys = append(txn.lockKeys, keys...)
for _, key := range keys {
txn.lockedMap[string(key)] = struct{}{}
// PointGet and BatchPointGet will return value in pessimistic lock response, the value may not exist.
// For other lock modes, the locked key values always exist.
if lockCtx.ReturnValues {
val, _ := lockCtx.Values[string(key)]
valExists := len(val.Value) > 0
txn.lockedMap[string(key)] = valExists
} else {
txn.lockedMap[string(key)] = true
}
}
txn.dirty = true
return nil
Expand Down
1 change: 1 addition & 0 deletions table/tables/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ func (c *index) Create(sctx sessionctx.Context, us kv.UnionStore, indexedValues
if err != nil || len(value) == 0 {
if sctx.GetSessionVars().LazyCheckKeyNotExists() && err != nil {
err = us.GetMemBuffer().SetWithFlags(key, idxVal, kv.SetPresumeKeyNotExists)
sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
} else {
err = us.GetMemBuffer().Set(key, idxVal)
}
Expand Down
1 change: 1 addition & 0 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,7 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .

if setPresume {
err = memBuffer.SetWithFlags(key, value, kv.SetPresumeKeyNotExists)
sctx.GetSessionVars().StmtCtx.CheckKeyExists[string(key)] = struct{}{}
} else {
err = memBuffer.Set(key, value)
}
Expand Down

0 comments on commit 574540a

Please sign in to comment.