diff --git a/pkg/distsql/select_result.go b/pkg/distsql/select_result.go
index 55d3da05d363b..63a71b3779628 100644
--- a/pkg/distsql/select_result.go
+++ b/pkg/distsql/select_result.go
@@ -502,7 +502,7 @@ func recordExecutionSummariesForTiFlashTasks(sctx *stmtctx.StatementContext, exe
 func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) {
     callee := copStats.CalleeAddress
-    if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
+    if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && len(copStats.Stats) == 0) {
         return
     }
@@ -603,6 +603,13 @@ func (r *selectResult) Close() error {
     if respSize > 0 {
         r.memConsume(-respSize)
     }
+    if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
+        unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
+        for _, copStats := range unconsumedCopStats {
+            _ = r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
+            r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
+        }
+    }
     if r.stats != nil {
         defer func() {
             if ci, ok := r.resp.(copr.CopInfo); ok {
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index ef63c250b22e9..c6b7ad1e527f7 100644
@@ -190,6 +190,7 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/context",
         "//pkg/util/dbterror/exeerrors",
+        "//pkg/util/plancodec",
         "//pkg/util/replayer",
         "//pkg/util/sqlkiller",
         "//pkg/util/syncutil",
diff --git a/pkg/server/conn_test.go b/pkg/server/conn_test.go
index d4353af605020..25161e4536260 100644
--- a/pkg/server/conn_test.go
+++ b/pkg/server/conn_test.go
@@ -52,6 +52,7 @@ import (
     "github.com/pingcap/tidb/pkg/util/arena"
     "github.com/pingcap/tidb/pkg/util/chunk"
     "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
+    "github.com/pingcap/tidb/pkg/util/plancodec"
     "github.com/pingcap/tidb/pkg/util/sqlkiller"
     "github.com/stretchr/testify/require"
     tikverr "github.com/tikv/client-go/v2/error"
@@ -720,6 +721,16 @@ func TestConnExecutionTimeout(t *testing.T) {
     tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
     err := tk.QueryToErr("select * FROM testTable2 WHERE SLEEP(1);")
     require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+    // Test executor stats when execution time exceeded.
+    tk.MustExec("set @@tidb_slow_log_threshold=300")
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
+    err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
+    require.Error(t, err)
+    require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+    planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
+    require.NoError(t, err)
+    require.Regexp(t, "TableReader.*cop_task: {num: .*, rpc_num: .*, rpc_time: .*", planInfo)
     // Killed because of max execution time, reset Killed to 0.
     tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
diff --git a/pkg/store/copr/coprocessor.go b/pkg/store/copr/coprocessor.go
index 3abfba960f186..d4e6bb045d974 100644
--- a/pkg/store/copr/coprocessor.go
+++ b/pkg/store/copr/coprocessor.go
@@ -700,7 +700,8 @@ type copIterator struct {
     storeBatchedNum         atomic.Uint64
     storeBatchedFallbackNum atomic.Uint64
 
-    runawayChecker *resourcegroup.RunawayChecker
+    runawayChecker  *resourcegroup.RunawayChecker
+    unconsumedStats *unconsumedCopRuntimeStats
 }
 
 // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
@@ -723,6 +724,7 @@ type copIteratorWorker struct {
 
     storeBatchedNum         *atomic.Uint64
     storeBatchedFallbackNum *atomic.Uint64
+    unconsumedStats         *unconsumedCopRuntimeStats
 }
 
 // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
@@ -833,6 +835,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
 func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
     taskCh := make(chan *copTask, 1)
     smallTaskCh := make(chan *copTask, 1)
+    it.unconsumedStats = &unconsumedCopRuntimeStats{}
     it.wg.Add(it.concurrency + it.smallTaskConcurrency)
     // Start it.concurrency number of workers to handle cop requests.
     for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
@@ -857,6 +860,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
             pagingTaskIdx:           &it.pagingTaskIdx,
             storeBatchedNum:         &it.storeBatchedNum,
             storeBatchedFallbackNum: &it.storeBatchedFallbackNum,
+            unconsumedStats:         it.unconsumedStats,
         }
         go worker.run(ctx)
     }
@@ -1096,6 +1100,23 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
     return resp, nil
 }
 
+// HasUnconsumedCopRuntimeStats indicates whether there are unconsumed CopRuntimeStats.
+type HasUnconsumedCopRuntimeStats interface {
+    // CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
+    CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
+}
+
+func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
+    if it == nil || it.unconsumedStats == nil {
+        return nil
+    }
+    it.unconsumedStats.Lock()
+    stats := make([]*CopRuntimeStats, 0, len(it.unconsumedStats.stats))
+    stats = append(stats, it.unconsumedStats.stats...)
+    it.unconsumedStats.Unlock()
+    return stats
+}
+
 // Associate each region with an independent backoffer. In this way, when multiple regions are
 // unavailable, TiDB can execute very quickly without blocking
 func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
@@ -1261,6 +1282,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
             err = worker.handleTiDBSendReqErr(err, task, ch)
             return nil, err
         }
+        worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
         return nil, errors.Trace(err)
     }
@@ -1758,17 +1780,24 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
     if resp.detail == nil {
         resp.detail = new(CopRuntimeStats)
     }
-    resp.detail.Stats = worker.kvclient.Stats
+    worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
+}
+
+func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
+    copStats.Stats = worker.kvclient.Stats
     backoffTimes := bo.GetBackoffTimes()
-    resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
-    resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
-    resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
+    copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
+    copStats.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
+    copStats.BackoffTimes = make(map[string]int, len(backoffTimes))
     for backoff := range backoffTimes {
-        resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
-        resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
+        copStats.BackoffTimes[backoff] = backoffTimes[backoff]
+        copStats.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
     }
     if rpcCtx != nil {
-        resp.detail.CalleeAddress = rpcCtx.Addr
+        copStats.CalleeAddress = rpcCtx.Addr
+    }
+    if resp == nil {
+        return
     }
     sd := &util.ScanDetail{}
     td := util.TimeDetail{}
@@ -1791,8 +1820,20 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
             }
         }
     }
-    resp.detail.ScanDetail = sd
-    resp.detail.TimeDetail = td
+    copStats.ScanDetail = sd
+    copStats.TimeDetail = td
+}
+
+func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
+    if worker.kvclient.Stats == nil {
+        return
+    }
+    copStats := &CopRuntimeStats{}
+    worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
+    worker.unconsumedStats.Lock()
+    worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
+    worker.unconsumedStats.Unlock()
+    worker.kvclient.Stats = nil
 }
 
 // CopRuntimeStats contains execution detail information.
@@ -1803,6 +1844,11 @@ type CopRuntimeStats struct {
     CoprCacheHit bool
 }
 
+type unconsumedCopRuntimeStats struct {
+    sync.Mutex
+    stats []*CopRuntimeStats
+}
+
 func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
     errCode := errno.ErrUnknown
     errMsg := err.Error()
diff --git a/pkg/store/mockstore/unistore/rpc.go b/pkg/store/mockstore/unistore/rpc.go
index c2f7ac78a00e7..b255138dff63b 100644
--- a/pkg/store/mockstore/unistore/rpc.go
+++ b/pkg/store/mockstore/unistore/rpc.go
@@ -93,6 +93,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
             failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
         }
     })
+    failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
+        time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
+        failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
+    })
     select {
     case <-ctx.Done():
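Review note: the sketch below is a minimal, self-contained illustration of the pattern this diff introduces across `coprocessor.go` and `select_result.go` — workers park runtime stats for responses that never reach the reader behind a mutex, and the close path drains them so they still show up in statement-level stats. The `copRuntimeStats` and `unconsumedStats` types and the `add`/`drain` helpers here are hypothetical simplifications for illustration only, not TiDB's actual API.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// copRuntimeStats is a stand-in for copr.CopRuntimeStats: just enough fields
// to show the idea (hypothetical, not the real struct).
type copRuntimeStats struct {
	calleeAddress string
	procTime      time.Duration
}

// unconsumedStats mirrors the mutex-protected slice added in coprocessor.go:
// workers append stats here whenever a response is dropped before the reader
// consumes it.
type unconsumedStats struct {
	sync.Mutex
	stats []*copRuntimeStats
}

func (u *unconsumedStats) add(s *copRuntimeStats) {
	u.Lock()
	defer u.Unlock()
	u.stats = append(u.stats, s)
}

// drain returns a snapshot of the accumulated stats, in the spirit of
// CollectUnconsumedCopRuntimeStats.
func (u *unconsumedStats) drain() []*copRuntimeStats {
	u.Lock()
	defer u.Unlock()
	out := make([]*copRuntimeStats, 0, len(u.stats))
	out = append(out, u.stats...)
	return out
}

func main() {
	pool := &unconsumedStats{}

	// Two "workers" whose responses never reach the reader, e.g. because the
	// statement hit max_execution_time and the iterator is being torn down.
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			pool.add(&copRuntimeStats{
				calleeAddress: fmt.Sprintf("store-%d", id),
				procTime:      time.Duration(id+1) * 10 * time.Millisecond,
			})
		}(i)
	}
	wg.Wait()

	// "Close" path: merge whatever the workers produced into statement-level
	// stats so the plan/slow log still reflects the RPCs that really happened.
	for _, s := range pool.drain() {
		fmt.Printf("merging unconsumed cop stats: callee=%s proc=%v\n", s.calleeAddress, s.procTime)
	}
}
```

Copying the slice under the lock in `drain` mirrors what `CollectUnconsumedCopRuntimeStats` does in the diff: the caller gets a snapshot rather than a reference to the shared slice.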