Skip to content

Commit

Permalink
*: stale reads compatible with prepare (#25156)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Jun 11, 2021
1 parent da23dd7 commit 9189ec6
Show file tree
Hide file tree
Showing 13 changed files with 213 additions and 81 deletions.
12 changes: 5 additions & 7 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
}
}
if a.PsStmt.Executor == nil {
b := newExecutorBuilder(a.Ctx, is, a.Ti)
b := newExecutorBuilder(a.Ctx, is, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
newExecutor := b.build(a.Plan)
if b.err != nil {
return nil, b.err
Expand Down Expand Up @@ -289,7 +289,7 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
return 0, err
}
a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.SnapshotTS
a.SnapshotTS = ret.LastSnapshotTS
a.ExplicitStaleness = ret.ExplicitStaleness
p, names, err := planner.Optimize(ctx, a.Ctx, a.StmtNode, a.InfoSchema)
if err != nil {
Expand Down Expand Up @@ -336,7 +336,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
if n, ok := val.(int); ok {
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if n != int(startTS) {
panic("different tso")
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
failpoint.Return()
}
Expand All @@ -346,7 +346,7 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) {
// Convert to seconds
startTS := oracle.ExtractPhysical(a.SnapshotTS) / 1000
if int(startTS) <= n-1 || n+1 <= int(startTS) {
panic("tso violate tolerance")
panic(fmt.Sprintf("different tso %d != %d", n, startTS))
}
failpoint.Return()
}
Expand Down Expand Up @@ -792,9 +792,7 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityLow
}

b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti)
b.snapshotTS = a.SnapshotTS
b.explicitStaleness = a.ExplicitStaleness
b := newExecutorBuilder(ctx, a.InfoSchema, a.Ti, a.SnapshotTS, a.ExplicitStaleness)
e := b.build(a.Plan)
if b.err != nil {
return nil, errors.Trace(b.err)
Expand Down
10 changes: 5 additions & 5 deletions executor/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ func buildHashAggExecutor(ctx sessionctx.Context, src Executor, schema *expressi
plan.SetSchema(schema)
plan.Init(ctx, nil, 0)
plan.SetChildren(nil)
b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
exec := b.build(plan)
hashAgg := exec.(*HashAggExec)
hashAgg.children[0] = src
Expand Down Expand Up @@ -342,7 +342,7 @@ func buildStreamAggExecutor(ctx sessionctx.Context, srcExec Executor, schema *ex
plan = sg
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
return b.build(plan)
}

Expand Down Expand Up @@ -575,7 +575,7 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
plan = win
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
exec := b.build(plan)
return exec
}
Expand Down Expand Up @@ -1322,7 +1322,7 @@ func prepare4IndexInnerHashJoin(tc *indexJoinTestCase, outerDS *mockDataSource,
hashCols: tc.outerHashKeyIdx,
},
innerCtx: innerCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
rowTypes: rightTypes,
colLens: colLens,
keyCols: tc.innerJoinKeyIdx,
Expand Down Expand Up @@ -1388,7 +1388,7 @@ func prepare4IndexMergeJoin(tc *indexJoinTestCase, outerDS *mockDataSource, inne
compareFuncs: outerCompareFuncs,
},
innerMergeCtx: innerMergeCtx{
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil)},
readerBuilder: &dataReaderBuilder{Plan: &mockPhysicalIndexReader{e: innerDS}, executorBuilder: newExecutorBuilder(tc.ctx, nil, nil, 0, false)},
rowTypes: rightTypes,
joinKeys: innerJoinKeys,
colLens: colLens,
Expand Down
14 changes: 10 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ type CTEStorages struct {
IterInTbl cteutil.Storage
}

func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo) *executorBuilder {
func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *TelemetryInfo, snapshotTS uint64, explicitStaleness bool) *executorBuilder {
return &executorBuilder{
ctx: ctx,
is: is,
Ti: ti,
ctx: ctx,
is: is,
Ti: ti,
snapshotTS: snapshotTS,
explicitStaleness: explicitStaleness,
}
}

Expand Down Expand Up @@ -663,6 +665,10 @@ func (b *executorBuilder) buildPrepare(v *plannercore.Prepare) Executor {
}

