Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve warnings about AQE nodes not supported on GPU #647

Merged
merged 2 commits into from
Sep 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.CustomShuffleReaderExec
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.command.{DataWritingCommand, DataWritingCommandExec}
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand}
Expand Down Expand Up @@ -1735,7 +1735,13 @@ object GpuOverrides {
GpuCustomShuffleReaderExec(childPlans.head.convertIfNeeded(),
exec.partitionSpecs)
}
})
}),
exec[AdaptiveSparkPlanExec]("Wrapper for adaptive query plan", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[AdaptiveSparkPlanExec](exec, conf, p)),
exec[BroadcastQueryStageExec]("Broadcast query stage", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[BroadcastQueryStageExec](exec, conf, p)),
exec[ShuffleQueryStageExec]("Shuffle query stage", (exec, conf, p, _) =>
new DoNotReplaceSparkPlanMeta[ShuffleQueryStageExec](exec, conf, p))
).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap
val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] =
commonExecs ++ ShimLoader.getSparkShims.getExecs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
/** Appends `depth` spaces of indentation to the supplied builder. */
private def indent(append: StringBuilder, depth: Int): Unit = {
  val padding = " " * depth
  append.append(padding)
}

/**
 * When true, this node is omitted from the will-work-on-GPU explain output.
 * Defaults to false; overridden (e.g. by `DoNotReplaceSparkPlanMeta`) for
 * wrapper nodes that are expected to stay on the CPU.
 */
def suppressWillWorkOnGpuInfo: Boolean = false

private def willWorkOnGpuInfo: String = cannotBeReplacedReasons match {
case None => "NOT EVALUATED FOR GPU YET"
case Some(v) if v.isEmpty => "could run on GPU"
Expand Down Expand Up @@ -253,7 +255,7 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
* @param all should all the data be printed or just what does not work on the GPU?
*/
protected def print(strBuilder: StringBuilder, depth: Int, all: Boolean): Unit = {
if (all || !canThisBeReplaced) {
if ((all || !canThisBeReplaced) && !suppressWillWorkOnGpuInfo) {
indent(strBuilder, depth)
strBuilder.append(if (canThisBeReplaced) "*" else "!")

Expand Down Expand Up @@ -570,6 +572,25 @@ final class RuleNotFoundSparkPlanMeta[INPUT <: SparkPlan](
throw new IllegalStateException("Cannot be converted to GPU")
}

/**
 * Metadata for `SparkPlan` nodes that are deliberately left on the CPU and
 * should never be replaced with a GPU implementation.
 *
 * Used for AQE wrapper nodes (e.g. query stages) where a CPU plan is expected,
 * so no replacement warning should be surfaced to the user.
 */
final class DoNotReplaceSparkPlanMeta[INPUT <: SparkPlan](
    plan: INPUT,
    conf: RapidsConf,
    parent: Option[RapidsMeta[_, _, _]])
  extends SparkPlanMeta[INPUT](plan, conf, parent, new NoRuleConfKeysAndIncompat) {

  // Keep these expected-CPU wrappers out of the explain output so the user
  // is not spammed with messages about operators we never intend to replace.
  override def suppressWillWorkOnGpuInfo: Boolean = true

  override def tagPlanForGpu(): Unit = {
    willNotWorkOnGpu(s"there is no need to replace ${plan.getClass}")
  }

  override def convertToGpu(): GpuExec = {
    throw new IllegalStateException("Cannot be converted to GPU")
  }
}

/**
* Base class for metadata around `Expression`.
*/
Expand Down