Enabling AQE on [databricks] #6461

Merged
30 commits merged on Sep 12, 2022

Changes from all commits (30 commits)
8d8b121
WIP: first pass at ShimLeafExecNode, need to update indirect inherito…
NVnavkumar Aug 18, 2022
b3b516c
Move this to until 3.4.0 for non-databricks spark versions
NVnavkumar Aug 18, 2022
c45cbe0
Set this flag to true in 3.2.1 DB shim
NVnavkumar Aug 18, 2022
5b3f954
WIP: some test updates with enabling AQE
NVnavkumar Aug 18, 2022
40e7044
Move these shim implementations to right place
NVnavkumar Aug 18, 2022
d0c5ccb
revert this test change for now, need a better solution
NVnavkumar Aug 18, 2022
acaa586
WIP: Re-enable aqe Databricks tests
NVnavkumar Aug 18, 2022
e203e9c
Unblock these 2 tests on Databricks
NVnavkumar Aug 19, 2022
91dc00a
WIP: integration tests for AQE
NVnavkumar Aug 19, 2022
1ce22b2
WIP: AQE integration tests
NVnavkumar Aug 19, 2022
c3ebbbb
Updated AQE tests to ensure that leafexecnodes are tested for Databricks
NVnavkumar Aug 22, 2022
4a1f591
Add shim for DatasourceV2ExecBase to implement the equivalent compute…
NVnavkumar Aug 23, 2022
a64a484
fix unused import on Spark 3.1.x
NVnavkumar Aug 23, 2022
1682f87
Add AQE unit test to handle window aggregate condition
NVnavkumar Aug 24, 2022
da750f0
Fix windowexec issue with missing references to child expressions due…
NVnavkumar Aug 29, 2022
b9e2bad
Merge branch 'branch-22.10' of github.com:NVIDIA/spark-rapids into aq…
NVnavkumar Aug 31, 2022
d65c307
Fix some style issues
NVnavkumar Aug 31, 2022
cdb32c6
Found a potential union based join unit test that will crash when AQE…
NVnavkumar Aug 31, 2022
4922fb4
Merge branch 'aqe_on_db' of github.com:NVnavkumar/spark-rapids into a…
NVnavkumar Aug 31, 2022
12fe02c
Disable GPU shuffle on older Databricks, and switch current Databrick…
NVnavkumar Sep 1, 2022
29da0c1
Refactor unit tests for handling issues with Databricks 9.1
NVnavkumar Sep 1, 2022
541d91a
Address feedback
NVnavkumar Sep 6, 2022
6e93d9c
Update comment
NVnavkumar Sep 6, 2022
afbdf2b
Enable GPU shuffle in AQE on Databricks 9.1, remove unnecessary shim …
NVnavkumar Sep 6, 2022
41adfa9
cleanup and add comments to tests
NVnavkumar Sep 7, 2022
c470b72
Add cache join test for AQE
NVnavkumar Sep 7, 2022
3bf9f21
remove windowing fix, and move to a separate branch since this is not…
NVnavkumar Sep 9, 2022
819d54d
Merge branch 'branch-22.10' into aqe_on_db
NVnavkumar Sep 9, 2022
bf10236
This should be allowed not to run on GPU since AQE can push it off
NVnavkumar Sep 9, 2022
9c4288b
Allow ColumnarToRowExec to not run on GPU because it tends to fallbac…
NVnavkumar Sep 9, 2022
123 changes: 123 additions & 0 deletions integration_tests/src/main/python/aqe_test.py
@@ -0,0 +1,123 @@
# Copyright (c) 2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from pyspark.sql.functions import when, col
from pyspark.sql.types import *
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import ignore_order, allow_non_gpu
from spark_session import with_cpu_session

_adaptive_conf = { "spark.sql.adaptive.enabled": "true" }

def create_skew_df(spark, length):
    root = spark.range(0, length)
    mid = length / 2
    left = root.select(
        when(col('id') < mid / 2, mid).
        otherwise('id').alias("key1"),
        col('id').alias("value1")
    )
    right = root.select(
        when(col('id') < mid, mid).
        otherwise('id').alias("key2"),
        col('id').alias("value2")
    )
    return left, right


# This replicates the skew join test from scala tests, and is here to test
# the computeStats(...) implementation in GpuRangeExec
@ignore_order(local=True)
def test_aqe_skew_join():
    def do_join(spark):
        left, right = create_skew_df(spark, 500)
        left.createOrReplaceTempView("skewData1")
        right.createOrReplaceTempView("skewData2")
        return spark.sql("SELECT * FROM skewData1 join skewData2 ON key1 = key2")

    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf)

# Test the computeStats(...) implementation in GpuDataSourceScanExec
@ignore_order(local=True)
@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn)
def test_aqe_join_parquet(spark_tmp_path, data_gen):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    with_cpu_session(
        lambda spark: unary_op_df(spark, data_gen).orderBy('a').write.parquet(data_path)
    )

    def do_it(spark):
        spark.read.parquet(data_path).createOrReplaceTempView('df1')
        spark.read.parquet(data_path).createOrReplaceTempView('df2')
        return spark.sql("select count(*) from df1,df2 where df1.a = df2.a")

    assert_gpu_and_cpu_are_equal_collect(do_it, conf=_adaptive_conf)


# Test the computeStats(...) implementation in GpuBatchScanExec
@ignore_order(local=True)
@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn)
def test_aqe_join_parquet_batch(spark_tmp_path, data_gen):
    # force v2 source for parquet to use BatchScanExec
    conf = copy_and_update(_adaptive_conf, {
        "spark.sql.sources.useV1SourceList": ""
    })

    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
    with_cpu_session(
        lambda spark : unary_op_df(spark, data_gen).write.parquet(first_data_path))
    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
    with_cpu_session(
        lambda spark : unary_op_df(spark, data_gen).write.parquet(second_data_path))
    data_path = spark_tmp_path + '/PARQUET_DATA'

    def do_it(spark):
        spark.read.parquet(data_path).createOrReplaceTempView('df1')
        spark.read.parquet(data_path).createOrReplaceTempView('df2')
        return spark.sql("select count(*) from df1,df2 where df1.a = df2.a")

    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)

# Test the map stage submission handling for GpuShuffleExchangeExec
@ignore_order(local=True)
def test_aqe_struct_self_join(spark_tmp_table_factory):
    def do_join(spark):
        data = [
            (("Adam ", "", "Green"), "1", "M", 1000),
            (("Bob ", "Middle", "Green"), "2", "M", 2000),
            (("Cathy ", "", "Green"), "3", "F", 3000)
        ]
        schema = (StructType()
                  .add("name", StructType()
                       .add("firstname", StringType())
                       .add("middlename", StringType())
                       .add("lastname", StringType()))
                  .add("id", StringType())
                  .add("gender", StringType())
                  .add("salary", IntegerType()))
        df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
        df_name = spark_tmp_table_factory.get()
        df.createOrReplaceTempView(df_name)
        resultdf = spark.sql(
            "select struct(name, struct(name.firstname, name.lastname) as newname)" +
            " as col,name from " + df_name + " union" +
            " select struct(name, struct(name.firstname, name.lastname) as newname) as col,name" +
            " from " + df_name)
        resultdf_name = spark_tmp_table_factory.get()
        resultdf.createOrReplaceTempView(resultdf_name)
        return spark.sql("select a.* from {} a, {} b where a.name=b.name".format(
            resultdf_name, resultdf_name))
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_adaptive_conf)
14 changes: 14 additions & 0 deletions integration_tests/src/main/python/cache_test.py
@@ -333,3 +333,17 @@ def test_func(spark):
        df.cache().count()
        return df.selectExpr("b", "a")
    assert_gpu_and_cpu_are_equal_collect(test_func, enable_vectorized_conf)

# For AQE, test the computeStats(...) implementation in GpuInMemoryTableScanExec
# NOTE: this test is here because the necessary cache configuration is only
# available when this test file is used
@ignore_order(local=True)
@allow_non_gpu("ShuffleExchangeExec", "ColumnarToRowExec")
@pytest.mark.parametrize("data_gen", integral_gens, ids=idfn)
def test_aqe_cache_join(data_gen):
    conf = {'spark.sql.adaptive.enabled': 'true'}
    def do_it(spark):
        df1 = unary_op_df(spark, data_gen).orderBy('a').cache()
        df2 = df1.alias('df2')
        return df1.join(df2, df1.a == df2.a, 'Outer')
    assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
6 changes: 0 additions & 6 deletions integration_tests/src/main/python/join_test.py
@@ -144,9 +144,6 @@ def do_join(spark):
    assert_gpu_and_cpu_are_equal_collect(do_join)

@ignore_order(local=True)
@pytest.mark.skipif(is_databricks_runtime(),
reason="Disabled for databricks because of lack of AQE support, and "
"differences in BroadcastMode.transform")
@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_right_broadcast_nested_loop_join_without_condition_empty_small_batch(join_type):
    def do_join(spark):
@@ -155,9 +152,6 @@ def do_join(spark):
    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': 'true'})

@ignore_order(local=True)
@pytest.mark.skipif(is_databricks_runtime(),
reason="Disabled for databricks because of lack of AQE support, and "
"differences in BroadcastMode.transform")
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
def test_empty_broadcast_hash_join(join_type):
    def do_join(spark):
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

trait ShimLeafExecNode extends LeafExecNode {
  // For AQE support in Databricks, all Exec nodes implement computeStats(). This is actually
  // a recursive call to traverse the entire physical plan to aggregate this number. For the
  // end of the computation, this means that all LeafExecNodes must implement this method to
  // avoid a stack overflow. For now, based on feedback from Databricks, Long.MaxValue is
  // sufficient to satisfy this computation.
  override def computeStats(): Statistics = {
    Statistics(
      sizeInBytes = Long.MaxValue
    )
  }
}

// DataSourceV2ScanExecBase actually extends LeafExecNode, so we extend that shim as well here.
trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase {
  override def computeStats(): Statistics = {
    Statistics(
      sizeInBytes = Long.MaxValue
    )
  }
}
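
For illustration only (this sketch is not part of the PR's diff): the comment above explains that Databricks AQE recursively aggregates statistics over the physical plan, so every leaf node must provide computeStats(). A hypothetical GPU leaf operator that consumes the shim could look like the following; the class name and the empty doExecute() body are assumptions made for the example.

package com.nvidia.spark.rapids.shims

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical leaf operator, shown only to illustrate the shim: mixing in
// ShimLeafExecNode inherits the Long.MaxValue computeStats() override, so the
// recursive statistics walk performed by Databricks AQE terminates at this leaf.
case class ExampleGpuLeafExec(output: Seq[Attribute]) extends ShimLeafExecNode {
  // A real GPU operator would produce columnar batches; an empty row RDD keeps
  // the sketch self-contained without pulling in the rest of the plugin.
  override protected def doExecute(): RDD[InternalRow] =
    sparkContext.emptyRDD[InternalRow]
}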
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase

trait ShimLeafExecNode extends LeafExecNode

trait ShimDataSourceV2ScanExecBase extends DataSourceV2ScanExecBase
@@ -23,11 +23,10 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.execution.datasources.v2._

case class GpuBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan) extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
    @transient scan: Scan) extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
  @transient lazy val batch: Batch = scan.toBatch

  @transient override lazy val partitions: Seq[InputPartition] = batch.planInputPartitions()
@@ -28,6 +28,5 @@ object AQEUtils {
    ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan)
  }

  // currently we don't support AQE on Databricks
  def isAdaptiveExecutionSupportedInSparkVersion: Boolean = false
  def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true
}
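
For orientation only (not code from this PR or the plugin): the flag flipped above is a per-shim capability switch, and a caller can combine it with the session's AQE setting before deciding whether adaptive planning can be relied on. The GpuAqeGate object, its method name, and the assumed com.nvidia.spark.rapids.shims package for AQEUtils are all assumptions made for this sketch.

import com.nvidia.spark.rapids.shims.AQEUtils // assumed location of the shim object

import org.apache.spark.sql.SparkSession

// Illustrative gate, not plugin code: treat AQE as usable only when the
// session enables it and the active shim reports support for it.
object GpuAqeGate {
  def adaptivePlanningUsable(session: SparkSession): Boolean = {
    val aqeEnabled =
      session.conf.get("spark.sql.adaptive.enabled", "false").toBoolean
    aqeEnabled && AQEUtils.isAdaptiveExecutionSupportedInSparkVersion
  }
}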
@@ -39,7 +39,8 @@ case class GpuShuffleExchangeExec(
  override val outputPartitioning: Partitioning = cpuOutputPartitioning

  // 'mapOutputStatisticsFuture' is only needed when enable AQE.
  override def doMapOutputStatisticsFuture: Future[MapOutputStatistics] = {
  @transient
  override lazy val doMapOutputStatisticsFuture: Future[MapOutputStatistics] = {
    if (inputBatchRDD.getNumPartitions == 0) {
      Future.successful(null)
    } else {
@@ -34,7 +34,7 @@ case class GpuBatchScanExec(
    output: Seq[AttributeReference],
    @transient scan: Scan,
    runtimeFilters: Seq[Expression] = Seq.empty)
  extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
  extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
  @transient lazy val batch: Batch = scan.toBatch

  // All expressions are filter expressions used on the CPU.
@@ -28,6 +28,5 @@ object AQEUtils {
    ShuffleQueryStageExec(sqse.id, reusedExchange, sqse.originalPlan, sqse.isSparkExchange)
  }

  // currently we don't support AQE on Databricks
  def isAdaptiveExecutionSupportedInSparkVersion: Boolean = false
  def isAdaptiveExecutionSupportedInSparkVersion: Boolean = true
}
@@ -15,37 +15,27 @@
*/
package org.apache.spark.rapids.shims

import scala.concurrent.Future

import com.nvidia.spark.rapids.GpuPartitioning

import org.apache.spark.MapOutputStatistics
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{ShufflePartitionSpec, SparkPlan}
import org.apache.spark.sql.execution.exchange.{ShuffleExchangeLike, ShuffleOrigin}
import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBase, ShuffledBatchRDD}
import org.apache.spark.sql.rapids.execution.{GpuShuffleExchangeExecBaseWithMetrics, ShuffledBatchRDD}

case class GpuShuffleExchangeExec(
    gpuOutputPartitioning: GpuPartitioning,
    child: SparkPlan,
    shuffleOrigin: ShuffleOrigin)(
    cpuOutputPartitioning: Partitioning)
  extends GpuShuffleExchangeExecBase(gpuOutputPartitioning, child) with ShuffleExchangeLike {
  extends GpuShuffleExchangeExecBaseWithMetrics(gpuOutputPartitioning, child)
    with ShuffleExchangeLike {

  override def otherCopyArgs: Seq[AnyRef] = cpuOutputPartitioning :: Nil

  override val outputPartitioning: Partitioning = cpuOutputPartitioning

  // 'mapOutputStatisticsFuture' is only needed when enable AQE.
  override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
    if (inputBatchRDD.getNumPartitions == 0) {
      Future.successful(null)
    } else {
      sparkContext.submitMapStage(shuffleDependencyColumnar)
    }

  override def numMappers: Int = shuffleDependencyColumnar.rdd.getNumPartitions

  override def numPartitions: Int = shuffleDependencyColumnar.partitioner.numPartitions
@@ -36,7 +36,7 @@ case class GpuBatchScanExec(
    @transient scan: Scan,
    runtimeFilters: Seq[Expression] = Seq.empty,
    keyGroupedPartitioning: Option[Seq[Expression]] = None)
  extends DataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
  extends ShimDataSourceV2ScanExecBase with GpuBatchScanExecMetrics {
  @transient lazy val batch: Batch = scan.toBatch

  // All expressions are filter expressions used on the CPU.
@@ -23,15 +23,15 @@ import ai.rapids.cudf
import ai.rapids.cudf._
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.{ShimSparkPlan, ShimUnaryExecNode}
import com.nvidia.spark.rapids.shims.{ShimLeafExecNode, ShimSparkPlan, ShimUnaryExecNode}

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Descending, Expression, NamedExpression, NullIntolerant, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
import org.apache.spark.sql.execution.{LeafExecNode, ProjectExec, SampleExec, SparkPlan}
import org.apache.spark.sql.execution.{ProjectExec, SampleExec, SparkPlan}
import org.apache.spark.sql.rapids.{GpuPartitionwiseSampledRDD, GpuPoissonSampler, GpuPredicateHelper}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.types.{DataType, LongType}
@@ -591,7 +591,7 @@ case class GpuRangeExec(
    numSlices: Int,
    output: Seq[Attribute],
    targetSizeBytes: Long)
  extends LeafExecNode with GpuExec {
  extends ShimLeafExecNode with GpuExec {

  val numElements: BigInt = {
    val safeStart = BigInt(start)
@@ -17,18 +17,19 @@
package org.apache.spark.sql.rapids

import com.nvidia.spark.rapids.GpuExec
import com.nvidia.spark.rapids.shims.ShimLeafExecNode
import org.apache.commons.lang3.StringUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.execution.{ExplainUtils, LeafExecNode}
import org.apache.spark.sql.execution.ExplainUtils
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.util.Utils

/** GPU implementation of Spark's `DataSourceScanExec` */
trait GpuDataSourceScanExec extends LeafExecNode with GpuExec {
trait GpuDataSourceScanExec extends ShimLeafExecNode with GpuExec {
  def relation: BaseRelation
  def tableIdentifier: Option[TableIdentifier]
