From 1a876c99b2f4e64a0d949533038bc153868a1ed5 Mon Sep 17 00:00:00 2001 From: Yuanjia Zhang Date: Tue, 21 Jun 2022 23:02:37 +0800 Subject: [PATCH] planner: introduce new cost formula for MPPAggs (#35436) ref pingcap/tidb#35240 --- planner/core/plan_cost.go | 31 +++++++++++++++++++++++++++---- planner/core/task.go | 4 ++-- 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/planner/core/plan_cost.go b/planner/core/plan_cost.go index 33dd500683bc5..aac4ebe642b62 100644 --- a/planner/core/plan_cost.go +++ b/planner/core/plan_cost.go @@ -312,7 +312,12 @@ func (p *PhysicalTableReader) GetPlanCost(taskType property.TaskType, costFlag u concurrency = float64(p.ctx.GetSessionVars().DistSQLScanConcurrency()) rowSize = getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false) seekCost = estimateNetSeekCost(p.tablePlan) - childCost, err := p.tablePlan.GetPlanCost(property.CopSingleReadTaskType, costFlag) + tType := property.MppTaskType + if p.ctx.GetSessionVars().CostModelVersion == modelVer1 { + // regard the underlying tasks as cop-task on modelVer1 for compatibility + tType = property.CopSingleReadTaskType + } + childCost, err := p.tablePlan.GetPlanCost(tType, costFlag) if err != nil { return 0, err } @@ -326,7 +331,8 @@ func (p *PhysicalTableReader) GetPlanCost(taskType property.TaskType, costFlag u // consider concurrency p.planCost /= concurrency // consider tidb_enforce_mpp - if isMPP && p.ctx.GetSessionVars().IsMPPEnforced() { + if isMPP && p.ctx.GetSessionVars().IsMPPEnforced() && + !hasCostFlag(costFlag, CostFlagRecalculate) { // show the real cost in explain-statements p.planCost /= 1000000000 } } @@ -892,12 +898,19 @@ func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, costFlag uint } // GetCost computes cost of stream aggregation considering CPU/memory. -func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot bool, costFlag uint64) float64 { +func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFlag uint64) float64 { aggFuncFactor := p.getAggFuncCostFactor(false) var cpuCost float64 sessVars := p.ctx.GetSessionVars() if isRoot { cpuCost = inputRows * sessVars.GetCPUFactor() * aggFuncFactor + } else if isMPP { + if p.ctx.GetSessionVars().CostModelVersion == modelVer2 { + // use the dedicated CPU factor for TiFlash on modelVer2 + cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor + } else { + cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor + } } else { cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor } @@ -916,7 +929,7 @@ func (p *PhysicalStreamAgg) GetPlanCost(taskType property.TaskType, costFlag uin return 0, err } p.planCost = childCost - p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType, costFlag) + p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType, taskType == property.MppTaskType, costFlag) p.planCostInit = true return p.planCost, nil } @@ -936,6 +949,13 @@ func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFla // Cost of additional goroutines. cpuCost += (con + 1) * sessVars.GetConcurrencyFactor() } + } else if isMPP { + if p.ctx.GetSessionVars().CostModelVersion == modelVer2 { + // use the dedicated CPU factor for TiFlash on modelVer2 + cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor + } else { + cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor + } } else { cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor } @@ -1144,6 +1164,9 @@ func (p *PhysicalExchangeReceiver) GetPlanCost(taskType property.TaskType, costF } func getOperatorActRows(operator PhysicalPlan) float64 { + if operator == nil { + return 0 + } runtimeInfo := operator.SCtx().GetSessionVars().StmtCtx.RuntimeStatsColl id := operator.ID() actRows := 0.0 diff --git a/planner/core/task.go b/planner/core/task.go index e2f66173492a7..96ea4dd402ed3 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1666,7 +1666,7 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task { partialAgg.SetChildren(cop.indexPlan) cop.indexPlan = partialAgg } - cop.addCost(partialAgg.(*PhysicalStreamAgg).GetCost(inputRows, false, 0)) + cop.addCost(partialAgg.(*PhysicalStreamAgg).GetCost(inputRows, false, false, 0)) partialAgg.SetCost(cop.cost()) } t = cop.convertToRootTask(p.ctx) @@ -1679,7 +1679,7 @@ func (p *PhysicalStreamAgg) attach2Task(tasks ...task) task { } else { attachPlan2Task(p, t) } - t.addCost(final.GetCost(inputRows, true, 0)) + t.addCost(final.GetCost(inputRows, true, false, 0)) t.plan().SetCost(t.cost()) return t }