Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: do not reorder handles when building requests for IndexMergeJoin #20138

Merged
merged 5 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 19 additions & 15 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2943,21 +2943,21 @@ type mockPhysicalIndexReader struct {
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
return builder.buildExecutorForIndexJoinInternal(ctx, builder.Plan, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
}

func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.Context, plan plannercore.Plan, lookUpContents []*indexJoinLookUpContent,
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
IndexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
switch v := plan.(type) {
case *plannercore.PhysicalTableReader:
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildTableReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
case *plannercore.PhysicalIndexReader:
return builder.buildIndexReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
case *plannercore.PhysicalUnionScan:
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
return builder.buildUnionScanForIndexJoin(ctx, v, lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
// The inner child of IndexJoin might be Projection when a combination of the following conditions is true:
// 1. The inner child fetch data using indexLookupReader
// 2. PK is not handle
Expand All @@ -2969,7 +2969,7 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
// Need to support physical selection because after PR 16389, TiDB will push down all the expr supported by TiKV or TiFlash
// in predicate push down stage, so if there is an expr which only supported by TiFlash, a physical selection will be added after index read
case *plannercore.PhysicalSelection:
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc)
childExec, err := builder.buildExecutorForIndexJoinInternal(ctx, v.Children()[0], lookUpContents, IndexRanges, keyOff2IdxOff, cwc, canReorderHandles)
if err != nil {
return nil, err
}
Expand All @@ -2986,9 +2986,10 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoinInternal(ctx context.
}

func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan,
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
values []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder}
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc)
reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff, cwc, canReorderHandles)
if err != nil {
return nil, err
}
Expand All @@ -3001,7 +3002,8 @@ func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context
}

func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader,
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int, cwc *plannercore.ColWithCmpFuncManager) (Executor, error) {
lookUpContents []*indexJoinLookUpContent, indexRanges []*ranger.Range, keyOff2IdxOff []int,
cwc *plannercore.ColWithCmpFuncManager, canReorderHandles bool) (Executor, error) {
e, err := buildNoRangeTableReader(builder.executorBuilder, v)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3037,10 +3039,10 @@ func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Conte
}

if tbInfo.GetPartitionInfo() == nil {
return builder.buildTableReaderFromHandles(ctx, e, handles)
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}
if !builder.ctx.GetSessionVars().UseDynamicPartitionPrune() {
return builder.buildTableReaderFromHandles(ctx, e, handles)
return builder.buildTableReaderFromHandles(ctx, e, handles, canReorderHandles)
}

e.kvRangeBuilder = kvRangeBuilderFromHandles(handles)
Expand Down Expand Up @@ -3091,10 +3093,12 @@ func (builder *dataReaderBuilder) buildTableReaderBase(ctx context.Context, e *T
return e, nil
}

func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle) (*TableReaderExecutor, error) {
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
})
func (builder *dataReaderBuilder) buildTableReaderFromHandles(ctx context.Context, e *TableReaderExecutor, handles []kv.Handle, canReorderHandles bool) (*TableReaderExecutor, error) {
if canReorderHandles {
sort.Slice(handles, func(i, j int) bool {
return handles[i].Compare(handles[j]) < 0
})
}
var b distsql.RequestBuilder
b.SetTableHandles(getPhysicalTableID(e.table), handles)
return builder.buildTableReaderBase(ctx, e, b)
Expand Down
2 changes: 1 addition & 1 deletion executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func (e *IndexLookUpExecutor) buildTableReader(ctx context.Context, handles []kv
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, true)
if err != nil {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ func (iw *innerWorker) fetchInnerResults(ctx context.Context, task *lookUpJoinTa
atomic.AddInt64(&iw.stats.fetch, int64(time.Since(start)))
}()
}
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters)
innerExec, err := iw.readerBuilder.buildExecutorForIndexJoin(ctx, lookUpContent, iw.indexRanges, iw.keyOff2IdxOff, iw.nextColCompareFilters, true)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion executor/index_lookup_merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,7 +496,7 @@ func (imw *innerMergeWorker) handleTask(ctx context.Context, task *lookUpMergeJo
dLookUpKeys[i], dLookUpKeys[lenKeys-i-1] = dLookUpKeys[lenKeys-i-1], dLookUpKeys[i]
}
}
imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters)
imw.innerExec, err = imw.readerBuilder.buildExecutorForIndexJoin(ctx, dLookUpKeys, imw.indexRanges, imw.keyOff2IdxOff, imw.nextColCompareFilters, false)
if err != nil {
return err
}
Expand Down
13 changes: 13 additions & 0 deletions executor/index_lookup_merge_join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ func (s *testSuite9) TestIssue19408(c *C) {
tk.MustExec("commit")
}

func (s *testSuite9) TestIssue20137(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t1, t2")
tk.MustExec("create table t1 (id bigint(20) unsigned, primary key(id))")
tk.MustExec("create table t2 (id bigint(20) unsigned)")
tk.MustExec("insert into t1 values (8738875760185212610)")
tk.MustExec("insert into t1 values (9814441339970117597)")
tk.MustExec("insert into t2 values (8738875760185212610)")
tk.MustExec("insert into t2 values (9814441339970117597)")
tk.MustQuery("select /*+ INL_MERGE_JOIN(t1, t2) */ * from t2 left join t1 on t1.id = t2.id order by t1.id").Check(
testkit.Rows("8738875760185212610 8738875760185212610", "9814441339970117597 9814441339970117597"))
}

func (s *testSuiteWithData) TestIndexJoinOnSinglePartitionTable(c *C) {
// For issue 19145
tk := testkit.NewTestKitWithInit(c, s.store)
Expand Down
2 changes: 1 addition & 1 deletion executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ func (e *IndexMergeReaderExecutor) buildFinalTableReader(ctx context.Context, ha
plans: e.tblPlans,
}
tableReaderExec.buildVirtualColumnInfo()
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles)
tableReader, err := e.dataReaderBuilder.buildTableReaderFromHandles(ctx, tableReaderExec, handles, false)
if err != nil {
logutil.Logger(ctx).Error("build table reader from handles failed", zap.Error(err))
return nil, err
Expand Down