diff --git a/ddl/placement/common.go b/ddl/placement/common.go
index cd02622dd0562..7c77ead97e30e 100644
--- a/ddl/placement/common.go
+++ b/ddl/placement/common.go
@@ -54,4 +54,8 @@ const (
 	// EngineLabelTiKV is the label value used in some tests. And possibly TiKV will
 	// set the engine label with a value of EngineLabelTiKV.
 	EngineLabelTiKV = "tikv"
+
+	// EngineLabelTiFlashCompute is for disaggregated tiflash mode,
+	// it's the label of tiflash_compute nodes.
+	EngineLabelTiFlashCompute = "tiflash_compute"
 )
diff --git a/executor/tiflashtest/tiflash_test.go b/executor/tiflashtest/tiflash_test.go
index 9898ecd89a8eb..ca52fb48fd788 100644
--- a/executor/tiflashtest/tiflash_test.go
+++ b/executor/tiflashtest/tiflash_test.go
@@ -1277,3 +1277,35 @@ func TestDisaggregatedTiFlash(t *testing.T) {
 	})
 	tk.MustQuery("select * from t;").Check(testkit.Rows())
 }
+
+func TestDisaggregatedTiFlashQuery(t *testing.T) {
+	config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = true
+	})
+	defer config.UpdateGlobal(func(conf *config.Config) {
+		conf.DisaggregatedTiFlash = false
+	})
+
+	store := testkit.CreateMockStore(t, withMockTiFlash(2))
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists tbl_1")
+	tk.MustExec(`create table tbl_1 ( col_1 bigint not null default -1443635317331776148,
+		col_2 text ( 176 ) collate utf8mb4_bin not null,
+		col_3 decimal ( 8, 3 ),
+		col_4 varchar ( 128 ) collate utf8mb4_bin not null,
+		col_5 varchar ( 377 ) collate utf8mb4_bin,
+		col_6 double,
+		col_7 varchar ( 459 ) collate utf8mb4_bin,
+		col_8 tinyint default -88 ) charset utf8mb4 collate utf8mb4_bin ;`)
+	tk.MustExec("alter table tbl_1 set tiflash replica 1")
+	tb := external.GetTableByName(t, tk, "test", "tbl_1")
+	err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
+	require.NoError(t, err)
+	tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")
+
+	needCheckTiFlashComputeNode := "false"
+	failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
+	defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
+	tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")
+}
diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go
index ab120d452ac1c..aff1c29997fbd 100644
--- a/planner/core/find_best_task.go
+++ b/planner/core/find_best_task.go
@@ -1993,8 +1993,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 			}
 		}
 	}
-	// In disaggregated tiflash mode, only MPP is allowed, Cop and BatchCop is deprecated.
-	if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
+	// In disaggregated tiflash mode, only MPP is allowed; cop and batchCop are deprecated.
+	// So if prop.TaskTp is RootTaskType, we have to build an mppTask and then convert it to a rootTask.
+	isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
+	canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
+	if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
 		if ts.KeepOrder {
 			return invalidTask, nil
 		}
@@ -2010,8 +2013,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 			}
 		}
 		mppTask := &mppTask{
-			p:      ts,
-			partTp: property.AnyType,
+			p:           ts,
+			partTp:      property.AnyType,
+			tblColHists: ds.TblColHists,
 		}
 		ts.PartitionInfo = PartitionInfo{
 			PruningConds: pushDownNot(ds.ctx, ds.allConds),
@@ -2020,7 +2024,26 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
 			ColumnNames: ds.names,
 		}
 		mppTask = ts.addPushedDownSelectionToMppTask(mppTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt))
