*: collect execution details and output them in slow query log #7302

Merged
merged 5 commits on Aug 10, 2018
Changes from 1 commit
10 changes: 8 additions & 2 deletions distsql/distsql_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)
@@ -204,12 +205,17 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
// Used only for test.
type mockResultSubset struct{ data []byte }

// GetData implements kv.Response interface.
// GetData implements kv.ResultSubset interface.
func (r *mockResultSubset) GetData() []byte { return r.data }

// GetStartKey implements kv.Response interface.
// GetStartKey implements kv.ResultSubset interface.
func (r *mockResultSubset) GetStartKey() kv.Key { return nil }

// GetExecDetails implements kv.ResultSubset interface.
func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}

func populateBuffer() []byte {
numCols := 4
numRows := 1024
4 changes: 3 additions & 1 deletion distsql/select_result.go
@@ -159,11 +159,13 @@ func (r *selectResult) getSelectResp() error {
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
sc := r.ctx.GetSessionVars().StmtCtx
for _, warning := range r.selectResp.Warnings {
r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails())

Member:
It's better to print execution details for every coprocessor request.


Member Author:
Sometimes there are hundreds of coprocessor requests; printing each one would make the SLOW_QUERY log unreadable.


Member:
Sometimes there is data skew in some key ranges; that skew slows down coprocessor performance and makes other coprocessor requests wait for processing.

If we merge the details of all coprocessor requests for a user query, we cannot recognize the data skew.


Member Author:
How about we maintain a max?


Member Author:
The main problem this PR solves is to distinguish culprit queries among SLOW queries.
The total time is enough for this purpose.

if len(r.selectResp.Chunks) == 0 {
continue
}
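
The thread above weighs per-request detail against a merged total. As a rough, standalone sketch of the reviewer's "maintain a max" idea, the snippet below keeps the slowest single request next to the accumulated totals so a skewed region stays visible; the type and field names are illustrative and are not part of this PR, which only accumulates totals.

```go
package main

import (
	"fmt"
	"time"
)

// execSummary mimics the merged execution details kept per statement.
// Keeping a per-request maximum alongside the totals is the reviewer's
// "maintain a max" suggestion; the field names here are illustrative,
// not TiDB's.
type execSummary struct {
	TotalProcess time.Duration
	TotalWait    time.Duration
	MaxProcess   time.Duration // slowest single coprocessor request
	RequestCount int
}

// merge folds one coprocessor response's timings into the summary.
func (s *execSummary) merge(process, wait time.Duration) {
	s.TotalProcess += process
	s.TotalWait += wait
	s.RequestCount++
	if process > s.MaxProcess {
		s.MaxProcess = process
	}
}

func main() {
	var s execSummary
	// Simulated per-request timings; the last region is skewed.
	for _, d := range []time.Duration{20 * time.Millisecond, 25 * time.Millisecond, 900 * time.Millisecond} {
		s.merge(d, 5*time.Millisecond)
	}
	fmt.Printf("request_count:%d total_process:%v max_process:%v\n",
		s.RequestCount, s.TotalProcess, s.MaxProcess)
}
```

With the skewed 900ms request in this example, a max column would make the outlier obvious, whereas the total alone does not show which region caused it.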
8 changes: 4 additions & 4 deletions executor/adapter.go
@@ -357,12 +357,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
user := a.Ctx.GetSessionVars().User
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[SLOW_QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
}
}
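
For illustration only, the standalone sketch below mimics the new log format above, rendering a merged execution-details summary right after cost_time. The details type and its String output are assumptions made for the example, not the actual execdetails.ExecDetails formatting in TiDB.

```go
package main

import (
	"fmt"
	"time"
)

// details stands in for the value returned by sessVars.StmtCtx.GetExecDetails();
// its String method is only an assumption about how such a summary might render.
type details struct {
	Process, Wait, Backoff time.Duration
	Requests               int
}

func (d details) String() string {
	return fmt.Sprintf("process_time:%v wait_time:%v backoff_time:%v request_count:%d",
		d.Process, d.Wait, d.Backoff, d.Requests)
}

func main() {
	costTime := 450 * time.Millisecond
	threshold := 300 * time.Millisecond // stand-in for the slow-query threshold
	d := details{Process: 380 * time.Millisecond, Wait: 40 * time.Millisecond, Backoff: 10 * time.Millisecond, Requests: 3}
	if costTime >= threshold {
		// Mirrors the Warnf call above: the merged details sit right after cost_time.
		fmt.Printf("[SLOW_QUERY] cost_time:%v %s succ:%v con:%v sql:%v\n",
			costTime, d, true, 1, "select * from t")
	}
}
```

As in adapter.go, only the log level differs between the fast and slow paths; both lines carry the same merged details.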

3 changes: 3 additions & 0 deletions kv/kv.go
@@ -15,6 +15,7 @@ package kv

import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"golang.org/x/net/context"
)

@@ -212,6 +213,8 @@ type ResultSubset interface {
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
// GetExecDetails gets the execution details.
GetExecDetails() *execdetails.ExecDetails
}

// Response represents the response returned from KV layer.
22 changes: 22 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

@@ -67,6 +68,7 @@ type StatementContext struct {
foundRows uint64
warnings []SQLWarn
histogramsNotLoad bool
execDetails execdetails.ExecDetails
}

// Copied from SessionVars.TimeZone.
@@ -236,3 +238,23 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.warnings = nil
sc.mu.Unlock()
}

// MergeExecDetails merges a single region's execution details into self; it is used to print
// the information in the slow query log.
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) {
sc.mu.Lock()
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.Unlock()

Contributor:
Should we make reads and writes of execDetails lock-free?


Member Author:
No, the lock contention is very low.

}

// GetExecDetails gets the execution details for the statement.
func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
var details execdetails.ExecDetails
sc.mu.Lock()
details = sc.mu.execDetails
sc.mu.Unlock()
return details
}
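
On the lock-free question raised in the review: the merge could also be written with atomic counters, as in the hypothetical sketch below (names are illustrative, not TiDB code). The PR keeps the mutex because each statement merges only a handful of coprocessor responses, so contention stays negligible.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// atomicDetails sketches a lock-free alternative: durations are stored as
// nanosecond counters and merged with atomic adds instead of a mutex.
type atomicDetails struct {
	processNs    int64
	waitNs       int64
	requestCount int64
}

// merge folds one coprocessor response's timings into the counters.
func (d *atomicDetails) merge(process, wait time.Duration) {
	atomic.AddInt64(&d.processNs, int64(process))
	atomic.AddInt64(&d.waitNs, int64(wait))
	atomic.AddInt64(&d.requestCount, 1)
}

func main() {
	var d atomicDetails
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ { // four concurrent coprocessor workers
		wg.Add(1)
		go func() {
			defer wg.Done()
			d.merge(10*time.Millisecond, 2*time.Millisecond)
		}()
	}
	wg.Wait()
	fmt.Printf("request_count:%d process_time:%v\n",
		atomic.LoadInt64(&d.requestCount),
		time.Duration(atomic.LoadInt64(&d.processNs)))
}
```

The trade-off is readability: the mutex version reads the whole struct consistently in GetExecDetails, while the atomic version would need per-field loads.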
100 changes: 56 additions & 44 deletions store/tikv/coprocessor.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/goroutine_pool"
"github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
@@ -103,7 +104,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
it.concurrency = 1
}
if !it.req.KeepOrder {
it.respChan = make(chan copResponse, it.concurrency)
it.respChan = make(chan *copResponse, it.concurrency)
}
it.open(ctx)
return it
@@ -114,7 +115,7 @@ type copTask struct {
region RegionVerID
ranges *copRanges

respChan chan copResponse
respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
}
@@ -247,7 +248,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
tasks = append(tasks, &copTask{
region: region,
ranges: ranges,
respChan: make(chan copResponse, 1),
respChan: make(chan *copResponse, 1),
cmdType: cmdType,
})
}
@@ -353,7 +354,7 @@ type copIterator struct {
curr int

// Otherwise, results are stored in respChan.
respChan chan copResponse
respChan chan *copResponse
wg sync.WaitGroup

vars *kv.Variables
@@ -365,7 +366,7 @@ type copIteratorWorker struct {
wg *sync.WaitGroup
store *tikvStore
req *kv.Request
respChan chan<- copResponse
respChan chan<- *copResponse
finishCh <-chan struct{}
vars *kv.Variables
}
@@ -376,15 +377,30 @@ type copIteratorTaskSender struct {
wg *sync.WaitGroup
tasks []*copTask
finishCh <-chan struct{}
respChan chan<- copResponse
respChan chan<- *copResponse
}

type copResponse struct {
*coprocessor.Response
pbResp *coprocessor.Response
execdetails.ExecDetails
startKey kv.Key
err error
}

// GetData implements the kv.ResultSubset GetData interface.
func (rs *copResponse) GetData() []byte {
return rs.pbResp.Data
}

// GetStartKey implements the kv.ResultSubset GetStartKey interface.
func (rs *copResponse) GetStartKey() kv.Key {
return rs.startKey
}

// GetExecDetails implements the kv.ResultSubset GetExecDetails interface.
func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails {
return &rs.ExecDetails
}

const minLogCopTaskTime = 300 * time.Millisecond

// run is a worker function that gets a copTask from the channel, handles it and
@@ -462,7 +478,7 @@ func (sender *copIteratorTaskSender) run() {
}
}

func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan copResponse) (resp copResponse, ok bool, exit bool) {
func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copResponse) (resp *copResponse, ok bool, exit bool) {
select {
case resp, ok = <-respCh:
case <-it.finishCh:
@@ -486,7 +502,7 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
return
}

func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- copResponse) (exit bool) {
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
select {
case respCh <- resp:
case <-worker.finishCh:
@@ -495,27 +511,11 @@ func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- co
return
}

// copResultSubset implements the kv.ResultSubset interface.
type copResultSubset struct {
data []byte
startKey kv.Key
}

// GetData implements the kv.ResultSubset GetData interface.
func (rs *copResultSubset) GetData() []byte {
return rs.data
}

// GetStartKey implements the kv.ResultSubset GetStartKey interface.
func (rs *copResultSubset) GetStartKey() kv.Key {
return rs.startKey
}

// Next returns next coprocessor result.
// NOTE: Use nil to indicate finish, so if the returned ResultSubset is not nil, reader should continue to call Next().
func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
var (
resp copResponse
resp *copResponse
ok bool
closed bool
)
Expand Down Expand Up @@ -557,19 +557,19 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
return nil, errors.Trace(err)
}

if resp.Data == nil {
return &copResultSubset{}, nil
if resp.pbResp.Data == nil {
return &copResponse{}, nil
}
return &copResultSubset{data: resp.Data, startKey: resp.startKey}, nil
return resp, nil
}

// handleTask handles a single copTask, sends the result to the channel, and retries automatically on error.
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- copResponse) {
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
remainTasks := []*copTask{task}
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
if err != nil {
resp := copResponse{err: errors.Trace(err)}
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
return
}
Expand All @@ -583,7 +583,7 @@ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh

// handleTaskOnce handles a single copTask; successful results are sent to the channel.
// If an error happens, it returns the error. If the region splits or a lock is met, it returns the remaining tasks.
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {

// gofail: var handleTaskOnceError bool
// if handleTaskOnceError {
Expand Down Expand Up @@ -623,7 +623,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, resp.Cop, task, ch, nil)
return worker.handleCopResponse(bo, &copResponse{pbResp: resp.Cop}, task, ch, nil)
}

const (
Expand Down Expand Up @@ -674,7 +674,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
Expand All @@ -684,7 +684,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti
return nil, nil
}
for {
remainedTasks, err := worker.handleCopResponse(bo, resp, task, ch, lastRange)
remainedTasks, err := worker.handleCopResponse(bo, &copResponse{pbResp: resp}, task, ch, lastRange)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand All @@ -710,15 +710,15 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan<- copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
if regionErr := resp.GetRegionError(); regionErr != nil {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming)
}
if lockErr := resp.GetLocked(); lockErr != nil {
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
log.Debugf("coprocessor encounters lock: %v", lockErr)
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
Expand All @@ -731,19 +731,31 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coproces
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
}
if otherErr := resp.GetOtherError(); otherErr != "" {
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
log.Warnf("coprocessor err: %v", err)
return nil, errors.Trace(err)
}
var startKey kv.Key
// When the request is using streaming API, the `Range` is not nil.
if resp.Range != nil {
startKey = resp.Range.Start
if resp.pbResp.Range != nil {
resp.startKey = resp.pbResp.Range.Start
} else {
startKey = task.ranges.at(0).StartKey
resp.startKey = task.ranges.at(0).StartKey
}
resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
}
if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
if scanDetail.Write != nil {
resp.TotalKeys += scanDetail.Write.Total
resp.ProcessedKeys += scanDetail.Write.Processed
}
}
}
worker.sendToRespCh(copResponse{resp, startKey, nil}, ch)
worker.sendToRespCh(resp, ch)
return nil, nil
}
