For KeyIsLocked error reported for timeout, if a lock is recently updated, don't try to resolve it. #758

Merged
9 commits merged on Apr 13, 2023
157 changes: 110 additions & 47 deletions txnkv/transaction/pessimistic.go
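
In short, this PR makes the pessimistic-lock key-error handling skip lock resolution when the blocking lock was updated very recently, because the transaction holding it is most likely still alive; the client simply retries instead. Below is a minimal, self-contained sketch of that decision, based on the DurationToLastUpdateMs field and the 300ms constant that appear in the diff; the helper name shouldSkipResolve is illustrative and does not exist in the PR.

package sketch

import "github.com/pingcap/kvproto/pkg/kvrpcpb"

// skipResolveThresholdMs mirrors the constant introduced by this PR.
const skipResolveThresholdMs = 300

// shouldSkipResolve (hypothetical helper) reports whether a KeyIsLocked error
// returned on wait timeout should be retried without resolving the lock: a
// lock updated within the threshold is almost certainly held by a live
// transaction, so resolving it would only add useless resolve-lock traffic.
func shouldSkipResolve(lockInfo *kvrpcpb.LockInfo) bool {
	return lockInfo != nil &&
		lockInfo.DurationToLastUpdateMs > 0 &&
		lockInfo.DurationToLastUpdateMs < skipResolveThresholdMs
}
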
@@ -100,7 +100,9 @@ type diagnosticContext struct {
reqDuration time.Duration
}

func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (action actionPessimisticLock) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
convertMutationsToPb := func(committerMutations CommitterMutations) []*kvrpcpb.Mutation {
mutations := make([]*kvrpcpb.Mutation, committerMutations.Len())
c.txn.GetMemBuffer().RLock()
@@ -120,26 +122,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *

m := batch.mutations
mutations := convertMutationsToPb(m)
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticLock, &kvrpcpb.PessimisticLockRequest{
Mutations: mutations,
PrimaryLock: c.primary(),
StartVersion: c.startTS,
ForUpdateTs: c.forUpdateTS,
IsFirstLock: c.isFirstLock,
WaitTimeout: action.LockWaitTime(),
ReturnValues: action.ReturnValues,
CheckExistence: action.CheckExistence,
MinCommitTs: c.forUpdateTS + 1,
WakeUpMode: action.wakeUpMode,
LockOnlyIfExists: action.LockOnlyIfExists,
}, kvrpcpb.Context{
Priority: c.priority,
SyncLog: c.syncLog,
ResourceGroupTag: action.LockCtx.ResourceGroupTag,
MaxExecutionDurationMs: uint64(client.MaxWriteExecutionTime.Milliseconds()),
RequestSource: c.txn.GetRequestSource(),
ResourceGroupName: c.resourceGroupName,
},
)
if action.LockCtx.ResourceGroupTag == nil && action.LockCtx.ResourceGroupTagger != nil {
req.ResourceGroupTag = action.LockCtx.ResourceGroupTagger(req.Req.(*kvrpcpb.PessimisticLockRequest))
}
@@ -168,8 +172,10 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
for _, m := range mutations {
keys = append(keys, hex.EncodeToString(m.Key))
}
logutil.BgLogger().Info("[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys))
logutil.BgLogger().Info(
"[failpoint] injected lock ttl = 1 on pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Strings("keys", keys),
)
}
req.PessimisticLock().LockTtl = ttl
if _, err := util.EvalFailpoint("PessimisticLockErrWriteConflict"); err == nil {
@@ -221,7 +227,9 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
}
}

func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error) (finished bool, err error) {
func (action actionPessimisticLock) handleRegionError(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, regionErr *errorpb.Error,
) (finished bool, err error) {
// For other region error and the fake region error, backoff because
// there's something wrong.
// For the real EpochNotMatch error, don't backoff.
@@ -242,7 +250,13 @@ func (action actionPessimisticLock) handleRegionError(c *twoPhaseCommitter, bo *
return true, err
}

func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError) (locks []*txnlock.Lock, finished bool, err error) {
// When handling a wait timeout, do not try to resolve the lock if it was updated within the threshold.
// The default timeout in TiKV is 1 second; 300ms should be appropriate for common hot-update workloads.
const skipResolveThresholdMs = 300

func (action actionPessimisticLock) handleKeyErrorForResolve(
c *twoPhaseCommitter, keyErrs []*kvrpcpb.KeyError,
) (locks []*txnlock.Lock, finished bool, err error) {
for _, keyErr := range keyErrs {
// Check already exists error
if alreadyExist := keyErr.GetAlreadyExist(); alreadyExist != nil {
@@ -253,17 +267,32 @@ func (action actionPessimisticLock) handleKeyError(c *twoPhaseCommitter, keyErrs
return nil, true, errors.WithStack(&tikverr.ErrDeadlock{Deadlock: deadlock})
}

// Do not resolve the lock if it was recently updated, which indicates the txn holding the lock is
// most likely still alive.
// This should only happen on wait timeout.
if lockInfo := keyErr.GetLocked(); lockInfo != nil &&
lockInfo.DurationToLastUpdateMs > 0 &&
lockInfo.DurationToLastUpdateMs < skipResolveThresholdMs {
Contributor:
The default wait time in TiKV is 1s and the threshold is 1.2s; it seems the lock will never be resolved if its state has ever been updated, so the threshold doesn't take any effect.
I think a proper threshold could depend on the actual wait timeout in TiKV, something like 0.3 * waitTimeout.

Contributor Author:
The 1-second default value is a setting in TiKV and may change without client-go knowing. Did you mean simply setting the threshold to 300ms?
I don't have a clear idea of what an optimal strategy would look like. For the hot-update workload, I think both strategies work fine.

Contributor:
Maybe 300ms is fine, as the 1 second would not be changed in most cases.
Is there any situation where this setting would impact performance a lot? I haven't come up with one yet.

Contributor Author:
I've verified the 300ms threshold with the hot-update workload. It works.
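
For illustration only, a hedged sketch of the alternative floated in this thread: derive the skip-resolve threshold from the configured wait timeout instead of hard-coding 300ms. The helper name below is hypothetical and is not part of this PR.

// Hypothetical alternative discussed above, not adopted by the PR: scale the
// skip-resolve threshold with the lock wait timeout (e.g. 0.3 * waitTimeout),
// so it keeps working even if TiKV's default wait time changes.
func dynamicSkipResolveThresholdMs(waitTimeoutMs int64) int64 {
	return waitTimeoutMs * 3 / 10
}
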

continue
}

// Extract lock from key error
lock, err1 := txnlock.ExtractLockFromKeyErr(keyErr)
if err1 != nil {
return nil, true, err1
}
locks = append(locks, lock)
}
if len(locks) == 0 {
return nil, false, nil
}
return locks, false, nil
}

func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
@@ -283,7 +312,12 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
if len(keyErrs) == 0 {

if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}

if batch.isPrimary {
@@ -314,10 +348,14 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
}
return true, nil
}
locks, finished, err := action.handleKeyError(c, keyErrs)

locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
if len(locks) == 0 {
return false, nil
}

// Because we already waited on TiKV, no need to Backoff here.
// TiKV by default waits 3s (also the maximum wait value) when a lock error occurs.
@@ -360,7 +398,10 @@ func (action actionPessimisticLock) handlePessimisticLockResponseNormalMode(c *t
return false, nil
}

func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation, resp *tikvrpc.Response, diagCtx *diagnosticContext) (finished bool, err error) {
func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(
c *twoPhaseCommitter, bo *retry.Backoffer, batch *batchMutations, mutationsPb []*kvrpcpb.Mutation,
resp *tikvrpc.Response, diagCtx *diagnosticContext,
) (finished bool, err error) {
regionErr, err := resp.GetRegionError()
if err != nil {
return true, err
@@ -376,7 +417,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
if len(mutationsPb) > 1 || len(lockResp.Results) > 1 {
panic("unreachable")
}
if batch.isPrimary && len(lockResp.Results) > 0 && lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
if batch.isPrimary &&
len(lockResp.Results) > 0 &&
lockResp.Results[0].Type != kvrpcpb.PessimisticLockKeyResultType_LockResultFailed {
// After locking the primary key, we should protect the primary lock from expiring.
c.run(c, action.LockCtx)
}
@@ -422,11 +465,16 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c

if len(lockResp.Results) > 0 && !isMutationFailed {
if action.LockCtx.Stats != nil {
action.LockCtx.Stats.MergeReqDetails(diagCtx.reqDuration, batch.region.GetID(), diagCtx.sender.GetStoreAddr(), lockResp.ExecDetailsV2)
action.LockCtx.Stats.MergeReqDetails(
diagCtx.reqDuration,
batch.region.GetID(),
diagCtx.sender.GetStoreAddr(),
lockResp.ExecDetailsV2,
)
}
}

locks, finished, err := action.handleKeyError(c, keyErrs)
locks, finished, err := action.handleKeyErrorForResolve(c, keyErrs)
if err != nil {
return finished, err
}
@@ -477,9 +525,9 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return false, nil
}

// If the failedMutations is not empty and the error is not KeyIsLocked, the function should have already
// returned before. So this is an unreachable path.
return true, errors.New("Pessimistic lock response corrupted")
// This can be the situation where KeyIsLocked errors are generated by a timeout,
// and we decide not to resolve them. Instead, just retry.
return false, nil
}

if len(locks) != 0 {
@@ -497,16 +545,20 @@ func (action actionPessimisticLock) handlePessimisticLockResponseForceLockMode(c
return true, nil
}

func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations) error {
func (actionPessimisticRollback) handleSingleBatch(
c *twoPhaseCommitter, bo *retry.Backoffer, batch batchMutations,
) error {
forUpdateTS := c.forUpdateTS
if c.maxLockedWithConflictTS > forUpdateTS {
forUpdateTS = c.maxLockedWithConflictTS
}
req := tikvrpc.NewRequest(tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
})
req := tikvrpc.NewRequest(
tikvrpc.CmdPessimisticRollback, &kvrpcpb.PessimisticRollbackRequest{
StartVersion: c.startTS,
ForUpdateTs: forUpdateTS,
Keys: batch.mutations.GetKeys(),
},
)
req.RequestSource = util.RequestSourceFromCtx(bo.GetCtx())
req.MaxExecutionDurationMs = uint64(client.MaxWriteExecutionTime.Milliseconds())
resp, err := c.store.SendReq(bo, req, batch.region, client.ReadTimeoutShort)
@@ -528,7 +580,10 @@ func (actionPessimisticRollback) handleSingleBatch(c *twoPhaseCommitter, bo *ret
return nil
}

func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode, mutations CommitterMutations) error {
func (c *twoPhaseCommitter) pessimisticLockMutations(
bo *retry.Backoffer, lockCtx *kv.LockCtx, lockWaitMode kvrpcpb.PessimisticLockWakeUpMode,
mutations CommitterMutations,
) error {
if c.sessionID > 0 {
if val, err := util.EvalFailpoint("beforePessimisticLock"); err == nil {
// Pass multiple instructions in one string, delimited by commas, to trigger multiple behaviors, like
@@ -537,19 +592,27 @@ func (c *twoPhaseCommitter) pessimisticLockMutations(bo *retry.Backoffer, lockCt
for _, action := range strings.Split(v, ",") {
if action == "delay" {
duration := time.Duration(rand.Int63n(int64(time.Second) * 5))
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected delay at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS), zap.Duration("duration", duration),
)
time.Sleep(duration)
} else if action == "fail" {
logutil.Logger(bo.GetCtx()).Info("[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS))
logutil.Logger(bo.GetCtx()).Info(
"[failpoint] injected failure at pessimistic lock",
zap.Uint64("txnStartTS", c.startTS),
)
return errors.New("injected failure at pessimistic lock")
}
}
}
}
}
return c.doActionOnMutations(bo, actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()}, mutations)
return c.doActionOnMutations(
bo,
actionPessimisticLock{LockCtx: lockCtx, wakeUpMode: lockWaitMode, isInternal: c.txn.isInternal()},
mutations,
)
}

func (c *twoPhaseCommitter) pessimisticRollbackMutations(bo *retry.Backoffer, mutations CommitterMutations) error {