Skip to content

Commit

Permalink
cherry pick pingcap#19334 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
crazycs520 authored and ti-srebot committed Oct 13, 2020
1 parent 8c6879c commit b787c50
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 5 deletions.
45 changes: 44 additions & 1 deletion executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type BatchPointGetExec struct {
virtualColumnRetFieldTypes []*types.FieldType

snapshot kv.Snapshot
stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// buildVirtualColumnInfo saves virtual column indices and sort them in definition order
Expand All @@ -78,6 +78,49 @@ func (e *BatchPointGetExec) buildVirtualColumnInfo() {

// Open implements the Executor interface.
func (e *BatchPointGetExec) Open(context.Context) error {
<<<<<<< HEAD
=======
e.snapshotTS = e.startTS
txnCtx := e.ctx.GetSessionVars().TxnCtx
if e.lock {
e.snapshotTS = txnCtx.GetForUpdateTS()
}
txn, err := e.ctx.Txn(false)
if err != nil {
return err
}
e.txn = txn
var snapshot kv.Snapshot
if txn.Valid() && txnCtx.StartTS == txnCtx.GetForUpdateTS() {
// We can safely reuse the transaction snapshot if startTS is equal to forUpdateTS.
// The snapshot may contains cache that can reduce RPC call.
snapshot = txn.GetSnapshot()
} else {
snapshot, err = e.ctx.GetStore().GetSnapshot(kv.Version{Ver: e.snapshotTS})
if err != nil {
return err
}
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.ctx.GetSessionVars().GetReplicaRead().IsFollowerRead() {
snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
}
snapshot.SetOption(kv.TaskID, e.ctx.GetSessionVars().StmtCtx.TaskID)
var batchGetter kv.BatchGetter = snapshot
if txn.Valid() {
batchGetter = kv.NewBufferBatchGetter(txn.GetMemBuffer(), &PessimisticLockCacheGetter{txnCtx: txnCtx}, snapshot)
}
e.snapshot = snapshot
e.batchGetter = batchGetter
>>>>>>> 650be7c43... execute: add rpc runtime stats information for insert/update/replace statement (#19334)
return nil
}

Expand Down
13 changes: 13 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5991,6 +5991,7 @@ func (s *testSplitTable) TestKillTableReader(c *C) {
wg.Wait()
}

<<<<<<< HEAD
func (s *testSerialSuite1) TestPrevStmtDesensitization(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test;")
Expand All @@ -6017,6 +6018,8 @@ func (s *testSuite) TestIssue19372(c *C) {
tk.MustQuery("select (select t2.c_str from t2 where t2.c_str <= t1.c_str and t2.c_int in (1, 2) order by t2.c_str limit 1) x from t1 order by c_int;").Check(testkit.Rows("a", "a", "a"))
}

=======
>>>>>>> 650be7c43... execute: add rpc runtime stats information for insert/update/replace statement (#19334)
func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand All @@ -6030,12 +6033,18 @@ func (s *testSuite) TestCollectDMLRuntimeStats(c *C) {
"update t1 set a=a+1 where a=6;",
}

<<<<<<< HEAD
getRootStats := func() string {
=======
for _, sql := range testSQLs {
tk.MustExec(sql)
>>>>>>> 650be7c43... execute: add rpc runtime stats information for insert/update/replace statement (#19334)
info := tk.Se.ShowProcess()
c.Assert(info, NotNil)
p, ok := info.Plan.(plannercore.Plan)
c.Assert(ok, IsTrue)
stats := tk.Se.GetSessionVars().StmtCtx.RuntimeStatsColl.GetRootStats(p.ID())
<<<<<<< HEAD
return stats.String()
}
for _, sql := range testSQLs {
Expand Down Expand Up @@ -6066,4 +6075,8 @@ func (s *testSuite) TestIssue13758(c *C) {
"4",
"<nil>",
))
=======
c.Assert(stats.String(), Matches, "time.*loops.*Get.*num_rpc.*total_time.*")
}
>>>>>>> 650be7c43... execute: add rpc runtime stats information for insert/update/replace statement (#19334)
}
7 changes: 7 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/types"
Expand Down Expand Up @@ -72,6 +73,8 @@ type InsertValues struct {
// https://dev.mysql.com/doc/refman/8.0/en/innodb-auto-increment-handling.html
lazyFillAutoID bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

type defaultVal struct {
Expand Down Expand Up @@ -929,6 +932,21 @@ func (e *InsertValues) handleWarning(err error) {
sc.AppendWarning(err)
}

func (e *InsertValues) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

// batchCheckAndInsert checks rows with duplicate errors.
// All duplicate rows will be ignored and appended as duplicate warnings.
func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.Datum, addRecord func(ctx context.Context, row []types.Datum) (int64, error)) error {
Expand All @@ -946,6 +964,13 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D
return err
}

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Fill cache using BatchGet, the following Get requests don't need to visit TiKV.
if _, err = prefetchUniqueIndices(ctx, txn, toBeCheckedRows); err != nil {
return err
Expand Down
8 changes: 4 additions & 4 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type PointGetExecutor struct {
// virtualColumnRetFieldTypes records the RetFieldTypes of virtual columns.
virtualColumnRetFieldTypes []*types.FieldType

stats *pointGetRuntimeStats
stats *runtimeStatsWithSnapshot
}

// Init set fields needed for PointGetExecutor reuse, this does NOT change baseExecutor field
Expand Down Expand Up @@ -157,7 +157,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {
}
if e.runtimeStats != nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &pointGetRuntimeStats{
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
Expand Down Expand Up @@ -420,12 +420,12 @@ func getColInfoByID(tbl *model.TableInfo, colID int64) *model.ColumnInfo {
return nil
}

type pointGetRuntimeStats struct {
type runtimeStatsWithSnapshot struct {
*execdetails.BasicRuntimeStats
*tikv.SnapshotRuntimeStats
}

func (e *pointGetRuntimeStats) String() string {
func (e *runtimeStatsWithSnapshot) String() string {
var basic, rpcStatsStr string
if e.BasicRuntimeStats != nil {
basic = e.BasicRuntimeStats.String()
Expand Down
7 changes: 7 additions & 0 deletions executor/replace.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,13 @@ func (e *ReplaceExec) exec(ctx context.Context, newRows [][]types.Datum) error {
}
txnSize := txn.Size()

if e.collectRuntimeStatsEnabled() {
if snapshot := txn.GetSnapshot(); snapshot != nil {
snapshot.SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
defer snapshot.DelOption(kv.CollectRuntimeStats)
}
}

// Use BatchGet to fill cache.
// It's an optimization and could be removed without affecting correctness.
if err = prefetchDataCache(ctx, txn, toBeCheckedRows); err != nil {
Expand Down
30 changes: 30 additions & 0 deletions executor/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
Expand All @@ -48,6 +49,8 @@ type UpdateExec struct {
allAssignmentsAreConstant bool
drained bool
memTracker *memory.Tracker

stats *runtimeStatsWithSnapshot
}

func (e *UpdateExec) exec(ctx context.Context, schema *expression.Schema, row, newData []types.Datum) error {
Expand Down Expand Up @@ -166,6 +169,12 @@ func (e *UpdateExec) updateRows(ctx context.Context) (int, error) {
}
memUsageOfChk = chk.MemoryUsage()
e.memTracker.Consume(memUsageOfChk)
if e.collectRuntimeStatsEnabled() {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().SetOption(kv.CollectRuntimeStats, e.stats.SnapshotRuntimeStats)
}
}
for rowIdx := 0; rowIdx < chk.NumRows(); rowIdx++ {
chunkRow := chk.GetRow(rowIdx)
datumRow := chunkRow.GetDatumRow(fields)
Expand Down Expand Up @@ -258,6 +267,12 @@ func (e *UpdateExec) composeNewRow(rowIdx int, oldRow []types.Datum, cols []*tab
// Close implements the Executor Close interface.
func (e *UpdateExec) Close() error {
e.setMessage()
if e.runtimeStats != nil && e.stats != nil {
txn, err := e.ctx.Txn(false)
if err == nil && txn.GetSnapshot() != nil {
txn.GetSnapshot().DelOption(kv.CollectRuntimeStats)
}
}
return e.children[0].Close()
}

Expand All @@ -278,3 +293,18 @@ func (e *UpdateExec) setMessage() {
msg := fmt.Sprintf(mysql.MySQLErrName[mysql.ErrUpdateInfo], numMatched, numChanged, numWarnings)
stmtCtx.SetMessage(msg)
}

func (e *UpdateExec) collectRuntimeStatsEnabled() bool {
if e.runtimeStats != nil {
if e.stats == nil {
snapshotStats := &tikv.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
BasicRuntimeStats: e.runtimeStats,
SnapshotRuntimeStats: snapshotStats,
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
return false
}

0 comments on commit b787c50

Please sign in to comment.