diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index d1a2d424e68fc..3c762176cafde 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -6714,6 +6714,87 @@ func TestAutoIncrementCheckWithCheckConstraint(t *testing.T) { )`) } +// https://github.com/pingcap/tidb/issues/41355 +// The "virtual generated column" push down is not supported now. +// This test covers: TopN, Projection, Selection. +func TestVirtualExprPushDown(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test;") + tk.MustExec("drop table if exists t;") + tk.MustExec("CREATE TABLE t (c1 int DEFAULT 0, c2 int GENERATED ALWAYS AS (abs(c1)) VIRTUAL);") + tk.MustExec("insert into t(c1) values(1), (-1), (2), (-2), (99), (-99);") + tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'") + + // TopN to tikv. + rows := [][]interface{}{ + {"TopN_7", "root", "test.t.c2, offset:0, count:2"}, + {"└─TableReader_13", "root", "data:TableFullScan_12"}, + {" └─TableFullScan_12", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows) + + // Projection to tikv. + rows = [][]interface{}{ + {"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"}, + {"└─TableReader_5", "root", "data:TableFullScan_4"}, + {" └─TableFullScan_4", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustExec("set session tidb_opt_projection_push_down='ON';") + tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows) + tk.MustExec("set session tidb_opt_projection_push_down='OFF';") + + // Selection to tikv. + rows = [][]interface{}{ + {"Selection_7", "root", "gt(test.t.c2, 1)"}, + {"└─TableReader_6", "root", "data:TableFullScan_5"}, + {" └─TableFullScan_5", "cop[tikv]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows) + + tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1") + tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'") + dom := domain.GetDomain(tk.Session()) + is := dom.InfoSchema() + db, exists := is.SchemaByName(model.NewCIStr("test")) + require.True(t, exists) + for _, tblInfo := range db.Tables { + if tblInfo.Name.L == "t" { + tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{ + Count: 1, + Available: true, + } + } + } + + // TopN to tiflash. + rows = [][]interface{}{ + {"TopN_7", "root", "test.t.c2, offset:0, count:2"}, + {"└─TableReader_15", "root", "data:TableFullScan_14"}, + {" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows) + + // Projection to tiflash. + rows = [][]interface{}{ + {"Projection_3", "root", "plus(test.t.c1, test.t.c2)->Column#4"}, + {"└─TableReader_6", "root", "data:TableFullScan_5"}, + {" └─TableFullScan_5", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustExec("set session tidb_opt_projection_push_down='ON';") + tk.MustQuery("explain select c1 + c2 from t;").CheckAt([]int{0, 2, 4}, rows) + tk.MustExec("set session tidb_opt_projection_push_down='OFF';") + + // Selection to tiflash. + rows = [][]interface{}{ + {"Selection_8", "root", "gt(test.t.c2, 1)"}, + {"└─TableReader_7", "root", "data:TableFullScan_6"}, + {" └─TableFullScan_6", "cop[tiflash]", "keep order:false, stats:pseudo"}, + } + tk.MustQuery("explain select * from t where c2 > 1;").CheckAt([]int{0, 2, 4}, rows) +} + // https://github.com/pingcap/tidb/issues/41273 func TestIssue41273(t *testing.T) { store, clean := testkit.CreateMockStore(t) diff --git a/planner/core/task.go b/planner/core/task.go index 075f1e21488bf..47f742b35a77e 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -941,15 +941,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool { return true } -// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed. -func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool { - exprs := make([]expression.Expression, 0, len(p.ByItems)) - for _, item := range p.ByItems { - exprs = append(exprs, item.Expr) - } - return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp) -} - func (p *PhysicalSort) attach2Task(tasks ...task) task { t := tasks[0].copy() t = attachPlan2Task(p, t) @@ -1003,6 +994,56 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []* return true } +// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb. +func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool { + exprs := make([]expression.Expression, 0, len(p.ByItems)) + for _, item := range p.ByItems { + exprs = append(exprs, item.Expr) + } + return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp) +} + +// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns. +func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool { + for _, by := range p.ByItems { + cols := expression.ExtractColumns(by.Expr) + for _, col := range cols { + for _, tCol := range tCols { + // A column with ID > 0 indicates that the column can be resolved by data source. + if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil { + return true + } + } + } + } + return false +} + +// canPushDownToTiKV checks whether this topN can be pushed down to TiKV. +func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool { + if !p.canExpressionConvertedToPB(kv.TiKV) { + return false + } + if len(copTask.rootTaskConds) != 0 { + return false + } + if p.containVirtualColumn(copTask.plan().Schema().Columns) { + return false + } + return true +} + +// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash. +func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool { + if !p.canExpressionConvertedToPB(kv.TiFlash) { + return false + } + if p.containVirtualColumn(mppTask.plan().Schema().Columns) { + return false + } + return true +} + func (p *PhysicalTopN) attach2Task(tasks ...task) task { t := tasks[0].copy() inputCount := t.count() @@ -1011,7 +1052,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { cols = append(cols, expression.ExtractColumns(item.Expr)...) } needPushDown := len(cols) > 0 - if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 { + if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) && len(copTask.rootTaskConds) == 0 { // If all columns in topN are from index plan, we push it to index plan, otherwise we finish the index plan and // push it to table plan. var pushedDownTopN *PhysicalTopN @@ -1024,7 +1065,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task { copTask.tablePlan = pushedDownTopN } copTask.addCost(pushedDownTopN.GetCost(inputCount, false)) - } else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) { + } else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) { pushedDownTopN := p.getPushedDownTopN(mppTask.p) mppTask.addCost(pushedDownTopN.GetCost(mppTask.p.StatsCount(), false)) pushedDownTopN.SetCost(mppTask.cst)