Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#49218
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
XuHuaiyu authored and ti-chi-bot committed Dec 13, 2023
1 parent 721bb1a commit 3e8d7ed
Show file tree
Hide file tree
Showing 4 changed files with 986 additions and 41 deletions.
3 changes: 3 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4225,6 +4225,9 @@ func buildKvRangesForIndexJoin(ctx sessionctx.Context, tableID, indexID int64, l
}
}
if len(kvRanges) != 0 && memTracker != nil {
failpoint.Inject("testIssue49033", func() {
panic("testIssue49033")
})
memTracker.Consume(int64(2 * cap(kvRanges[0].StartKey) * len(kvRanges)))
}
if len(tmpDatumRanges) != 0 && memTracker != nil {
Expand Down
104 changes: 63 additions & 41 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ type IndexNestedLoopHashJoin struct {

stats *indexLookUpJoinRuntimeStats
prepared bool
// panicErr records the error generated by panic recover. This is introduced to
// return the actual error message instead of `context cancelled` to the client.
panicErr error
ctxWithCancel context.Context
}

type indexHashJoinOuterWorker struct {
Expand Down Expand Up @@ -165,7 +169,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.stats.concurrency = concurrency
}
workerCtx, cancelFunc := context.WithCancel(ctx)
e.cancelFunc = cancelFunc
e.ctxWithCancel, e.cancelFunc = workerCtx, cancelFunc
innerCh := make(chan *indexHashJoinTask, concurrency)
if e.keepOuterOrder {
e.taskCh = make(chan *indexHashJoinTask, concurrency)
Expand All @@ -178,7 +182,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.joinChkResourceCh = make([]chan *chunk.Chunk, concurrency)
e.workerWg.Add(1)
ow := e.newOuterWorker(innerCh)
go util.WithRecovery(func() { ow.run(workerCtx) }, e.finishJoinWorkers)
go util.WithRecovery(func() { ow.run(e.ctxWithCancel) }, e.finishJoinWorkers)

for i := 0; i < concurrency; i++ {
if !e.keepOuterOrder {
Expand All @@ -195,7 +199,7 @@ func (e *IndexNestedLoopHashJoin) startWorkers(ctx context.Context) {
e.workerWg.Add(concurrency)
for i := 0; i < concurrency; i++ {
workerID := i
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(workerCtx, cancelFunc) }, e.finishJoinWorkers)
go util.WithRecovery(func() { e.newInnerWorker(innerCh, workerID).run(e.ctxWithCancel, cancelFunc) }, e.finishJoinWorkers)
}
go e.wait4JoinWorkers()
}
Expand All @@ -210,6 +214,7 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
task := &indexHashJoinTask{err: err}
e.taskCh <- task
}
e.panicErr = err
if e.cancelFunc != nil {
e.cancelFunc()
}
Expand All @@ -235,59 +240,39 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
}
req.Reset()
if e.keepOuterOrder {
return e.runInOrder(ctx, req)
return e.runInOrder(e.ctxWithCancel, req)
}
// unordered run
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-e.resultCh:
if !ok {
return nil
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
return e.runUnordered(e.ctxWithCancel, req)
}

func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chunk) error {
var (
result *indexHashJoinResult
ok bool
)
for {
if e.isDryUpTasks(ctx) {
return nil
}
if e.curTask.err != nil {
return e.curTask.err
}
select {
case result, ok = <-e.curTask.resultCh:
if !ok {
e.curTask = nil
continue
}
if result.err != nil {
return result.err
}
case <-ctx.Done():
return ctx.Err()
result, err := e.getResultFromChannel(ctx, e.curTask.resultCh)
if err != nil {
return err
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
if result == nil {
e.curTask = nil
continue
}
return e.handleResult(req, result)
}
}

func (e *IndexNestedLoopHashJoin) runUnordered(ctx context.Context, req *chunk.Chunk) error {
result, err := e.getResultFromChannel(ctx, e.resultCh)
if err != nil {
return err
}
return e.handleResult(req, result)
}

// isDryUpTasks indicates whether all the tasks have been processed.
func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
if e.curTask != nil {
Expand All @@ -305,6 +290,38 @@ func (e *IndexNestedLoopHashJoin) isDryUpTasks(ctx context.Context) bool {
return false
}

func (e *IndexNestedLoopHashJoin) getResultFromChannel(ctx context.Context, resultCh <-chan *indexHashJoinResult) (*indexHashJoinResult, error) {
var (
result *indexHashJoinResult
ok bool
)
select {
case result, ok = <-resultCh:
if !ok {
return nil, nil
}
if result.err != nil {
return nil, result.err
}
case <-ctx.Done():
err := e.panicErr
if err == nil {
err = ctx.Err()
}
return nil, err
}
return result, nil
}

func (*IndexNestedLoopHashJoin) handleResult(req *chunk.Chunk, result *indexHashJoinResult) error {
if result == nil {
return nil
}
req.SwapColumns(result.chk)
result.src <- result.chk
return nil
}

// Close implements the IndexNestedLoopHashJoin Executor interface.
func (e *IndexNestedLoopHashJoin) Close() error {
if e.cancelFunc != nil {
Expand All @@ -326,7 +343,12 @@ func (e *IndexNestedLoopHashJoin) Close() error {
e.joinChkResourceCh = nil
e.finished.Store(false)
e.prepared = false
<<<<<<< HEAD:executor/index_lookup_hash_join.go
return e.baseExecutor.Close()
=======
e.ctxWithCancel = nil
return e.BaseExecutor.Close()
>>>>>>> 9f612e3762a (pkg/executor: fix the hang issue in indexHashJoin (#49218)):pkg/executor/index_lookup_hash_join.go
}

func (ow *indexHashJoinOuterWorker) run(ctx context.Context) {
Expand Down
24 changes: 24 additions & 0 deletions pkg/executor/test/jointest/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "jointest_test",
timeout = "moderate",
srcs = [
"join_test.go",
"main_test.go",
],
flaky = True,
race = "on",
shard_count = 8,
deps = [
"//pkg/config",
"//pkg/meta/autoid",
"//pkg/session",
"//pkg/testkit",
"//pkg/util/dbterror/exeerrors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
],
)
Loading

0 comments on commit 3e8d7ed

Please sign in to comment.