Skip to content

Commit

Permalink
executor: Count the number of hash table collisions and build elapse …
Browse files Browse the repository at this point in the history
…during Hash Join. (#14423)
  • Loading branch information
SunRunAway authored Feb 5, 2020
1 parent 72b1f0e commit f2d0e90
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 4 deletions.
16 changes: 16 additions & 0 deletions executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
package executor

import (
"fmt"
"hash"
"hash/fnv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -73,10 +75,20 @@ func (hc *hashContext) initHash(rows int) {
}
}

type hashStatistic struct {
probeCollision int
buildTableElapse time.Duration
}

func (s *hashStatistic) String() string {
return fmt.Sprintf("probe collision:%v, build:%v", s.probeCollision, s.buildTableElapse)
}

// hashRowContainer handles the rows and the hash map of a table.
type hashRowContainer struct {
sc *stmtctx.StatementContext
hCtx *hashContext
stat hashStatistic

// hashTable stores the map of hashKey and RowPtr
hashTable *rowHashMap
Expand Down Expand Up @@ -130,6 +142,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
return
}
if !ok {
c.stat.probeCollision++
continue
}
matched = append(matched, matchedRow)
Expand Down Expand Up @@ -162,6 +175,9 @@ func (c *hashRowContainer) PutChunk(chk *chunk.Chunk) error {
// key of hash table: hash value of key columns
// value of hash table: RowPtr of the corresponded row
func (c *hashRowContainer) PutChunkSelected(chk *chunk.Chunk, selected []bool) error {
start := time.Now()
defer func() { c.stat.buildTableElapse += time.Since(start) }()

chkIdx := uint32(c.rowContainer.NumChunks())
err := c.rowContainer.Add(chk)
if err != nil {
Expand Down
16 changes: 12 additions & 4 deletions executor/hash_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,25 @@ func (s *pkgTestSuite) TestHashRowContainer(c *C) {
hashFunc := func() hash.Hash64 {
return fnv.New64()
}
s.testHashRowContainer(c, hashFunc, false)
s.testHashRowContainer(c, hashFunc, true)
rowContainer := s.testHashRowContainer(c, hashFunc, false)
c.Assert(rowContainer.stat.probeCollision, Equals, 0)
c.Assert(rowContainer.stat.buildTableElapse > 0, IsTrue)

rowContainer = s.testHashRowContainer(c, hashFunc, true)
c.Assert(rowContainer.stat.probeCollision, Equals, 0)
c.Assert(rowContainer.stat.buildTableElapse > 0, IsTrue)

h := &hashCollision{count: 0}
hashFuncCollision := func() hash.Hash64 {
return h
}
s.testHashRowContainer(c, hashFuncCollision, false)
rowContainer = s.testHashRowContainer(c, hashFuncCollision, false)
c.Assert(h.count > 0, IsTrue)
c.Assert(rowContainer.stat.probeCollision > 0, IsTrue)
c.Assert(rowContainer.stat.buildTableElapse > 0, IsTrue)
}

func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, spill bool) {
func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, spill bool) *hashRowContainer {
sctx := mock.NewContext()
var err error
numRows := 10
Expand Down Expand Up @@ -176,4 +183,5 @@ func (s *pkgTestSuite) testHashRowContainer(c *C, hashFunc func() hash.Hash64, s
c.Assert(len(matched), Equals, 2)
c.Assert(matched[0].GetDatumRow(colTypes), DeepEquals, chk0.GetRow(1).GetDatumRow(colTypes))
c.Assert(matched[1].GetDatumRow(colTypes), DeepEquals, chk1.GetRow(1).GetDatumRow(colTypes))
return rowContainer
}
1 change: 1 addition & 0 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ func (e *HashJoinExec) Close() error {
if e.runtimeStats != nil {
concurrency := cap(e.joiners)
e.runtimeStats.SetConcurrencyInfo("Concurrency", concurrency)
e.runtimeStats.SetAdditionalInfo(e.rowContainer.stat.String())
}
err := e.baseExecutor.Close()
return err
Expand Down
14 changes: 14 additions & 0 deletions util/execdetails/execdetails.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,13 @@ type RuntimeStats struct {
// executor return row count.
rows int64

// protect concurrency
mu sync.Mutex
// executor concurrency information
concurrency []concurrencyInfo

// additional information for executors
additionalInfo string
}

// NewRuntimeStatsColl creates new executor collector.
Expand Down Expand Up @@ -452,6 +456,13 @@ func (e *RuntimeStats) SetConcurrencyInfo(name string, num int) {
e.concurrency = append(e.concurrency, concurrencyInfo{concurrencyName: name, concurrencyNum: num})
}

// SetAdditionalInfo sets the additional information.
func (e *RuntimeStats) SetAdditionalInfo(info string) {
e.mu.Lock()
e.additionalInfo = info
e.mu.Unlock()
}

func (e *RuntimeStats) String() string {
result := fmt.Sprintf("time:%v, loops:%d, rows:%d", time.Duration(e.consume), e.loop, e.rows)
if len(e.concurrency) > 0 {
Expand All @@ -463,5 +474,8 @@ func (e *RuntimeStats) String() string {
}
}
}
if len(e.additionalInfo) > 0 {
result += ", " + e.additionalInfo
}
return result
}

0 comments on commit f2d0e90

Please sign in to comment.