diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala new file mode 100644 index 00000000000..ff3a3fc6b34 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/CostBasedOptimizer.scala @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import scala.collection.mutable.ListBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression} +import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.internal.SQLConf + +class CostBasedOptimizer(conf: RapidsConf) extends Logging { + + // the intention is to make the cost model pluggable since we are probably going to need to + // experiment a fair bit with this part + private val costModel = new DefaultCostModel(conf) + + /** + * Walk the plan and determine CPU and GPU costs for each operator and then make decisions + * about whether operators should run on CPU or GPU. 
+ * + * @param plan The plan to optimize + * @return A list of optimizations that were applied + */ + def optimize(plan: SparkPlanMeta[SparkPlan]): Seq[Optimization] = { + val optimizations = new ListBuffer[Optimization]() + recursivelyOptimize(plan, optimizations, finalOperator = true, "") + optimizations + } + + private def recursivelyOptimize( + plan: SparkPlanMeta[SparkPlan], + optimizations: ListBuffer[Optimization], + finalOperator: Boolean, + indent: String = ""): (Double, Double) = { + + // get the CPU and GPU cost of the child plan(s) + val childCosts = plan.childPlans + .map(child => recursivelyOptimize( + child.asInstanceOf[SparkPlanMeta[SparkPlan]], + optimizations, + finalOperator = false, + indent + " ")) + + val (childCpuCosts, childGpuCosts) = childCosts.unzip + + // get the CPU and GPU cost of this operator + val (operatorCpuCost, operatorGpuCost) = costModel.applyCost(plan) + + // calculate total (this operator + children) + val totalCpuCost = operatorCpuCost + childCpuCosts.sum + var totalGpuCost = operatorGpuCost + childGpuCosts.sum + + // determine how many transitions between CPU and GPU are taking place between + // the child operators and this operator + val numTransitions = plan.childPlans + .count(_.canThisBeReplaced != plan.canThisBeReplaced) + + if (numTransitions > 0) { + if (plan.canThisBeReplaced) { + // at least one child is transitioning from CPU to GPU + val transitionCost = plan.childPlans.filter(!_.canThisBeReplaced) + .map(costModel.transitionToGpuCost).sum + val gpuCost = operatorGpuCost + transitionCost + if (gpuCost > operatorCpuCost) { + optimizations.append(AvoidTransition(plan)) + plan.costPreventsRunningOnGpu() + // stay on CPU, so costs are same + totalGpuCost = totalCpuCost; + } else { + totalGpuCost += transitionCost + } + } else { + // at least one child is transitioning from GPU to CPU + plan.childPlans.zip(childCosts).foreach { + case (child, childCosts) => + val (childCpuCost, childGpuCost) = childCosts + val 
transitionCost = costModel.transitionToCpuCost(child) + val childGpuTotal = childGpuCost + transitionCost + if (child.canThisBeReplaced && childGpuTotal > childCpuCost) { + optimizations.append(ReplaceSection( + child.asInstanceOf[SparkPlanMeta[SparkPlan]], totalCpuCost, totalGpuCost)) + child.recursiveCostPreventsRunningOnGpu() + } + } + + // recalculate the transition costs because child plans may have changed + val transitionCost = plan.childPlans + .filter(_.canThisBeReplaced) + .map(costModel.transitionToCpuCost).sum + totalGpuCost += transitionCost + } + } + + // special behavior if this is the final operator in the plan + if (finalOperator && plan.canThisBeReplaced) { + totalGpuCost += costModel.transitionToCpuCost(plan) + } + + if (totalGpuCost > totalCpuCost) { + // we have reached a point where we have transitioned onto GPU for part of this + // plan but with no benefit from doing so, so we want to undo this and go back to CPU + if (plan.canThisBeReplaced) { + // this plan would have been on GPU so we move it back onto CPU and recurse down + // until we reach a part of the plan that is already on CPU and then stop + optimizations.append(ReplaceSection(plan, totalCpuCost, totalGpuCost)) + plan.recursiveCostPreventsRunningOnGpu() + } + + // reset the costs because this section of the plan was not moved to GPU + totalGpuCost = totalCpuCost + } + + if (!plan.canThisBeReplaced) { + // reset the costs because this section of the plan was not moved to GPU + totalGpuCost = totalCpuCost + } + + (totalCpuCost, totalGpuCost) + } + +} + +/** + * The cost model is behind a trait so that we can consider making this pluggable in the future + * so that users can override the cost model to suit specific use cases. + */ +trait CostModel { + + /** + * Determine the CPU and GPU cost for an individual operator. 
+ * @param plan Operator + * @return (cpuCost, gpuCost) + */ + def applyCost(plan: SparkPlanMeta[_]): (Double, Double) + + /** + * Determine the cost of transitioning data from CPU to GPU for a specific operator + * @param plan Operator + * @return Cost + */ + def transitionToGpuCost(plan: SparkPlanMeta[_]): Double + + /** + * Determine the cost of transitioning data from GPU to CPU for a specific operator + */ + def transitionToCpuCost(plan: SparkPlanMeta[_]): Double +} + +class DefaultCostModel(conf: RapidsConf) extends CostModel { + + def transitionToGpuCost(plan: SparkPlanMeta[_]) = { + // this is a placeholder for now - we would want to try and calculate the transition cost + // based on the data types and size (if known) + conf.defaultTransitionToGpuCost + } + + def transitionToCpuCost(plan: SparkPlanMeta[_]) = { + // this is a placeholder for now - we would want to try and calculate the transition cost + // based on the data types and size (if known) + conf.defaultTransitionToCpuCost + } + + override def applyCost(plan: SparkPlanMeta[_]): (Double, Double) = { + + // for now we have a constant cost for CPU operations and we make the GPU cost relative + // to this but later we may want to calculate actual CPU costs + val cpuCost = 1.0 + + // always check for user overrides first + val gpuCost = plan.conf.getOperatorCost(plan.wrapped.getClass.getSimpleName).getOrElse { + plan.wrapped match { + case _: ProjectExec => + // the cost of a projection is the average cost of its expressions + plan.childExprs + .map(expr => exprCost(expr.asInstanceOf[BaseExprMeta[Expression]])) + .sum / plan.childExprs.length + + case _: ShuffleExchangeExec => + // setting the GPU cost of ShuffleExchangeExec to 1.0 avoids moving from CPU to GPU for + // a shuffle. 
This must happen before the join consistency or we risk running into issues + // with disabling one exchange that would make a join inconsistent + 1.0 + + case _ => conf.defaultOperatorCost + } + } + + plan.cpuCost = cpuCost + plan.gpuCost = gpuCost + + (cpuCost, gpuCost) + } + + private def exprCost[INPUT <: Expression](expr: BaseExprMeta[INPUT]): Double = { + // always check for user overrides first + expr.conf.getExpressionCost(expr.getClass.getSimpleName).getOrElse { + expr match { + case cast: CastExprMeta[_] => + // different CAST operations have different costs, so we allow these to be configured + // based on the data types involved + expr.conf.getExpressionCost(s"Cast${cast.fromType}To${cast.toType}") + .getOrElse(conf.defaultExpressionCost) + case _ => + // many of our BaseExprMeta implementations are anonymous classes so we look directly at + // the wrapped expressions in some cases + expr.wrapped match { + case _: AttributeReference => 1.0 // no benefit on GPU + case Alias(_: AttributeReference, _) => 1.0 // no benefit on GPU + case _ => conf.defaultExpressionCost + } + } + } + } + +} + +sealed abstract class Optimization + +case class AvoidTransition[INPUT <: SparkPlan](plan: SparkPlanMeta[INPUT]) extends Optimization { + override def toString: String = s"It is not worth moving to GPU for operator: " + + s"${Explain.format(plan)}" +} + +case class ReplaceSection[INPUT <: SparkPlan]( + plan: SparkPlanMeta[INPUT], + totalCpuCost: Double, + totalGpuCost: Double) extends Optimization { + override def toString: String = s"It is not worth keeping this section on GPU; " + + s"gpuCost=$totalGpuCost, cpuCost=$totalCpuCost:\n${Explain.format(plan)}" +} + +object Explain { + + def format(plan: SparkPlanMeta[_]): String = { + plan.wrapped match { + case p: SparkPlan => p.simpleString(SQLConf.get.maxToStringFields) + case other => other.toString + } + } + + def formatTree(plan: SparkPlanMeta[_]): String = { + val b = new StringBuilder + formatTree(plan, b, "") + 
b.toString + } + + def formatTree(plan: SparkPlanMeta[_], b: StringBuilder, indent: String): Unit = { + b.append(indent) + b.append(format(plan)) + b.append('\n') + plan.childPlans.filter(_.canThisBeReplaced) + .foreach(child => formatTree(child, b, indent + " ")) + } + +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 3ae3b84d8cc..4ee26765271 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -18,6 +18,7 @@ package com.nvidia.spark.rapids import java.time.ZoneId +import scala.collection.mutable.ListBuffer import scala.reflect.ClassTag import ai.rapids.cudf.DType @@ -397,6 +398,16 @@ final class CreateDataSourceTableAsSelectCommandMeta( } } +/** + * Listener trait so that tests can confirm that the expected optimizations are being applied + */ +trait GpuOverridesListener { + def optimizedPlan( + plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) +} + object GpuOverrides { val FLOAT_DIFFERS_GROUP_INCOMPAT = "when enabling these, there may be extra groups produced for floating point grouping " + @@ -412,6 +423,21 @@ object GpuOverrides { "\\S", "\\v", "\\V", "\\w", "\\w", "\\p", "$", "\\b", "\\B", "\\A", "\\G", "\\Z", "\\z", "\\R", "?", "|", "(", ")", "{", "}", "\\k", "\\Q", "\\E", ":", "!", "<=", ">") + // this listener mechanism is global and is intended for use by unit tests only + private val listeners: ListBuffer[GpuOverridesListener] = new ListBuffer[GpuOverridesListener]() + + def addListener(listener: GpuOverridesListener): Unit = { + listeners += listener + } + + def removeListener(listener: GpuOverridesListener): Unit = { + listeners -= listener + } + + def removeAllListeners(): Unit = { + listeners.clear() + } + def canRegexpBeTreatedLikeARegularString(strLit: 
UTF8String): Boolean = { val s = strLit.toString !regexList.exists(pattern => s.contains(pattern)) @@ -2746,7 +2772,10 @@ case class GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging { } case class GpuOverrides() extends Rule[SparkPlan] with Logging { - override def apply(plan: SparkPlan): SparkPlan = { + + // Spark calls this method once for the whole plan when AQE is off. When AQE is on, it + // gets called once for each query stage (where a query stage is an `Exchange`). + override def apply(plan: SparkPlan) :SparkPlan = { val conf = new RapidsConf(plan.conf) if (conf.isSqlEnabled) { val updatedPlan = if (plan.conf.adaptiveExecutionEnabled) { @@ -2774,16 +2803,30 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging { } plan } else { + val optimizations = if (conf.optimizerEnabled) { + // we need to run these rules both before and after CBO because the cost + // is impacted by forcing operators onto CPU due to other rules that we have + wrap.runAfterTagRules() + val optimizer = new CostBasedOptimizer(conf) + optimizer.optimize(wrap) + } else { + Seq.empty + } wrap.runAfterTagRules() if (!exp.equalsIgnoreCase("NONE")) { wrap.tagForExplain() val explain = wrap.explain(exp.equalsIgnoreCase("ALL")) if (!explain.isEmpty) { logWarning(s"\n$explain") + if (conf.optimizerExplain.equalsIgnoreCase("ALL") && optimizations.nonEmpty) { + logWarning(s"Cost-based optimizations applied:\n${optimizations.mkString("\n")}") + } } } val convertedPlan = wrap.convertIfNeeded() - addSortsIfNeeded(convertedPlan, conf) + val sparkPlan = addSortsIfNeeded(convertedPlan, conf) + GpuOverrides.listeners.foreach(_.optimizedPlan(wrap, sparkPlan, optimizations)) + sparkPlan } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 41048a8eb6e..eb30ceb4e72 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ 
b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -889,7 +889,48 @@ object RapidsConf { .booleanConf .createWithDefault(true) - val USE_ARROW_OPT = conf("spark.rapids.arrowCopyOptimizationEnabled") + val OPTIMIZER_ENABLED = conf("spark.rapids.sql.optimizer.enabled") + .internal() + .doc("Enable cost-based optimizer that will attempt to avoid " + + "transitions to GPU for operations that will not result in improved performance " + + "over CPU") + .booleanConf + .createWithDefault(false) + + val OPTIMIZER_EXPLAIN = conf("spark.rapids.sql.optimizer.explain") + .internal() + .doc("Explain why some parts of a query were not placed on a GPU due to " + + "optimization rules. Possible values are ALL: print everything, NONE: print nothing") + .stringConf + .createWithDefault("NONE") + + val OPTIMIZER_DEFAULT_GPU_OPERATOR_COST = conf("spark.rapids.sql.optimizer.defaultExecGpuCost") + .internal() + .doc("Default relative GPU cost of running an operator on the GPU") + .doubleConf + .createWithDefault(0.8) + + val OPTIMIZER_DEFAULT_GPU_EXPRESSION_COST = conf("spark.rapids.sql.optimizer.defaultExprGpuCost") + .internal() + .doc("Default relative GPU cost of running an expression on the GPU") + .doubleConf + .createWithDefault(0.8) + + val OPTIMIZER_DEFAULT_TRANSITION_TO_CPU_COST = conf( + "spark.rapids.sql.optimizer.defaultTransitionToCpuCost") + .internal() + .doc("Default cost of transitioning from GPU to CPU") + .doubleConf + .createWithDefault(0.15) + + val OPTIMIZER_DEFAULT_TRANSITION_TO_GPU_COST = conf( + "spark.rapids.sql.optimizer.defaultTransitionToGpuCost") + .internal() + .doc("Default cost of transitioning from CPU to GPU") + .doubleConf + .createWithDefault(0.15) + + val USE_ARROW_OPT = conf("spark.rapids.arrowCopyOptimizationEnabled") .doc("Option to turn off using the optimized Arrow copy code when reading from " + "ArrowColumnVector in HostColumnarToGpu. 
Left as internal as user shouldn't " + "have to turn it off, but its convenient for testing.") @@ -1202,10 +1243,39 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val getCloudSchemes: Option[Seq[String]] = get(CLOUD_SCHEMES) + lazy val optimizerEnabled: Boolean = get(OPTIMIZER_ENABLED) + + lazy val optimizerExplain: String = get(OPTIMIZER_EXPLAIN) + + lazy val defaultOperatorCost: Double = get(OPTIMIZER_DEFAULT_GPU_OPERATOR_COST) + + lazy val defaultExpressionCost: Double = get(OPTIMIZER_DEFAULT_GPU_EXPRESSION_COST) + + lazy val defaultTransitionToCpuCost: Double = get(OPTIMIZER_DEFAULT_TRANSITION_TO_CPU_COST) + + lazy val defaultTransitionToGpuCost: Double = get(OPTIMIZER_DEFAULT_TRANSITION_TO_GPU_COST) + lazy val getAlluxioPathsToReplace: Option[Seq[String]] = get(ALLUXIO_PATHS_REPLACE) def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) } + + /** + * Get the GPU cost of an expression, for use in the cost-based optimizer. + */ + def getExpressionCost(operatorName: String): Option[Double] = { + val key = s"spark.rapids.sql.optimizer.expr.$operatorName" + conf.get(key).map(toDouble(_, key)) + } + + /** + * Get the GPU cost of an operator, for use in the cost-based optimizer. 
+ */ + def getOperatorCost(operatorName: String): Option[Double] = { + val key = s"spark.rapids.sql.optimizer.exec.$operatorName" + conf.get(key).map(toDouble(_, key)) + } + } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 00df25aa698..c75a3fbb8f3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -116,9 +116,29 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( private var cannotReplaceAnyOfPlanReasons: Option[mutable.Set[String]] = None private var shouldBeRemovedReasons: Option[mutable.Set[String]] = None protected var cannotRunOnGpuBecauseOfSparkPlan: Boolean = false + protected var cannotRunOnGpuBecauseOfCost: Boolean = false val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported") + /** + * Recursively force a section of the plan back onto CPU, stopping once a plan + * is reached that is already on CPU. 
+ */ + final def recursiveCostPreventsRunningOnGpu(): Unit = { + if (canThisBeReplaced) { + costPreventsRunningOnGpu() + childDataWriteCmds.foreach(_.recursiveCostPreventsRunningOnGpu()) + } + } + + final def costPreventsRunningOnGpu(): Unit = { + cannotRunOnGpuBecauseOfCost = true + willNotWorkOnGpu("Removed by cost-based optimizer") + childExprs.foreach(_.recursiveCostPreventsRunningOnGpu()) + childParts.foreach(_.recursiveCostPreventsRunningOnGpu()) + childScans.foreach(_.recursiveCostPreventsRunningOnGpu()) + } + final def recursiveSparkPlanPreventsRunningOnGpu(): Unit = { cannotRunOnGpuBecauseOfSparkPlan = true childExprs.foreach(_.recursiveSparkPlanPreventsRunningOnGpu()) @@ -290,9 +310,13 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( final private def getIndicatorChar: String = { if (shouldThisBeRemoved) { "#" + } else if (cannotRunOnGpuBecauseOfCost) { + "$" } else if (canThisBeReplaced) { if (cannotRunOnGpuBecauseOfSparkPlan) { "@" + } else if (cannotRunOnGpuBecauseOfCost) { + "$" } else { "*" } @@ -495,6 +519,9 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT, override val childParts: Seq[PartMeta[_]] = Seq.empty override val childDataWriteCmds: Seq[DataWritingCommandMeta[_]] = Seq.empty + var cpuCost: Double = 0 + var gpuCost: Double = 0 + override def convertToCpu(): SparkPlan = { wrapped.withNewChildren(childPlans.map(_.convertIfNeeded())) } @@ -947,4 +974,4 @@ final class RuleNotFoundExprMeta[INPUT <: Expression]( override def convertToGpu(): GpuExpression = throw new IllegalStateException("Cannot be converted to GPU") -} +} \ No newline at end of file diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala new file mode 100644 index 00000000000..0118ef1baed --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CostBasedOptimizerSuite.scala @@ -0,0 +1,383 @@ +/* + * Copyright (c) 2021, NVIDIA 
CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import scala.collection.mutable.ListBuffer + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.execution.{ProjectExec, SortExec, SparkPlan, WholeStageCodegenExec} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase +import org.apache.spark.sql.types.DataTypes + +class CostBasedOptimizerSuite extends SparkQueryCompareTestSuite with BeforeAndAfter { + + before { + GpuOverrides.removeAllListeners() + } + + after { + GpuOverrides.removeAllListeners() + } + + test("Force section of plan back onto CPU, AQE on") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key, "false") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec,SortExec,SortMergeJoinExec," + + "Alias,Cast,LessThan") + + val optimizations: ListBuffer[Seq[Optimization]] = new 
ListBuffer[Seq[Optimization]]() + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + withGpuSparkSession(spark => { + val df1: DataFrame = createQuery(spark) + .alias("df1") + .orderBy("more_strings_1") + val df2: DataFrame = createQuery(spark) + .alias("df2") + .orderBy("more_strings_2") + val df = df1.join(df2, col("df1.more_strings_1").equalTo(col("df2.more_strings_2"))) + .orderBy("df2.more_strings_2") + + df.collect() + + // check that the expected optimization was applied + val opt = optimizations.last.last.asInstanceOf[ReplaceSection[_]] + assert(opt.totalGpuCost > opt.totalCpuCost) + assert(opt.plan.wrapped.isInstanceOf[SortExec]) + + // check that the final plan has a CPU sort and no GPU sort + val cpuSort = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[SortExec]) + + val gpuSort = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[GpuSortExec]) + + assert(cpuSort.nonEmpty) + assert(gpuSort.isEmpty) + + df + }, conf) + + } + + test("Force section of plan back onto CPU, AQE off") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.ENABLE_REPLACE_SORTMERGEJOIN.key, "false") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec,SortExec,SortMergeJoinExec," + + "Alias,Cast,LessThan") + + val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + 
withGpuSparkSession(spark => { + val df1: DataFrame = createQuery(spark) + .alias("df1") + .orderBy("more_strings_1") + val df2: DataFrame = createQuery(spark) + .alias("df2") + .orderBy("more_strings_2") + val df = df1.join(df2, col("df1.more_strings_1").equalTo(col("df2.more_strings_2"))) + .orderBy("df2.more_strings_2") + + df.collect() + + // check that the expected optimization was applied + assert(7 == optimizations.flatten + .filter(_.isInstanceOf[ReplaceSection[_]]) + .map(_.asInstanceOf[ReplaceSection[_]]) + .count(_.plan.wrapped.isInstanceOf[SortExec])) + + // check that the final plan has a CPU sort and no GPU sort + val cpuSort = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[SortExec]) + + val gpuSort = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[GpuSortExec]) + + assert(cpuSort.nonEmpty) + assert(gpuSort.isEmpty) + + df + }, conf) + + } + + test("Force last section of plan back onto CPU, AQE on") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec,SortExec," + + "Alias,Cast,LessThan") + + val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + .orderBy("more_strings_1") + df.collect() + + // check that the expected optimization was applied + val opt = optimizations.last.last.asInstanceOf[ReplaceSection[_]] + assert(opt.totalGpuCost > opt.totalCpuCost) + assert(opt.plan.wrapped.isInstanceOf[SortExec]) 
+ + //assert that the top-level sort stayed on the CPU + df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + .executedPlan.asInstanceOf[WholeStageCodegenExec] + .child.asInstanceOf[SortExec] + + df + }, conf) + + } + + test("Force last section of plan back onto CPU, AQE off") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec,SortExec," + + "Alias,Cast,LessThan") + + val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + .orderBy("more_strings_1") + df.collect() + + // check that the expected optimization was applied + val opt = optimizations.last.last.asInstanceOf[ReplaceSection[_]] + assert(opt.totalGpuCost > opt.totalCpuCost) + assert(opt.plan.wrapped.isInstanceOf[SortExec]) + + //assert that the top-level sort stayed on the CPU + df.queryExecution.executedPlan.asInstanceOf[WholeStageCodegenExec] + .child.asInstanceOf[SortExec] + + df + }, conf) + + } + + test("Avoid move to GPU for trivial projection, AQE on") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec," + + "Alias,Cast,LessThan") + + val optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() + 
GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + df.collect() + + // assert that the top-level projection stayed on the CPU + df.queryExecution.executedPlan.asInstanceOf[AdaptiveSparkPlanExec] + .executedPlan.asInstanceOf[WholeStageCodegenExec] + .child.asInstanceOf[ProjectExec] + + df + }, conf) + + } + + test("Avoid move to GPU for trivial projection, AQE off") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec," + + "Alias,Cast,LessThan") + + var optimizations: ListBuffer[Seq[Optimization]] = new ListBuffer[Seq[Optimization]]() + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations += costOptimizations + }) + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + df.collect() + + // check that the expected optimization was applied + assert(3 == optimizations + .flatten + .filter(_.isInstanceOf[AvoidTransition[_]]) + .map(_.asInstanceOf[AvoidTransition[_]]) + .count(_.plan.wrapped.isInstanceOf[ProjectExec])) + + // check that the expected optimization was applied + assert(3 == optimizations + .flatten + .filter(_.isInstanceOf[AvoidTransition[_]]) + .map(_.asInstanceOf[AvoidTransition[_]]) + .count(_.plan.wrapped.isInstanceOf[ProjectExec])) + + // assert that the top-level projection stayed on the CPU + assert(df.queryExecution.executedPlan.asInstanceOf[WholeStageCodegenExec] + .child.isInstanceOf[ProjectExec]) + + df + }, conf) + } + + test("Avoid move to GPU for 
shuffle, AQE on") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec," + + "Alias,Cast,LessThan") + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + df.collect() + + val gpuExchanges = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[GpuShuffleExchangeExecBase]) + assert(gpuExchanges.isEmpty) + + df + }, conf) + } + + test("Avoid move to GPU for shuffle, AQE off") { + + val conf = new SparkConf() + .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false") + .set(RapidsConf.OPTIMIZER_ENABLED.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "false") + .set(RapidsConf.EXPLAIN.key, "ALL") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, + "ProjectExec,BroadcastExchangeExec,BroadcastHashJoinExec," + + "Alias,Cast,LessThan") + + withGpuSparkSession(spark => { + val df: DataFrame = createQuery(spark) + df.collect() + + val gpuExchanges = ShimLoader.getSparkShims + .findOperators(df.queryExecution.executedPlan, + _.isInstanceOf[GpuShuffleExchangeExecBase]) + assert(gpuExchanges.isEmpty) + + df + }, conf) + } + + private def createQuery(spark: SparkSession) = { + val df1 = nullableStringsDf(spark) + .repartition(2) + .withColumnRenamed("more_strings", "more_strings_1") + + val df2 = nullableStringsDf(spark) + .repartition(2) + .withColumnRenamed("more_strings", "more_strings_2") + + val df = df1.join(df2, "strings") + // filter on unsupported CAST to force operation onto CPU + .filter(col("more_strings_2").cast(DataTypes.TimestampType) + .lt(col("more_strings_1").cast(DataTypes.TimestampType))) + // this projection just swaps the order of the attributes and we want CBO to keep + // this on CPU + 
.select("more_strings_2", "more_strings_1") + df + } + + private def addListener(optimizations: ListBuffer[Optimization]): Unit = { + GpuOverrides.addListener( + (plan: SparkPlanMeta[SparkPlan], + sparkPlan: SparkPlan, + costOptimizations: Seq[Optimization]) => { + optimizations.appendAll(costOptimizations) + }) + } +} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index 8878d9ed645..33c455bf4aa 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -1521,6 +1521,19 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { ).toDF("float", "int") } + def nullableStringsDf(session: SparkSession): DataFrame = { + import session.sqlContext.implicits._ + Seq[(String, String)]( + ("100.0", "1.0"), + (null, "2.0"), + ("300.0", "3.0"), + ("400.0", null), + ("500.0", "5.0"), + ("-100.0", null), + ("-500.0", "0.0") + ).toDF("strings", "more_strings") + } + def nullableStringsIntsDf(session: SparkSession): DataFrame = { import session.sqlContext.implicits._ Seq[(String, Integer)](