From e0d6d77723fbf51289e2247be7c5b4375989a0e4 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Tue, 8 Dec 2020 11:36:47 -0600
Subject: [PATCH] Fallback to cpu when reading Delta log files for stats
 (#1307)

* comment and signoff

Signed-off-by: Thomas Graves

* Review comments - add explain check, fix wording, rename entirePlanExcludedReasons
---
 .../shims/spark301db/Spark301dbShims.scala    | 14 ++++++++++-
 .../nvidia/spark/rapids/GpuOverrides.scala    | 23 +++++++++++------
 .../com/nvidia/spark/rapids/RapidsConf.scala  | 11 ++++++++
 .../com/nvidia/spark/rapids/RapidsMeta.scala  | 25 ++++++++++++++++++-
 4 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
index acdcf2eb287..b906d25d4ab 100644
--- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
+++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
 import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile}
+import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
 import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
@@ -109,7 +110,18 @@ class Spark301dbShims extends Spark301Shims {
       // partition filters and data filters are not run on the GPU
       override val childExprs: Seq[ExprMeta[_]] = Seq.empty

-      override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)
+      override def tagPlanForGpu(): Unit = {
+        // this is a very specific check to have any of the Delta log metadata queries
+        // fall back and run on the CPU, since there are some incompatibilities between
+        // Databricks Spark and Apache Spark.
+        if (wrapped.relation.fileFormat.isInstanceOf[JsonFileFormat] &&
+          wrapped.relation.location.getClass.getCanonicalName() ==
+            "com.databricks.sql.transaction.tahoe.DeltaLogFileIndex") {
+          this.entirePlanWillNotWork("Plans that read Delta Index JSON files cannot run " +
+            "any part of the plan on the GPU!")
+        }
+        GpuFileSourceScanExec.tagSupport(this)
+      }

       override def convertToGpu(): GpuExec = {
         val sparkSession = wrapped.relation.sparkSession
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 7a951a9d603..729e43e510a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2267,16 +2267,25 @@ case class GpuOverrides() extends Rule[SparkPlan] with Logging {
     if (conf.isSqlEnabled) {
       val wrap = GpuOverrides.wrapPlan(plan, conf, None)
       wrap.tagForGpu()
-      wrap.runAfterTagRules()
+      val reasonsToNotReplaceEntirePlan = wrap.getReasonsNotToReplaceEntirePlan
       val exp = conf.explain
-      if (!exp.equalsIgnoreCase("NONE")) {
-        val explain = wrap.explain(exp.equalsIgnoreCase("ALL"))
-        if (!explain.isEmpty) {
-          logWarning(s"\n$explain")
+      if (conf.allowDisableEntirePlan && reasonsToNotReplaceEntirePlan.nonEmpty) {
+        if (!exp.equalsIgnoreCase("NONE")) {
+          logWarning("Can't replace any part of this plan due to: " +
+            s"${reasonsToNotReplaceEntirePlan.mkString(",")}")
         }
+        plan
+      } else {
+        wrap.runAfterTagRules()
+        if (!exp.equalsIgnoreCase("NONE")) {
+          val explain = wrap.explain(exp.equalsIgnoreCase("ALL"))
+          if (!explain.isEmpty) {
+            logWarning(s"\n$explain")
+          }
+        }
+        val convertedPlan = wrap.convertIfNeeded()
+        addSortsIfNeeded(convertedPlan, conf)
       }
-      val convertedPlan = wrap.convertIfNeeded()
-      addSortsIfNeeded(convertedPlan, conf)
     } else {
       plan
     }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 626c56112e4..fa2fc2f72e8 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -785,6 +785,15 @@
     .booleanConf
     .createWithDefault(false)

+  val ALLOW_DISABLE_ENTIRE_PLAN = conf("spark.rapids.allowDisableEntirePlan")
+    .internal()
+    .doc("The plugin has the ability to detect possible incompatibilities with some specific " +
+      "queries and cluster configurations. In those cases the plugin will disable GPU support " +
+      "for the entire query. Set this to false if you want to override that behavior, but use " +
+      "with caution.")
+    .booleanConf
+    .createWithDefault(true)
+
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
@@ -1072,6 +1081,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

   lazy val cudfVersionOverride: Boolean = get(CUDF_VERSION_OVERRIDE)

+  lazy val allowDisableEntirePlan: Boolean = get(ALLOW_DISABLE_ENTIRE_PLAN)
+
   lazy val getCloudSchemes: Option[Seq[String]] = get(CLOUD_SCHEMES)

   def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
index 9cd8e5989fd..9cb05e41998 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -120,7 +120,7 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
   def convertToCpu(): BASE = wrapped

   private var cannotBeReplacedReasons: Option[mutable.Set[String]] = None
-
+  private var cannotReplaceAnyOfPlanReasons: Option[mutable.Set[String]] = None
   private var shouldBeRemovedReasons: Option[mutable.Set[String]] = None

   val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
@@ -141,6 +141,14 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
     }
   }

+  /**
+   * Call this if a condition is found that prevents the entire plan from
+   * running on the GPU.
+   */
+  final def entirePlanWillNotWork(because: String): Unit = {
+    cannotReplaceAnyOfPlanReasons.get.add(because)
+  }
+
   final def shouldBeRemoved(because: String): Unit =
     shouldBeRemovedReasons.get.add(because)

@@ -154,6 +162,15 @@
    */
   final def canThisBeReplaced: Boolean = cannotBeReplacedReasons.exists(_.isEmpty)

+  /**
+   * Returns the list of reasons the entire plan can't be replaced. An empty
+   * set means the entire plan is OK to be replaced; do the normal checking
+   * per exec and per child.
+   */
+  final def entirePlanExcludedReasons: Seq[String] = {
+    cannotReplaceAnyOfPlanReasons.getOrElse(mutable.Set.empty).toSeq
+  }
+
   /**
    * Returns true iff all of the expressions and their children could be replaced.
    */
@@ -184,6 +201,7 @@
   def initReasons(): Unit = {
     cannotBeReplacedReasons = Some(mutable.Set[String]())
     shouldBeRemovedReasons = Some(mutable.Set[String]())
+    cannotReplaceAnyOfPlanReasons = Some(mutable.Set[String]())
   }

   /**
@@ -511,6 +529,11 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
     }
   }

+  def getReasonsNotToReplaceEntirePlan: Seq[String] = {
+    val childReasons = childPlans.flatMap(_.getReasonsNotToReplaceEntirePlan)
+    entirePlanExcludedReasons ++ childReasons
+  }
+
   private def fixUpJoinConsistencyIfNeeded(): Unit = {
     childPlans.foreach(_.fixUpJoinConsistencyIfNeeded())
     wrapped match {
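
---

Not part of the patch above: a minimal usage sketch of the escape hatch this change adds. It assumes the spark.rapids.allowDisableEntirePlan key defined in RapidsConf is reachable through the session's runtime conf; the key is marked internal(), so treat this as illustrative rather than a supported API.

    // Sketch only. With the default of true, any entirePlanWillNotWork()
    // reason recorded while tagging keeps the whole query on the CPU.
    // Setting the key to false reverts to per-exec replacement decisions,
    // even for the Delta log JSON reads flagged in the shim above.
    spark.conf.set("spark.rapids.allowDisableEntirePlan", "false")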