Optimize unnecessary columnar->row->columnar transitions with AQE (NVIDIA#2064)

* Avoid transitions when writing columnar output from adaptive plan

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Implement unit test and code cleanup

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Remove debug println

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Move logic to GpuOverrides

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Add comments

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update docs

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Update comment

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Move AvoidAdaptiveTransitionToRow logic to generic rule in GpuTransitionOverrides

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Add unit test for collect case

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Code simplification

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Use ExecutionPlanCaptureCallback instead of custom listener

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Apr 6, 2021
1 parent 93ec13c commit 7421e8c
Showing 2 changed files with 162 additions and 3 deletions.
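
As a rough sketch (not part of this commit, and with illustrative paths), the write path this change targets looks like the following; it assumes the RAPIDS Accelerator jar is on the classpath and uses the standard spark.plugins and spark.sql.adaptive.enabled settings:

import org.apache.spark.sql.SparkSession

// Hypothetical example; the app name and file paths are placeholders.
val spark = SparkSession.builder()
  .appName("aqe-columnar-write-sketch")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// Reading Parquet on the GPU and writing it straight back out is the case where the
// adaptive plan would otherwise end in GpuColumnarToRowExec, forcing a columnar -> row
// -> columnar round trip before GpuDataWritingCommandExec. With this commit the final
// transition is bypassed and the batches stay columnar.
val df = spark.read.parquet("/tmp/avoid-transition-input.parquet")
df.write.mode("overwrite").parquet("/tmp/avoid-transition-output.parquet")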
@@ -16,7 +16,10 @@

package com.nvidia.spark.rapids

import org.apache.spark.RangePartitioner
import java.lang.reflect.Method

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.RangePartitioning
import org.apache.spark.sql.catalyst.rules.Rule
@@ -29,6 +32,7 @@ import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec, Sh
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec}
import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName, GpuShuffleEnv}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuHashJoin, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Rules that run after the row to columnar and columnar to row transitions have been inserted.
@@ -62,8 +66,20 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
def optimizeAdaptiveTransitions(
plan: SparkPlan,
parent: Option[SparkPlan]): SparkPlan = plan match {
// HostColumnarToGpu(RowToColumnarExec(..)) => GpuRowToColumnarExec(..)
case HostColumnarToGpu(r2c: RowToColumnarExec, goal) =>
GpuRowToColumnarExec(optimizeAdaptiveTransitions(r2c.child, Some(r2c)), goal)
val transition = GpuRowToColumnarExec(
optimizeAdaptiveTransitions(r2c.child, Some(r2c)), goal)
r2c.child match {
case _: AdaptiveSparkPlanExec =>
// When the input is an adaptive plan we do not get to see the GPU version until
// the plan is executed. Sometimes the executed plan has a GpuColumnarToRowExec as its
// final operator; in that case we can bypass it and keep the data columnar by
// inserting the [[AvoidAdaptiveTransitionToRow]] operator here.
AvoidAdaptiveTransitionToRow(transition)
case _ =>
transition
}

case ColumnarToRowExec(GpuBringBackToHost(
GpuShuffleCoalesceExec(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty =>
@@ -505,3 +521,60 @@ object GpuTransitionOverrides {
}
}
}

/**
* This operator will attempt to optimize the case when we are writing the results of
* an adaptive query to disk so that we remove the redundant transitions from columnar
* to row within AdaptiveSparkPlanExec followed by a row to columnar transition.
*
* Specifically, this is the plan we see in this case:
*
* {{{
* GpuRowToColumnar(AdaptiveSparkPlanExec(GpuColumnarToRow(child)))
* }}}
*
* We perform this optimization at runtime rather than during planning, because when the adaptive
* plan is being planned and executed, we don't know whether it is being called from an operation
* that wants rows (such as CollectTailExec) or from an operation that wants columns (such as
* GpuDataWritingCommandExec).
*
* Spark does not provide a mechanism for executing an adaptive plan and retrieving columnar
* results and the internal methods that we need to call are private, so we use reflection to
* call them.
*
* @param child The plan to execute
*/
case class AvoidAdaptiveTransitionToRow(child: SparkPlan) extends UnaryExecNode with GpuExec {

override def doExecute(): RDD[InternalRow] =
throw new IllegalStateException(s"Row-based execution should not occur for $this")

override def output: Seq[Attribute] = child.output

override protected def doExecuteColumnar(): RDD[ColumnarBatch] = child match {
case GpuRowToColumnarExec(a: AdaptiveSparkPlanExec, _) =>
val getFinalPhysicalPlan = getPrivateMethod("getFinalPhysicalPlan")
val plan = getFinalPhysicalPlan.invoke(a)
val rdd = plan match {
case t: GpuColumnarToRowExec =>
t.child.executeColumnar()
case _ =>
child.executeColumnar()
}

// final UI update
val finalPlanUpdate = getPrivateMethod("finalPlanUpdate")
finalPlanUpdate.invoke(a)

rdd

case _ =>
child.executeColumnar()
}

private def getPrivateMethod(name: String): Method = {
val m = classOf[AdaptiveSparkPlanExec].getDeclaredMethod(name)
m.setAccessible(true)
m
}
}
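
For readers unfamiliar with the reflection pattern that getPrivateMethod uses above, here is a minimal, self-contained sketch of the same JDK mechanism on a toy class; the Example class and secretPlan method are made up for illustration and are not spark-rapids code:

import java.lang.reflect.Method

class Example {
  private def secretPlan(): String = "final plan"
}

object ReflectionSketch {
  def main(args: Array[String]): Unit = {
    // Look up the private method by name, disable the JVM access check, then invoke it;
    // this is the same sequence doExecuteColumnar performs against AdaptiveSparkPlanExec.
    val m: Method = classOf[Example].getDeclaredMethod("secretPlan")
    m.setAccessible(true)
    println(m.invoke(new Example()))  // prints: final plan
  }
}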
@@ -294,6 +294,92 @@ class AdaptiveQueryExecSuite
}, conf)
}

test("Avoid transitions to row when writing to Parquet") {

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
.set(RapidsConf.METRICS_LEVEL.key, "DEBUG")

withGpuSparkSession(spark => {
import spark.implicits._

// read from a parquet file so we can test reading on GPU
val path = new File(TEST_FILES_ROOT, "AvoidTransitionInput.parquet").getAbsolutePath
(0 until 100).toDF("a")
.repartition(2)
.write
.mode(SaveMode.Overwrite)
.parquet(path)

val df = spark.read.parquet(path)

ExecutionPlanCaptureCallback.startCapture()

val outputPath = new File(TEST_FILES_ROOT, "AvoidTransitionOutput.parquet").getAbsolutePath
df.write.mode(SaveMode.Overwrite).parquet(outputPath)

val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan(
ExecutionPlanCaptureCallback.getResultWithTimeout())

// write should be on GPU
val writeCommand = TestUtils.findOperator(executedPlan,
_.isInstanceOf[GpuDataWritingCommandExec])
assert(writeCommand.isDefined)

// the read should be an adaptive plan
val adaptiveSparkPlanExec = TestUtils.findOperator(writeCommand.get,
_.isInstanceOf[AdaptiveSparkPlanExec])
.get.asInstanceOf[AdaptiveSparkPlanExec]

val transition = adaptiveSparkPlanExec
.executedPlan
.asInstanceOf[GpuColumnarToRowExec]

// although the plan contains a GpuColumnarToRowExec, we bypass it in
// AvoidAdaptiveTransitionToRow so the metrics should reflect that
assert(transition.metrics("numOutputRows").value === 0)

}, conf)
}

test("Keep transition to row when collecting results") {

val conf = new SparkConf()
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
.set(RapidsConf.METRICS_LEVEL.key, "DEBUG")

withGpuSparkSession(spark => {
import spark.implicits._

// read from a parquet file so we can test reading on GPU
val path = new File(TEST_FILES_ROOT, "AvoidTransitionInput.parquet").getAbsolutePath
(0 until 100).toDF("a")
.repartition(2)
.write
.mode(SaveMode.Overwrite)
.parquet(path)

val df = spark.read.parquet(path)

ExecutionPlanCaptureCallback.startCapture()

df.collect()

val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan(
ExecutionPlanCaptureCallback.getResultWithTimeout())

val transition = executedPlan
.asInstanceOf[GpuColumnarToRowExec]

// because we are calling collect, AvoidAdaptiveTransitionToRow will not bypass
// GpuColumnarToRowExec so we should see accurate metrics
assert(transition.metrics("numOutputRows").value === 100)

}, conf)
}

test("Exchange reuse") {

assumeSpark301orLater
@@ -574,4 +660,4 @@ class AdaptiveQueryExecSuite
spark.read.parquet(path).createOrReplaceTempView(name)
}

}
}
