Skip to content

Commit

Permalink
cherry pick pingcap#20138 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
qw4990 authored and ti-srebot committed Sep 22, 2020
1 parent 77fc45e commit f4a9ce6
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 11 deletions.
65 changes: 58 additions & 7 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2638,21 +2638,25 @@ type mockPhysicalIndexReader struct {
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
switch v := plan.(type) {
case *plannercore.PhysicalTableReader:
<<<<<<< HEAD
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents)
=======
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
>>>>>>> 640cb42... executor: do not reorder handles when building requests for `IndexMergeJoin` (#20138)
case *plannercore.PhysicalIndexReader:
return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
// The inner child of IndexJoin might be Projection when a combination of the following conditions is true:
// 1. The inner child fetch data using indexLookupReader
// 2. PK is not handle
Expand All @@ -2664,7 +2668,7 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
// Need to support physical selection because after PR 16389, TiDB will push down all the expr supported by TiKV or TiFlash
// in predicate push down stage, so if there is an expr which only supported by TiFlash, a physical selection will be added after index read
case *plannercore.PhysicalSelection:
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
if err != nil {
return nil, err
}
Expand All @@ -2681,9 +2685,10 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc)
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles)
if err != nil {
return nil, err
}
Expand All @@ -2692,7 +2697,13 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context
return us, err
}

<<<<<<< HEAD
func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, lookUpContents []*indexJoinLookUpContent) (Executor, error) {
=======
func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
>>>>>>> 640cb42... executor: do not reorder handles when building requests for `IndexMergeJoin` (#20138)
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
return nil, err
Expand All @@ -2712,7 +2723,27 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
handles = append(handles, handle)
}
}
<<<<<<< HEAD
return builder.buildTableReaderFromHandles(ctx, e, handles)
=======

if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}

e.kvRangeBuilder = kvRangeBuilderFromHandles(handles)
nextPartition := nextPartitionForTableReader{e}
return buildPartitionTable(builder.executorBuilder, tbInfo, &v.PartitionInfo, e, nextPartition)
}

type kvRangeBuilderFromFunc func(pid int64) ([]kv.KeyRange, error)

func (h kvRangeBuilderFromFunc) buildKeyRange(pid int64) ([]kv.KeyRange, error) {
return h(pid)
>>>>>>> 640cb42... executor: do not reorder handles when building requests for `IndexMergeJoin` (#20138)
}

func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []int64) (Executor, error) {
Expand Down Expand Up @@ -2745,6 +2776,26 @@ func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Contex
return e, nil
}

<<<<<<< HEAD
=======
func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle, canReorderHandles bool) (*TableReaderExecutor, error) {
if canReorderHandles {
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
})
}
var b distsql.RequestBuilder
b.SetTableHandles(getPhysicalTableID(e.table), handles)
return builder.buildTableReaderBase(ctx, e, b)
}

func (builder *dataReaderBuilder) buildTableReaderFromKvRanges(ctx context.Context, e *TableReaderExecutor, ranges []kv.KeyRange) (Executor, error) {
var b distsql.RequestBuilder
b.SetKeyRanges(ranges)
return builder.buildTableReaderBase(ctx, e, b)
}

>>>>>>> 640cb42... executor: do not reorder handles when building requests for `IndexMergeJoin` (#20138)
func (builder *dataReaderBuilder) buildIndexReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalIndexReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
e, err := buildNoRangeIndexReader(builder.executorBuilder, v)
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []in
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
if err != nil {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start)))
}()
}
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i]
}
}
imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters)
imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false)
if err != nil {
return err
}
Expand Down
44 changes: 44 additions & 0 deletions executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,51 @@ func (s *testSuite9) TestIssue18631(c *C) {
"1 1 1 1 1 1 1 1"))
}

<<<<<<< HEAD
func (s *testSuiteAgg) TestIndexJoinOnSinglePartitionTable(c *C) {
=======
func (s *testSuite9) TestIssue19408(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (c_int int, primary key(c_int))")
tk.MustExec("create table t2 (c_int int, unique key (c_int)) partition by hash (c_int) partitions 4")
tk.MustExec("insert into t1 values (1), (2), (3), (4), (5)")
tk.MustExec("insert into t2 select * from t1")
tk.MustExec("begin")
tk.MustExec("delete from t1 where c_int = 1")
tk.MustQuery("select /*+ INL_MERGE_JOIN(t1,t2) */ * from t1, t2 where t1.c_int = t2.c_int").Sort().Check(testkit.Rows(
"2 2",
"3 3",
"4 4",
"5 5"))
tk.MustQuery("select /*+ INL_JOIN(t1,t2) */ * from t1, t2 where t1.c_int = t2.c_int").Sort().Check(testkit.Rows(
"2 2",
"3 3",
"4 4",
"5 5"))
tk.MustQuery("select /*+ INL_HASH_JOIN(t1,t2) */ * from t1, t2 where t1.c_int = t2.c_int").Sort().Check(testkit.Rows(
"2 2",
"3 3",
"4 4",
"5 5"))
tk.MustExec("commit")
}

func (s *testSuite9) TestIssue20137(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (id bigint(20) unsigned, primary key(id))")
tk.MustExec("create table t2 (id bigint(20) unsigned)")
tk.MustExec("insert into t1 values (8738875760185212610)")
tk.MustExec("insert into t1 values (9814441339970117597)")
tk.MustExec("insert into t2 values (8738875760185212610)")
tk.MustExec("insert into t2 values (9814441339970117597)")
tk.MustQuery("select /*+ INL_MERGE_JOIN(t1, t2) */ * from t2 left join t1 on t1.id = t2.id order by t1.id").Check(
testkit.Rows("8738875760185212610 8738875760185212610", "9814441339970117597 9814441339970117597"))
}

func (s *testSuiteWithData) TestIndexJoinOnSinglePartitionTable(c *C) {
>>>>>>> 640cb42... executor: do not reorder handles when building requests for `IndexMergeJoin` (#20138)
// For issue 19145
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1, t2")
Expand Down
2 changes: 1 addition & 1 deletion executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, ha
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, false)
if err != nil {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
return nil, err
Expand Down

0 comments on commit f4a9ce6

Please sign in to comment.