Skip to content

Commit

Permalink
*: collect execution details and output them in slow query log
Browse files Browse the repository at this point in the history
Cherry-pick pingcap#7302
  • Loading branch information
coocood committed Aug 13, 2018
1 parent 1cd0f67 commit 9ccc6f5
Show file tree
Hide file tree
Showing 8 changed files with 189 additions and 54 deletions.
4 changes: 3 additions & 1 deletion distsql/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,13 @@ func (r *selectResult) getSelectResp() error {
if err := r.selectResp.Error; err != nil {
return terror.ClassTiKV.New(terror.ErrCode(err.Code), err.Msg)
}
sc := r.ctx.GetSessionVars().StmtCtx
for _, warning := range r.selectResp.Warnings {
r.ctx.GetSessionVars().StmtCtx.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
sc.AppendWarning(terror.ClassTiKV.New(terror.ErrCode(warning.Code), warning.Msg))
}
r.feedback.Update(re.result.GetStartKey(), r.selectResp.OutputCounts)
r.partialCount++
sc.MergeExecDetails(re.result.GetExecDetails())
if len(r.selectResp.Chunks) == 0 {
continue
}
Expand Down
12 changes: 9 additions & 3 deletions distsql/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ import (
"github.com/pingcap/tidb/util/charset"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
tipb "github.com/pingcap/tipb/go-tipb"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tipb/go-tipb"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -203,8 +204,13 @@ func (resp *mockResponse) Next(ctx context.Context) (kv.ResultSubset, error) {
// Used only for test.
type mockResultSubset struct{ data []byte }

// GetData implements kv.Response interface.
// GetData implements kv.ResultSubset interface.
func (r *mockResultSubset) GetData() []byte { return r.data }

// GetStartKey implements kv.Response interface.
// GetStartKey implements kv.ResultSubset interface.
func (r *mockResultSubset) GetStartKey() kv.Key { return nil }

// GetExecDetails implements kv.ResultSubset interface.
func (r *mockResultSubset) GetExecDetails() *execdetails.ExecDetails {
return &execdetails.ExecDetails{}
}
8 changes: 4 additions & 4 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,12 +358,12 @@ func (a *ExecStmt) logSlowQuery(txnTS uint64, succ bool) {
user := a.Ctx.GetSessionVars().User
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
} else {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] cost_time:%v succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
"[SLOW_QUERY] cost_time:%v %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
costTime, sessVars.StmtCtx.GetExecDetails(), succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
}
}

Expand Down
3 changes: 3 additions & 0 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package kv

import (
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/util/execdetails"
"golang.org/x/net/context"
)

Expand Down Expand Up @@ -207,6 +208,8 @@ type ResultSubset interface {
GetData() []byte
// GetStartKey gets the start key.
GetStartKey() Key
// GetExecDetails gets the detail information.
GetExecDetails() *execdetails.ExecDetails
}

// Response represents the response returned from KV layer.
Expand Down
24 changes: 24 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/memory"
)

Expand Down Expand Up @@ -60,6 +61,7 @@ type StatementContext struct {
foundRows uint64
warnings []SQLWarn
histogramsNotLoad bool
execDetails execdetails.ExecDetails
}

// Copied from SessionVars.TimeZone.
Expand Down Expand Up @@ -199,3 +201,25 @@ func (sc *StatementContext) ResetForRetry() {
sc.mu.warnings = nil
sc.mu.Unlock()
}

// MergeExecDetails merges a single region execution details into self, used to print
// the information in slow query log.
func (sc *StatementContext) MergeExecDetails(details *execdetails.ExecDetails) {
sc.mu.Lock()
sc.mu.execDetails.ProcessTime += details.ProcessTime
sc.mu.execDetails.WaitTime += details.WaitTime
sc.mu.execDetails.BackoffTime += details.BackoffTime
sc.mu.execDetails.RequestCount++
sc.mu.execDetails.TotalKeys += details.TotalKeys
sc.mu.execDetails.ProcessedKeys += details.ProcessedKeys
sc.mu.Unlock()
}

