Skip to content

Commit

Permalink
planner: refactor the execute path when using plan cache (#36587)
Browse files Browse the repository at this point in the history
ref #36598
  • Loading branch information
Reminiscent authored Jul 27, 2022
1 parent 8af16e8 commit 16f143f
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 159 deletions.
9 changes: 4 additions & 5 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,14 +317,14 @@ func (e *DeallocateExec) Next(ctx context.Context, req *chunk.Chunk) error {

// CompileExecutePreparedStmt compiles a session Execute command to a stmt.Statement.
func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
execStmt *ast.ExecuteStmt, is infoschema.InfoSchema) (*ExecStmt, bool, bool, error) {
execStmt *ast.ExecuteStmt, is infoschema.InfoSchema) (*ExecStmt, error) {
startTime := time.Now()
defer func() {
sctx.GetSessionVars().DurationCompile = time.Since(startTime)
}()
execPlan, names, err := planner.Optimize(ctx, sctx, execStmt, is)
if err != nil {
return nil, false, false, err
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
Expand All @@ -344,13 +344,12 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execStmt.ExecID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.CachedPrepareStmt)
if !ok {
return nil, false, false, errors.Errorf("invalid CachedPrepareStmt type")
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
stmtCtx := sctx.GetSessionVars().StmtCtx
stmt.Text = preparedObj.PreparedAst.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.InitSQLDigest(preparedObj.NormalizedSQL, preparedObj.SQLDigest)
}
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan)
return stmt, tiFlashPushDown, tiFlashExchangePushDown, nil
return stmt, nil
}
2 changes: 1 addition & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func TestPrepared(t *testing.T) {

execStmt := &ast.ExecuteStmt{ExecID: stmtID, BinaryArgs: []types.Datum{types.NewDatum(1)}}
// Check that ast.Statement created by executor.CompileExecutePreparedStmt has query text.
stmt, _, _, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt,
stmt, err := executor.CompileExecutePreparedStmt(context.TODO(), tk.Session(), execStmt,
tk.Session().GetInfoSchema().(infoschema.InfoSchema))
require.NoError(t, err)
require.Equal(t, query, stmt.OriginText())
Expand Down
25 changes: 0 additions & 25 deletions planner/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"fmt"
"math"
"math/rand"
"runtime/trace"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -480,30 +479,6 @@ func handleEvolveTasks(ctx context.Context, sctx sessionctx.Context, br *bindinf
globalHandle.AddEvolvePlanTask(br.OriginalSQL, br.Db, binding)
}

// OptimizeExecStmt to optimize prepare statement protocol "execute" statement
// this is a short path ONLY does things filling prepare related params
// for point select like plan which does not need extra things
func OptimizeExecStmt(ctx context.Context, sctx sessionctx.Context,
execAst *ast.ExecuteStmt, is infoschema.InfoSchema) (plannercore.Plan, error) {
defer trace.StartRegion(ctx, "Optimize").End()
var err error

builder := planBuilderPool.Get().(*plannercore.PlanBuilder)
defer planBuilderPool.Put(builder.ResetForReuse())

builder.Init(sctx, is, nil)
p, err := builder.Build(ctx, execAst)
if err != nil {
return nil, err
}
if execPlan, ok := p.(*plannercore.Execute); ok {
err = execPlan.OptimizePreparedPlan(ctx, sctx, is)
return execPlan.Plan, err
}
err = errors.Errorf("invalid result plan type, should be Execute")
return nil, err
}

func handleStmtHints(hints []*ast.TableOptimizerHint) (stmtHints stmtctx.StmtHints, offs []int, warns []error) {
if len(hints) == 0 {
return
Expand Down
2 changes: 1 addition & 1 deletion session/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1815,7 +1815,7 @@ func BenchmarkCompileExecutePreparedStmt(b *testing.B) {
b.ResetTimer()
stmtExec := &ast.ExecuteStmt{ExecID: stmtID, BinaryArgs: args}
for i := 0; i < b.N; i++ {
_, _, _, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema))
_, err := executor.CompileExecutePreparedStmt(context.Background(), se, stmtExec, is.(infoschema.InfoSchema))
if err != nil {
b.Fatal(err)
}
Expand Down
129 changes: 2 additions & 127 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2274,14 +2274,15 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
})

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
st, tiFlashPushDown, tiFlashExchangePushDown, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is)
st, err := executor.CompileExecutePreparedStmt(ctx, s, execStmt, is)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, st.Plan)
}
if err != nil {
return nil, err
}
if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry {
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(st.Plan)
telemetry.CurrentExecuteCount.Inc()
if tiFlashPushDown {
telemetry.CurrentTiFlashPushDownCount.Inc()
Expand All @@ -2295,118 +2296,6 @@ func (s *session) preparedStmtExec(ctx context.Context, execStmt *ast.ExecuteStm
return runStmt(ctx, s, st)
}

// cachedPointPlanExec is a short path currently ONLY for cached "point select plan" execution
func (s *session) cachedPointPlanExec(ctx context.Context,
execAst *ast.ExecuteStmt, prepareStmt *plannercore.CachedPrepareStmt) (sqlexec.RecordSet, bool, error) {

prepared := prepareStmt.PreparedAst

failpoint.Inject("assertTxnManagerInCachedPlanExec", func() {
sessiontxn.RecordAssert(s, "assertTxnManagerInCachedPlanExec", true)
// stale read should not reach here
staleread.AssertStmtStaleness(s, false)
})

is := sessiontxn.GetTxnManager(s).GetTxnInfoSchema()
execPlan, err := planner.OptimizeExecStmt(ctx, s, execAst, is)
if err != nil {
return nil, false, err
}

if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, execPlan); err != nil {
return nil, false, err
}

stmtCtx := s.GetSessionVars().StmtCtx
stmt := &executor.ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Plan: execPlan,
StmtNode: execAst,
Ctx: s,
OutputNames: execPlan.OutputNames(),
PsStmt: prepareStmt,
Ti: &executor.TelemetryInfo{},
}
compileDuration := time.Since(s.sessionVars.StartTime)
sessionExecuteCompileDurationGeneral.Observe(compileDuration.Seconds())
s.GetSessionVars().DurationCompile = compileDuration

stmt.Text = prepared.Stmt.Text()
stmtCtx.OriginalSQL = stmt.Text
stmtCtx.SetPlan(execPlan)
stmtCtx.InitSQLDigest(prepareStmt.NormalizedSQL, prepareStmt.SQLDigest)
stmtCtx.SetPlanDigest(prepareStmt.NormalizedPlan, prepareStmt.PlanDigest)
logGeneralQuery(stmt, s, false)

if !s.isInternal() && config.GetGlobalConfig().EnableTelemetry {
telemetry.CurrentExecuteCount.Inc()
tiFlashPushDown, tiFlashExchangePushDown := plannercore.IsTiFlashContained(stmt.Plan)
if tiFlashPushDown {
telemetry.CurrentTiFlashPushDownCount.Inc()
}
if tiFlashExchangePushDown {
telemetry.CurrentTiFlashExchangePushDownCount.Inc()
}
}

// run ExecStmt
var resultSet sqlexec.RecordSet
switch execPlan.(type) {
case *plannercore.PointGetPlan:
resultSet, err = stmt.PointGet(ctx)
s.txn.changeToInvalid()
case *plannercore.Update:
stmtCtx.Priority = kv.PriorityHigh
resultSet, err = runStmt(ctx, s, stmt)
case nil:
resultSet, err = runStmt(ctx, s, stmt)
default:
prepared.CachedPlan = nil
return nil, false, nil
}
return resultSet, true, err
}

