Merge branch 'branch-0.4' into fix-merge
jlowe committed Mar 16, 2021
2 parents 4101898 + 822448e commit e0df573
Showing 16 changed files with 279 additions and 339 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,5 @@
# Change log
Generated on 2021-03-02
Generated on 2021-03-16

## Release 0.4

@@ -104,6 +104,12 @@ Generated on 2021-03-02
### PRs
|||
|:---|:---|
|[#1910](https://github.com/NVIDIA/spark-rapids/pull/1910)|Make hash partitioning match CPU|
|[#1927](https://github.com/NVIDIA/spark-rapids/pull/1927)|Change cuDF dependency to 0.18.1|
|[#1934](https://github.com/NVIDIA/spark-rapids/pull/1934)|Update documentation to use cudf version 0.18.1|
|[#1871](https://github.com/NVIDIA/spark-rapids/pull/1871)|Disable coalesce batch spilling to avoid cudf contiguous_split bug|
|[#1849](https://github.com/NVIDIA/spark-rapids/pull/1849)|Update changelog for 0.4|
|[#1744](https://github.com/NVIDIA/spark-rapids/pull/1744)|Fix NullPointerException on null partition insert|
|[#1842](https://github.com/NVIDIA/spark-rapids/pull/1842)|Update to note support for 3.0.2|
|[#1832](https://github.com/NVIDIA/spark-rapids/pull/1832)|Spark 3.1.1 shim no longer a snapshot shim|
|[#1831](https://github.com/NVIDIA/spark-rapids/pull/1831)|Spark 3.0.2 shim no longer a snapshot shim|
40 changes: 20 additions & 20 deletions docs/supported_ops.md
@@ -9672,19 +9672,19 @@ Accelerator support is described below.
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
@@ -9717,17 +9717,17 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
15 changes: 15 additions & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -276,6 +276,21 @@ def start(self, rand):
        self._start(rand, self._loop_values)
        self._vals = [self._child.gen() for _ in range(0, self._length)]

class SetValuesGen(DataGen):
    """A set of values that are randomly selected"""
    def __init__(self, data_type, data):
        super().__init__(data_type, nullable=False)
        self.nullable = any(x is None for x in data)
        self._vals = data

    def __repr__(self):
        return super().__repr__() + '(' + str(self._vals) + ')'

    def start(self, rand):
        data = self._vals
        length = len(data)
        self._start(rand, lambda : data[rand.randrange(0, length)])

FLOAT_MIN = -3.4028235E38
FLOAT_MAX = 3.4028235E38
NEG_FLOAT_NAN_MIN_VALUE = struct.unpack('f', struct.pack('I', 0xffffffff))[0]
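As a quick illustration (not part of this diff), here is how the new SetValuesGen might be combined with the existing gen_df helper from data_gen.py. The build_bounded_key_df name and the SparkSession argument are hypothetical; only SetValuesGen, IntegerGen, and gen_df come from the code above.

```python
from pyspark.sql.types import LongType

def build_bounded_key_df(spark):
    # 'a' is drawn only from the values 0..499, so two DataFrames built with the
    # same generator are guaranteed to share join keys; 'b' is an ordinary int column.
    gen_list = [('a', SetValuesGen(LongType(), range(500))), ('b', IntegerGen())]
    return gen_df(spark, gen_list, length=500)
```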
37 changes: 37 additions & 0 deletions integration_tests/src/main/python/join_test.py
@@ -201,3 +201,40 @@ def do_join(spark):
        return testurls.join(resolved, "Url", "inner")
    assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.autoBroadcastJoinThreshold': '-1'})

@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('cache_side', ['cache_left', 'cache_right'], ids=idfn)
@pytest.mark.parametrize('cpu_side', ['cache', 'not_cache'], ids=idfn)
@ignore_order
def test_half_cache_join(join_type, cache_side, cpu_side):
    left_gen = [('a', SetValuesGen(LongType(), range(500))), ('b', IntegerGen())]
    right_gen = [('r_a', SetValuesGen(LongType(), range(500))), ('c', LongGen())]
    def do_join(spark):
        # Try to force the shuffle to be split between CPU and GPU for the join:
        # control whether the shuffle runs on the GPU or the CPU, depending on how
        # the test is configured, when we repartition and cache the data
        spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', cpu_side != 'cache')
        left = gen_df(spark, left_gen, length=500)
        right = gen_df(spark, right_gen, length=500)

        if cache_side == 'cache_left':
            # Try to force the shuffle to be split between CPU and GPU for the join.
            # By default, if the operation after the shuffle is not on the GPU then
            # a GPU shuffle is not used, so do something simple after the repartition
            # to make sure that the GPU shuffle is used.
            left = left.repartition('a').selectExpr('b + 1 as b', 'a').cache()
            left.count() # populate the cache
        else:
            # cache_right
            # Try to force the shuffle to be split between CPU and GPU for the join.
            # By default, if the operation after the shuffle is not on the GPU then
            # a GPU shuffle is not used, so do something simple after the repartition
            # to make sure that the GPU shuffle is used.
            right = right.repartition('r_a').selectExpr('c + 1 as c', 'r_a').cache()
            right.count() # populate the cache
        # Now turn it back so the other half of the shuffle will be on the opposite side
        spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', cpu_side == 'cache')
        return left.join(right, left.a == right.r_a, join_type)

    # Spark does not know the size of an RDD input, so it will not do a broadcast join
    # unless we tell it to; setting the threshold here is just to be safe.
    assert_gpu_and_cpu_are_equal_collect(do_join, {'spark.sql.autoBroadcastJoinThreshold': '1'})
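For reference, a minimal standalone sketch of the toggle this test relies on: the `spark.rapids.sql.exec.ShuffleExchangeExec` config controls whether the RAPIDS plugin may place a shuffle on the GPU. Everything below other than that config key is hypothetical (it assumes an active SparkSession with the plugin enabled) and only shows the pattern of caching one side with the shuffle forced to one processor, then building the other side with the opposite setting.

```python
# Force the shuffle feeding the cached side onto the CPU.
spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', 'false')
cached_side = spark.range(500).repartition('id').cache()
cached_side.count()  # materialize the cache so its shuffle has already run

# Allow the shuffle for the other side of the join to run on the GPU.
spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', 'true')
other_side = spark.range(500).withColumnRenamed('id', 'r_id')

# The two exchanges feeding this join now come from different shuffle implementations,
# which only produces correct results if GPU hash partitioning matches the CPU.
joined = cached_side.join(other_side, cached_side.id == other_side.r_id, 'inner')
joined.explain()
```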
39 changes: 38 additions & 1 deletion integration_tests/src/main/python/repart_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import ignore_order
import pyspark.sql.functions as f

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_union(data_gen):
@@ -45,3 +46,39 @@ def test_repartion_df(num_parts, length):
    assert_gpu_and_cpu_are_equal_collect(
            lambda spark : gen_df(spark, gen_list, length=length).repartition(num_parts),
            conf = allow_negative_scale_of_decimal_conf)

@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [1, 2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('gen', [
    ([('a', boolean_gen)], ['a']),
    ([('a', byte_gen)], ['a']),
    ([('a', short_gen)], ['a']),
    ([('a', int_gen)], ['a']),
    ([('a', long_gen)], ['a']),
    pytest.param(([('a', float_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')),
    pytest.param(([('a', double_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')),
    ([('a', decimal_gen_default)], ['a']),
    ([('a', decimal_gen_neg_scale)], ['a']),
    ([('a', decimal_gen_scale_precision)], ['a']),
    ([('a', decimal_gen_same_scale_precision)], ['a']),
    ([('a', decimal_gen_64bit)], ['a']),
    ([('a', string_gen)], ['a']),
    ([('a', null_gen)], ['a']),
    ([('a', byte_gen)], [f.col('a') - 5]),
    ([('a', long_gen)], [f.col('a') + 15]),
    ([('a', byte_gen), ('b', boolean_gen)], ['a', 'b']),
    ([('a', short_gen), ('b', string_gen)], ['a', 'b']),
    ([('a', int_gen), ('b', byte_gen)], ['a', 'b']),
    ([('a', long_gen), ('b', null_gen)], ['a', 'b']),
    ([('a', byte_gen), ('b', boolean_gen), ('c', short_gen)], ['a', 'b', 'c']),
    ([('a', short_gen), ('b', string_gen), ('c', int_gen)], ['a', 'b', 'c']),
    ([('a', decimal_gen_default), ('b', decimal_gen_64bit), ('c', decimal_gen_scale_precision)], ['a', 'b', 'c']),
    ], ids=idfn)
def test_hash_repartition_exact(gen, num_parts):
    data_gen = gen[0]
    part_on = gen[1]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, data_gen)\
            .repartition(num_parts, *part_on)\
            .selectExpr('spark_partition_id() as id', '*', 'hash(*)', 'pmod(hash(*),{})'.format(num_parts)),
        conf = allow_negative_scale_of_decimal_conf)
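As a hedged, standalone illustration of the property this test exercises (plain PySpark on the CPU, with hypothetical names): after `repartition(num_parts, col)`, every row should land in the partition given by `pmod(hash(col), num_parts)`, which is exactly what the selectExpr above exposes for the CPU/GPU comparison.

```python
num_parts = 4
df = spark.range(1000).repartition(num_parts, 'id') \
    .selectExpr('spark_partition_id() as actual_part',
                'pmod(hash(id), {}) as expected_part'.format(num_parts))
# Hash repartitioning places each row at pmod(murmur3_hash(key), num_parts),
# so the observed partition id should always match the computed one.
assert df.filter('actual_part != expected_part').count() == 0
```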
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -101,65 +101,4 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
      mixedDfWithNulls, mixedDfWithNulls, sortBeforeRepart = true) {
    (A, B) => A.join(B, A("longs") === B("longs"), "LeftAnti")
  }

test("fixUpJoinConsistencyIfNeeded AQE on") {
// this test is only valid in Spark 3.0.1 and later due to AQE supporting the plugin
val isValidTestForSparkVersion = ShimLoader.getSparkShims.getSparkShimVersion match {
case SparkShimVersion(3, 0, 0) => false
case DatabricksShimVersion(3, 0, 0) => false
case _ => true
}
assume(isValidTestForSparkVersion)
testFixUpJoinConsistencyIfNeeded(true)
}

test("fixUpJoinConsistencyIfNeeded AQE off") {
testFixUpJoinConsistencyIfNeeded(false)
}

private def testFixUpJoinConsistencyIfNeeded(aqe: Boolean) {

val conf = shuffledJoinConf.clone()
.set("spark.sql.adaptive.enabled", String.valueOf(aqe))
.set("spark.rapids.sql.test.allowedNonGpu",
"BroadcastHashJoinExec,SortMergeJoinExec,SortExec,Upper")
.set("spark.rapids.sql.incompatibleOps.enabled", "false") // force UPPER onto CPU

withGpuSparkSession(spark => {
import spark.implicits._

def createStringDF(name: String, upper: Boolean = false): DataFrame = {
val countryNames = (0 until 1000).map(i => s"country_$i")
if (upper) {
countryNames.map(_.toUpperCase).toDF(name)
} else {
countryNames.toDF(name)
}
}

val left = createStringDF("c1")
.join(createStringDF("c2"), col("c1") === col("c2"))

val right = createStringDF("c3")
.join(createStringDF("c4"), col("c3") === col("c4"))

val join = left.join(right, upper(col("c1")) === col("c4"))

// call collect so that we get the final executed plan when AQE is on
join.collect()

val shuffleExec = TestUtils
.findOperator(join.queryExecution.executedPlan, _.isInstanceOf[ShuffleExchangeExec])
.get

val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
val reasons = shuffleExec.getTagValue(gpuSupportedTag).getOrElse(Set.empty)
assert(reasons.contains(
"other exchanges that feed the same join are on the CPU, and GPU " +
"hashing is not consistent with the CPU version"))

}, conf)

}

}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,6 @@ class UnaryOperatorsSuite extends SparkQueryCompareTestSuite {
  }

  testSparkResultsAreEqual("Test murmur3", mixedDfWithNulls) {
    frame => frame.selectExpr("hash(longs, doubles, 1, null, 'stock string', ints, strings)")
    frame => frame.selectExpr("hash(longs, 1, null, 'stock string', ints, strings)")
  }
}
