Skip to content

Commit

Permalink
executor: correctly handle panic for hashjoin build phase (#14056)
Browse files Browse the repository at this point in the history
  • Loading branch information
fzhedu authored Feb 5, 2020
1 parent ac8caad commit 72b1f0e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
19 changes: 18 additions & 1 deletion executor/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"sync/atomic"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
Expand Down Expand Up @@ -247,6 +248,7 @@ func (e *HashJoinExec) fetchBuildSideRows(ctx context.Context, chkCh chan<- *chu
e.buildFinished <- errors.Trace(err)
return
}
failpoint.Inject("errorFetchBuildSideRowsMockOOMPanic", nil)
if chk.NumRows() == 0 {
return
}
Expand Down Expand Up @@ -654,7 +656,16 @@ func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) {
// buildSideResultCh transfers build side chunk from build side fetch to build hash table.
buildSideResultCh := make(chan *chunk.Chunk, 1)
doneCh := make(chan struct{})
go util.WithRecovery(func() { e.fetchBuildSideRows(ctx, buildSideResultCh, doneCh) }, nil)
fetchBuildSideRowsOk := make(chan error, 1)
go util.WithRecovery(
func() { e.fetchBuildSideRows(ctx, buildSideResultCh, doneCh) },
func(r interface{}) {
if r != nil {
fetchBuildSideRowsOk <- errors.Errorf("%v", r)
}
close(fetchBuildSideRowsOk)
},
)

// TODO: Parallel build hash table. Currently not support because `rowHashMap` is not thread-safe.
err := e.buildHashTableForList(buildSideResultCh)
Expand All @@ -667,6 +678,12 @@ func (e *HashJoinExec) fetchAndBuildHashTable(ctx context.Context) {
// 2. if probeSideResult.NumRows() == 0, fetchProbeSideChunks will not wait for the build side.
for range buildSideResultCh {
}
// Check whether err is nil to avoid sending redundant error into buildFinished.
if err == nil {
if err = <-fetchBuildSideRowsOk; err != nil {
e.buildFinished <- err
}
}
}

// buildHashTableForList builds hash table from `list`.
Expand Down
15 changes: 15 additions & 0 deletions executor/seqtest/seq_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,21 @@ func (s *seqTestSuite) TestMaxDeltaSchemaCount(c *C) {
tk.MustQuery("select @@global.tidb_max_delta_schema_count").Check(testkit.Rows("2048"))
}

func (s *seqTestSuite) TestOOMPanicInHashJoinWhenFetchBuildRows(c *C) {
fpName := "github.com/pingcap/tidb/executor/errorFetchBuildSideRowsMockOOMPanic"
c.Assert(failpoint.Enable(fpName, `panic("ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")`), IsNil)
defer func() {
c.Assert(failpoint.Disable(fpName), IsNil)
}()
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(c1 int, c2 int)")
tk.MustExec("insert into t values(1,1),(2,2)")
err := tk.QueryToErr("select * from t as t2 join t as t1 where t1.c1=t2.c1")
c.Assert(err.Error(), Equals, "failpoint panic: ERROR 1105 (HY000): Out Of Memory Quota![conn_id=1]")
}

type testOOMSuite struct {
store kv.Storage
do *domain.Domain
Expand Down

0 comments on commit 72b1f0e

Please sign in to comment.