// IsCachedExecOk check if we can execute using plan cached in prepared structure
// Be careful with the short path, current precondition is ths cached plan satisfying
// IsPointGetWithPKOrUniqueKeyByAutoCommit
func (s *session) IsCachedExecOk(preparedStmt *plannercore.CachedPrepareStmt) (bool, error) {
prepared := preparedStmt.PreparedAst
if prepared.CachedPlan == nil || staleread.IsStmtStaleness(s) {
return false, nil
}
// check auto commit
if !plannercore.IsAutoCommitTxn(s) {
return false, nil
}
is := s.GetInfoSchema().(infoschema.InfoSchema)
if prepared.SchemaVersion != is.SchemaMetaVersion() {
prepared.CachedPlan = nil
preparedStmt.ColumnInfos = nil
return false, nil
}
// maybe we'd better check cached plan type here, current
// only point select/update will be cached, see "getPhysicalPlan" func
var ok bool
var err error
switch prepared.CachedPlan.(type) {
case *plannercore.PointGetPlan:
ok = true
case *plannercore.Update:
pointUpdate := prepared.CachedPlan.(*plannercore.Update)
_, ok = pointUpdate.SelectPlan.(*plannercore.PointGetPlan)
if !ok {
err = errors.Errorf("cached update plan not point update")
prepared.CachedPlan = nil
return false, err
}
default:
ok = false
}
return ok, err
}

// ExecutePreparedStmt executes a prepared statement.
func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args []types.Datum) (sqlexec.RecordSet, error) {
var err error
Expand Down Expand Up @@ -2453,10 +2342,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
}

executor.CountStmtNode(preparedStmt.PreparedAst.Stmt, s.sessionVars.InRestrictedSQL)
cacheExecOk, err := s.IsCachedExecOk(preparedStmt)
if err != nil {
return nil, err
}
s.txn.onStmtStart(preparedStmt.SQLDigest.String())
defer s.txn.onStmtEnd()

Expand All @@ -2466,16 +2351,6 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
s.setRequestSource(ctx, preparedStmt.PreparedAst.StmtType, preparedStmt.PreparedAst.Stmt)
// even the txn is valid, still need to set session variable for coprocessor usage.
s.sessionVars.RequestSourceType = preparedStmt.PreparedAst.StmtType

if cacheExecOk {
rs, ok, err := s.cachedPointPlanExec(ctx, execStmt, preparedStmt)
if err != nil {
return nil, err
}
if ok { // fallback to preparedStmtExec if we cannot get a valid point select plan in cachedPointPlanExec
return rs, nil
}
}
return s.preparedStmtExec(ctx, execStmt, preparedStmt)
}

Expand Down

0 comments on commit 16f143f

Please sign in to comment.