Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#39394
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
wshwsh12 authored and ti-chi-bot committed Nov 29, 2022
1 parent 5ccc10b commit bf74e2b
Show file tree
Hide file tree
Showing 26 changed files with 339 additions and 82 deletions.
11 changes: 9 additions & 2 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,13 @@ func TestSelectWithRuntimeStats(t *testing.T) {
}

func TestSelectResultRuntimeStats(t *testing.T) {
<<<<<<< HEAD
t.Parallel()
basic := &execdetails.BasicRuntimeStats{}
=======
stmtStats := execdetails.NewRuntimeStatsColl(nil)
basic := stmtStats.GetBasicRuntimeStats(1)
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
basic.Record(time.Second, 20)
s1 := &selectResultRuntimeStats{
copRespTime: []time.Duration{time.Second, time.Millisecond},
Expand All @@ -124,8 +129,6 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}

s2 := *s1
stmtStats := execdetails.NewRuntimeStatsColl(nil)
stmtStats.RegisterStats(1, basic)
stmtStats.RegisterStats(1, s1)
stmtStats.RegisterStats(1, &s2)
stats := stmtStats.GetRootStats(1)
Expand All @@ -140,7 +143,11 @@ func TestSelectResultRuntimeStats(t *testing.T) {
}
stmtStats.RegisterStats(2, s1)
stats = stmtStats.GetRootStats(2)
<<<<<<< HEAD
expect = "cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00}, backoff{RegionMiss: 1ms}"
=======
expect = "time:0s, loops:0, cop_task: {num: 2, max: 1s, min: 1ms, avg: 500.5ms, p95: 1s, max_proc_keys: 200, p95_proc_keys: 200, tot_proc: 1s, tot_wait: 1s, rpc_num: 1, rpc_time: 1s, copr_cache_hit_ratio: 0.00, distsql_concurrency: 15}, backoff{RegionMiss: 1ms}"
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
require.Equal(t, expect, stats.String())
// Test for idempotence.
require.Equal(t, expect, stats.String())
Expand Down
5 changes: 3 additions & 2 deletions distsql/select_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,12 +359,10 @@ func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr
}

if r.stats == nil {
id := r.rootPlanID
r.stats = &selectResultRuntimeStats{
backoffSleep: make(map[string]time.Duration),
rpcStat: tikv.NewRegionRequestRuntimeStats(),
}
r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, r.stats)
}
r.stats.mergeCopRuntimeStats(copStats, respTime)

Expand Down Expand Up @@ -455,6 +453,9 @@ func (r *selectResult) Close() error {
if respSize > 0 {
r.memConsume(-respSize)
}
if r.stats != nil {
defer r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(r.rootPlanID, r.stats)
}
return r.resp.Close()
}

Expand Down
6 changes: 3 additions & 3 deletions distsql/select_result_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestUpdateCopRuntimeStats(t *testing.T) {
require.Nil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)

sr.rootPlanID = 1234
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "a"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "a"}}}, 0)

