Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: refactor joinResultGenerator to handle the unmatched outer records #7288

Merged
merged 15 commits into from
Aug 8, 2018
22 changes: 13 additions & 9 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,9 @@ type lookUpJoinTask struct {
lookupMap *mvmap.MVMap
matchedInners []chunk.Row

doneCh chan error
cursor int
doneCh chan error
cursor int
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -205,16 +206,19 @@ func (e *IndexLookUpJoin) Next(ctx context.Context, chk *chunk.Chunk) error {
}

outerRow := task.outerResult.GetRow(task.cursor)
if e.innerIter.Len() == 0 {
err = e.resultGenerator.emit(outerRow, nil, chk)
} else if e.innerIter.Current() != e.innerIter.End() {
err = e.resultGenerator.emit(outerRow, e.innerIter, chk)
}
if err != nil {
return errors.Trace(err)
if e.innerIter.Current() != e.innerIter.End() {
matched, err := e.resultGenerator.tryToMatch(outerRow, e.innerIter, chk)
if err != nil {
return errors.Trace(err)
}
task.hasMatch = task.hasMatch || matched
}
if e.innerIter.Current() == e.innerIter.End() {
if !task.hasMatch {
e.resultGenerator.onMissMatch(outerRow, chk)
}
task.cursor++
task.hasMatch = false
}
if chk.NumRows() == e.maxChunkSize {
return nil
Expand Down
44 changes: 24 additions & 20 deletions executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,20 +385,14 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
return false, joinResult
}
if hasNull {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
e.hashTableValBufs[workerID] = e.hashTable.Get(joinKey, e.hashTableValBufs[workerID][:0])
innerPtrs := e.hashTableValBufs[workerID]
if len(innerPtrs) == 0 {
err = e.resultGenerators[workerID].emit(outerRow, nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
}
return err == nil, joinResult
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
return true, joinResult
}
innerRows := make([]chunk.Row, 0, len(innerPtrs))
for _, b := range innerPtrs {
Expand All @@ -407,12 +401,15 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
innerRows = append(innerRows, matchedInner)
}
iter := chunk.NewIterator4Slice(innerRows)
hasMatch := false
for iter.Begin(); iter.Current() != iter.End(); {
err = e.resultGenerators[workerID].emit(outerRow, iter, joinResult.chk)
matched, err := e.resultGenerators[workerID].tryToMatch(outerRow, iter, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
hasMatch = hasMatch || matched

if joinResult.chk.NumRows() == e.maxChunkSize {
ok := true
e.joinResultCh <- joinResult
Expand All @@ -422,6 +419,9 @@ func (e *HashJoinExec) joinMatchedOuterRow2Chunk(workerID uint, outerRow chunk.R
}
}
}
if !hasMatch {
e.resultGenerators[workerID].onMissMatch(outerRow, joinResult.chk)
}
return true, joinResult
}

Expand All @@ -448,11 +448,7 @@ func (e *HashJoinExec) join2Chunk(workerID uint, outerChk *chunk.Chunk, joinResu
}
for i := range selected {
if !selected[i] { // process unmatched outer rows
err = e.resultGenerators[workerID].emit(outerChk.GetRow(i), nil, joinResult.chk)
if err != nil {
joinResult.err = errors.Trace(err)
return false, joinResult
}
e.resultGenerators[workerID].onMissMatch(outerChk.GetRow(i), joinResult.chk)
} else { // process matched outer rows
ok, joinResult = e.joinMatchedOuterRow2Chunk(workerID, outerChk.GetRow(i), joinResult)
if !ok {
Expand Down Expand Up @@ -578,6 +574,7 @@ type NestedLoopApplyExec struct {
innerSelected []bool
innerIter chunk.Iterator
outerRow *chunk.Row
hasMatch bool

memTracker *memory.Tracker // track memory usage.
}
Expand Down Expand Up @@ -635,9 +632,9 @@ func (e *NestedLoopApplyExec) fetchSelectedOuterRow(ctx context.Context, chk *ch
if selected {
return &outerRow, nil
} else if e.outer {
err := e.resultGenerator.emit(outerRow, nil, chk)
if err != nil || chk.NumRows() == e.maxChunkSize {
return nil, errors.Trace(err)
e.resultGenerator.onMissMatch(outerRow, chk)
if chk.NumRows() == e.maxChunkSize {
return nil, nil
}
}
}
Expand Down Expand Up @@ -678,10 +675,15 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
chk.Reset()
for {
if e.innerIter == nil || e.innerIter.Current() == e.innerIter.End() {
if e.outerRow != nil && !e.hasMatch {
e.resultGenerator.onMissMatch(*e.outerRow, chk)
}
e.outerRow, err = e.fetchSelectedOuterRow(ctx, chk)
if e.outerRow == nil || err != nil {
return errors.Trace(err)
}
e.hasMatch = false

for _, col := range e.outerSchema {
*col.Data = e.outerRow.GetDatum(col.Index, col.RetType)
}
Expand All @@ -693,7 +695,9 @@ func (e *NestedLoopApplyExec) Next(ctx context.Context, chk *chunk.Chunk) (err e
e.innerIter.Begin()
}

err = e.resultGenerator.emit(*e.outerRow, e.innerIter, chk)
matched, err := e.resultGenerator.tryToMatch(*e.outerRow, e.innerIter, chk)
e.hasMatch = e.hasMatch || matched

if err != nil || chk.NumRows() == e.maxChunkSize {
return errors.Trace(err)
}
Expand Down
Loading