Skip to content

Commit

Permalink
txn: remove legacy.SimpleTxnContextProvider (#35667)
Browse files Browse the repository at this point in the history
close #35666
  • Loading branch information
lcwangchao authored Jun 29, 2022
1 parent 2874911 commit f0d5f6e
Show file tree
Hide file tree
Showing 12 changed files with 23 additions and 494 deletions.
1 change: 0 additions & 1 deletion executor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ go_library(
"//sessionctx/stmtctx",
"//sessionctx/variable",
"//sessiontxn",
"//sessiontxn/legacy",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
Expand Down
85 changes: 16 additions & 69 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -88,14 +87,11 @@ var (
// executorBuilder builds an Executor from a Plan.
// The InfoSchema must not change during execution.
type executorBuilder struct {
ctx sessionctx.Context
is infoschema.InfoSchema
snapshotTS uint64 // The ts for snapshot-read. A select statement without for update will use this ts
forUpdateTS uint64 // The ts should be used by insert/update/delete/select-for-update statement
snapshotTSCached bool
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
ctx sessionctx.Context
is infoschema.InfoSchema
err error // err is set when there is error happened during Executor building process.
hasLock bool
Ti *TelemetryInfo
// isStaleness means whether this statement use stale read.
isStaleness bool
readReplicaScope string
Expand All @@ -121,26 +117,13 @@ type CTEStorages struct {
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, replicaReadScope string) *executorBuilder {
b := &executorBuilder{
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
isStaleness: staleread.IsStmtStaleness(ctx),
readReplicaScope: replicaReadScope,
}

txnManager := sessiontxn.GetTxnManager(ctx)
if provider, ok := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); ok {
provider.GetReadTSFunc = b.getReadTS
provider.GetForUpdateTSFunc = func() (uint64, error) {
if b.forUpdateTS != 0 {
return b.forUpdateTS, nil
}
return b.getReadTS()
}
}

return b
}

// MockPhysicalPlan is used to return a specified executor in when build.
Expand Down Expand Up @@ -657,9 +640,7 @@ func (b *executorBuilder) buildSelectLock(v *plannercore.PhysicalLock) Executor
defer func() { b.inSelectLockStmt = false }()
}
b.hasLock = true

// Build 'select for update' using the 'for update' ts.
if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -865,8 +846,7 @@ func (b *executorBuilder) buildSetConfig(v *plannercore.SetConfig) Executor {

func (b *executorBuilder) buildInsert(v *plannercore.Insert) Executor {
b.inInsertStmt = true

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -1581,44 +1561,6 @@ func (b *executorBuilder) getSnapshotTS() (uint64, error) {
return txnManager.GetStmtReadTS()
}

// getReadTS returns the ts used by select (without for-update clause). The return value is affected by the isolation level
// and some stale/historical read contexts. For example, it will return txn.StartTS in RR and return
// the current timestamp in RC isolation
func (b *executorBuilder) getReadTS() (uint64, error) {
failpoint.Inject("assertNotStaleReadForExecutorGetReadTS", func() {
// after refactoring stale read will use its own context provider
staleread.AssertStmtStaleness(b.ctx, false)
})

if b.snapshotTSCached {
return b.snapshotTS, nil
}

if snapshotTS := b.ctx.GetSessionVars().SnapshotTS; snapshotTS != 0 {
b.snapshotTS = snapshotTS
b.snapshotTSCached = true
return snapshotTS, nil
}

if b.snapshotTS != 0 {
b.snapshotTSCached = true
// Return the cached value.
return b.snapshotTS, nil
}

txn, err := b.ctx.Txn(true)
if err != nil {
return 0, err
}

b.snapshotTS = txn.StartTS()
if b.snapshotTS == 0 {
return 0, errors.Trace(ErrGetStartTS)
}
b.snapshotTSCached = true
return b.snapshotTS, nil
}

func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor {
switch v.DBName.L {
case util.MetricSchemaName.L:
Expand Down Expand Up @@ -2119,8 +2061,7 @@ func (b *executorBuilder) buildUpdate(v *plannercore.Update) Executor {
}
}
}

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand Down Expand Up @@ -2178,7 +2119,7 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
tblID2table[info.TblID], _ = b.is.TableByID(info.TblID)
}

if b.forUpdateTS, b.err = b.getSnapshotTS(); b.err != nil {
if b.err = b.updateForUpdateTS(); b.err != nil {
return nil
}

Expand All @@ -2197,6 +2138,12 @@ func (b *executorBuilder) buildDelete(v *plannercore.Delete) Executor {
return deleteExec
}

func (b *executorBuilder) updateForUpdateTS() error {
// GetStmtForUpdateTS will auto update the for update ts if it is necessary
_, err := sessiontxn.GetTxnManager(b.ctx).GetStmtForUpdateTS()
return err
}

func (b *executorBuilder) buildAnalyzeIndexPushdown(task plannercore.AnalyzeIndexTask, opts map[ast.AnalyzeOptionType]uint64, autoAnalyze string) *analyzeTask {
job := &statistics.AnalyzeJob{DBName: task.DBName, TableName: task.TableName, PartitionName: task.PartitionName, JobInfo: autoAnalyze + "analyze index " + task.IndexInfo.Name.O}
_, offset := timeutil.Zone(b.ctx.GetSessionVars().Location())
Expand Down
2 changes: 0 additions & 2 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ import (
func enableStaleReadCommonFailPoint(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS", "return"))
return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertNotStaleReadForExecutorGetReadTS"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan"))
}
}
Expand Down
1 change: 0 additions & 1 deletion session/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ go_library(
"//sessionctx/variable",
"//sessiontxn",
"//sessiontxn/isolation",
"//sessiontxn/legacy",
"//sessiontxn/staleread",
"//statistics",
"//statistics/handle",
Expand Down
10 changes: 1 addition & 9 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ import (
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/sessionctx/sessionstates"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/sessiontxn/legacy"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/helper"
Expand Down Expand Up @@ -2388,7 +2387,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}

var is infoschema.InfoSchema
var snapshotTS uint64
replicaReadScope := oracle.GlobalTxnScope

Expand All @@ -2400,7 +2398,7 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
txnManager := sessiontxn.GetTxnManager(s)
if staleReadProcessor.IsStaleness() {
snapshotTS = staleReadProcessor.GetStalenessReadTS()
is = staleReadProcessor.GetStalenessInfoSchema()
is := staleReadProcessor.GetStalenessInfoSchema()
replicaReadScope = config.GetTxnScopeFromConfig()
err = txnManager.EnterNewTxn(ctx, &sessiontxn.EnterNewTxnRequest{
Type: sessiontxn.EnterNewTxnWithReplaceProvider,
Expand All @@ -2410,8 +2408,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
if err != nil {
return nil, err
}
} else {
is = s.GetInfoSchema().(infoschema.InfoSchema)
}

staleness := snapshotTS > 0
Expand All @@ -2427,10 +2423,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
return nil, err
}

if p, isOK := txnManager.GetContextProvider().(*legacy.SimpleTxnContextProvider); isOK {
p.InfoSchema = is
}

if ok {
rs, ok, err := s.cachedPointPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args)
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions sessionctx/sessionstates/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//parser/types",
"//sessionctx/stmtctx",
"//types",
],
)
Expand Down
4 changes: 4 additions & 0 deletions sessionctx/stmtctx/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ go_library(
"//parser/ast",
"//parser/model",
"//parser/mysql",
"//parser/terror",
"//util/disk",
"//util/execdetails",
"//util/memory",
"//util/resourcegrouptag",
"//util/topsql/stmtstats",
"//util/tracing",
"@com_github_pingcap_errors//:errors",
"@com_github_tikv_client_go_v2//tikvrpc",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_atomic//:atomic",
Expand All @@ -32,9 +34,11 @@ go_test(
embed = [":stmtctx"],
deps = [
"//kv",
"//sessionctx/variable",
"//testkit",
"//testkit/testsetup",
"//util/execdetails",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_goleak//:goleak",
Expand Down
1 change: 1 addition & 0 deletions sessiontxn/isolation/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ go_test(
"//testkit",
"//testkit/testsetup",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//error",
Expand Down
41 changes: 0 additions & 41 deletions sessiontxn/legacy/BUILD.bazel

This file was deleted.

Loading

0 comments on commit f0d5f6e

Please sign in to comment.