Skip to content

Commit

Permalink
planner/cascades: implement ImplementationRule for Selection (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
francis0407 authored and XiaTianliang committed Dec 21, 2019
1 parent 7f42fc0 commit ddc9a4e
Show file tree
Hide file tree
Showing 19 changed files with 472 additions and 85 deletions.
31 changes: 30 additions & 1 deletion planner/cascades/implementation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ var defaultImplementationMap = map[memo.Operand][]ImplementationRule{
memo.OperandShow: {
&ImplShow{},
},
memo.OperandSelection: {
&ImplSelection{},
},
}

// ImplTableDual implements LogicalTableDual as PhysicalTableDual.
Expand Down Expand Up @@ -151,9 +154,35 @@ func (r *ImplShow) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalP

// TODO(zz-jason): unifying LogicalShow and PhysicalShow to a single
// struct. So that we don't need to create a new PhysicalShow object, which
// can help us to reduce the gc presure of golang runtime and improve the
// can help us to reduce the gc pressure of golang runtime and improve the
// overall performance.
showPhys := plannercore.PhysicalShow{ShowContents: show.ShowContents}.Init(show.SCtx())
showPhys.SetSchema(logicProp.Schema)
return impl.NewShowImpl(showPhys), nil
}

// ImplSelection is the ImplementationRule registered for
// memo.OperandSelection. It turns a LogicalSelection into a
// PhysicalSelection. The rule carries no state of its own.
type ImplSelection struct{}

// Match implements ImplementationRule Match interface.
// It reports a match unconditionally: neither expr nor prop is inspected.
func (r *ImplSelection) Match(expr *memo.GroupExpr, prop *property.PhysicalProperty) (matched bool) {
	return true
}

// OnImplement implements ImplementationRule OnImplement interface.
//
// It builds a PhysicalSelection carrying the conditions of the
// LogicalSelection stored in expr, then wraps it in the Selection
// implementation that corresponds to the engine type of expr's group:
// TiDB or TiKV. Any other engine type yields an internal error and a nil
// Implementation.
func (r *ImplSelection) OnImplement(expr *memo.GroupExpr, reqProp *property.PhysicalProperty) (memo.Implementation, error) {
	// The assertion is unchecked: an ExprNode that is not a
	// *LogicalSelection panics here.
	sel := expr.ExprNode.(*plannercore.LogicalSelection)

	// Arguments for Init, computed in the order they are passed.
	sctx := sel.SCtx()
	// Stats of the group, adjusted by ScaleByExpectCnt with the required
	// property's ExpectedCnt.
	stats := expr.Group.Prop.Stats.ScaleByExpectCnt(reqProp.ExpectedCnt)
	blockOffset := sel.SelectBlockOffset()
	// The physical plan receives its own copy of the required property.
	prop := reqProp.Clone()

	physicalSel := plannercore.PhysicalSelection{Conditions: sel.Conditions}.Init(sctx, stats, blockOffset, prop)

	switch engine := expr.Group.EngineType; engine {
	case memo.EngineTiDB:
		return impl.NewTiDBSelectionImpl(physicalSel), nil
	case memo.EngineTiKV:
		return impl.NewTiKVSelectionImpl(physicalSel), nil
	default:
		return nil, plannercore.ErrInternal.GenWithStack("Unsupported EngineType '%s' for Selection.", engine.String())
	}
}
37 changes: 21 additions & 16 deletions planner/cascades/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testutil"
)

var _ = Suite(&testIntegrationSuite{})

type testIntegrationSuite struct {
store kv.Storage
store kv.Storage
testData testutil.TestData
}

