Skip to content

Commit

Permalink
plan: fix a problem caused by union's schema (#7680)
Browse files Browse the repository at this point in the history
Before this commit. Union use the schema of its `Children[0]`.
The `Columns` information is correct.
But the `unique key information` is not, obviously.
  • Loading branch information
winoros authored Sep 20, 2018
1 parent 115c345 commit 5671382
Show file tree
Hide file tree
Showing 11 changed files with 91 additions and 96 deletions.
30 changes: 15 additions & 15 deletions cmd/explaintest/r/explain_complex.result
Original file line number Diff line number Diff line change
Expand Up @@ -179,23 +179,23 @@ CREATE TABLE `tbl_008` (`a` int, `b` int);
CREATE TABLE `tbl_009` (`a` int, `b` int);
explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;
id count task operator info
HashAgg_25 72000.00 root group by:x.b, funcs:sum(x.a)
└─Union_26 90000.00 root
├─TableReader_29 10000.00 root data:TableScan_28
│ └─TableScan_28 10000.00 cop table:tbl_001, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_32 10000.00 root data:TableScan_31
│ └─TableScan_31 10000.00 cop table:tbl_002, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_35 10000.00 root data:TableScan_34
│ └─TableScan_34 10000.00 cop table:tbl_003, range:[-inf,+inf], keep order:false, stats:pseudo
HashAgg_34 72000.00 root group by:x.b, funcs:sum(x.a)
└─Union_35 90000.00 root
├─TableReader_38 10000.00 root data:TableScan_37
│ └─TableScan_37 10000.00 cop table:tbl_004, range:[-inf,+inf], keep order:false, stats:pseudo
│ └─TableScan_37 10000.00 cop table:tbl_001, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_41 10000.00 root data:TableScan_40
│ └─TableScan_40 10000.00 cop table:tbl_005, range:[-inf,+inf], keep order:false, stats:pseudo
│ └─TableScan_40 10000.00 cop table:tbl_002, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_44 10000.00 root data:TableScan_43
│ └─TableScan_43 10000.00 cop table:tbl_006, range:[-inf,+inf], keep order:false, stats:pseudo
│ └─TableScan_43 10000.00 cop table:tbl_003, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_47 10000.00 root data:TableScan_46
│ └─TableScan_46 10000.00 cop table:tbl_007, range:[-inf,+inf], keep order:false, stats:pseudo
│ └─TableScan_46 10000.00 cop table:tbl_004, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_50 10000.00 root data:TableScan_49
│ └─TableScan_49 10000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_53 10000.00 root data:TableScan_52
└─TableScan_52 10000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false, stats:pseudo
│ └─TableScan_49 10000.00 cop table:tbl_005, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_53 10000.00 root data:TableScan_52
│ └─TableScan_52 10000.00 cop table:tbl_006, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_56 10000.00 root data:TableScan_55
│ └─TableScan_55 10000.00 cop table:tbl_007, range:[-inf,+inf], keep order:false, stats:pseudo
├─TableReader_59 10000.00 root data:TableScan_58
│ └─TableScan_58 10000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false, stats:pseudo
└─TableReader_62 10000.00 root data:TableScan_61
└─TableScan_61 10000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false, stats:pseudo
30 changes: 15 additions & 15 deletions cmd/explaintest/r/explain_complex_stats.result
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,23 @@ CREATE TABLE tbl_009 (a int, b int);
load stats 's/explain_complex_stats_tbl_009.json';
explain select sum(a) from (select * from tbl_001 union all select * from tbl_002 union all select * from tbl_003 union all select * from tbl_004 union all select * from tbl_005 union all select * from tbl_006 union all select * from tbl_007 union all select * from tbl_008 union all select * from tbl_009) x group by b;
id count task operator info
HashAgg_25 18000.00 root group by:x.b, funcs:sum(x.a)
└─Union_26 18000.00 root
├─TableReader_29 2000.00 root data:TableScan_28
│ └─TableScan_28 2000.00 cop table:tbl_001, range:[-inf,+inf], keep order:false
├─TableReader_32 2000.00 root data:TableScan_31
│ └─TableScan_31 2000.00 cop table:tbl_002, range:[-inf,+inf], keep order:false
├─TableReader_35 2000.00 root data:TableScan_34
│ └─TableScan_34 2000.00 cop table:tbl_003, range:[-inf,+inf], keep order:false
HashAgg_34 18000.00 root group by:x.b, funcs:sum(x.a)
└─Union_35 18000.00 root
├─TableReader_38 2000.00 root data:TableScan_37
│ └─TableScan_37 2000.00 cop table:tbl_004, range:[-inf,+inf], keep order:false
│ └─TableScan_37 2000.00 cop table:tbl_001, range:[-inf,+inf], keep order:false
├─TableReader_41 2000.00 root data:TableScan_40
│ └─TableScan_40 2000.00 cop table:tbl_005, range:[-inf,+inf], keep order:false
│ └─TableScan_40 2000.00 cop table:tbl_002, range:[-inf,+inf], keep order:false
├─TableReader_44 2000.00 root data:TableScan_43
│ └─TableScan_43 2000.00 cop table:tbl_006, range:[-inf,+inf], keep order:false
│ └─TableScan_43 2000.00 cop table:tbl_003, range:[-inf,+inf], keep order:false
├─TableReader_47 2000.00 root data:TableScan_46
│ └─TableScan_46 2000.00 cop table:tbl_007, range:[-inf,+inf], keep order:false
│ └─TableScan_46 2000.00 cop table:tbl_004, range:[-inf,+inf], keep order:false
├─TableReader_50 2000.00 root data:TableScan_49
│ └─TableScan_49 2000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false
└─TableReader_53 2000.00 root data:TableScan_52
└─TableScan_52 2000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false
│ └─TableScan_49 2000.00 cop table:tbl_005, range:[-inf,+inf], keep order:false
├─TableReader_53 2000.00 root data:TableScan_52
│ └─TableScan_52 2000.00 cop table:tbl_006, range:[-inf,+inf], keep order:false
├─TableReader_56 2000.00 root data:TableScan_55
│ └─TableScan_55 2000.00 cop table:tbl_007, range:[-inf,+inf], keep order:false
├─TableReader_59 2000.00 root data:TableScan_58
│ └─TableScan_58 2000.00 cop table:tbl_008, range:[-inf,+inf], keep order:false
└─TableReader_62 2000.00 root data:TableScan_61
└─TableScan_61 2000.00 cop table:tbl_009, range:[-inf,+inf], keep order:false
54 changes: 27 additions & 27 deletions cmd/explaintest/r/explain_easy.result
Original file line number Diff line number Diff line change
Expand Up @@ -151,35 +151,35 @@ TableReader_5 10000.00 root data:TableScan_4
└─TableScan_4 10000.00 cop table:t1, range:[-inf,+inf], keep order:false, stats:pseudo
explain select c1 from t2 union select c1 from t2 union all select c1 from t2;
id count task operator info
Union_14 26000.00 root
├─TableReader_17 10000.00 root data:TableScan_16
│ └─TableScan_16 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
└─HashAgg_21 16000.00 root group by:t2.c1, funcs:firstrow(join_agg_0)
└─Union_22 16000.00 root
├─StreamAgg_35 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_36 8000.00 root index:StreamAgg_26
│ └─StreamAgg_26 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_34 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_52 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
└─IndexReader_53 8000.00 root index:StreamAgg_43
└─StreamAgg_43 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexScan_51 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
Union_17 26000.00 root
├─TableReader_20 10000.00 root data:TableScan_19
│ └─TableScan_19 10000.00 cop table:t2, range:[-inf,+inf], keep order:false, stats:pseudo
└─HashAgg_24 16000.00 root group by:t2.c1, funcs:firstrow(join_agg_0)
└─Union_25 16000.00 root
├─StreamAgg_38 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_39 8000.00 root index:StreamAgg_29
│ └─StreamAgg_29 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_37 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_55 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
└─IndexReader_56 8000.00 root index:StreamAgg_46
└─StreamAgg_46 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexScan_54 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
explain select c1 from t2 union all select c1 from t2 union select c1 from t2;
id count task operator info
HashAgg_15 24000.00 root group by:t2.c1, funcs:firstrow(join_agg_0)
└─Union_16 24000.00 root
├─StreamAgg_29 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_30 8000.00 root index:StreamAgg_20
│ └─StreamAgg_20 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_28 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
├─StreamAgg_46 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_47 8000.00 root index:StreamAgg_37
│ └─StreamAgg_37 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_45 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_63 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
└─IndexReader_64 8000.00 root index:StreamAgg_54
└─StreamAgg_54 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexScan_62 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
HashAgg_18 24000.00 root group by:t2.c1, funcs:firstrow(join_agg_0)
└─Union_19 24000.00 root
├─StreamAgg_32 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_33 8000.00 root index:StreamAgg_23
│ └─StreamAgg_23 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_31 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
├─StreamAgg_49 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
│ └─IndexReader_50 8000.00 root index:StreamAgg_40
│ └─StreamAgg_40 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
│ └─IndexScan_48 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
└─StreamAgg_66 8000.00 root group by:col_2, funcs:firstrow(col_0), firstrow(col_1)
└─IndexReader_67 8000.00 root index:StreamAgg_57
└─StreamAgg_57 8000.00 cop group by:test.t2.c1, funcs:firstrow(test.t2.c1), firstrow(test.t2.c1)
└─IndexScan_65 10000.00 cop table:t2, index:c1, range:[NULL,+inf], keep order:true, stats:pseudo
set @@session.tidb_opt_insubquery_unfold = 0;
explain select sum(t1.c1 in (select c1 from t2)) from t1;
id count task operator info
Expand Down
2 changes: 1 addition & 1 deletion executor/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *testSuite) TestAggregation(c *C) {
tk.MustExec("insert into t value(1), (0)")
tk.MustQuery("select a from t group by 1")
// This result is compatible with MySQL, the readable result is shown in the next case.
result = tk.MustQuery("select max(a) from t group by a")
result = tk.MustQuery("select max(a) from t group by a order by a")
result.Check(testkit.Rows(string([]byte{0x0}), string([]byte{0x1})))
result = tk.MustQuery("select cast(a as signed) as idx, cast(max(a) as signed), cast(min(a) as signed) from t group by 1 order by idx")
result.Check(testkit.Rows("0 0 0", "1 1 1"))
Expand Down
1 change: 1 addition & 0 deletions plan/exhaust_physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,6 +881,7 @@ func (p *LogicalUnionAll) exhaustPhysicalPlans(prop *property.PhysicalProperty)
chReqProps = append(chReqProps, &property.PhysicalProperty{ExpectedCnt: prop.ExpectedCnt})
}
ua := PhysicalUnionAll{}.init(p.ctx, p.stats.ScaleByExpectCnt(prop.ExpectedCnt), chReqProps...)
ua.SetSchema(p.Schema())
return []PhysicalPlan{ua}
}

