Skip to content

Commit

Permalink
executor: add transaction commit runtime informati ... (#19366) (#20185)
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Oct 3, 2020
1 parent 2a758ef commit 3320248
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 36 deletions.
10 changes: 10 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,16 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool, hasMoreResults bool) {
stmtDetail = *(stmtDetailRaw.(*execdetails.StmtExecDetails))
}
execDetail := sessVars.StmtCtx.GetExecDetails()

// Attach commit runtime stats to executor runtime stats.
if execDetail.CommitDetail != nil && sessVars.StmtCtx.RuntimeStatsColl != nil {
stats := sessVars.StmtCtx.RuntimeStatsColl.GetRootStats(a.Plan.ID())
statsWithCommit := &execdetails.RuntimeStatsWithCommit{
RuntimeStats: stats,
Commit: execDetail.CommitDetail,
}
sessVars.StmtCtx.RuntimeStatsColl.RegisterStats(a.Plan.ID(), statsWithCommit)
}
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := plannercore.GetStatsInfo(a.Plan)
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
Expand Down
82 changes: 82 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package execdetails

import (
"bytes"
"fmt"
"sort"
"strconv"
Expand Down Expand Up @@ -491,3 +492,84 @@ func (e *RuntimeStatsWithConcurrencyInfo) String() string {
}
return result
}

// RuntimeStatsWithCommit is the RuntimeStats with commit detail.
type RuntimeStatsWithCommit struct {
RuntimeStats
Commit *CommitDetails
}

