Skip to content

Commit

Permalink
executor: fix index join bug caused by innerWorker panic (pingcap#31563
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot authored Jun 24, 2022
1 parent 5701597 commit 2f4f52b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 29 deletions.
39 changes: 39 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8815,6 +8815,45 @@ func (s *testSerialSuite) TestIssue28650(c *C) {
}
}

func (s *testSerialSuite) TestIndexJoin31494(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t1, t2;")
tk.MustExec("create table t1(a int(11) default null, b int(11) default null, key(b));")
insertStr := "insert into t1 values(1, 1)"
for i := 1; i < 32768; i++ {
insertStr += fmt.Sprintf(", (%d, %d)", i, i)
}
tk.MustExec(insertStr)
tk.MustExec("create table t2(a int(11) default null, b int(11) default null, c int(11) default null)")
insertStr = "insert into t2 values(1, 1, 1)"
for i := 1; i < 32768; i++ {
insertStr += fmt.Sprintf(", (%d, %d, %d)", i, i, i)
}
tk.MustExec(insertStr)
sm := &mockSessionManager1{
PS: make([]*util.ProcessInfo, 0),
}
tk.Se.SetSessionManager(sm)
s.domain.ExpensiveQueryHandle().SetSessionManager(sm)
defer config.RestoreFunc()()
config.UpdateGlobal(func(conf *config.Config) {
conf.OOMAction = config.OOMActionCancel
})
c.Assert(tk.Se.Auth(&auth.UserIdentity{Username: "root", Hostname: "%"}, nil, nil), IsTrue)
tk.MustExec("set @@tidb_mem_quota_query=2097152;")
// This bug will be reproduced in 10 times.
for i := 0; i < 10; i++ {
err := tk.QueryToErr("select /*+ inl_join(t1) */ * from t1 right join t2 on t1.b=t2.b;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
err = tk.QueryToErr("select /*+ inl_hash_join(t1) */ * from t1 right join t2 on t1.b=t2.b;")
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, "Out Of Memory Quota!.*")
}
}

func (s *testSuite) TestDeleteWithMulTbl(c *C) {
tk := testkit.NewTestKit(c, s.store)

Expand Down
13 changes: 0 additions & 13 deletions executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,6 @@ func (e *IndexNestedLoopHashJoin) finishJoinWorkers(r interface{}) {
e.taskCh <- task
}
if e.cancelFunc != nil {
e.IndexLookUpJoin.ctxCancelReason.Store(err)
e.cancelFunc()
}
}
Expand Down Expand Up @@ -248,9 +247,6 @@ func (e *IndexNestedLoopHashJoin) Next(ctx context.Context, req *chunk.Chunk) er
return result.err
}
case <-ctx.Done():
if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
req.SwapColumns(result.chk)
Expand Down Expand Up @@ -280,9 +276,6 @@ func (e *IndexNestedLoopHashJoin) runInOrder(ctx context.Context, req *chunk.Chu
return result.err
}
case <-ctx.Done():
if err := e.IndexLookUpJoin.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
req.SwapColumns(result.chk)
Expand Down Expand Up @@ -696,9 +689,6 @@ func (iw *indexHashJoinInnerWorker) doJoinUnordered(ctx context.Context, task *i
select {
case resultCh <- joinResult:
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
joinResult, ok = iw.getNewJoinResult(ctx)
Expand Down Expand Up @@ -849,9 +839,6 @@ func (iw *indexHashJoinInnerWorker) doJoinInOrder(ctx context.Context, task *ind
select {
case resultCh <- joinResult:
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
}
joinResult, ok = iw.getNewJoinResult(ctx)
Expand Down
18 changes: 2 additions & 16 deletions executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,8 @@ type IndexLookUpJoin struct {

memTracker *memory.Tracker // track memory usage.

stats *indexLookUpJoinRuntimeStats
ctxCancelReason atomic.Value
finished *atomic.Value
stats *indexLookUpJoinRuntimeStats
finished *atomic.Value
}

type outerCtx struct {
Expand Down Expand Up @@ -313,9 +312,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
select {
case task = <-e.resultCh:
case <-ctx.Done():
if err := e.ctxCancelReason.Load(); err != nil {
return nil, err.(error)
}
return nil, ctx.Err()
}
if task == nil {
Expand All @@ -328,9 +324,6 @@ func (e *IndexLookUpJoin) getFinishedTask(ctx context.Context) (*lookUpJoinTask,
return nil, err
}
case <-ctx.Done():
if err := e.ctxCancelReason.Load(); err != nil {
return nil, err.(error)
}
return nil, ctx.Err()
}

Expand Down Expand Up @@ -363,8 +356,6 @@ func (ow *outerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
err := errors.Errorf("%v", r)
task.doneCh <- err
ow.pushToChan(ctx, task, ow.resultCh)
ow.lookup.ctxCancelReason.Store(err)
ow.lookup.cancelFunc()
}
close(ow.resultCh)
close(ow.innerCh)
Expand Down Expand Up @@ -484,8 +475,6 @@ func (iw *innerWorker) run(ctx context.Context, wg *sync.WaitGroup) {
err := errors.Errorf("%v", r)
// "task != nil" is guaranteed when panic happened.
task.doneCh <- err
iw.lookup.ctxCancelReason.Store(err)
iw.lookup.cancelFunc()
}
wg.Done()
}()
Expand Down Expand Up @@ -696,9 +685,6 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
for {
select {
case <-ctx.Done():
if err := iw.lookup.ctxCancelReason.Load(); err != nil {
return err.(error)
}
return ctx.Err()
default:
}
Expand Down

0 comments on commit 2f4f52b

Please sign in to comment.