Expand Down
58 changes: 25 additions & 33 deletions plan/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,49 +646,41 @@ func unionJoinFieldType(a, b *types.FieldType) *types.FieldType {
}

func (b *planBuilder) buildProjection4Union(u *LogicalUnionAll) {
unionSchema := u.children[0].Schema().Clone()
unionCols := make([]*expression.Column, 0, u.children[0].Schema().Len())

// Infer union result types by its children's schema.
for i, col := range unionSchema.Columns {
var resultTp *types.FieldType
for j, child := range u.children {
childTp := child.Schema().Columns[i].RetType
if j == 0 {
resultTp = childTp
} else {
resultTp = unionJoinFieldType(resultTp, childTp)
}
}
unionSchema.Columns[i] = col.Clone().(*expression.Column)
unionSchema.Columns[i].RetType = resultTp
unionSchema.Columns[i].DBName = model.NewCIStr("")
for i, col := range u.children[0].Schema().Columns {
resultTp := col.RetType
for j := 1; j < len(u.children); j++ {
childTp := u.children[j].Schema().Columns[i].RetType
resultTp = unionJoinFieldType(resultTp, childTp)
}
unionCols = append(unionCols, &expression.Column{
ColName: col.ColName,
TblName: col.TblName,
RetType: resultTp,
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
})
}
// If the types of some child don't match the types of union, we add a projection with cast function.
u.schema = expression.NewSchema(unionCols...)
// Process each child and add a projection above original child.
// So the schema of `UnionAll` can be the same with its children's.
for childID, child := range u.children {
exprs := make([]expression.Expression, len(child.Schema().Columns))
needProjection := false
for i, srcCol := range child.Schema().Columns {
dstType := unionSchema.Columns[i].RetType
dstType := unionCols[i].RetType
srcType := srcCol.RetType
if !srcType.Equal(dstType) {
exprs[i] = expression.BuildCastFunction4Union(b.ctx, srcCol, dstType)
needProjection = true
} else {
exprs[i] = srcCol
}
}
if _, isProj := child.(*LogicalProjection); needProjection || !isProj {
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs}.init(b.ctx)
if childID == 0 {
for _, col := range unionSchema.Columns {
col.UniqueID = b.ctx.GetSessionVars().AllocPlanColumnID()
}
}
proj.SetChildren(child)
u.children[childID] = proj
}
u.children[childID].(*LogicalProjection).SetSchema(unionSchema.Clone())
b.optFlag |= flagEliminateProjection
proj := LogicalProjection{Exprs: exprs}.init(b.ctx)
proj.SetSchema(expression.NewSchema(unionCols...))
proj.SetChildren(child)
u.children[childID] = proj
}
}