func (e *RuntimeStatsWithCommit) String() string {
var result string
if e.RuntimeStats != nil {
result = e.RuntimeStats.String()
}
if e.Commit == nil {
return result
}
buf := bytes.NewBuffer(make([]byte, 0, len(result)+32))
buf.WriteString(result)
if e.Commit.PrewriteTime > 0 {
buf.WriteString(", prewrite:")
buf.WriteString(e.Commit.PrewriteTime.String())
}
if e.Commit.WaitPrewriteBinlogTime > 0 {
buf.WriteString(", wait_prewrite_binlog:")
buf.WriteString(e.Commit.WaitPrewriteBinlogTime.String())
}
if e.Commit.GetCommitTsTime > 0 {
buf.WriteString(", get_commit_ts:")
buf.WriteString(e.Commit.GetCommitTsTime.String())
}
if e.Commit.CommitTime > 0 {
buf.WriteString(", commit:")
buf.WriteString(e.Commit.CommitTime.String())
}
commitBackoffTime := atomic.LoadInt64(&e.Commit.CommitBackoffTime)
if commitBackoffTime > 0 {
buf.WriteString(", commit_backoff: {time: ")
buf.WriteString(time.Duration(commitBackoffTime).String())
tpMap := make(map[string]struct{})
tpArray := []string{}
e.Commit.Mu.Lock()
if len(e.Commit.Mu.BackoffTypes) > 0 {
for _, tp := range e.Commit.Mu.BackoffTypes {
tpStr := tp.String()
_, ok := tpMap[tpStr]
if ok {
continue
}
tpMap[tpStr] = struct{}{}
tpArray = append(tpArray, tpStr)
}
buf.WriteString(", type: ")
sort.Strings(tpArray)
buf.WriteString(fmt.Sprintf("%v", tpArray))
}
e.Commit.Mu.Unlock()
buf.WriteString("}")
}
if e.Commit.ResolveLockTime > 0 {
buf.WriteString(", resolve_lock: ")
buf.WriteString(time.Duration(e.Commit.ResolveLockTime).String())
}

prewriteRegionNum := atomic.LoadInt32(&e.Commit.PrewriteRegionNum)
if prewriteRegionNum > 0 {
buf.WriteString(", region_num:")
buf.WriteString(strconv.FormatInt(int64(prewriteRegionNum), 10))
}
if e.Commit.WriteKeys > 0 {
buf.WriteString(", write_keys:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteKeys), 10))
}
if e.Commit.WriteSize > 0 {
buf.WriteString(", write_byte:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.WriteSize), 10))
}
if e.Commit.TxnRetry > 0 {
buf.WriteString(", txn_retry:")
buf.WriteString(strconv.FormatInt(int64(e.Commit.TxnRetry), 10))
}
return buf.String()
}
73 changes: 37 additions & 36 deletions util/execdetails/execdetails_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package execdetails

import (
"fmt"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -121,40 +120,42 @@ func TestCopRuntimeStats(t *testing.T) {
}
}

func TestCopRuntimeStatsForTiFlash(t *testing.T) {
stats := NewRuntimeStatsColl()
tableScanID := 1
aggID := 2
tableReaderID := 3
stats.RecordOneCopTask(aggID, "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(1, 1, 1, "tablescan_"+strconv.Itoa(tableScanID)))
stats.RecordOneCopTask(aggID, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(2, 2, 2, "tablescan_"+strconv.Itoa(tableScanID)))
stats.RecordOneCopTask(tableScanID, "8.8.8.8", mockExecutorExecutionSummaryForTiFlash(3, 3, 3, "aggregation_"+strconv.Itoa(aggID)))
stats.RecordOneCopTask(tableScanID, "8.8.8.9", mockExecutorExecutionSummaryForTiFlash(4, 4, 4, "aggregation_"+strconv.Itoa(aggID)))
if stats.ExistsCopStats(tableScanID) != true {
t.Fatal("exist")
}
cop := stats.GetCopStats(tableScanID)
if cop.String() != "proc max:2ns, min:1ns, p80:2ns, p95:2ns, iters:3, tasks:2" {
t.Fatal("table_scan")
}
copStats := cop.stats["8.8.8.8"]
if copStats == nil {
t.Fatal("cop stats is nil")
}
copStats[0].SetRowNum(10)
copStats[0].Record(time.Second, 10)
if copStats[0].String() != "time:1.000000001s, loops:2" {
t.Fatalf("cop stats string is not expect, got: %v", copStats[0].String())
}

if stats.GetCopStats(aggID).String() != "proc max:4ns, min:3ns, p80:4ns, p95:4ns, iters:7, tasks:2" {
t.Fatal("agg")
}
rootStats := stats.GetRootStats(tableReaderID)
if rootStats == nil {
t.Fatal("table_reader")
}
if stats.ExistsRootStats(tableReaderID) == false {
t.Fatal("table_reader not exists")
func TestRuntimeStatsWithCommit(t *testing.T) {
basicStats := &BasicRuntimeStats{
loop: 1,
consume: int64(time.Second),
}
commitDetail := &CommitDetails{
GetCommitTsTime: time.Second,
PrewriteTime: time.Second,
CommitTime: time.Second,
CommitBackoffTime: int64(time.Second),
Mu: struct {
sync.Mutex
BackoffTypes []fmt.Stringer
}{BackoffTypes: []fmt.Stringer{
stringutil.MemoizeStr(func() string {
return "backoff1"
}),
stringutil.MemoizeStr(func() string {
return "backoff2"
}),
stringutil.MemoizeStr(func() string {
return "backoff1"
}),
}},
ResolveLockTime: int64(time.Second),
WriteKeys: 3,
WriteSize: 66,
PrewriteRegionNum: 5,
TxnRetry: 2,
}
stats := &RuntimeStatsWithCommit{
RuntimeStats: basicStats,
Commit: commitDetail,
}
expect := "time:1s, loops:1, prewrite:1s, get_commit_ts:1s, commit:1s, commit_backoff: {time: 1s, type: [backoff1 backoff2]}, resolve_lock: 1s, region_num:5, write_keys:3, write_byte:66, txn_retry:2"
if stats.String() != expect {
t.Fatalf("%v != %v", stats.String(), expect)
}
}

0 comments on commit 3320248

Please sign in to comment.