Skip to content

Commit

Permalink
planner: add a hint for CTE (#34574)
Browse files Browse the repository at this point in the history
close #17472
  • Loading branch information
dayicklp authored Jul 19, 2022
1 parent 0026324 commit a57dd37
Show file tree
Hide file tree
Showing 11 changed files with 416 additions and 6 deletions.
2 changes: 1 addition & 1 deletion parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3534,7 +3534,7 @@ func (n *TableOptimizerHint) Restore(ctx *format.RestoreCtx) error {
}
// Hints without args except query block.
switch n.HintName.L {
case "hash_agg", "stream_agg", "agg_to_cop", "read_consistent_replica", "no_index_merge", "qb_name", "ignore_plan_cache", "limit_to_cop", "straight_join":
case "hash_agg", "stream_agg", "agg_to_cop", "read_consistent_replica", "no_index_merge", "qb_name", "ignore_plan_cache", "limit_to_cop", "straight_join", "merge":
ctx.WritePlain(")")
return nil
}
Expand Down
1 change: 1 addition & 0 deletions parser/ast/misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ func TestTableOptimizerHintRestore(t *testing.T) {
{"AGG_TO_COP()", "AGG_TO_COP()"},
{"AGG_TO_COP(@sel_1)", "AGG_TO_COP(@`sel_1`)"},
{"LIMIT_TO_COP()", "LIMIT_TO_COP()"},
{"MERGE()", "MERGE()"},
{"STRAIGHT_JOIN()", "STRAIGHT_JOIN()"},
{"NO_INDEX_MERGE()", "NO_INDEX_MERGE()"},
{"NO_INDEX_MERGE(@sel1)", "NO_INDEX_MERGE(@`sel1`)"},
Expand Down
6 changes: 3 additions & 3 deletions parser/hintparser.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion parser/hintparser.y
Original file line number Diff line number Diff line change
Expand Up @@ -531,13 +531,13 @@ UnsupportedTableLevelOptimizerHintName:
| "NO_BNL"
/* HASH_JOIN is supported by TiDB */
| "NO_HASH_JOIN"
| "MERGE"
| "NO_MERGE"

SupportedTableLevelOptimizerHintName:
"MERGE_JOIN"
| "BROADCAST_JOIN"
| "INL_JOIN"
| "MERGE"
| "INL_HASH_JOIN"
| "SWAP_JOIN_INPUTS"
| "NO_SWAP_JOIN_INPUTS"
Expand Down
34 changes: 34 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4229,6 +4229,16 @@ func TestOptimizerHints(t *testing.T) {
require.Equal(t, "limit_to_cop", hints[0].HintName.L)
require.Equal(t, "limit_to_cop", hints[1].HintName.L)

// Test CTE MERGE
stmt, _, err = p.Parse("with cte(x) as (select * from t1) select /*+ MERGE(), merge() */ * from cte;", "", "")
require.NoError(t, err)
selectStmt = stmt[0].(*ast.SelectStmt)

hints = selectStmt.TableHints
require.Len(t, hints, 2)
require.Equal(t, "merge", hints[0].HintName.L)
require.Equal(t, "merge", hints[1].HintName.L)

// Test STRAIGHT_JOIN
stmt, _, err = p.Parse("select /*+ STRAIGHT_JOIN(), straight_join() */ c1, c2 from t1, t2 where t1.c1 = t2.c1", "", "")
require.NoError(t, err)
Expand Down Expand Up @@ -6475,6 +6485,30 @@ func TestCTE(t *testing.T) {
RunTest(t, table, false)
}

// For CTE Merge
func TestCTEMerge(t *testing.T) {
table := []testCase{
{"WITH `cte` AS (SELECT 1,2) SELECT `col1`,`col2` FROM `cte`", true, "WITH `cte` AS (SELECT 1,2) SELECT `col1`,`col2` FROM `cte`"},
{"WITH `cte` (col1, col2) AS (SELECT 1,2 UNION ALL SELECT 3,4) SELECT col1, col2 FROM cte;", true, "WITH `cte` (`col1`, `col2`) AS (SELECT 1,2 UNION ALL SELECT 3,4) SELECT `col1`,`col2` FROM `cte`"},
{"WITH `cte` AS (SELECT 1,2), cte2 as (select 3) SELECT `col1`,`col2` FROM `cte`", true, "WITH `cte` AS (SELECT 1,2), `cte2` AS (SELECT 3) SELECT `col1`,`col2` FROM `cte`"},
{"with cte(a) as (select 1) update t, cte set t.a=1 where t.a=cte.a;", true, "WITH `cte` (`a`) AS (SELECT 1) UPDATE (`t`) JOIN `cte` SET `t`.`a`=1 WHERE `t`.`a`=`cte`.`a`"},
{"with cte(a) as (select 1) delete t from t, cte where t.a=cte.a;", true, "WITH `cte` (`a`) AS (SELECT 1) DELETE `t` FROM (`t`) JOIN `cte` WHERE `t`.`a`=`cte`.`a`"},
{"WITH cte1 AS (SELECT 1) SELECT * FROM (WITH cte2 AS (SELECT 2) SELECT * FROM cte2 JOIN cte1) AS dt;", true, "WITH `cte1` AS (SELECT 1) SELECT * FROM (WITH `cte2` AS (SELECT 2) SELECT * FROM `cte2` JOIN `cte1`) AS `dt`"},
{"WITH cte AS (SELECT 1) SELECT /*+ MAX_EXECUTION_TIME(1000) */ * FROM cte;", true, "WITH `cte` AS (SELECT 1) SELECT /*+ MAX_EXECUTION_TIME(1000)*/ * FROM `cte`"},
{"with cte as (table t) table cte;", true, "WITH `cte` AS (TABLE `t`) TABLE `cte`"},
{"with cte as (select 1) select 1 union with cte as (select 1) select * from cte;", false, ""},
{"with cte as (select 1) (select 1);", true, "WITH `cte` AS (SELECT 1) (SELECT 1)"},
{"with cte as (select 1) (select 1 union select 1)", true, "WITH `cte` AS (SELECT 1) (SELECT 1 UNION SELECT 1)"},
{"select * from (with cte as (select 1) select 1 union select 2) qn", true, "SELECT * FROM (WITH `cte` AS (SELECT 1) SELECT 1 UNION SELECT 2) AS `qn`"},
{"select * from t where 1 > (with cte as (select 2) select * from cte)", true, "SELECT * FROM `t` WHERE 1>(WITH `cte` AS (SELECT 2) SELECT * FROM `cte`)"},
{"( with cte(n) as ( select 1 ) select n+1 from cte union select n+2 from cte) union select 1", true, "(WITH `cte` (`n`) AS (SELECT 1) SELECT `n`+1 FROM `cte` UNION SELECT `n`+2 FROM `cte`) UNION SELECT 1"},
{"( with cte(n) as ( select 1 ) select n+1 from cte) union select 1", true, "(WITH `cte` (`n`) AS (SELECT 1) SELECT `n`+1 FROM `cte`) UNION SELECT 1"},
{"( with cte(n) as ( select 1 ) (select n+1 from cte)) union select 1", true, "(WITH `cte` (`n`) AS (SELECT 1) (SELECT `n`+1 FROM `cte`)) UNION SELECT 1"},
}

RunTest(t, table, false)
}

func TestAsOfClause(t *testing.T) {
table := []testCase{
{"SELECT * FROM `t` AS /* comment */ a;", true, "SELECT * FROM `t` AS `a`"},
Expand Down
50 changes: 49 additions & 1 deletion planner/core/logical_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ const (
HintIgnorePlanCache = "ignore_plan_cache"
// HintLimitToCop is a hint enforce pushing limit or topn to coprocessor.
HintLimitToCop = "limit_to_cop"
//HintMerge is a hint which can switch turning inline for the CTE.
HintMerge = "merge"
// HintSemiJoinRewrite is a hint to force we rewrite the semi join operator as much as possible.
HintSemiJoinRewrite = "semi_join_rewrite"
)
Expand Down Expand Up @@ -1763,7 +1765,6 @@ func (b *PlanBuilder) buildUnion(ctx context.Context, selects []LogicalPlan, aft
if err != nil {
return nil, err
}

unionDistinctPlan, err := b.buildUnionAll(ctx, distinctSelectPlans)
if err != nil {
return nil, err
Expand Down Expand Up @@ -3523,6 +3524,7 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev
aggHints aggHintInfo
timeRangeHint ast.HintTimeRange
limitHints limitHintInfo
MergeHints MergeHintInfo
leadingJoinOrder []hintTableInfo
leadingHintCnt int
)
Expand Down Expand Up @@ -3627,6 +3629,8 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev
timeRangeHint = hint.HintData.(ast.HintTimeRange)
case HintLimitToCop:
limitHints.preferLimitToCop = true
case HintMerge:
MergeHints.preferMerge = true
case HintLeading:
if leadingHintCnt == 0 {
leadingJoinOrder = append(leadingJoinOrder, tableNames2HintTableInfo(b.ctx, hint.HintName.L, hint.Tables, b.hintProcessor, currentLevel)...)
Expand Down Expand Up @@ -3663,6 +3667,7 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint, currentLev
indexMergeHintList: indexMergeHintList,
timeRangeHint: timeRangeHint,
limitHints: limitHints,
MergeHints: MergeHints,
leadingJoinOrder: leadingJoinOrder,
})
}
Expand Down Expand Up @@ -4004,6 +4009,12 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L
}
}

if b.buildingCTE {
if hints := b.TableHints(); hints != nil {
b.outerCTEs[len(b.outerCTEs)-1].isInline = hints.MergeHints.preferMerge
}
}

sel.Fields.Fields = originalFields
if oldLen != p.Schema().Len() {
proj := LogicalProjection{Exprs: expression.Column2Exprs(p.Schema().Columns[:oldLen])}.Init(b.ctx, b.getSelectOffset())
Expand Down Expand Up @@ -4166,6 +4177,20 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
lp := LogicalCTE{cteAsName: tn.Name, cte: cte.cteClass, seedStat: cte.seedStat, isOuterMostCTE: !b.buildingCTE}.Init(b.ctx, b.getSelectOffset())
prevSchema := cte.seedLP.Schema().Clone()
lp.SetSchema(getResultCTESchema(cte.seedLP.Schema(), b.ctx.GetSessionVars()))

if cte.isInline {
lp.MergeHints.preferMerge = cte.isInline
saveCte := b.outerCTEs[i:]
b.outerCTEs = b.outerCTEs[:i]
o := b.buildingCTE
b.buildingCTE = false
defer func() {
b.outerCTEs = append(b.outerCTEs, saveCte...)
b.buildingCTE = o
}()
return b.buildDataSourceFromCTEMerge(ctx, cte.def)
}

for i, col := range lp.schema.Columns {
lp.cte.ColumnMap[string(col.HashCode(nil))] = prevSchema.Columns[i]
}
Expand All @@ -4188,6 +4213,29 @@ func (b *PlanBuilder) tryBuildCTE(ctx context.Context, tn *ast.TableName, asName
return nil, nil
}

func (b *PlanBuilder) buildDataSourceFromCTEMerge(ctx context.Context, cte *ast.CommonTableExpression) (LogicalPlan, error) {
p, err := b.buildResultSetNode(ctx, cte.Query.Query)
if err != nil {
return nil, err
}
outPutNames := p.OutputNames()
for _, name := range outPutNames {
name.TblName = cte.Name
name.DBName = model.NewCIStr(b.ctx.GetSessionVars().CurrentDB)
}

if len(cte.ColNameList) > 0 {
if len(cte.ColNameList) != len(p.OutputNames()) {
return nil, errors.New("CTE columns length is not consistent")
}
for i, n := range cte.ColNameList {
outPutNames[i].ColName = n
}
}
p.SetOutputNames(outPutNames)
return p, nil
}

func (b *PlanBuilder) buildDataSource(ctx context.Context, tn *ast.TableName, asName *model.CIStr) (LogicalPlan, error) {
dbName := tn.Schema
sessionVars := b.ctx.GetSessionVars()
Expand Down
1 change: 1 addition & 0 deletions planner/core/logical_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1881,6 +1881,7 @@ type LogicalCTE struct {
cteAsName model.CIStr
seedStat *property.StatsInfo
isOuterMostCTE bool
MergeHints MergeHintInfo
}

// LogicalCTETable is for CTE table
Expand Down
70 changes: 70 additions & 0 deletions planner/core/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,6 +1110,76 @@ func TestLimitToCopHint(t *testing.T) {
}
}

func TestCTEMergeHint(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tc")
tk.MustExec("drop table if exists te")
tk.MustExec("drop table if exists t1")
tk.MustExec("drop table if exists t2")
tk.MustExec("drop table if exists t3")
tk.MustExec("drop table if exists t4")
tk.MustExec("create table tc(a int)")
tk.MustExec("create table te(c int)")
tk.MustExec("create table t1(a int)")
tk.MustExec("create table t2(b int)")
tk.MustExec("create table t3(c int)")
tk.MustExec("create table t4(d int)")
tk.MustExec("insert into tc values (1), (5), (10), (15), (20), (30), (50);")
tk.MustExec("insert into te values (1), (5), (10), (25), (40), (60), (100);")
tk.MustExec("insert into t1 values (1), (5), (10), (25), (40), (60), (100);")
tk.MustExec("insert into t2 values (1), (5), (10), (25), (40), (60), (100);")
tk.MustExec("insert into t3 values (1), (5), (10), (25), (40), (60), (100);")
tk.MustExec("insert into t4 values (1), (5), (10), (25), (40), (60), (100);")
tk.MustExec("analyze table tc;")
tk.MustExec("analyze table te;")
tk.MustExec("analyze table t1;")
tk.MustExec("analyze table t2;")
tk.MustExec("analyze table t3;")
tk.MustExec("analyze table t4;")
var (
input []string
output []struct {
SQL string
Plan []string
Warning []string
}
)

planSuiteData := core.GetPlanSuiteData()
planSuiteData.GetTestCases(t, &input, &output)

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))

comment := fmt.Sprintf("case:%v sql:%s", i, ts)
warnings := tk.Session().GetSessionVars().StmtCtx.GetWarnings()
testdata.OnRecord(func() {
if len(warnings) > 0 {
output[i].Warning = make([]string, len(warnings))
for j, warning := range warnings {
output[i].Warning[j] = warning.Err.Error()
}
}
})
if len(output[i].Warning) == 0 {
require.Len(t, warnings, 0)
} else {
require.Len(t, warnings, len(output[i].Warning), comment)
for j, warning := range warnings {
require.Equal(t, stmtctx.WarnLevelWarning, warning.Level, comment)
require.Equal(t, output[i].Warning[j], warning.Err.Error(), comment)
}
}
}
}

func TestPushdownDistinctEnable(t *testing.T) {
var (
input []string
Expand Down
8 changes: 8 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,19 @@ type tableHintInfo struct {
indexMergeHintList []indexHintInfo
timeRangeHint ast.HintTimeRange
limitHints limitHintInfo
MergeHints MergeHintInfo
leadingJoinOrder []hintTableInfo
}

type limitHintInfo struct {
preferLimitToCop bool
}

//MergeHintInfo ...one bool flag for cte
type MergeHintInfo struct {
preferMerge bool
}

type hintTableInfo struct {
dbName model.CIStr
tblName model.CIStr
Expand Down Expand Up @@ -427,6 +433,8 @@ type cteInfo struct {
seedStat *property.StatsInfo
// The LogicalCTEs that reference the same table should share the same CteClass.
cteClass *CTEClass

isInline bool
}

// PlanBuilder builds Plan from an ast.Node.
Expand Down
17 changes: 17 additions & 0 deletions planner/core/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,23 @@
"select /*+ LIMIT_TO_COP() */ a from tn where a > 10 limit 1"
]
},
{
"name": "TestCTEMergeHint",
"cases": [
"with cte as (select /*+ MERGE()*/ * from tc where tc.a < 60) select * from cte where cte.a <18",
"with cte as (select * from tc where tc.a < 60) select * from cte where cte.a <18",
"WITH cte1 AS (SELECT /*+ MERGE()*/ a FROM tc), cte2 AS (SELECT /*+ MERGE()*/ c FROM te) SELECT * FROM cte1 JOIN cte2 WHERE cte1.a = cte2.c;",
"WITH cte1 AS (SELECT a FROM tc), cte2 AS (SELECT /*+ MERGE()*/ c FROM te) SELECT * FROM cte1 JOIN cte2 WHERE cte1.a = cte2.c;",
"with recursive cte1(c1) as (select 1 union select /*+ merge */ c1 + 1 c1 from cte1 where c1 < 100) select * from cte1;",
"with cte1 as (select * from tc), cte2 as (with cte3 as (select /*+ MERGE() */ * from te) ,cte4 as (select * from tc) select * from cte3,cte4) select * from cte2;",
"with cte1 as (select * from t1), cte2 as (with cte3 as (with cte5 as (select * from t2),cte6 as (select * from t3) select * from cte5,cte6) ,cte4 as (select * from t4) select * from cte3,cte4) select * from cte1,cte2;",
"with cte1 as (select /*+ MERGE() */ * from t1), cte2 as (with cte3 as (with cte5 as (select * from t2),cte6 as (select * from t3) select * from cte5,cte6) ,cte4 as (select * from t4) select * from cte3,cte4) select * from cte1,cte2;",
"with cte1 as (select * from t1), cte2 as (with cte3 as (with cte5 as (select * from t2),cte6 as (select * from t3) select * from cte5,cte6) ,cte4 as (select /*+ MERGE() */ * from t4) select * from cte3,cte4) select * from cte1,cte2;",
"with cte1 as (select * from t1), cte2 as (with cte3 as (with cte5 as (select * from t2),cte6 as (select /*+ MERGE() */ * from t3) select * from cte5,cte6) ,cte4 as (select * from t4) select * from cte3,cte4) select * from cte1,cte2;",
"with cte2 as (with cte4 as (select * from tc) select * from te, cte4) select * from cte2;",
"with cte2 as (with cte4 as (select /*+ merge() */ * from tc) select * from te, cte4) select * from cte2;"
]
},
{
"name": "TestPushdownDistinctEnable",
"cases": [
Expand Down
Loading

0 comments on commit a57dd37

Please sign in to comment.