Expand All @@ -698,15 +690,15 @@ func (b *planBuilder) buildUnion(union *ast.UnionStmt) (LogicalPlan, error) {
return nil, errors.Trace(err)
}

unionDistinctPlan := b.buildSubUnion(distinctSelectPlans)
unionDistinctPlan := b.buildUnionAll(distinctSelectPlans)
if unionDistinctPlan != nil {
unionDistinctPlan = b.buildDistinct(unionDistinctPlan, unionDistinctPlan.Schema().Len())
if len(allSelectPlans) > 0 {
allSelectPlans = append(allSelectPlans, unionDistinctPlan)
}
}

unionAllPlan := b.buildSubUnion(allSelectPlans)
unionAllPlan := b.buildUnionAll(allSelectPlans)
unionPlan := unionDistinctPlan
if unionAllPlan != nil {
unionPlan = unionAllPlan
Expand Down Expand Up @@ -758,7 +750,7 @@ func (b *planBuilder) divideUnionSelectPlans(selects []*ast.SelectStmt) (distinc
return children[:firstUnionAllIdx], children[firstUnionAllIdx:], nil
}

func (b *planBuilder) buildSubUnion(subPlan []LogicalPlan) LogicalPlan {
func (b *planBuilder) buildUnionAll(subPlan []LogicalPlan) LogicalPlan {
if len(subPlan) == 0 {
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions plan/logical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (s *testPlanSuite) TestPredicatePushDown(c *C) {
},
{
sql: "select a, d from (select * from t union all select * from t union all select * from t) z where a < 10",
best: "UnionAll{DataScan(t)->Projection->DataScan(t)->Projection->DataScan(t)->Projection}->Projection",
best: "UnionAll{DataScan(t)->Projection->Projection->DataScan(t)->Projection->Projection->DataScan(t)->Projection->Projection}->Projection",
},
{
sql: "select (select count(*) from t where t.a = k.a) from t k",
Expand Down Expand Up @@ -1011,7 +1011,7 @@ func (s *testPlanSuite) TestEagerAggregation(c *C) {
},
{
sql: "select sum(c1) from (select c c1, d c2 from t a union all select a c1, b c2 from t b union all select b c1, e c2 from t c) x group by c2",
best: "UnionAll{DataScan(a)->Aggr(sum(a.c),firstrow(a.d))->DataScan(b)->Aggr(sum(b.a),firstrow(b.b))->DataScan(c)->Aggr(sum(c.b),firstrow(c.e))}->Aggr(sum(join_agg_0))->Projection",
best: "UnionAll{DataScan(a)->Projection->Aggr(sum(a.c1),firstrow(a.c2))->DataScan(b)->Projection->Aggr(sum(b.c1),firstrow(b.c2))->DataScan(c)->Projection->Aggr(sum(c.c1),firstrow(c.c2))}->Aggr(sum(join_agg_0))->Projection",
},
{
sql: "select max(a.b), max(b.b) from t a join t b on a.c = b.c group by a.a",
Expand All @@ -1023,7 +1023,7 @@ func (s *testPlanSuite) TestEagerAggregation(c *C) {
},
{
sql: "select max(c.b) from (select * from t a union all select * from t b) c group by c.a",
best: "UnionAll{DataScan(a)->Projection->DataScan(b)->Projection}->Projection->Projection",
best: "UnionAll{DataScan(a)->Projection->Projection->Projection->DataScan(b)->Projection->Projection->Projection}->Aggr(max(join_agg_0))->Projection",
},
{
sql: "select max(a.c) from t a join t b on a.a=b.a and a.b=b.b group by a.b",
Expand Down
2 changes: 1 addition & 1 deletion plan/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (ds *DataSource) TableInfo() *model.TableInfo {

// LogicalUnionAll represents LogicalUnionAll plan.
type LogicalUnionAll struct {
baseLogicalPlan
logicalSchemaProducer
}

// LogicalSort stands for the order by plan.
Expand Down
2 changes: 1 addition & 1 deletion plan/physical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ type PhysicalLimit struct {

// PhysicalUnionAll is the physical operator of UnionAll.
type PhysicalUnionAll struct {
basePhysicalPlan
physicalSchemaProducer
}

// AggregationType stands for the mode of aggregation plan.
Expand Down
1 change: 1 addition & 0 deletions plan/rule_column_pruning.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ func (p *LogicalUnionAll) PruneColumns(parentUsedCols []*expression.Column) {
for _, child := range p.Children() {
child.PruneColumns(parentUsedCols)
}
p.schema.Columns = p.children[0].Schema().Columns
}

// PruneColumns implements LogicalPlan interface.
Expand Down
1 change: 1 addition & 0 deletions plan/rule_partition_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (s *partitionProcessor) prune(ds *DataSource) (LogicalPlan, error) {
}
unionAll := LogicalUnionAll{}.init(ds.context())
unionAll.SetChildren(children...)
unionAll.SetSchema(ds.schema)
return unionAll, nil
}

Expand Down

0 comments on commit 5671382

Please sign in to comment.