Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

execute: add rpc runtime stats information for insert/update/replace statement #19334

Merged
merged 11 commits into from
Aug 24, 2020
4 changes: 2 additions & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type BatchPointGetExec struct {
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand Down Expand Up @@ -104,7 +104,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down
24 changes: 24 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6169,3 +6169,27 @@ func (s *testSuite) TestKillTableReader(c *C) {
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1)
wg.Wait()
}

func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, unique index (a))")

testSQLs := []string{
"insert ignore into t1 values (5,5);",
"insert into t1 values (5,5) on duplicate key update a=a+1;",
"replace into t1 values (5,6),(6,7)",
"update t1 set a=a+1 where a=6;",
}

for _, sql := range testSQLs {
tk.MustExec(sql)
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
c.Assert(stats.String(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
}
}
7 changes: 7 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -72,6 +73,8 @@ type InsertValues struct {
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

type defaultVal struct {
Expand Down Expand Up @@ -924,6 +927,21 @@ func (e *InsertValues) handleWarning(err error) {
sc.AppendWarning(err)
}

func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error {
Expand All @@ -941,6 +959,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type PointGetExecutor struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand Down Expand Up @@ -139,7 +139,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down Expand Up @@ -465,12 +465,12 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
return nil
}

type pointGetRuntimeStats struct {
type runtimeStatsWithSnapshot struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

func (e *pointGetRuntimeStats) String() string {
func (e *runtimeStatsWithSnapshot) String() string {
var basic, rpcStatsStr string
if e.BasicRuntimeStats != nil {
basic = e.BasicRuntimeStats.String()
Expand Down
7 changes: 7 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
txnSize := txn.Size()

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -47,6 +48,8 @@ type UpdateExec struct {
allAssignmentsAreConstant bool
drained bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error {
Expand Down Expand Up @@ -168,6 +171,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand Down Expand Up @@ -260,6 +269,12 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab
// Close implements the Executor Close interface.
func (e *UpdateExec) Close() error {
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().DelOption(kv.CollectRuntimeStats)
}
}
return e.children[0].Close()
}

Expand All @@ -280,3 +295,18 @@ func (e *UpdateExec) setMessage() {
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUpdateInfo], numMatched, numChanged, numWarnings)
stmtCtx.SetMessage(msg)
}

func (e *UpdateExec) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}