diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 6a55ce885..7f79ee0fd 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -100,7 +100,9 @@ type diagnosticContext struct { reqDuration time.Duration } -func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { +func (action actionPessimisticLock) handleSingleBatch( + c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations, +) error { convertMutationsToPb := func(committerMutations CommitterMutations) []*kvrpcpb.Mutation { mutations := make([]*kvrpcpb.Mutation, committerMutations.Len()) c.txn.GetMemBuffer().RLock() @@ -120,26 +122,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * m := batch.mutations mutations := convertMutationsToPb(m) - req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{ - Mutations: mutations, - PrimaryLock: c.primary(), - StartVersion: c.startTS, - ForUpdateTs: c.forUpdateTS, - IsFirstLock: c.isFirstLock, - WaitTimeout: action.LockWaitTime(), - ReturnValues: action.ReturnValues, - CheckExistence: action.CheckExistence, - MinCommitTs: c.forUpdateTS + 1, - WakeUpMode: action.wakeUpMode, - LockOnlyIfExists: action.LockOnlyIfExists, - }, kvrpcpb.Context{ - Priority: c.priority, - SyncLog: c.syncLog, - ResourceGroupTag: action.LockCtx.ResourceGroupTag, - MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), - RequestSource: c.txn.GetRequestSource(), - ResourceGroupName: c.resourceGroupName, - }) + req := tikvrpc.NewRequest( + tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{ + Mutations: mutations, + PrimaryLock: c.primary(), + StartVersion: c.startTS, + ForUpdateTs: c.forUpdateTS, + IsFirstLock: c.isFirstLock, + WaitTimeout: action.LockWaitTime(), + ReturnValues: action.ReturnValues, + CheckExistence: action.CheckExistence, + MinCommitTs: c.forUpdateTS + 1, + WakeUpMode: action.wakeUpMode, + LockOnlyIfExists: action.LockOnlyIfExists, + }, kvrpcpb.Context{ + Priority: c.priority, + SyncLog: c.syncLog, + ResourceGroupTag: action.LockCtx.ResourceGroupTag, + MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()), + RequestSource: c.txn.GetRequestSource(), + ResourceGroupName: c.resourceGroupName, + }, + ) if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil { req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest)) } @@ -168,8 +172,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * for _, m := range mutations { keys = append(keys, hex.EncodeToString(m.Key)) } - logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock", - zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys)) + logutil.BgLogger().Info( + "[failpoint] injected lock ttl = 1 on pessimistic lock", + zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys), + ) } req.PessimisticLock().LockTtl = ttl if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil { @@ -221,7 +227,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * } } -func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error) (finished bool, err error) { +func (action actionPessimisticLock) handleRegionError( + c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error, +) (finished bool, err error) { // For other region error and the fake region error, backoff because // there's something wrong. // For the real EpochNotMatch error, don't backoff. @@ -242,7 +250,13 @@ func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo * return true, err } -func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) (locks []*txnlock.Lock, finished bool, err error) { +// When handling wait timeout, if the current lock is updated within the threshold, do not try to resolve lock +// The default timeout in TiKV is 1 second. 300ms should be appropriate for common hot update workloads. +const skipResolveThresholdMs = 300 + +func (action actionPessimisticLock) handleKeyErrorForResolve( + c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError, +) (locks []*txnlock.Lock, finished bool, err error) { for _, keyErr := range keyErrs { // Check already exists error if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil { @@ -253,6 +267,15 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs return nil, true, errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock}) } + // Do not resolve the lock if the lock was recently updated which indicates the txn holding the lock is + // much likely alive. + // This should only happen for wait timeout. + if lockInfo := keyErr.GetLocked(); lockInfo != nil && + lockInfo.DurationToLastUpdateMs > 0 && + lockInfo.DurationToLastUpdateMs < skipResolveThresholdMs { + continue + } + // Extract lock from key error lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr) if err1 != nil { @@ -260,10 +283,16 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs } locks = append(locks, lock) } + if len(locks) == 0 { + return nil, false, nil + } return locks, false, nil } -func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) { +func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode( + c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, + resp *tikvrpc.Response, diagCtx *diagnosticContext, +) (finished bool, err error) { regionErr, err := resp.GetRegionError() if err != nil { return true, err @@ -283,7 +312,12 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t if len(keyErrs) == 0 { if action.LockCtx.Stats != nil { - action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2) + action.LockCtx.Stats.MergeReqDetails( + diagCtx.reqDuration, + batch.region.GetID(), + diagCtx.sender.GetStoreAddr(), + lockResp.ExecDetailsV2, + ) } if batch.isPrimary { @@ -314,10 +348,14 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t } return true, nil } - locks, finished, err := action.handleKeyError(c, keyErrs) + + locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs) if err != nil { return finished, err } + if len(locks) == 0 { + return false, nil + } // Because we already waited on tikv, no need to Backoff here. // tikv default will wait 3s(also the maximum wait value) when lock error occurs @@ -360,7 +398,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t return false, nil } -func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) { +func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode( + c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, + resp *tikvrpc.Response, diagCtx *diagnosticContext, +) (finished bool, err error) { regionErr, err := resp.GetRegionError() if err != nil { return true, err @@ -376,7 +417,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c if len(mutationsPb) > 1 || len(lockResp.Results) > 1 { panic("unreachable") } - if batch.isPrimary && len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed { + if batch.isPrimary && + len(lockResp.Results) > 0 && + lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed { // After locking the primary key, we should protect the primary lock from expiring. c.run(c, action.LockCtx) } @@ -422,11 +465,16 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c if len(lockResp.Results) > 0 && !isMutationFailed { if action.LockCtx.Stats != nil { - action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2) + action.LockCtx.Stats.MergeReqDetails( + diagCtx.reqDuration, + batch.region.GetID(), + diagCtx.sender.GetStoreAddr(), + lockResp.ExecDetailsV2, + ) } } - locks, finished, err := action.handleKeyError(c, keyErrs) + locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs) if err != nil { return finished, err } @@ -477,9 +525,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c return false, nil } - // If the failedMutations is not empty and the error is not KeyIsLocked, the function should have already - // returned before. So this is an unreachable path. - return true, errors.New("Pessimistic lock response corrupted") + // This can be the situation where KeyIsLocked errors are generated by timeout, + // and we decide not to resolve them. Instead, just retry + return false, nil } if len(locks) != 0 { @@ -497,16 +545,20 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c return true, nil } -func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error { +func (actionPessimisticRollback) handleSingleBatch( + c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations, +) error { forUpdateTS := c.forUpdateTS if c.maxLockedWithConflictTS > forUpdateTS { forUpdateTS = c.maxLockedWithConflictTS } - req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{ - StartVersion: c.startTS, - ForUpdateTs: forUpdateTS, - Keys: batch.mutations.GetKeys(), - }) + req := tikvrpc.NewRequest( + tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{ + StartVersion: c.startTS, + ForUpdateTs: forUpdateTS, + Keys: batch.mutations.GetKeys(), + }, + ) req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx()) req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds()) resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort) @@ -528,7 +580,10 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret return nil } -func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, mutations CommitterMutations) error { +func (c *twoPhaseCommitter) pessimisticLockMutations( + bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, + mutations CommitterMutations, +) error { if c.sessionID > 0 { if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil { // Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like @@ -537,19 +592,27 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt for _, action := range strings.Split(v, ",") { if action == "delay" { duration := time.Duration(rand.Int63n(int64(time.Second) * 5)) - logutil.Logger(bo.GetCtx()).Info("[failpoint] injected delay at pessimistic lock", - zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration)) + logutil.Logger(bo.GetCtx()).Info( + "[failpoint] injected delay at pessimistic lock", + zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration), + ) time.Sleep(duration) } else if action == "fail" { - logutil.Logger(bo.GetCtx()).Info("[failpoint] injected failure at pessimistic lock", - zap.Uint64("txnStartTS", c.startTS)) + logutil.Logger(bo.GetCtx()).Info( + "[failpoint] injected failure at pessimistic lock", + zap.Uint64("txnStartTS", c.startTS), + ) return errors.New("injected failure at pessimistic lock") } } } } } - return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations) + return c.doActionOnMutations( + bo, + actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, + mutations, + ) } func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {