diff --git a/CHANGELOG.md b/CHANGELOG.md index 979ba888d7a..adfa27ff6d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,18 @@ # Change log -Generated on 2021-03-18 +Generated on 2021-03-23 + +## Release 0.4.1 + +### Bugs Fixed +||| +|:---|:---| +|[#1985](https://github.com/NVIDIA/spark-rapids/issues/1985)|[BUG] broadcast exchange can fail on 0.4| + +### PRs +||| +|:---|:---| +|[#1990](https://github.com/NVIDIA/spark-rapids/pull/1990)|Prepare for v0.4.1 release| +|[#1988](https://github.com/NVIDIA/spark-rapids/pull/1988)|broadcast exchange can fail when job group set| ## Release 0.4 @@ -53,6 +66,7 @@ Generated on 2021-03-18 ### Bugs Fixed ||| |:---|:---| +|[#1885](https://github.com/NVIDIA/spark-rapids/issues/1885)|[BUG] natural join on string key results in a data frame with spurious NULLs| |[#1785](https://github.com/NVIDIA/spark-rapids/issues/1785)|[BUG] Rapids pytest integration tests FAILED on Yarn cluster with unrecognized arguments: `--std_input_path=src/test/resources/`| |[#999](https://github.com/NVIDIA/spark-rapids/issues/999)|[BUG] test_multi_types_window_aggs_for_rows_lead_lag fails against Spark 3.1.0| |[#1818](https://github.com/NVIDIA/spark-rapids/issues/1818)|[BUG] unmoored doc comment warnings in GpuCast| @@ -104,6 +118,7 @@ Generated on 2021-03-18 ### PRs ||| |:---|:---| +|[#1963](https://github.com/NVIDIA/spark-rapids/pull/1963)|Update changelog 0.4 [skip ci]| |[#1960](https://github.com/NVIDIA/spark-rapids/pull/1960)|Replace sonatype staging link with maven central link| |[#1945](https://github.com/NVIDIA/spark-rapids/pull/1945)|Update changelog 0.4 [skip ci]| |[#1910](https://github.com/NVIDIA/spark-rapids/pull/1910)|Make hash partitioning match CPU| diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f656390193f..3678d0c0fec 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -40,6 +40,13 @@ mvn verify After a successful build the RAPIDS Accelerator jar will be in the `dist/target/` directory. +### Building against different CUDA Toolkit versions + +You can build against different versions of the CUDA Toolkit by using one of the following profiles: +* `-Pcuda10-1` (CUDA 10.1, default) +* `-Pcuda10-2` (CUDA 10.2) +* `-Pcuda11` (CUDA 11.0) + ## Code contributions ### Your first issue diff --git a/docs/configs.md b/docs/configs.md index 56e6d5411eb..8abcf8f00a2 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -54,6 +54,7 @@ Name | Description | Default Value spark.rapids.sql.castFloatToDecimal.enabled|Casting from floating point types to decimal on the GPU returns results that have tiny difference compared to results returned from CPU.|false spark.rapids.sql.castFloatToIntegralTypes.enabled|Casting from floating point types to integral types on the GPU supports a slightly different range of values when using Spark 3.1.0 or later. Refer to the CAST documentation for more details.|false spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|false +spark.rapids.sql.castStringToDecimal.enabled|When set to true, enables casting from strings to decimal type on the GPU. Currently string to decimal type on the GPU might produce results which slightly differed from the correct results when the string represents any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given input string "99999999999999999". The cause of divergence is that we can not cast strings containing scientific notation to decimal directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. The first step may lead to precision loss.|false spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false spark.rapids.sql.castStringToInteger.enabled|When set to true, enables casting from strings to integer types (byte, short, int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect results when the string represents a number larger than Long.MaxValue or smaller than Long.MinValue.|false spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false @@ -258,7 +259,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Year|`year`|Returns the year from a date or timestamp|true|None| spark.rapids.sql.expression.AggregateExpression| |Aggregate expression|true|None| spark.rapids.sql.expression.Average|`avg`, `mean`|Average aggregate operator|true|None| -spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of elements, now only supported by windowing.|false|This is disabled by default because for now the GPU collects null values to a list, but Spark does not. This will be fixed in future releases.| +spark.rapids.sql.expression.CollectList|`collect_list`|Collect a list of elements, now only supported by windowing.|true|None| spark.rapids.sql.expression.Count|`count`|Count aggregate operator|true|None| spark.rapids.sql.expression.First|`first_value`, `first`|first aggregate operator|true|None| spark.rapids.sql.expression.Last|`last`, `last_value`|last aggregate operator|true|None| diff --git a/docs/demo/Databricks/generate-init-script.ipynb b/docs/demo/Databricks/generate-init-script.ipynb index 657421da97b..1c3170471b2 100644 --- a/docs/demo/Databricks/generate-init-script.ipynb +++ b/docs/demo/Databricks/generate-init-script.ipynb @@ -1 +1 @@ -{"cells":[{"cell_type":"code","source":["dbutils.fs.mkdirs(\"dbfs:/databricks/init_scripts/\")\n \ndbutils.fs.put(\"/databricks/init_scripts/init.sh\",\"\"\"\n#!/bin/bash\nsudo wget -O /databricks/jars/rapids-4-spark_2.12-0.4.0.jar https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.4.0/rapids-4-spark_2.12-0.4.0.jar\nsudo wget -O /databricks/jars/cudf-0.18.1-cuda10-1.jar https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-1.jar\"\"\", True)"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["%sh\ncd ../../dbfs/databricks/init_scripts\npwd\nls -ltr\ncat init.sh"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":3}],"metadata":{"name":"generate-init-script","notebookId":2645746662301564},"nbformat":4,"nbformat_minor":0} +{"cells":[{"cell_type":"code","source":["dbutils.fs.mkdirs(\"dbfs:/databricks/init_scripts/\")\n \ndbutils.fs.put(\"/databricks/init_scripts/init.sh\",\"\"\"\n#!/bin/bash\nsudo wget -O /databricks/jars/rapids-4-spark_2.12-0.4.1.jar https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.4.1/rapids-4-spark_2.12-0.4.1.jar\nsudo wget -O /databricks/jars/cudf-0.18.1-cuda10-1.jar https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-1.jar\"\"\", True)"],"metadata":{},"outputs":[],"execution_count":1},{"cell_type":"code","source":["%sh\ncd ../../dbfs/databricks/init_scripts\npwd\nls -ltr\ncat init.sh"],"metadata":{},"outputs":[],"execution_count":2},{"cell_type":"code","source":[""],"metadata":{},"outputs":[],"execution_count":3}],"metadata":{"name":"generate-init-script","notebookId":2645746662301564},"nbformat":4,"nbformat_minor":0} diff --git a/docs/download.md b/docs/download.md index 9170a8ba0e5..2cb309a4cf8 100644 --- a/docs/download.md +++ b/docs/download.md @@ -14,6 +14,41 @@ The RAPIDS Accelerator For Apache Spark consists of two jars: a plugin jar along that is either preinstalled in the Spark classpath on all nodes or submitted with each job that uses the RAPIDS Accelerator For Apache Spark. See the [getting-started guide](https://nvidia.github.io/spark-rapids/Getting-Started/) for more details. +## Release v0.4.1 + +This is a patch release based on version 0.4.0 with the following additional fixes: +* Broadcast exchange can fail when job group is set + +The release is supported on Apache Spark 3.0.0, 3.0.1, 3.0.2, 3.1.1, Databricks 7.3 ML LTS and +Google Cloud Platform Dataproc 2.0. + +The list of all supported operations is provided [here](supported_ops.md). + +For a detailed list of changes, please refer to the +[CHANGELOG](https://github.com/NVIDIA/spark-rapids/blob/main/CHANGELOG.md). + +Hardware Requirements: + + GPU Architecture: NVIDIA Pascalâ„¢ or better (Tested on V100, T4 and A100 GPU) + +Software Requirements: + + OS: Ubuntu 16.04, Ubuntu 18.04 or CentOS 7 + + CUDA & Nvidia Drivers: 10.1.2 & v418.87+, 10.2 & v440.33+ or 11.0 & v450.36+ + + Apache Spark 3.0, 3.0.1, 3.0.2, 3.1.1, Databricks 7.3 ML LTS Runtime, or GCP Dataproc 2.0 + + Apache Hadoop 2.10+ or 3.1.1+ (3.1.1 for nvidia-docker version 2) + + Python 3.6+, Scala 2.12, Java 8 + +### Download v0.4.1 +* [RAPIDS Spark Package](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.4.1/rapids-4-spark_2.12-0.4.1.jar) +* [cuDF 11.0 Package](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda11.jar) +* [cuDF 10.2 Package](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-2.jar) +* [cuDF 10.1 Package](https://repo1.maven.org/maven2/ai/rapids/cudf/0.18.1/cudf-0.18.1-cuda10-1.jar) + ## Release v0.4.0 ### Download v0.4.0 * Download [RAPIDS Accelerator For Apache Spark v0.4.0](https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/0.4.0/rapids-4-spark_2.12-0.4.0.jar) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index b115249e8f6..1ab7049890e 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -402,7 +402,7 @@ Accelerator supports are described below. NS NS NS -NS +PS* (unionByName will not optionally impute nulls for missing struct fields when the column is a struct and there are non-overlapping fields; missing nested BINARY, CALENDAR, ARRAY, MAP, STRUCT, UDT) NS @@ -699,9 +699,9 @@ Accelerator supports are described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -9719,10 +9719,10 @@ Accelerator support is described below. S S S -NS -NS -NS -NS +S +S +S +S* S S* S @@ -10573,9 +10573,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10616,9 +10616,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10659,9 +10659,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -10702,9 +10702,9 @@ Accelerator support is described below. NS NS NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS -NS -NS +PS* (missing nested DECIMAL, NULL, BINARY, CALENDAR, MAP, UDT) NS @@ -12329,8 +12329,8 @@ Accelerator support is described below. -S* -S* +PS* (missing nested BINARY, CALENDAR, UDT) +PS* (missing nested BINARY, CALENDAR, UDT) @@ -16435,7 +16435,7 @@ Accelerator support is described below. CollectList `collect_list` Collect a list of elements, now only supported by windowing. -This is disabled by default because for now the GPU collects null values to a list, but Spark does not. This will be fixed in future releases. +None aggregation input NS @@ -18029,7 +18029,7 @@ and the accelerator produces the same result. S S* S -NS +S* S NS @@ -18433,7 +18433,7 @@ and the accelerator produces the same result. S S* S -NS +S* S NS diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index 90261c5e7af..a44274af01b 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -18,8 +18,9 @@ from data_gen import * from marks import incompat, approximate_float from pyspark.sql.types import * -from spark_session import with_cpu_session, with_gpu_session, with_spark_session, is_before_spark_310 +from spark_session import with_cpu_session, with_gpu_session, with_spark_session, is_before_spark_311 import pyspark.sql.functions as f +from pyspark.sql.utils import IllegalArgumentException decimal_gens_not_max_prec = [decimal_gen_neg_scale, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit] @@ -69,7 +70,10 @@ def test_multiplication_mixed(lhs, rhs): f.col('a') * f.col('b')), conf=allow_negative_scale_of_decimal_conf) -@pytest.mark.parametrize('data_gen', [double_gen, decimal_gen_neg_scale, DecimalGen(6, 3), DecimalGen(5, 5), DecimalGen(6, 0)], ids=idfn) +@pytest.mark.parametrize('data_gen', [double_gen, decimal_gen_neg_scale, DecimalGen(6, 3), + DecimalGen(5, 5), DecimalGen(6, 0), +pytest.param(DecimalGen(38, 21), marks=pytest.mark.xfail(reason="The precision is too large to be supported on the GPU", raises=IllegalArgumentException)), +pytest.param(DecimalGen(21, 17), marks=pytest.mark.xfail(reason="The precision is too large to be supported on the GPU", raises=IllegalArgumentException))], ids=idfn) def test_division(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( @@ -524,7 +528,7 @@ def _test_div_by_zero(ansi_mode, expr): @pytest.mark.parametrize('expr', ['1/0', 'a/0', 'a/b']) -@pytest.mark.xfail(condition=is_before_spark_310(), reason='https://github.com/apache/spark/pull/29882') +@pytest.mark.xfail(condition=is_before_spark_311(), reason='https://github.com/apache/spark/pull/29882') def test_div_by_zero_ansi(expr): _test_div_by_zero(ansi_mode='ansi', expr=expr) diff --git a/integration_tests/src/main/python/collection_ops_test.py b/integration_tests/src/main/python/collection_ops_test.py index 367a297ed3f..6c194e69379 100644 --- a/integration_tests/src/main/python/collection_ops_test.py +++ b/integration_tests/src/main/python/collection_ops_test.py @@ -18,7 +18,11 @@ from data_gen import * from pyspark.sql.types import * -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +nested_gens = [ArrayGen(LongGen()), + StructGen([("a", LongGen())]), + MapGen(StringGen(pattern='key_[0-9]', nullable=False), StringGen())] + +@pytest.mark.parametrize('data_gen', all_gen + nested_gens, ids=idfn) @pytest.mark.parametrize('size_of_null', ['true', 'false'], ids=idfn) def test_size_of_array(data_gen, size_of_null): gen = ArrayGen(data_gen) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index fe40ed866cd..1ebcfb5c923 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -844,6 +844,26 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit] +# Pyarrow will complain the error as below if the timestamp is out of range for both CPU and GPU, +# so narrow down the time range to avoid exceptions causing test failures. +# +# "pyarrow.lib.ArrowInvalid: Casting from timestamp[us, tz=UTC] to timestamp[ns] +# would result in out of bounds timestamp: 51496791452587000" +# +# This issue has been fixed in pyarrow by the PR https://github.com/apache/arrow/pull/7169 +# However it still requires PySpark to specify the new argument "timestamp_as_object". +arrow_common_gen = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, + string_gen, boolean_gen, date_gen, + TimestampGen(start=datetime(1970, 1, 1, tzinfo=timezone.utc), + end=datetime(2262, 1, 1, tzinfo=timezone.utc))] + +arrow_array_gens = [ArrayGen(subGen) for subGen in arrow_common_gen] + nested_array_gens_sample + +arrow_one_level_struct_gen = StructGen([ + ['child'+str(i), sub_gen] for i, sub_gen in enumerate(arrow_common_gen)]) + +arrow_struct_gens = [arrow_one_level_struct_gen, + StructGen([['child0', ArrayGen(short_gen)], ['child1', arrow_one_level_struct_gen]])] # This function adds a new column named uniq_int where each row # has a new unique integer value. It just starts at 0 and diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index fd8deda76e9..d554f769adb 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -20,7 +20,7 @@ from pyspark.sql.types import * from marks import * import pyspark.sql.functions as f -from spark_session import with_spark_session, is_spark_300, is_before_spark_310 +from spark_session import with_spark_session, is_spark_300, is_before_spark_311 _no_nans_float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true', 'spark.rapids.sql.hasNans': 'false', @@ -262,9 +262,11 @@ def test_hash_multiple_mode_query_avg_distincts(data_gen, conf): @ignore_order @incompat @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) -@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), - ids=idfn) -def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): +@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf, parameterless): + conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=100), "hash_agg_table", @@ -274,6 +276,7 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): 'sum(distinct a),' + 'count(distinct b),' + 'count(a),' + + 'count(),' + 'sum(a),' + 'min(a),'+ 'max(a) from hash_agg_table group by a', @@ -286,12 +289,16 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf): @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) -def test_hash_query_max_with_multiple_distincts(data_gen, conf): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_query_max_with_multiple_distincts(data_gen, conf, parameterless): + conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=100), "hash_agg_table", 'select max(c),' + 'sum(distinct a),' + + 'count(),' + 'count(distinct b) from hash_agg_table group by a', conf) @@ -336,12 +343,16 @@ def test_hash_query_max_bug(data_gen): @ignore_order @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys, _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn) -def test_hash_agg_with_nan_keys(data_gen): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_hash_agg_with_nan_keys(data_gen, parameterless): + _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_sql( lambda spark : gen_df(spark, data_gen, length=1024), "hash_agg_table", 'select a, ' - 'count(*) as count_stars, ' + 'count(*) as count_stars, ' + 'count() as count_parameterless, ' 'count(b) as count_bees, ' 'sum(b) as sum_of_bees, ' 'max(c) as max_seas, ' @@ -380,7 +391,10 @@ def test_count_distinct_with_nan_floats(data_gen): @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) -def test_generic_reductions(data_gen): +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_generic_reductions(data_gen, parameterless): + _no_nans_float_conf.update({'spark.sql.legacy.allowParameterlessCount': parameterless}) assert_gpu_and_cpu_are_equal_collect( # Coalesce and sort are to make sure that first and last, which are non-deterministic # become deterministic @@ -391,16 +405,30 @@ def test_generic_reductions(data_gen): 'first(a)', 'last(a)', 'count(a)', + 'count()', 'count(1)'), conf = _no_nans_float_conf) +@pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) +@pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( + condition=not is_before_spark_311(), reason="parameterless count not supported by default in Spark 3.1+"))]) +def test_count(data_gen, parameterless): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : unary_op_df(spark, data_gen) \ + .selectExpr( + 'count(a)', + 'count()', + 'count()', + 'count(1)'), + conf = {'spark.sql.legacy.allowParameterlessCount': parameterless}) + @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) def test_distinct_count_reductions(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'count(DISTINCT a)')) -@pytest.mark.xfail(condition=is_before_spark_310(), +@pytest.mark.xfail(condition=is_before_spark_311(), reason='Spark fixed distinct count of NaNs in 3.1') @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_distinct_float_count_reductions(data_gen): diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 957dae7053b..59daabda280 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -18,7 +18,7 @@ from conftest import is_databricks_runtime, is_emr_runtime from data_gen import * from marks import ignore_order, allow_non_gpu, incompat -from spark_session import with_cpu_session, with_spark_session, is_before_spark_310 +from spark_session import with_cpu_session, with_spark_session all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), BooleanGen(), DateGen(), TimestampGen(), null_gen, diff --git a/integration_tests/src/main/python/repart_test.py b/integration_tests/src/main/python/repart_test.py index 902ec7e9a75..7225ab451a1 100644 --- a/integration_tests/src/main/python/repart_test.py +++ b/integration_tests/src/main/python/repart_test.py @@ -15,16 +15,42 @@ import pytest from asserts import assert_gpu_and_cpu_are_equal_collect +from spark_session import is_before_spark_311 from data_gen import * from marks import ignore_order import pyspark.sql.functions as f -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +nested_scalar_mark=pytest.mark.xfail(reason="https://github.com/NVIDIA/spark-rapids/issues/1459") +@pytest.mark.parametrize('data_gen', [pytest.param((StructGen([['child0', DecimalGen(7, 2)]]), + StructGen([['child1', IntegerGen()]])), marks=nested_scalar_mark), + (StructGen([['child0', DecimalGen(7, 2)]], nullable=False), + StructGen([['child1', IntegerGen()]], nullable=False))], ids=idfn) +@pytest.mark.skipif(is_before_spark_311(), reason="This is supported only in Spark 3.1.1+") +# This tests the union of DF of structs with different types of cols as long as the struct itself +# isn't null. This is a limitation in cudf because we don't support nested types as literals +def test_union_struct_missing_children(data_gen): + left_gen, right_gen = data_gen + assert_gpu_and_cpu_are_equal_collect( + lambda spark : binary_op_df(spark, left_gen).unionByName(binary_op_df( + spark, right_gen), True)) + +@pytest.mark.parametrize('data_gen', all_gen + [all_basic_struct_gen, StructGen([['child0', DecimalGen(7, 2)]])], ids=idfn) +# This tests union of two DFs of two cols each. The types of the left col and right col is the same def test_union(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).union(binary_op_df(spark, data_gen))) -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen + [pytest.param(all_basic_struct_gen, marks=nested_scalar_mark), + pytest.param(StructGen([[ 'child0', DecimalGen(7, 2)]], nullable=False), marks=nested_scalar_mark)]) +@pytest.mark.skipif(is_before_spark_311(), reason="This is supported only in Spark 3.1.1+") +# This tests the union of two DFs of structs with missing child column names. The missing child +# column will be replaced by nulls in the output DF. This is a feature added in 3.1+ +def test_union_by_missing_col_name(data_gen): + assert_gpu_and_cpu_are_equal_collect( + lambda spark : binary_op_df(spark, data_gen).withColumnRenamed("a", "x") + .unionByName(binary_op_df(spark, data_gen).withColumnRenamed("a", "y"), True)) + +@pytest.mark.parametrize('data_gen', all_gen + [all_basic_struct_gen, StructGen([['child0', DecimalGen(7, 2)]])], ids=idfn) def test_union_by_name(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).unionByName(binary_op_df(spark, data_gen))) @@ -55,8 +81,10 @@ def test_repartion_df(num_parts, length): ([('a', short_gen)], ['a']), ([('a', int_gen)], ['a']), ([('a', long_gen)], ['a']), - pytest.param(([('a', float_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')), - pytest.param(([('a', double_gen)], ['a']), marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1914')), + ([('a', float_gen)], ['a']), + ([('a', double_gen)], ['a']), + ([('a', timestamp_gen)], ['a']), + ([('a', date_gen)], ['a']), ([('a', decimal_gen_default)], ['a']), ([('a', decimal_gen_neg_scale)], ['a']), ([('a', decimal_gen_scale_precision)], ['a']), @@ -71,6 +99,8 @@ def test_repartion_df(num_parts, length): ([('a', int_gen), ('b', byte_gen)], ['a', 'b']), ([('a', long_gen), ('b', null_gen)], ['a', 'b']), ([('a', byte_gen), ('b', boolean_gen), ('c', short_gen)], ['a', 'b', 'c']), + ([('a', float_gen), ('b', double_gen), ('c', short_gen)], ['a', 'b', 'c']), + ([('a', timestamp_gen), ('b', date_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', short_gen), ('b', string_gen), ('c', int_gen)], ['a', 'b', 'c']), ([('a', decimal_gen_default), ('b', decimal_gen_64bit), ('c', decimal_gen_scale_precision)], ['a', 'b', 'c']), ], ids=idfn) diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index 9f398f0c286..9d90a0dd66d 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -19,7 +19,7 @@ from marks import * from pyspark.sql.types import * import pyspark.sql.functions as f -from spark_session import is_before_spark_310 +from spark_session import is_before_spark_311 orderable_not_null_gen = [ByteGen(nullable=False), ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False), FloatGen(nullable=False), DoubleGen(nullable=False), BooleanGen(nullable=False), @@ -116,9 +116,9 @@ def test_single_sort_in_part(data_gen, order): conf = allow_negative_scale_of_decimal_conf) orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen, - pytest.param(float_gen, marks=pytest.mark.xfail(condition=is_before_spark_310(), + pytest.param(float_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(), reason='Spark has -0.0 < 0.0 before Spark 3.1')), - pytest.param(double_gen, marks=pytest.mark.xfail(condition=is_before_spark_310(), + pytest.param(double_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(), reason='Spark has -0.0 < 0.0 before Spark 3.1')), boolean_gen, timestamp_gen, date_gen, string_gen, null_gen] + decimal_gens @pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 22ab75370ab..b4e05888b5c 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -97,5 +97,5 @@ def with_gpu_session(func, conf={}): def is_spark_300(): return (spark_version() == "3.0.0" or spark_version().startswith('3.0.0-amzn')) -def is_before_spark_310(): - return spark_version() < "3.1.0" +def is_before_spark_311(): + return spark_version() < "3.1.1" diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index 95cbd8cbeb8..36ba96ac499 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -45,6 +45,8 @@ 'spark.rapids.sql.exec.WindowInPandasExec': 'true' } +data_gens_nested_for_udf = arrow_array_gens + arrow_struct_gens + #################################################################### # NOTE: pytest does not play well with pyspark udfs, because pyspark # tries to import the dependencies for top level functions and @@ -78,6 +80,17 @@ def iterator_add(to_process: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[ conf=arrow_udf_conf) +@pytest.mark.parametrize('data_gen', data_gens_nested_for_udf, ids=idfn) +def test_pandas_scalar_udf_nested_type(data_gen): + def nested_size(nested): + return pd.Series([nested.size]).repeat(len(nested)) + + my_udf = f.pandas_udf(nested_size, returnType=LongType()) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: unary_op_df(spark, data_gen).select(my_udf(f.col('a'))), + conf=arrow_udf_conf) + + @approximate_float @allow_non_gpu('AggregateInPandasExec', 'PythonUDF', 'Alias') @pytest.mark.parametrize('data_gen', integral_gens, ids=idfn) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index c9889791278..74eb6d00e3c 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -14,7 +14,6 @@ import pytest -from spark_session import is_before_spark_310 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql from data_gen import * from marks import * @@ -262,5 +261,4 @@ def test_window_aggs_for_rows_collect_list(): collect_list(c_struct) over (partition by a order by b,c_int rows between CURRENT ROW and UNBOUNDED FOLLOWING) as collect_struct from window_collect_table - ''', - {'spark.rapids.sql.expression.CollectList': 'true'}) + ''') diff --git a/jenkins/databricks/create.py b/jenkins/databricks/create.py index ef632bf1722..a6e14f3e157 100644 --- a/jenkins/databricks/create.py +++ b/jenkins/databricks/create.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020, NVIDIA CORPORATION. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. diff --git a/scripts/generate-changelog b/scripts/generate-changelog index 5b888ab6f35..d6138847d39 100755 --- a/scripts/generate-changelog +++ b/scripts/generate-changelog @@ -44,11 +44,11 @@ Github personal access token: https://github.com/settings/tokens, and make you h Usage: cd spark-rapids/ - # generate changelog for release 0.1,0.2,0.3,0.4 - scripts/generate-changelog --token= --releases=0.1,0.2,0.3,0.4 + # generate changelog for release 0.1,0.2,0.3,0.4,0.4.1 + scripts/generate-changelog --token= --releases=0.1,0.2,0.3,0.4,0.4.1 - # generate changelog for release 0.1,0.2,0.3,0.4 to /tmp/CHANGELOG.md - GITHUB_TOKEN= scripts/generate-changelog --releases=0.1,0.2,0.3,0.4 --path=/tmp/CHANGELOG.md + # generate changelog for release 0.1,0.2,0.3,0.4,0.4.1 to /tmp/CHANGELOG.md + GITHUB_TOKEN= scripts/generate-changelog --releases=0.1,0.2,0.3,0.4,0.4.1 --path=/tmp/CHANGELOG.md """ import os import sys diff --git a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala index c1e450f65f5..5e4f709b899 100644 --- a/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala +++ b/shuffle-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/ucx/UCX.scala @@ -478,7 +478,7 @@ class UCX(executorId: Int, usingWakeupFeature: Boolean = true) extends AutoClose /** * Return rkeys (if we have registered memory) */ - private lazy val localRkeys: Seq[ByteBuffer] = registeredMemory.synchronized { + private def localRkeys: Seq[ByteBuffer] = registeredMemory.synchronized { while (pendingRegistration) { registeredMemory.wait(100) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 380f4a9a40c..f996df38307 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -78,6 +78,18 @@ class CastExprMeta[INPUT <: CastBase]( "for more details. To enable this operation on the GPU, set" + s" ${RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP} to true.") } + // FIXME: https://github.com/NVIDIA/spark-rapids/issues/2019 + if (!conf.isCastStringToDecimalEnabled && cast.child.dataType == DataTypes.StringType && + cast.dataType.isInstanceOf[DecimalType]) { + willNotWorkOnGpu("Currently string to decimal type on the GPU might produce results which " + + "slightly differed from the correct results when the string represents any number " + + "exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU " + + "returns 99999999999999987 given input string \"99999999999999999\". The cause of " + + "divergence is that we can not cast strings containing scientific notation to decimal " + + "directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. " + + "The first step may lead to precision loss. To enable this operation on the GPU, set " + + s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.") + } } def buildTagMessage(entry: ConfEntry[_]): String = { @@ -387,6 +399,14 @@ case class GpuCast( } } } + case (StringType, dt: DecimalType) => + // To apply HALF_UP rounding strategy during casting to decimal, we firstly cast + // string to fp64. Then, cast fp64 to target decimal type to enforce HALF_UP rounding. + withResource(input.getBase.strip()) { trimmed => + withResource(castStringToFloats(trimmed, ansiMode, DType.FLOAT64)) { fp => + castFloatsToDecimal(fp, dt) + } + } case (ShortType | IntegerType | LongType | ByteType | StringType, BinaryType) => input.getBase.asByteList(true) @@ -1050,16 +1070,25 @@ case class GpuCast( } withResource(checkedInput) { checked => + val targetType = DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale) // If target scale reaches DECIMAL64_MAX_PRECISION, container DECIMAL can not // be created because of precision overflow. In this case, we perform casting op directly. - if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { - checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + val casted = if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { + checked.castTo(targetType) } else { val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1)) withResource(checked.castTo(containerType)) { container => container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) } } + // Cast NaN values to nulls + withResource(casted) { casted => + withResource(input.isNan) { inputIsNan => + withResource(Scalar.fromNull(targetType)) { nullScalar => + inputIsNan.ifElse(nullScalar, casted) + } + } + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala index ebb764270bb..82ddd40a672 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuHashPartitioning.scala @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, Table} +import ai.rapids.cudf.{ColumnVector, DType, NvtxColor, NvtxRange, OrderByArg, Table} import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, Distribution, HashClusteredDistribution} @@ -65,7 +65,7 @@ case class GpuHashPartitioning(expressions: Seq[Expression], numPartitions: Int) allColumns += parts allColumns ++= GpuColumnVector.extractBases(batch) withResource(new Table(allColumns: _*)) { fullTable => - fullTable.orderBy(Table.asc(0)) + fullTable.orderBy(OrderByArg.asc(0)) } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index ef24236b860..7f8b5853c15 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -840,7 +840,8 @@ class GpuOrcPartitionReader( if (debugDumpPrefix != null) { dumpOrcData(dataBuffer, dataSize) } - val includedColumns = ctx.updatedReadSchema.getFieldNames.asScala + val fieldNames = ctx.updatedReadSchema.getFieldNames.asScala.toArray + val includedColumns = requestedMapping.map(_.map(fieldNames(_))).getOrElse(fieldNames) val parseOpts = ORCOptions.builder() .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) .withNumPyTypes(false) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 88c4c8164a5..e9c1f42607a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.python.rapids.GpuAggregateInPandasExecMeta import org.apache.spark.sql.execution.window.WindowExec import org.apache.spark.sql.hive.rapids.GpuHiveOverrides import org.apache.spark.sql.internal.SQLConf @@ -1724,29 +1725,39 @@ object GpuOverrides { (childExprs.head.dataType, childExprs(1).dataType) match { case (l: DecimalType, r: DecimalType) => val outputType = GpuDivideUtil.decimalDataType(l, r) - // We will never hit a case where outputType.precision < outputType.scale + r.scale. - // So there is no need to protect against that. - // The only two cases in which there is a possibility of the intermediary scale - // exceeding the intermediary precision is when l.precision < l.scale or l - // .precision < 0, both of which aren't possible. - // Proof: - // case 1: - // outputType.precision = p1 - s1 + s2 + s1 + p2 + 1 + 1 - // outputType.scale = p1 + s2 + p2 + 1 + 1 - // To find out if outputType.precision < outputType.scale simplifies to p1 < s1, - // which is never possible - // - // case 2: - // outputType.precision = p1 - s1 + s2 + 6 + 1 - // outputType.scale = 6 + 1 - // To find out if outputType.precision < outputType.scale simplifies to p1 < 0 - // which is never possible + // Case 1: OutputType.precision doesn't get truncated + // We will never hit a case where outputType.precision < outputType.scale + r.scale. + // So there is no need to protect against that. + // The only two cases in which there is a possibility of the intermediary scale + // exceeding the intermediary precision is when l.precision < l.scale or l + // .precision < 0, both of which aren't possible. + // Proof: + // case 1: + // outputType.precision = p1 - s1 + s2 + s1 + p2 + 1 + 1 + // outputType.scale = p1 + s2 + p2 + 1 + 1 + // To find out if outputType.precision < outputType.scale simplifies to p1 < s1, + // which is never possible // + // case 2: + // outputType.precision = p1 - s1 + s2 + 6 + 1 + // outputType.scale = 6 + 1 + // To find out if outputType.precision < outputType.scale simplifies to p1 < 0 + // which is never possible + // Case 2: OutputType.precision gets truncated to 38 + // In this case we have to make sure the r.precision + l.scale + r.scale + 1 <= 38 + // Otherwise the intermediate result will overflow // TODO We should revisit the proof one more time after we support 128-bit decimals - val intermediateResult = DecimalType(outputType.precision, outputType.scale + r.scale) - if (intermediateResult.precision > DType.DECIMAL64_MAX_PRECISION) { - willNotWorkOnGpu("The actual output precision of the divide is too large" + + if (l.precision + l.scale + r.scale + 1 > 38) { + willNotWorkOnGpu("The intermediate output precision of the divide is too " + + s"large to be supported on the GPU i.e. Decimal(${outputType.precision}, " + + s"${outputType.scale + r.scale})") + } else { + val intermediateResult = + DecimalType(outputType.precision, outputType.scale + r.scale) + if (intermediateResult.precision > DType.DECIMAL64_MAX_PRECISION) { + willNotWorkOnGpu("The actual output precision of the divide is too large" + s" to fit on the GPU $intermediateResult") + } } case _ => // NOOP } @@ -2001,7 +2012,7 @@ object GpuOverrides { TypeSig.all, repeatingParamCheck = Some(RepeatingParamCheck( "param", - TypeSig.commonCudfTypes, + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), TypeSig.all))), (a, conf, p, r) => new ExprMeta[PythonUDF](a, conf, p, r) { override def replaceMessage: String = "not block GPU acceleration" @@ -2323,9 +2334,7 @@ object GpuOverrides { "Murmur3 hash operator", ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT, repeatingParamCheck = Some(RepeatingParamCheck("input", - // Floating point values don't work because of -0.0 is not hashed properly - TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG + - TypeSig.STRING + TypeSig.NULL + TypeSig.DECIMAL, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all))), (a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) { override val childExprs: Seq[BaseExprMeta[_]] = a.children @@ -2361,7 +2370,8 @@ object GpuOverrides { expr[Size]( "The size of an array or a map", ExprChecks.unaryProjectNotLambda(TypeSig.INT, TypeSig.INT, - (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all), + (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.commonCudfTypes + TypeSig.NULL + + TypeSig.DECIMAL + TypeSig.ARRAY + TypeSig.STRUCT + TypeSig.MAP), (TypeSig.ARRAY + TypeSig.MAP).nested(TypeSig.all)), (a, conf, p, r) => new UnaryExprMeta[Size](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = @@ -2427,8 +2437,7 @@ object GpuOverrides { (c, conf, p, r) => new ExprMeta[CollectList](c, conf, p, r) { override def convertToGpu(): GpuExpression = GpuCollectList( childExprs.head.convertToGpu(), c.mutableAggBufferOffset, c.inputAggBufferOffset) - }).disabledByDefault("for now the GPU collects null values to a list, but Spark does not." + - " This will be fixed in future releases."), + }), expr[ScalarSubquery]( "Subquery that will return only one row and one column", ExprChecks.projectOnly( @@ -2497,8 +2506,7 @@ object GpuOverrides { // TODO In 0.5 we should make the checks self documenting, and look more like what // SparkPlan and Expression support // https://github.com/NVIDIA/spark-rapids/issues/1915 - val sig = TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG + - TypeSig.STRING + TypeSig.NULL + TypeSig.DECIMAL + val sig = TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL hp.children.foreach { child => sig.tagExprParam(this, child, "hash_key") } @@ -2669,7 +2677,9 @@ object GpuOverrides { "The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" + " Java process and the Python process. It also supports scheduling GPU resources" + " for the Python process when enabled", - ExecChecks(TypeSig.commonCudfTypes, TypeSig.all), + ExecChecks( + (TypeSig.commonCudfTypes + TypeSig.ARRAY + TypeSig.STRUCT).nested(), + TypeSig.all), (e, conf, p, r) => new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) { val udfs: Seq[BaseExprMeta[PythonUDF]] = @@ -2718,7 +2728,11 @@ object GpuOverrides { (shuffle, conf, p, r) => new GpuShuffleMeta(shuffle, conf, p, r)), exec[UnionExec]( "The backend for the union operator", - ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL, TypeSig.all), + ExecChecks(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL + + TypeSig.STRUCT.nested(TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL) + .withPsNote(TypeEnum.STRUCT, + "unionByName will not optionally impute nulls for missing struct fields " + + "when the column is a struct and there are non-overlapping fields"), TypeSig.all), (union, conf, p, r) => new SparkPlanMeta[UnionExec](union, conf, p, r) { override def convertToGpu(): GpuExec = GpuUnionExec(childPlans.map(_.convertIfNeeded())) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index d2b46ecc154..fb2611edeea 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -563,6 +563,18 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val ENABLE_CAST_STRING_TO_DECIMAL = conf("spark.rapids.sql.castStringToDecimal.enabled") + .doc("When set to true, enables casting from strings to decimal type on the GPU. Currently " + + "string to decimal type on the GPU might produce results which slightly differed from the " + + "correct results when the string represents any number exceeding the max precision that " + + "CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given " + + "input string \"99999999999999999\". The cause of divergence is that we can not cast " + + "strings containing scientific notation to decimal directly. So, we have to cast strings " + + "to floats firstly. Then, cast floats to decimals. The first step may lead to precision " + + "loss.") + .booleanConf + .createWithDefault(false) + val ENABLE_CAST_STRING_TO_TIMESTAMP = conf("spark.rapids.sql.castStringToTimestamp.enabled") .doc("When set to true, casting from string to timestamp is supported on the GPU. The GPU " + "only supports a subset of formats when casting strings to timestamps. Refer to the CAST " + @@ -1183,6 +1195,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isCastStringToFloatEnabled: Boolean = get(ENABLE_CAST_STRING_TO_FLOAT) + lazy val isCastStringToDecimalEnabled: Boolean = get(ENABLE_CAST_STRING_TO_DECIMAL) + lazy val isCastFloatToIntegralTypesEnabled: Boolean = get(ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES) lazy val isCsvTimestampEnabled: Boolean = get(ENABLE_CSV_TIMESTAMPS) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala index b4361d7631d..cb57d905ac7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SortUtils.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{ColumnVector, NvtxColor, Table} +import ai.rapids.cudf.{ColumnVector, NvtxColor, OrderByArg, Table} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, BoundReference, Expression, NullsFirst, NullsLast, SortOrder} import org.apache.spark.sql.types.DataType @@ -33,11 +33,11 @@ object SortUtils extends Arm { case _ => None } - def getOrder(order: SortOrder, index: Int): Table.OrderByArg = + def getOrder(order: SortOrder, index: Int): OrderByArg = if (order.isAscending) { - Table.asc(index, order.nullOrdering == NullsFirst) + OrderByArg.asc(index, order.nullOrdering == NullsFirst) } else { - Table.desc(index, order.nullOrdering == NullsLast) + OrderByArg.desc(index, order.nullOrdering == NullsLast) } } @@ -88,7 +88,7 @@ class GpuSorter( private[this] lazy val (sortOrdersThatNeedComputation, cudfOrdering, cpuOrderingInternal) = { val sortOrdersThatNeedsComputation = mutable.ArrayBuffer[SortOrder]() val cpuOrdering = mutable.ArrayBuffer[SortOrder]() - val cudfOrdering = mutable.ArrayBuffer[Table.OrderByArg]() + val cudfOrdering = mutable.ArrayBuffer[OrderByArg]() var newColumnIndex = numInputColumns // Remove duplicates in the ordering itself because // there is no need to do it twice. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 1f76280dabf..da8600eaa1f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -766,7 +766,7 @@ class CastChecks extends ExprChecks { val timestampChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING val sparkTimestampSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING - val stringChecks: TypeSig = integral + fp + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY + val stringChecks: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + STRING + BINARY val sparkStringSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + DATE + CALENDAR + STRING + BINARY val binaryChecks: TypeSig = none diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index b3b0fda3500..7553604c58d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf -import ai.rapids.cudf.NvtxColor +import ai.rapids.cudf.{NvtxColor, Scalar} import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec} import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate} -import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, MapType, StructType} +import org.apache.spark.sql.types.{ArrayType, DoubleType, FloatType, LongType, MapType, StructType} import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector} object AggregateUtils { @@ -857,7 +857,7 @@ case class GpuHashAggregateExec( // reduction merge or update aggregates functions are val cvs = ArrayBuffer[GpuColumnVector]() aggModeCudfAggregates.foreach { case (mode, aggs) => - aggs.foreach {agg => + aggs.foreach { agg => val aggFn = if ((mode == Partial || mode == Complete) && !merge) { agg.updateReductionAggregate } else { @@ -871,6 +871,17 @@ case class GpuHashAggregateExec( } } } + // If cvs is empty, we add a single row with zero value. The value in the row is + // meaningless as it doesn't matter what we put in it. The projection will add a zero + // column to the result set in case of a parameter-less count. + // This is to fix a bug in the plugin where a paramater-less count wasn't returning the + // desired result compared to Spark-CPU. + // For more details go to https://github.com/NVIDIA/spark-rapids/issues/1737 + if (cvs.isEmpty) { + withResource(Scalar.fromLong(0L)) { ZERO => + cvs += GpuColumnVector.from(cudf.ColumnVector.fromScalar(ZERO, 1), LongType) + } + } new ColumnarBatch(cvs.toArray, cvs.head.getBase.getRowCount.toInt) } } finally { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala index 9129dc9a32d..19c641d1cd9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/collectionOperations.scala @@ -31,40 +31,18 @@ case class GpuSize(child: Expression, legacySizeOfNull: Boolean) override def nullable: Boolean = if (legacySizeOfNull) false else super.nullable override protected def doColumnar(input: GpuColumnVector): ColumnVector = { - val inputBase = input.getBase - if (inputBase.getRowCount == 0) { - return GpuColumnVector.from(GpuScalar.from(0), 0, IntegerType).getBase - } // Compute sizes of cuDF.ListType to get sizes of each ArrayData or MapData, considering // MapData is represented as List of Struct in terms of cuDF. - // We compute list size via subtracting the offset of next element(row) to the current offset. - val collectionSize = { - // Here is a hack: using index -1 to fetch the offset column of list. - // In terms of cuDF native, the offset is the first (index 0) child of list_column_view. - // In JNI layer, we add 1 to the child index when fetching child column of ListType to keep - // alignment. - // So, in JVM layer, we have to use -1 as index to fetch the real first child of list_column. - withResource(inputBase.getChildColumnView(-1)) { offset => - withResource(offset.subVector(1)) { upBound => - withResource(offset.subVector(0, offset.getRowCount.toInt - 1)) { lowBound => - upBound.sub(lowBound) + withResource(input.getBase.countElements()) { collectionSize => + if (legacySizeOfNull) { + withResource(GpuScalar.from(-1)) { nullScalar => + withResource(input.getBase.isNull) { inputIsNull => + inputIsNull.ifElse(nullScalar, collectionSize) } } - } - } - - val nullScalar = if (legacySizeOfNull) { - GpuScalar.from(-1) - } else { - GpuScalar.from(null, IntegerType) - } - - withResource(collectionSize) { collectionSize => - withResource(nullScalar) { nullScalar => - withResource(inputBase.isNull) { inputIsNull => - inputIsNull.ifElse(nullScalar, collectionSize) - } + } else { + collectionSize.incRefCount() } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuAggregateInPandasExec.scala similarity index 97% rename from sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala rename to sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuAggregateInPandasExec.scala index 156e84ff860..2a636b62117 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuAggregateInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/execution/python/rapids/GpuAggregateInPandasExec.scala @@ -14,23 +14,24 @@ * limitations under the License. */ -package org.apache.spark.sql.rapids.execution.python +package org.apache.spark.sql.execution.python.rapids import java.io.File +import scala.collection.mutable.ArrayBuffer + import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore -import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkEnv, TaskContext} import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, - Distribution, Partitioning} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode} -import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowPythonRunner} +import org.apache.spark.sql.execution.python.{AggregateInPandasExec, ArrowPythonRunner, HybridRowQueue} +import org.apache.spark.sql.rapids.execution.python.GpuPythonHelper import org.apache.spark.sql.types.{DataType, StructField, StructType} import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala index 34a40f3d6d3..4ef35319792 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatDataWriter.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2020, NVIDIA CORPORATION. + * Copyright (c) 2019-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids import scala.collection.mutable -import ai.rapids.cudf.{ContiguousTable, Table} +import ai.rapids.cudf.{ContiguousTable, OrderByArg, Table} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.hadoop.fs.Path @@ -276,7 +276,7 @@ class GpuDynamicPartitionDataWriter( val columnIds = 0 until t.getNumberOfColumns val distinct = t.groupBy(columnIds: _*).aggregate() try { - distinct.orderBy(columnIds.map(Table.asc(_, nullsSmallest)): _*) + distinct.orderBy(columnIds.map(OrderByArg.asc(_, nullsSmallest)): _*) } finally { distinct.close() } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala index 16b7026b441..d383da9c288 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala @@ -38,28 +38,7 @@ case class GpuMd5(child: Expression) object GpuMurmur3Hash extends Arm { def compute(batch: ColumnarBatch, boundExpr: Seq[Expression], seed: Int = 42): ColumnVector = { - val newExprs = boundExpr.map { expr => - expr.dataType match { - case ByteType | ShortType => - GpuCast(expr, IntegerType) - case DoubleType => - // We have to normalize the NaNs, but not zeros - // however the current cudf code does the wrong thing for -0.0 - // https://github.com/NVIDIA/spark-rapids/issues/1914 - GpuIf(GpuIsNan(expr), GpuLiteral(Double.NaN, DoubleType), expr) - case FloatType => - // We have to normalize the NaNs, but not zeros - // however the current cudf code does the wrong thing for -0.0 - // https://github.com/NVIDIA/spark-rapids/issues/1914 - GpuIf(GpuIsNan(expr), GpuLiteral(Float.NaN, FloatType), expr) - case dt: DecimalType if dt.precision <= DType.DECIMAL64_MAX_PRECISION => - // For these values it is just hashing it as a long - GpuUnscaledValue(expr) - case _ => - expr - } - } - withResource(GpuProjectExec.project(batch, newExprs)) { args => + withResource(GpuProjectExec.project(batch, boundExpr)) { args => val bases = GpuColumnVector.extractBases(args) ColumnVector.spark32BitMurmurHash3(seed, bases.toArray[ColumnView]) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala index 66ef5633f79..a0e29f43062 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManagerBase.scala @@ -56,7 +56,7 @@ class GpuShuffleBlockResolver( if (hasActiveShuffle) { throw new IllegalStateException(s"The block $blockId is being managed by the catalog") } - wrapped.getBlockData(blockId) + wrapped.getBlockData(blockId, dirs) } override def stop(): Unit = wrapped.stop() diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 318e7dfbd74..cd0770757b0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -26,14 +26,14 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.collection.mutable.ArrayBuffer -import ai.rapids.cudf.{AggregationOnColumn, ArrowIPCOptions, ArrowIPCWriterOptions, ColumnVector, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table} -import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, RapidsBuffer, SpillableColumnarBatch, SpillPriorities} +import ai.rapids.cudf._ +import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonRDD, SpecialLengths} +import org.apache.spark.api.python._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.util.toPrettySQL import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.python.PythonUDFRunner import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils @@ -436,12 +436,13 @@ class GpuArrowPythonRunner( table.close() GpuSemaphore.releaseIfNecessary(TaskContext.get()) }) - pythonInSchema.foreach { field => - if (field.nullable) { - builder.withColumnNames(field.name) - } else { - builder.withNotNullableColumnNames(field.name) - } + // Flatten the names of nested struct columns, required by cudf arrow IPC writer. + flattenNames(pythonInSchema).foreach { case (name, nullable) => + if (nullable) { + builder.withColumnNames(name) + } else { + builder.withNotNullableColumnNames(name) + } } Table.writeArrowIPCChunked(builder.build(), new BufferToStreamWriter(dataOut)) } @@ -463,6 +464,16 @@ class GpuArrowPythonRunner( if (onDataWriteFinished != null) onDataWriteFinished() } } + + private def flattenNames(d: DataType, nullable: Boolean=true): Seq[(String, Boolean)] = + d match { + case s: StructType => + s.flatMap(sf => Seq((sf.name, sf.nullable)) ++ flattenNames(sf.dataType, sf.nullable)) + case m: MapType => + flattenNames(m.keyType, nullable) ++ flattenNames(m.valueType, nullable) + case a: ArrayType => flattenNames(a.elementType, nullable) + case _ => Nil + } } } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala index f79a7d9d9f6..53812b31095 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala @@ -16,14 +16,15 @@ package org.apache.spark.sql.rapids.execution.python +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + import ai.rapids.cudf -import ai.rapids.cudf.{Aggregation, Table} +import ai.rapids.cudf.{Aggregation, OrderByArg} import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.python.PythonWorkerSemaphore -import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer import org.apache.spark.TaskContext import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType} @@ -125,7 +126,7 @@ class GroupingIterator( } } val orderedTable = withResource(cntTable) { table => - table.orderBy(partitionIndices.map(id => Table.asc(id, true)): _*) + table.orderBy(partitionIndices.map(id => OrderByArg.asc(id, true)): _*) } val (countHostCol, numRows) = withResource(orderedTable) { table => // Yes copying the data to host, it would be OK since just copying the aggregated diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/RowUtils.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/RowUtils.scala deleted file mode 100644 index 42844cd216d..00000000000 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/RowUtils.scala +++ /dev/null @@ -1,295 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.rapids.execution.python - -import java.io._ - -import com.google.common.io.Closeables - -import org.apache.spark.{SparkEnv, SparkException} -import org.apache.spark.io.NioBufferedFileInputStream -import org.apache.spark.memory.{MemoryConsumer, SparkOutOfMemoryError, TaskMemoryManager} -import org.apache.spark.serializer.SerializerManager -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.unsafe.Platform -import org.apache.spark.unsafe.memory.MemoryBlock - -// The whole file is copied from Spark `RowQueue` to expose row queue utils to rapids execs - -/** - * A RowQueue is an FIFO queue for UnsafeRow. - * - * This RowQueue is ONLY designed and used for Python UDF, which has only one writer and only one - * reader, the reader ALWAYS ran behind the writer. See the doc of class BatchEvalPythonExec - * on how it works. - */ -private[sql] trait RowQueue { - - /** - * Add a row to the end of it, returns true iff the row has been added to the queue. - */ - def add(row: UnsafeRow): Boolean - - /** - * Retrieve and remove the first row, returns null if it's empty. - * - * It can only be called after add is called, otherwise it will fail (NPE). - */ - def remove(): UnsafeRow - - /** - * Cleanup all the resources. - */ - def close(): Unit -} - -/** - * A RowQueue that is based on in-memory page. UnsafeRows are appended into it until it's full. - * Another thread could read from it at the same time (behind the writer). - * - * The format of UnsafeRow in page: - * [4 bytes to hold length of record (N)] [N bytes to hold record] [...] - * - * -1 length means end of page. - */ -private[sql] abstract class InMemoryRowQueue(val page: MemoryBlock, numFields: Int) - extends RowQueue { - private val base: AnyRef = page.getBaseObject - private val endOfPage: Long = page.getBaseOffset + page.size - // the first location where a new row would be written - private var writeOffset = page.getBaseOffset - // points to the start of the next row to read - private var readOffset = page.getBaseOffset - private val resultRow = new UnsafeRow(numFields) - - def add(row: UnsafeRow): Boolean = synchronized { - val size = row.getSizeInBytes - if (writeOffset + 4 + size > endOfPage) { - // if there is not enough space in this page to hold the new record - if (writeOffset + 4 <= endOfPage) { - // if there's extra space at the end of the page, store a special "end-of-page" length (-1) - Platform.putInt(base, writeOffset, -1) - } - false - } else { - Platform.putInt(base, writeOffset, size) - Platform.copyMemory(row.getBaseObject, row.getBaseOffset, base, writeOffset + 4, size) - writeOffset += 4 + size - true - } - } - - def remove(): UnsafeRow = synchronized { - assert(readOffset <= writeOffset, "reader should not go beyond writer") - if (readOffset + 4 > endOfPage || Platform.getInt(base, readOffset) < 0) { - null - } else { - val size = Platform.getInt(base, readOffset) - resultRow.pointTo(base, readOffset + 4, size) - readOffset += 4 + size - resultRow - } - } -} - -/** - * A RowQueue that is backed by a file on disk. This queue will stop accepting new rows once any - * reader has begun reading from the queue. - */ -private[sql] case class DiskRowQueue( - file: File, - fields: Int, - serMgr: SerializerManager) extends RowQueue { - - private var out = new DataOutputStream(serMgr.wrapForEncryption( - new BufferedOutputStream(new FileOutputStream(file.toString)))) - private var unreadBytes = 0L - - private var in: DataInputStream = _ - private val resultRow = new UnsafeRow(fields) - - def add(row: UnsafeRow): Boolean = synchronized { - if (out == null) { - // Another thread is reading, stop writing this one - return false - } - out.writeInt(row.getSizeInBytes) - out.write(row.getBytes) - unreadBytes += 4 + row.getSizeInBytes - true - } - - def remove(): UnsafeRow = synchronized { - if (out != null) { - out.close() - out = null - in = new DataInputStream(serMgr.wrapForEncryption( - new NioBufferedFileInputStream(file))) - } - - if (unreadBytes > 0) { - val size = in.readInt() - val bytes = new Array[Byte](size) - in.readFully(bytes) - unreadBytes -= 4 + size - resultRow.pointTo(bytes, size) - resultRow - } else { - null - } - } - - def close(): Unit = synchronized { - Closeables.close(out, true) - out = null - Closeables.close(in, true) - in = null - if (file.exists()) { - file.delete() - } - } -} - -/** - * A RowQueue that has a list of RowQueues, which could be in memory or disk. - * - * HybridRowQueue could be safely appended in one thread, and pulled in another thread in the same - * time. - */ -private[sql] case class HybridRowQueue( - memManager: TaskMemoryManager, - tempDir: File, - numFields: Int, - serMgr: SerializerManager) - extends MemoryConsumer(memManager) with RowQueue { - - // Each buffer should have at least one row - private var queues = new java.util.LinkedList[RowQueue]() - - private var writing: RowQueue = _ - private var reading: RowQueue = _ - - // exposed for testing - private[python] def numQueues(): Int = queues.size() - - def spill(size: Long, trigger: MemoryConsumer): Long = { - if (trigger == this) { - // When it's triggered by itself, it should write upcoming rows into disk instead of copying - // the rows already in the queue. - return 0L - } - var released = 0L - synchronized { - // poll out all the buffers and add them back in the same order to make sure that the rows - // are in correct order. - val newQueues = new java.util.LinkedList[RowQueue]() - while (!queues.isEmpty) { - val queue = queues.remove() - val newQueue = if (!queues.isEmpty && queue.isInstanceOf[InMemoryRowQueue]) { - val diskQueue = createDiskQueue() - var row = queue.remove() - while (row != null) { - diskQueue.add(row) - row = queue.remove() - } - released += queue.asInstanceOf[InMemoryRowQueue].page.size() - queue.close() - diskQueue - } else { - queue - } - newQueues.add(newQueue) - } - queues = newQueues - } - released - } - - private def createDiskQueue(): RowQueue = { - DiskRowQueue(File.createTempFile("buffer", "", tempDir), numFields, serMgr) - } - - private def createNewQueue(required: Long): RowQueue = { - val page = try { - allocatePage(required) - } catch { - case _: SparkOutOfMemoryError => - null - } - val buffer = if (page != null) { - new InMemoryRowQueue(page, numFields) { - override def close(): Unit = { - freePage(page) - } - } - } else { - createDiskQueue() - } - - synchronized { - queues.add(buffer) - } - buffer - } - - def add(row: UnsafeRow): Boolean = { - if (writing == null || !writing.add(row)) { - writing = createNewQueue(4 + row.getSizeInBytes) - if (!writing.add(row)) { - throw new SparkException(s"failed to push a row into $writing") - } - } - true - } - - def remove(): UnsafeRow = { - var row: UnsafeRow = null - if (reading != null) { - row = reading.remove() - } - if (row == null) { - if (reading != null) { - reading.close() - } - synchronized { - reading = queues.remove() - } - assert(reading != null, s"queue should not be empty") - row = reading.remove() - assert(row != null, s"$reading should have at least one row") - } - row - } - - def close(): Unit = { - if (reading != null) { - reading.close() - reading = null - } - synchronized { - while (!queues.isEmpty) { - queues.remove().close() - } - } - } -} - -private[sql] object HybridRowQueue { - def apply(taskMemoryMgr: TaskMemoryManager, file: File, fields: Int): HybridRowQueue = { - HybridRowQueue(taskMemoryMgr, file, fields, SparkEnv.get.serializerManager) - } -} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 667f4f1be47..b86c16da5f7 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -439,6 +439,16 @@ class CastOpSuite extends GpuExpressionTestSuite { } } + test("cast float to decimal (include NaN/INF/-INF)") { + def floatsIncludeNaNs(ss: SparkSession): DataFrame = { + mixedFloatDf(ss).select(col("floats").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.FloatType, scale, + customDataGenerator = Some(floatsIncludeNaNs)) + } + } + test("cast double to decimal") { List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => testCastToDecimal(DataTypes.DoubleType, scale, @@ -446,6 +456,16 @@ class CastOpSuite extends GpuExpressionTestSuite { } } + test("cast double to decimal (include NaN/INF/-INF)") { + def doublesIncludeNaNs(ss: SparkSession): DataFrame = { + mixedDoubleDf(ss).select(col("doubles").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.DoubleType, scale, + customDataGenerator = Some(doublesIncludeNaNs)) + } + } + test("cast decimal to decimal") { // fromScale == toScale testCastToDecimal(DataTypes.createDecimalType(18, 0), @@ -574,6 +594,53 @@ class CastOpSuite extends GpuExpressionTestSuite { generator = decimalGenerator(Seq(Decimal(100000000L)), decType)) } + test("cast string to decimal") { + List(-18, -10, -3, 0, 1, 5, 15).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale, + customRandGenerator = Some(new scala.util.Random(1234L))) + } + } + + test("cast string to decimal (include NaN/INF/-INF)") { + def doubleStrings(ss: SparkSession): DataFrame = { + val df1 = floatsAsStrings(ss).selectExpr("cast(c0 as Double) as col") + val df2 = doublesAsStrings(ss).select(col("c0").as("col")) + df1.unionAll(df2) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale = scale, + customDataGenerator = Some(doubleStrings)) + } + } + + test("cast string to decimal (truncated cases)") { + def specialGenerator(column: Seq[String])(ss: SparkSession): DataFrame = { + import ss.sqlContext.implicits._ + column.toDF("col") + } + testCastToDecimal(DataTypes.StringType, scale = 7, + customDataGenerator = Some(specialGenerator(Seq("9999999999")))) + testCastToDecimal(DataTypes.StringType, scale = 2, + customDataGenerator = Some(specialGenerator(Seq("999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = 0, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = -1, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + testCastToDecimal(DataTypes.StringType, scale = -10, + customDataGenerator = Some(specialGenerator(Seq("99999999999999999")))) + } + + test("ansi_cast string to decimal exp") { + def exponentsAsStrings(ss: SparkSession): DataFrame = { + exponentsAsStringsDf(ss).select(col("c0").as("col")) + } + List(-10, -1, 0, 1, 10).foreach { scale => + testCastToDecimal(DataTypes.StringType, scale = scale, + customDataGenerator = Some(exponentsAsStrings), + ansiEnabled = true) + } + } + protected def testCastToDecimal( dataType: DataType, scale: Int, @@ -592,13 +659,14 @@ class CastOpSuite extends GpuExpressionTestSuite { val conf = new SparkConf() .set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true") .set(RapidsConf.ENABLE_CAST_FLOAT_TO_DECIMAL.key, "true") + .set(RapidsConf.ENABLE_CAST_STRING_TO_DECIMAL.key, "true") .set("spark.rapids.sql.exec.FileSourceScanExec", "false") .set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true") .set("spark.sql.ansi.enabled", ansiEnabled.toString) val defaultRandomGenerator: SparkSession => DataFrame = { val rnd = customRandGenerator.getOrElse(new scala.util.Random(1234L)) - generateCastNumericToDecimalDataFrame(dataType, precision - scale, rnd, 500) + generateCastToDecimalDataFrame(dataType, precision - scale, rnd, 500) } val generator = customDataGenerator.getOrElse(defaultRandomGenerator) withCpuSparkSession(spark => generator(spark).write.parquet(path), conf) @@ -613,7 +681,7 @@ class CastOpSuite extends GpuExpressionTestSuite { val (cpuResult, gpuResult) = dataType match { case ShortType | IntegerType | LongType | _: DecimalType => fromCpu.map(r => Row(r.getDecimal(1))) -> fromGpu.map(r => Row(r.getDecimal(1))) - case FloatType | DoubleType => + case FloatType | DoubleType | StringType => // There may be tiny difference between CPU and GPU result when casting from double val fetchFromRow = (r: Row) => { if (r.isNullAt(1)) Double.NaN @@ -630,7 +698,7 @@ class CastOpSuite extends GpuExpressionTestSuite { } } - private def generateCastNumericToDecimalDataFrame( + private def generateCastToDecimalDataFrame( dataType: DataType, integralSize: Int, rndGenerator: scala.util.Random, @@ -656,7 +724,7 @@ class CastOpSuite extends GpuExpressionTestSuite { enhancedRnd.nextLong() / math.pow(10, scale max 9).toLong case LongType => enhancedRnd.nextLong() / math.pow(10, scale max 0).toLong - case FloatType | DoubleType => + case FloatType | DoubleType | StringType => enhancedRnd.nextLong() / math.pow(10, scale + 2) case dt: DecimalType => val unscaledValue = (enhancedRnd.nextLong() * math.pow(10, dt.precision - 18)).toLong @@ -676,6 +744,8 @@ class CastOpSuite extends GpuExpressionTestSuite { rawColumn.map(_.asInstanceOf[Double].toFloat).toDF("col") case DoubleType => rawColumn.map(_.asInstanceOf[Double]).toDF("col") + case StringType => + rawColumn.map(_.asInstanceOf[Double].toString).toDF("col") case dt: DecimalType => val row = rawColumn.map(e => Row(e.asInstanceOf[Decimal])).asJava ss.createDataFrame(row, StructType(Seq(StructField("col", dt))))