Skip to content

Commit

Permalink
Improve warnings about AQE nodes not supported on GPU (#647)
Browse files Browse the repository at this point in the history
* Improve warnings about AQE nodes not supported on GPU

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Introduce new DoNotReplaceSparkPlanMeta rule

Signed-off-by: Andy Grove <andygrove@nvidia.com>
  • Loading branch information
andygrove authored Sep 3, 2020
1 parent 262ec31 commit f659eaf
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand}
Expand Down Expand Up @@ -1735,7 +1735,13 @@ object GpuOverrides {
GpuCustomShuffleReaderExec(childPlans.head.convertIfNeeded(),
exec.partitionSpecs)
}
})
}),
exec[AdaptiveSparkPlanExec]("Wrapper for adaptive query plan", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[AdaptiveSparkPlanExec](exec, conf, p)),
exec[BroadcastQueryStageExec]("Broadcast query stage", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[BroadcastQueryStageExec](exec, conf, p)),
exec[ShuffleQueryStageExec]("Shuffle query stage", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[ShuffleQueryStageExec](exec, conf, p))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
commonExecs ++ ShimLoader.getSparkShims.getExecs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
/** Append `depth` levels of indentation padding to the given builder. */
private def indent(append: StringBuilder, depth: Int): Unit = {
  val padding = " " * depth
  append.append(padding)
}

def suppressWillWorkOnGpuInfo: Boolean = false

private def willWorkOnGpuInfo: String = cannotBeReplacedReasons match {
case None => "NOT EVALUATED FOR GPU YET"
case Some(v) if v.isEmpty => "could run on GPU"
Expand Down Expand Up @@ -253,7 +255,7 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
* @param all should all the data be printed or just what does not work on the GPU?
*/
protected def print(strBuilder: StringBuilder, depth: Int, all: Boolean): Unit = {
if (all || !canThisBeReplaced) {
if ((all || !canThisBeReplaced) && !suppressWillWorkOnGpuInfo) {
indent(strBuilder, depth)
strBuilder.append(if (canThisBeReplaced) "*" else "!")

Expand Down Expand Up @@ -570,6 +572,25 @@ final class RuleNotFoundSparkPlanMeta[INPUT <: SparkPlan](
throw new IllegalStateException("Cannot be converted to GPU")
}

/**
* Metadata for `SparkPlan` that should not be replaced.
*/
/**
 * Plan metadata for operators (such as AQE wrapper nodes) that we deliberately
 * leave on the CPU. These are tagged as not replaceable but are hidden from the
 * explain output so users are not warned about nodes that were never candidates.
 */
final class DoNotReplaceSparkPlanMeta[INPUT <: SparkPlan](
    plan: INPUT,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]])
  extends SparkPlanMeta[INPUT](plan, conf, parent, new NoRuleConfKeysAndIncompat) {

  // Keep these nodes out of the willWorkOnGpu report entirely.
  override def suppressWillWorkOnGpuInfo: Boolean = true

  override def tagPlanForGpu(): Unit = {
    willNotWorkOnGpu(s"there is no need to replace ${plan.getClass}")
  }

  // This meta exists only to mark the node; conversion must never be attempted.
  override def convertToGpu(): GpuExec = {
    throw new IllegalStateException("Cannot be converted to GPU")
  }
}

/**
* Base class for metadata around `Expression`.
*/
Expand Down

0 comments on commit f659eaf

Please sign in to comment.