Skip to content

Commit

Permalink
*: Remove some useless staleness code (#35849)
Browse files Browse the repository at this point in the history
close #35848
  • Loading branch information
lcwangchao authored Jun 30, 2022
1 parent 86cd94a commit 4887126
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 188 deletions.
33 changes: 0 additions & 33 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package executor
import (
"bytes"
"context"
"fmt"
"math"
"sort"
"strconv"
Expand Down Expand Up @@ -727,38 +726,6 @@ func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
outputNames: v.OutputNames(),
}

failpoint.Inject("assertStaleReadValuesSameWithExecuteAndBuilder", func() {
// This fail point is used to assert the behavior after refactoring is exactly the same with the previous implement.
// Some variables in `plannercore.Execute` is deprecated and only be used for asserting now.
if b.isStaleness != v.IsStaleness {
panic(fmt.Sprintf("%v != %v", b.isStaleness, v.IsStaleness))
}

if b.readReplicaScope != v.ReadReplicaScope {
panic(fmt.Sprintf("%s != %s", b.readReplicaScope, v.ReadReplicaScope))
}

if v.SnapshotTS != 0 {
is, err := domain.GetDomain(b.ctx).GetSnapshotInfoSchema(v.SnapshotTS)
if err != nil {
panic(err)
}

if b.is.SchemaMetaVersion() != is.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", b.is.SchemaMetaVersion(), is.SchemaMetaVersion()))
}

ts, err := sessiontxn.GetTxnManager(b.ctx).GetStmtReadTS()
if err != nil {
panic(e)
}

if v.SnapshotTS != ts {
panic(fmt.Sprintf("%d != %d", ts, v.SnapshotTS))
}
}
})