func newStoreWithBootstrap() (kv.Storage, error) {
Expand All @@ -40,9 +42,12 @@ func (s *testIntegrationSuite) SetUpSuite(c *C) {
var err error
s.store, err = newStoreWithBootstrap()
c.Assert(err, IsNil)
s.testData, err = testutil.LoadTestSuiteData("testdata", "integration_suite")
c.Assert(err, IsNil)
}

func (s *testIntegrationSuite) TearDownSuite(c *C) {
c.Assert(s.testData.GenerateOutputIfNeeded(), IsNil)
s.store.Close()
}

Expand All @@ -62,22 +67,22 @@ func (s *testIntegrationSuite) TestPKIsHandleRangeScan(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int primary key, b int)")
tk.MustExec("insert into t values(1,2),(3,4)")
tk.MustExec("insert into t values(1,2),(3,4),(5,6)")
tk.MustExec("set session tidb_enable_cascades_planner = 1")
tk.MustQuery("explain select b from t where a > 1").Check(testkit.Rows(
"Projection_8 3333.33 root Column#2",
"└─TableReader_9 3333.33 root data:TableScan_10",
" └─TableScan_10 3333.33 cop[tikv] table:t, range:(1,+inf], keep order:false, stats:pseudo",
))
tk.MustQuery("select b from t where a > 1").Check(testkit.Rows(
"4",
))
tk.MustQuery("explain select b from t where a > 1 and a < 3").Check(testkit.Rows(
"Projection_8 2.00 root Column#2",
"└─TableReader_9 2.00 root data:TableScan_10",
" └─TableScan_10 2.00 cop[tikv] table:t, range:(1,3), keep order:false, stats:pseudo",
))
tk.MustQuery("select b from t where a > 1 and a < 3").Check(testkit.Rows())

var input []string
var output []struct {
SQL string
Result []string
}
s.testData.GetTestCases(c, &input, &output)
for i, sql := range input {
s.testData.OnRecord(func() {
output[i].SQL = sql
output[i].Result = s.testData.ConvertRowsToStrings(tk.MustQuery(sql).Rows())
})
tk.MustQuery(sql).Check(testkit.Rows(output[i].Result...))
}
}

