From f42288551db47dd09435d68b55419e8822d6a293 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Wed, 5 Aug 2020 16:09:52 +0800 Subject: [PATCH 1/4] transaction: fix LockKeys race --- session/pessimistic_test.go | 12 ++++++++++++ store/tikv/txn.go | 6 ++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index faa67cfbff40e..6d11da84c1a28 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -1486,3 +1486,15 @@ func (s *testPessimisticSuite) TestPessimisticTxnWithDDLChangeColumn(c *C) { tk2.MustExec("drop database if exists test_db") } + +func (s *testPessimisticSuite) TestPessimisticUnionForUpdate(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(id int, v int, k int, primary key (id), key kk(k))") + tk.MustExec("insert into t select 1, 1, 1") + tk.MustExec("begin pessimistic") + tk.MustQuery("(select * from t where id between 0 and 1 for update) union all (select * from t where id between 0 and 1 for update)") + tk.MustExec("update t set k = 2 where k = 1") + tk.MustExec("commit") + tk.MustExec("admin check table t") +} diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 08f3fbbb98838..757b66a1a6453 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -330,6 +330,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { // lockWaitTime in ms, except that kv.LockAlwaysWait(0) means always wait lock, kv.LockNowait(-1) means nowait lock func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput ...kv.Key) error { + txn.mu.Lock() + defer txn.mu.Unlock() // Exclude keys that are already locked. var err error keys := make([][]byte, 0, len(keysInput)) @@ -347,7 +349,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput *lockCtx.LockKeysCount += int32(len(keys)) } }() - txn.mu.Lock() for _, key := range keysInput { if _, ok := txn.lockedMap[string(key)]; !ok { keys = append(keys, key) @@ -357,7 +358,6 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput lockCtx.Values[string(key)] = kv.ReturnedValue{AlreadyLocked: true} } } - txn.mu.Unlock() if len(keys) == 0 { return nil } @@ -423,13 +423,11 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput txn.committer.ttlManager.run(txn.committer, lockCtx) } } - txn.mu.Lock() txn.lockKeys = append(txn.lockKeys, keys...) for _, key := range keys { txn.lockedMap[string(key)] = struct{}{} } txn.dirty = true - txn.mu.Unlock() return nil } From 4fc070667757dab3d503b4ba131e43d579d453f6 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Wed, 5 Aug 2020 19:15:06 +0800 Subject: [PATCH 2/4] do not update delta for lock keys --- executor/executor.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index f4c1db2b15be6..c1bf2909bd846 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -942,13 +942,6 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { lockWaitTime = kv.LockNoWait } - if len(e.keys) > 0 { - // This operation is only for schema validator check. - for id := range e.tblID2Handle { - e.ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(id, 0, 0, map[int64]int64{}) - } - } - return doLockKeys(ctx, e.ctx, newLockCtx(e.ctx.GetSessionVars(), lockWaitTime), e.keys...) } From 0f2cd186b489db8258ef98bbc1587ed0309ec964 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Wed, 5 Aug 2020 20:07:59 +0800 Subject: [PATCH 3/4] fix more race --- kv/kv.go | 2 +- sessionctx/stmtctx/stmtctx.go | 17 +++++++++-------- store/tikv/txn.go | 2 +- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 77d6393eeeba3..361c635ec9533 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -250,7 +250,7 @@ type LockCtx struct { LockWaitTime int64 WaitStartTime time.Time PessimisticLockWaited *int32 - LockKeysDuration *time.Duration + LockKeysDuration *int64 LockKeysCount *int32 ReturnValues bool Values map[string]ReturnedValue diff --git a/sessionctx/stmtctx/stmtctx.go b/sessionctx/stmtctx/stmtctx.go index 15895b6706510..6498180875cb8 100644 --- a/sessionctx/stmtctx/stmtctx.go +++ b/sessionctx/stmtctx/stmtctx.go @@ -146,10 +146,10 @@ type StatementContext struct { planNormalized string planDigest string Tables []TableEntry - PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" - lockWaitStartTime *time.Time // LockWaitStartTime stores the pessimistic lock wait start time + PointExec bool // for point update cached execution, Constant expression need to set "paramMarker" + lockWaitStartTime int64 // LockWaitStartTime stores the pessimistic lock wait start time PessimisticLockWaited int32 - LockKeysDuration time.Duration + LockKeysDuration int64 LockKeysCount int32 TblInfo2UnionScan map[*model.TableInfo]bool TaskID uint64 // unique ID for an execution of a statement @@ -510,7 +510,7 @@ func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails { var details execdetails.ExecDetails sc.mu.Lock() details = sc.mu.execDetails - details.LockKeysDuration = sc.LockKeysDuration + details.LockKeysDuration = time.Duration(atomic.LoadInt64(&sc.LockKeysDuration)) sc.mu.Unlock() return details } @@ -651,11 +651,12 @@ func (sc *StatementContext) SetFlagsFromPBFlag(flags uint64) { // GetLockWaitStartTime returns the statement pessimistic lock wait start time func (sc *StatementContext) GetLockWaitStartTime() time.Time { - if sc.lockWaitStartTime == nil { - curTime := time.Now() - sc.lockWaitStartTime = &curTime + startTime := atomic.LoadInt64(&sc.lockWaitStartTime) + if startTime == 0 { + startTime = time.Now().UnixNano() + atomic.StoreInt64(&sc.lockWaitStartTime, startTime) } - return *sc.lockWaitStartTime + return time.Unix(0, startTime) } //CopTasksDetails collects some useful information of cop-tasks during execution. diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 757b66a1a6453..c5b181953ea85 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -340,7 +340,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keysInput if lockCtx.PessimisticLockWaited != nil { if atomic.LoadInt32(lockCtx.PessimisticLockWaited) > 0 { timeWaited := time.Since(lockCtx.WaitStartTime) - *lockCtx.LockKeysDuration = timeWaited + atomic.StoreInt64(lockCtx.LockKeysDuration, int64(timeWaited)) metrics.TiKVPessimisticLockKeysDuration.Observe(timeWaited.Seconds()) } } From 9b8061a539a519b55bdaed61a42a7cf8cd0b3f07 Mon Sep 17 00:00:00 2001 From: Evan Zhou Date: Wed, 5 Aug 2020 20:30:28 +0800 Subject: [PATCH 4/4] fix another race --- executor/executor.go | 2 +- session/session.go | 2 +- sessionctx/variable/session.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index c1bf2909bd846..6a5ad18e1488b 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -965,7 +965,7 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *kv.LockCtx { func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { sctx := se.GetSessionVars().StmtCtx if !sctx.InUpdateStmt && !sctx.InDeleteStmt { - se.GetSessionVars().TxnCtx.ForUpdate = true + atomic.StoreUint32(&se.GetSessionVars().TxnCtx.ForUpdate, 1) } // Lock keys only once when finished fetching all results. txn, err := se.Txn(true) diff --git a/session/session.go b/session/session.go index fa07ea03cc122..cc43d2f1755fe 100644 --- a/session/session.go +++ b/session/session.go @@ -627,7 +627,7 @@ func (s *session) retry(ctx context.Context, maxCnt uint) (err error) { connID := s.sessionVars.ConnectionID s.sessionVars.RetryInfo.Retrying = true - if s.sessionVars.TxnCtx.ForUpdate { + if atomic.LoadUint32(&s.sessionVars.TxnCtx.ForUpdate) == 1 { err = ErrForUpdateCantRetry.GenWithStackByArgs(connID) return err } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index e806ae4391d31..a7aab8d6c5b46 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -163,11 +163,11 @@ type TransactionContext struct { // CreateTime For metrics. CreateTime time.Time StatementCount int - ForUpdate bool CouldRetry bool IsPessimistic bool Isolation string LockExpire uint32 + ForUpdate uint32 } // GetShard returns the shard prefix for the next `count` rowids.