executor: fix issue that some query execution stats were omitted when execution was interrupted #51787

Merged 3 commits on Mar 18, 2024
pkg/distsql/select_result.go (8 additions, 1 deletion)
@@ -502,7 +502,7 @@ func recordExecutionSummariesForTiFlashTasks(sctx *stmtctx.StatementContext, exe

 func (r *selectResult) updateCopRuntimeStats(ctx context.Context, copStats *copr.CopRuntimeStats, respTime time.Duration) (err error) {
     callee := copStats.CalleeAddress
-    if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || callee == "" {
+    if r.rootPlanID <= 0 || r.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl == nil || (callee == "" && len(copStats.Stats) == 0) {
         return
     }
@@ -603,6 +603,13 @@ func (r *selectResult) Close() error {
     if respSize > 0 {
         r.memConsume(-respSize)
     }
+    if unconsumed, ok := r.resp.(copr.HasUnconsumedCopRuntimeStats); ok && unconsumed != nil {
+        unconsumedCopStats := unconsumed.CollectUnconsumedCopRuntimeStats()
+        for _, copStats := range unconsumedCopStats {
+            _ = r.updateCopRuntimeStats(context.Background(), copStats, time.Duration(0))
+            r.ctx.GetSessionVars().StmtCtx.MergeExecDetails(&copStats.ExecDetails, nil)
+        }
+    }
     if r.stats != nil {
         defer func() {
             if ci, ok := r.resp.(copr.CopInfo); ok {
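The Close path above uses Go's optional-interface idiom: the response is probed with a type assertion, and leftover stats are merged only when the concrete type supports the new interface. A minimal runnable sketch of that idiom, using toy stand-in types rather than TiDB's:

package main

import "fmt"

// CopRuntimeStats stands in for copr.CopRuntimeStats in this sketch.
type CopRuntimeStats struct{ RPCNum int }

// HasUnconsumedCopRuntimeStats mirrors the interface added by this PR.
type HasUnconsumedCopRuntimeStats interface {
	CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
}

// iterResp is a toy response type that implements the optional interface.
type iterResp struct{ leftover []*CopRuntimeStats }

func (r *iterResp) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
	return r.leftover
}

// closeResult mimics selectResult.Close: probe with a type assertion and
// silently no-op for response types that cannot report leftover stats.
func closeResult(resp any) {
	if u, ok := resp.(HasUnconsumedCopRuntimeStats); ok && u != nil {
		for _, s := range u.CollectUnconsumedCopRuntimeStats() {
			fmt.Printf("merging leftover cop stats: rpc_num=%d\n", s.RPCNum)
		}
	}
}

func main() {
	closeResult(&iterResp{leftover: []*CopRuntimeStats{{RPCNum: 2}}})
	closeResult("some other response type") // no-op
}

Response types that never implement the interface simply skip the branch, which keeps the change compatible with other kv.Response implementations.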
pkg/server/BUILD.bazel (1 addition)
@@ -190,6 +190,7 @@ go_test(
         "//pkg/util/chunk",
         "//pkg/util/context",
         "//pkg/util/dbterror/exeerrors",
+        "//pkg/util/plancodec",
         "//pkg/util/replayer",
         "//pkg/util/sqlkiller",
         "//pkg/util/syncutil",
pkg/server/conn_test.go (11 additions)
@@ -52,6 +52,7 @@ import (
     "github.com/pingcap/tidb/pkg/util/arena"
     "github.com/pingcap/tidb/pkg/util/chunk"
     "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors"
+    "github.com/pingcap/tidb/pkg/util/plancodec"
     "github.com/pingcap/tidb/pkg/util/sqlkiller"
     "github.com/stretchr/testify/require"
     tikverr "github.com/tikv/client-go/v2/error"
@@ -720,6 +721,16 @@ func TestConnExecutionTimeout(t *testing.T) {
     tk.MustQuery("select SLEEP(1);").Check(testkit.Rows("0"))
     err := tk.QueryToErr("select * FROM testTable2 WHERE SLEEP(1);")
     require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+    // Test executor stats when execution time exceeded.
+    tk.MustExec("set @@tidb_slow_log_threshold=300")
+    require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep", `return(150)`))
+    err = tk.QueryToErr("select /*+ max_execution_time(600), set_var(tikv_client_read_timeout=100) */ * from testTable2")
+    require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"))
+    require.Error(t, err)
+    require.Equal(t, "[executor:3024]Query execution was interrupted, maximum statement execution time exceeded", err.Error())
+    planInfo, err := plancodec.DecodePlan(tk.Session().GetSessionVars().StmtCtx.GetEncodedPlan())
+    require.NoError(t, err)
+    require.Regexp(t, "TableReader.*cop_task: {num: .*, rpc_num: .*, rpc_time: .*", planInfo)

     // Killed because of max execution time, reset Killed to 0.
     tk.Session().GetSessionVars().SQLKiller.SendKillSignal(sqlkiller.MaxExecTimeExceeded)
pkg/store/copr/coprocessor.go (56 additions, 10 deletions)
@@ -700,7 +700,8 @@ type copIterator struct {
     storeBatchedNum         atomic.Uint64
     storeBatchedFallbackNum atomic.Uint64

-    runawayChecker *resourcegroup.RunawayChecker
+    runawayChecker  *resourcegroup.RunawayChecker
+    unconsumedStats *unconsumedCopRuntimeStats
 }

 // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan.
@@ -723,6 +724,7 @@ type copIteratorWorker struct {

     storeBatchedNum         *atomic.Uint64
     storeBatchedFallbackNum *atomic.Uint64
+    unconsumedStats         *unconsumedCopRuntimeStats
 }

 // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit.
@@ -833,6 +835,7 @@ func (worker *copIteratorWorker) run(ctx context.Context) {
 func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableCollectExecutionInfo bool) {
     taskCh := make(chan *copTask, 1)
     smallTaskCh := make(chan *copTask, 1)
+    it.unconsumedStats = &unconsumedCopRuntimeStats{}
     it.wg.Add(it.concurrency + it.smallTaskConcurrency)
     // Start it.concurrency number of workers to handle cop requests.
     for i := 0; i < it.concurrency+it.smallTaskConcurrency; i++ {
@@ -857,6 +860,7 @@ func (it *copIterator) open(ctx context.Context, enabledRateLimitAction, enableC
             pagingTaskIdx:           &it.pagingTaskIdx,
             storeBatchedNum:         &it.storeBatchedNum,
             storeBatchedFallbackNum: &it.storeBatchedFallbackNum,
+            unconsumedStats:         it.unconsumedStats,
         }
         go worker.run(ctx)
     }
@@ -1096,6 +1100,23 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
     return resp, nil
 }

+// HasUnconsumedCopRuntimeStats indicates whether there are unconsumed CopRuntimeStats.
+type HasUnconsumedCopRuntimeStats interface {
+    // CollectUnconsumedCopRuntimeStats returns unconsumed CopRuntimeStats.
+    CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats
+}
+
+func (it *copIterator) CollectUnconsumedCopRuntimeStats() []*CopRuntimeStats {
+    if it == nil || it.unconsumedStats == nil {
+        return nil
+    }
+    it.unconsumedStats.Lock()
+    stats := make([]*CopRuntimeStats, 0, len(it.unconsumedStats.stats))
+    stats = append(stats, it.unconsumedStats.stats...)
+    it.unconsumedStats.Unlock()
+    return stats
+}
+
 // Associate each region with an independent backoffer. In this way, when multiple regions are
 // unavailable, TiDB can execute very quickly without blocking
 func chooseBackoffer(ctx context.Context, backoffermap map[uint64]*Backoffer, task *copTask, worker *copIteratorWorker) *Backoffer {
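Two details of CollectUnconsumedCopRuntimeStats above are worth noting: the `it == nil` receiver guard, and the fact that it returns a freshly copied slice rather than the mutex-protected one. A small self-contained sketch of both idioms, with toy types of my own rather than TiDB's:

package main

import "fmt"

type iterator struct{ stats []string }

// collect is safe to call on a nil *iterator: Go still dispatches the
// method, and the guard turns the call into a harmless nil result. It
// also returns a copy so callers never alias the internal slice.
func (it *iterator) collect() []string {
	if it == nil || it.stats == nil {
		return nil
	}
	out := make([]string, 0, len(it.stats))
	out = append(out, it.stats...)
	return out
}

func main() {
	var it *iterator                 // nil pointer
	fmt.Println(it.collect() == nil) // true, and no panic
}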
@@ -1261,6 +1282,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
             err = worker.handleTiDBSendReqErr(err, task, ch)
             return nil, err
         }
+        worker.collectUnconsumedCopRuntimeStats(bo, rpcCtx)
         return nil, errors.Trace(err)
     }
@@ -1758,17 +1780,24 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
     if resp.detail == nil {
         resp.detail = new(CopRuntimeStats)
     }
-    resp.detail.Stats = worker.kvclient.Stats
+    worker.collectCopRuntimeStats(resp.detail, bo, rpcCtx, resp)
+}
+
+func (worker *copIteratorWorker) collectCopRuntimeStats(copStats *CopRuntimeStats, bo *Backoffer, rpcCtx *tikv.RPCContext, resp *copResponse) {
+    copStats.Stats = worker.kvclient.Stats
     backoffTimes := bo.GetBackoffTimes()
-    resp.detail.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
-    resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
-    resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
+    copStats.BackoffTime = time.Duration(bo.GetTotalSleep()) * time.Millisecond
+    copStats.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
+    copStats.BackoffTimes = make(map[string]int, len(backoffTimes))
     for backoff := range backoffTimes {
-        resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
-        resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
+        copStats.BackoffTimes[backoff] = backoffTimes[backoff]
+        copStats.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
     }
     if rpcCtx != nil {
-        resp.detail.CalleeAddress = rpcCtx.Addr
+        copStats.CalleeAddress = rpcCtx.Addr
     }
+    if resp == nil {
+        return
+    }
     sd := &util.ScanDetail{}
     td := util.TimeDetail{}
@@ -1791,8 +1820,20 @@ func (worker *copIteratorWorker) handleCollectExecutionInfo(bo *Backoffer, rpcCt
             }
         }
     }
-    resp.detail.ScanDetail = sd
-    resp.detail.TimeDetail = td
+    copStats.ScanDetail = sd
+    copStats.TimeDetail = td
 }

+func (worker *copIteratorWorker) collectUnconsumedCopRuntimeStats(bo *Backoffer, rpcCtx *tikv.RPCContext) {
+    if worker.kvclient.Stats == nil {
+        return
+    }
+    copStats := &CopRuntimeStats{}
+    worker.collectCopRuntimeStats(copStats, bo, rpcCtx, nil)
+    worker.unconsumedStats.Lock()
+    worker.unconsumedStats.stats = append(worker.unconsumedStats.stats, copStats)
+    worker.unconsumedStats.Unlock()
+    worker.kvclient.Stats = nil
+}
+
 // CopRuntimeStats contains execution detail information.
@@ -1803,6 +1844,11 @@ type CopRuntimeStats struct {
     CoprCacheHit bool
 }

+type unconsumedCopRuntimeStats struct {
+    sync.Mutex
+    stats []*CopRuntimeStats
+}
+
 func (worker *copIteratorWorker) handleTiDBSendReqErr(err error, task *copTask, ch chan<- *copResponse) error {
     errCode := errno.ErrUnknown
     errMsg := err.Error()
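Taken together, these hunks implement a mutex-guarded accumulator: each worker appends stats for requests that failed or were never consumed onto a shared slice (the struct embeds sync.Mutex, so Lock and Unlock are called on it directly), and the iterator snapshots that slice on demand. A runnable sketch of the shape, with simplified types of my own:

package main

import (
	"fmt"
	"sync"
)

// statsBucket mirrors the shape of unconsumedCopRuntimeStats: embedding
// sync.Mutex means Lock/Unlock are called on the struct itself.
type statsBucket struct {
	sync.Mutex
	stats []int
}

// add is the worker side: append one entry under the lock.
func (b *statsBucket) add(s int) {
	b.Lock()
	b.stats = append(b.stats, s)
	b.Unlock()
}

// snapshot is the iterator side: copy under the lock, return the copy.
func (b *statsBucket) snapshot() []int {
	b.Lock()
	defer b.Unlock()
	out := make([]int, 0, len(b.stats))
	return append(out, b.stats...)
}

func main() {
	b := &statsBucket{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent "workers"
		wg.Add(1)
		go func(n int) { defer wg.Done(); b.add(n) }(i)
	}
	wg.Wait()
	fmt.Println(len(b.snapshot())) // 4
}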
pkg/store/mockstore/unistore/rpc.go (4 additions)
@@ -93,6 +93,10 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
             failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
         }
     })
+    failpoint.Inject("unistoreRPCSlowByInjestSleep", func(val failpoint.Value) {
+        time.Sleep(time.Duration(val.(int) * int(time.Millisecond)))
+        failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{Message: "Deadline is exceeded"}))
+    })

     select {
     case <-ctx.Done():
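For reference, a failpoint like this is activated from a test by its full package path, as the conn_test.go hunk above does; `return(150)` makes the injected callback receive val = 150, producing a 150ms sleep before the RPC fails with "Deadline is exceeded". A hedged sketch of the enable/disable pattern (the test name and package here are hypothetical):

package unistore_test

import (
	"testing"

	"github.com/pingcap/failpoint"
	"github.com/stretchr/testify/require"
)

// TestInjectedSlowRPC shows the usual enable/defer-disable pairing so the
// failpoint cannot leak into other tests even if an assertion fails.
func TestInjectedSlowRPC(t *testing.T) {
	fp := "github.com/pingcap/tidb/pkg/store/mockstore/unistore/unistoreRPCSlowByInjestSleep"
	require.NoError(t, failpoint.Enable(fp, `return(150)`))
	defer func() { require.NoError(t, failpoint.Disable(fp)) }()
	// ... run a query that issues coprocessor RPCs here ...
}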