Skip to content

Commit

Permalink
execute: add rpc runtime stats information for insert/update/replace …
Browse files Browse the repository at this point in the history
…statement (#19334)
  • Loading branch information
crazycs520 authored Aug 24, 2020
1 parent d152b51 commit 650be7c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 6 deletions.
4 changes: 2 additions & 2 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type BatchPointGetExec struct {
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand Down Expand Up @@ -104,7 +104,7 @@ func (e *BatchPointGetExec) Open(context.Context) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down
24 changes: 24 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6169,3 +6169,27 @@ func (s *testSuite) TestKillTableReader(c *C) {
atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1)
wg.Wait()
}

func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("create table t1 (a int, b int, unique index (a))")

testSQLs := []string{
"insert ignore into t1 values (5,5);",
"insert into t1 values (5,5) on duplicate key update a=a+1;",
"replace into t1 values (5,6),(6,7)",
"update t1 set a=a+1 where a=6;",
}

for _, sql := range testSQLs {
tk.MustExec(sql)
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
c.Assert(stats.String(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
}
}
7 changes: 7 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -72,6 +73,8 @@ type InsertValues struct {
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

type defaultVal struct {
Expand Down Expand Up @@ -924,6 +927,21 @@ func (e *InsertValues) handleWarning(err error) {
sc.AppendWarning(err)
}

func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error {
Expand All @@ -941,6 +959,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type PointGetExecutor struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand Down Expand Up @@ -139,7 +139,7 @@ func (e *PointGetExecutor) Open(context.Context) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down Expand Up @@ -465,12 +465,12 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
return nil
}

type pointGetRuntimeStats struct {
type runtimeStatsWithSnapshot struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

func (e *pointGetRuntimeStats) String() string {
func (e *runtimeStatsWithSnapshot) String() string {
var basic, rpcStatsStr string
if e.BasicRuntimeStats != nil {
basic = e.BasicRuntimeStats.String()
Expand Down
7 changes: 7 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
txnSize := txn.Size()

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -47,6 +48,8 @@ type UpdateExec struct {
allAssignmentsAreConstant bool
drained bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error {
Expand Down Expand Up @@ -168,6 +171,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand Down Expand Up @@ -260,6 +269,12 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab
// Close implements the Executor Close interface.
func (e *UpdateExec) Close() error {
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().DelOption(kv.CollectRuntimeStats)
}
}
return e.children[0].Close()
}

Expand All @@ -280,3 +295,18 @@ func (e *UpdateExec) setMessage() {
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUpdateInfo], numMatched, numChanged, numWarnings)
stmtCtx.SetMessage(msg)
}

func (e *UpdateExec) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

0 comments on commit 650be7c

Please sign in to comment.