func (s *testIntegrationSuite) TestBasicShow(c *C) {
Expand Down
11 changes: 5 additions & 6 deletions planner/cascades/optimize.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ func (opt *Optimizer) implGroup(g *memo.Group, reqPhysProp *property.PhysicalPro
}
// Handle implementation rules for each equivalent GroupExpr.
var cumCost float64
var childCosts []float64
var childImpls []memo.Implementation
var childPlans []plannercore.PhysicalPlan
err := opt.fillGroupStats(g)
if err != nil {
Expand All @@ -300,8 +300,8 @@ func (opt *Optimizer) implGroup(g *memo.Group, reqPhysProp *property.PhysicalPro
}
for _, impl := range impls {
cumCost = 0.0
childCosts = childCosts[:0]
childPlans = childPlans[:0]
childImpls = childImpls[:0]
for i, childGroup := range curExpr.Children {
childImpl, err := opt.implGroup(childGroup, impl.GetPlan().GetChildReqProps(i), costLimit-cumCost)
if err != nil {
Expand All @@ -311,15 +311,14 @@ func (opt *Optimizer) implGroup(g *memo.Group, reqPhysProp *property.PhysicalPro
impl.SetCost(math.MaxFloat64)
break
}
childCost := childImpl.GetCost()
childCosts = append(childCosts, childCost)
cumCost += childCost
cumCost += childImpl.GetCost()
childImpls = append(childImpls, childImpl)
childPlans = append(childPlans, childImpl.GetPlan())
}
if impl.GetCost() == math.MaxFloat64 {
continue
}
cumCost = impl.CalcCost(outCount, childCosts, curExpr.Children...)
cumCost = impl.CalcCost(outCount, childImpls...)
if cumCost > costLimit {
continue
}
Expand Down
19 changes: 19 additions & 0 deletions planner/cascades/testdata/integration_suite_in.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[
{
"name": "TestPKIsHandleRangeScan",
"cases": [
"explain select b from t where a > 1",
"select b from t where a > 1",
"explain select b from t where a > 1 and a < 3",
"select b from t where a > 1 and a < 3",
"explain select b from t where a > 1 and b < 6",
"select b from t where a > 1 and b < 6",
"explain select a from t where a * 3 + 1 > 9 and a < 5",
"select a from t where a * 3 + 1 > 9 and a < 5",
// Test TiDBSelection Implementation.
// TODO: change this test case to agg + sel or join + sel when we support them.
"explain select a from t where a * 3 + 1 > 9 and sin(a) < 0.5 and a < 5",
"select a from t where a * 3 + 1 > 9 and sin(a) < 0.5 and a < 5"
]
}
]
80 changes: 80 additions & 0 deletions planner/cascades/testdata/integration_suite_out.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
[
{
"Name": "TestPKIsHandleRangeScan",
"Cases": [
{
"SQL": "explain select b from t where a > 1",
"Result": [
"Projection_8 3333.33 root Column#2",
"└─TableReader_9 3333.33 root data:TableScan_10",
" └─TableScan_10 3333.33 cop[tikv] table:t, range:(1,+inf], keep order:false, stats:pseudo"
]
},
{
"SQL": "select b from t where a > 1",
"Result": [
"4",
"6"
]
},
{
"SQL": "explain select b from t where a > 1 and a < 3",
"Result": [
"Projection_8 2.00 root Column#2",
"└─TableReader_9 2.00 root data:TableScan_10",
" └─TableScan_10 2.00 cop[tikv] table:t, range:(1,3), keep order:false, stats:pseudo"
]
},
{
"SQL": "select b from t where a > 1 and a < 3",
"Result": null
},
{
"SQL": "explain select b from t where a > 1 and b < 6",
"Result": [
"Projection_9 2666.67 root Column#2",
"└─TableReader_10 2666.67 root data:Selection_11",
" └─Selection_11 2666.67 cop[tikv] lt(Column#2, 6)",
" └─TableScan_12 3333.33 cop[tikv] table:t, range:(1,+inf], keep order:false, stats:pseudo"
]
},
{
"SQL": "select b from t where a > 1 and b < 6",
"Result": [
"4"
]
},
{
"SQL": "explain select a from t where a * 3 + 1 > 9 and a < 5",
"Result": [
"Projection_9 4.00 root Column#1",
"└─TableReader_10 4.00 root data:Selection_11",
" └─Selection_11 4.00 cop[tikv] gt(plus(mul(Column#1, 3), 1), 9)",
" └─TableScan_12 5.00 cop[tikv] table:t, range:[-inf,5), keep order:false, stats:pseudo"
]
},
{
"SQL": "select a from t where a * 3 + 1 > 9 and a < 5",
"Result": [
"3"
]
},
{
"SQL": "explain select a from t where a * 3 + 1 > 9 and sin(a) < 0.5 and a < 5",
"Result": [
"Projection_10 3.20 root Column#1",
"└─Selection_11 3.20 root lt(sin(cast(Column#1)), 0.5)",
" └─TableReader_12 4.00 root data:Selection_13",
" └─Selection_13 4.00 cop[tikv] gt(plus(mul(Column#1, 3), 1), 9)",
" └─TableScan_14 5.00 cop[tikv] table:t, range:[-inf,5), keep order:false, stats:pseudo"
]
},
{
"SQL": "select a from t where a * 3 + 1 > 9 and sin(a) < 0.5 and a < 5",
"Result": [
"3"
]
}
]
}
]
15 changes: 8 additions & 7 deletions planner/cascades/transformation_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func (r *PushSelDownTableScan) GetPattern() *memo.Pattern {
if p, ok := patternMap[r]; ok {
return p
}
ts := memo.NewPattern(memo.OperandTableScan)
p := memo.BuildPattern(memo.OperandSelection, ts)
ts := memo.NewPattern(memo.OperandTableScan, memo.EngineTiKVOrTiFlash)
p := memo.BuildPattern(memo.OperandSelection, memo.EngineTiKVOrTiFlash, ts)
patternMap[r] = p
return p
}
Expand Down Expand Up @@ -120,9 +120,9 @@ func (r *PushSelDownTableGather) GetPattern() *memo.Pattern {
if p, ok := patternMap[r]; ok {
return p
}
any := memo.NewPattern(memo.OperandAny)
tg := memo.BuildPattern(memo.OperandTableGather, any)
p := memo.BuildPattern(memo.OperandSelection, tg)
any := memo.NewPattern(memo.OperandAny, memo.EngineTiKVOrTiFlash)
tg := memo.BuildPattern(memo.OperandTableGather, memo.EngineTiDBOnly, any)
p := memo.BuildPattern(memo.OperandSelection, memo.EngineTiDBOnly, tg)
patternMap[r] = p
return p
}
Expand Down Expand Up @@ -150,7 +150,7 @@ func (r *PushSelDownTableGather) OnTransform(old *memo.ExprIter) (newExprs []*me
pushedSel := plannercore.LogicalSelection{Conditions: pushed}.Init(sctx, sel.SelectBlockOffset())
pushedSelExpr := memo.NewGroupExpr(pushedSel)
pushedSelExpr.Children = append(pushedSelExpr.Children, childGroup)
pushedSelGroup := memo.NewGroupWithSchema(pushedSelExpr, childGroup.Prop.Schema)
pushedSelGroup := memo.NewGroupWithSchema(pushedSelExpr, childGroup.Prop.Schema).SetEngineType(childGroup.EngineType)
// The field content of TableGather would not be modified currently, so we
// just reference the same tg instead of making a copy of it.
//
Expand Down Expand Up @@ -179,7 +179,7 @@ func (r *EnumeratePaths) GetPattern() *memo.Pattern {
if p, ok := patternMap[r]; ok {
return p
}
p := memo.NewPattern(memo.OperandDataSource)
p := memo.NewPattern(memo.OperandDataSource, memo.EngineTiDBOnly)
patternMap[r] = p
return p
}
Expand All @@ -195,6 +195,7 @@ func (r *EnumeratePaths) OnTransform(old *memo.ExprIter) (newExprs []*memo.Group
gathers := ds.Convert2Gathers()
for _, gather := range gathers {
expr := convert2GroupExpr(gather)
expr.Children[0].SetEngineType(memo.EngineTiKV)
newExprs = append(newExprs, expr)
}
return newExprs, true, false, nil
Expand Down
8 changes: 8 additions & 0 deletions planner/core/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ type PhysicalPlan interface {

// ResolveIndices resolves the indices for columns. After doing this, the columns can evaluate the rows by their indices.
ResolveIndices() error

// Stats returns the StatsInfo of the plan.
Stats() *property.StatsInfo
}

type baseLogicalPlan struct {
Expand Down Expand Up @@ -311,6 +314,11 @@ func (p *basePlan) SelectBlockOffset() int {
return p.blockOffset
}

// Stats returns the StatsInfo held in the plan's stats field. Its signature
// matches the Stats method declared on the PhysicalPlan interface.
func (p *basePlan) Stats() *property.StatsInfo {
	return p.stats
}

// Schema implements Plan Schema interface.
func (p *baseLogicalPlan) Schema() *expression.Schema {
return p.children[0].Schema()
Expand Down
6 changes: 3 additions & 3 deletions planner/implementation/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type baseImpl struct {
plan plannercore.PhysicalPlan
}

func (impl *baseImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 {
func (impl *baseImpl) CalcCost(outCount float64, children ...memo.Implementation) float64 {
impl.cost = 0
for _, childCost := range childCosts {
impl.cost += childCost
for _, child := range children {
impl.cost += child.GetCost()
}
return impl.cost
}
Expand Down
8 changes: 4 additions & 4 deletions planner/implementation/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/infoschema"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/memo"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/testleak"
)
Expand Down Expand Up @@ -54,10 +55,9 @@ func (s *testImplSuite) TestBaseImplementation(c *C) {
impl := &baseImpl{plan: p}
c.Assert(impl.GetPlan(), Equals, p)

childCosts := []float64{5.0}
cost := impl.CalcCost(10, childCosts, nil)
c.Assert(cost, Equals, 5.0)
c.Assert(impl.GetCost(), Equals, 5.0)
cost := impl.CalcCost(10, []memo.Implementation{}...)
c.Assert(cost, Equals, 0.0)
c.Assert(impl.GetCost(), Equals, 0.0)

impl.SetCost(6.0)
c.Assert(impl.GetCost(), Equals, 6.0)
Expand Down
8 changes: 4 additions & 4 deletions planner/implementation/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewTableDualImpl(dual *plannercore.PhysicalTableDual) *TableDualImpl {
}

// CalcCost calculates the cost of the table dual Implementation.
func (impl *TableDualImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 {
func (impl *TableDualImpl) CalcCost(outCount float64, children ...memo.Implementation) float64 {
return 0
}

Expand All @@ -52,7 +52,7 @@ func NewTableReaderImpl(reader *plannercore.PhysicalTableReader, hists *statisti
}

// CalcCost calculates the cost of the table reader Implementation.
func (impl *TableReaderImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 {
func (impl *TableReaderImpl) CalcCost(outCount float64, children ...memo.Implementation) float64 {
reader := impl.plan.(*plannercore.PhysicalTableReader)
width := impl.tblColHists.GetAvgRowSize(reader.Schema().Columns, false)
sessVars := reader.SCtx().GetSessionVars()
Expand All @@ -62,7 +62,7 @@ func (impl *TableReaderImpl) CalcCost(outCount float64, childCosts []float64, ch
// is Min(DistSQLScanConcurrency, numRegionsInvolvedInScan), since we cannot infer
// the number of regions involved, we simply use DistSQLScanConcurrency.
copIterWorkers := float64(sessVars.DistSQLScanConcurrency)
impl.cost = (networkCost + childCosts[0]) / copIterWorkers
impl.cost = (networkCost + children[0].GetCost()) / copIterWorkers
return impl.cost
}

Expand All @@ -85,7 +85,7 @@ func NewTableScanImpl(ts *plannercore.PhysicalTableScan, cols []*expression.Colu
}

// CalcCost calculates the cost of the table scan Implementation.
func (impl *TableScanImpl) CalcCost(outCount float64, childCosts []float64, children ...*memo.Group) float64 {
func (impl *TableScanImpl) CalcCost(outCount float64, children ...memo.Implementation) float64 {
ts := impl.plan.(*plannercore.PhysicalTableScan)
width := impl.tblColHists.GetAvgRowSize(impl.tblCols, false)
sessVars := ts.SCtx().GetSessionVars()
Expand Down
Loading

0 comments on commit ddc9a4e

Please sign in to comment.