From 157aa7c6b5200f8a7bc2ce58eaf42a45d238ad9e Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Wed, 1 Jul 2020 09:50:26 -0700
Subject: [PATCH 1/3] Additional tests for broadcast hash join

---
 .../com/nvidia/spark/rapids/JoinsSuite.scala  | 42 +++++++++++++++++++
 1 file changed, 42 insertions(+)

diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
index d8aab15e0eb..e9e90428110 100644
--- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
+++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
@@ -16,7 +16,11 @@
 
 package com.nvidia.spark.rapids
 
+import com.nvidia.spark.rapids.SparkSessionHolder.spark
+
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.broadcast
+import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec
 
 class JoinsSuite extends SparkQueryCompareTestSuite {
 
@@ -91,4 +95,42 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
     mixedDfWithNulls, mixedDfWithNulls, sortBeforeRepart = true) {
     (A, B) => A.join(B, A("longs") === B("longs"), "LeftAnti")
   }
+
+  test("broadcast hint isn't propagated after a join") {
+    import spark.sqlContext.implicits._
+    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
+    spark.conf.set("spark.rapids.sql.enabled", "true")
+    val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
+    val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
+    val df3 = df1.join(broadcast(df2), Seq("key"), "inner").drop(df2("key"))
+
+    val df4 = Seq((1, "5"), (2, "5")).toDF("key", "value")
+    val df5 = df4.join(df3, Seq("key"), "inner")
+
+    val plan = df5.queryExecution.executedPlan
+
+    assert(plan.collect { case p: GpuBroadcastHashJoinExec => p }.size === 1)
+    assert(plan.collect { case p: GpuShuffledHashJoinExec => p }.size === 1)
+  }
+
+  test("broadcast hint in SQL") {
+    spark.conf.set("spark.rapids.sql.enabled", "true")
+    longsDf(spark).createOrReplaceTempView("t")
+    longsDf(spark).createOrReplaceTempView("u")
+
+    for (name <- Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN")) {
+      val plan1 = spark.sql(s"SELECT /*+ $name(t) */ * FROM t JOIN u ON t.longs = u.longs")
+        .queryExecution.executedPlan
+      val plan2 = spark.sql(s"SELECT /*+ $name(u) */ * FROM t JOIN u ON t.longs = u.longs")
+        .queryExecution.executedPlan
+
+      val res1 = plan1.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+      val res2 = plan2.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+
+      assert(res1.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+        .equals("BuildLeft"))
+      assert(res2.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+        .equals("BuildRight"))
+    }
+  }
 }

From c08ae032433e27ecb38b5cfe980a7c768084dc16 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Mon, 13 Jul 2020 17:33:50 -0700
Subject: [PATCH 2/3] addressed review comments

---
 .../com/nvidia/spark/rapids/JoinsSuite.scala  | 63 ++++++++++---------
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
index 9acd40449b2..45c7a2090c4 100644
--- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
+++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
@@ -16,8 +16,6 @@
 
 package com.nvidia.spark.rapids
 
-import com.nvidia.spark.rapids.SparkSessionHolder.spark
-
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec
 
 class JoinsSuite extends SparkQueryCompareTestSuite {
 
@@ -103,40 +101,43 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
   }
 
   test("broadcast hint isn't propagated after a join") {
-    import spark.sqlContext.implicits._
-    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
-    spark.conf.set("spark.rapids.sql.enabled", "true")
-    val df1 = Seq((1, "4"), (2, "2")).toDF("key", "value")
-    val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value")
-    val df3 = df1.join(broadcast(df2), Seq("key"), "inner").drop(df2("key"))
+    val conf = new SparkConf()
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+
+    withGpuSparkSession(spark => {
+      val df1 = longsDf(spark)
+      val df2 = nonZeroLongsDf(spark)
 
-    val df4 = Seq((1, "5"), (2, "5")).toDF("key", "value")
-    val df5 = df4.join(df3, Seq("key"), "inner")
+      val df3 = df1.join(broadcast(df2), Seq("longs"), "inner").drop(df2("longs"))
+      val df4 = longsDf(spark)
+      val df5 = df4.join(df3, Seq("longs"), "inner")
 
-    val plan = df5.queryExecution.executedPlan
+      val plan = df5.queryExecution.executedPlan
 
-    assert(plan.collect { case p: GpuBroadcastHashJoinExec => p }.size === 1)
-    assert(plan.collect { case p: GpuShuffledHashJoinExec => p }.size === 1)
+      assert(plan.collect { case p: GpuBroadcastHashJoinExec => p }.size === 1)
+      assert(plan.collect { case p: GpuShuffledHashJoinExec => p }.size === 1)
+    }, conf)
   }
 
   test("broadcast hint in SQL") {
-    spark.conf.set("spark.rapids.sql.enabled", "true")
-    longsDf(spark).createOrReplaceTempView("t")
-    longsDf(spark).createOrReplaceTempView("u")
-
-    for (name <- Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN")) {
-      val plan1 = spark.sql(s"SELECT /*+ $name(t) */ * FROM t JOIN u ON t.longs = u.longs")
-        .queryExecution.executedPlan
-      val plan2 = spark.sql(s"SELECT /*+ $name(u) */ * FROM t JOIN u ON t.longs = u.longs")
-        .queryExecution.executedPlan
-
-      val res1 = plan1.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
-      val res2 = plan2.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
-
-      assert(res1.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
-        .equals("BuildLeft"))
-      assert(res2.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
-        .equals("BuildRight"))
-    }
+    withGpuSparkSession(spark => {
+      longsDf(spark).createOrReplaceTempView("t")
+      longsDf(spark).createOrReplaceTempView("u")
+
+      for (name <- Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN")) {
+        val plan1 = spark.sql(s"SELECT /*+ $name(t) */ * FROM t JOIN u ON t.longs = u.longs")
+          .queryExecution.executedPlan
+        val plan2 = spark.sql(s"SELECT /*+ $name(u) */ * FROM t JOIN u ON t.longs = u.longs")
+          .queryExecution.executedPlan
+
+        val res1 = plan1.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+        val res2 = plan2.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+
+        assert(res1.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+          .equals("BuildLeft"))
+        assert(res2.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+          .equals("BuildRight"))
+      }
+    })
   }
 }

