From b05ba9752e7fc11b5d2048b5a52d7a72c00c66e3 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Thu, 11 Oct 2018 07:29:18 +0800 Subject: [PATCH 1/4] plan,executor: support IndexJoin over UnionScan --- executor/builder.go | 30 +++++++++++++-- planner/core/exhaust_physical_plans.go | 52 ++++++++++++++++---------- planner/core/find_best_task.go | 11 ++++-- 3 files changed, 66 insertions(+), 27 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 0079d0264d884..70a40b7363958 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -670,18 +670,22 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor { } func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor { - src := b.build(v.Children()[0]) + reader := b.build(v.Children()[0]) if b.err != nil { b.err = errors.Trace(b.err) return nil } - us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)} + return b.buildUnionScanFromReader(reader, v) +} + +func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) Executor { + us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)} // Get the handle column index of the below plannercore. // We can guarantee that there must be only one col in the map. for _, cols := range v.Children()[0].Schema().TblID2Handle { us.belowHandleIndex = cols[0].Index } - switch x := src.(type) { + switch x := reader.(type) { case *TableReaderExecutor: us.desc = x.desc us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) @@ -718,7 +722,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E b.err = us.buildAndSortAddedRows() default: // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting. - return src + return reader } if b.err != nil { b.err = errors.Trace(b.err) @@ -1814,10 +1818,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) case *plannercore.PhysicalIndexLookUpReader: return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + case *plannercore.PhysicalUnionScan: + return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) } return nil, errors.New("Wrong plan type for dataReaderBuilder") } +func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + builder.Plan = v.Children()[0] + reader, err := builder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + if err != nil { + return nil, errors.Trace(err) + } + e := builder.buildUnionScanFromReader(reader, v) + if e == nil { + return nil, builder.err + } + us := e.(*UnionScanExec) + us.snapshotChunkBuffer = us.newFirstChunk() + return us, nil +} + func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index dc102b0c34d08..0cda1cc45ea0d 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -367,20 +367,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou innerJoinKeys = p.LeftJoinKeys outerJoinKeys = p.RightJoinKeys } - x, ok := innerChild.(*DataSource) - if !ok { + ds, isDataSource := innerChild.(*DataSource) + us, isUnionScan := innerChild.(*LogicalUnionScan) + if !isDataSource && !isUnionScan { return nil } + if isUnionScan { + ds = us.Children()[0].(*DataSource) + } var tblPath *accessPath - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { tblPath = path break } } - if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil { + if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil { keyOff2IdxOff := make([]int, len(innerJoinKeys)) - pkCol := x.getPKIsHandleCol() pkMatched := false for i, key := range innerJoinKeys { if !key.Equal(nil, pkCol) { @@ -391,7 +394,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou keyOff2IdxOff[i] = 0 } if pkMatched { - innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys) + innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us) // Since the primary key means one value corresponding to exact one row, this will always be a no worse one // comparing to other index. return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff) @@ -404,12 +407,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou remainedOfBest []expression.Expression keyOff2IdxOff []int ) - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { continue } indexInfo := path.index - ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys) + ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys) // We choose the index by the number of used columns of the range, the much the better. // Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid. // But obviously when the range is nil, we don't need index join. @@ -422,20 +425,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou } } if bestIndexInfo != nil { - innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys) + innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us) return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff) } return nil } // constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan { - var ranges []*ranger.Range - if pk != nil { - ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) - } else { - ranges = ranger.FullIntRange(false) - } +func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { + ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) ts := PhysicalTableScan{ Table: ds.tableInfo, Columns: ds.Columns, @@ -464,11 +462,19 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col } ts.addPushedDownSelection(copTask, ds.stats) t := finishCopTask(ds.ctx, copTask) - return t.plan() + scan := t.plan() + if us != nil { + // Use `ts.stats` instead of `us.stats` because it should be more accurate. No need to specify + // childrenReqProps now since we have got ts already. + physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, ts.stats, nil) + physicalUnionScan.SetChildren(scan) + scan = physicalUnionScan + } + return scan } // constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan { +func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { is := PhysicalIndexScan{ Table: ds.tableInfo, TableAsName: ds.TableAsName, @@ -507,7 +513,15 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64} is.addPushedDownSelection(cop, ds, math.MaxFloat64, path) t := finishCopTask(ds.ctx, cop) - return t.plan() + scan := t.plan() + if us != nil { + // Use `is.stats` instead of `us.stats` because it should be more accurate. No need to specify + // childrenReqProps now since we have got is already. + physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, is.stats, nil) + physicalUnionScan.SetChildren(scan) + scan = physicalUnionScan + } + return scan } // buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index f84a6578bf98d..cbe2a3c26c23f 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -77,6 +77,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task, // findBestTask implements LogicalPlan interface. func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) { + // If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. + if prop == nil { + return nil, nil + } // Look up the task with this prop in the task map. // It's used to reduce double counting. bestTask = p.getTask(prop) @@ -199,10 +204,8 @@ func (ds *DataSource) tryToGetDualTask() (task, error) { // findBestTask implements the PhysicalPlan interface. // It will enumerate all the available indices and choose a plan with least cost. func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) { - // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself. - // So here we do nothing. - // TODO: Add a special prop to handle IndexJoin's inner plan. - // Then we can remove forceToTableScan and forceToIndexScan. + // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. if prop == nil { return nil, nil } From 6259fedbba96e68d2d6c26bb9f56fd0dda03f048 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Thu, 11 Oct 2018 11:25:18 +0800 Subject: [PATCH 2/4] add unit test --- executor/index_lookup_join_test.go | 54 ++++++++++++++++++++++++++++++ planner/core/physical_plan_test.go | 47 ++++++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index ecca46773e1d9..203c6aa79b103 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -37,3 +37,57 @@ func (s *testSuite) TestIndexLookupJoinHang(c *C) { } rs.Close() } + +func (s *testSuite) TestIndexJoinUnionScan(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t1(id int primary key, a int)") + tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))") + tk.MustExec("insert into t2 values (1,1,1),(4,2,4)") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(2,2)") + tk.MustExec("insert into t2 values(2,2,2), (3,3,3)") + // TableScan below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.id", + "├─UnionScan_12 10000.00 root ", + "│ └─TableReader_14 10000.00 root data:TableScan_13", + "│ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + "└─UnionScan_10 10.00 root ", + " └─TableReader_9 10.00 root data:TableScan_8", + " └─TableScan_8 10.00 cop table:t2, range: decided by [test.t1.a], keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "2 2 2 2 2", + )) + // IndexLookUp below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "IndexJoin_12 12500.00 root inner join, inner:UnionScan_11, outer key:test.t1.a, inner key:test.t2.a", + "├─UnionScan_13 10000.00 root ", + "│ └─TableReader_15 10000.00 root data:TableScan_14", + "│ └─TableScan_14 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + "└─UnionScan_11 10.00 root ", + " └─IndexLookUp_10 10.00 root ", + " ├─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo", + " └─TableScan_9 10.00 cop table:t2, keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2 2 2 2", + "2 2 4 2 4", + )) + // IndexScan below UnionScan + tk.MustQuery("explain select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "Projection_7 12500.00 root test.t1.a, test.t2.a", + "└─IndexJoin_11 12500.00 root inner join, inner:UnionScan_10, outer key:test.t1.a, inner key:test.t2.a", + " ├─UnionScan_12 10000.00 root ", + " │ └─TableReader_14 10000.00 root data:TableScan_13", + " │ └─TableScan_13 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo", + " └─UnionScan_10 10.00 root ", + " └─IndexReader_9 10.00 root index:IndexScan_8", + " └─IndexScan_8 10.00 cop table:t2, index:a, range: decided by [test.t1.a], keep order:false, stats:pseudo", + )) + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2", + "2 2", + )) + tk.MustExec("rollback") +} diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 332171a49f6f8..541aa343b1e49 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -1269,3 +1269,50 @@ func (s *testPlanSuite) TestRequestTypeSupportedOff(c *C) { c.Assert(err, IsNil) c.Assert(core.ToString(p), Equals, expect, Commentf("for %s", sql)) } + +func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + defer func() { + dom.Close() + store.Close() + }() + se, err := session.CreateSession4Test(store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + tests := []struct { + sql string + best string + }{ + // Test Index Join + UnionScan + TableScan. + { + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(t1.a,t2.a)", + }, + // Test Index Join + UnionScan + DoubleRead. + { + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(t1.a,t2.c)", + }, + // Test Index Join + UnionScan + IndexScan. + { + sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(t1.a,t2.c)->Projection", + }, + } + for i, tt := range tests { + comment := Commentf("case:%v sql:%s", i, tt.sql) + stmt, err := s.ParseOneStmt(tt.sql, "", "") + c.Assert(err, IsNil, comment) + err = se.NewTxn() + c.Assert(err, IsNil) + // Make txn not read only. + se.Txn().Set(kv.Key("AAA"), []byte("BBB")) + se.StmtCommit() + p, err := core.Optimize(se, stmt, s.is) + c.Assert(err, IsNil) + c.Assert(core.ToString(p), Equals, tt.best, comment) + } +} From 108e03d5974c8a4956797a8cb77d6de712de5ba5 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Thu, 11 Oct 2018 17:20:50 +0800 Subject: [PATCH 3/4] refactor --- planner/core/exhaust_physical_plans.go | 31 ++++++++++++-------------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 0cda1cc45ea0d..23fcd817cea76 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -462,15 +462,19 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col } ts.addPushedDownSelection(copTask, ds.stats) t := finishCopTask(ds.ctx, copTask) - scan := t.plan() - if us != nil { - // Use `ts.stats` instead of `us.stats` because it should be more accurate. No need to specify - // childrenReqProps now since we have got ts already. - physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, ts.stats, nil) - physicalUnionScan.SetChildren(scan) - scan = physicalUnionScan + reader := t.plan() + return p.constructInnerUnionScan(us, reader) +} + +func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan { + if us == nil { + return reader } - return scan + // Use `reader.stats` instead of `us.stats` because it should be more accurate. No need to specify + // childrenReqProps now since we have got reader already. + physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, reader.statsInfo(), nil) + physicalUnionScan.SetChildren(reader) + return physicalUnionScan } // constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin. @@ -513,15 +517,8 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn path := &accessPath{indexFilters: indexConds, tableFilters: tblConds, countAfterIndex: math.MaxFloat64} is.addPushedDownSelection(cop, ds, math.MaxFloat64, path) t := finishCopTask(ds.ctx, cop) - scan := t.plan() - if us != nil { - // Use `is.stats` instead of `us.stats` because it should be more accurate. No need to specify - // childrenReqProps now since we have got is already. - physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, is.stats, nil) - physicalUnionScan.SetChildren(scan) - scan = physicalUnionScan - } - return scan + reader := t.plan() + return p.constructInnerUnionScan(us, reader) } // buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok. From e71c27b46d02c8a2639bbc8916e00bfc7bde1170 Mon Sep 17 00:00:00 2001 From: Kenan Yao Date: Thu, 11 Oct 2018 17:27:50 +0800 Subject: [PATCH 4/4] resolve conflicts --- planner/core/physical_plan_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index 83a9099765830..0386059230822 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -1312,7 +1312,7 @@ func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { // Make txn not read only. se.Txn().Set(kv.Key("AAA"), []byte("BBB")) se.StmtCommit() - p, err := core.Optimize(se, stmt, s.is) + p, err := planner.Optimize(se, stmt, s.is) c.Assert(err, IsNil) c.Assert(core.ToString(p), Equals, tt.best, comment) }