Cost-based optimizer (NVIDIA#1616)

andygrove authored Mar 3, 2021
1 parent 47883ad commit f079c5b
Showing 6 changed files with 811 additions and 4 deletions.
@@ -0,0 +1,271 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.collection.mutable.ListBuffer

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression}
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.internal.SQLConf

class CostBasedOptimizer(conf: RapidsConf) extends Logging {

// the intention is to make the cost model pluggable since we are probably going to need to
// experiment a fair bit with this part
private val costModel = new DefaultCostModel(conf)

/**
* Walk the plan and determine CPU and GPU costs for each operator and then make decisions
* about whether operators should run on CPU or GPU.
*
* @param plan The plan to optimize
* @return A list of optimizations that were applied
*/
def optimize(plan: SparkPlanMeta[SparkPlan]): Seq[Optimization] = {
val optimizations = new ListBuffer[Optimization]()
recursivelyOptimize(plan, optimizations, finalOperator = true, "")
optimizations
}

private def recursivelyOptimize(
plan: SparkPlanMeta[SparkPlan],
optimizations: ListBuffer[Optimization],
finalOperator: Boolean,
indent: String = ""): (Double, Double) = {

// get the CPU and GPU cost of the child plan(s)
val childCosts = plan.childPlans
.map(child => recursivelyOptimize(
child.asInstanceOf[SparkPlanMeta[SparkPlan]],
optimizations,
finalOperator = false,
indent + " "))

val (childCpuCosts, childGpuCosts) = childCosts.unzip

// get the CPU and GPU cost of this operator
val (operatorCpuCost, operatorGpuCost) = costModel.applyCost(plan)

// calculate total (this operator + children)
val totalCpuCost = operatorCpuCost + childCpuCosts.sum
var totalGpuCost = operatorGpuCost + childGpuCosts.sum

// determine how many transitions between CPU and GPU are taking place between
// the child operators and this operator
val numTransitions = plan.childPlans
.count(_.canThisBeReplaced != plan.canThisBeReplaced)

if (numTransitions > 0) {
if (plan.canThisBeReplaced) {
// at least one child is transitioning from CPU to GPU
val transitionCost = plan.childPlans.filter(!_.canThisBeReplaced)
.map(costModel.transitionToGpuCost).sum
val gpuCost = operatorGpuCost + transitionCost
if (gpuCost > operatorCpuCost) {
optimizations.append(AvoidTransition(plan))
plan.costPreventsRunningOnGpu()
// stay on the CPU, so the costs are the same
totalGpuCost = totalCpuCost
} else {
totalGpuCost += transitionCost
}
} else {
// at least one child is transitioning from GPU to CPU
plan.childPlans.zip(childCosts).foreach {
case (child, (childCpuCost, childGpuCost)) =>
val transitionCost = costModel.transitionToCpuCost(child)
val childGpuTotal = childGpuCost + transitionCost
if (child.canThisBeReplaced && childGpuTotal > childCpuCost) {
optimizations.append(ReplaceSection(
child.asInstanceOf[SparkPlanMeta[SparkPlan]], totalCpuCost, totalGpuCost))
child.recursiveCostPreventsRunningOnGpu()
}
}

// recalculate the transition costs because child plans may have changed
val transitionCost = plan.childPlans
.filter(_.canThisBeReplaced)
.map(costModel.transitionToCpuCost).sum
totalGpuCost += transitionCost
}
}
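// For example (illustrative numbers only, not part of this change): with an operator GPU
// cost of 0.8, a CPU cost of 1.0, and a transition-to-GPU cost of 0.3, the GPU total of
// 0.8 + 0.3 = 1.1 exceeds 1.0, so AvoidTransition is recorded and the operator stays on
// the CPU; with a transition cost of 0.1 the GPU total of 0.9 would win instead.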

// special behavior if this is the final operator in the plan
if (finalOperator && plan.canThisBeReplaced) {
totalGpuCost += costModel.transitionToCpuCost(plan)
}

if (totalGpuCost > totalCpuCost) {
// part of this plan has transitioned onto the GPU but with no benefit from doing so,
// so we want to undo this and go back to the CPU
if (plan.canThisBeReplaced) {
// this plan would have been on the GPU, so we move it back onto the CPU and recurse down
// until we reach a part of the plan that is already on CPU and then stop
optimizations.append(ReplaceSection(plan, totalCpuCost, totalGpuCost))
plan.recursiveCostPreventsRunningOnGpu()
}

// reset the costs because this section of the plan was not moved to GPU
totalGpuCost = totalCpuCost
}

if (!plan.canThisBeReplaced) {
// reset the costs because this section of the plan was not moved to GPU
totalGpuCost = totalCpuCost
}

(totalCpuCost, totalGpuCost)
}

}
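// Illustrative usage sketch (not part of this change), mirroring the call site added to
// GpuOverrides.apply later in this commit; `rapidsConf` and `wrappedPlan` are assumed to be
// a RapidsConf and a SparkPlanMeta[SparkPlan] that are already in scope:
//   val optimizations = new CostBasedOptimizer(rapidsConf).optimize(wrappedPlan)
//   optimizations.foreach(o => logWarning(o.toString))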

/**
* The cost model is behind a trait so that it could be made pluggable in the future,
* allowing users to override the cost model to suit specific use cases.
*/
trait CostModel {

/**
* Determine the CPU and GPU cost for an individual operator.
* @param plan Operator
* @return (cpuCost, gpuCost)
*/
def applyCost(plan: SparkPlanMeta[_]): (Double, Double)

/**
* Determine the cost of transitioning data from CPU to GPU for a specific operator
* @param plan Operator
* @return Cost
*/
def transitionToGpuCost(plan: SparkPlanMeta[_]): Double

/**
* Determine the cost of transitioning data from GPU to CPU for a specific operator
* @param plan Operator
* @return Cost
*/
def transitionToCpuCost(plan: SparkPlanMeta[_]): Double
}
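// Illustrative sketch only (not part of this change): a hypothetical alternative cost model
// showing how another implementation could plug into the trait above if it were made
// configurable. The concrete costs below are invented for the example.
class ExampleCostModel(conf: RapidsConf) extends CostModel {

override def applyCost(plan: SparkPlanMeta[_]): (Double, Double) = {
// constant CPU cost; assume projections are cheap on the GPU and fall back to the
// configured default for everything else
val cpuCost = 1.0
val gpuCost = plan.wrapped match {
case _: ProjectExec => 0.5
case _ => conf.defaultOperatorCost
}
(cpuCost, gpuCost)
}

override def transitionToGpuCost(plan: SparkPlanMeta[_]): Double =
conf.defaultTransitionToGpuCost

override def transitionToCpuCost(plan: SparkPlanMeta[_]): Double =
conf.defaultTransitionToCpuCost
}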

class DefaultCostModel(conf: RapidsConf) extends CostModel {

override def transitionToGpuCost(plan: SparkPlanMeta[_]): Double = {
// this is a placeholder for now - we would want to try and calculate the transition cost
// based on the data types and size (if known)
conf.defaultTransitionToGpuCost
}

override def transitionToCpuCost(plan: SparkPlanMeta[_]): Double = {
// this is a placeholder for now - we would want to try and calculate the transition cost
// based on the data types and size (if known)
conf.defaultTransitionToCpuCost
}

override def applyCost(plan: SparkPlanMeta[_]): (Double, Double) = {

// for now we have a constant cost for CPU operations and make the GPU cost relative to
// this, but later we may want to calculate actual CPU costs
val cpuCost = 1.0

// always check for user overrides first
val gpuCost = plan.conf.getOperatorCost(plan.wrapped.getClass.getSimpleName).getOrElse {
plan.wrapped match {
case _: ProjectExec =>
// the cost of a projection is the average cost of its expressions
plan.childExprs
.map(expr => exprCost(expr.asInstanceOf[BaseExprMeta[Expression]]))
.sum / plan.childExprs.length

case _: ShuffleExchangeExec =>
// setting the GPU cost of ShuffleExchangeExec to 1.0 avoids moving from CPU to GPU for
// a shuffle. This must happen before the join consistency check, or we risk disabling one
// exchange in a way that would make a join inconsistent
1.0

case _ => conf.defaultOperatorCost
}
}

plan.cpuCost = cpuCost
plan.gpuCost = gpuCost

(cpuCost, gpuCost)
}

private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT]): Double = {
// always check for user overrides first
expr.conf.getExpressionCost(expr.getClass.getSimpleName).getOrElse {
expr match {
case cast: CastExprMeta[_] =>
// different CAST operations have different costs, so we allow these to be configured
// based on the data types involved
expr.conf.getExpressionCost(s"Cast${cast.fromType}To${cast.toType}")
.getOrElse(conf.defaultExpressionCost)
case _ =>
// many of our BaseExprMeta implementations are anonymous classes so we look directly at
// the wrapped expressions in some cases
expr.wrapped match {
case _: AttributeReference => 1.0 // no benefit on GPU
case Alias(_: AttributeReference, _) => 1.0 // no benefit on GPU
case _ => conf.defaultExpressionCost
}
}
}
}

}
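// Worked example of the defaults above (illustrative numbers only, not part of this change):
// a ProjectExec whose child expressions cost 0.8, 0.8 and 1.0 gets a GPU cost of
// (0.8 + 0.8 + 1.0) / 3 ≈ 0.87, slightly cheaper than the constant CPU cost of 1.0.
// Individual cast costs can be tuned through the "Cast<from>To<to>" key that exprCost
// builds, assuming the expression-cost configuration exposes a matching entry.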

sealed abstract class Optimization

case class AvoidTransition[INPUT <: SparkPlan](plan: SparkPlanMeta[INPUT]) extends Optimization {
override def toString: String = s"It is not worth moving to GPU for operator: " +
s"${Explain.format(plan)}"
}

case class ReplaceSection[INPUT <: SparkPlan](
plan: SparkPlanMeta[INPUT],
totalCpuCost: Double,
totalGpuCost: Double) extends Optimization {
override def toString: String = s"It is not worth keeping this section on GPU; " +
s"gpuCost=$totalGpuCost, cpuCost=$totalCpuCost:\n${Explain.format(plan)}"
}

object Explain {

def format(plan: SparkPlanMeta[_]): String = {
plan.wrapped match {
case p: SparkPlan => p.simpleString(SQLConf.get.maxToStringFields)
case other => other.toString
}
}

def formatTree(plan: SparkPlanMeta[_]): String = {
val b = new StringBuilder
formatTree(plan, b, "")
b.toString
}

def formatTree(plan: SparkPlanMeta[_], b: StringBuilder, indent: String): Unit = {
b.append(indent)
b.append(format(plan))
b.append('\n')
plan.childPlans.filter(_.canThisBeReplaced)
.foreach(child => formatTree(child, b, indent + " "))
}

}
@@ -18,6 +18,7 @@ package com.nvidia.spark.rapids

import java.time.ZoneId

import scala.collection.mutable.ListBuffer
import scala.reflect.ClassTag

import ai.rapids.cudf.DType
@@ -397,6 +398,16 @@ final class CreateDataSourceTableAsSelectCommandMeta(
}
}

/**
* Listener trait so that tests can confirm that the expected optimizations are being applied
*/
trait GpuOverridesListener {
def optimizedPlan(
plan: SparkPlanMeta[SparkPlan],
sparkPlan: SparkPlan,
costOptimizations: Seq[Optimization]): Unit
}
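// Illustrative sketch only (not part of this change): a hypothetical listener that a test
// could register through GpuOverrides.addListener below to capture the optimizations
// reported for each plan.
class RecordingGpuOverridesListener extends GpuOverridesListener {
val captured = new ListBuffer[Optimization]()
override def optimizedPlan(
plan: SparkPlanMeta[SparkPlan],
sparkPlan: SparkPlan,
costOptimizations: Seq[Optimization]): Unit = {
captured ++= costOptimizations
}
}
// A test would typically pair registration with cleanup, for example
// GpuOverrides.addListener(listener) followed by GpuOverrides.removeListener(listener).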

object GpuOverrides {
val FLOAT_DIFFERS_GROUP_INCOMPAT =
"when enabling these, there may be extra groups produced for floating point grouping " +
@@ -412,6 +423,21 @@ object GpuOverrides {
"\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R",
"?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">")

// this listener mechanism is global and is intended for use by unit tests only
private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]()

def addListener(listener: GpuOverridesListener): Unit = {
listeners += listener
}

def removeListener(listener: GpuOverridesListener): Unit = {
listeners -= listener
}

def removeAllListeners(): Unit = {
listeners.clear()
}

def canRegexpBeTreatedLikeARegularString(strLit: UTF8String): Boolean = {
val s = strLit.toString
!regexList.exists(pattern => s.contains(pattern))
@@ -2746,7 +2772,10 @@ case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging {
}

case class GpuOverrides() extends Rule[SparkPlan] with Logging {
override def apply(plan: SparkPlan): SparkPlan = {

// Spark calls this method once for the whole plan when AQE is off. When AQE is on, it
// gets called once for each query stage (where a query stage is an `Exchange`).
override def apply(plan: SparkPlan): SparkPlan = {
val conf = new RapidsConf(plan.conf)
if (conf.isSqlEnabled) {
val updatedPlan = if (plan.conf.adaptiveExecutionEnabled) {
@@ -2774,16 +2803,30 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
}
plan
} else {
val optimizations = if (conf.optimizerEnabled) {
// we need to run these rules both before and after the cost-based optimizer because the
// costs are affected by other rules that force operators onto the CPU
wrap.runAfterTagRules()
val optimizer = new CostBasedOptimizer(conf)
optimizer.optimize(wrap)
} else {
Seq.empty
}
wrap.runAfterTagRules()
if (!exp.equalsIgnoreCase("NONE")) {
wrap.tagForExplain()
val explain = wrap.explain(exp.equalsIgnoreCase("ALL"))
if (!explain.isEmpty) {
logWarning(s"\n$explain")
if (conf.optimizerExplain.equalsIgnoreCase("ALL") && optimizations.nonEmpty) {
logWarning(s"Cost-based optimizations applied:\n${optimizations.mkString("\n")}")
}
}
}
val convertedPlan = wrap.convertIfNeeded()
addSortsIfNeeded(convertedPlan, conf)
val sparkPlan = addSortsIfNeeded(convertedPlan, conf)
GpuOverrides.listeners.foreach(_.optimizedPlan(wrap, sparkPlan, optimizations))
sparkPlan
}
}
