From c312ecb6a10b05a69858e443075a1af3b092bd71 Mon Sep 17 00:00:00 2001
From: sinkinben
Date: Mon, 15 Aug 2022 18:17:46 +0800
Subject: [PATCH 01/11] implement casting float/double

* Impl: float/double -> double/float, bool, int8/16/32/64, timestamp
* There are some precision issues when casting float/double to string
* IT test: needs a float_gen function with range [a, b]

Signed-off-by: sinkinben
---
 integration_tests/src/main/python/tmp_test.py |  54 +++++++
 .../com/nvidia/spark/rapids/GpuCast.scala     |   2 +-
 .../com/nvidia/spark/rapids/GpuOrcScan.scala  | 136 ++++++++++++++++++
 3 files changed, 191 insertions(+), 1 deletion(-)
 create mode 100644 integration_tests/src/main/python/tmp_test.py

diff --git a/integration_tests/src/main/python/tmp_test.py b/integration_tests/src/main/python/tmp_test.py
new file mode 100644
index 00000000000..6dafb54ff3f
--- /dev/null
+++ b/integration_tests/src/main/python/tmp_test.py
@@ -0,0 +1,54 @@
+# Copyright (c) 2020-2022, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
+from data_gen import *
+from pyspark.sql.types import *
+from spark_session import with_cpu_session
+
+
+def create_orc(data_gen_list, data_path):
+    # generate ORC dataframe, and dump it to local file 'data_path'
+    with_cpu_session(
+        lambda spark: gen_df(spark, data_gen_list).write.mode('overwrite').orc(data_path)
+    )
+
+# TODO: merge test_casting_from_float and test_casting_from_double into one test
+# TODO: Need a float_gen with range [a, b], if float/double >= 1e13, then float/double -> timestamp will overflow
+
+@pytest.mark.parametrize('to_type', ['double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint', 'timestamp'])
+def test_casting_from_float(spark_tmp_path, to_type):
+    orc_path = spark_tmp_path + '/orc_casting_from_float'
+    data_gen = [('float_column', float_gen)]
+    create_orc(data_gen, orc_path)
+    schema_str = "float_column {}".format(to_type)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema(schema_str).orc(orc_path)
+    )
+
+
+
+
+@pytest.mark.parametrize('to_type', ['float', 'boolean', 'tinyint', 'smallint', 'int', 'bigint', 'timestamp'])
+def test_casting_from_double(spark_tmp_path, to_type):
+    orc_path = spark_tmp_path + '/orc_casting_from_double'
+    data_gen = [('double_column', float_gen)]
+    create_orc(data_gen, orc_path)
+    schema_str = "double_column {}".format(to_type)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema(schema_str).orc(orc_path)
+    )
+
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
index efadb5b5f26..0ba8178b730 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -973,7 +973,7 @@ object GpuCast extends Arm {
     }
   }
 
-  private def castFloatingTypeToString(input: ColumnView): ColumnVector = {
+  private[rapids] def castFloatingTypeToString(input: ColumnView): ColumnVector = {
     withResource(input.castTo(DType.STRING)) { cudfCast =>
 
       // replace "e+" with "E"
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index 45fa3c3b320..92033d70e16 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -186,6 +186,78 @@ object GpuOrcScan extends Arm {
     }
   }
 
+  /**
+   * Get the overflow flags as booleans: true means no overflow, while false means overflow.
+   *
+   * @param doubleMillis the input double column
+   * @param millis the long column cast from doubleMillis
+   */
+  private def getOverflowFlags(doubleMillis: ColumnView, millis: ColumnView): ColumnView = {
+    // No overflow when
+    //   doubleMillis <= Long.MAX_VALUE &&
+    //   doubleMillis >= Long.MIN_VALUE &&
+    //   ((millis >= 0) == (doubleMillis >= 0))
+    val rangeCheck = withResource(Scalar.fromLong(Long.MaxValue)) { max =>
+      withResource(doubleMillis.lessOrEqualTo(max)) { upperCheck =>
+        withResource(Scalar.fromLong(Long.MinValue)) { min =>
+          withResource(doubleMillis.greaterOrEqualTo(min)) { lowerCheck =>
+            upperCheck.and(lowerCheck)
+          }
+        }
+      }
+    }
+    withResource(rangeCheck) { _ =>
+      val signCheck = withResource(Scalar.fromInt(0)) { zero =>
+        withResource(millis.greaterOrEqualTo(zero)) { longSign =>
+          withResource(doubleMillis.greaterOrEqualTo(zero)) { doubleSign =>
+            longSign.equalTo(doubleSign)
+          }
+        }
+      }
+      withResource(signCheck) { _ =>
+        rangeCheck.and(signCheck)
+      }
+    }
+  }
+
+  /**
+   * Borrowed from ORC "ConvertTreeReaderFactory".
+   * Scala does not support such numeric literals, so parse them from strings.
+   */
+  private val MIN_LONG_AS_DOUBLE = java.lang.Double.valueOf("-0x1p63")
+
+  /**
+   * We cannot store Long.MAX_VALUE as a double without losing precision. Instead, we store
+   * Long.MAX_VALUE + 1 == -Long.MIN_VALUE, and then offset all comparisons by 1.
+   */
+  private val MAX_LONG_AS_DOUBLE_PLUS_ONE = java.lang.Double.valueOf("0x1p63")
+
+  /**
+   * Return a boolean column indicating whether the rows in col can fit in a long.
+   * It assumes the input type is float or double.
+   */
+  private def doubleCanFitInLong(col: ColumnView): ColumnVector = {
+    // It is true when
+    //   (MIN_LONG_AS_DOUBLE - doubleValue < 1.0) &&
+    //   (doubleValue < MAX_LONG_AS_DOUBLE_PLUS_ONE)
+    val lowRet = withResource(Scalar.fromDouble(MIN_LONG_AS_DOUBLE)) { sMin =>
+      withResource(Scalar.fromDouble(1.0)) { sOne =>
+        withResource(sMin.sub(col)) { diff =>
+          diff.lessThan(sOne)
+        }
+      }
+    }
+    withResource(lowRet) { _ =>
+      withResource(Scalar.fromDouble(MAX_LONG_AS_DOUBLE_PLUS_ONE)) { sMax =>
+        withResource(col.lessThan(sMax)) { highRet =>
+          lowRet.and(highRet)
+        }
+      }
+    }
+  }
+
+
   /**
    * Cast the column to the target type for ORC schema evolution.
    * It is designed to support all the cases that `canCast` returns true.
@@ -211,6 +283,64 @@ object GpuOrcScan extends Arm {
     } else {
       downCastAnyInteger(col, toDt)
     }
+
+      // float/double(float64) to {bool, integer types, double/float, string, timestamp}
+      // float/double to bool/integral
+      case (DType.FLOAT32 | DType.FLOAT64, DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32
+        | DType.INT64) =>
+        // Follow the CPU ORC conversion:
+        // First replace the rows that cannot fit in a long with nulls,
+        // next convert to long,
+        // then down-cast the long to the target integral type.
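+        // For example, 12.8 fits in a long, so it is kept and truncated to 12 by the casts
+        // below, while 1.0e30 cannot fit in a long, so its row becomes null.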
+ val longDoubles = withResource(doubleCanFitInLong(col)) { fitLongs => + col.copyWithBooleanColumnAsValidity(fitLongs) + } + withResource(longDoubles) { _ => + withResource(longDoubles.castTo(DType.INT64)) { longs => + toDt match { + case DType.BOOL8 => longs.castTo(toDt) + case DType.INT64 => longs.incRefCount() + case _ => downCastAnyInteger(longs, toDt) + } + } + } + + // float/double to double/float + case (DType.FLOAT32 | DType.FLOAT64, DType.FLOAT32 | DType.FLOAT64) => + col.castTo(toDt) + + // FIXME float/double to string, there are some precision error issues + case (DType.FLOAT32 | DType.FLOAT64, DType.STRING) => + GpuCast.castFloatingTypeToString(col) + + // float/double -> timestamp + case (DType.FLOAT32 | DType.FLOAT64, DType.TIMESTAMP_MICROSECONDS) => + // Follow the CPU ORC conversion. + // val doubleMillis = doubleValue * 1000, + // val millis = Math.round(doubleMillis) + // if (noOverflow) millis else null + val milliSeconds = withResource(Scalar.fromDouble(1000.0)) { thousand => + // ORC assumes value is in seconds, and returns timestamps in milliseconds. + withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis => + withResource(doubleMillis.round()) { millis => + withResource(getOverflowFlags(doubleMillis, millis)) { overflows => + millis.copyWithBooleanColumnAsValidity(overflows) + } + } + } + } + // Cast milli-seconds to micro-seconds + // We need to pay attention that when convert (milliSeconds * 1000) to INT64, there may be + // INT64-overflow, but we do not handle this issue here (as CPU code of ORC does). + // If (milliSeconds * 1000) > INT64.MAX, then 'castTo' will throw an exception. + withResource(milliSeconds) { _ => + withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => + withResource(microSeconds.castTo(DType.INT64)) { longVec => + longVec.castTo(DType.TIMESTAMP_MICROSECONDS) + } + } + } + // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895 case (f, t) => throw new QueryExecutionException(s"Unsupported type casting: $f -> $t") @@ -239,6 +369,12 @@ object GpuOrcScan extends Arm { } case VARCHAR => to.getCategory == STRING + + case FLOAT | DOUBLE => + to.getCategory match { + case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | STRING | TIMESTAMP => true + case _ => false + } // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895 case _ => false From a729c866bf995096e9c9e972071e125b459911a8 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Mon, 15 Aug 2022 19:18:08 +0800 Subject: [PATCH 02/11] Check long-overflow when casting float/double to timestamp Signed-off-by: sinkinben --- .../scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 92033d70e16..abff50593b6 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -320,7 +320,7 @@ object GpuOrcScan extends Arm { // val millis = Math.round(doubleMillis) // if (noOverflow) millis else null val milliSeconds = withResource(Scalar.fromDouble(1000.0)) { thousand => - // ORC assumes value is in seconds, and returns timestamps in milliseconds. 
+ // ORC assumes value is in seconds withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis => withResource(doubleMillis.round()) { millis => withResource(getOverflowFlags(doubleMillis, millis)) { overflows => @@ -331,9 +331,13 @@ object GpuOrcScan extends Arm { } // Cast milli-seconds to micro-seconds // We need to pay attention that when convert (milliSeconds * 1000) to INT64, there may be - // INT64-overflow, but we do not handle this issue here (as CPU code of ORC does). - // If (milliSeconds * 1000) > INT64.MAX, then 'castTo' will throw an exception. + // INT64-overflow. withResource(milliSeconds) { _ => + // Test whether if there is long-overflow + // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getLong will return LONG_MAX + // If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will throw an + // exception (as CPU code does). + Math.multiplyExact(milliSeconds.max().getLong, 1000.toLong) withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => withResource(microSeconds.castTo(DType.INT64)) { longVec => longVec.castTo(DType.TIMESTAMP_MICROSECONDS) From bb2d3a66b3793aeb9dac3f442d2a92b53d4550bf Mon Sep 17 00:00:00 2001 From: sinkinben Date: Mon, 15 Aug 2022 19:21:19 +0800 Subject: [PATCH 03/11] Add comments in temp IT tests Signed-off-by: sinkinben --- integration_tests/src/main/python/tmp_test.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/tmp_test.py b/integration_tests/src/main/python/tmp_test.py index 6dafb54ff3f..d367db7da77 100644 --- a/integration_tests/src/main/python/tmp_test.py +++ b/integration_tests/src/main/python/tmp_test.py @@ -28,7 +28,12 @@ def create_orc(data_gen_list, data_path): # TODO: merge test_casting_from_float and test_casting_from_double into one test # TODO: Need a float_gen with range [a, b], if float/double >= 1e13, then float/double -> timestamp will overflow - +''' +We need this test cases: +1. val * 1e3 <= LONG_MAX && val * 1e6 <= LONG_MAX (no overflow) +2. val * 1e3 <= LONG_MAX && val * 1e6 > LONG_MAX (caught java.lang.ArithmeticException) +3. val * 1e3 > LONG_MAX (caught java.lang.ArithmeticException) +''' @pytest.mark.parametrize('to_type', ['double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint', 'timestamp']) def test_casting_from_float(spark_tmp_path, to_type): orc_path = spark_tmp_path + '/orc_casting_from_float' From ef1163d2b0df0fd9e5113bacaac59cb7ada8d596 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Tue, 16 Aug 2022 15:25:19 +0800 Subject: [PATCH 04/11] Update float/double -> timestamp, and add more test cases Signed-off-by: sinkinben --- .../src/main/python/orc_cast_float_test.py | 74 +++++++++++++++++++ integration_tests/src/main/python/tmp_test.py | 59 --------------- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 6 +- 3 files changed, 77 insertions(+), 62 deletions(-) create mode 100644 integration_tests/src/main/python/orc_cast_float_test.py delete mode 100644 integration_tests/src/main/python/tmp_test.py diff --git a/integration_tests/src/main/python/orc_cast_float_test.py b/integration_tests/src/main/python/orc_cast_float_test.py new file mode 100644 index 00000000000..90707d38ff8 --- /dev/null +++ b/integration_tests/src/main/python/orc_cast_float_test.py @@ -0,0 +1,74 @@ +# Copyright (c) 2020-2022, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error +from data_gen import * +from pyspark.sql.types import * +from spark_session import with_cpu_session + + +@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint']) +def test_casting_from_float_and_double(spark_tmp_path, to_type): + orc_path = spark_tmp_path + '/orc_casting_from_float_and_double' + data_gen = [('float_column', float_gen), ('double_column', double_gen)] + with_cpu_session( + lambda spark: gen_df(spark, data_gen).write.mode('overwrite').orc(orc_path) + ) + schema_str = "float_column {}, double_column {}".format(to_type, to_type) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.schema(schema_str).orc(orc_path) + ) + + +@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None), + DoubleGen(max_exp=32, special_cases=[8.88e32, 9.99e33, 3.14159e34, 2.712e35, 2e36])]) +def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen): + # ORC will assume the original double value in seconds, we need to convert them to + # timestamp(INT64 in micro-seconds). + # + # Since datetime library in python requires year >= 0, and UTC timestamp is start from 1970/1/1 00:00:00, + # that is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2e32. + # So we set max_exp = 32 in DoubleGen. + # + # The maximum valid positive number is INT64_MAX / 1e6 -> 1e12 -> 2e36, so we add some special cases + # from 2e33 to 2e36. + # + # In DoubleGen, special_case=None will generate some NaN corner case. + + orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp' + with_cpu_session( + lambda spark: unary_op_df(spark, data_gen).write.mode('overwrite').orc(orc_path) + ) + # the name of unique column is 'a', cast it into timestamp type + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.read.schema("a timestamp").orc(orc_path) + ) + + +def test_casting_from_overflow_double_to_timestamp(spark_tmp_path): + orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp' + with_cpu_session( + lambda spark: unary_op_df(spark, DoubleGen(min_exp=37)).write.mode('overwrite').orc(orc_path) + ) + assert_gpu_and_cpu_error( + df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(), + conf={}, + error_message="ArithmeticException" + ) + +''' +TODO: In the above cases, we test double -> timestamp. Should we add cases to test float -> timestamp? +''' \ No newline at end of file diff --git a/integration_tests/src/main/python/tmp_test.py b/integration_tests/src/main/python/tmp_test.py deleted file mode 100644 index d367db7da77..00000000000 --- a/integration_tests/src/main/python/tmp_test.py +++ /dev/null @@ -1,59 +0,0 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error -from data_gen import * -from pyspark.sql.types import * -from spark_session import with_cpu_session - - -def create_orc(data_gen_list, data_path): - # generate ORC dataframe, and dump it to local file 'data_path' - with_cpu_session( - lambda spark: gen_df(spark, data_gen_list).write.mode('overwrite').orc(data_path) - ) - -# TODO: merge test_casting_from_float and test_casting_from_double into one test -# TODO: Need a float_gen with range [a, b], if float/double >= 1e13, then float/double -> timestamp will overflow -''' -We need this test cases: -1. val * 1e3 <= LONG_MAX && val * 1e6 <= LONG_MAX (no overflow) -2. val * 1e3 <= LONG_MAX && val * 1e6 > LONG_MAX (caught java.lang.ArithmeticException) -3. val * 1e3 > LONG_MAX (caught java.lang.ArithmeticException) -''' -@pytest.mark.parametrize('to_type', ['double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint', 'timestamp']) -def test_casting_from_float(spark_tmp_path, to_type): - orc_path = spark_tmp_path + '/orc_casting_from_float' - data_gen = [('float_column', float_gen)] - create_orc(data_gen, orc_path) - schema_str = "float_column {}".format(to_type) - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.read.schema(schema_str).orc(orc_path) - ) - - - - -@pytest.mark.parametrize('to_type', ['float', 'boolean', 'tinyint', 'smallint', 'int', 'bigint', 'timestamp']) -def test_casting_from_double(spark_tmp_path, to_type): - orc_path = spark_tmp_path + '/orc_casting_from_double' - data_gen = [('double_column', float_gen)] - create_orc(data_gen, orc_path) - schema_str = "double_column {}".format(to_type) - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.read.schema(schema_str).orc(orc_path) - ) - diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index abff50593b6..e7920954ef4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -323,8 +323,8 @@ object GpuOrcScan extends Arm { // ORC assumes value is in seconds withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis => withResource(doubleMillis.round()) { millis => - withResource(getOverflowFlags(doubleMillis, millis)) { overflows => - millis.copyWithBooleanColumnAsValidity(overflows) + withResource(getOverflowFlags(doubleMillis, millis)) { overflowFlags => + millis.copyWithBooleanColumnAsValidity(overflowFlags) } } } @@ -337,7 +337,7 @@ object GpuOrcScan extends Arm { // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getLong will return LONG_MAX // If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will throw an // exception (as CPU code does). 
- Math.multiplyExact(milliSeconds.max().getLong, 1000.toLong) + Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000.toLong) withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => withResource(microSeconds.castTo(DType.INT64)) { longVec => longVec.castTo(DType.TIMESTAMP_MICROSECONDS) From 2930ab8fea98dc80227ed471cbed80457ff3d7c4 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Tue, 16 Aug 2022 15:31:20 +0800 Subject: [PATCH 05/11] Update comments Signed-off-by: sinkinben --- integration_tests/src/main/python/orc_cast_float_test.py | 4 ++-- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/integration_tests/src/main/python/orc_cast_float_test.py b/integration_tests/src/main/python/orc_cast_float_test.py index 90707d38ff8..f609d592b41 100644 --- a/integration_tests/src/main/python/orc_cast_float_test.py +++ b/integration_tests/src/main/python/orc_cast_float_test.py @@ -46,7 +46,7 @@ def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen): # The maximum valid positive number is INT64_MAX / 1e6 -> 1e12 -> 2e36, so we add some special cases # from 2e33 to 2e36. # - # In DoubleGen, special_case=None will generate some NaN corner case. + # In DoubleGen, special_case=None will generate some NaN, INF corner cases. orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp' with_cpu_session( @@ -71,4 +71,4 @@ def test_casting_from_overflow_double_to_timestamp(spark_tmp_path): ''' TODO: In the above cases, we test double -> timestamp. Should we add cases to test float -> timestamp? -''' \ No newline at end of file +''' diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index e7920954ef4..f4a5db9b5d1 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -334,9 +334,9 @@ object GpuOrcScan extends Arm { // INT64-overflow. withResource(milliSeconds) { _ => // Test whether if there is long-overflow - // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getLong will return LONG_MAX - // If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will throw an - // exception (as CPU code does). + // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getDouble.toLong will return + // LONG_MAX. If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will + // throw an exception (as CPU code does). Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000.toLong) withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => withResource(microSeconds.castTo(DType.INT64)) { longVec => From e785de1dcff150eb0fd278f30c8474a2d3271621 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Wed, 17 Aug 2022 16:12:43 +0800 Subject: [PATCH 06/11] Add a config item to control casting float/double to string in ORC reading. 
Signed-off-by: sinkinben --- docs/configs.md | 1 + .../src/main/python/orc_cast_float_test.py | 4 ---- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 18 +++++++++++++++--- .../com/nvidia/spark/rapids/RapidsConf.scala | 9 +++++++++ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index c49545f4b12..92581801bd1 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -92,6 +92,7 @@ Name | Description | Default Value spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true +spark.rapids.sql.format.orc.floatTypesToString.enable|The float/double numbers in GPU have different precision with CPU. So when casting them to string, the result of GPU is different from result of CPU spark.|true spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647 spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true diff --git a/integration_tests/src/main/python/orc_cast_float_test.py b/integration_tests/src/main/python/orc_cast_float_test.py index f609d592b41..de52b69a82c 100644 --- a/integration_tests/src/main/python/orc_cast_float_test.py +++ b/integration_tests/src/main/python/orc_cast_float_test.py @@ -68,7 +68,3 @@ def test_casting_from_overflow_double_to_timestamp(spark_tmp_path): conf={}, error_message="ArithmeticException" ) - -''' -TODO: In the above cases, we test double -> timestamp. Should we add cases to test float -> timestamp? -''' diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index f4a5db9b5d1..084494e6165 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -122,6 +122,8 @@ case class GpuOrcScan( } object GpuOrcScan extends Arm { + var rapidsConf: RapidsConf = null + def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = { val scan = scanMeta.wrapped val schema = StructType(scan.readDataSchema ++ scan.readPartitionSchema) @@ -151,6 +153,10 @@ object GpuOrcScan extends Arm { .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) { meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet") } + + // We need the value of 'isOrcFloatTypesToStringEnable' in 'canCast' method. + // So we get a reference of 'meta.conf'. 
rapidsConf = meta.conf
   }
 
   private lazy val numericLevels = Seq(
@@ -284,7 +290,6 @@ object GpuOrcScan extends Arm {
       } else {
         downCastAnyInteger(col, toDt)
       }
 
-      // float/double(float64) to {bool, integer types, double/float, string, timestamp}
       // float/double to bool/integral
       case (DType.FLOAT32 | DType.FLOAT64, DType.BOOL8 | DType.INT8 | DType.INT16 | DType.INT32
         | DType.INT64) =>
@@ -309,7 +314,11 @@ object GpuOrcScan extends Arm {
       case (DType.FLOAT32 | DType.FLOAT64, DType.FLOAT32 | DType.FLOAT64) =>
         col.castTo(toDt)
 
-      // FIXME float/double to string, there are some precision error issues
+      // float/double to string
+      // cuDF keeps 9 decimal digits after the decimal point, while the CPU keeps more than 10.
+      // So when casting float/double to string, the result on the GPU differs from the CPU.
+      // The conf 'spark.rapids.sql.format.orc.floatTypesToString.enable' controls whether this
+      // cast is enabled.
       case (DType.FLOAT32 | DType.FLOAT64, DType.STRING) =>
         GpuCast.castFloatingTypeToString(col)
 
@@ -376,7 +385,10 @@ object GpuOrcScan extends Arm {
 
       case FLOAT | DOUBLE =>
        to.getCategory match {
-          case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | STRING | TIMESTAMP => true
+          case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | TIMESTAMP => true
+          case STRING => {
+            rapidsConf != null && rapidsConf.isOrcFloatTypesToStringEnable
+          }
           case _ => false
         }
       // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
index 54b735a0264..1d7fe164f48 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -864,6 +864,13 @@ object RapidsConf {
     .booleanConf
     .createWithDefault(true)
 
+  val ENABLE_ORC_FLOAT_TYPES_TO_STRING =
+    conf("spark.rapids.sql.format.orc.floatTypesToString.enable")
+    .doc("The float/double numbers in GPU have different precision with CPU. So when casting " +
+      "them to string, the result of GPU is different from result of CPU spark.")
+    .booleanConf
+    .createWithDefault(true)
+
   val ORC_READER_TYPE = conf("spark.rapids.sql.format.orc.reader.type")
     .doc("Sets the ORC reader type. We support different types that are optimized for " +
      "different environments. 
The original Spark style reader can be selected by setting this " + @@ -1854,6 +1861,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isOrcWriteEnabled: Boolean = get(ENABLE_ORC_WRITE) + lazy val isOrcFloatTypesToStringEnable: Boolean = get(ENABLE_ORC_FLOAT_TYPES_TO_STRING) + lazy val isOrcPerFileReadEnabled: Boolean = RapidsReaderType.withName(get(ORC_READER_TYPE)) == RapidsReaderType.PERFILE From c6e05a51a3ef46b0fe9ae41c8d5efdfae1a765ec Mon Sep 17 00:00:00 2001 From: sinkinben Date: Thu, 18 Aug 2022 16:55:04 +0800 Subject: [PATCH 07/11] Refined code according to review comments * Fixed bug when all elements in ColumnVector are null * Updated IT tests Signed-off-by: sinkinben --- docs/configs.md | 2 +- .../src/main/python/orc_cast_float_test.py | 16 +++--- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 51 +++++++++++-------- .../com/nvidia/spark/rapids/RapidsConf.scala | 7 ++- 4 files changed, 43 insertions(+), 33 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 92581801bd1..861feef76be 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -92,7 +92,7 @@ Name | Description | Default Value spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true -spark.rapids.sql.format.orc.floatTypesToString.enable|The float/double numbers in GPU have different precision with CPU. So when casting them to string, the result of GPU is different from result of CPU spark.|true +spark.rapids.sql.format.orc.floatTypesToString.enable|When reading an ORC file, the source data schemas(schemas of ORC file) may differ from the target schemas (schemas of the reader), we need to handle the castings from source type to target type. Since float/double numbers in GPU have different precision with CPU, when casting float/double to string, the result of GPU is different from result of CPU spark.|true spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647 spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. 
DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true diff --git a/integration_tests/src/main/python/orc_cast_float_test.py b/integration_tests/src/main/python/orc_cast_float_test.py index de52b69a82c..dc8e98e7791 100644 --- a/integration_tests/src/main/python/orc_cast_float_test.py +++ b/integration_tests/src/main/python/orc_cast_float_test.py @@ -25,7 +25,7 @@ def test_casting_from_float_and_double(spark_tmp_path, to_type): orc_path = spark_tmp_path + '/orc_casting_from_float_and_double' data_gen = [('float_column', float_gen), ('double_column', double_gen)] with_cpu_session( - lambda spark: gen_df(spark, data_gen).write.mode('overwrite').orc(orc_path) + lambda spark: gen_df(spark, data_gen).write.orc(orc_path) ) schema_str = "float_column {}, double_column {}".format(to_type, to_type) assert_gpu_and_cpu_are_equal_collect( @@ -34,23 +34,23 @@ def test_casting_from_float_and_double(spark_tmp_path, to_type): @pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None), - DoubleGen(max_exp=32, special_cases=[8.88e32, 9.99e33, 3.14159e34, 2.712e35, 2e36])]) + DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])]) def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen): # ORC will assume the original double value in seconds, we need to convert them to # timestamp(INT64 in micro-seconds). # - # Since datetime library in python requires year >= 0, and UTC timestamp is start from 1970/1/1 00:00:00, - # that is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2e32. + # The 'datetime' module in python requires 0 <= year < 10000, and UTC timestamp is start from 1970/1/1. + # That is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2^32. # So we set max_exp = 32 in DoubleGen. # - # The maximum valid positive number is INT64_MAX / 1e6 -> 1e12 -> 2e36, so we add some special cases - # from 2e33 to 2e36. + # The maximum valid positive number is (10000 - 1970) * 365 * 24 * 3600 = 253234080000 -> 2e11 -> 2^37, + # so we add some special cases from 2^33 - 2^37 (8e9 ~ 1e11). # # In DoubleGen, special_case=None will generate some NaN, INF corner cases. 
orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp' with_cpu_session( - lambda spark: unary_op_df(spark, data_gen).write.mode('overwrite').orc(orc_path) + lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path) ) # the name of unique column is 'a', cast it into timestamp type assert_gpu_and_cpu_are_equal_collect( @@ -61,7 +61,7 @@ def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen): def test_casting_from_overflow_double_to_timestamp(spark_tmp_path): orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp' with_cpu_session( - lambda spark: unary_op_df(spark, DoubleGen(min_exp=37)).write.mode('overwrite').orc(orc_path) + lambda spark: unary_op_df(spark, DoubleGen(min_exp=38)).write.orc(orc_path) ) assert_gpu_and_cpu_error( df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 084494e6165..28fb258c9f3 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -122,7 +122,6 @@ case class GpuOrcScan( } object GpuOrcScan extends Arm { - var rapidsConf: RapidsConf = null def tagSupport(scanMeta: ScanMeta[OrcScan]): Unit = { val scan = scanMeta.wrapped @@ -153,10 +152,6 @@ object GpuOrcScan extends Arm { .getOption("spark.sql.orc.mergeSchema").exists(_.toBoolean)) { meta.willNotWorkOnGpu("mergeSchema and schema evolution is not supported yet") } - - // We need the value of 'isOrcFloatTypesToStringEnable' in 'canCast' method. - // So we get a reference of 'meta.conf'. - rapidsConf = meta.conf } private lazy val numericLevels = Seq( @@ -341,12 +336,16 @@ object GpuOrcScan extends Arm { // Cast milli-seconds to micro-seconds // We need to pay attention that when convert (milliSeconds * 1000) to INT64, there may be // INT64-overflow. + // In this step, ORC casting of CPU throw an exception rather than replace such values with + // null. We followed the CPU code here. withResource(milliSeconds) { _ => // Test whether if there is long-overflow // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getDouble.toLong will return // LONG_MAX. If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will // throw an exception (as CPU code does). - Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000.toLong) + if (milliSeconds.max() != null) { + Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000L) + } withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => withResource(microSeconds.castTo(DType.INT64)) { longVec => longVec.castTo(DType.TIMESTAMP_MICROSECONDS) @@ -367,7 +366,8 @@ object GpuOrcScan extends Arm { * but the ones between GPU supported types. * Each supported casting is implemented in "castColumnTo". */ - def canCast(from: TypeDescription, to: TypeDescription): Boolean = { + def canCast(from: TypeDescription, to: TypeDescription, + isOrcFloatTypesToStringEnable: Boolean): Boolean = { import org.apache.orc.TypeDescription.Category._ if (!to.getCategory.isPrimitive || !from.getCategory.isPrimitive) { // Don't convert from any to complex, or from complex to any. 
@@ -386,9 +386,7 @@ object GpuOrcScan extends Arm { case FLOAT | DOUBLE => to.getCategory match { case BOOLEAN | BYTE | SHORT | INT | LONG | FLOAT | DOUBLE | TIMESTAMP => true - case STRING => { - rapidsConf != null && rapidsConf.isOrcFloatTypesToStringEnable - } + case STRING => isOrcFloatTypesToStringEnable case _ => false } // TODO more types, tracked in https://github.com/NVIDIA/spark-rapids/issues/5895 @@ -429,7 +427,8 @@ case class GpuOrcMultiFilePartitionReaderFactory( private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix) private val numThreads = rapidsConf.multiThreadReadNumThreads private val maxNumFileProcessed = rapidsConf.maxNumOrcFilesParallel - private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters) + private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, filters, + rapidsConf.isOrcFloatTypesToStringEnable) private val ignoreMissingFiles = sqlConf.ignoreMissingFiles private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles @@ -512,7 +511,8 @@ case class GpuOrcPartitionReaderFactory( private val debugDumpPrefix = Option(rapidsConf.orcDebugDumpPrefix) private val maxReadBatchSizeRows: Integer = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes: Long = rapidsConf.maxReadBatchSizeBytes - private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters) + private val filterHandler = GpuOrcFileFilterHandler(sqlConf, broadcastedConf, pushedFilters, + rapidsConf.isOrcFloatTypesToStringEnable) override def supportColumnarReads(partition: InputPartition): Boolean = true @@ -1043,7 +1043,8 @@ private object OrcTools extends Arm { private case class GpuOrcFileFilterHandler( @transient sqlConf: SQLConf, broadcastedConf: Broadcast[SerializableConfiguration], - pushedFilters: Array[Filter]) extends Arm { + pushedFilters: Array[Filter], + isOrcFloatTypesToStringEnable: Boolean) extends Arm { private[rapids] val isCaseSensitive = sqlConf.caseSensitiveAnalysis @@ -1138,7 +1139,7 @@ private case class GpuOrcFileFilterHandler( val isCaseSensitive = readerOpts.getIsSchemaEvolutionCaseAware val (updatedReadSchema, fileIncluded) = checkSchemaCompatibility(orcReader.getSchema, - readerOpts.getSchema, isCaseSensitive) + readerOpts.getSchema, isCaseSensitive, isOrcFloatTypesToStringEnable) // GPU has its own read schema, so unset the reader include to read all the columns // specified by its read schema. 
readerOpts.include(null) @@ -1318,11 +1319,13 @@ private case class GpuOrcFileFilterHandler( private def checkSchemaCompatibility( fileSchema: TypeDescription, readSchema: TypeDescription, - isCaseAware: Boolean): (TypeDescription, Array[Boolean]) = { + isCaseAware: Boolean, + isOrcFloatTypesToStringEnable: Boolean): (TypeDescription, Array[Boolean]) = { // all default to false val fileIncluded = new Array[Boolean](fileSchema.getMaximumId + 1) val isForcePos = OrcShims.forcePositionalEvolution(conf) - (checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos), + (checkTypeCompatibility(fileSchema, readSchema, isCaseAware, fileIncluded, isForcePos, + isOrcFloatTypesToStringEnable), fileIncluded) } @@ -1336,7 +1339,8 @@ private case class GpuOrcFileFilterHandler( readType: TypeDescription, isCaseAware: Boolean, fileIncluded: Array[Boolean], - isForcePos: Boolean): TypeDescription = { + isForcePos: Boolean, + isOrcFloatTypesToStringEnable: Boolean): TypeDescription = { (fileType.getCategory, readType.getCategory) match { case (TypeDescription.Category.STRUCT, TypeDescription.Category.STRUCT) => // Check for the top or nested struct types. @@ -1364,7 +1368,7 @@ private case class GpuOrcFileFilterHandler( .zipWithIndex.foreach { case ((fileFieldName, fType), idx) => getReadFieldType(fileFieldName, idx).foreach { case (rField, rType) => val newChild = checkTypeCompatibility(fType, rType, - isCaseAware, fileIncluded, isForcePos) + isCaseAware, fileIncluded, isForcePos, isOrcFloatTypesToStringEnable) prunedReadSchema.addField(rField, newChild) } } @@ -1374,19 +1378,22 @@ private case class GpuOrcFileFilterHandler( // for struct children. case (TypeDescription.Category.LIST, TypeDescription.Category.LIST) => val newChild = checkTypeCompatibility(fileType.getChildren.get(0), - readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos) + readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos, + isOrcFloatTypesToStringEnable) fileIncluded(fileType.getId) = true TypeDescription.createList(newChild) case (TypeDescription.Category.MAP, TypeDescription.Category.MAP) => val newKey = checkTypeCompatibility(fileType.getChildren.get(0), - readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos) + readType.getChildren.get(0), isCaseAware, fileIncluded, isForcePos, + isOrcFloatTypesToStringEnable) val newValue = checkTypeCompatibility(fileType.getChildren.get(1), - readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos) + readType.getChildren.get(1), isCaseAware, fileIncluded, isForcePos, + isOrcFloatTypesToStringEnable) fileIncluded(fileType.getId) = true TypeDescription.createMap(newKey, newValue) case (ft, rt) if ft.isPrimitive && rt.isPrimitive => if (OrcShims.typeDescriptionEqual(fileType, readType) || - GpuOrcScan.canCast(fileType, readType)) { + GpuOrcScan.canCast(fileType, readType, isOrcFloatTypesToStringEnable)) { // Since type casting is supported, here should return the file type. 
fileIncluded(fileType.getId) = true fileType.clone() diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 1d7fe164f48..01eeb2759a4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -866,8 +866,11 @@ object RapidsConf { val ENABLE_ORC_FLOAT_TYPES_TO_STRING = conf("spark.rapids.sql.format.orc.floatTypesToString.enable") - .doc("The float/double numbers in GPU have different precision with CPU. So when casting " + - "them to string, the result of GPU is different from result of CPU spark.") + .doc("When reading an ORC file, the source data schemas(schemas of ORC file) may differ " + + "from the target schemas (schemas of the reader), we need to handle the castings from " + + "source type to target type. Since float/double numbers in GPU have different precision " + + "with CPU, when casting float/double to string, the result of GPU is different from " + + "result of CPU spark.") .booleanConf .createWithDefault(true) From 0cf548a51870ea388b042faa5c6f2a4cfce3d3af Mon Sep 17 00:00:00 2001 From: sinkinben Date: Mon, 29 Aug 2022 17:10:14 +0800 Subject: [PATCH 08/11] Refined some comments, and move orc_cast_float_test to orc_cast_test Signed-off-by: sinkinben --- .../src/main/python/orc_cast_float_test.py | 70 ------------------- .../src/main/python/orc_cast_test.py | 52 ++++++++++++++ .../com/nvidia/spark/rapids/GpuOrcScan.scala | 5 +- 3 files changed, 54 insertions(+), 73 deletions(-) delete mode 100644 integration_tests/src/main/python/orc_cast_float_test.py diff --git a/integration_tests/src/main/python/orc_cast_float_test.py b/integration_tests/src/main/python/orc_cast_float_test.py deleted file mode 100644 index dc8e98e7791..00000000000 --- a/integration_tests/src/main/python/orc_cast_float_test.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright (c) 2020-2022, NVIDIA CORPORATION. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import pytest - -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error -from data_gen import * -from pyspark.sql.types import * -from spark_session import with_cpu_session - - -@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint']) -def test_casting_from_float_and_double(spark_tmp_path, to_type): - orc_path = spark_tmp_path + '/orc_casting_from_float_and_double' - data_gen = [('float_column', float_gen), ('double_column', double_gen)] - with_cpu_session( - lambda spark: gen_df(spark, data_gen).write.orc(orc_path) - ) - schema_str = "float_column {}, double_column {}".format(to_type, to_type) - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.read.schema(schema_str).orc(orc_path) - ) - - -@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None), - DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])]) -def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen): - # ORC will assume the original double value in seconds, we need to convert them to - # timestamp(INT64 in micro-seconds). - # - # The 'datetime' module in python requires 0 <= year < 10000, and UTC timestamp is start from 1970/1/1. - # That is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2^32. - # So we set max_exp = 32 in DoubleGen. - # - # The maximum valid positive number is (10000 - 1970) * 365 * 24 * 3600 = 253234080000 -> 2e11 -> 2^37, - # so we add some special cases from 2^33 - 2^37 (8e9 ~ 1e11). - # - # In DoubleGen, special_case=None will generate some NaN, INF corner cases. - - orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp' - with_cpu_session( - lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path) - ) - # the name of unique column is 'a', cast it into timestamp type - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.read.schema("a timestamp").orc(orc_path) - ) - - -def test_casting_from_overflow_double_to_timestamp(spark_tmp_path): - orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp' - with_cpu_session( - lambda spark: unary_op_df(spark, DoubleGen(min_exp=38)).write.orc(orc_path) - ) - assert_gpu_and_cpu_error( - df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(), - conf={}, - error_message="ArithmeticException" - ) diff --git a/integration_tests/src/main/python/orc_cast_test.py b/integration_tests/src/main/python/orc_cast_test.py index 98fcbc8b22f..5cae9b74a25 100644 --- a/integration_tests/src/main/python/orc_cast_test.py +++ b/integration_tests/src/main/python/orc_cast_test.py @@ -67,6 +67,7 @@ def test_casting_from_integer(spark_tmp_path, to_type): schema_str.format(*([to_type] * len(data_gen)))).orc(orc_path) ) + @pytest.mark.parametrize('overflow_long_gen', [LongGen(min_val=int(1e16)), LongGen(max_val=int(-1e16))]) @pytest.mark.parametrize('to_type', ['timestamp']) @@ -82,3 +83,54 @@ def test_casting_from_overflow_long(spark_tmp_path, overflow_long_gen,to_type): conf={}, error_message="ArithmeticException" ) + + +# When casting float/double to double/float, we need to compare values of GPU with CPU +# in an approximate way. 
@pytest.mark.approximate_float
+@pytest.mark.parametrize('to_type', ['float', 'double', 'boolean', 'tinyint', 'smallint', 'int', 'bigint'])
+def test_casting_from_float_and_double(spark_tmp_path, to_type):
+    orc_path = spark_tmp_path + '/orc_casting_from_float_and_double'
+    data_gen = [('float_column', float_gen), ('double_column', double_gen)]
+    create_orc(data_gen, orc_path)
+    schema_str = "float_column {}, double_column {}".format(to_type, to_type)
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema(schema_str).orc(orc_path)
+    )
+
+
+@pytest.mark.parametrize('data_gen', [DoubleGen(max_exp=32, special_cases=None),
+                                      DoubleGen(max_exp=32, special_cases=[8.88e9, 9.99e10, 1.314e11])])
+def test_casting_from_double_to_timestamp(spark_tmp_path, data_gen):
+    # ORC will assume the original double values are in seconds; we need to convert them to
+    # timestamps (INT64, in micro-seconds).
+    #
+    # The 'datetime' module in python requires 0 <= year < 10000, and the UTC timestamp starts from 1970/1/1.
+    # That is, the minimum valid negative number is -1970 * 365 * 24 * 3600 = -62125920000 -> 6e10 -> 2^32.
+    # So we set max_exp = 32 in DoubleGen.
+    #
+    # The maximum valid positive number is (10000 - 1970) * 365 * 24 * 3600 = 253234080000 -> 2e11 -> 2^37,
+    # so we add some special cases from 2^33 to 2^37 (8e9 ~ 1e11).
+    #
+    # In DoubleGen, special_case=None will generate some NaN, INF corner cases.
+
+    orc_path = spark_tmp_path + '/orc_casting_from_double_to_timestamp'
+    with_cpu_session(
+        lambda spark: unary_op_df(spark, data_gen).write.orc(orc_path)
+    )
+    # the name of the unique column is 'a'; cast it into timestamp type
+    assert_gpu_and_cpu_are_equal_collect(
+        lambda spark: spark.read.schema("a timestamp").orc(orc_path)
+    )
+
+
+def test_casting_from_overflow_double_to_timestamp(spark_tmp_path):
+    orc_path = spark_tmp_path + '/orc_casting_from_overflow_double_to_timestamp'
+    with_cpu_session(
+        lambda spark: unary_op_df(spark, DoubleGen(min_exp=38)).write.orc(orc_path)
+    )
+    assert_gpu_and_cpu_error(
+        df_fun=lambda spark: spark.read.schema("a timestamp").orc(orc_path).collect(),
+        conf={},
+        error_message="ArithmeticException"
+    )
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index 9112f7fae39..05222b14be4 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -361,11 +361,10 @@ object GpuOrcScan extends Arm {
       // null. We followed the CPU code here.
       withResource(milliSeconds) { _ =>
         // Test whether if there is long-overflow
-        // If milliSeconds.max() > LONG_MAX, then milliSeconds.max().getDouble.toLong will return
-        // LONG_MAX. If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will
+        // If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will
         // throw an exception (as CPU code does). 
if (milliSeconds.max() != null) { - Math.multiplyExact(milliSeconds.max().getDouble.toLong, 1000L) + testLongMultiplicationOverflow(milliSeconds.max().getDouble.toLong, 1000L) } withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => withResource(microSeconds.castTo(DType.INT64)) { longVec => From e6e1aa9a1af4fe47fb1604449bf697da9f770871 Mon Sep 17 00:00:00 2001 From: sinkinben Date: Wed, 31 Aug 2022 15:28:07 +0800 Subject: [PATCH 09/11] Fixed some conflicts and update docs about ENABLE_ORC_FLOAT_TYPES_TO_STRING Signed-off-by: sinkinben --- docs/configs.md | 2 +- .../src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 136ac8915f5..1e5be3dfc7a 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -92,7 +92,7 @@ Name | Description | Default Value spark.rapids.sql.format.json.enabled|When set to true enables all json input and output acceleration. (only input is currently supported anyways)|false spark.rapids.sql.format.json.read.enabled|When set to true enables json input acceleration|false spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true -spark.rapids.sql.format.orc.floatTypesToString.enable|When reading an ORC file, the source data schemas(schemas of ORC file) may differ from the target schemas (schemas of the reader), we need to handle the castings from source type to target type. Since float/double numbers in GPU have different precision with CPU, when casting float/double to string, the result of GPU is different from result of CPU spark.|true +spark.rapids.sql.format.orc.floatTypesToString.enable|When reading an ORC file, the source data schemas(schemas of ORC file) may differ from the target schemas (schemas of the reader), we need to handle the castings from source type to target type. Since float/double numbers in GPU have different precision with CPU, when casting float/double to string, the result of GPU is different from result of CPU spark. Its default value is `true` (this means the strings result will differ from result of CPU). If it's set `false` explicitly and there exists casting from float/double to string in the job, then such behavior will cause an exception, and the job will fail.|true spark.rapids.sql.format.orc.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type.|2147483647 spark.rapids.sql.format.orc.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small ORC files in parallel. This can not be changed at runtime after the executor has started. Used with MULTITHREADED reader, see spark.rapids.sql.format.orc.reader.type. 
DEPRECATED: use spark.rapids.sql.multiThreadedRead.numThreads|None spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 846404ad9a0..c228c4ce2b8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -870,7 +870,10 @@ object RapidsConf { "from the target schemas (schemas of the reader), we need to handle the castings from " + "source type to target type. Since float/double numbers in GPU have different precision " + "with CPU, when casting float/double to string, the result of GPU is different from " + - "result of CPU spark.") + "result of CPU spark. Its default value is `true` (this means the strings result will " + + "differ from result of CPU). If it's set `false` explicitly and there exists casting " + + "from float/double to string in the job, then such behavior will cause an exception, " + + "and the job will fail.") .booleanConf .createWithDefault(true) From db0f0d2c63b8150f4007111ab2aa7876b3e8aa6a Mon Sep 17 00:00:00 2001 From: sinkinben Date: Tue, 6 Sep 2022 15:35:42 +0800 Subject: [PATCH 10/11] Refined float/double -> timestamp, and fixed some conflicts Signed-off-by: sinkinben --- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index f60a908dc11..4d0a1127c35 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -342,9 +342,10 @@ object GpuOrcScan extends Arm { case (DType.FLOAT32 | DType.FLOAT64, DType.TIMESTAMP_MICROSECONDS) => // Follow the CPU ORC conversion. // val doubleMillis = doubleValue * 1000, - // val millis = Math.round(doubleMillis) - // if (noOverflow) millis else null - val milliSeconds = withResource(Scalar.fromDouble(1000.0)) { thousand => + // val milliseconds = Math.round(doubleMillis) + // if (noOverflow) { milliseconds } else { null } + val milliseconds = withResource(Scalar.fromDouble(DateTimeConstants.MILLIS_PER_SECOND)) { + thousand => // ORC assumes value is in seconds withResource(col.mul(thousand, DType.FLOAT64)) { doubleMillis => withResource(doubleMillis.round()) { millis => @@ -359,16 +360,21 @@ object GpuOrcScan extends Arm { // INT64-overflow. // In this step, ORC casting of CPU throw an exception rather than replace such values with // null. We followed the CPU code here. - withResource(milliSeconds) { _ => + withResource(milliseconds) { _ => // Test whether if there is long-overflow // If milliSeconds.max() * 1000 > LONG_MAX, then 'Math.multiplyExact' will // throw an exception (as CPU code does). 
- if (milliSeconds.max() != null) { - testLongMultiplicationOverflow(milliSeconds.max().getDouble.toLong, 1000L) + withResource(milliseconds.max()) { maxValue => + if (maxValue.isValid) { + testLongMultiplicationOverflow(maxValue.getDouble.toLong, + DateTimeConstants.MICROS_PER_MILLIS) + } } - withResource(milliSeconds.mul(Scalar.fromDouble(1000.0))) { microSeconds => - withResource(microSeconds.castTo(DType.INT64)) { longVec => - longVec.castTo(DType.TIMESTAMP_MICROSECONDS) + withResource(Scalar.fromDouble(DateTimeConstants.MICROS_PER_MILLIS)) { thousand => + withResource(milliseconds.mul(thousand)) { microseconds => + withResource(microseconds.castTo(DType.INT64)) { longVec => + longVec.bitCastTo(DType.TIMESTAMP_MICROSECONDS) + } } } } From 46e71f9ead9408f1036a6044631254d75f1558cf Mon Sep 17 00:00:00 2001 From: sinkinben Date: Tue, 6 Sep 2022 15:59:42 +0800 Subject: [PATCH 11/11] Fixed memory SEG fault Signed-off-by: sinkinben --- .../src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 4d0a1127c35..38ca1d484dd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -373,7 +373,7 @@ object GpuOrcScan extends Arm { withResource(Scalar.fromDouble(DateTimeConstants.MICROS_PER_MILLIS)) { thousand => withResource(milliseconds.mul(thousand)) { microseconds => withResource(microseconds.castTo(DType.INT64)) { longVec => - longVec.bitCastTo(DType.TIMESTAMP_MICROSECONDS) + longVec.castTo(DType.TIMESTAMP_MICROSECONDS) } } }