func (b *executorBuilder) buildExecute(v *plannercore.Execute) Executor {
b.snapshotTS = v.SnapshotTS
if b.snapshotTS != 0 {
b.is, b.err = domain.GetDomain(b.ctx).GetSnapshotInfoSchema(b.snapshotTS)
}
e := &ExecuteExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
is: b.is,
Expand Down
2 changes: 1 addition & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm
}
return &ExecStmt{
GoCtx: ctx,
SnapshotTS: ret.SnapshotTS,
SnapshotTS: ret.LastSnapshotTS,
ExplicitStaleness: ret.ExplicitStaleness,
InfoSchema: ret.InfoSchema,
Plan: finalPlan,
Expand Down
2 changes: 1 addition & 1 deletion executor/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (h *CoprocessorDAGHandler) buildDAGExecutor(req *coprocessor.Request) (Exec
}
plan = core.InjectExtraProjection(plan)
// Build executor.
b := newExecutorBuilder(h.sctx, is, nil)
b := newExecutorBuilder(h.sctx, is, nil, 0, false)
return b.build(plan), nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/executor_required_rows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func buildMergeJoinExec(ctx sessionctx.Context, joinType plannercore.JoinType, i
j.CompareFuncs = append(j.CompareFuncs, expression.GetCmpFunction(nil, j.LeftJoinKeys[i], j.RightJoinKeys[i]))
}

b := newExecutorBuilder(ctx, nil, nil)
b := newExecutorBuilder(ctx, nil, nil, 0, false)
return b.build(j)
}

Expand Down
15 changes: 8 additions & 7 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,11 +220,12 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error {
}

preparedObj := &plannercore.CachedPrepareStmt{
PreparedAst: prepared,
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalizedSQL,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
PreparedAst: prepared,
VisitInfos: destBuilder.GetVisitInfo(),
NormalizedSQL: normalizedSQL,
SQLDigest: digest,
ForUpdateRead: destBuilder.GetIsForUpdateRead(),
SnapshotTSEvaluator: ret.SnapshotTSEvaluator,
}
return vars.AddPreparedStmt(e.ID, preparedObj)
}
Expand Down Expand Up @@ -314,7 +315,7 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
ID uint32, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
ID uint32, is infoschema.InfoSchema, snapshotTS uint64, args []types.Datum) (sqlexec.Statement, bool, bool, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
Expand All @@ -324,7 +325,6 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
return nil, false, false, err
}
execStmt.BinaryArgs = args
is := sctx.GetInfoSchema().(infoschema.InfoSchema)
execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, false, false, err
Expand All @@ -338,6 +338,7 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
Ctx: sctx,
OutputNames: names,
Ti: &TelemetryInfo{},
SnapshotTS: snapshotTS,
}
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[ID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
Expand Down
4 changes: 3 additions & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/metrics"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
Expand Down Expand Up @@ -143,7 +144,8 @@ func (s *seqTestSuite) TestPrepared(c *C) {
tk.ResultSetToResult(rs, Commentf("%v", rs)).Check(testkit.Rows())

// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID, []types.Datum{types.NewDatum(1)})
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Se, stmtID,
tk.Se.GetInfoSchema().(infoschema.InfoSchema), 0, []types.Datum{types.NewDatum(1)})
c.Assert(err, IsNil)
c.Assert(stmt.OriginText(), Equals, query)

Expand Down
77 changes: 77 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,80 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) {
tk.MustExec("commit")
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3)
}

func (s *testStaleTxnSuite) TestStaleSelect(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
defer tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int)")

tolerance := 50 * time.Millisecond

tk.MustExec("insert into t values (1)")
time.Sleep(tolerance)
time1 := time.Now()

tk.MustExec("insert into t values (2)")
time.Sleep(tolerance)
time2 := time.Now()

tk.MustExec("insert into t values (3)")
time.Sleep(tolerance)

staleRows := testkit.Rows("1")
staleSQL := fmt.Sprintf(`select * from t as of timestamp '%s'`, time1.Format("2006-1-2 15:04:05.000"))

// test normal stale select
tk.MustQuery(staleSQL).Check(staleRows)

// test stale select in txn
tk.MustExec("begin")
c.Assert(tk.ExecToErr(staleSQL), NotNil)
tk.MustExec("commit")