// GetExecDetails gets the execution details for the statement.
func (sc *StatementContext) GetExecDetails() execdetails.ExecDetails {
var details execdetails.ExecDetails
sc.mu.Lock()
details = sc.mu.execDetails
sc.mu.Unlock()
return details
}
100 changes: 54 additions & 46 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/goroutine_pool"
tipb "github.com/pingcap/tipb/go-tipb"
log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request) kv.Response {
it.concurrency = 1
}
if !it.req.KeepOrder {
it.respChan = make(chan copResponse, it.concurrency)
it.respChan = make(chan *copResponse, it.concurrency)
}
it.open(ctx)
return it
Expand All @@ -138,7 +139,7 @@ type copTask struct {
region RegionVerID
ranges *copRanges

respChan chan copResponse
respChan chan *copResponse
storeAddr string
cmdType tikvrpc.CmdType
}
Expand Down Expand Up @@ -273,7 +274,7 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
tasks = append(tasks, &copTask{
region: region,
ranges: ranges,
respChan: make(chan copResponse, 1),
respChan: make(chan *copResponse, 1),
cmdType: cmdType,
})
}
Expand Down Expand Up @@ -379,7 +380,7 @@ type copIterator struct {
curr int

// Otherwise, results are stored in respChan.
respChan chan copResponse
respChan chan *copResponse
wg sync.WaitGroup
}

Expand All @@ -389,7 +390,7 @@ type copIteratorWorker struct {
wg *sync.WaitGroup
store *tikvStore
req *kv.Request
respChan chan<- copResponse
respChan chan<- *copResponse
finishCh <-chan struct{}
}

Expand All @@ -399,15 +400,30 @@ type copIteratorTaskSender struct {
wg *sync.WaitGroup
tasks []*copTask
finishCh <-chan struct{}
respChan chan<- copResponse
respChan chan<- *copResponse
}

type copResponse struct {
*coprocessor.Response
pbResp *coprocessor.Response
execdetails.ExecDetails
startKey kv.Key
err error
}

// GetData implements the kv.ResultSubset GetData interface.
func (rs *copResponse) GetData() []byte {
return rs.pbResp.Data
}

// GetStartKey implements the kv.ResultSubset GetStartKey interface.
func (rs *copResponse) GetStartKey() kv.Key {
return rs.startKey
}

func (rs *copResponse) GetExecDetails() *execdetails.ExecDetails {
return &rs.ExecDetails
}

const minLogCopTaskTime = 300 * time.Millisecond

// run is a worker function that get a copTask from channel, handle it and
Expand Down Expand Up @@ -482,7 +498,7 @@ func (sender *copIteratorTaskSender) run() {
}
}

