diff --git a/executor/batch_point_get.go b/executor/batch_point_get.go index 9f808dc57a155..9c4be7d3ccb1b 100644 --- a/executor/batch_point_get.go +++ b/executor/batch_point_get.go @@ -65,7 +65,7 @@ type BatchPointGetExec struct { virtualColumnRetFieldTypes []*types.FieldType snapshot kv.Snapshot - stats *pointGetRuntimeStats + stats *runtimeStatsWithSnapshot } // buildVirtualColumnInfo saves virtual column indices and sort them in definition order @@ -104,7 +104,7 @@ func (e *BatchPointGetExec) Open(context.Context) error { } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &pointGetRuntimeStats{ + e.stats = &runtimeStatsWithSnapshot{ BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } diff --git a/executor/executor_test.go b/executor/executor_test.go index 3c364dd76bd45..667f9f5299ddd 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6169,3 +6169,27 @@ func (s *testSuite) TestKillTableReader(c *C) { atomic.StoreUint32(&tk.Se.GetSessionVars().Killed, 1) wg.Wait() } + +func (s *testSuite) TestCollectDMLRuntimeStats(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("create table t1 (a int, b int, unique index (a))") + + testSQLs := []string{ + "insert ignore into t1 values (5,5);", + "insert into t1 values (5,5) on duplicate key update a=a+1;", + "replace into t1 values (5,6),(6,7)", + "update t1 set a=a+1 where a=6;", + } + + for _, sql := range testSQLs { + tk.MustExec(sql) + info := tk.Se.ShowProcess() + c.Assert(info, NotNil) + p, ok := info.Plan.(plannercore.Plan) + c.Assert(ok, IsTrue) + stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID()) + c.Assert(stats.String(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*") + } +} diff --git a/executor/insert.go b/executor/insert.go index eff87f9438e33..d1c6fa6318e6c 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -192,6 +192,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D return err } + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { diff --git a/executor/insert_common.go b/executor/insert_common.go index 74746a08cfcfe..582f98753f2b2 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/types" @@ -72,6 +73,8 @@ type InsertValues struct { // https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html lazyFillAutoID bool memTracker *memory.Tracker + + stats *runtimeStatsWithSnapshot } type defaultVal struct { @@ -924,6 +927,21 @@ func (e *InsertValues) handleWarning(err error) { sc.AppendWarning(err) } +func (e *InsertValues) collectRuntimeStatsEnabled() bool { + if e.runtimeStats != nil { + if e.stats == nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + return true + } + return false +} + // batchCheckAndInsert checks rows with duplicate errors. // All duplicate rows will be ignored and appended as duplicate warnings. func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) error) error { @@ -941,6 +959,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D return err } + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Fill cache using BatchGet, the following Get requests don't need to visit TiKV. if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil { return err diff --git a/executor/point_get.go b/executor/point_get.go index 673567faa10c8..fd8429135f488 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -86,7 +86,7 @@ type PointGetExecutor struct { // virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns. virtualColumnRetFieldTypes []*types.FieldType - stats *pointGetRuntimeStats + stats *runtimeStatsWithSnapshot } // Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field @@ -139,7 +139,7 @@ func (e *PointGetExecutor) Open(context.Context) error { } if e.runtimeStats != nil { snapshotStats := &tikv.SnapshotRuntimeStats{} - e.stats = &pointGetRuntimeStats{ + e.stats = &runtimeStatsWithSnapshot{ BasicRuntimeStats: e.runtimeStats, SnapshotRuntimeStats: snapshotStats, } @@ -465,12 +465,12 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo { return nil } -type pointGetRuntimeStats struct { +type runtimeStatsWithSnapshot struct { *execdetails.BasicRuntimeStats *tikv.SnapshotRuntimeStats } -func (e *pointGetRuntimeStats) String() string { +func (e *runtimeStatsWithSnapshot) String() string { var basic, rpcStatsStr string if e.BasicRuntimeStats != nil { basic = e.BasicRuntimeStats.String() diff --git a/executor/replace.go b/executor/replace.go index febf07e7e0eb0..261c14c85dbd6 100644 --- a/executor/replace.go +++ b/executor/replace.go @@ -192,6 +192,13 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error { } txnSize := txn.Size() + if e.collectRuntimeStatsEnabled() { + if snapshot := txn.GetSnapshot(); snapshot != nil { + snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + defer snapshot.DelOption(kv.CollectRuntimeStats) + } + } + // Use BatchGet to fill cache. // It's an optimization and could be removed without affecting correctness. if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil { diff --git a/executor/update.go b/executor/update.go index fab711617f3bd..f85585dc4c66b 100644 --- a/executor/update.go +++ b/executor/update.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" @@ -47,6 +48,8 @@ type UpdateExec struct { allAssignmentsAreConstant bool drained bool memTracker *memory.Tracker + + stats *runtimeStatsWithSnapshot } func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error { @@ -168,6 +171,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) { } memUsageOfChk = chk.MemoryUsage() e.memTracker.Consume(memUsageOfChk) + if e.collectRuntimeStatsEnabled() { + txn, err := e.ctx.Txn(false) + if err == nil && txn.GetSnapshot() != nil { + txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats) + } + } for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ { chunkRow := chk.GetRow(rowIdx) datumRow := chunkRow.GetDatumRow(fields) @@ -260,6 +269,12 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab // Close implements the Executor Close interface. func (e *UpdateExec) Close() error { e.setMessage() + if e.runtimeStats != nil && e.stats != nil { + txn, err := e.ctx.Txn(false) + if err == nil && txn.GetSnapshot() != nil { + txn.GetSnapshot().DelOption(kv.CollectRuntimeStats) + } + } return e.children[0].Close() } @@ -280,3 +295,18 @@ func (e *UpdateExec) setMessage() { msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUpdateInfo], numMatched, numChanged, numWarnings) stmtCtx.SetMessage(msg) } + +func (e *UpdateExec) collectRuntimeStatsEnabled() bool { + if e.runtimeStats != nil { + if e.stats == nil { + snapshotStats := &tikv.SnapshotRuntimeStats{} + e.stats = &runtimeStatsWithSnapshot{ + BasicRuntimeStats: e.runtimeStats, + SnapshotRuntimeStats: snapshotStats, + } + e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats) + } + return true + } + return false +}