Implement AQE unit test for InsertAdaptiveSparkPlan (NVIDIA#655)
* Implement AQE unit test for InsertAdaptiveSparkPlan

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Address feedback

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* add assertion

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Address feedback

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* Test for CPU and GPU write, with adaptive GPU read

Signed-off-by: Andy Grove <andygrove@nvidia.com>

* cleanup after tests

Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Sep 4, 2020
1 parent 8422715 commit d500916
Showing 1 changed file with 104 additions and 2 deletions.
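The new tests hinge on a subtlety of adaptive query execution: when AQE is enabled, the plan under an AdaptiveSparkPlanExec node is re-optimized at runtime, so any assertion about which operators actually ran (CPU vs. GPU) has to inspect the final plan after an action has executed. A minimal sketch of that inspection pattern, assuming a SparkSession with AQE enabled (the helper name finalPlanString is illustrative, not part of this commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only: run a query, then inspect the finalized adaptive plan.
def finalPlanString(spark: SparkSession, sql: String): String = {
  val df = spark.sql(sql)
  df.collect() // force execution so AQE can finalize the plan
  df.queryExecution.executedPlan match {
    // executedPlan on AdaptiveSparkPlanExec is the re-optimized final plan
    case a: AdaptiveSparkPlanExec => a.executedPlan.toString
    case plan => plan.toString
  }
}

This is why the second test in the diff below asserts against adaptiveSparkPlanExec.executedPlan rather than against the adaptive node itself.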
@@ -16,18 +16,37 @@
 
 package com.nvidia.spark.rapids
 
+import java.io.File
+
+import com.nvidia.spark.rapids.AdaptiveQueryExecSuite.TEST_FILES_ROOT
+import org.scalatest.BeforeAndAfterEach
+
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, SparkPlan}
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveSparkPlanHelper}
+import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.functions.when
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.rapids.execution.{GpuCustomShuffleReaderExec, GpuShuffledHashJoinBase}
 
+object AdaptiveQueryExecSuite {
+  val TEST_FILES_ROOT: File = TestUtils.getTempDir(this.getClass.getSimpleName)
+}
+
 class AdaptiveQueryExecSuite
     extends SparkQueryCompareTestSuite
-    with AdaptiveSparkPlanHelper {
+    with AdaptiveSparkPlanHelper
+    with BeforeAndAfterEach {
+
+  override def beforeEach(): Unit = {
+    TEST_FILES_ROOT.mkdirs()
+  }
+
+  override def afterEach(): Unit = {
+    org.apache.commons.io.FileUtils.deleteDirectory(TEST_FILES_ROOT)
+  }
 
   private def runAdaptiveAndVerifyResult(
       spark: SparkSession, query: String): (SparkPlan, SparkPlan) = {
@@ -90,6 +109,89 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("Plugin should translate child plan of GPU DataWritingCommandExec to GPU") {
+
+    val conf = new SparkConf()
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+      .set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
+
+    withGpuSparkSession(spark => {
+      import spark.implicits._
+
+      // read from a parquet file so we can test reading on GPU
+      val path = new File(TEST_FILES_ROOT, "DataWritingCommandExecGPU.parquet").getAbsolutePath
+      (0 until 100).toDF("a")
+        .write
+        .mode(SaveMode.Overwrite)
+        .parquet(path)
+      spark.read.parquet(path).createOrReplaceTempView("testData")
+
+      spark.sql("CREATE TABLE IF NOT EXISTS DataWritingCommandExecGPU (a INT) USING parquet")
+        .collect()
+
+      val df = spark.sql("INSERT INTO TABLE DataWritingCommandExecGPU SELECT * FROM testData")
+      df.collect()
+
+      // write should be on GPU
+      val writeCommand = TestUtils.findOperator(df.queryExecution.executedPlan,
+        _.isInstanceOf[GpuDataWritingCommandExec])
+      assert(writeCommand.isDefined)
+
+      // the read should be an adaptive plan
+      val adaptiveSparkPlanExec = TestUtils.findOperator(writeCommand.get,
+        _.isInstanceOf[AdaptiveSparkPlanExec])
+        .get.asInstanceOf[AdaptiveSparkPlanExec]
+
+      // assert that at least part of the adaptive plan ran on GPU
+      assert(TestUtils.findOperator(adaptiveSparkPlanExec, _.isInstanceOf[GpuExec]).isDefined)
+    }, conf)
+  }
+
+  test("Plugin should translate child plan of CPU DataWritingCommandExec to GPU") {
+
+    val conf = new SparkConf()
+      .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
+      .set(SQLConf.ADAPTIVE_EXECUTION_FORCE_APPLY.key, "true")
+      // force DataWritingCommandExec onto CPU for this test because we want to verify that
+      // the read will still happen on GPU with a CPU write
+      .set(RapidsConf.TEST_ALLOWED_NONGPU.key, "DataWritingCommandExec")
+      .set("spark.rapids.sql.exec.DataWritingCommandExec", "false")
+
+    withGpuSparkSession(spark => {
+      import spark.implicits._
+
+      // read from a parquet file so we can test reading on GPU
+      val path = new File(TEST_FILES_ROOT, "DataWritingCommandExecCPU.parquet").getAbsolutePath
+      (0 until 100).toDF("a")
+        .write
+        .mode(SaveMode.Overwrite)
+        .parquet(path)
+
+      spark.read.parquet(path).createOrReplaceTempView("testData")
+
+      spark.sql("CREATE TABLE IF NOT EXISTS DataWritingCommandExecCPU (a INT) USING parquet")
+        .collect()
+
+      val df = spark.sql("INSERT INTO TABLE DataWritingCommandExecCPU SELECT * FROM testData")
+      df.collect()
+
+      // write should be on CPU
+      val writeCommand = TestUtils.findOperator(df.queryExecution.executedPlan,
+        _.isInstanceOf[DataWritingCommandExec])
+      assert(writeCommand.isDefined)
+
+      // the read should be an adaptive plan
+      val adaptiveSparkPlanExec = TestUtils.findOperator(writeCommand.get,
+        _.isInstanceOf[AdaptiveSparkPlanExec])
+        .get.asInstanceOf[AdaptiveSparkPlanExec]
+
+      // even though the write couldn't run on GPU, the read should have done
+      assert(TestUtils.findOperator(adaptiveSparkPlanExec.executedPlan,
+        _.isInstanceOf[GpuExec]).isDefined)
+
+    }, conf)
+  }
+
   def skewJoinTest(fun: SparkSession => Unit) {
 
     // this test requires Spark 3.0.1 or later
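The assertions above rely on a TestUtils.findOperator helper that is not part of this diff. A plausible sketch of such a helper, assuming it does a depth-first search over a plan's children and unwraps AdaptiveSparkPlanExec to reach the finalized plan (the repository's actual implementation may differ):

import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

// Sketch only: return the first operator satisfying the predicate.
def findOperator(plan: SparkPlan, predicate: SparkPlan => Boolean): Option[SparkPlan] = {
  plan match {
    case p if predicate(p) => Some(p)
    // AdaptiveSparkPlanExec is a leaf node, so descend into its final plan
    case a: AdaptiveSparkPlanExec => findOperator(a.executedPlan, predicate)
    case p => p.children.flatMap(c => findOperator(c, predicate)).headOption
  }
}

With a helper shaped like this, both tests reduce to locating the write command (GpuDataWritingCommandExec or DataWritingCommandExec) and then checking that at least one GpuExec appears beneath the adaptive read.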
