Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49218
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
XuHuaiyu authored and ti-chi-bot committed Dec 13, 2023
1 parent e091ee5 commit 14a0a3a
Show file tree
Hide file tree
Showing 4 changed files with 966 additions and 41 deletions.
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4790,6 +4790,9 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
}
}
if len(kvRanges) != 0 && memTracker != nil {
failpoint.Inject("testIssue49033", func() {
panic("testIssue49033")
})
memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges)))
}
if len(tmpDatumRanges) != 0 && memTracker != nil {
Expand Down
104 changes: 63 additions & 41 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ type IndexNestedLoopHashJoin struct {

stats *indexLookUpJoinRuntimeStats
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
ctxWithCancel context.Context
}

type indexHashJoinOuterWorker struct {
Expand Down Expand Up @@ -149,7 +153,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.stats.concurrency = concurrency
}
workerCtx, cancelFunc := context.WithCancel(ctx)
e.cancelFunc = cancelFunc
e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc
innerCh := make(chan *indexHashJoinTask, concurrency)
if e.keepOuterOrder {
e.taskCh = make(chan *indexHashJoinTask, concurrency)
Expand All @@ -162,7 +166,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
e.workerWg.Add(1)
ow := e.newOuterWorker(innerCh)
go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers)
go util.WithRecovery(func() { ow.run(e.ctxWithCancel) }, e.finishJoinWorkers)

for i := 0; i < concurrency; i++ {
if !e.keepOuterOrder {
Expand All @@ -179,7 +183,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.workerWg.Add(concurrency)
for i := 0; i < concurrency; i++ {
workerID := i
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers)
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(e.ctxWithCancel, cancelFunc) }, e.finishJoinWorkers)
}
go e.wait4JoinWorkers()
}
Expand All @@ -194,6 +198,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand All @@ -219,59 +224,39 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
return e.runInOrder(e.ctxWithCancel, req)
}
// unordered run
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-e.resultCh:
if !ok {
return nil
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
return e.runUnordered(e.ctxWithCancel, req)
}

func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
var (
result *indexHashJoinResult
ok bool
)
for {
if e.isDryUpTasks(ctx) {
return nil
}
if e.curTask.err != nil {
return e.curTask.err
}
select {
case result, ok = <-e.curTask.resultCh:
if !ok {
e.curTask = nil
continue
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
result, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
if err != nil {
return err
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
if result == nil {
e.curTask = nil
continue
}
return e.handleResult(req, result)
}
}

func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
result, err := e.getResultFromChannel(ctx, e.resultCh)
if err != nil {
return err
}
return e.handleResult(req, result)
}

// isDryUpTasks indicates whether all the tasks have been processed.
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
if e.curTask != nil {
Expand All @@ -289,6 +274,38 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
return false
}

func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-resultCh:
if !ok {
return nil, nil
}
if result.err != nil {
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
if err == nil {
err = ctx.Err()
}
return nil, err
}
return result, nil
}

func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
if result == nil {
return nil
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
}

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.stats != nil {
Expand All @@ -311,7 +328,12 @@ func (e *IndexNestedLoopHashJoin) Close() error {
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
<<<<<<< HEAD:executor/index_lookup_hash_join.go
return e.baseExecutor.Close()
=======
e.ctxWithCancel = nil
return e.BaseExecutor.Close()
>>>>>>> 9f612e3762a (pkg/executor: fix the hang issue in indexHashJoin (#49218)):pkg/executor/index_lookup_hash_join.go
}

func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
Expand Down
4 changes: 4 additions & 0 deletions executor/jointest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD:executor/jointest/BUILD.bazel
shard_count = 50,
=======
shard_count = 8,
>>>>>>> 9f612e3762a (pkg/executor: fix the hang issue in indexHashJoin (#49218)):pkg/executor/test/jointest/BUILD.bazel
deps = [
"//config",
"//meta/autoid",
Expand Down
Loading

0 comments on commit 14a0a3a

Please sign in to comment.