func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan copResponse) (resp copResponse, ok bool, exit bool) {
func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copResponse) (resp *copResponse, ok bool, exit bool) {
select {
case resp, ok = <-respCh:
case <-it.finishCh:
Expand All @@ -506,7 +522,7 @@ func (sender *copIteratorTaskSender) sendToTaskCh(t *copTask) (exit bool) {
return
}

func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- copResponse) (exit bool) {
func (worker *copIteratorWorker) sendToRespCh(resp *copResponse, respCh chan<- *copResponse) (exit bool) {
select {
case respCh <- resp:
case <-worker.finishCh:
Expand All @@ -515,29 +531,13 @@ func (worker *copIteratorWorker) sendToRespCh(resp copResponse, respCh chan<- co
return
}

// copResultSubset implements the kv.ResultSubset interface.
type copResultSubset struct {
data []byte
startKey kv.Key
}

// GetData implements the kv.ResultSubset GetData interface.
func (rs *copResultSubset) GetData() []byte {
return rs.data
}

// GetStartKey implements the kv.ResultSubset GetStartKey interface.
func (rs *copResultSubset) GetStartKey() kv.Key {
return rs.startKey
}

// Next returns next coprocessor result.
// NOTE: Use nil to indicate finish, so if the returned ResultSubset is not nil, reader should continue to call Next().
func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
metrics.TiKVCoprocessorCounter.WithLabelValues("next").Inc()

var (
resp copResponse
resp *copResponse
ok bool
closed bool
)
Expand Down Expand Up @@ -578,20 +578,16 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
if err != nil {
return nil, errors.Trace(err)
}

if resp.Data == nil {
return &copResultSubset{}, nil
}
return &copResultSubset{data: resp.Data, startKey: resp.startKey}, nil
return resp, nil
}

// handleTask handles single copTask, sends the result to channel, retry automatically on error.
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- copResponse) {
func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh chan<- *copResponse) {
remainTasks := []*copTask{task}
for len(remainTasks) > 0 {
tasks, err := worker.handleTaskOnce(bo, remainTasks[0], respCh)
if err != nil {
resp := copResponse{err: errors.Trace(err)}
resp := &copResponse{err: errors.Trace(err)}
worker.sendToRespCh(resp, respCh)
return
}
Expand All @@ -605,7 +601,7 @@ func (worker *copIteratorWorker) handleTask(bo *Backoffer, task *copTask, respCh

// handleTaskOnce handles single copTask, successful results are send to channel.
// If error happened, returns error. If region split or meet lock, returns the remain tasks.
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {

// gofail: var handleTaskOnceError bool
// if handleTaskOnceError {
Expand Down Expand Up @@ -646,7 +642,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch
}

// Handles the response for non-streaming copTask.
return worker.handleCopResponse(bo, resp.Cop, task, ch, nil)
return worker.handleCopResponse(bo, &copResponse{pbResp: resp.Cop}, task, ch, nil)
}

const (
Expand Down Expand Up @@ -697,7 +693,7 @@ func appendScanDetail(logStr string, columnFamily string, scanInfo *kvrpcpb.Scan
return logStr
}

func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- copResponse) ([]*copTask, error) {
func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *tikvrpc.CopStreamResponse, task *copTask, ch chan<- *copResponse) ([]*copTask, error) {
defer stream.Close()
var resp *coprocessor.Response
var lastRange *coprocessor.KeyRange
Expand All @@ -707,7 +703,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti
return nil, nil
}
for {
remainedTasks, err := worker.handleCopResponse(bo, resp, task, ch, lastRange)
remainedTasks, err := worker.handleCopResponse(bo, &copResponse{pbResp: resp}, task, ch, lastRange)
if err != nil || len(remainedTasks) != 0 {
return remainedTasks, errors.Trace(err)
}
Expand All @@ -733,16 +729,16 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, stream *ti
// returns more tasks when that happens, or handles the response if no error.
// if we're handling streaming coprocessor response, lastRange is the range of last
// successful response, otherwise it's nil.
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coprocessor.Response, task *copTask, ch chan<- copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
if regionErr := resp.GetRegionError(); regionErr != nil {
func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *copResponse, task *copTask, ch chan<- *copResponse, lastRange *coprocessor.KeyRange) ([]*copTask, error) {
if regionErr := resp.pbResp.GetRegionError(); regionErr != nil {
if err := bo.Backoff(BoRegionMiss, errors.New(regionErr.String())); err != nil {
return nil, errors.Trace(err)
}
// We may meet RegionError at the first packet, but not during visiting the stream.
metrics.TiKVCoprocessorCounter.WithLabelValues("rebuild_task").Inc()
return buildCopTasks(bo, worker.store.regionCache, task.ranges, worker.req.Desc, worker.req.Streaming)
}
if lockErr := resp.GetLocked(); lockErr != nil {
if lockErr := resp.pbResp.GetLocked(); lockErr != nil {
log.Debugf("coprocessor encounters lock: %v", lockErr)
ok, err1 := worker.store.lockResolver.ResolveLocks(bo, []*Lock{NewLock(lockErr)})
if err1 != nil {
Expand All @@ -755,19 +751,31 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, resp *coproces
}
return worker.buildCopTasksFromRemain(bo, lastRange, task)
}
if otherErr := resp.GetOtherError(); otherErr != "" {
if otherErr := resp.pbResp.GetOtherError(); otherErr != "" {
err := errors.Errorf("other error: %s", otherErr)
log.Warnf("coprocessor err: %v", err)
return nil, errors.Trace(err)
}
var startKey kv.Key
// When the request is using streaming API, the `Range` is not nil.
if resp.Range != nil {
startKey = resp.Range.Start
if resp.pbResp.Range != nil {
resp.startKey = resp.pbResp.Range.Start
} else {
startKey = task.ranges.at(0).StartKey
resp.startKey = task.ranges.at(0).StartKey
}
resp.BackoffTime = time.Duration(bo.totalSleep) * time.Millisecond
if pbDetails := resp.pbResp.ExecDetails; pbDetails != nil {
if handleTime := pbDetails.HandleTime; handleTime != nil {
resp.WaitTime = time.Duration(handleTime.WaitMs) * time.Millisecond
resp.ProcessTime = time.Duration(handleTime.ProcessMs) * time.Millisecond
}
if scanDetail := pbDetails.ScanDetail; scanDetail != nil {
if scanDetail.Write != nil {
resp.TotalKeys += scanDetail.Write.Total
resp.ProcessedKeys += scanDetail.Write.Processed
}
}
}
worker.sendToRespCh(copResponse{resp, startKey, nil}, ch)
worker.sendToRespCh(resp, ch)
return nil, nil
}

Expand Down
Loading

0 comments on commit 9ccc6f5

Please sign in to comment.