failpoint.Inject("assertExecutePrepareStatementStalenessOption", func(val failpoint.Value) {
vs := strings.Split(val.(string), "_")
assertTS, assertTxnScope := vs[0], vs[1]
Expand Down
56 changes: 0 additions & 56 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,7 @@ import (
"github.com/tikv/client-go/v2/oracle"
)

func enableStaleReadCommonFailPoint(t *testing.T) func() {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return"))
return func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan"))
}
}

func TestExactStalenessTransaction(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()
testcases := []struct {
name string
preSQL string
Expand Down Expand Up @@ -106,9 +95,6 @@ func TestExactStalenessTransaction(t *testing.T) {
}

func TestSelectAsOf(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -264,9 +250,6 @@ func TestSelectAsOf(t *testing.T) {
}

func TestStaleReadKVRequest(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -362,9 +345,6 @@ func TestStaleReadKVRequest(t *testing.T) {
}

func TestStalenessAndHistoryRead(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -449,9 +429,6 @@ func TestStalenessAndHistoryRead(t *testing.T) {
}

func TestTimeBoundedStalenessTxn(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -548,9 +525,6 @@ func TestStalenessTransactionSchemaVer(t *testing.T) {
}

func TestSetTransactionReadOnlyAsOf(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

t1, err := time.Parse(types.TimeFormat, "2016-09-21 09:53:04")
require.NoError(t, err)
store, clean := testkit.CreateMockStore(t)
Expand Down Expand Up @@ -618,9 +592,6 @@ func TestSetTransactionReadOnlyAsOf(t *testing.T) {
}

func TestValidateReadOnlyInStalenessTransaction(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

errMsg1 := ".*only support read-only statement during read-only staleness transactions.*"
errMsg2 := ".*select lock hasn't been supported in stale read yet.*"
testcases := []struct {
Expand Down Expand Up @@ -800,9 +771,6 @@ func TestValidateReadOnlyInStalenessTransaction(t *testing.T) {
}

func TestSpecialSQLInStalenessTxn(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -858,9 +826,6 @@ func TestSpecialSQLInStalenessTxn(t *testing.T) {
}

func TestAsOfTimestampCompatibility(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -918,9 +883,6 @@ func TestAsOfTimestampCompatibility(t *testing.T) {
}

func TestSetTransactionInfoSchema(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -962,9 +924,6 @@ func TestSetTransactionInfoSchema(t *testing.T) {
}

func TestStaleSelect(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1049,9 +1008,6 @@ func TestStaleReadFutureTime(t *testing.T) {
}

func TestStaleReadPrepare(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1108,9 +1064,6 @@ func TestStaleReadPrepare(t *testing.T) {
}

func TestStmtCtxStaleFlag(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1206,9 +1159,6 @@ func TestStmtCtxStaleFlag(t *testing.T) {
}

func TestStaleSessionQuery(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1249,9 +1199,6 @@ func TestStaleSessionQuery(t *testing.T) {
}

func TestStaleReadCompatibility(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -1298,9 +1245,6 @@ func TestStaleReadCompatibility(t *testing.T) {
}

func TestStaleReadNoExtraTSORequest(t *testing.T) {
disableCommonFailPoint := enableStaleReadCommonFailPoint(t)
defer disableCommonFailPoint()

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
Expand Down
99 changes: 4 additions & 95 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ import (
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -38,7 +36,6 @@ import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
Expand All @@ -52,7 +49,6 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/texttree"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -194,15 +190,9 @@ type Execute struct {
TxtProtoVars []expression.Expression // parsed variables under text protocol
BinProtoVars []types.Datum // parsed variables under binary protocol
ExecID uint32
// Deprecated: SnapshotTS now is only used for asserting after refactoring stale read, it will be removed later.
SnapshotTS uint64
// Deprecated: IsStaleness now is only used for asserting after refactoring stale read, it will be removed later.
IsStaleness bool
// Deprecated: ReadReplicaScope now is only used for asserting after refactoring stale read, it will be removed later.
ReadReplicaScope string
Stmt ast.StmtNode
StmtType string
Plan Plan
Stmt ast.StmtNode
StmtType string
Plan Plan
}

// Check if result of GetVar expr is BinaryLiteral
Expand Down Expand Up @@ -274,29 +264,6 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
}

// Just setting `e.SnapshotTS`, `e.ReadReplicaScope` and `e.IsStaleness` with the return value of `handleExecuteBuilderOption`
// for asserting the stale read context after refactoring is exactly the same with the previous logic.
snapshotTS, readReplicaScope, isStaleness, err := handleExecuteBuilderOption(sctx, preparedObj)
if err != nil {
return err
}
e.SnapshotTS = snapshotTS
e.ReadReplicaScope = readReplicaScope
e.IsStaleness = isStaleness

failpoint.Inject("assertStaleReadForOptimizePreparedPlan", func() {
staleread.AssertStmtStaleness(sctx, isStaleness)
if isStaleness {
is2, err := domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
panic(err)
}

if is.SchemaMetaVersion() != is2.SchemaMetaVersion() {
panic(fmt.Sprintf("%d != %d", is.SchemaMetaVersion(), is2.SchemaMetaVersion()))
}
}
})
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
Expand Down Expand Up @@ -328,72 +295,14 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
prepared.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
err = e.getPhysicalPlan(ctx, sctx, is, preparedObj)
err := e.getPhysicalPlan(ctx, sctx, is, preparedObj)
if err != nil {
return err
}
e.Stmt = prepared.Stmt
return nil
}

// Deprecated: it will be removed later. Now it is only used for asserting
func handleExecuteBuilderOption(sctx sessionctx.Context,
preparedObj *CachedPrepareStmt) (snapshotTS uint64, readReplicaScope string, isStaleness bool, err error) {
snapshotTS = 0
readReplicaScope = oracle.GlobalTxnScope
isStaleness = false
err = nil
vars := sctx.GetSessionVars()
readTS := vars.TxnReadTS.PeakTxnReadTS()
if readTS > 0 {
// It means we meet following case:
// 1. prepare p from 'select * from t as of timestamp now() - x seconds'
// 1. set transaction read only as of timestamp ts2
// 2. execute prepare p
// The execute statement would be refused due to timestamp conflict
if preparedObj.SnapshotTSEvaluator != nil {
err = ErrAsOf.FastGenWithCause("as of timestamp can't be set after set transaction read only as of.")
return
}
snapshotTS = vars.TxnReadTS.UseTxnReadTS()
isStaleness = true
readReplicaScope = config.GetTxnScopeFromConfig()
return
}
// It means we meet following case:
// 1. prepare p from 'select * from t as of timestamp ts1'
// 1. begin
// 2. execute prepare p
// The execute statement would be refused due to timestamp conflict
if preparedObj.SnapshotTSEvaluator != nil {
if vars.InTxn() {
err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
return
}
// if preparedObj.SnapshotTSEvaluator != nil, it is a stale read SQL:
// which means its infoschema is specified by the SQL, not the current/latest infoschema
snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
if err != nil {
err = errors.Trace(err)
return
}
isStaleness = true
readReplicaScope = config.GetTxnScopeFromConfig()
return
}
// It means we meet following case:
// 1. prepare p from 'select * from t'
// 1. start transaction read only as of timestamp ts1
// 2. execute prepare p
if vars.InTxn() && vars.TxnCtx.IsStaleness {
isStaleness = true
snapshotTS = vars.TxnCtx.StartTS
readReplicaScope = vars.TxnCtx.TxnScope
return
}
return
}

func (e *Execute) checkPreparedPriv(ctx context.Context, sctx sessionctx.Context,
preparedObj *CachedPrepareStmt, is infoschema.InfoSchema) error {
if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
Expand Down
4 changes: 0 additions & 4 deletions sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,10 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerAfterBuildExecutor", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/assertTxnManagerForUpdateTSEqual", "return"))
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan", "return"))

store, do, clean := testkit.CreateMockStoreAndDomain(t)

Expand All @@ -81,12 +79,10 @@ func setupTxnContextTest(t *testing.T) (kv.Storage, *domain.Domain, func()) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterBuildExecutor"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerAfterPessimisticLockErrorRetry"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertTxnManagerInShortPointGetPlan"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/assertStaleReadValuesSameWithExecuteAndBuilder"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInRunStmt"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInPreparedStmtExec"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerInCachedPlanExec"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/session/assertTxnManagerForUpdateTSEqual"))
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/planner/core/assertStaleReadForOptimizePreparedPlan"))

tk.Session().SetValue(sessiontxn.AssertRecordsKey, nil)
tk.Session().SetValue(sessiontxn.AssertTxnInfoSchemaKey, nil)
Expand Down

0 comments on commit 4887126

Please sign in to comment.