From 9189ec66ac47c956c3ea55a98ed97507e4e45d07 Mon Sep 17 00:00:00 2001
From: xhe
Date: Fri, 11 Jun 2021 19:32:35 +0800
Subject: [PATCH] *: stale reads compatible with prepare (#25156)

---
 executor/adapter.go                     | 12 ++--
 executor/benchmark_test.go              | 10 ++--
 executor/builder.go                     | 14 +++--
 executor/compiler.go                    |  2 +-
 executor/coprocessor.go                 |  2 +-
 executor/executor_required_rows_test.go |  2 +-
 executor/prepared.go                    | 15 ++---
 executor/seqtest/prepared_test.go       |  4 +-
 executor/stale_txn_test.go              | 77 +++++++++++++++++++++++++
 planner/core/cache.go                   | 20 ++++---
 planner/core/common_plans.go            | 18 +++++-
 planner/core/preprocess.go              | 72 +++++++++++++----------
 session/session.go                      | 46 ++++++++++-----
 13 files changed, 213 insertions(+), 81 deletions(-)

diff --git a/executor/adapter.go b/executor/adapter.go
index 7f2cc3e72bd06..55e135093782e 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -245,7 +245,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
 		}
 	}
 	if a.PsStmt.Executor == nil {
-		b := newExecutorBuilder(a.Ctx, is, a.Ti)
+		b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
 		newExecutor := b.build(a.Plan)
 		if b.err != nil {
 			return nil, b.err
@@ -289,7 +289,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
 		return 0, err
 	}
 	a.InfoSchema = ret.InfoSchema
-	a.SnapshotTS = ret.SnapshotTS
+	a.SnapshotTS = ret.LastSnapshotTS
 	a.ExplicitStaleness = ret.ExplicitStaleness
 	p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
 	if err != nil {
@@ -336,7 +336,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
 		if n, ok := val.(int); ok {
 			startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
 			if n != int(startTS) {
-				panic("different tso")
+				panic(fmt.Sprintf("different tso %d != %d", n, startTS))
 			}
 			failpoint.Return()
 		}
@@ -346,7 +346,7 @@
 			// Convert to seconds
 			startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
 			if int(startTS) <= n-1 || n+1 <= int(startTS) {
-				panic("tso violate tolerance")
+				panic(fmt.Sprintf("different tso %d != %d", n, startTS))
 			}
 			failpoint.Return()
 		}
@@ -792,9 +792,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
 			ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
 		}
 	}
-	b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
-	b.snapshotTS = a.SnapshotTS
-	b.explicitStaleness = a.ExplicitStaleness
+	b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
 	e := b.build(a.Plan)
 	if b.err != nil {
 		return nil, errors.Trace(b.err)
diff --git a/executor/benchmark_test.go b/executor/benchmark_test.go
index 8e19d39ddb23d..4cd7bf09fcee9 100644
--- a/executor/benchmark_test.go
+++ b/executor/benchmark_test.go
@@ -290,7 +290,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
 	plan.SetSchema(schema)
 	plan.Init(ctx, nil, 0)
 	plan.SetChildren(nil)
-	b := newExecutorBuilder(ctx, nil, nil)
+	b := newExecutorBuilder(ctx, nil, nil, 0, false)
 	exec := b.build(plan)
 	hashAgg := exec.(*HashAggExec)
 	hashAgg.children[0] = src
@@ -342,7 +342,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
 		plan = sg
 	}

-	b := newExecutorBuilder(ctx, nil, nil)
+	b := newExecutorBuilder(ctx, nil, nil, 0, false)
 	return b.build(plan)
 }
@@ -575,7 +575,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
 		plan = win
 	}

-	b := newExecutorBuilder(ctx, nil, nil)
+	b := newExecutorBuilder(ctx, nil, nil, 0, false)
 	exec := b.build(plan)
 	return exec
 }
@@ -1322,7 +1322,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
 			hashCols:  tc.outerHashKeyIdx,
 		},
 		innerCtx: innerCtx{
-			readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
+			readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
 			rowTypes:      rightTypes,
 			colLens:       colLens,
 			keyCols:       tc.innerJoinKeyIdx,
@@ -1388,7 +1388,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
 			compareFuncs: outerCompareFuncs,
 		},
 		innerMergeCtx: innerMergeCtx{
-			readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
+			readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
 			rowTypes:      rightTypes,
 			joinKeys:      innerJoinKeys,
 			colLens:       colLens,
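The two failpoints touched above compare an injected wall-clock value against the physical part of the statement's start TSO. For readers who don't have the TSO layout in their head, here is a minimal stdlib-only sketch of that conversion; the 18-bit logical suffix is an assumption about PD's TSO encoding, and physicalSeconds is a hypothetical stand-in for the oracle.ExtractPhysical(ts) / 1000 expression used in the hunks above.

package main

import "fmt"

// physicalSeconds extracts the physical (wall-clock) part of a TSO and converts
// it to whole seconds, mirroring what the failpoint assertions do.
func physicalSeconds(tso uint64) int64 {
	const logicalBits = 18 // assumption: the low 18 bits of a TSO are the logical counter
	ms := int64(tso >> logicalBits)
	return ms / 1000
}

func main() {
	tso := uint64(1623410000000) << 18 // a TSO whose physical part is 1623410000000 ms
	fmt.Println(physicalSeconds(tso))  // prints 1623410000
}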
diff --git a/executor/builder.go b/executor/builder.go
index 70f76120d5dcf..6fe12ae08eb18 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -95,11 +95,13 @@ type CTEStorages struct {
 	IterInTbl cteutil.Storage
 }

-func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
+func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, explicitStaleness bool) *executorBuilder {
 	return &executorBuilder{
-		ctx: ctx,
-		is:  is,
-		Ti:  ti,
+		ctx:               ctx,
+		is:                is,
+		Ti:                ti,
+		snapshotTS:        snapshotTS,
+		explicitStaleness: explicitStaleness,
 	}
 }

@@ -663,6 +665,10 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
 }

 func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
+	b.snapshotTS = v.SnapshotTS
+	if b.snapshotTS != 0 {
+		b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
+	}
 	e := &ExecuteExec{
 		baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
 		is:           b.is,
diff --git a/executor/compiler.go b/executor/compiler.go
index c763c43067047..18b5b32979296 100644
--- a/executor/compiler.go
+++ b/executor/compiler.go
@@ -70,7 +70,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
 	}
 	return &ExecStmt{
 		GoCtx:             ctx,
-		SnapshotTS:        ret.SnapshotTS,
+		SnapshotTS:        ret.LastSnapshotTS,
 		ExplicitStaleness: ret.ExplicitStaleness,
 		InfoSchema:        ret.InfoSchema,
 		Plan:              finalPlan,
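The buildExecute change above is the executor-side half of the feature: a non-zero SnapshotTS on the Execute plan pins both the read timestamp and the schema version used to build the executor tree. Below is a hedged sketch of that pattern; the helper name is illustrative and not part of this patch, but it only uses calls that already appear in the diff.

package executor

import (
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/sessionctx"
)

// snapshotInfoSchemaFor returns the infoschema a stale EXECUTE should run with:
// a zero snapshotTS means "not a stale read", so the current infoschema is kept;
// otherwise the schema as of snapshotTS is loaded from the domain's schema cache.
func snapshotInfoSchemaFor(ctx sessionctx.Context, snapshotTS uint64, current infoschema.InfoSchema) (infoschema.InfoSchema, error) {
	if snapshotTS == 0 {
		return current, nil
	}
	return domain.GetDomain(ctx).GetSnapshotInfoSchema(snapshotTS)
}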
diff --git a/executor/coprocessor.go b/executor/coprocessor.go
index 04b90af2ad63b..5eebd868cfb34 100644
--- a/executor/coprocessor.go
+++ b/executor/coprocessor.go
@@ -168,7 +168,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
 	}
 	plan = core.InjectExtraProjection(plan)
 	// Build executor.
-	b := newExecutorBuilder(h.sctx, is, nil)
+	b := newExecutorBuilder(h.sctx, is, nil, 0, false)
 	return b.build(plan), nil
 }
diff --git a/executor/executor_required_rows_test.go b/executor/executor_required_rows_test.go
index a0eb15ca77860..bb2766189ed52 100644
--- a/executor/executor_required_rows_test.go
+++ b/executor/executor_required_rows_test.go
@@ -843,7 +843,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
 		j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
 	}

-	b := newExecutorBuilder(ctx, nil, nil)
+	b := newExecutorBuilder(ctx, nil, nil, 0, false)
 	return b.build(j)
 }
diff --git a/executor/prepared.go b/executor/prepared.go
index a5fd131d1a87b..7eb92ad37ff61 100644
--- a/executor/prepared.go
+++ b/executor/prepared.go
@@ -220,11 +220,12 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
 	}

 	preparedObj := &plannercore.CachedPrepareStmt{
-		PreparedAst:   prepared,
-		VisitInfos:    destBuilder.GetVisitInfo(),
-		NormalizedSQL: normalizedSQL,
-		SQLDigest:     digest,
-		ForUpdateRead: destBuilder.GetIsForUpdateRead(),
+		PreparedAst:         prepared,
+		VisitInfos:          destBuilder.GetVisitInfo(),
+		NormalizedSQL:       normalizedSQL,
+		SQLDigest:           digest,
+		ForUpdateRead:       destBuilder.GetIsForUpdateRead(),
+		SnapshotTSEvaluator: ret.SnapshotTSEvaluator,
 	}
 	return vars.AddPreparedStmt(e.ID, preparedObj)
 }
@@ -314,7 +315,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {

 // CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
 func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
-	ID uint32, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
+	ID uint32, is infoschema.InfoSchema, snapshotTS uint64, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
 	startTime := time.Now()
 	defer func() {
 		sctx.GetSessionVars().DurationCompile = time.Since(startTime)
@@ -324,7 +325,6 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
 		return nil, false, false, err
 	}
 	execStmt.BinaryArgs = args
-	is := sctx.GetInfoSchema().(infoschema.InfoSchema)
 	execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
 	if err != nil {
 		return nil, false, false, err
 	}
@@ -338,6 +338,7 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
 		Ctx:          sctx,
 		OutputNames:  names,
 		Ti:           &TelemetryInfo{},
+		SnapshotTS:   snapshotTS,
 	}
 	if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[ID]; ok {
 		preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
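The SnapshotTSEvaluator stored above is deliberately a closure rather than a fixed uint64: a stale timestamp written as an expression (for example CURRENT_TIMESTAMP(3) - INTERVAL ...) must be re-evaluated on every EXECUTE, not frozen at PREPARE time. A stdlib-only sketch of that design choice, with purely illustrative names:

package main

import (
	"fmt"
	"time"
)

// preparedStmt mimics a prepared statement that captures *how* to compute its
// read timestamp instead of the timestamp itself.
type preparedStmt struct {
	evalReadTS func() time.Time
}

func main() {
	// Captured at "PREPARE" time: read as of one second before execution.
	p := preparedStmt{evalReadTS: func() time.Time { return time.Now().Add(-time.Second) }}

	// Each "EXECUTE" re-runs the closure, so repeated executions track the wall
	// clock instead of reusing the value computed when the statement was prepared.
	fmt.Println(p.evalReadTS())
	time.Sleep(10 * time.Millisecond)
	fmt.Println(p.evalReadTS())
}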
diff --git a/executor/seqtest/prepared_test.go b/executor/seqtest/prepared_test.go
index bb8f05e5eff54..93ba9cb30cace 100644
--- a/executor/seqtest/prepared_test.go
+++ b/executor/seqtest/prepared_test.go
@@ -22,6 +22,7 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/executor"
+	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/metrics"
 	plannercore "github.com/pingcap/tidb/planner/core"
 	"github.com/pingcap/tidb/session"
@@ -143,7 +144,8 @@ func (s *seqTestSuite) TestPrepared(c *C) {
 		tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows())

 		// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
-		stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID, []types.Datum{types.NewDatum(1)})
+		stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID,
+			tk.Se.GetInfoSchema().(infoschema.InfoSchema), 0, []types.Datum{types.NewDatum(1)})
 		c.Assert(err, IsNil)
 		c.Assert(stmt.OriginText(), Equals, query)
diff --git a/executor/stale_txn_test.go b/executor/stale_txn_test.go
index fd620ca56ad5b..8870a9120187d 100644
--- a/executor/stale_txn_test.go
+++ b/executor/stale_txn_test.go
@@ -772,3 +772,80 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) {
 	tk.MustExec("commit")
 	c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3)
 }
+
+func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t")
+	defer tk.MustExec("drop table if exists t")
+	tk.MustExec("create table t (id int)")
+
+	tolerance := 50 * time.Millisecond
+
+	tk.MustExec("insert into t values (1)")
+	time.Sleep(tolerance)
+	time1 := time.Now()
+
+	tk.MustExec("insert into t values (2)")
+	time.Sleep(tolerance)
+	time2 := time.Now()
+
+	tk.MustExec("insert into t values (3)")
+	time.Sleep(tolerance)
+
+	staleRows := testkit.Rows("1")
+	staleSQL := fmt.Sprintf(`select * from t as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000"))
+
+	// test normal stale select
+	tk.MustQuery(staleSQL).Check(staleRows)
+
+	// test stale select in txn
+	tk.MustExec("begin")
+	c.Assert(tk.ExecToErr(staleSQL), NotNil)
+	tk.MustExec("commit")
+
+	// test prepared stale select
+	tk.MustExec(fmt.Sprintf(`prepare s from "%s"`, staleSQL))
+	tk.MustQuery("execute s")
+
+	// test prepared stale select in txn
+	tk.MustExec("begin")
+	tk.MustQuery("execute s").Check(staleRows)
+	tk.MustExec("commit")
+
+	// test stale select in stale txn
+	tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
+	c.Assert(tk.ExecToErr(staleSQL), NotNil)
+	tk.MustExec("commit")
+
+	// test prepared stale select in stale txn
+	tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
+	tk.MustQuery("execute s").Check(staleRows)
+	tk.MustExec("commit")
+
+	// test prepared stale select with schema change
+	tk.MustExec("alter table t add column c int")
+	tk.MustExec("insert into t values (4, 5)")
+	time.Sleep(10 * time.Millisecond)
+	tk.MustQuery("execute s").Check(staleRows)
+
+	// test dynamic timestamp stale select
+	time3 := time.Now()
+	tk.MustExec("alter table t add column d int")
+	tk.MustExec("insert into t values (4, 4, 4)")
+	time.Sleep(tolerance)
+	time4 := time.Now()
+	staleRows = testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 5")
+	tk.MustQuery(fmt.Sprintf("select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND", time4.Sub(time3).Microseconds())).Check(staleRows)
+
+	// test prepared dynamic timestamp stale select
+	time5 := time.Now()
+	tk.MustExec(fmt.Sprintf(`prepare v from "select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND"`, time5.Sub(time3).Microseconds()))
+	tk.MustQuery("execute v").Check(staleRows)
+
+	// test point get
+	time6 := time.Now()
+	tk.MustExec("insert into t values (5, 5, 5)")
+	time.Sleep(tolerance)
+	tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
+}
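TestStaleSelect renders its AS OF literals with Go's reference-time layout, so the string "2006-1-2 15:04:05.000" is a format template rather than a real date. A small stdlib sketch of that step, in case the constant looks magic:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Go layouts are written against the fixed reference time Mon Jan 2 15:04:05 2006,
	// so this renders something like: select * from t as of timestamp '2021-6-11 19:32:35.123'
	ts := time.Now().Add(-50 * time.Millisecond)
	fmt.Printf("select * from t as of timestamp '%s'\n", ts.Format("2006-1-2 15:04:05.000"))
}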
diff --git a/planner/core/cache.go b/planner/core/cache.go
index 0e5a624b3d635..dd691ba80f86b 100644
--- a/planner/core/cache.go
+++ b/planner/core/cache.go
@@ -23,6 +23,7 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/codec"
@@ -194,13 +195,14 @@ func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*mod

 // CachedPrepareStmt store prepared ast from PrepareExec and other related fields
 type CachedPrepareStmt struct {
-	PreparedAst    *ast.Prepared
-	VisitInfos     []visitInfo
-	ColumnInfos    interface{}
-	Executor       interface{}
-	NormalizedSQL  string
-	NormalizedPlan string
-	SQLDigest      *parser.Digest
-	PlanDigest     *parser.Digest
-	ForUpdateRead  bool
+	PreparedAst         *ast.Prepared
+	VisitInfos          []visitInfo
+	ColumnInfos         interface{}
+	Executor            interface{}
+	NormalizedSQL       string
+	NormalizedPlan      string
+	SQLDigest           *parser.Digest
+	PlanDigest          *parser.Digest
+	ForUpdateRead       bool
+	SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
 }
diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go
index 8e3baf262101c..0fdb48169fd4e 100644
--- a/planner/core/common_plans.go
+++ b/planner/core/common_plans.go
@@ -24,6 +24,7 @@ import (
 	"github.com/pingcap/parser/ast"
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
+	"github.com/pingcap/tidb/domain"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/infoschema"
 	"github.com/pingcap/tidb/kv"
@@ -182,6 +183,7 @@ type Execute struct {
 	UsingVars     []expression.Expression
 	PrepareParams []types.Datum
 	ExecID        uint32
+	SnapshotTS    uint64
 	Stmt          ast.StmtNode
 	StmtType      string
 	Plan          Plan
@@ -256,6 +258,20 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
 		}
 	}

+	var snapshotTS uint64
+	if preparedObj.SnapshotTSEvaluator != nil {
+		// If preparedObj.SnapshotTSEvaluator != nil, this is a stale read SQL,
+		// which means its infoschema is specified by the SQL itself, not the current/latest infoschema.
+		var err error
+		snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
+		if err != nil {
+			return errors.Trace(err)
+		}
+		is, err = domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
+		if err != nil {
+			return errors.Trace(err)
+		}
+	}
 	if prepared.SchemaVersion != is.SchemaMetaVersion() {
 		// In order to avoid some correctness issues, we have to clear the
 		// cached plan once the schema version is changed.
@@ -265,7 +281,6 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
 		preparedObj.Executor = nil
 		// If the schema version has changed we need to preprocess it again,
 		// if this time it failed, the real reason for the error is schema changed.
-		// FIXME: compatible with prepare https://github.com/pingcap/tidb/issues/24932
 		ret := &PreprocessorReturn{InfoSchema: is}
 		err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret))
 		if err != nil {
@@ -277,6 +292,7 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
 	if err != nil {
 		return err
 	}
+	e.SnapshotTS = snapshotTS
 	e.Stmt = prepared.Stmt
 	return nil
 }
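Note the ordering in OptimizePreparedPlan above: the stale timestamp is evaluated and the snapshot infoschema is swapped in before the schema-version comparison, so a prepared stale read is validated against the historical schema it will actually run with. A hedged condensation of that sequence as a standalone helper (the helper itself is not in the patch; it only reuses types and calls visible in the diff):

package core

import (
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/sessionctx"
)

// staleExecuteInfoSchema evaluates the prepared statement's stale timestamp, if
// any, and resolves the infoschema at that timestamp; callers then compare
// prepared.SchemaVersion against the returned schema, exactly as the hunk above does.
func staleExecuteInfoSchema(sctx sessionctx.Context, stmt *CachedPrepareStmt, current infoschema.InfoSchema) (uint64, infoschema.InfoSchema, error) {
	if stmt.SnapshotTSEvaluator == nil {
		// Not a stale read: keep the infoschema the caller already resolved.
		return 0, current, nil
	}
	snapshotTS, err := stmt.SnapshotTSEvaluator(sctx)
	if err != nil {
		return 0, nil, err
	}
	is, err := domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
	if err != nil {
		return 0, nil, err
	}
	return snapshotTS, is, nil
}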
diff --git a/planner/core/preprocess.go b/planner/core/preprocess.go
index 8366545b1fd45..deb3957210cae 100644
--- a/planner/core/preprocess.go
+++ b/planner/core/preprocess.go
@@ -101,14 +101,8 @@ func Preprocess(ctx sessionctx.Context, node ast.Node, preprocessOpt ...Preproce
 		v.PreprocessorReturn = &PreprocessorReturn{}
 	}
 	node.Accept(&v)
-	readTS := ctx.GetSessionVars().TxnReadTS.UseTxnReadTS()
-	if readTS > 0 {
-		v.PreprocessorReturn.SnapshotTS = readTS
-	}
 	// InfoSchema must be non-nil after preprocessing
-	if v.InfoSchema == nil {
-		v.ensureInfoSchema()
-	}
+	v.ensureInfoSchema()
 	return errors.Trace(v.err)
 }

@@ -132,9 +126,13 @@ const (

 // PreprocessorReturn is used to retain information obtained in the preprocessor.
 type PreprocessorReturn struct {
-	SnapshotTS        uint64
-	ExplicitStaleness bool
-	InfoSchema        infoschema.InfoSchema
+	initedLastSnapshotTS bool
+	ExplicitStaleness    bool
+	SnapshotTSEvaluator  func(sessionctx.Context) (uint64, error)
+	// LastSnapshotTS is the last evaluated snapshotTS, if any;
+	// otherwise it defaults to zero.
+	LastSnapshotTS uint64
+	InfoSchema     infoschema.InfoSchema
 }

 // preprocessor is an ast.Visitor that preprocess
@@ -1267,7 +1265,7 @@ func (p *preprocessor) handleTableName(tn *ast.TableName) {
 		return
 	}

-	p.handleAsOf(tn.AsOf)
+	p.handleAsOfAndReadTS(tn.AsOf)
 	if p.err != nil {
 		return
 	}
@@ -1420,16 +1418,24 @@ func (p *preprocessor) checkFuncCastExpr(node *ast.FuncCastExpr) {
 	}
 }

-// handleAsOf tries to validate the timestamp.
-// If it is not nil, timestamp is used to get the history infoschema from the infocache.
-func (p *preprocessor) handleAsOf(node *ast.AsOfClause) {
-	readTS := p.ctx.GetSessionVars().TxnReadTS.PeakTxnReadTS()
-	if readTS > 0 && node != nil {
-		p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of")
-		return
+// handleAsOfAndReadTS tries to handle the AS OF clause and, if present, the transaction read_ts.
+// If read_ts is set, it will be consumed.
+// If the AS OF clause is not nil, its timestamp is used to get the history infoschema from the infocache.
+func (p *preprocessor) handleAsOfAndReadTS(node *ast.AsOfClause) {
+	ts := p.ctx.GetSessionVars().TxnReadTS.UseTxnReadTS()
+	if ts > 0 {
+		if node != nil {
+			p.err = ErrAsOf.FastGenWithCause("can't use select as of while already set transaction as of")
+			return
+		}
+		if !p.initedLastSnapshotTS {
+			p.SnapshotTSEvaluator = func(sessionctx.Context) (uint64, error) {
+				return ts, nil
+			}
+			p.LastSnapshotTS = ts
+			p.ExplicitStaleness = true
+		}
 	}
-	dom := domain.GetDomain(p.ctx)
-	ts := uint64(0)
 	if node != nil {
 		if p.ctx.GetSessionVars().InTxn() {
 			p.err = ErrAsOf.FastGenWithCause("as of timestamp can't be set in transaction.")
@@ -1439,20 +1445,26 @@
 		if p.err != nil {
 			return
 		}
-	}
-	if ts != 0 && p.InfoSchema == nil {
-		is, err := dom.GetSnapshotInfoSchema(ts)
-		if err != nil {
-			p.err = err
-			return
+		if !p.initedLastSnapshotTS {
+			p.SnapshotTSEvaluator = func(ctx sessionctx.Context) (uint64, error) {
+				return calculateTsExpr(ctx, node)
+			}
+			p.LastSnapshotTS = ts
+			p.ExplicitStaleness = true
 		}
-		p.SnapshotTS = ts
-		p.ExplicitStaleness = true
-		p.InfoSchema = is
 	}
-	if p.SnapshotTS != ts {
+	if p.LastSnapshotTS != ts {
 		p.err = ErrDifferentAsOf.GenWithStack("can not set different time in the as of")
+		return
+	}
+	if p.LastSnapshotTS != 0 {
+		dom := domain.GetDomain(p.ctx)
+		p.InfoSchema, p.err = dom.GetSnapshotInfoSchema(p.LastSnapshotTS)
+		if p.err != nil {
+			return
+		}
 	}
+	p.initedLastSnapshotTS = true
 }

 // ensureInfoSchema get the infoschema from the preprecessor.
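handleAsOfAndReadTS ends up building one of two evaluator shapes: a constant closure when the timestamp was fixed by START TRANSACTION READ ONLY AS OF ... (the consumed read_ts), and a re-evaluating closure when the statement itself carries an AS OF clause. The sketch below condenses those two branches into one illustrative helper; it is not part of the patch, and it assumes calculateTsExpr keeps the (sessionctx.Context, *ast.AsOfClause) signature used in the hunk above.

package core

import (
	"github.com/pingcap/parser/ast"
	"github.com/pingcap/tidb/sessionctx"
)

// evaluatorFor returns a constant evaluator for a transaction-scoped read_ts, and a
// re-evaluating one for an AS OF clause, so prepared statements recompute expressions
// such as CURRENT_TIMESTAMP(3) - INTERVAL ... on every execution.
func evaluatorFor(fixedTS uint64, node *ast.AsOfClause) func(sessionctx.Context) (uint64, error) {
	if node == nil {
		return func(sessionctx.Context) (uint64, error) { return fixedTS, nil }
	}
	return func(ctx sessionctx.Context) (uint64, error) {
		return calculateTsExpr(ctx, node)
	}
}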
diff --git a/session/session.go b/session/session.go
index 45b7ebf620ae6..b504392f02174 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1770,8 +1770,9 @@ func (s *session) PrepareStmt(sql string) (stmtID uint32, paramCount int, fields
 }

 func (s *session) preparedStmtExec(ctx context.Context,
+	is infoschema.InfoSchema, snapshotTS uint64,
 	stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) {
-	st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, stmtID, args)
+	st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, stmtID, is, snapshotTS, args)
 	if err != nil {
 		return nil, err
 	}
@@ -1791,15 +1792,10 @@ func (s *session) preparedStmtExec(ctx context.Context,

 // cachedPlanExec short path currently ONLY for cached "point select plan" execution
 func (s *session) cachedPlanExec(ctx context.Context,
+	is infoschema.InfoSchema, snapshotTS uint64,
 	stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) {
 	prepared := prepareStmt.PreparedAst
 	// compile ExecStmt
-	var is infoschema.InfoSchema
-	if prepareStmt.ForUpdateRead {
-		is = domain.GetDomain(s).InfoSchema()
-	} else {
-		is = s.GetInfoSchema().(infoschema.InfoSchema)
-	}
 	execAst := &ast.ExecuteStmt{ExecID: stmtID}
 	if err := executor.ResetContextOfStmt(s, execAst); err != nil {
 		return nil, err
@@ -1820,6 +1816,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
 		OutputNames: execPlan.OutputNames(),
 		PsStmt:      prepareStmt,
 		Ti:          &executor.TelemetryInfo{},
+		SnapshotTS:  snapshotTS,
 	}
 	compileDuration := time.Since(s.sessionVars.StartTime)
 	sessionExecuteCompileDurationGeneral.Observe(compileDuration.Seconds())
@@ -1878,11 +1875,16 @@ func (s *session) IsCachedExecOk(ctx context.Context, preparedStmt *plannercore.
 	if !plannercore.IsAutoCommitTxn(s) {
 		return false, nil
 	}
-	// check schema version
-	is := s.GetInfoSchema().(infoschema.InfoSchema)
-	if prepared.SchemaVersion != is.SchemaMetaVersion() {
-		prepared.CachedPlan = nil
-		return false, nil
+	// If SnapshotTSEvaluator != nil, this is a stale read statement.
+	// A stale read expects a stale infoschema,
+	// so skip the infoschema check here.
+	if preparedStmt.SnapshotTSEvaluator == nil {
+		// check schema version
+		is := s.GetInfoSchema().(infoschema.InfoSchema)
+		if prepared.SchemaVersion != is.SchemaMetaVersion() {
+			prepared.CachedPlan = nil
+			return false, nil
+		}
 	}
 	// maybe we'd better check cached plan type here, current
 	// only point select/update will be cached, see "getPhysicalPlan" func
@@ -1926,11 +1928,27 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
 		return nil, err
 	}
 	s.txn.onStmtStart(preparedStmt.SQLDigest.String())
+	var is infoschema.InfoSchema
+	var snapshotTS uint64
+	if preparedStmt.ForUpdateRead {
+		is = domain.GetDomain(s).InfoSchema()
+	} else if preparedStmt.SnapshotTSEvaluator != nil {
+		snapshotTS, err = preparedStmt.SnapshotTSEvaluator(s)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+		is, err = domain.GetDomain(s).GetSnapshotInfoSchema(snapshotTS)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	} else {
+		is = s.GetInfoSchema().(infoschema.InfoSchema)
+	}
 	var rs sqlexec.RecordSet
 	if ok {
-		rs, err = s.cachedPlanExec(ctx, stmtID, preparedStmt, args)
+		rs, err = s.cachedPlanExec(ctx, is, snapshotTS, stmtID, preparedStmt, args)
 	} else {
-		rs, err = s.preparedStmtExec(ctx, stmtID, preparedStmt, args)
+		rs, err = s.preparedStmtExec(ctx, is, snapshotTS, stmtID, preparedStmt, args)
 	}
 	s.txn.onStmtEnd()
 	return rs, err
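Taken together, the branch added to ExecutePreparedStmt defines a precedence for the schema a prepared statement executes against: FOR UPDATE reads always see the newest infoschema, stale reads see the infoschema at their evaluated timestamp, and everything else keeps the session's current one. A hedged summary of that rule as a standalone helper (the method name is illustrative and does not exist in TiDB):

package session

import (
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/infoschema"
	plannercore "github.com/pingcap/tidb/planner/core"
)

// infoSchemaForPrepared mirrors the selection order above: for-update reads win,
// then stale reads, then the session's current infoschema. The returned snapshotTS
// is zero unless the statement is a stale read.
func (s *session) infoSchemaForPrepared(stmt *plannercore.CachedPrepareStmt) (infoschema.InfoSchema, uint64, error) {
	if stmt.ForUpdateRead {
		return domain.GetDomain(s).InfoSchema(), 0, nil
	}
	if stmt.SnapshotTSEvaluator != nil {
		snapshotTS, err := stmt.SnapshotTSEvaluator(s)
		if err != nil {
			return nil, 0, errors.Trace(err)
		}
		is, err := domain.GetDomain(s).GetSnapshotInfoSchema(snapshotTS)
		if err != nil {
			return nil, 0, errors.Trace(err)
		}
		return is, snapshotTS, nil
	}
	return s.GetInfoSchema().(infoschema.InfoSchema), 0, nil
}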