// test prepared stale select
tk.MustExec(fmt.Sprintf(`prepare s from "%s"`, staleSQL))
tk.MustQuery("execute s")

// test prepared stale select in txn
tk.MustExec("begin")
tk.MustQuery("execute s").Check(staleRows)
tk.MustExec("commit")

// test stale select in stale txn
tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
c.Assert(tk.ExecToErr(staleSQL), NotNil)
tk.MustExec("commit")

// test prepared stale select in stale txn
tk.MustExec(fmt.Sprintf(`start transaction read only as of timestamp '%s'`, time2.Format("2006-1-2 15:04:05.000")))
tk.MustQuery("execute s").Check(staleRows)
tk.MustExec("commit")

// test prepared stale select with schema change
tk.MustExec("alter table t add column c int")
tk.MustExec("insert into t values (4, 5)")
time.Sleep(10 * time.Millisecond)
tk.MustQuery("execute s").Check(staleRows)

// test dynamic timestamp stale select
time3 := time.Now()
tk.MustExec("alter table t add column d int")
tk.MustExec("insert into t values (4, 4, 4)")
time.Sleep(tolerance)
time4 := time.Now()
staleRows = testkit.Rows("1 <nil>", "2 <nil>", "3 <nil>", "4 5")
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND", time4.Sub(time3).Microseconds())).Check(staleRows)

// test prepared dynamic timestamp stale select
time5 := time.Now()
tk.MustExec(fmt.Sprintf(`prepare v from "select * from t as of timestamp CURRENT_TIMESTAMP(3) - INTERVAL %d MICROSECOND"`, time5.Sub(time3).Microseconds()))
tk.MustQuery("execute v").Check(staleRows)

// test point get
time6 := time.Now()
tk.MustExec("insert into t values (5, 5, 5)")
time.Sleep(tolerance)
tk.MustQuery(fmt.Sprintf("select * from t as of timestamp '%s' where c=5", time6.Format("2006-1-2 15:04:05.000"))).Check(testkit.Rows("4 5 <nil>"))
}
20 changes: 11 additions & 9 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/codec"
Expand Down Expand Up @@ -194,13 +195,14 @@ func NewPSTMTPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*mod

// CachedPrepareStmt store prepared ast from PrepareExec and other related fields
type CachedPrepareStmt struct {
PreparedAst *ast.Prepared
VisitInfos []visitInfo
ColumnInfos interface{}
Executor interface{}
NormalizedSQL string
NormalizedPlan string
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
PreparedAst *ast.Prepared
VisitInfos []visitInfo
ColumnInfos interface{}
Executor interface{}
NormalizedSQL string
NormalizedPlan string
SQLDigest *parser.Digest
PlanDigest *parser.Digest
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
}
18 changes: 17 additions & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -182,6 +183,7 @@ type Execute struct {
UsingVars []expression.Expression
PrepareParams []types.Datum
ExecID uint32
SnapshotTS uint64
Stmt ast.StmtNode
StmtType string
Plan Plan
Expand Down Expand Up @@ -256,6 +258,20 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
}
}

var snapshotTS uint64
if preparedObj.SnapshotTSEvaluator != nil {
// if preparedObj.SnapshotTSEvaluator != nil, it is a stale read SQL:
// which means its infoschema is specified by the SQL, not the current/latest infoschema
var err error
snapshotTS, err = preparedObj.SnapshotTSEvaluator(sctx)
if err != nil {
return errors.Trace(err)
}
is, err = domain.GetDomain(sctx).GetSnapshotInfoSchema(snapshotTS)
if err != nil {
return errors.Trace(err)
}
}
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
Expand All @@ -265,7 +281,6 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
preparedObj.Executor = nil
// If the schema version has changed we need to preprocess it again,
// if this time it failed, the real reason for the error is schema changed.
// FIXME: compatible with prepare https://github.com/pingcap/tidb/issues/24932
ret := &PreprocessorReturn{InfoSchema: is}
err := Preprocess(sctx, prepared.Stmt, InPrepare, WithPreprocessorReturn(ret))
if err != nil {
Expand All @@ -277,6 +292,7 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont
if err != nil {
return err
}
e.SnapshotTS = snapshotTS
e.Stmt = prepared.Stmt
return nil
}
Expand Down
Loading

0 comments on commit 9189ec6

Please sign in to comment.