From 90369b1b9bea8d471a24d46418951cd7b61676b6 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 21 Jun 2022 10:15:28 +0800 Subject: [PATCH 1/6] txn: remove `legacy.SimpleTxnContextProvider` --- executor/builder.go | 16 +- session/session.go | 10 +- sessiontxn/legacy/BUILD.bazel | 41 ----- sessiontxn/legacy/provider.go | 243 ----------------------------- sessiontxn/legacy/provider_test.go | 126 --------------- 5 files changed, 2 insertions(+), 434 deletions(-) delete mode 100644 sessiontxn/legacy/BUILD.bazel delete mode 100644 sessiontxn/legacy/provider.go delete mode 100644 sessiontxn/legacy/provider_test.go diff --git a/executor/builder.go b/executor/builder.go index d86200b573058..0aebf8ba124b9 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/legacy" "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/statistics" "github.com/pingcap/tidb/store/helper" @@ -121,26 +120,13 @@ type CTEStorages struct { } func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder { - b := &executorBuilder{ + return &executorBuilder{ ctx: ctx, is: is, Ti: ti, isStaleness: staleread.IsStmtStaleness(ctx), readReplicaScope: replicaReadScope, } - - txnManager := sessiontxn.GetTxnManager(ctx) - if provider, ok := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); ok { - provider.GetReadTSFunc = b.getReadTS - provider.GetForUpdateTSFunc = func() (uint64, error) { - if b.forUpdateTS != 0 { - return b.forUpdateTS, nil - } - return b.getReadTS() - } - } - - return b } // MockPhysicalPlan is used to return a specified executor in when build. diff --git a/session/session.go b/session/session.go index bb1f15836fd90..518131cdf53ee 100644 --- a/session/session.go +++ b/session/session.go @@ -49,7 +49,6 @@ import ( "github.com/pingcap/tidb/parser/terror" "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/legacy" "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/store/driver/txn" "github.com/pingcap/tidb/store/helper" @@ -2371,7 +2370,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ return nil, errors.Errorf("invalid CachedPrepareStmt type") } - var is infoschema.InfoSchema var snapshotTS uint64 replicaReadScope := oracle.GlobalTxnScope @@ -2383,7 +2381,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ txnManager := sessiontxn.GetTxnManager(s) if staleReadProcessor.IsStaleness() { snapshotTS = staleReadProcessor.GetStalenessReadTS() - is = staleReadProcessor.GetStalenessInfoSchema() + is := staleReadProcessor.GetStalenessInfoSchema() replicaReadScope = config.GetTxnScopeFromConfig() err = txnManager.EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{ Type: sessiontxn.EnterNewTxnWithReplaceProvider, @@ -2393,8 +2391,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ if err != nil { return nil, err } - } else { - is = s.GetInfoSchema().(infoschema.InfoSchema) } staleness := snapshotTS > 0 @@ -2410,10 +2406,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ return nil, err } - if p, isOK := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); isOK { - p.InfoSchema = is - } - if ok { rs, ok, err := s.cachedPointPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args) if err != nil { diff --git a/sessiontxn/legacy/BUILD.bazel b/sessiontxn/legacy/BUILD.bazel deleted file mode 100644 index d9e9b5249cbbf..0000000000000 --- a/sessiontxn/legacy/BUILD.bazel +++ /dev/null @@ -1,41 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "legacy", - srcs = ["provider.go"], - importpath = "github.com/pingcap/tidb/sessiontxn/legacy", - visibility = ["//visibility:public"], - deps = [ - "//domain", - "//infoschema", - "//kv", - "//parser/ast", - "//parser/terror", - "//sessionctx", - "//sessionctx/variable", - "//sessiontxn", - "//sessiontxn/staleread", - "//table/temptable", - "//util/logutil", - "@com_github_pingcap_errors//:errors", - "@com_github_tikv_client_go_v2//error", - "@org_uber_go_zap//:zap", - ], -) - -go_test( - name = "legacy_test", - srcs = ["provider_test.go"], - deps = [ - ":legacy", - "//domain", - "//kv", - "//sessionctx", - "//sessiontxn", - "//testkit", - "@com_github_pingcap_errors//:errors", - "@com_github_pingcap_kvproto//pkg/kvrpcpb", - "@com_github_stretchr_testify//require", - "@com_github_tikv_client_go_v2//error", - ], -) diff --git a/sessiontxn/legacy/provider.go b/sessiontxn/legacy/provider.go deleted file mode 100644 index a571c612f24aa..0000000000000 --- a/sessiontxn/legacy/provider.go +++ /dev/null @@ -1,243 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package legacy - -import ( - "context" - "time" - - "github.com/pingcap/errors" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/infoschema" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/parser/ast" - "github.com/pingcap/tidb/parser/terror" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessionctx/variable" - "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/staleread" - "github.com/pingcap/tidb/table/temptable" - "github.com/pingcap/tidb/util/logutil" - tikverr "github.com/tikv/client-go/v2/error" - "go.uber.org/zap" -) - -// SimpleTxnContextProvider implements TxnContextProvider -// It is only used in refactor stage -// TODO: remove it after refactor finished -type SimpleTxnContextProvider struct { - Ctx context.Context - Sctx sessionctx.Context - InfoSchema infoschema.InfoSchema - GetReadTSFunc func() (uint64, error) - GetForUpdateTSFunc func() (uint64, error) - UpdateForUpdateTS func(seCtx sessionctx.Context, newForUpdateTS uint64) error - - Pessimistic bool - CausalConsistencyOnly bool - - isTxnActive bool -} - -// GetTxnInfoSchema returns the information schema used by txn -func (p *SimpleTxnContextProvider) GetTxnInfoSchema() infoschema.InfoSchema { - return p.InfoSchema -} - -// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update) -func (p *SimpleTxnContextProvider) GetStmtReadTS() (uint64, error) { - if p.GetReadTSFunc == nil { - return 0, errors.New("ReadTSFunc not set") - } - return p.GetReadTSFunc() -} - -// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update -func (p *SimpleTxnContextProvider) GetStmtForUpdateTS() (uint64, error) { - if p.GetForUpdateTSFunc == nil { - return 0, errors.New("GetForUpdateTSFunc not set") - } - return p.GetForUpdateTSFunc() -} - -// OnInitialize is the hook that should be called when enter a new txn with this provider -func (p *SimpleTxnContextProvider) OnInitialize(ctx context.Context, tp sessiontxn.EnterNewTxnType) error { - p.Ctx = ctx - sessVars := p.Sctx.GetSessionVars() - switch tp { - case sessiontxn.EnterNewTxnDefault, sessiontxn.EnterNewTxnWithBeginStmt: - shouldReuseTxn := tp == sessiontxn.EnterNewTxnWithBeginStmt && sessiontxn.CanReuseTxnWhenExplicitBegin(p.Sctx) - if !shouldReuseTxn { - if err := p.Sctx.NewTxn(ctx); err != nil { - return err - } - } - - if tp == sessiontxn.EnterNewTxnWithBeginStmt { - // With START TRANSACTION, autocommit remains disabled until you end - // the transaction with COMMIT or ROLLBACK. The autocommit mode then - // reverts to its previous state. - sessVars.SetInTxn(true) - } - - sessVars.TxnCtx.IsPessimistic = p.Pessimistic - if _, err := p.activateTxn(); err != nil { - return err - } - - if is, ok := sessVars.TxnCtx.InfoSchema.(infoschema.InfoSchema); ok { - p.InfoSchema = is - } - case sessiontxn.EnterNewTxnBeforeStmt: - p.InfoSchema = temptable.AttachLocalTemporaryTableInfoSchema(p.Sctx, domain.GetDomain(p.Sctx).InfoSchema()) - sessVars.TxnCtx = &variable.TransactionContext{ - TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{ - InfoSchema: p.InfoSchema, - CreateTime: time.Now(), - ShardStep: int(sessVars.ShardAllocateStep), - TxnScope: sessVars.CheckAndGetTxnScope(), - IsPessimistic: p.Pessimistic, - }, - } - default: - return errors.Errorf("Unsupported type: %v", tp) - } - - return nil -} - -// OnStmtStart is the hook that should be called when a new statement started -func (p *SimpleTxnContextProvider) OnStmtStart(ctx context.Context) error { - p.Ctx = ctx - p.InfoSchema = p.Sctx.GetInfoSchema().(infoschema.InfoSchema) - return nil -} - -// OnStmtErrorForNextAction is the hook that should be called when a new statement get an error -func (p *SimpleTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) { - switch point { - case sessiontxn.StmtErrAfterPessimisticLock: - return p.handleAfterPessimisticLockError(err) - default: - return sessiontxn.NoIdea() - } -} - -func (p *SimpleTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) { - sessVars := p.Sctx.GetSessionVars() - if sessVars.IsIsolation(ast.Serializable) { - return sessiontxn.ErrorAction(lockErr) - } - - txnCtx := sessVars.TxnCtx - if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok { - if !deadlock.IsRetryable { - return sessiontxn.ErrorAction(lockErr) - } - logutil.Logger(p.Ctx).Info("single statement deadlock, retry statement", - zap.Uint64("txn", txnCtx.StartTS), - zap.Uint64("lockTS", deadlock.LockTs), - zap.Stringer("lockKey", kv.Key(deadlock.LockKey)), - zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash)) - } else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) { - errStr := lockErr.Error() - forUpdateTS := txnCtx.GetForUpdateTS() - logutil.Logger(p.Ctx).Debug("pessimistic write conflict, retry statement", - zap.Uint64("txn", txnCtx.StartTS), - zap.Uint64("forUpdateTS", forUpdateTS), - zap.String("err", errStr)) - // Always update forUpdateTS by getting a new timestamp from PD. - // If we use the conflict commitTS as the new forUpdateTS and async commit - // is used, the commitTS of this transaction may exceed the max timestamp - // that PD allocates. Then, the change may be invisible to a new transaction, - // which means linearizability is broken. - } else { - // this branch if err not nil, always update forUpdateTS to avoid problem described below - // for nowait, when ErrLock happened, ErrLockAcquireFailAndNoWaitSet will be returned, and in the same txn - // the select for updateTs must be updated, otherwise there maybe rollback problem. - // begin; select for update key1(here ErrLocked or other errors(or max_execution_time like util), - // key1 lock not get and async rollback key1 is raised) - // select for update key1 again(this time lock succ(maybe lock released by others)) - // the async rollback operation rollbacked the lock just acquired - tsErr := p.UpdateForUpdateTS(p.Sctx, 0) - if tsErr != nil { - logutil.Logger(p.Ctx).Warn("UpdateForUpdateTS failed", zap.Error(tsErr)) - } - return sessiontxn.ErrorAction(lockErr) - } - - if err := p.UpdateForUpdateTS(p.Sctx, 0); err != nil { - return sessiontxn.ErrorAction(lockErr) - } - - return sessiontxn.RetryReady() -} - -// OnStmtRetry is the hook that should be called when a statement retry -func (p *SimpleTxnContextProvider) OnStmtRetry(_ context.Context) error { - return nil -} - -func (p *SimpleTxnContextProvider) prepareTSFuture() error { - if p.Sctx.GetSessionVars().SnapshotTS != 0 || staleread.IsStmtStaleness(p.Sctx) || p.Sctx.GetPreparedTSFuture() != nil { - return nil - } - - txn, err := p.Sctx.Txn(false) - if err != nil { - return err - } - - if txn.Valid() { - return nil - } - - txnScope := p.Sctx.GetSessionVars().CheckAndGetTxnScope() - future := sessiontxn.NewOracleFuture(p.Ctx, p.Sctx, txnScope) - return p.Sctx.PrepareTSFuture(p.Ctx, future, txnScope) -} - -// activateTxn actives the txn -func (p *SimpleTxnContextProvider) activateTxn() (kv.Transaction, error) { - if p.isTxnActive { - return p.Sctx.Txn(true) - } - - txn, err := p.Sctx.Txn(true) - if err != nil { - return nil, err - } - - if p.Pessimistic { - txn.SetOption(kv.Pessimistic, true) - } - - if p.CausalConsistencyOnly { - txn.SetOption(kv.GuaranteeLinearizability, false) - } - - p.isTxnActive = true - return txn, nil -} - -// AdviseWarmup provides warmup for inner state -func (p *SimpleTxnContextProvider) AdviseWarmup() error { - return p.prepareTSFuture() -} - -// AdviseOptimizeWithPlan providers optimization according to the plan -func (p *SimpleTxnContextProvider) AdviseOptimizeWithPlan(_ interface{}) error { - return nil -} diff --git a/sessiontxn/legacy/provider_test.go b/sessiontxn/legacy/provider_test.go deleted file mode 100644 index 22aa042632182..0000000000000 --- a/sessiontxn/legacy/provider_test.go +++ /dev/null @@ -1,126 +0,0 @@ -// Copyright 2022 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package legacy_test - -import ( - "context" - "testing" - - "github.com/pingcap/errors" - "github.com/pingcap/kvproto/pkg/kvrpcpb" - "github.com/pingcap/tidb/domain" - "github.com/pingcap/tidb/kv" - "github.com/pingcap/tidb/sessionctx" - "github.com/pingcap/tidb/sessiontxn" - "github.com/pingcap/tidb/sessiontxn/legacy" - "github.com/pingcap/tidb/testkit" - "github.com/stretchr/testify/require" - tikverr "github.com/tikv/client-go/v2/error" -) - -func TestErrorHandle(t *testing.T) { - store, do, clean := testkit.CreateMockStoreAndDomain(t) - defer clean() - - tk := testkit.NewTestKit(t, store) - - provider := newSimpleProvider(tk, do) - require.NoError(t, provider.OnStmtStart(context.TODO())) - expectedForUpdateTS := getForUpdateTS(t, provider) - - var lockErr error - - // StmtErrAfterLock: ErrWriteConflict should retry and update forUpdateTS - lockErr = kv.ErrWriteConflict - action, err := provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) - require.Equal(t, sessiontxn.StmtActionRetryReady, action) - require.Nil(t, err) - expectedForUpdateTS += 1 - require.Equal(t, expectedForUpdateTS, getForUpdateTS(t, provider)) - - // StmtErrAfterLock: DeadLock that is not retryable will just return an error - lockErr = newDeadLockError(false) - action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) - require.Equal(t, sessiontxn.StmtActionError, action) - require.Equal(t, lockErr, err) - require.Equal(t, expectedForUpdateTS, getForUpdateTS(t, provider)) - - // StmtErrAfterLock: DeadLock that is retryable should retry and update forUpdateTS - lockErr = newDeadLockError(true) - action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) - require.Equal(t, sessiontxn.StmtActionRetryReady, action) - require.Nil(t, err) - expectedForUpdateTS += 1 - require.Equal(t, expectedForUpdateTS, getForUpdateTS(t, provider)) - - // StmtErrAfterLock: other errors should only update forUpdateTS but not retry - lockErr = errors.New("other error") - action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr) - require.Equal(t, sessiontxn.StmtActionError, action) - require.Equal(t, lockErr, err) - expectedForUpdateTS += 1 - require.Equal(t, expectedForUpdateTS, getForUpdateTS(t, provider)) - - // StmtErrAfterQuery: always not retry - lockErr = kv.ErrWriteConflict - action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterQuery, lockErr) - require.Equal(t, sessiontxn.StmtActionNoIdea, action) - require.Nil(t, err) - - tk.Session().GetSessionVars().StmtCtx.RCCheckTS = true - require.NoError(t, provider.OnStmtStart(context.TODO())) - action, err = provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterQuery, lockErr) - require.Equal(t, sessiontxn.StmtActionNoIdea, action) - require.Nil(t, err) -} - -func getForUpdateTS(t *testing.T, provider *legacy.SimpleTxnContextProvider) uint64 { - forUpdateTS, err := provider.GetStmtForUpdateTS() - require.NoError(t, err) - return forUpdateTS -} - -func newDeadLockError(isRetryable bool) error { - return &tikverr.ErrDeadlock{ - Deadlock: &kvrpcpb.Deadlock{}, - IsRetryable: isRetryable, - } -} - -func newSimpleProvider(tk *testkit.TestKit, do *domain.Domain) *legacy.SimpleTxnContextProvider { - tk.MustExec("begin pessimistic") - readTS := uint64(1) - forUpdateTS := uint64(1) - return &legacy.SimpleTxnContextProvider{ - Ctx: context.TODO(), - Sctx: tk.Session(), - InfoSchema: do.InfoSchema(), - GetReadTSFunc: func() (uint64, error) { - return readTS, nil - }, - GetForUpdateTSFunc: func() (uint64, error) { - return forUpdateTS, nil - }, - UpdateForUpdateTS: func(seCtx sessionctx.Context, newForUpdateTS uint64) error { - if newForUpdateTS == 0 { - forUpdateTS += 1 - } else { - forUpdateTS = newForUpdateTS - } - return nil - }, - Pessimistic: true, - } -} From 255446f34d9322a0464819c251aebe85e6dd884d Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 23 Jun 2022 10:51:50 +0800 Subject: [PATCH 2/6] update --- executor/BUILD.bazel | 1 - session/BUILD.bazel | 1 - 2 files changed, 2 deletions(-) diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 295b9e07c1f3d..aaad809f192b3 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -132,7 +132,6 @@ go_library( "//sessionctx/stmtctx", "//sessionctx/variable", "//sessiontxn", - "//sessiontxn/legacy", "//sessiontxn/staleread", "//statistics", "//statistics/handle", diff --git a/session/BUILD.bazel b/session/BUILD.bazel index e6cfe0f898682..cb33aadb57274 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -49,7 +49,6 @@ go_library( "//sessionctx/variable", "//sessiontxn", "//sessiontxn/isolation", - "//sessiontxn/legacy", "//sessiontxn/staleread", "//statistics", "//statistics/handle", From cc3cc8518f7c5222ba7cda27a7648988f6d0f6da Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 23 Jun 2022 10:55:57 +0800 Subject: [PATCH 3/6] remove some in executor --- executor/builder.go | 56 ++++----------------------------------------- 1 file changed, 5 insertions(+), 51 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index aa14922972856..d586233f5d44f 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -87,14 +87,11 @@ var ( // executorBuilder builds an Executor from a Plan. // The InfoSchema must not change during execution. type executorBuilder struct { - ctx sessionctx.Context - is infoschema.InfoSchema - snapshotTS uint64 // The ts for snapshot-read. A select statement without for update will use this ts - forUpdateTS uint64 // The ts should be used by insert/update/delete/select-for-update statement - snapshotTSCached bool - err error // err is set when there is error happened during Executor building process. - hasLock bool - Ti *TelemetryInfo + ctx sessionctx.Context + is infoschema.InfoSchema + err error // err is set when there is error happened during Executor building process. + hasLock bool + Ti *TelemetryInfo // isStaleness means whether this statement use stale read. isStaleness bool readReplicaScope string @@ -646,8 +643,6 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor if b.err = b.updateForUpdateTSIfNeeded(v.Children()[0]); b.err != nil { return nil } - // Build 'select for update' using the 'for update' ts. - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() src := b.build(v.Children()[0]) if b.err != nil { @@ -858,7 +853,6 @@ func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { return nil } } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selectExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -1564,44 +1558,6 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) { return txnManager.GetStmtReadTS() } -// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level -// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return -// the current timestamp in RC isolation -func (b *executorBuilder) getReadTS() (uint64, error) { - failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() { - // after refactoring stale read will use its own context provider - staleread.AssertStmtStaleness(b.ctx, false) - }) - - if b.snapshotTSCached { - return b.snapshotTS, nil - } - - if snapshotTS := b.ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 { - b.snapshotTS = snapshotTS - b.snapshotTSCached = true - return snapshotTS, nil - } - - if b.snapshotTS != 0 { - b.snapshotTSCached = true - // Return the cached value. - return b.snapshotTS, nil - } - - txn, err := b.ctx.Txn(true) - if err != nil { - return 0, err - } - - b.snapshotTS = txn.StartTS() - if b.snapshotTS == 0 { - return 0, errors.Trace(ErrGetStartTS) - } - b.snapshotTSCached = true - return b.snapshotTS, nil -} - func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor { switch v.DBName.L { case util.MetricSchemaName.L: @@ -2103,7 +2059,6 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil { return nil } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2160,7 +2115,6 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { if b.err = b.updateForUpdateTSIfNeeded(v.SelectPlan); b.err != nil { return nil } - b.forUpdateTS = b.ctx.GetSessionVars().TxnCtx.GetForUpdateTS() selExec := b.build(v.SelectPlan) if b.err != nil { return nil From ad6ad39fc6ddad729087207be1a401683ea78235 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Thu, 23 Jun 2022 10:59:52 +0800 Subject: [PATCH 4/6] update --- executor/stale_txn_test.go | 2 -- sessiontxn/txn_context_test.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go index 62255326cbce0..51e50b670ade2 100644 --- a/executor/stale_txn_test.go +++ b/executor/stale_txn_test.go @@ -34,10 +34,8 @@ import ( func enableStaleReadCommonFailPoint(t *testing.T) func() { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return")) return func() { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan")) } } diff --git a/sessiontxn/txn_context_test.go b/sessiontxn/txn_context_test.go index 8ac6f0aaa7322..6cdc7e514bac2 100644 --- a/sessiontxn/txn_context_test.go +++ b/sessiontxn/txn_context_test.go @@ -51,7 +51,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) { require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return")) - require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError", "return")) require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", "return")) @@ -84,7 +83,6 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) { require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder")) - require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookBeforeFirstRunExecutor")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/hookAfterOnStmtRetryWithLockError")) require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt")) From c672f4fea419bdd29dbd64a22c8f9065e38ca287 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 28 Jun 2022 11:56:51 +0800 Subject: [PATCH 5/6] update --- executor/builder.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 5df0078280c3f..b26a9beba4947 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -640,7 +640,7 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor defer func() { b.inSelectLockStmt = false }() } b.hasLock = true - if b.err = b.updateForUpdateTSIfNeeded(); b.err != nil { + if b.err = b.updateForUpdateTS(); b.err != nil { return nil } @@ -846,7 +846,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor { func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor { b.inInsertStmt = true - if b.err = b.updateForUpdateTSIfNeeded(); b.err != nil { + if b.err = b.updateForUpdateTS(); b.err != nil { return nil } @@ -2055,9 +2055,10 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor { } } } - if b.err = b.updateForUpdateTSIfNeeded(); b.err != nil { + if b.err = b.updateForUpdateTS(); b.err != nil { return nil } + selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2112,9 +2113,10 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { tblID2table[info.TblID], _ = b.is.TableByID(info.TblID) } - if b.err = b.updateForUpdateTSIfNeeded(); b.err != nil { + if b.err = b.updateForUpdateTS(); b.err != nil { return nil } + selExec := b.build(v.SelectPlan) if b.err != nil { return nil @@ -2130,8 +2132,8 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor { return deleteExec } -func (b *executorBuilder) updateForUpdateTSIfNeeded() error { - // GetStmtForUpdateTS will auto update the for update ts if necessary +func (b *executorBuilder) updateForUpdateTS() error { + // GetStmtForUpdateTS will auto update the for update ts if it is necessary _, err := sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS() return err } From c1d6853e95f79fd614dd91f09033b6a6bbb36789 Mon Sep 17 00:00:00 2001 From: Chao Wang Date: Tue, 28 Jun 2022 11:58:11 +0800 Subject: [PATCH 6/6] bazel --- ddl/BUILD.bazel | 1 + sessionctx/sessionstates/BUILD.bazel | 1 + sessionctx/stmtctx/BUILD.bazel | 4 ++++ sessiontxn/isolation/BUILD.bazel | 1 + tests/realtikvtest/sessiontest/BUILD.bazel | 1 + 5 files changed, 8 insertions(+) diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index 324d39bc469ec..807f65715202b 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -143,6 +143,7 @@ go_test( "integration_test.go", "main_test.go", "modify_column_test.go", + "multi_schema_change_test.go", "options_test.go", "partition_test.go", "placement_policy_ddl_test.go", diff --git a/sessionctx/sessionstates/BUILD.bazel b/sessionctx/sessionstates/BUILD.bazel index ba5cb9254f9f8..0b0d2c6d1c6e8 100644 --- a/sessionctx/sessionstates/BUILD.bazel +++ b/sessionctx/sessionstates/BUILD.bazel @@ -7,6 +7,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//parser/types", + "//sessionctx/stmtctx", "//types", ], ) diff --git a/sessionctx/stmtctx/BUILD.bazel b/sessionctx/stmtctx/BUILD.bazel index 168b86f32cf62..c59196a17766c 100644 --- a/sessionctx/stmtctx/BUILD.bazel +++ b/sessionctx/stmtctx/BUILD.bazel @@ -10,12 +10,14 @@ go_library( "//parser/ast", "//parser/model", "//parser/mysql", + "//parser/terror", "//util/disk", "//util/execdetails", "//util/memory", "//util/resourcegrouptag", "//util/topsql/stmtstats", "//util/tracing", + "@com_github_pingcap_errors//:errors", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", "@org_uber_go_atomic//:atomic", @@ -32,9 +34,11 @@ go_test( embed = [":stmtctx"], deps = [ "//kv", + "//sessionctx/variable", "//testkit", "//testkit/testsetup", "//util/execdetails", + "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//util", "@org_uber_go_goleak//:goleak", diff --git a/sessiontxn/isolation/BUILD.bazel b/sessiontxn/isolation/BUILD.bazel index 35fb911f4b636..1345c09f08b21 100644 --- a/sessiontxn/isolation/BUILD.bazel +++ b/sessiontxn/isolation/BUILD.bazel @@ -53,6 +53,7 @@ go_test( "//testkit", "//testkit/testsetup", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//error", diff --git a/tests/realtikvtest/sessiontest/BUILD.bazel b/tests/realtikvtest/sessiontest/BUILD.bazel index 95c8ae77a9646..b0e5afb4dc0fd 100644 --- a/tests/realtikvtest/sessiontest/BUILD.bazel +++ b/tests/realtikvtest/sessiontest/BUILD.bazel @@ -20,6 +20,7 @@ go_test( "//kv", "//meta/autoid", "//parser", + "//parser/ast", "//parser/auth", "//parser/format", "//parser/model",