Skip to content

Commit

Permalink
executor: refactor joinResultGenerator to handle the unmatched oute…
Browse files Browse the repository at this point in the history
…r records (#7288) (#7315)
  • Loading branch information
zz-jason authored Aug 8, 2018
1 parent 15be7e5 commit 343ed54
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 154 deletions.
22 changes: 13 additions & 9 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ type lookUpJoinTask struct {
lookupMap *mvmap.MVMap
matchedInners []chunk.Row

doneCh chan error
cursor int
doneCh chan error
cursor int
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -205,16 +206,19 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
}

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Len() == 0 {
err = e.resultGenerator.emit(outerRow, nil, chk)
} else if e.innerIter.Current() != e.innerIter.End() {
err = e.resultGenerator.emit(outerRow, e.innerIter, chk)
}
if err != nil {
return errors.Trace(err)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.resultGenerator.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.resultGenerator.onMissMatch(outerRow, chk)
}
task.cursor++
task.hasMatch = false
}
if chk.NumRows() == e.maxChunkSize {
return nil
Expand Down
44 changes: 24 additions & 20 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,20 +356,14 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
for _, b := range innerPtrs {
Expand All @@ -378,12 +372,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
innerRows = append(innerRows, matchedInner)
}
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
for iter.Begin(); iter.Current() != iter.End(); {
err = e.resultGenerators[workerID].emit(outerRow, iter, joinResult.chk)
matched, err := e.resultGenerators[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
hasMatch = hasMatch || matched

if joinResult.chk.NumRows() == e.maxChunkSize {
ok := true
e.joinResultCh <- joinResult
Expand All @@ -393,6 +390,9 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
}
if !hasMatch {
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
}
return true, joinResult
}

Expand All @@ -419,11 +419,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
err = e.resultGenerators[workerID].emit(outerChk.GetRow(i), nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
e.resultGenerators[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -530,6 +526,7 @@ type NestedLoopApplyExec struct {
innerSelected []bool
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -587,9 +584,9 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
err := e.resultGenerator.emit(outerRow, nil, chk)
if err != nil || chk.NumRows() == e.maxChunkSize {
return nil, errors.Trace(err)
e.resultGenerator.onMissMatch(outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
}
}
Expand Down Expand Up @@ -630,10 +627,15 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.resultGenerator.onMissMatch(*e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
}
Expand All @@ -645,7 +647,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

err = e.resultGenerator.emit(*e.outerRow, e.innerIter, chk)
matched, err := e.resultGenerator.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 343ed54

Please sign in to comment.