Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: refine planner code for disaggregated tiflash mode #39813

Merged
merged 15 commits into from
Dec 29, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ddl/placement/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,8 @@ const (
// EngineLabelTiKV is the label value used in some tests. And possibly TiKV will
// set the engine label with a value of EngineLabelTiKV.
EngineLabelTiKV = "tikv"

// EngineLabelTiFlashCompute is for disaggregated tiflash mode,
// it's the lable of tiflash_compute nodes.
EngineLabelTiFlashCompute = "tiflash_compute"
)
32 changes: 32 additions & 0 deletions executor/tiflashtest/tiflash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,3 +1303,35 @@ func TestDisaggregatedTiFlash(t *testing.T) {
})
tk.MustQuery("select * from t;").Check(testkit.Rows())
}

func TestDisaggregatedTiFlashQuery(t *testing.T) {
config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = true
})
defer config.UpdateGlobal(func(conf *config.Config) {
conf.DisaggregatedTiFlash = false
})

store := testkit.CreateMockStore(t, withMockTiFlash(2))
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists tbl_1")
tk.MustExec(`create table tbl_1 ( col_1 bigint not null default -1443635317331776148,
col_2 text ( 176 ) collate utf8mb4_bin not null,
col_3 decimal ( 8, 3 ),
col_4 varchar ( 128 ) collate utf8mb4_bin not null,
col_5 varchar ( 377 ) collate utf8mb4_bin,
col_6 double,
col_7 varchar ( 459 ) collate utf8mb4_bin,
col_8 tinyint default -88 ) charset utf8mb4 collate utf8mb4_bin ;`)
tk.MustExec("alter table tbl_1 set tiflash replica 1")
tb := external.GetTableByName(t, tk, "test", "tbl_1")
err := domain.GetDomain(tk.Session()).DDL().UpdateTableReplicaInfo(tk.Session(), tb.Meta().ID, true)
require.NoError(t, err)
tk.MustExec("set @@session.tidb_isolation_read_engines=\"tiflash\"")

needCheckTiFlashComputeNode := "false"
failpoint.Enable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery", fmt.Sprintf("return(%s)", needCheckTiFlashComputeNode))
defer failpoint.Disable("github.com/pingcap/tidb/planner/core/testDisaggregatedTiFlashQuery")
tk.MustExec("explain select max( tbl_1.col_1 ) as r0 , sum( tbl_1.col_1 ) as r1 , sum( tbl_1.col_8 ) as r2 from tbl_1 where tbl_1.col_8 != 68 or tbl_1.col_3 between null and 939 order by r0,r1,r2;")
}
37 changes: 28 additions & 9 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1986,8 +1986,11 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
}
}
// In disaggregated tiflash mode, only MPP is allowed, Cop and BatchCop is deprecated.
if prop.TaskTp == property.MppTaskType || config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash {
// In disaggregated tiflash mode, only MPP is allowed, cop and batchCop is deprecated.
// So if prop.TaskTp is RootTaskType, have to use mppTask then convert to rootTask.
isDisaggregatedTiFlashPath := config.GetGlobalConfig().DisaggregatedTiFlash && ts.StoreType == kv.TiFlash
canMppConvertToRootForDisaggregatedTiFlash := isDisaggregatedTiFlashPath && prop.TaskTp == property.RootTaskType && ds.SCtx().GetSessionVars().IsMPPAllowed()
if prop.TaskTp == property.MppTaskType || canMppConvertToRootForDisaggregatedTiFlash {
if ts.KeepOrder {
return invalidTask, nil
}
Expand All @@ -2003,8 +2006,9 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
}
}
mppTask := &mppTask{
p: ts,
partTp: property.AnyType,
p: ts,
partTp: property.AnyType,
tblColHists: ds.TblColHists,
}
ts.PartitionInfo = PartitionInfo{
PruningConds: pushDownNot(ds.ctx, ds.allConds),
Expand All @@ -2013,7 +2017,24 @@ func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candid
ColumnNames: ds.names,
}
mppTask = ts.addPushedDownSelectionToMppTask(mppTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt))
return mppTask, nil
task = mppTask
if !mppTask.invalid() {
if prop.TaskTp == property.MppTaskType && len(mppTask.rootTaskConds) > 0 {
// If got filters cannot be pushed down to tiflash, we have to make sure it will be executed in TiDB,
// So have to return a rootTask, but prop requires mppTask, cannot meet this requirement.
task = invalidTask
} else if prop.TaskTp == property.RootTaskType {
// when got here, canMppConvertToRootForDisaggregatedTiFlash is true.
task = mppTask
task = task.convertToRootTask(ds.ctx)
ds.addSelection4PlanCache(task.(*rootTask), ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), prop)
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
}
}
return task, nil
}
if isDisaggregatedTiFlashPath {
// prop.TaskTp is cop related, just return invalidTask.
return invalidTask, nil
}
copTask := &copTask{
tablePlan: ts,
Expand Down Expand Up @@ -2223,10 +2244,8 @@ func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *mppTask, stats
filterCondition, rootTaskConds := SplitSelCondsWithVirtualColumn(ts.filterCondition)
var newRootConds []expression.Expression
filterCondition, newRootConds = expression.PushDownExprs(ts.ctx.GetSessionVars().StmtCtx, filterCondition, ts.ctx.GetClient(), ts.StoreType)
rootTaskConds = append(rootTaskConds, newRootConds...)
if len(rootTaskConds) > 0 {
return &mppTask{}
}
mpp.rootTaskConds = append(rootTaskConds, newRootConds...)

ts.filterCondition = filterCondition
// Add filter condition to table plan now.
if len(ts.filterCondition) > 0 {
Expand Down
6 changes: 6 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"unsafe"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -1418,6 +1419,11 @@ func filterPathByIsolationRead(ctx sessionctx.Context, paths []*util.AccessPath,
// 1. path.StoreType doesn't exists in isolationReadEngines or
// 2. TiFlash is disaggregated and the number of tiflash_compute node is zero.
shouldPruneTiFlashCompute := noTiFlashComputeNode && exists && paths[i].StoreType == kv.TiFlash
failpoint.Inject("testDisaggregatedTiFlashQuery", func(val failpoint.Value) {
// Ignore check if tiflash_compute node number.
// After we support disaggregated tiflash in test framework, can delete this failpoint.
shouldPruneTiFlashCompute = val.(bool)
})
if shouldPruneTiFlashCompute {
outputComputeNodeErrMsg = true
}
Expand Down
43 changes: 39 additions & 4 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1977,10 +1977,6 @@ func (p *PhysicalHashAgg) attach2TaskForMpp(tasks ...task) task {
}
attachPlan2Task(proj, newMpp)
return newMpp
case NoMpp:
t = mpp.convertToRootTask(p.ctx)
attachPlan2Task(p, t)
return t
default:
return invalidTask
}
Expand Down Expand Up @@ -2067,6 +2063,19 @@ type mppTask struct {

partTp property.MPPPartitionType
hashCols []*property.MPPPartitionColumn

// rootTaskConds record filters of TableScan that cannot be pushed down to TiFlash.

// For logical plan like: HashAgg -> Selection -> TableScan, if filters in Selection cannot be pushed to TiFlash.
// Planner will generate physical plan like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> PhysicalTableScan(cop tiflash)
// Because planner will make mppTask invalid directly then use copTask directly.

// But in DisaggregatedTiFlash mode, cop and batchCop protocol is disabled, so we have to consider this situation for mppTask.
// When generating PhysicalTableScan, if prop.TaskTp is RootTaskType, mppTask will be converted to rootTask,
// and filters in rootTaskConds will be added in a Selection which will be executed in TiDB.
// So physical plan be like: PhysicalHashAgg -> PhysicalSelection -> TableReader -> ExchangeSender -> PhysicalTableScan(mpp tiflash)
rootTaskConds []expression.Expression
tblColHists *statistics.HistColl
}

func (t *mppTask) count() float64 {
Expand Down Expand Up @@ -2146,6 +2155,32 @@ func (t *mppTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
rt := &rootTask{
p: p,
}

if len(t.rootTaskConds) > 0 {
// Some Filter cannot be pushed down to TiFlash, need to add Selection in rootTask,
// so this Selection will be executed in TiDB.
_, isTableScan := t.p.(*PhysicalTableScan)
_, isSelection := t.p.(*PhysicalSelection)
if isSelection {
_, isTableScan = t.p.Children()[0].(*PhysicalTableScan)
}
if !isTableScan {
// Need to make sure oriTaskPlan is TableScan, because rootTaskConds is part of TableScan.FilterCondition.
// It's only for TableScan. This is ensured by converting mppTask to rootTask just after TableScan is built,
// so no other operators are added into this mppTask.
logutil.BgLogger().Error("expect Selection or TableScan for mppTask.p", zap.String("mppTask.p", t.p.TP()))
return invalidTask
}
selectivity, _, err := t.tblColHists.Selectivity(ctx, t.rootTaskConds, nil)
if err != nil {
logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
selectivity = SelectionFactor
}
sel := PhysicalSelection{Conditions: t.rootTaskConds}.Init(ctx, rt.p.statsInfo().Scale(selectivity), rt.p.SelectBlockOffset())
sel.fromDataSource = true
sel.SetChildren(rt.p)
rt.p = sel
}
return rt
}

Expand Down
4 changes: 4 additions & 0 deletions store/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,10 @@ func needsGCOperationForStore(store *metapb.Store) (bool, error) {
// skip physical resolve locks for it.
return false, nil

case placement.EngineLabelTiFlashCompute:
logutil.BgLogger().Debug("[gc worker] will ignore gc tiflash_compute node")
return false, nil

case placement.EngineLabelTiKV, "":
// If no engine label is set, it should be a TiKV node.
return true, nil
Expand Down