From 8d8b1216d9071e6ac106b756c88120742c17d6f2 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 17:30:12 +0000 Subject: [PATCH 01/27] WIP: first pass at ShimLeafExecNode, need to update indirect inheritors next Signed-off-by: Navin Kumar --- .../spark/rapids/shims/ShimLeafExecNode.scala | 21 ++++++++++++++ .../spark/rapids/shims/ShimLeafExecNode.scala | 28 +++++++++++++++++++ .../spark/rapids/shims/ShimLeafExecNode.scala | 28 +++++++++++++++++++ .../spark/rapids/basicPhysicalOperators.scala | 6 ++-- .../sql/rapids/GpuDataSourceScanExec.scala | 5 ++-- .../sql/rapids/GpuInMemoryTableScanExec.scala | 5 ++-- 6 files changed, 86 insertions(+), 7 deletions(-) create mode 100644 sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala create mode 100644 sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala create mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala diff --git a/sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala new file mode 100644 index 00000000000..7e15d379f53 --- /dev/null +++ b/sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.execution.LeafExecNode + +trait ShimLeafExecNode extends LeafExecNode \ No newline at end of file diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala new file mode 100644 index 00000000000..e4be4654d2f --- /dev/null +++ b/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.catalyst.plans.logical.Statistics + +trait ShimLeafExecNode extends LeafExecNode { + override def computeStats(): Statistics = { + Statistics( + sizeInBytes = Long.MaxValue + ) + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala new file mode 100644 index 00000000000..e4be4654d2f --- /dev/null +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.shims + +import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.catalyst.plans.logical.Statistics + +trait ShimLeafExecNode extends LeafExecNode { + override def computeStats(): Statistics = { + Statistics( + sizeInBytes = Long.MaxValue + ) + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index adf2ab6b668..55b8c6bd134 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -23,7 +23,7 @@ import ai.rapids.cudf import ai.rapids.cudf._ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.{ShimSparkPlan, ShimUnaryExecNode} +import com.nvidia.spark.rapids.shims.{ShimSparkPlan, ShimLeafExecNode, ShimUnaryExecNode} import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.internal.Logging @@ -31,7 +31,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Descending, Expression, NamedExpression, NullIntolerant, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning} -import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SampleExec, SparkPlan} +import org.apache.spark.sql.execution.{ProjectExec, SampleExec, SparkPlan} import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler, GpuPredicateHelper} import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.types.{DataType, LongType} @@ -544,7 +544,7 @@ case class GpuRangeExec( numSlices: Int, output: Seq[Attribute], targetSizeBytes: Long) - extends LeafExecNode with GpuExec { + extends ShimLeafExecNode with GpuExec { val numElements: BigInt = { val safeStart = BigInt(start) diff --git 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceScanExec.scala index 321f5f34439..be93d979d00 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuDataSourceScanExec.scala @@ -17,18 +17,19 @@ package org.apache.spark.sql.rapids import com.nvidia.spark.rapids.GpuExec +import com.nvidia.spark.rapids.shims.ShimLeafExecNode import org.apache.commons.lang3.StringUtils import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode} +import org.apache.spark.sql.execution.ExplainUtils import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.util.Utils /** GPU implementation of Spark's `DataSourceScanExec` */ -trait GpuDataSourceScanExec extends LeafExecNode with GpuExec { +trait GpuDataSourceScanExec extends ShimLeafExecNode with GpuExec { def relation: BaseRelation def tableIdentifier: Option[TableIdentifier] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala index acbb6a3bc5a..8d546d384d9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInMemoryTableScanExec.scala @@ -18,13 +18,14 @@ package org.apache.spark.sql.rapids import com.nvidia.spark.ParquetCachedBatchSerializer import com.nvidia.spark.rapids.{DataFromReplacementRule, ExecChecks, GpuExec, GpuMetric, RapidsConf, RapidsMeta, SparkPlanMeta} +import com.nvidia.spark.rapids.shims.ShimLeafExecNode import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression, SortOrder} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec} import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf} import org.apache.spark.sql.types.DataType @@ -75,7 +76,7 @@ class InMemoryTableScanMeta( case class GpuInMemoryTableScanExec( attributes: Seq[Attribute], predicates: Seq[Expression], - @transient relation: InMemoryRelation) extends LeafExecNode with GpuExec { + @transient relation: InMemoryRelation) extends ShimLeafExecNode with GpuExec { override val nodeName: String = { relation.cacheBuilder.tableName match { From b3b516c1d9a745458df9d2846d9d82961996d581 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 20:06:47 +0000 Subject: [PATCH 02/27] Move this to until 3.4.0 for non-databricks spark versions Signed-off-by: Navin Kumar --- .../scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename sql-plugin/src/main/{311until330-nondb => 311until340-nondb}/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala (100%) diff --git a/sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala 
b/sql-plugin/src/main/311until340-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala similarity index 100% rename from sql-plugin/src/main/311until330-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala rename to sql-plugin/src/main/311until340-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala From c45cbe041634059724b4e30bf36fb6a7fd08fa0e Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 20:10:35 +0000 Subject: [PATCH 03/27] Set this flag to true in 3.2.1 DB shim Signed-off-by: Navin Kumar --- .../321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 20075a69b03..edc17e35ea0 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -29,5 +29,5 @@ object AQEUtils { } // currently we don't support AQE on Databricks - def isAdaptiveExecutionSupportedInSparkVersion: Boolean = false + def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true } From 5b3f954c29555bdfa840674fcc0cf2b54a786e98 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 22:30:38 +0000 Subject: [PATCH 04/27] WIP: some test updates with enabling AQE Signed-off-by: Navin Kumar --- integration_tests/src/main/python/array_test.py | 5 ++++- integration_tests/src/main/python/spark_init_internal.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index c427366638f..33d01b12a23 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -16,7 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect from data_gen import * -from marks import incompat +from marks import incompat, allow_non_gpu_databricks from spark_session import is_before_spark_313, is_before_spark_330, is_spark_330_or_later, is_databricks104_or_later from pyspark.sql.types import * from pyspark.sql.types import IntegralType @@ -167,6 +167,7 @@ def test_make_array(data_gen): 'array(array(b, a, null, {}, {}), array(a), array(null))'.format(s1, s2))) +@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn) def test_orderby_array_unique(data_gen): assert_gpu_and_cpu_are_equal_sql( @@ -175,6 +176,7 @@ def test_orderby_array_unique(data_gen): 'select array_table.a, array_table.uniq_int from array_table order by uniq_int') +@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)], ids=idfn) def test_orderby_array_of_arrays(data_gen): @@ -184,6 +186,7 @@ def test_orderby_array_of_arrays(data_gen): 'select array_table.a, array_table.uniq_int from array_table order by uniq_int') +@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', [ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))], ids=idfn) diff --git a/integration_tests/src/main/python/spark_init_internal.py 
b/integration_tests/src/main/python/spark_init_internal.py index e36cc3d282b..965e6e6c4d9 100644 --- a/integration_tests/src/main/python/spark_init_internal.py +++ b/integration_tests/src/main/python/spark_init_internal.py @@ -45,7 +45,7 @@ def _spark__init(): # can be reset in the middle of a test if specific operations are done (some types of cast etc) _sb = pyspark.sql.SparkSession.builder _sb.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \ - .config("spark.sql.adaptive.enabled", "false") \ + .config("spark.sql.adaptive.enabled", "true") \ .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback') for key, value in os.environ.items(): From 40e7044fac6723d83135776da7d7c6a463566b93 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 15:45:35 -0700 Subject: [PATCH 05/27] Move these shim implementations to right place Signed-off-by: Navin Kumar --- .../spark/rapids/shims/ShimLeafExecNode.scala | 0 .../spark/rapids/shims/ShimLeafExecNode.scala | 0 .../spark/rapids/shims/ShimLeafExecNode.scala | 28 ------------------- 3 files changed, 28 deletions(-) rename sql-plugin/src/main/{31xdb => 311+-db}/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala (100%) rename sql-plugin/src/main/{311until340-nondb => 311+-nondb}/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala (100%) delete mode 100644 sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala diff --git a/sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala similarity index 100% rename from sql-plugin/src/main/31xdb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala rename to sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala diff --git a/sql-plugin/src/main/311until340-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala similarity index 100% rename from sql-plugin/src/main/311until340-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala rename to sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala deleted file mode 100644 index e4be4654d2f..00000000000 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.nvidia.spark.rapids.shims - -import org.apache.spark.sql.execution.LeafExecNode -import org.apache.spark.sql.catalyst.plans.logical.Statistics - -trait ShimLeafExecNode extends LeafExecNode { - override def computeStats(): Statistics = { - Statistics( - sizeInBytes = Long.MaxValue - ) - } -} \ No newline at end of file From d0c5ccb66e2ef3ec673dd4ee944f65638401f888 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 22:53:13 +0000 Subject: [PATCH 06/27] revert this test change for now, need a better solution --- integration_tests/src/main/python/array_test.py | 3 --- integration_tests/src/main/python/spark_init_internal.py | 2 +- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 33d01b12a23..6ef17b6d4bb 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -167,7 +167,6 @@ def test_make_array(data_gen): 'array(array(b, a, null, {}, {}), array(a), array(null))'.format(s1, s2))) -@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', single_level_array_gens, ids=idfn) def test_orderby_array_unique(data_gen): assert_gpu_and_cpu_are_equal_sql( @@ -176,7 +175,6 @@ def test_orderby_array_unique(data_gen): 'select array_table.a, array_table.uniq_int from array_table order by uniq_int') -@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), ArrayGen(ArrayGen(string_gen, max_length=10), max_length=10)], ids=idfn) def test_orderby_array_of_arrays(data_gen): @@ -186,7 +184,6 @@ def test_orderby_array_of_arrays(data_gen): 'select array_table.a, array_table.uniq_int from array_table order by uniq_int') -@allow_non_gpu_databricks("ShuffleExchangeExec") @pytest.mark.parametrize('data_gen', [ArrayGen(StructGen([['child0', byte_gen], ['child1', string_gen], ['child2', float_gen]]))], ids=idfn) diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py index 965e6e6c4d9..e36cc3d282b 100644 --- a/integration_tests/src/main/python/spark_init_internal.py +++ b/integration_tests/src/main/python/spark_init_internal.py @@ -45,7 +45,7 @@ def _spark__init(): # can be reset in the middle of a test if specific operations are done (some types of cast etc) _sb = pyspark.sql.SparkSession.builder _sb.config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \ - .config("spark.sql.adaptive.enabled", "true") \ + .config("spark.sql.adaptive.enabled", "false") \ .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback') for key, value in os.environ.items(): From acaa586fac23eb59d46cbf3a9acde63f669dd630 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 18 Aug 2022 23:18:33 +0000 Subject: [PATCH 07/27] WIP: Re-enable aqe Databricks tests Signed-off-by: Navin Kumar --- integration_tests/src/main/python/join_test.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 50c1b1ca70e..da5f22fd7b6 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -144,9 +144,9 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) -@pytest.mark.skipif(is_databricks_runtime(), - 
reason="Disabled for databricks because of lack of AQE support, and " - "differences in BroadcastMode.transform") +# @pytest.mark.skipif(is_databricks_runtime(), +# reason="Disabled for databricks because of lack of AQE support, and " +# "differences in BroadcastMode.transform") @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_right_broadcast_nested_loop_join_without_condition_empty_small_batch(join_type): def do_join(spark): @@ -155,9 +155,9 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': 'true'}) @ignore_order(local=True) -@pytest.mark.skipif(is_databricks_runtime(), - reason="Disabled for databricks because of lack of AQE support, and " - "differences in BroadcastMode.transform") +# @pytest.mark.skipif(is_databricks_runtime(), +# reason="Disabled for databricks because of lack of AQE support, and " +# "differences in BroadcastMode.transform") @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_empty_broadcast_hash_join(join_type): def do_join(spark): From e203e9c1ddf4b112e8c6f70a7dcf4f0313b64764 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 19 Aug 2022 19:26:11 +0000 Subject: [PATCH 08/27] Unblock these 2 tests on Databricks Signed-off-by: Navin Kumar --- integration_tests/src/main/python/join_test.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index da5f22fd7b6..187b953e32d 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -144,9 +144,6 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) -# @pytest.mark.skipif(is_databricks_runtime(), -# reason="Disabled for databricks because of lack of AQE support, and " -# "differences in BroadcastMode.transform") @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_right_broadcast_nested_loop_join_without_condition_empty_small_batch(join_type): def do_join(spark): @@ -155,9 +152,6 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': 'true'}) @ignore_order(local=True) -# @pytest.mark.skipif(is_databricks_runtime(), -# reason="Disabled for databricks because of lack of AQE support, and " -# "differences in BroadcastMode.transform") @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_empty_broadcast_hash_join(join_type): def do_join(spark): From 91dc00ab61f6c2933e73969d6cd7d8ee041595c6 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 19 Aug 2022 21:39:12 +0000 Subject: [PATCH 09/27] WIP: integration tests for AQE --- integration_tests/src/main/python/aqe_test.py | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 integration_tests/src/main/python/aqe_test.py diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py new file mode 100644 index 00000000000..907e3ead7bf --- /dev/null +++ b/integration_tests/src/main/python/aqe_test.py @@ -0,0 +1,66 @@ +# Copyright (c) 2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +from _pytest.mark.structures import ParameterSet +from pyspark.sql.functions import when, col +from pyspark.sql.types import * +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture +from conftest import is_databricks_runtime, is_emr_runtime +from data_gen import * +from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan +from spark_session import with_cpu_session, with_spark_session + +_adaptive_conf = { "spark.sql.adaptive.enabled": "true" } + +# Dynamic switching of join strategies + +# Dynamic coalescing of shuffle partitions +_adaptive_coalese_conf = copy_and_update(_adaptive_conf, { + "spark.sql.adaptive.coalescePartitions.enabled": "true" +}) + + +# Dynamically Handle Skew Joins + +def create_skew_df(spark, data_gen, length): + rand = random.Random(0) + data_gen.start(rand) + next_val = None + while next_val is None: + next_val = data_gen.gen() + root = two_col_df(spark, data_gen, SetValuesGen(data_gen.data_type, [next_val]), length) + left = root.select( + when(col('a') < col('b') / 2, col('b')). + otherwise('a').alias("key1"), + col('a').alias("value1") + ) + right = root.select( + when(col('a') < col('b'), col('b')). + otherwise('a').alias("key2"), + col('a').alias("value2") + ) + return left, right + + +@pytest.mark.parametrize("data_gen", numeric_gens + decimal_gens, ids=idfn) +def test_skew_join(data_gen): + def do_join(spark): + left, right = create_skew_df(spark, data_gen, length=2048) + left.createOrReplaceTempView("skewData1") + right.createOrReplaceTempView("skewData2") + return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2") + + assert_gpu_and_cpu_are_equal_collect(do_join, _adaptive_conf) + From 1ce22b2ec12d1ad2d6ddf1113f7f97b559b0edef Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 19 Aug 2022 16:05:49 -0700 Subject: [PATCH 10/27] WIP: AQE integration tests Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index 907e3ead7bf..912a3adadec 100644 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -19,10 +19,12 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * -from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan +from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan, approximate_float from spark_session import with_cpu_session, with_spark_session -_adaptive_conf = { "spark.sql.adaptive.enabled": "true" } +_adaptive_conf = { "spark.sql.adaptive.enabled": "true", + "spark.rapids.sql.castFloatToString.enabled": "true", + "spark.rapids.sql.castDecimalToString.enabled": "true" } # Dynamic switching of join strategies @@ -54,10 +56,13 @@ 
def create_skew_df(spark, data_gen, length): return left, right +@ignore_order(local=True) +@approximate_float(abs=1e-6) +@allow_non_gpu("ShuffleExchangeExec") @pytest.mark.parametrize("data_gen", numeric_gens + decimal_gens, ids=idfn) def test_skew_join(data_gen): def do_join(spark): - left, right = create_skew_df(spark, data_gen, length=2048) + left, right = create_skew_df(spark, data_gen, length=512) left.createOrReplaceTempView("skewData1") right.createOrReplaceTempView("skewData2") return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2") From c3ebbbb1fa2d638fb655ba2b3b1b29caf9b4fc0e Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 22 Aug 2022 23:41:37 +0000 Subject: [PATCH 11/27] Updated AQE tests to ensure that leafexecnodes are tested for Databricks --- integration_tests/src/main/python/aqe_test.py | 80 +++++++++++++------ 1 file changed, 54 insertions(+), 26 deletions(-) mode change 100644 => 100755 integration_tests/src/main/python/aqe_test.py diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py old mode 100644 new mode 100755 index 912a3adadec..1af663b660d --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -19,53 +19,81 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * -from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan, approximate_float +from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan from spark_session import with_cpu_session, with_spark_session -_adaptive_conf = { "spark.sql.adaptive.enabled": "true", - "spark.rapids.sql.castFloatToString.enabled": "true", - "spark.rapids.sql.castDecimalToString.enabled": "true" } +_adaptive_conf = { "spark.sql.adaptive.enabled": "true" } # Dynamic switching of join strategies - # Dynamic coalescing of shuffle partitions +# Dynamically Handle Skew Joins + _adaptive_coalese_conf = copy_and_update(_adaptive_conf, { "spark.sql.adaptive.coalescePartitions.enabled": "true" }) -# Dynamically Handle Skew Joins - -def create_skew_df(spark, data_gen, length): - rand = random.Random(0) - data_gen.start(rand) - next_val = None - while next_val is None: - next_val = data_gen.gen() - root = two_col_df(spark, data_gen, SetValuesGen(data_gen.data_type, [next_val]), length) +def create_skew_df(spark, length): + root = spark.range(0, length) + mid = length / 2 left = root.select( - when(col('a') < col('b') / 2, col('b')). - otherwise('a').alias("key1"), - col('a').alias("value1") + when(col('id') < mid / 2, mid). + otherwise('id').alias("key1"), + col('id').alias("value1") ) right = root.select( - when(col('a') < col('b'), col('b')). - otherwise('a').alias("key2"), - col('a').alias("value2") + when(col('id') < mid, mid). 
+ otherwise('id').alias("key2"), + col('id').alias("value2") ) return left, right @ignore_order(local=True) -@approximate_float(abs=1e-6) -@allow_non_gpu("ShuffleExchangeExec") -@pytest.mark.parametrize("data_gen", numeric_gens + decimal_gens, ids=idfn) -def test_skew_join(data_gen): +def test_aqe_skew_join(): def do_join(spark): - left, right = create_skew_df(spark, data_gen, length=512) + left, right = create_skew_df(spark, 500) left.createOrReplaceTempView("skewData1") right.createOrReplaceTempView("skewData2") return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2") - assert_gpu_and_cpu_are_equal_collect(do_join, _adaptive_conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) + +@ignore_order(local=True) +@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) +def test_aqe_join_parquet(spark_tmp_path, data_gen): + data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark: unary_op_df(spark, data_gen).orderBy('a').write.parquet(data_path) + ) + + def do_it(spark): + spark.read.parquet(data_path).createOrReplaceTempView('df1') + spark.read.parquet(data_path).createOrReplaceTempView('df2') + return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") + + assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) + + +@ignore_order(local=True) +@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) +def test_aqe_join_parquet_batch(spark_tmp_path, data_gen): + # force v2 source for parquet to use BatchScanExec + conf = copy_and_update(_adaptive_conf, { + "spark.sql.sources.useV1SourceList": "" + }) + + first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' + with_cpu_session( + lambda spark : unary_op_df(spark, data_gen).write.parquet(first_data_path)) + second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' + with_cpu_session( + lambda spark : unary_op_df(spark, data_gen).write.parquet(second_data_path)) + data_path = spark_tmp_path + '/PARQUET_DATA' + + def do_it(spark): + spark.read.parquet(data_path).createOrReplaceTempView('df1') + spark.read.parquet(data_path).createOrReplaceTempView('df2') + return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") + assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) \ No newline at end of file From 4a1f59182206ef61742232c27727eb266af0aac2 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Tue, 23 Aug 2022 17:44:05 +0000 Subject: [PATCH 12/27] Add shim for DatasourceV2ExecBase to implement the equivalent computeStats fix Signed-off-by: Navin Kumar --- .../com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 9 +++++++++ .../com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 5 ++++- .../com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala | 2 +- .../com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala | 2 +- .../com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala | 2 +- 5 files changed, 16 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala index e4be4654d2f..f766a8489ee 100644 --- a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import 
org.apache.spark.sql.catalyst.plans.logical.Statistics trait ShimLeafExecNode extends LeafExecNode { @@ -25,4 +26,12 @@ trait ShimLeafExecNode extends LeafExecNode { sizeInBytes = Long.MaxValue ) } +} + +trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase { + override def computeStats(): Statistics = { + Statistics( + sizeInBytes = Long.MaxValue + ) + } } \ No newline at end of file diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala index 7e15d379f53..3b46a1339d7 100644 --- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -17,5 +17,8 @@ package com.nvidia.spark.rapids.shims import org.apache.spark.sql.execution.LeafExecNode +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase -trait ShimLeafExecNode extends LeafExecNode \ No newline at end of file +trait ShimLeafExecNode extends LeafExecNode + +trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase \ No newline at end of file diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala index 41dbb864155..69c26084636 100644 --- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources.v2._ case class GpuBatchScanExec( output: Seq[AttributeReference], - @transient scan: Scan) extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics { + @transient scan: Scan) extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics { @transient lazy val batch: Batch = scan.toBatch @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions() diff --git a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala index 6046b24318f..6fbdaa684e5 100644 --- a/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/320until330-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -34,7 +34,7 @@ case class GpuBatchScanExec( output: Seq[AttributeReference], @transient scan: Scan, runtimeFilters: Seq[Expression] = Seq.empty) - extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics { + extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics { @transient lazy val batch: Batch = scan.toBatch // All expressions are filter expressions used on the CPU. 
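Taken together, the hunks in this patch route every GpuBatchScanExec variant through the new ShimDataSourceV2ScanExecBase, mirroring what ShimLeafExecNode already does for the other leaf nodes. Consolidated across the shim source sets touched here, the pattern looks roughly like the sketch below (an editorial summary, not a new file in the patch): the Databricks-side traits pin sizeInBytes to Long.MaxValue so that the adaptive planner, which appears to consult computeStats() on these physical leaf nodes, always sees a conservative size for GPU scans, while the non-Databricks traits are plain pass-throughs.

// Databricks shim source set: override computeStats() so adaptive planning
// sees a maximal (conservative) size estimate for GPU leaf nodes and scans.
package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

trait ShimLeafExecNode extends LeafExecNode {
  override def computeStats(): Statistics = Statistics(sizeInBytes = Long.MaxValue)
}

trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase {
  override def computeStats(): Statistics = Statistics(sizeInBytes = Long.MaxValue)
}

// Non-Databricks shim source set: no override is needed, so the corresponding
// traits simply extend the stock Spark base classes:
//   trait ShimLeafExecNode extends LeafExecNode
//   trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase
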
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala index a80e4b90392..8f334798fae 100644 --- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -36,7 +36,7 @@ case class GpuBatchScanExec( @transient scan: Scan, runtimeFilters: Seq[Expression] = Seq.empty, keyGroupedPartitioning: Option[Seq[Expression]] = None) - extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics { + extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics { @transient lazy val batch: Batch = scan.toBatch // All expressions are filter expressions used on the CPU. From a64a484c8786adb7781c03d707eadf85a56c1715 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Tue, 23 Aug 2022 14:17:50 -0700 Subject: [PATCH 13/27] fix unused import on Spark 3.1.x Signed-off-by: Navin Kumar --- .../scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala index 69c26084636..11b6b6383ac 100644 --- a/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/311until320-all/scala/com/nvidia/spark/rapids/shims/GpuBatchScanExec.scala @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.connector.read._ -import org.apache.spark.sql.execution.datasources.v2._ case class GpuBatchScanExec( output: Seq[AttributeReference], From 1682f870a048178c1152ae7c858b4e0cbbe3ace8 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 24 Aug 2022 22:29:07 +0000 Subject: [PATCH 14/27] Add AQE unit test to handle window aggregate condition Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index 1af663b660d..d7c31e7e81c 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -96,4 +96,22 @@ def do_it(spark): spark.read.parquet(data_path).createOrReplaceTempView('df2') return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") - assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) \ No newline at end of file + assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) + + +@ignore_order +@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) +@allow_non_gpu('ShuffleExchangeExec') +def test_aqe_join_sum_window(data_gen): + conf = copy_and_update(_adaptive_conf, + {'spark.rapids.sql.batchSizeBytes': '100'} + ) + + def do_it(spark): + agg_table = gen_df(spark, StructGen([('a_1', LongRangeGen()), ('c', data_gen)], nullable=False)) + part_table = gen_df(spark, StructGen([('a_2', LongRangeGen()), ('b', byte_gen)], nullable=False)) + agg_table.createOrReplaceTempView("agg") + part_table.createOrReplaceTempView("part") + return spark.sql("select b, sum(c) as sum_c, sum(c)*100/sum(sum(c)) over (partition by b) as r_c from agg, part where a_1 = a_2 group by b order by b, 
r_c") + + assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) From da750f0b32b7080f6904f06a22649536cb9e547e Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Mon, 29 Aug 2022 19:21:06 +0000 Subject: [PATCH 15/27] Fix windowexec issue with missing references to child expressions due to AQE optimizations Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 11 ++++++----- .../scala/com/nvidia/spark/rapids/GpuWindowExec.scala | 7 +++++-- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index d7c31e7e81c..52949422b91 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -101,17 +101,18 @@ def do_it(spark): @ignore_order @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) +@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn) @allow_non_gpu('ShuffleExchangeExec') -def test_aqe_join_sum_window(data_gen): - conf = copy_and_update(_adaptive_conf, - {'spark.rapids.sql.batchSizeBytes': '100'} - ) +def test_aqe_join_sum_window(data_gen, aqe_enabled): + conf = {'spark.sql.adaptive.sql.enabled': aqe_enabled, + 'spark.rapids.sql.batchSizeBytes': '100', + 'spark.rapids.sql.explain': 'NONE'} def do_it(spark): agg_table = gen_df(spark, StructGen([('a_1', LongRangeGen()), ('c', data_gen)], nullable=False)) part_table = gen_df(spark, StructGen([('a_2', LongRangeGen()), ('b', byte_gen)], nullable=False)) agg_table.createOrReplaceTempView("agg") part_table.createOrReplaceTempView("part") - return spark.sql("select b, sum(c) as sum_c, sum(c)*100/sum(sum(c)) over (partition by b) as r_c from agg, part where a_1 = a_2 group by b order by b, r_c") + return spark.sql("select b, sum(c) as sum_c, sum(c)/sum(sum(c)) over (partition by b) as r_c from agg, part where a_1 = a_2 group by b order by b, r_c") assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 9bf33829881..87eb7f9be43 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -340,8 +340,11 @@ object GpuWindowExec extends Arm { case wf: GpuWindowFunction => // All window functions, including those that are also aggregation functions, are // wrapped in a GpuWindowExpression, so dedup and save their children into the pre - // stage, replacing them with aliases. - val newChildren = wf.children.map(extractAndSave(_, preProject, preDedupe)) + // stage, replacing them with aliases. 
Also, sometimes children are not provided in the + // initial list of expressions after optimizations, so we add them here, and they will + // be deduped anyways in the other passes + val newChildren = wf.children.map(ce => + extractAndSave(extractAndSave(ce, preProject, preDedupe), windowOps, windowDedupe)) wf.withNewChildren(newChildren) case wsc @ GpuWindowSpecDefinition(partitionSpec, orderSpec, _) => // Extracts expressions from the partition spec and order spec to be sure that they From d65c30712cc00a7b898b33bd113a7209d8558c10 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 31 Aug 2022 10:46:33 -0700 Subject: [PATCH 16/27] Fix some style issues Signed-off-by: Navin Kumar --- .../scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 2 +- .../scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala index f766a8489ee..dcf85656cf1 100644 --- a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -16,9 +16,9 @@ package com.nvidia.spark.rapids.shims +import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase -import org.apache.spark.sql.catalyst.plans.logical.Statistics trait ShimLeafExecNode extends LeafExecNode { override def computeStats(): Statistics = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala index 55b8c6bd134..50ca6e94c94 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/basicPhysicalOperators.scala @@ -23,7 +23,7 @@ import ai.rapids.cudf import ai.rapids.cudf._ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.{ShimSparkPlan, ShimLeafExecNode, ShimUnaryExecNode} +import com.nvidia.spark.rapids.shims.{ShimLeafExecNode, ShimSparkPlan, ShimUnaryExecNode} import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext} import org.apache.spark.internal.Logging From cdb32c6eda7de577e3f2f7a5f3b63af7369e873e Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 31 Aug 2022 22:15:17 +0000 Subject: [PATCH 17/27] Found a potential union based join unit test that will crash when AQE is enabled on Databricks Signed-off-by: Navin Kumar --- integration_tests/src/main/python/join_test.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 187b953e32d..45d93ca8328 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -735,7 +735,8 @@ def do_join(spark): # Regression test for https://github.com/NVIDIA/spark-rapids/issues/3775 @ignore_order(local=True) -def test_struct_self_join(spark_tmp_table_factory): +@pytest.mark.parametrize("aqe_enabled", ["true", "false"]) +def test_struct_self_join(spark_tmp_table_factory, aqe_enabled): def do_join(spark): data = [ (("Adam ", "", 
"Green"), "1", "M", 1000), @@ -762,7 +763,7 @@ def do_join(spark): resultdf.createOrReplaceTempView(resultdf_name) return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( resultdf_name, resultdf_name)) - assert_gpu_and_cpu_are_equal_collect(do_join) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': aqe_enabled}) # ExistenceJoin occurs in the context of existential subqueries (which is rewritten to SemiJoin) if # there is an additional condition that may qualify left records even though they don't have From 12fe02ca19578bb7036d9dce68bcabedd9000443 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 1 Sep 2022 20:03:54 +0000 Subject: [PATCH 18/27] Disable GPU shuffle on older Databricks, and switch current Databricks to using original Spark implementation to fix concurrency bug in shim Signed-off-by: Navin Kumar --- integration_tests/src/main/python/join_test.py | 4 +++- .../com/nvidia/spark/rapids/shims/AQEUtils.scala | 2 ++ .../com/nvidia/spark/rapids/shims/AQEUtils.scala | 7 +++++-- .../com/nvidia/spark/rapids/shims/AQEUtils.scala | 7 +++++-- .../rapids/shims/GpuShuffleExchangeExec.scala | 16 +++------------- .../execution/GpuShuffleExchangeExecBase.scala | 9 ++++++++- 6 files changed, 26 insertions(+), 19 deletions(-) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 8b4a56cd0ba..ba822304a8c 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -736,7 +736,9 @@ def do_join(spark): # Regression test for https://github.com/NVIDIA/spark-rapids/issues/3775 @ignore_order(local=True) @pytest.mark.parametrize("aqe_enabled", ["true", "false"]) +@allow_non_gpu("ShuffleExchangeExec") def test_struct_self_join(spark_tmp_table_factory, aqe_enabled): + conf = {'spark.sql.adaptive.enabled': aqe_enabled} def do_join(spark): data = [ (("Adam ", "", "Green"), "1", "M", 1000), @@ -763,7 +765,7 @@ def do_join(spark): resultdf.createOrReplaceTempView(resultdf_name) return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( resultdf_name, resultdf_name)) - assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': aqe_enabled}) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) # ExistenceJoin occurs in the context of existential subqueries (which is rewritten to SemiJoin) if # there is an additional condition that may qualify left records even though they don't have diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index c53b6a3b3e6..06fd3aab721 100644 --- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -27,4 +27,6 @@ object AQEUtils { } def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + + def isGPUShuffleSupportedInAdaptiveExecution: Boolean = true } diff --git a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 2a291852e8a..a64439cd4e0 100644 --- a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -28,6 +28,9 @@ object AQEUtils { ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan) } 
- // currently we don't support AQE on Databricks - def isAdaptiveExecutionSupportedInSparkVersion: Boolean = false + def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + + // GPU Shuffle in some Databricks creates some incorrect references when AQE is enabled + // so disable it for now. + def isGPUShuffleSupportedInAdaptiveExecution: Boolean = false } diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index edc17e35ea0..402b0fe408c 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -16,18 +16,21 @@ package com.nvidia.spark.rapids.shims +import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec /** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */ -object AQEUtils { +object AQEUtils extends Logging { /** Return a new QueryStageExec reuse instance with updated output attributes */ def newReuseInstance(sqse: ShuffleQueryStageExec, newOutput: Seq[Attribute]): QueryStageExec = { val reusedExchange = ReusedExchangeExec(newOutput, sqse.shuffle) + logWarning(s"reusedExchange: $reusedExchange") ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange) } - // currently we don't support AQE on Databricks def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true + + def isGPUShuffleSupportedInAdaptiveExecution: Boolean = true } diff --git a/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala index 44d61961b40..26dda745e8b 100644 --- a/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala +++ b/sql-plugin/src/main/321db/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala @@ -15,37 +15,27 @@ */ package org.apache.spark.rapids.shims -import scala.concurrent.Future - import com.nvidia.spark.rapids.GpuPartitioning -import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.plans.logical.Statistics import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan} import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin} -import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD} +import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBaseWithMetrics, ShuffledBatchRDD} case class GpuShuffleExchangeExec( gpuOutputPartitioning: GpuPartitioning, child: SparkPlan, shuffleOrigin: ShuffleOrigin)( cpuOutputPartitioning: Partitioning) - extends GpuShuffleExchangeExecBase(gpuOutputPartitioning, child) with ShuffleExchangeLike { + extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child) + with ShuffleExchangeLike { override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil override val outputPartitioning: Partitioning = cpuOutputPartitioning - // 'mapOutputStatisticsFuture' is only needed when enable AQE. 
- override def mapOutputStatisticsFuture: Future[MapOutputStatistics] = - if (inputBatchRDD.getNumPartitions == 0) { - Future.successful(null) - } else { - sparkContext.submitMapStage(shuffleDependencyColumnar) - } - override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index c0f99ca109c..624142d8006 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{AQEUtils, GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, SparkShimImpl} import org.apache.spark.{MapOutputStatistics, ShuffleDependency} import org.apache.spark.rapids.shims.GpuShuffleExchangeExec @@ -105,6 +105,13 @@ class GpuShuffleMeta( wrapped.setTagValue(GpuShuffleMeta.availableRuntimeDataTransition, availableRuntimeDataTransition) } + + // Check if AQE is enabled and if we can use GPU shuffle or not based on our environment + if (SQLConf.get.adaptiveExecutionEnabled && + !AQEUtils.isGPUShuffleSupportedInAdaptiveExecution) { + willNotWorkOnGpu("current Spark version does not support GPU shuffle when adaptive " + + "execution is enabled") + } } override def convertToGpu(): GpuExec = From 29da0c148d3cf2671f33d1e442c49798a027a54a Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Thu, 1 Sep 2022 23:01:57 +0000 Subject: [PATCH 19/27] Refactor unit tests for handling issues with Databricks 9.1 Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 123 +++++++++++++++++- .../src/main/python/join_test.py | 7 +- 2 files changed, 124 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index 52949422b91..ab8f021699c 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -20,7 +20,7 @@ from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan -from spark_session import with_cpu_session, with_spark_session +from spark_session import with_cpu_session, with_spark_session, is_databricks104_or_later, is_databricks91_or_later _adaptive_conf = { "spark.sql.adaptive.enabled": "true" } @@ -50,6 +50,7 @@ def create_skew_df(spark, length): @ignore_order(local=True) +@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") def test_aqe_skew_join(): def do_join(spark): left, right = create_skew_df(spark, 500) @@ -60,6 +61,19 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) @ignore_order(local=True) +@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") 
+@allow_non_gpu("ShuffleExchangeExec") +def test_aqe_skew_join_db91(): + def do_join(spark): + left, right = create_skew_df(spark, 500) + left.createOrReplaceTempView("skewData1") + right.createOrReplaceTempView("skewData2") + return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2") + + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) + +@ignore_order(local=True) +@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet(spark_tmp_path, data_gen): data_path = spark_tmp_path + '/PARQUET_DATA' @@ -74,8 +88,26 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) +@ignore_order(local=True) +@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") +@allow_non_gpu("ShuffleExchangeExec") +@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) +def test_aqe_join_parquet_db91(spark_tmp_path, data_gen): + data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark: unary_op_df(spark, data_gen).orderBy('a').write.parquet(data_path) + ) + + def do_it(spark): + spark.read.parquet(data_path).createOrReplaceTempView('df1') + spark.read.parquet(data_path).createOrReplaceTempView('df2') + return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") + + assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) + @ignore_order(local=True) +@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet_batch(spark_tmp_path, data_gen): # force v2 source for parquet to use BatchScanExec @@ -98,6 +130,31 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) +@ignore_order(local=True) +@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") +@allow_non_gpu("ShuffleExchangeExec") +@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) +def test_aqe_join_parquet_batch_db91(spark_tmp_path, data_gen): + # force v2 source for parquet to use BatchScanExec + conf = copy_and_update(_adaptive_conf, { + "spark.sql.sources.useV1SourceList": "" + }) + + first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' + with_cpu_session( + lambda spark : unary_op_df(spark, data_gen).write.parquet(first_data_path)) + second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' + with_cpu_session( + lambda spark : unary_op_df(spark, data_gen).write.parquet(second_data_path)) + data_path = spark_tmp_path + '/PARQUET_DATA' + + def do_it(spark): + spark.read.parquet(data_path).createOrReplaceTempView('df1') + spark.read.parquet(data_path).createOrReplaceTempView('df2') + return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") + + assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) + @ignore_order @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) @@ -116,3 +173,67 @@ def do_it(spark): return spark.sql("select b, sum(c) as sum_c, sum(c)/sum(sum(c)) over (partition by b) as r_c from agg, part where a_1 = a_2 group by b order by b, r_c") assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) + 
+@ignore_order(local=True) +@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") +def test_aqe_struct_self_join(spark_tmp_table_factory): + def do_join(spark): + data = [ + (("Adam ", "", "Green"), "1", "M", 1000), + (("Bob ", "Middle", "Green"), "2", "M", 2000), + (("Cathy ", "", "Green"), "3", "F", 3000) + ] + schema = (StructType() + .add("name", StructType() + .add("firstname", StringType()) + .add("middlename", StringType()) + .add("lastname", StringType())) + .add("id", StringType()) + .add("gender", StringType()) + .add("salary", IntegerType())) + df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + df_name = spark_tmp_table_factory.get() + df.createOrReplaceTempView(df_name) + resultdf = spark.sql( + "select struct(name, struct(name.firstname, name.lastname) as newname)" + + " as col,name from " + df_name + " union" + + " select struct(name, struct(name.firstname, name.lastname) as newname) as col,name" + + " from " + df_name) + resultdf_name = spark_tmp_table_factory.get() + resultdf.createOrReplaceTempView(resultdf_name) + return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( + resultdf_name, resultdf_name)) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) + + +@ignore_order(local=True) +@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") +@allow_non_gpu("ShuffleExchangeExec") +def test_aqe_struct_self_join_db91(spark_tmp_table_factory): + def do_join(spark): + data = [ + (("Adam ", "", "Green"), "1", "M", 1000), + (("Bob ", "Middle", "Green"), "2", "M", 2000), + (("Cathy ", "", "Green"), "3", "F", 3000) + ] + schema = (StructType() + .add("name", StructType() + .add("firstname", StringType()) + .add("middlename", StringType()) + .add("lastname", StringType())) + .add("id", StringType()) + .add("gender", StringType()) + .add("salary", IntegerType())) + df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + df_name = spark_tmp_table_factory.get() + df.createOrReplaceTempView(df_name) + resultdf = spark.sql( + "select struct(name, struct(name.firstname, name.lastname) as newname)" + + " as col,name from " + df_name + " union" + + " select struct(name, struct(name.firstname, name.lastname) as newname) as col,name" + + " from " + df_name) + resultdf_name = spark_tmp_table_factory.get() + resultdf.createOrReplaceTempView(resultdf_name) + return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( + resultdf_name, resultdf_name)) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index ba822304a8c..bf6cb0c082a 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -735,10 +735,7 @@ def do_join(spark): # Regression test for https://github.com/NVIDIA/spark-rapids/issues/3775 @ignore_order(local=True) -@pytest.mark.parametrize("aqe_enabled", ["true", "false"]) -@allow_non_gpu("ShuffleExchangeExec") -def test_struct_self_join(spark_tmp_table_factory, aqe_enabled): - conf = {'spark.sql.adaptive.enabled': aqe_enabled} +def test_struct_self_join(spark_tmp_table_factory): def do_join(spark): data = [ (("Adam ", "", "Green"), "1", "M", 1000), @@ -765,7 +762,7 @@ def do_join(spark): 
resultdf.createOrReplaceTempView(resultdf_name) return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( resultdf_name, resultdf_name)) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # ExistenceJoin occurs in the context of existential subqueries (which is rewritten to SemiJoin) if # there is an additional condition that may qualify left records even though they don't have From 541d91abf820d49f51063c2886a72b42774e6782 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Tue, 6 Sep 2022 17:55:12 +0000 Subject: [PATCH 20/27] Address feedback Signed-off-by: Navin Kumar --- integration_tests/src/main/python/array_test.py | 2 +- .../com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 5 +++++ .../321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala | 1 - 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 4a27e40ad25..58c05f7bf4d 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -16,7 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_gpu_fallback_collect from data_gen import * -from marks import incompat, allow_non_gpu_databricks +from marks import incompat from spark_session import is_before_spark_313, is_before_spark_330, is_spark_330_or_later, is_databricks104_or_later from pyspark.sql.types import * from pyspark.sql.types import IntegralType diff --git a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala index dcf85656cf1..664f1100532 100644 --- a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -21,6 +21,11 @@ import org.apache.spark.sql.execution.LeafExecNode import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase trait ShimLeafExecNode extends LeafExecNode { + // For AQE support in Databricks, all Exec nodes implement computeStats(). This is actually + // a recursive call to traverse the entire physical plan to aggregate this number. For the + // end of the computation, this means that all LeafExecNodes must implement this method to + // avoid a stack overflow. For now, based on feedback from Databricks, Long.MaxValue is + // sufficient to satisfy this computation. 
override def computeStats(): Statistics = { Statistics( sizeInBytes = Long.MaxValue diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 402b0fe408c..8fa1fffa2a8 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -26,7 +26,6 @@ object AQEUtils extends Logging { /** Return a new QueryStageExec reuse instance with updated output attributes */ def newReuseInstance(sqse: ShuffleQueryStageExec, newOutput: Seq[Attribute]): QueryStageExec = { val reusedExchange = ReusedExchangeExec(newOutput, sqse.shuffle) - logWarning(s"reusedExchange: $reusedExchange") ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange) } From 6e93d9c9d6f6a4dca9a95df17b8c3dede4742cc7 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Tue, 6 Sep 2022 17:57:33 +0000 Subject: [PATCH 21/27] Update comment Signed-off-by: Navin Kumar --- .../scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala index 664f1100532..877442f6d90 100644 --- a/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala +++ b/sql-plugin/src/main/311+-db/scala/com/nvidia/spark/rapids/shims/ShimLeafExecNode.scala @@ -33,6 +33,7 @@ trait ShimLeafExecNode extends LeafExecNode { } } +// DataSourceV2ScanExecBase actually extends LeafExecNode, so we extend that shim as well here. trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase { override def computeStats(): Statistics = { Statistics( From afbdf2b314914ce90ddc40f28ebb4e93d8030559 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Tue, 6 Sep 2022 19:51:33 +0000 Subject: [PATCH 22/27] Enable GPU shuffle in AQE on Databricks 9.1, remove unnecessary shim logic Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 100 +----------------- .../nvidia/spark/rapids/shims/AQEUtils.scala | 2 - .../nvidia/spark/rapids/shims/AQEUtils.scala | 4 - .../rapids/shims/GpuShuffleExchangeExec.scala | 3 +- .../nvidia/spark/rapids/shims/AQEUtils.scala | 2 - .../GpuShuffleExchangeExecBase.scala | 9 +- 6 files changed, 6 insertions(+), 114 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index ab8f021699c..1ce6c76635a 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -13,14 +13,12 @@ # limitations under the License. 
import pytest -from _pytest.mark.structures import ParameterSet from pyspark.sql.functions import when, col from pyspark.sql.types import * -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture -from conftest import is_databricks_runtime, is_emr_runtime +from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -from marks import ignore_order, allow_non_gpu, incompat, validate_execs_in_gpu_plan -from spark_session import with_cpu_session, with_spark_session, is_databricks104_or_later, is_databricks91_or_later +from marks import ignore_order, allow_non_gpu +from spark_session import with_cpu_session _adaptive_conf = { "spark.sql.adaptive.enabled": "true" } @@ -50,7 +48,6 @@ def create_skew_df(spark, length): @ignore_order(local=True) -@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") def test_aqe_skew_join(): def do_join(spark): left, right = create_skew_df(spark, 500) @@ -61,19 +58,6 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) @ignore_order(local=True) -@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") -@allow_non_gpu("ShuffleExchangeExec") -def test_aqe_skew_join_db91(): - def do_join(spark): - left, right = create_skew_df(spark, 500) - left.createOrReplaceTempView("skewData1") - right.createOrReplaceTempView("skewData2") - return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2") - - assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) - -@ignore_order(local=True) -@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet(spark_tmp_path, data_gen): data_path = spark_tmp_path + '/PARQUET_DATA' @@ -88,26 +72,8 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) -@ignore_order(local=True) -@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") -@allow_non_gpu("ShuffleExchangeExec") -@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) -def test_aqe_join_parquet_db91(spark_tmp_path, data_gen): - data_path = spark_tmp_path + '/PARQUET_DATA' - with_cpu_session( - lambda spark: unary_op_df(spark, data_gen).orderBy('a').write.parquet(data_path) - ) - - def do_it(spark): - spark.read.parquet(data_path).createOrReplaceTempView('df1') - spark.read.parquet(data_path).createOrReplaceTempView('df2') - return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") - - assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) - @ignore_order(local=True) -@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet_batch(spark_tmp_path, data_gen): # force v2 source for parquet to use BatchScanExec @@ -130,32 +96,6 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) -@ignore_order(local=True) -@pytest.mark.skipif(not is_databricks_runtime() or 
is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") -@allow_non_gpu("ShuffleExchangeExec") -@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) -def test_aqe_join_parquet_batch_db91(spark_tmp_path, data_gen): - # force v2 source for parquet to use BatchScanExec - conf = copy_and_update(_adaptive_conf, { - "spark.sql.sources.useV1SourceList": "" - }) - - first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' - with_cpu_session( - lambda spark : unary_op_df(spark, data_gen).write.parquet(first_data_path)) - second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' - with_cpu_session( - lambda spark : unary_op_df(spark, data_gen).write.parquet(second_data_path)) - data_path = spark_tmp_path + '/PARQUET_DATA' - - def do_it(spark): - spark.read.parquet(data_path).createOrReplaceTempView('df1') - spark.read.parquet(data_path).createOrReplaceTempView('df2') - return spark.sql("select count(*) from df1,df2 where df1.a = df2.a") - - assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) - - @ignore_order @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) @pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn) @@ -175,7 +115,6 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) @ignore_order(local=True) -@pytest.mark.skipif(is_databricks91_or_later() and not is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") def test_aqe_struct_self_join(spark_tmp_table_factory): def do_join(spark): data = [ @@ -204,36 +143,3 @@ def do_join(spark): return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( resultdf_name, resultdf_name)) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) - - -@ignore_order(local=True) -@pytest.mark.skipif(not is_databricks_runtime() or is_databricks104_or_later(), reason="GPU shuffle only works on Apache Spark and Databricks 10.4 or later") -@allow_non_gpu("ShuffleExchangeExec") -def test_aqe_struct_self_join_db91(spark_tmp_table_factory): - def do_join(spark): - data = [ - (("Adam ", "", "Green"), "1", "M", 1000), - (("Bob ", "Middle", "Green"), "2", "M", 2000), - (("Cathy ", "", "Green"), "3", "F", 3000) - ] - schema = (StructType() - .add("name", StructType() - .add("firstname", StringType()) - .add("middlename", StringType()) - .add("lastname", StringType())) - .add("id", StringType()) - .add("gender", StringType()) - .add("salary", IntegerType())) - df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) - df_name = spark_tmp_table_factory.get() - df.createOrReplaceTempView(df_name) - resultdf = spark.sql( - "select struct(name, struct(name.firstname, name.lastname) as newname)" + - " as col,name from " + df_name + " union" + - " select struct(name, struct(name.firstname, name.lastname) as newname) as col,name" + - " from " + df_name) - resultdf_name = spark_tmp_table_factory.get() - resultdf.createOrReplaceTempView(resultdf_name) - return spark.sql("select a.* from {} a, {} b where a.name=b.name".format( - resultdf_name, resultdf_name)) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) diff --git a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 06fd3aab721..c53b6a3b3e6 100644 --- a/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ 
b/sql-plugin/src/main/311+-nondb/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -27,6 +27,4 @@ object AQEUtils { } def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true - - def isGPUShuffleSupportedInAdaptiveExecution: Boolean = true } diff --git a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index a64439cd4e0..33302c1d8d0 100644 --- a/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/312db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -29,8 +29,4 @@ object AQEUtils { } def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true - - // GPU Shuffle in some Databricks creates some incorrect references when AQE is enabled - // so disable it for now. - def isGPUShuffleSupportedInAdaptiveExecution: Boolean = false } diff --git a/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala index b261e15823e..57aff608c82 100644 --- a/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala +++ b/sql-plugin/src/main/31xdb/scala/org/apache/spark/rapids/shims/GpuShuffleExchangeExec.scala @@ -39,7 +39,8 @@ case class GpuShuffleExchangeExec( override val outputPartitioning: Partitioning = cpuOutputPartitioning // 'mapOutputStatisticsFuture' is only needed when enable AQE. - override def doMapOutputStatisticsFuture: Future[MapOutputStatistics] = { + @transient + override lazy val doMapOutputStatisticsFuture: Future[MapOutputStatistics] = { if (inputBatchRDD.getNumPartitions == 0) { Future.successful(null) } else { diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 8fa1fffa2a8..3a0e1047e3a 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -30,6 +30,4 @@ object AQEUtils extends Logging { } def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true - - def isGPUShuffleSupportedInAdaptiveExecution: Boolean = true } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala index 624142d8006..c0f99ca109c 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.shims.{AQEUtils, GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, SparkShimImpl} +import com.nvidia.spark.rapids.shims.{GpuHashPartitioning, GpuRangePartitioning, ShimUnaryExecNode, SparkShimImpl} import org.apache.spark.{MapOutputStatistics, ShuffleDependency} import org.apache.spark.rapids.shims.GpuShuffleExchangeExec @@ -105,13 +105,6 @@ class GpuShuffleMeta( wrapped.setTagValue(GpuShuffleMeta.availableRuntimeDataTransition, availableRuntimeDataTransition) } - - // Check if AQE is enabled and if we can use GPU shuffle or not based on our environment - if 
(SQLConf.get.adaptiveExecutionEnabled && - !AQEUtils.isGPUShuffleSupportedInAdaptiveExecution) { - willNotWorkOnGpu("current Spark version does not support GPU shuffle when adaptive " + - "execution is enabled") - } } override def convertToGpu(): GpuExec = From 41adfa9c6cc6954ea707fdededde169a1fff97ca Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 7 Sep 2022 15:30:17 +0000 Subject: [PATCH 23/27] cleanup and add comments to tests Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 23 ++++++++----------- .../nvidia/spark/rapids/shims/AQEUtils.scala | 3 +-- 2 files changed, 11 insertions(+), 15 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index 1ce6c76635a..1e127cb43b2 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -22,15 +22,6 @@ _adaptive_conf = { "spark.sql.adaptive.enabled": "true" } -# Dynamic switching of join strategies -# Dynamic coalescing of shuffle partitions -# Dynamically Handle Skew Joins - -_adaptive_coalese_conf = copy_and_update(_adaptive_conf, { - "spark.sql.adaptive.coalescePartitions.enabled": "true" -}) - - def create_skew_df(spark, length): root = spark.range(0, length) mid = length / 2 @@ -47,6 +38,8 @@ def create_skew_df(spark, length): return left, right +# This replicates the skew join test from scala tests, and is here to test +# the computeStats(...) implementation in GpuRangeExec @ignore_order(local=True) def test_aqe_skew_join(): def do_join(spark): @@ -57,6 +50,7 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf) +# Test the computeStats(...) implementation in GpuDataSourceScanExec @ignore_order(local=True) @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet(spark_tmp_path, data_gen): @@ -73,6 +67,7 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf) +# Test the computeStats(...) 
implementation in GpuBatchScanExec @ignore_order(local=True) @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_join_parquet_batch(spark_tmp_path, data_gen): @@ -96,14 +91,15 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) +# Test the AQE optimization that sometimes drops children from the initial list of expressions that +# are still children of window expressions in GpuWindowExec @ignore_order @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) -@pytest.mark.parametrize('aqe_enabled', ['true', 'false'], ids=idfn) @allow_non_gpu('ShuffleExchangeExec') -def test_aqe_join_sum_window(data_gen, aqe_enabled): - conf = {'spark.sql.adaptive.sql.enabled': aqe_enabled, +def test_aqe_join_sum_window(data_gen): + conf = copy_and_update(_adaptive_conf, { 'spark.rapids.sql.batchSizeBytes': '100', - 'spark.rapids.sql.explain': 'NONE'} + 'spark.rapids.sql.explain': 'NONE'}) def do_it(spark): agg_table = gen_df(spark, StructGen([('a_1', LongRangeGen()), ('c', data_gen)], nullable=False)) @@ -114,6 +110,7 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) +# Test the map stage submission handling for GpuShuffleExchangeExec @ignore_order(local=True) def test_aqe_struct_self_join(spark_tmp_table_factory): def do_join(spark): diff --git a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala index 3a0e1047e3a..1830a01e996 100644 --- a/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala +++ b/sql-plugin/src/main/321db/scala/com/nvidia/spark/rapids/shims/AQEUtils.scala @@ -16,13 +16,12 @@ package com.nvidia.spark.rapids.shims -import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.adaptive.{QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.exchange.ReusedExchangeExec /** Utility methods for manipulating Catalyst classes involved in Adaptive Query Execution */ -object AQEUtils extends Logging { +object AQEUtils { /** Return a new QueryStageExec reuse instance with updated output attributes */ def newReuseInstance(sqse: ShuffleQueryStageExec, newOutput: Seq[Attribute]): QueryStageExec = { val reusedExchange = ReusedExchangeExec(newOutput, sqse.shuffle) From c470b7252d6fb98cf8f5b6ae3875c2ca11fa0489 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Wed, 7 Sep 2022 21:21:58 +0000 Subject: [PATCH 24/27] Add cache join test for AQE Signed-off-by: Navin Kumar --- integration_tests/src/main/python/cache_test.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 8e37fb61d1c..c4022135d98 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -333,3 +333,16 @@ def test_func(spark): df.cache().count() return df.selectExpr("b", "a") assert_gpu_and_cpu_are_equal_collect(test_func, enable_vectorized_conf) + +# For AQE, test the computeStats(...) 
implementation in GpuInMemoryTableScanExec +# NOTE: this test is here because the necessary cache configuration is only +# available when this test file is used +@ignore_order(local=True) +@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) +def test_aqe_cache_join(data_gen): + conf = {'spark.sql.adaptive.enabled': 'true'} + def do_it(spark): + df1 = unary_op_df(spark, data_gen).orderBy('a').cache() + df2 = df1.alias('df2') + return df1.join(df2, df1.a == df2.a, 'Outer') + assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) \ No newline at end of file From 3bf9f21229fe39a9e4c154543304c6dfa891f5f6 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 9 Sep 2022 00:46:31 +0000 Subject: [PATCH 25/27] remove windowing fix, and move to a separate branch since this is not an AQE-specific bug Signed-off-by: Navin Kumar --- integration_tests/src/main/python/aqe_test.py | 19 ------------------- .../nvidia/spark/rapids/GpuWindowExec.scala | 7 ++----- 2 files changed, 2 insertions(+), 24 deletions(-) diff --git a/integration_tests/src/main/python/aqe_test.py b/integration_tests/src/main/python/aqe_test.py index 1e127cb43b2..8865bf3477a 100755 --- a/integration_tests/src/main/python/aqe_test.py +++ b/integration_tests/src/main/python/aqe_test.py @@ -91,25 +91,6 @@ def do_it(spark): assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf) -# Test the AQE optimization that sometimes drops children from the initial list of expressions that -# are still children of window expressions in GpuWindowExec -@ignore_order -@pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) -@allow_non_gpu('ShuffleExchangeExec') -def test_aqe_join_sum_window(data_gen): - conf = copy_and_update(_adaptive_conf, { - 'spark.rapids.sql.batchSizeBytes': '100', - 'spark.rapids.sql.explain': 'NONE'}) - - def do_it(spark): - agg_table = gen_df(spark, StructGen([('a_1', LongRangeGen()), ('c', data_gen)], nullable=False)) - part_table = gen_df(spark, StructGen([('a_2', LongRangeGen()), ('b', byte_gen)], nullable=False)) - agg_table.createOrReplaceTempView("agg") - part_table.createOrReplaceTempView("part") - return spark.sql("select b, sum(c) as sum_c, sum(c)/sum(sum(c)) over (partition by b) as r_c from agg, part where a_1 = a_2 group by b order by b, r_c") - - assert_gpu_and_cpu_are_equal_collect(do_it, conf = conf) - # Test the map stage submission handling for GpuShuffleExchangeExec @ignore_order(local=True) def test_aqe_struct_self_join(spark_tmp_table_factory): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 87eb7f9be43..9bf33829881 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -340,11 +340,8 @@ object GpuWindowExec extends Arm { case wf: GpuWindowFunction => // All window functions, including those that are also aggregation functions, are // wrapped in a GpuWindowExpression, so dedup and save their children into the pre - // stage, replacing them with aliases. Also, sometimes children are not provided in the - // initial list of expressions after optimizations, so we add them here, and they will - // be deduped anyways in the other passes - val newChildren = wf.children.map(ce => - extractAndSave(extractAndSave(ce, preProject, preDedupe), windowOps, windowDedupe)) + // stage, replacing them with aliases. 
+ val newChildren = wf.children.map(extractAndSave(_, preProject, preDedupe)) wf.withNewChildren(newChildren) case wsc @ GpuWindowSpecDefinition(partitionSpec, orderSpec, _) => // Extracts expressions from the partition spec and order spec to be sure that they From bf102363f4b3a9845a0d1b0444cf886c9519e70f Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 9 Sep 2022 01:02:42 +0000 Subject: [PATCH 26/27] This should be allowed not to run on GPU since AQE can push it off Signed-off-by: Navin Kumar --- integration_tests/src/main/python/cache_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index c4022135d98..cee8e24e025 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -338,6 +338,7 @@ def test_func(spark): # NOTE: this test is here because the necessary cache configuration is only # available when this test file is used @ignore_order(local=True) +@allow_non_gpu("ShuffleExchangeExec") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_cache_join(data_gen): conf = {'spark.sql.adaptive.enabled': 'true'} From 9c4288b1581f99870e468161218d5eb5bd8297e5 Mon Sep 17 00:00:00 2001 From: Navin Kumar Date: Fri, 9 Sep 2022 11:43:22 -0700 Subject: [PATCH 27/27] Allow ColumnarToRowExec to not run on GPU because it tends to fallback in CI Signed-off-by: Navin Kumar --- integration_tests/src/main/python/cache_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index cee8e24e025..47e25c8b2f2 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -338,7 +338,7 @@ def test_func(spark): # NOTE: this test is here because the necessary cache configuration is only # available when this test file is used @ignore_order(local=True) -@allow_non_gpu("ShuffleExchangeExec") +@allow_non_gpu("ShuffleExchangeExec", "ColumnarToRowExec") @pytest.mark.parametrize("data_gen", integral_gens, ids=idfn) def test_aqe_cache_join(data_gen): conf = {'spark.sql.adaptive.enabled': 'true'}