Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add more checks when pushing TopN down #41370

Merged
merged 12 commits into from
Feb 15, 2023
79 changes: 79 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8452,3 +8452,82 @@ func TestIssue41273(t *testing.T) {
// For now tidb doesn't support push set type to TiKV, and column a is a set type, so we shouldn't generate a IndexMerge path.
require.False(t, tk.HasPlanForLastExecution("IndexMerge"))
}

// https://github.com/pingcap/tidb/issues/41355
// The "virtual generated column" push down is not supported now.
// This test covers: TopN, Projection, Selection.
func TestVirtualExprPushDown(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test;")
tk.MustExec("drop table if exists t;")
tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);")
tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);")
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'")

// TopN to tikv.
rows := [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_13", "root", "data:TableFullScan_12"},
{" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tikv.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_5", "root", "data:TableFullScan_4"},
{" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tikv.
rows = [][]interface{}{
{"Selection_7", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)

tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}

// TopN to tiflash.
rows = [][]interface{}{
{"TopN_7", "root", "test.t.c2, offset:0, count:2"},
{"└─TableReader_15", "root", "data:TableFullScan_14"},
{" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)

// Projection to tiflash.
rows = [][]interface{}{
{"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"},
{"└─TableReader_6", "root", "data:TableFullScan_5"},
{" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustExec("set session tidb_opt_projection_push_down='ON';")
tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows)
tk.MustExec("set session tidb_opt_projection_push_down='OFF';")

// Selection to tiflash.
rows = [][]interface{}{
{"Selection_8", "root", "gt(test.t.c2, 1)"},
{"└─TableReader_7", "root", "data:TableFullScan_6"},
{" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows)
}
63 changes: 52 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -955,14 +946,64 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
return true
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask.plan().Schema().Columns) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool {
if !p.canExpressionConvertedToPB(kv.TiFlash) {
return false
}
if p.containVirtualColumn(mppTask.plan().Schema().Columns) {
return false
}
return true
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
t := tasks[0].copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) {
newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
if changed {
return newTask
Expand All @@ -978,7 +1019,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan)
copTask.tablePlan = pushedDownTopN
}
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down