Skip to content

Commit

Permalink
*: add executor runtime info for explain for connection statement (#…
Browse files Browse the repository at this point in the history
…19183) (#20384)

Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
Signed-off-by: crazycs520 <crazycs520@gmail.com>
  • Loading branch information
ti-srebot authored Oct 22, 2020
1 parent a132f2b commit fdd7a42
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 44 deletions.
36 changes: 34 additions & 2 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package executor_test

import (
"bytes"
"crypto/tls"
"fmt"
"math"
Expand All @@ -25,6 +26,7 @@ import (
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/israce"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/testkit"
)
Expand Down Expand Up @@ -60,7 +62,7 @@ func (msm *mockSessionManager1) Kill(cid uint64, query bool) {
func (msm *mockSessionManager1) UpdateTLSConfig(cfg *tls.Config) {
}

func (s *testSuite) TestExplainFor(c *C) {
func (s *testSuite9) TestExplainFor(c *C) {
tkRoot := testkit.NewTestKitWithInit(c, s.store)
tkUser := testkit.NewTestKitWithInit(c, s.store)
tkRoot.MustExec("create table t1(c1 int, c2 int)")
Expand All @@ -69,6 +71,7 @@ func (s *testSuite) TestExplainFor(c *C) {
tkRoot.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost", CurrentUser: true, AuthUsername: "root", AuthHostname: "%"}, nil, []byte("012345678901234567890"))
tkUser.Se.Auth(&auth.UserIdentity{Username: "tu", Hostname: "localhost", CurrentUser: true, AuthUsername: "tu", AuthHostname: "%"}, nil, []byte("012345678901234567890"))

tkRoot.MustExec("set @@tidb_enable_collect_execution_info=0;")
tkRoot.MustQuery("select * from t1;")
tkRootProcess := tkRoot.Se.ShowProcess()
ps := []*util.ProcessInfo{tkRootProcess}
Expand All @@ -78,6 +81,30 @@ func (s *testSuite) TestExplainFor(c *C) {
"TableReader_5 10000.00 root data:TableFullScan_4",
"└─TableFullScan_4 10000.00 cop[tikv] table:t1 keep order:false, stats:pseudo",
))
tkRoot.MustExec("set @@tidb_enable_collect_execution_info=1;")
tkRoot.MustQuery("select * from t1;")
tkRootProcess = tkRoot.Se.ShowProcess()
ps = []*util.ProcessInfo{tkRootProcess}
tkRoot.Se.SetSessionManager(&mockSessionManager1{PS: ps})
tkUser.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows := tkRoot.MustQuery(fmt.Sprintf("explain for connection %d", tkRootProcess.ID)).Rows()
c.Assert(len(rows), Equals, 2)
c.Assert(len(rows[0]), Equals, 9)
buf := bytes.NewBuffer(nil)
for i, row := range rows {
if i > 0 {
buf.WriteString("\n")
}
for j, v := range row {
if j > 0 {
buf.WriteString(" ")
}
buf.WriteString(fmt.Sprintf("%v", v))
}
}
c.Assert(buf.String(), Matches, ""+
"TableReader_5 10000.00 0 root time:.*, loops:1, cop_task:.*num: 1, max:.*, proc_keys: 0, rpc_num: 1, rpc_time: .*data:TableFullScan_4 N/A N/A\n"+
"└─TableFullScan_4 10000.00 0 cop.* table:t1 time:.*, loops:.*keep order:false, stats:pseudo N/A N/A")
err := tkUser.ExecToErr(fmt.Sprintf("explain for connection %d", tkRootProcess.ID))
c.Check(core.ErrAccessDenied.Equal(err), IsTrue)
err = tkUser.ExecToErr("explain for connection 42")
Expand All @@ -89,9 +116,10 @@ func (s *testSuite) TestExplainFor(c *C) {
tkRoot.MustExec(fmt.Sprintf("explain for connection %d", tkRootProcess.ID))
}

func (s *testSuite) TestIssue11124(c *C) {
func (s *testSuite9) TestIssue11124(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk2 := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("set @@tidb_enable_collect_execution_info=0;")
tk.MustExec("drop table if exists kankan1")
tk.MustExec("drop table if exists kankan2")
tk.MustExec("create table kankan1(id int, name text);")
Expand Down Expand Up @@ -130,6 +158,9 @@ type testPrepareSerialSuite struct {
}

func (s *testPrepareSerialSuite) TestExplainForConnPlanCache(c *C) {
if israce.RaceEnabled {
c.Skip("skip race test")
}
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
core.SetPreparedPlanCache(orgEnable)
Expand All @@ -145,6 +176,7 @@ func (s *testPrepareSerialSuite) TestExplainForConnPlanCache(c *C) {
tk2 := testkit.NewTestKitWithInit(c, s.store)

tk1.MustExec("use test")
tk1.MustExec("set @@tidb_enable_collect_execution_info=0;")
tk1.MustExec("drop table if exists t")
tk1.MustExec("create table t(a int)")
tk1.MustExec("prepare stmt from 'select * from t where a = ?'")
Expand Down
4 changes: 3 additions & 1 deletion executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *testSuite1) TestIgnorePlanCache(c *C) {
c.Assert(tk.Se.GetSessionVars().StmtCtx.UseCache, IsFalse)
}

func (s *testSuite1) TestPrepareStmtAfterIsolationReadChange(c *C) {
func (s *testSuite9) TestPrepareStmtAfterIsolationReadChange(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "localhost", CurrentUser: true, AuthUsername: "root", AuthHostname: "%"}, nil, []byte("012345678901234567890"))

Expand All @@ -87,6 +87,7 @@ func (s *testSuite1) TestPrepareStmtAfterIsolationReadChange(c *C) {
}

tk.MustExec("set @@session.tidb_isolation_read_engines='tikv'")
tk.MustExec("set @@tidb_enable_collect_execution_info=0;")
tk.MustExec("prepare stmt from \"select * from t\"")
tk.MustQuery("execute stmt")
tkProcess := tk.Se.ShowProcess()
Expand Down Expand Up @@ -182,6 +183,7 @@ func (s *testSuite9) TestPlanCacheOnPointGet(c *C) {

// For point get
tk.MustExec("drop table if exists t1")
tk.MustExec("set @@tidb_enable_collect_execution_info=0;")
tk.MustExec("create table t1(a varchar(20), b varchar(20), c varchar(20), primary key(a, b))")
tk.MustExec("insert into t1 values('1','1','111'),('2','2','222'),('3','3','333')")
tk.MustExec(`prepare stmt2 from "select * from t1 where t1.a = ? and t1.b = ?"`)
Expand Down
58 changes: 39 additions & 19 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/logutil"
Expand Down Expand Up @@ -829,12 +830,14 @@ type SelectInto struct {
type Explain struct {
baseSchemaProducer

TargetPlan Plan
Format string
Analyze bool
ExecStmt ast.StmtNode
TargetPlan Plan
Format string
Analyze bool
ExecStmt ast.StmtNode
RuntimeStatsColl *execdetails.RuntimeStatsColl

Rows [][]string
ExplainRows [][]string
explainedPlans map[int]bool
}

Expand All @@ -857,9 +860,9 @@ func (e *Explain) prepareSchema() error {
format := strings.ToLower(e.Format)

switch {
case format == ast.ExplainFormatROW && !e.Analyze:
case format == ast.ExplainFormatROW && (!e.Analyze && e.RuntimeStatsColl == nil):
fieldNames = []string{"id", "estRows", "task", "access object", "operator info"}
case format == ast.ExplainFormatROW && e.Analyze:
case format == ast.ExplainFormatROW && (e.Analyze || e.RuntimeStatsColl != nil):
fieldNames = []string{"id", "estRows", "actRows", "task", "access object", "execution info", "operator info", "memory", "disk"}
case format == ast.ExplainFormatDOT:
fieldNames = []string{"dot contents"}
Expand Down Expand Up @@ -1009,10 +1012,12 @@ func (e *Explain) explainPlanInRowFormat(p Plan, taskType, driverSide, indent st
return
}

func getRuntimeInfo(ctx sessionctx.Context, p Plan) (actRows, analyzeInfo, memoryInfo, diskInfo string) {
runtimeStatsColl := ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
func getRuntimeInfo(ctx sessionctx.Context, p Plan, runtimeStatsColl *execdetails.RuntimeStatsColl) (actRows, analyzeInfo, memoryInfo, diskInfo string) {
if runtimeStatsColl == nil {
return
runtimeStatsColl = ctx.GetSessionVars().StmtCtx.RuntimeStatsColl
if runtimeStatsColl == nil {
return
}
}
explainID := p.ID()

Expand Down Expand Up @@ -1053,28 +1058,43 @@ func (e *Explain) prepareOperatorInfo(p Plan, taskType, driverSide, indent strin
}

id := texttree.PrettyIdentifier(p.ExplainID().String()+driverSide, indent, isLastChild)
estRows, accessObject, operatorInfo := e.getOperatorInfo(p, id)

var row []string
if e.Analyze {
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(e.ctx, p, nil)
row = []string{id, estRows, actRows, taskType, accessObject, analyzeInfo, operatorInfo, memoryInfo, diskInfo}
} else if e.RuntimeStatsColl != nil {
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(e.ctx, p, e.RuntimeStatsColl)
row = []string{id, estRows, actRows, taskType, accessObject, analyzeInfo, operatorInfo, memoryInfo, diskInfo}
} else {
row = []string{id, estRows, taskType, accessObject, operatorInfo}
}
e.Rows = append(e.Rows, row)
}

func (e *Explain) getOperatorInfo(p Plan, id string) (string, string, string) {
// For `explain for connection` statement, `e.ExplainRows` will be set.
for _, row := range e.ExplainRows {
if len(row) < 5 {
panic("should never happen")
}
if row[0] == id {
return row[1], row[3], row[4]
}
}
estRows := "N/A"
if si := p.statsInfo(); si != nil {
estRows = strconv.FormatFloat(si.RowCount, 'f', 2, 64)
}

var accessObject, operatorInfo string
if plan, ok := p.(dataAccesser); ok {
accessObject = plan.AccessObject()
operatorInfo = plan.OperatorInfo(false)
} else {
operatorInfo = p.ExplainInfo()
}

var row []string
if e.Analyze {
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(e.ctx, p)
row = []string{id, estRows, actRows, taskType, accessObject, analyzeInfo, operatorInfo, memoryInfo, diskInfo}
} else {
row = []string{id, estRows, taskType, accessObject, operatorInfo}
}
e.Rows = append(e.Rows, row)
return estRows, accessObject, operatorInfo
}

func (e *Explain) prepareDotInfo(p PhysicalPlan) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (pn *planEncoder) encodePlanTree(p Plan) string {

func (pn *planEncoder) encodePlan(p Plan, isRoot bool, store kv.StoreType, depth int) {
taskTypeInfo := plancodec.EncodeTaskType(isRoot, store)
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(p.SCtx(), p)
actRows, analyzeInfo, memoryInfo, diskInfo := getRuntimeInfo(p.SCtx(), p, nil)
rowCount := 0.0
if statsInfo := p.statsInfo(); statsInfo != nil {
rowCount = p.statsInfo().RowCount
Expand Down
23 changes: 14 additions & 9 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/types/parser_driver"
util2 "github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/logutil"
utilparser "github.com/pingcap/tidb/util/parser"
Expand Down Expand Up @@ -2946,13 +2947,14 @@ func (b *PlanBuilder) buildTrace(trace *ast.TraceStmt) (Plan, error) {
return p, nil
}

func (b *PlanBuilder) buildExplainPlan(targetPlan Plan, format string, rows [][]string, analyze bool, execStmt ast.StmtNode) (Plan, error) {
func (b *PlanBuilder) buildExplainPlan(targetPlan Plan, format string, explainRows [][]string, analyze bool, execStmt ast.StmtNode, runtimeStats *execdetails.RuntimeStatsColl) (Plan, error) {
p := &Explain{
TargetPlan: targetPlan,
Format: format,
Analyze: analyze,
ExecStmt: execStmt,
Rows: rows,
TargetPlan: targetPlan,
Format: format,
Analyze: analyze,
ExecStmt: execStmt,
ExplainRows: explainRows,
RuntimeStatsColl: runtimeStats,
}
p.ctx = b.ctx
return p, p.prepareSchema()
Expand All @@ -2977,8 +2979,11 @@ func (b *PlanBuilder) buildExplainFor(explainFor *ast.ExplainForStmt) (Plan, err
if !ok || targetPlan == nil {
return &Explain{Format: explainFor.Format}, nil
}

return b.buildExplainPlan(targetPlan, explainFor.Format, processInfo.PlanExplainRows, false, nil)
var explainRows [][]string
if explainFor.Format == ast.ExplainFormatROW {
explainRows = processInfo.PlanExplainRows
}
return b.buildExplainPlan(targetPlan, explainFor.Format, explainRows, false, nil, processInfo.RuntimeStatsColl)
}

func (b *PlanBuilder) buildExplain(ctx context.Context, explain *ast.ExplainStmt) (Plan, error) {
Expand All @@ -2990,7 +2995,7 @@ func (b *PlanBuilder) buildExplain(ctx context.Context, explain *ast.ExplainStmt
return nil, err
}

return b.buildExplainPlan(targetPlan, explain.Format, nil, explain.Analyze, explain.Stmt)
return b.buildExplainPlan(targetPlan, explain.Format, nil, explain.Analyze, explain.Stmt, nil)
}

func (b *PlanBuilder) buildSelectInto(ctx context.Context, sel *ast.SelectStmt) (Plan, error) {
Expand Down
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,7 @@ func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecu
Command: command,
Plan: s.currentPlan,
PlanExplainRows: plannercore.GetExplainRowsForPlan(s.currentPlan),
RuntimeStatsColl: s.sessionVars.StmtCtx.RuntimeStatsColl,
Time: t,
State: s.Status(),
Info: sql,
Expand Down
26 changes: 14 additions & 12 deletions util/processinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,24 @@ import (
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
)

// ProcessInfo is a struct used for show processlist statement.
type ProcessInfo struct {
ID uint64
User string
Host string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
StatsInfo func(interface{}) map[string]uint64
ID uint64
User string
Host string
DB string
Digest string
Plan interface{}
PlanExplainRows [][]string
RuntimeStatsColl *execdetails.RuntimeStatsColl
Time time.Time
Info string
CurTxnStartTS uint64
StmtCtx *stmtctx.StatementContext
StatsInfo func(interface{}) map[string]uint64
// MaxExecutionTime is the timeout for select statement, in milliseconds.
// If the query takes too long, kill it.
MaxExecutionTime uint64
Expand Down

0 comments on commit fdd7a42

Please sign in to comment.