-		return mppTask, nil
+		task = mppTask
+		if !mppTask.invalid() {
+			if prop.TaskTp == property.MppTaskType && len(mppTask.rootTaskConds) > 0 {
+				// If some filters cannot be pushed down to tiflash, we have to make sure they are executed in TiDB,
+				// so a rootTask has to be returned; but prop requires an mppTask, so this requirement cannot be met.
+				task = invalidTask
+			} else if prop.TaskTp == property.RootTaskType {
+				// When we get here, canMppConvertToRootForDisaggregatedTiFlash is true.
+				task = mppTask
+				task = task.convertToRootTask(ds.ctx)
+				if !task.invalid() {
+					ds.addSelection4PlanCache(task.(*rootTask), ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), prop)
+				}
+			}
+		}
+		return task, nil
+	}
+	if isDisaggregatedTiFlashPath {
+		// prop.TaskTp is cop-related, so just return invalidTask.
+		return invalidTask, nil
 	}
 	copTask := &copTask{
 		tablePlan: ts,
@@ -2230,10 +2253,8 @@ func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *mppTask, stats
 	filterCondition, rootTaskConds := SplitSelCondsWithVirtualColumn(ts.filterCondition)
 	var newRootConds []expression.Expression
 	filterCondition, newRootConds = expression.PushDownExprs(ts.ctx.GetSessionVars().StmtCtx, filterCondition, ts.ctx.GetClient(), ts.StoreType)
-	rootTaskConds = append(rootTaskConds, newRootConds...)
-	if len(rootTaskConds) > 0 {
-		return &mppTask{}
-	}
+	mpp.rootTaskConds = append(rootTaskConds, newRootConds...)
+
 	ts.filterCondition = filterCondition
 	// Add filter condition to table plan now.
 	if len(ts.filterCondition) > 0 {
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index 60a3eba28292f..e41a523a7671b 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -26,6 +26,7 @@ import (
 	"unsafe"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/tidb/bindinfo"
 	"github.com/pingcap/tidb/config"
 	"github.com/pingcap/tidb/domain"
@@ -1462,6 +1463,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
 		// 1. path.StoreType doesn't exists in isolationReadEngines or
 		// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
 		shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
+		failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) {
+			// Ignore the check of the tiflash_compute node number.
+			// After disaggregated tiflash is supported in the test framework, this failpoint can be deleted.
+			shouldPruneTiFlashCompute = val.(bool)
+		})
 		if shouldPruneTiFlashCompute {
 			outputComputeNodeErrMsg = true
 		}
diff --git a/planner/core/task.go b/planner/core/task.go
index 99952038688fe..c595880c9a37b 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -1982,10 +1982,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
 			}
 			attachPlan2Task(proj, newMpp)
 			return newMpp
-		case NoMpp:
-			t = mpp.convertToRootTask(p.ctx)
-			attachPlan2Task(p, t)
-			return t
 		default:
 			return invalidTask
 		}
@@ -2072,6 +2068,19 @@ type mppTask struct {
 
 	partTp   property.MPPPartitionType
 	hashCols []*property.MPPPartitionColumn
+
+	// rootTaskConds records the filters of TableScan that cannot be pushed down to TiFlash.
+
+	// For a logical plan like: HashAgg -> Selection -> TableScan, if the filters in Selection cannot be pushed to TiFlash,
+	// the planner will generate a physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash),
+	// because the planner makes the mppTask invalid directly and then uses a copTask instead.
+
+	// But in DisaggregatedTiFlash mode, the cop and batchCop protocols are disabled, so we have to handle this situation for mppTask.
+	// When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, the mppTask will be converted to a rootTask,
+	// and the filters in rootTaskConds will be added to a Selection that is executed in TiDB.
+	// So the physical plan will be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash)
+	rootTaskConds []expression.Expression
+	tblColHists   *statistics.HistColl
 }
 
 func (t *mppTask) count() float64 {
@@ -2151,6 +2160,32 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
 	rt := &rootTask{
 		p: p,
 	}
+
+	if len(t.rootTaskConds) > 0 {
+		// Some filters cannot be pushed down to TiFlash, so a Selection has to be added to the rootTask;
+		// this Selection will be executed in TiDB.
+		_, isTableScan := t.p.(*PhysicalTableScan)
+		_, isSelection := t.p.(*PhysicalSelection)
+		if isSelection {
+			_, isTableScan = t.p.Children()[0].(*PhysicalTableScan)
+		}
+		if !isTableScan {
+			// Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition.
+			// It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built,
+			// so no other operators are added into this mppTask.
+			logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP()))
+			return invalidTask
+		}
+		selectivity, _, err := t.tblColHists.Selectivity(ctx, t.rootTaskConds, nil)
+		if err != nil {
+			logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
+			selectivity = SelectionFactor
+		}
+		sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.p.statsInfo().Scale(selectivity), rt.p.SelectBlockOffset())
+		sel.fromDataSource = true
+		sel.SetChildren(rt.p)
+		rt.p = sel
+	}
 	return rt
 }
 
diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go
index 054ec83ee7e8a..87a990f7f096c 100644
--- a/store/gcworker/gc_worker.go
+++ b/store/gcworker/gc_worker.go
@@ -912,6 +912,10 @@ func needsGCOperationForStore(store *metapb.Store) (bool, error) {
 		// skip physical resolve locks for it.
 		return false, nil
 
+	case placement.EngineLabelTiFlashCompute:
+		logutil.BgLogger().Debug("[gc worker] will ignore gc tiflash_compute node")
+		return false, nil
+
 	case placement.EngineLabelTiKV, "":
 		// If no engine label is set, it should be a TiKV node.
 		return true, nil