ctx.GetSessionVars().StmtCtx.RuntimeStatsColl = execdetails.NewRuntimeStatsColl(nil)
i := uint64(1)
Expand All @@ -49,13 +49,13 @@ func TestUpdateCopRuntimeStats(t *testing.T) {

require.NotEqual(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.False(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.ExistsCopStats(1234))

sr.copPlanIDs = []int{sr.rootPlanID}
require.NotNil(t, ctx.GetSessionVars().StmtCtx.RuntimeStatsColl)
require.Equal(t, len(sr.copPlanIDs), len(sr.selectResp.GetExecutionSummaries()))

sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{CalleeAddress: "callee"}}, 0)
sr.updateCopRuntimeStats(context.Background(), &copr.CopRuntimeStats{ExecDetails: execdetails.ExecDetails{DetailsNeedP90: execdetails.DetailsNeedP90{CalleeAddress: "callee"}}}, 0)
require.Equal(t, "tikv_task:{time:1ns, loops:1}", ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetOrCreateCopStats(1234, "tikv").String())
}
4 changes: 3 additions & 1 deletion executor/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ func (d *HashAggIntermData) getPartialResultBatch(sc *stmtctx.StatementContext,

// Close implements the Executor Close interface.
func (e *HashAggExec) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.isUnparallelExec {
var firstErr error
e.childResult = nil
Expand Down Expand Up @@ -1102,7 +1105,6 @@ func (e *HashAggExec) initRuntimeStats() {
stats.PartialStats = make([]*AggWorkerStat, 0, stats.PartialConcurrency)
stats.FinalStats = make([]*AggWorkerStat, 0, stats.FinalConcurrency)
e.stats = stats
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down
3 changes: 3 additions & 0 deletions executor/batch_point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ func (e *BatchPointGetExec) Open(context.Context) error {

// Close implements the Executor interface.
func (e *BatchPointGetExec) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.runtimeStats != nil && e.snapshot != nil {
e.snapshot.SetOption(kv.CollectRuntimeStats, nil)
}
Expand Down
13 changes: 13 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4390,6 +4390,19 @@ func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan
b.err = err
return nil
}
<<<<<<< HEAD
=======
if e.ctx.GetSessionVars().IsReplicaReadClosestAdaptive() {
e.snapshot.SetOption(kv.ReplicaReadAdjuster, newReplicaReadAdjuster(e.ctx, plan.GetAvgRowSize()))
}
if e.runtimeStats != nil {
snapshotStats := &txnsnapshot.SnapshotRuntimeStats{}
e.stats = &runtimeStatsWithSnapshot{
SnapshotRuntimeStats: snapshotStats,
}
e.snapshot.SetOption(kv.CollectRuntimeStats, snapshotStats)
}
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))

startTS, err := b.getSnapshotTS()
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,15 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, task *lookup

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
<<<<<<< HEAD
if e.table.Meta().TempTableType != model.TempTableNone {
=======
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.kvRanges = e.kvRanges[:0]
if e.dummy {
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
return nil
}

Expand Down Expand Up @@ -820,7 +828,7 @@ func (w *indexWorker) fetchHandles(ctx context.Context, result distsql.SelectRes
idxID := w.idxLookup.getIndexPlanRootID()
if w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if idxID != w.idxLookup.id && w.idxLookup.stats != nil {
w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(idxID, w.idxLookup.stats.indexScanBasicStats)
w.idxLookup.stats.indexScanBasicStats = w.idxLookup.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(idxID)
}
}
for {
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,7 @@ func newBaseExecutor(ctx sessionctx.Context, schema *expression.Schema, id int,
}
if ctx.GetSessionVars().StmtCtx.RuntimeStatsColl != nil {
if e.id > 0 {
e.runtimeStats = &execdetails.BasicRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(id, e.runtimeStats)
e.runtimeStats = e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(id)
}
}
if schema != nil {
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,6 @@ func (e *IndexNestedLoopHashJoin) Open(ctx context.Context) error {
e.innerPtrBytes = make([][]byte, 0, 8)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.finished.Store(false)
e.startWorkers(ctx)
Expand Down Expand Up @@ -304,6 +303,9 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
Expand Down
4 changes: 3 additions & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,6 @@ func (e *IndexLookUpJoin) Open(ctx context.Context) error {
e.finished.Store(false)
if e.runtimeStats != nil {
e.stats = &indexLookUpJoinRuntimeStats{}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
e.startWorkers(ctx)
return nil
Expand Down Expand Up @@ -779,6 +778,9 @@ func (iw *innerWorker) hasNullInJoinKey(row chunk.Row) bool {

// Close implements the Executor interface.
func (e *IndexLookUpJoin) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand Down
3 changes: 3 additions & 0 deletions executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,9 @@ func (imw *innerMergeWorker) fetchNextInnerResult(ctx context.Context, task *loo

// Close implements the Executor interface.
func (e *IndexLookUpMergeJoin) Close() error {
if e.runtimeStats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.runtimeStats)
}
if e.cancelFunc != nil {
e.cancelFunc()
e.cancelFunc = nil
Expand Down
7 changes: 4 additions & 3 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,6 @@ func (e *IndexMergeReaderExecutor) initRuntimeStats() {
e.stats = &IndexMergeRuntimeStat{
Concurrency: e.ctx.GetSessionVars().IndexLookupConcurrency(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
}

Expand Down Expand Up @@ -624,6 +623,9 @@ func (e *IndexMergeReaderExecutor) handleHandlesFetcherPanic(ctx context.Context

// Close implements Exec Close interface.
func (e *IndexMergeReaderExecutor) Close() error {
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
if e.finished == nil {
return nil
}
Expand Down Expand Up @@ -741,8 +743,7 @@ func (w *partialIndexWorker) fetchHandles(
var basicStats *execdetails.BasicRuntimeStats
if w.stats != nil {
if w.idxID != 0 {
basicStats = &execdetails.BasicRuntimeStats{}
w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(w.idxID, basicStats)
basicStats = w.sc.GetSessionVars().StmtCtx.RuntimeStatsColl.GetBasicRuntimeStats(w.idxID)
}
}
for {
Expand Down
7 changes: 7 additions & 0 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,13 @@ func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *InsertExec) Close() error {
<<<<<<< HEAD
=======
if e.runtimeStats != nil && e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
defer e.memTracker.ReplaceBytesUsed(0)
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
e.ctx.GetSessionVars().CurrInsertValues = chunk.Row{}
e.ctx.GetSessionVars().CurrInsertBatchExtraCols = e.ctx.GetSessionVars().CurrInsertBatchExtraCols[0:0:0]
e.setMessage()
Expand Down
1 change: 0 additions & 1 deletion executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,7 +1031,6 @@ func (e *InsertValues) collectRuntimeStatsEnabled() bool {
SnapshotRuntimeStats: snapshotStats,
AllocatorRuntimeStats: autoid.NewAllocatorRuntimeStats(),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return true
}
Expand Down
21 changes: 19 additions & 2 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func (e *HashJoinExec) Close() error {
if e.stats != nil && e.rowContainer != nil {
e.stats.hashStat = *e.rowContainer.stat
}
if e.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
err := e.baseExecutor.Close()
return err
}
Expand Down Expand Up @@ -183,7 +186,6 @@ func (e *HashJoinExec) Open(ctx context.Context) error {
e.stats = &hashJoinRuntimeStats{
concurrent: cap(e.joiners),
}
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.stats)
}
return nil
}
Expand Down Expand Up @@ -841,7 +843,6 @@ func (e *NestedLoopApplyExec) Close() error {
e.memTracker = nil
if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.canUseCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -851,6 +852,11 @@ func (e *NestedLoopApplyExec) Close() error {
} else {
runtimeStats.setCacheInfo(false, 0)
}
<<<<<<< HEAD
=======
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", 0))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
>>>>>>> 23543a4805 (*: merge the runtime stats in time to avoid using too many memory (#39394))
}
return e.outerExec.Close()
}
Expand Down Expand Up @@ -1108,6 +1114,17 @@ func (e *joinRuntimeStats) Tp() int {
return execdetails.TpJoinRuntimeStats
}

func (e *joinRuntimeStats) Clone() execdetails.RuntimeStats {
newJRS := &joinRuntimeStats{
RuntimeStatsWithConcurrencyInfo: e.RuntimeStatsWithConcurrencyInfo,
applyCache: e.applyCache,
cache: e.cache,
hasHashStat: e.hasHashStat,
hashStat: e.hashStat,
}
return newJRS
}

type hashJoinRuntimeStats struct {
fetchAndBuildHashTable time.Duration
hashStat hashStatistic
Expand Down
3 changes: 3 additions & 0 deletions executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func (e *LoadDataExec) Next(ctx context.Context, req *chunk.Chunk) error {

// Close implements the Executor Close interface.
func (e *LoadDataExec) Close() error {
if e.runtimeStats != nil && e.loadDataInfo != nil && e.loadDataInfo.stats != nil {
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, e.loadDataInfo.stats)
}
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion executor/parallel_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func (e *ParallelNestedLoopApplyExec) Close() error {

if e.runtimeStats != nil {
runtimeStats := newJoinRuntimeStats()
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
if e.useCache {
var hitRatio float64
if e.cacheAccessCounter > 0 {
Expand All @@ -187,6 +186,7 @@ func (e *ParallelNestedLoopApplyExec) Close() error {
runtimeStats.setCacheInfo(false, 0)
}
runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", e.concurrency))
defer e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}
return err
}
Expand Down
Loading

0 comments on commit bf74e2b

Please sign in to comment.