From e9c8c2afb2a74a1e468079ea5a9c38edca280c72 Mon Sep 17 00:00:00 2001
From: Niranjan Artal
Date: Tue, 14 Jul 2020 15:57:31 -0700
Subject: [PATCH 3/3] Moved these tests under tests/ directory

---
 .../com/nvidia/spark/rapids/JoinsSuite.scala  | 43 ------------
 .../spark/rapids/BroadcastHashJoinSuite.scala | 65 +++++++++++++++++++
 2 files changed, 65 insertions(+), 43 deletions(-)
 create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala
diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
index 45c7a2090c4..bd4b82ca2a7 100644
--- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
+++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/JoinsSuite.scala
@@ -17,8 +17,6 @@
 package com.nvidia.spark.rapids
 
 import org.apache.spark.SparkConf
-import org.apache.spark.sql.functions.broadcast
-import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec
 
 class JoinsSuite extends SparkQueryCompareTestSuite {
 
@@ -99,45 +97,4 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
     mixedDfWithNulls, mixedDfWithNulls, sortBeforeRepart = true) {
     (A, B) => A.join(B, A("longs") === B("longs"), "LeftAnti")
   }
-
-  test("broadcast hint isn't propagated after a join") {
-    val conf = new SparkConf()
-      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
-
-    withGpuSparkSession(spark => {
-      val df1 = longsDf(spark)
-      val df2 = nonZeroLongsDf(spark)
-
-      val df3 = df1.join(broadcast(df2), Seq("longs"), "inner").drop(df2("longs"))
-      val df4 = longsDf(spark)
-      val df5 = df4.join(df3, Seq("longs"), "inner")
-
-      val plan = df5.queryExecution.executedPlan
-
-      assert(plan.collect { case p: GpuBroadcastHashJoinExec => p }.size === 1)
-      assert(plan.collect { case p: GpuShuffledHashJoinExec => p }.size === 1)
-    }, conf)
-  }
-
-  test("broadcast hint in SQL") {
-    withGpuSparkSession(spark => {
-      longsDf(spark).createOrReplaceTempView("t")
-      longsDf(spark).createOrReplaceTempView("u")
-
-      for (name <- Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN")) {
-        val plan1 = spark.sql(s"SELECT /*+ $name(t) */ * FROM t JOIN u ON t.longs = u.longs")
-          .queryExecution.executedPlan
-        val plan2 = spark.sql(s"SELECT /*+ $name(u) */ * FROM t JOIN u ON t.longs = u.longs")
-          .queryExecution.executedPlan
-
-        val res1 = plan1.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
-        val res2 = plan2.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
-
-        assert(res1.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
-          .equals("BuildLeft"))
-        assert(res2.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
-          .equals("BuildRight"))
-      }
-    })
-  }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala
new file mode 100644
index 00000000000..bcdd032915b
--- /dev/null
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/BroadcastHashJoinSuite.scala
@@ -0,0 +1,65 @@
+/*
+ * Copyright (c) 2020, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.functions.broadcast
+import org.apache.spark.sql.rapids.execution.GpuBroadcastHashJoinExec
+
+class BroadcastHashJoinSuite extends SparkQueryCompareTestSuite {
+
+  test("broadcast hint isn't propagated after a join") {
+    val conf = new SparkConf()
+      .set("spark.sql.autoBroadcastJoinThreshold", "-1")
+
+    withGpuSparkSession(spark => {
+      val df1 = longsDf(spark)
+      val df2 = nonZeroLongsDf(spark)
+
+      val df3 = df1.join(broadcast(df2), Seq("longs"), "inner").drop(df2("longs"))
+      val df4 = longsDf(spark)
+      val df5 = df4.join(df3, Seq("longs"), "inner")
+
+      val plan = df5.queryExecution.executedPlan
+
+      assert(plan.collect { case p: GpuBroadcastHashJoinExec => p }.size === 1)
+      assert(plan.collect { case p: GpuShuffledHashJoinExec => p }.size === 1)
+    }, conf)
+  }
+
+  test("broadcast hint in SQL") {
+    withGpuSparkSession(spark => {
+      longsDf(spark).createOrReplaceTempView("t")
+      longsDf(spark).createOrReplaceTempView("u")
+
+      for (name <- Seq("BROADCAST", "BROADCASTJOIN", "MAPJOIN")) {
+        val plan1 = spark.sql(s"SELECT /*+ $name(t) */ * FROM t JOIN u ON t.longs = u.longs")
+          .queryExecution.executedPlan
+        val plan2 = spark.sql(s"SELECT /*+ $name(u) */ * FROM t JOIN u ON t.longs = u.longs")
+          .queryExecution.executedPlan
+
+        val res1 = plan1.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+        val res2 = plan2.find(_.isInstanceOf[GpuBroadcastHashJoinExec])
+
+        assert(res1.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+          .equals("BuildLeft"))
+        assert(res2.get.asInstanceOf[GpuBroadcastHashJoinExec].buildSide.toString
+          .equals("BuildRight"))
+      }
+    })
+  }
+}