
Commit

Merge pull request NVIDIA#1937 from jlowe/fix-merge
Fix merge conflict with branch-0.4
jlowe authored Mar 16, 2021
2 parents d55d487 + e0df573 commit a6c5ef6
Showing 20 changed files with 286 additions and 346 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.md
@@ -1,5 +1,5 @@
# Change log
Generated on 2021-03-02
Generated on 2021-03-16

## Release 0.4

@@ -104,6 +104,12 @@ Generated on 2021-03-02
### PRs
|||
|:---|:---|
|[#1910](https://github.com/NVIDIA/spark-rapids/pull/1910)|Make hash partitioning match CPU|
|[#1927](https://github.com/NVIDIA/spark-rapids/pull/1927)|Change cuDF dependency to 0.18.1|
|[#1934](https://github.com/NVIDIA/spark-rapids/pull/1934)|Update documentation to use cudf version 0.18.1|
|[#1871](https://github.com/NVIDIA/spark-rapids/pull/1871)|Disable coalesce batch spilling to avoid cudf contiguous_split bug|
|[#1849](https://github.com/NVIDIA/spark-rapids/pull/1849)|Update changelog for 0.4|
|[#1744](https://github.com/NVIDIA/spark-rapids/pull/1744)|Fix NullPointerException on null partition insert|
|[#1842](https://github.com/NVIDIA/spark-rapids/pull/1842)|Update to note support for 3.0.2|
|[#1832](https://github.com/NVIDIA/spark-rapids/pull/1832)|Spark 3.1.1 shim no longer a snapshot shim|
|[#1831](https://github.com/NVIDIA/spark-rapids/pull/1831)|Spark 3.0.2 shim no longer a snapshot shim|
2 changes: 1 addition & 1 deletion api_validation/README.md
@@ -17,7 +17,7 @@ It requires cudf, rapids-4-spark and spark jars.

```
cd api_validation
// To run the validation script on all versions of Spark (3.0.0, 3.0.1 and 3.1.0-SNAPSHOT)
// To run the validation script on all versions of Spark (3.0.0, 3.0.1 and 3.1.1)
sh auditAllVersions.sh
// To run the script on a particular version we can use a profile (spark300, spark301 and spark311)
2 changes: 1 addition & 1 deletion docs/additional-functionality/cache-serializer.md
@@ -15,7 +15,7 @@ nav_order: 2
utilize disk space to spill over. To read more about what storage levels are available look
at `StorageLevel.scala` in Spark.

Starting in Spark 3.1.0 users can add their own cache serializer, if they desire, by
Starting in Spark 3.1.1 users can add their own cache serializer, if they desire, by
setting the `spark.sql.cache.serializer` configuration. This is a static configuration
that is set once for the duration of a Spark application, which means that you can only set the conf
before starting a Spark application and it cannot be changed for that application's Spark
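As an illustration of how a static SQL configuration like this is applied, the sketch below sets `spark.sql.cache.serializer` when the SparkSession is first built; the serializer class name is a placeholder assumption, not something provided by this commit.

```python
# Minimal sketch, assuming pyspark is installed and the chosen serializer class
# is on the application classpath; "com.example.MyCachedBatchSerializer" is a
# placeholder, not a class shipped by this commit.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("cache-serializer-example")
         # Static SQL conf: it must be supplied before the application starts
         # and cannot be changed for the lifetime of this application.
         .config("spark.sql.cache.serializer",
                 "com.example.MyCachedBatchSerializer")  # placeholder class name
         .getOrCreate())

df = spark.range(1000).selectExpr("id", "id % 10 AS bucket")
df.cache().count()  # cached batches are encoded by the configured serializer
```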
2 changes: 1 addition & 1 deletion docs/demo/Databricks/generate-init-script.ipynb
@@ -1 +1 @@
{"cells":[{"cell_type":"code","source":["dbutils.fs.mkdirs(\"dbfs:/databricks/init_scripts/\")\n \ndbutils.fs.put(\"/databricks/init_scripts/init.sh\",\"\"\"\n#!/bin/bash\nsudo wget -O /databricks/jars/rapids-4-spark_2.12-0.4.0.jar https://oss.sonatype.org/content/repositories/staging/com/nvidia/rapids-4-spark_2.12/0.4.0/rapids-4-spark_2.12-0.4.0.jar\nsudo wget -O /databricks/jars/cudf-0.18-cuda10-1.jar https://oss.sonatype.org/content/repositories/staging/ai/rapids/cudf/0.18/cudf-0.18-cuda10-1.jar\"\"\", True)"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["%sh\ncd ../../dbfs/databricks/init_scripts\npwd\nls -ltr\ncat init.sh"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":3}],"metadata":{"name":"generate-init-script","notebookId":2645746662301564},"nbformat":4,"nbformat_minor":0}
{"cells":[{"cell_type":"code","source":["dbutils.fs.mkdirs(\"dbfs:/databricks/init_scripts/\")\n \ndbutils.fs.put(\"/databricks/init_scripts/init.sh\",\"\"\"\n#!/bin/bash\nsudo wget -O /databricks/jars/rapids-4-spark_2.12-0.4.0.jar https://oss.sonatype.org/content/repositories/staging/com/nvidia/rapids-4-spark_2.12/0.4.0/rapids-4-spark_2.12-0.4.0.jar\nsudo wget -O /databricks/jars/cudf-0.18.1-cuda10-1.jar https://oss.sonatype.org/content/repositories/staging/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-1.jar\"\"\", True)"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["%sh\ncd ../../dbfs/databricks/init_scripts\npwd\nls -ltr\ncat init.sh"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":3}],"metadata":{"name":"generate-init-script","notebookId":2645746662301564},"nbformat":4,"nbformat_minor":0}
8 changes: 4 additions & 4 deletions docs/download.md
@@ -17,10 +17,10 @@ RAPIDS Accelerator For Apache Spark. See the [getting-started guide](https://nvi
## Release v0.4.0
### Download v0.4.0
* Download [RAPIDS Accelerator For Apache Spark v0.4.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.4.0/rapids-4-spark_2.12-0.4.0.jar)
* Download RAPIDS cuDF 0.18 for your system:
* [For CUDA 11.0 & NVIDIA driver 450.36+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18/cudf-0.18-cuda11.jar)
* [For CUDA 10.2 & NVIDIA driver 440.33+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18/cudf-0.18-cuda10-2.jar)
* [For CUDA 10.1 & NVIDIA driver 418.87+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18/cudf-0.18-cuda10-1.jar)
* Download RAPIDS cuDF 0.18.1 for your system:
* [For CUDA 11.0 & NVIDIA driver 450.36+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda11.jar)
* [For CUDA 10.2 & NVIDIA driver 440.33+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-2.jar)
* [For CUDA 10.1 & NVIDIA driver 418.87+](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-1.jar)

### Requirements
Hardware Requirements:
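As a side note, here is a hedged sketch of how these jars are commonly wired into a PySpark session; the local paths and application name are assumptions, while `com.nvidia.spark.SQLPlugin` and `spark.rapids.sql.enabled` are the plugin's standard getting-started settings.

```python
# Hedged sketch, not part of this commit: jar locations below are assumptions.
from pyspark.sql import SparkSession

jars = ("/opt/sparkRapidsPlugin/rapids-4-spark_2.12-0.4.0.jar,"
        "/opt/sparkRapidsPlugin/cudf-0.18.1-cuda11.jar")  # pick the cudf jar matching your CUDA version

spark = (SparkSession.builder
         .appName("rapids-accelerator-example")
         .config("spark.jars", jars)
         .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  # RAPIDS Accelerator plugin class
         .config("spark.rapids.sql.enabled", "true")
         .getOrCreate())

# A simple query that the plugin can run on the GPU when one is available.
spark.range(1_000_000).selectExpr("id % 10 AS k").groupBy("k").count().show()
```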
40 changes: 20 additions & 20 deletions docs/supported_ops.md
@@ -9719,19 +9719,19 @@ Accelerator support is described below.
<td>S</td>
<td>S</td>
<td>S</td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td>S</td>
<td>S*</td>
<td>S</td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
@@ -9764,17 +9764,17 @@ Accelerator support is described below.
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
<td><b>NS</b></td>
</tr>
<tr>
<td>result</td>
15 changes: 15 additions & 0 deletions integration_tests/src/main/python/data_gen.py
@@ -276,6 +276,21 @@ def start(self, rand):
        self._start(rand, self._loop_values)
        self._vals = [self._child.gen() for _ in range(0, self._length)]

class SetValuesGen(DataGen):
    """A set of values that are randomly selected"""
    def __init__(self, data_type, data):
        super().__init__(data_type, nullable=False)
        self.nullable = any(x is None for x in data)
        self._vals = data

    def __repr__(self):
        return super().__repr__() + '(' + str(self._vals) + ')'

    def start(self, rand):
        data = self._vals
        length = len(data)
        self._start(rand, lambda: data[rand.randrange(0, length)])

FLOAT_MIN = -3.4028235E38
FLOAT_MAX = 3.4028235E38
NEG_FLOAT_NAN_MIN_VALUE = struct.unpack('f', struct.pack('I', 0xffffffff))[0]
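A brief usage sketch of the new generator, assuming the surrounding `data_gen` helpers (`gen_df`, `IntegerGen`) and a SparkSession as used by the integration tests:

```python
# Hedged sketch: draw the join key 'a' only from 0..499 so that two generated
# DataFrames are guaranteed to share key values.
from pyspark.sql.types import LongType

def small_key_df(spark):
    gen_list = [('a', SetValuesGen(LongType(), range(500))), ('b', IntegerGen())]
    return gen_df(spark, gen_list, length=500)
```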
37 changes: 37 additions & 0 deletions integration_tests/src/main/python/join_test.py
@@ -201,3 +201,40 @@ def do_join(spark):
return testurls.join(resolved, "Url", "inner")
assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.autoBroadcastJoinThreshold': '-1'})

@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@pytest.mark.parametrize('cache_side', ['cache_left', 'cache_right'], ids=idfn)
@pytest.mark.parametrize('cpu_side', ['cache', 'not_cache'], ids=idfn)
@ignore_order
def test_half_cache_join(join_type, cache_side, cpu_side):
    left_gen = [('a', SetValuesGen(LongType(), range(500))), ('b', IntegerGen())]
    right_gen = [('r_a', SetValuesGen(LongType(), range(500))), ('c', LongGen())]
    def do_join(spark):
        # Try to force the shuffle to be split between CPU and GPU for the join:
        # enable or disable the GPU shuffle, depending on how the test is configured,
        # before we repartition and cache the data
        spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', cpu_side != 'cache')
        left = gen_df(spark, left_gen, length=500)
        right = gen_df(spark, right_gen, length=500)

        if (cache_side == 'cache_left'):
            # Try to force the shuffle to be split between CPU and GPU for the join.
            # By default, if the operation after the shuffle is not on the GPU then
            # a GPU shuffle is not done, so do something simple after the repartition
            # to make sure that the GPU shuffle is used.
            left = left.repartition('a').selectExpr('b + 1 as b', 'a').cache()
            left.count() # populate the cache
        else:
            # cache_right
            # Try to force the shuffle to be split between CPU and GPU for the join.
            # By default, if the operation after the shuffle is not on the GPU then
            # a GPU shuffle is not done, so do something simple after the repartition
            # to make sure that the GPU shuffle is used.
            right = right.repartition('r_a').selectExpr('c + 1 as c', 'r_a').cache()
            right.count() # populate the cache
        # Now turn it back so the other half of the shuffle will be on the opposite side
        spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', cpu_side == 'cache')
        return left.join(right, left.a == right.r_a, join_type)

    # Spark does not know the size of an RDD input, so it will not do a broadcast join
    # unless we tell it to; setting a tiny broadcast threshold here is just to be safe.
    assert_gpu_and_cpu_are_equal_collect(do_join, {'spark.sql.autoBroadcastJoinThreshold': '1'})
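For context on the per-exec override used in this test, the sketch below toggles the same `spark.rapids.sql.exec.ShuffleExchangeExec` key on a running session; it assumes a SparkSession named `spark` with the RAPIDS plugin already enabled and is illustrative only.

```python
# Hedged sketch of toggling GPU shuffle support at runtime, as the test does.
spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', False)
cpu_shuffled = spark.range(1000).repartition(8, 'id')
cpu_shuffled.count()  # this shuffle stays on the CPU

spark.conf.set('spark.rapids.sql.exec.ShuffleExchangeExec', True)
gpu_shuffled = spark.range(1000).repartition(8, 'id')
gpu_shuffled.count()  # subsequent shuffles may run on the GPU again
```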
39 changes: 38 additions & 1 deletion integration_tests/src/main/python/repart_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
from asserts import assert_gpu_and_cpu_are_equal_collect
from data_gen import *
from marks import ignore_order
import pyspark.sql.functions as f

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
def test_union(data_gen):
@@ -45,3 +46,39 @@ def test_repartion_df(num_parts, length):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, gen_list, length=length).repartition(num_parts),
        conf = allow_negative_scale_of_decimal_conf)

@ignore_order(local=True) # To avoid extra data shuffle by 'sort on Spark' for this repartition test.
@pytest.mark.parametrize('num_parts', [1, 2, 10, 17, 19, 32], ids=idfn)
@pytest.mark.parametrize('gen', [
    ([('a', boolean_gen)], ['a']),
    ([('a', byte_gen)], ['a']),
    ([('a', short_gen)], ['a']),
    ([('a', int_gen)], ['a']),
    ([('a', long_gen)], ['a']),
    pytest.param(([('a', float_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')),
    pytest.param(([('a', double_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')),
    ([('a', decimal_gen_default)], ['a']),
    ([('a', decimal_gen_neg_scale)], ['a']),
    ([('a', decimal_gen_scale_precision)], ['a']),
    ([('a', decimal_gen_same_scale_precision)], ['a']),
    ([('a', decimal_gen_64bit)], ['a']),
    ([('a', string_gen)], ['a']),
    ([('a', null_gen)], ['a']),
    ([('a', byte_gen)], [f.col('a') - 5]),
    ([('a', long_gen)], [f.col('a') + 15]),
    ([('a', byte_gen), ('b', boolean_gen)], ['a', 'b']),
    ([('a', short_gen), ('b', string_gen)], ['a', 'b']),
    ([('a', int_gen), ('b', byte_gen)], ['a', 'b']),
    ([('a', long_gen), ('b', null_gen)], ['a', 'b']),
    ([('a', byte_gen), ('b', boolean_gen), ('c', short_gen)], ['a', 'b', 'c']),
    ([('a', short_gen), ('b', string_gen), ('c', int_gen)], ['a', 'b', 'c']),
    ([('a', decimal_gen_default), ('b', decimal_gen_64bit), ('c', decimal_gen_scale_precision)], ['a', 'b', 'c']),
    ], ids=idfn)
def test_hash_repartition_exact(gen, num_parts):
    data_gen = gen[0]
    part_on = gen[1]
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark : gen_df(spark, data_gen)\
            .repartition(num_parts, *part_on)\
            .selectExpr('spark_partition_id() as id', '*', 'hash(*)', 'pmod(hash(*),{})'.format(num_parts)),
        conf = allow_negative_scale_of_decimal_conf)
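The assertion above works because Spark's hash partitioning places each row in partition `pmod(hash(cols), numPartitions)`, which is exactly what the extra `pmod(hash(*), num_parts)` column reconstructs. A standalone sketch of the same check on a plain DataFrame, assuming an existing SparkSession, could look like this:

```python
# Hedged sketch: verify rows land in the partition predicted by pmod(hash(col), n),
# using only built-in Spark SQL functions.
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hash-partition-check").getOrCreate()
n = 8
df = spark.range(100).repartition(n, 'id')
check = df.select(
    f.spark_partition_id().alias('actual'),
    f.expr('pmod(hash(id), {})'.format(n)).alias('expected'))
assert check.filter('actual != expected').count() == 0
```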
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -101,65 +101,4 @@ class JoinsSuite extends SparkQueryCompareTestSuite {
mixedDfWithNulls, mixedDfWithNulls, sortBeforeRepart = true) {
(A, B) => A.join(B, A("longs") === B("longs"), "LeftAnti")
}

test("fixUpJoinConsistencyIfNeeded AQE on") {
// this test is only valid in Spark 3.0.1 and later due to AQE supporting the plugin
val isValidTestForSparkVersion = ShimLoader.getSparkShims.getSparkShimVersion match {
case SparkShimVersion(3, 0, 0) => false
case DatabricksShimVersion(3, 0, 0) => false
case _ => true
}
assume(isValidTestForSparkVersion)
testFixUpJoinConsistencyIfNeeded(true)
}

test("fixUpJoinConsistencyIfNeeded AQE off") {
testFixUpJoinConsistencyIfNeeded(false)
}

private def testFixUpJoinConsistencyIfNeeded(aqe: Boolean) {

val conf = shuffledJoinConf.clone()
.set("spark.sql.adaptive.enabled", String.valueOf(aqe))
.set("spark.rapids.sql.test.allowedNonGpu",
"BroadcastHashJoinExec,SortMergeJoinExec,SortExec,Upper")
.set("spark.rapids.sql.incompatibleOps.enabled", "false") // force UPPER onto CPU

withGpuSparkSession(spark => {
import spark.implicits._

def createStringDF(name: String, upper: Boolean = false): DataFrame = {
val countryNames = (0 until 1000).map(i => s"country_$i")
if (upper) {
countryNames.map(_.toUpperCase).toDF(name)
} else {
countryNames.toDF(name)
}
}

val left = createStringDF("c1")
.join(createStringDF("c2"), col("c1") === col("c2"))

val right = createStringDF("c3")
.join(createStringDF("c4"), col("c3") === col("c4"))

val join = left.join(right, upper(col("c1")) === col("c4"))

// call collect so that we get the final executed plan when AQE is on
join.collect()

val shuffleExec = TestUtils
.findOperator(join.queryExecution.executedPlan, _.isInstanceOf[ShuffleExchangeExec])
.get

val gpuSupportedTag = TreeNodeTag[Set[String]]("rapids.gpu.supported")
val reasons = shuffleExec.getTagValue(gpuSupportedTag).getOrElse(Set.empty)
assert(reasons.contains(
"other exchanges that feed the same join are on the CPU, and GPU " +
"hashing is not consistent with the CPU version"))

}, conf)

}

}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,6 +37,6 @@ class UnaryOperatorsSuite extends SparkQueryCompareTestSuite {
}

testSparkResultsAreEqual("Test murmur3", mixedDfWithNulls) {
frame => frame.selectExpr("hash(longs, doubles, 1, null, 'stock string', ints, strings)")
frame => frame.selectExpr("hash(longs, 1, null, 'stock string', ints, strings)")
}
}