From c18b4e59d646e01edcc4156f12a41892fd6891c2 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 17 Feb 2022 15:05:21 -0600 Subject: [PATCH] Default integration test configs to allow negative decimal scale (#4812) Signed-off-by: Jason Lowe --- .../src/main/python/arithmetic_ops_test.py | 102 +++++++----------- .../src/main/python/array_test.py | 42 +++----- .../src/main/python/cache_test.py | 38 +++---- .../src/main/python/cast_test.py | 33 +++--- integration_tests/src/main/python/cmp_test.py | 45 +++----- .../src/main/python/conditionals_test.py | 18 ++-- integration_tests/src/main/python/data_gen.py | 4 +- .../src/main/python/explain_mode_test.py | 1 - .../src/main/python/generate_expr_test.py | 5 +- .../src/main/python/hash_aggregate_test.py | 39 +++---- .../src/main/python/join_test.py | 86 ++++++--------- .../src/main/python/limit_test.py | 10 +- integration_tests/src/main/python/map_test.py | 31 ++---- .../src/main/python/project_lit_alias_test.py | 6 +- .../src/main/python/repart_test.py | 24 ++--- .../src/main/python/sample_test.py | 8 +- .../src/main/python/sort_test.py | 32 ++---- .../src/main/python/spark_session.py | 8 +- .../src/main/python/struct_test.py | 8 +- .../src/main/python/time_window_test.py | 3 +- .../src/main/python/window_function_test.py | 21 ++-- 21 files changed, 203 insertions(+), 361 deletions(-) diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index 94f30037a35..8c3b0fe9622 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -21,7 +21,6 @@ from pyspark.sql.types import IntegralType from spark_session import with_cpu_session, with_gpu_session, with_spark_session, is_before_spark_311, is_before_spark_320, is_databricks91_or_later import pyspark.sql.functions as f -from pyspark.sql.utils import IllegalArgumentException # No overflow gens here because we just focus on verifying the fallback to CPU when # enabling ANSI mode. But overflows will fail the tests because CPU runs raise @@ -47,8 +46,7 @@ def test_addition(data_gen): f.lit(-12).cast(data_type) + f.col('b'), f.lit(None).cast(data_type) + f.col('a'), f.col('b') + f.lit(None).cast(data_type), - f.col('a') + f.col('b')), - conf=allow_negative_scale_of_decimal_conf) + f.col('a') + f.col('b'))) # If it will not overflow for multiply it is good for add too @pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn) @@ -61,7 +59,7 @@ def test_addition_ansi_no_overflow(data_gen): f.lit(None).cast(data_type) + f.col('a'), f.col('b') + f.lit(None).cast(data_type), f.col('a') + f.col('b')), - conf={'spark.sql.ansi.enabled': 'true'}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + decimal_128_gens, ids=idfn) def test_subtraction(data_gen): @@ -72,8 +70,7 @@ def test_subtraction(data_gen): f.lit(-12).cast(data_type) - f.col('b'), f.lit(None).cast(data_type) - f.col('a'), f.col('b') - f.lit(None).cast(data_type), - f.col('a') - f.col('b')), - conf=allow_negative_scale_of_decimal_conf) + f.col('a') - f.col('b'))) # If it will not overflow for multiply it is good for subtract too @pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn) @@ -86,7 +83,7 @@ def test_subtraction_ansi_no_overflow(data_gen): f.lit(None).cast(data_type) - f.col('a'), f.col('b') - f.lit(None).cast(data_type), f.col('a') - f.col('b')), - conf={'spark.sql.ansi.enabled': 'true'}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', numeric_gens + [decimal_gen_neg_scale, decimal_gen_scale_precision, decimal_gen_same_scale_precision, @@ -99,8 +96,7 @@ def test_multiplication(data_gen): f.lit(-12).cast(data_type) * f.col('b'), f.lit(None).cast(data_type) * f.col('a'), f.col('b') * f.lit(None).cast(data_type), - f.col('a') * f.col('b')), - conf=allow_negative_scale_of_decimal_conf) + f.col('a') * f.col('b'))) @allow_non_gpu('ProjectExec', 'Alias', 'Multiply', 'Cast') @pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens, ids=idfn) @@ -109,7 +105,7 @@ def test_multiplication_fallback_when_ansi_enabled(data_gen): lambda spark : binary_op_df(spark, data_gen).select( f.col('a') * f.col('b')), 'Multiply', - conf={'spark.sql.ansi.enabled': 'true'}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', [float_gen, double_gen, decimal_gen_scale_precision], ids=idfn) @@ -119,15 +115,14 @@ def test_multiplication_ansi_enabled(data_gen): lambda spark : binary_op_df(spark, data_gen).select( f.col('a') * f.lit(100).cast(data_type), f.col('a') * f.col('b')), - conf={'spark.sql.ansi.enabled': 'true'}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('lhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 5), DecimalGen(6, 4), DecimalGen(5, 4), DecimalGen(5, 3), DecimalGen(4, 2), DecimalGen(3, -2), DecimalGen(16, 7)], ids=idfn) @pytest.mark.parametrize('rhs', [byte_gen, short_gen, int_gen, long_gen, DecimalGen(6, 3), DecimalGen(10, -2), DecimalGen(15, 3)], ids=idfn) def test_multiplication_mixed(lhs, rhs): assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs).select( - f.col('a') * f.col('b')), - conf = allow_negative_scale_of_decimal_conf) + f.col('a') * f.col('b'))) @approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases. @pytest.mark.parametrize('lhs', [float_gen, double_gen], ids=idfn) @@ -136,8 +131,7 @@ def test_float_multiplication_mixed(lhs, rhs): assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs).select( f.col('a') * f.col('b')), - conf=copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})) + conf={'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}) @pytest.mark.parametrize('data_gen', [double_gen, decimal_gen_neg_scale, DecimalGen(6, 3), DecimalGen(5, 5), DecimalGen(6, 0), DecimalGen(7, 4), DecimalGen(15, 0), DecimalGen(18, 0), @@ -150,8 +144,7 @@ def test_division(data_gen): f.lit(-12).cast(data_type) / f.col('b'), f.lit(None).cast(data_type) / f.col('a'), f.col('b') / f.lit(None).cast(data_type), - f.col('a') / f.col('b')), - conf=allow_negative_scale_of_decimal_conf) + f.col('a') / f.col('b'))) @allow_non_gpu('ProjectExec', 'Alias', 'Divide', 'Cast', 'PromotePrecision', 'CheckOverflow') @pytest.mark.parametrize('data_gen', [DecimalGen(38, 21), DecimalGen(21, 17)], ids=idfn) @@ -166,8 +159,7 @@ def test_division_fallback_on_decimal(data_gen): def test_division_mixed(lhs, rhs): assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs).select( - f.col('a') / f.col('b')), - conf = allow_negative_scale_of_decimal_conf) + f.col('a') / f.col('b'))) @approximate_float # we should get the perfectly correct answer for floats except when casting a decimal to a float in some corner cases. @pytest.mark.parametrize('rhs', [float_gen, double_gen], ids=idfn) @@ -176,8 +168,7 @@ def test_float_division_mixed(lhs, rhs): assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs).select( f.col('a') / f.col('b')), - conf=copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})) + conf={'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}) @ignore_order @pytest.mark.parametrize('rhs,rhs_type', [ @@ -187,15 +178,13 @@ def test_float_division_mixed(lhs, rhs): (DecimalGen(15, 3), DecimalType(27, 7)), (DecimalGen(3, -3), DecimalType(20, -3))], ids=idfn) def test_decimal_division_mixed_no_overflow_guarantees(lhs, lhs_type, rhs, rhs_type): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.decimalOverflowGuarantees': 'false'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs)\ .withColumn('lhs', f.col('a').cast(lhs_type))\ .withColumn('rhs', f.col('b').cast(rhs_type))\ .repartition(1)\ .select(f.col('lhs'), f.col('rhs'), f.col('lhs') / f.col('rhs')), - conf = conf) + conf={'spark.rapids.sql.decimalOverflowGuarantees': 'false'}) @ignore_order @pytest.mark.parametrize('rhs,rhs_type', [ @@ -205,15 +194,13 @@ def test_decimal_division_mixed_no_overflow_guarantees(lhs, lhs_type, rhs, rhs_t (DecimalGen(10, 3), DecimalType(27, 7)), (DecimalGen(3, -3), DecimalType(20, -3))], ids=idfn) def test_decimal_multiplication_mixed_no_overflow_guarantees(lhs, lhs_type, rhs, rhs_type): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.decimalOverflowGuarantees': 'false'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs)\ .withColumn('lhs', f.col('a').cast(lhs_type))\ .withColumn('rhs', f.col('b').cast(rhs_type))\ .repartition(1)\ .select(f.col('lhs'), f.col('rhs'), f.col('lhs') * f.col('rhs')), - conf = conf) + conf={'spark.rapids.sql.decimalOverflowGuarantees': 'false'}) @pytest.mark.parametrize('data_gen', integral_gens + [decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit, decimal_gen_18_3, decimal_gen_30_2, @@ -233,8 +220,7 @@ def test_int_division(data_gen): def test_int_division_mixed(lhs, rhs): assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, lhs, rhs).selectExpr( - 'a DIV b'), - conf=allow_negative_scale_of_decimal_conf) + 'a DIV b')) @pytest.mark.parametrize('data_gen', numeric_gens + decimal_128_gens + [decimal_gen_scale_precision, decimal_gen_64bit], ids=idfn) @@ -246,8 +232,7 @@ def test_mod(data_gen): f.lit(-12).cast(data_type) % f.col('b'), f.lit(None).cast(data_type) % f.col('a'), f.col('b') % f.lit(None).cast(data_type), - f.col('a') % f.col('b')), - conf=allow_negative_scale_of_decimal_conf) + f.col('a') % f.col('b'))) @pytest.mark.parametrize('data_gen', numeric_gens + decimal_128_gens_no_neg + [decimal_gen_scale_precision, decimal_gen_64bit], ids=idfn) @@ -269,15 +254,13 @@ def test_signum(data_gen): @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + decimal_128_gens, ids=idfn) def test_unary_minus(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr('-a'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr('-a')) @pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [float_gen, double_gen] + decimal_gens + decimal_128_gens, ids=idfn) def test_unary_minus_ansi_no_overflow(data_gen): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr('-a'), - conf=conf) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_type,value', [ (LongType(), LONG_MIN), @@ -285,10 +268,9 @@ def test_unary_minus_ansi_no_overflow(data_gen): (ShortType(), SHORT_MIN), (ByteType(), BYTE_MIN)], ids=idfn) def test_unary_minus_ansi_overflow(data_type, value): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'}) assert_gpu_and_cpu_error( df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, '-a').collect(), - conf=conf, + conf=ansi_enabled_conf, error_message='ArithmeticException') # This just ends up being a pass through. There is no good way to force @@ -297,22 +279,19 @@ def test_unary_minus_ansi_overflow(data_type, value): @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + decimal_128_gens, ids=idfn) def test_unary_positive(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr('+a'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr('+a')) @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + decimal_128_gens, ids=idfn) def test_abs(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)')) # ANSI is ignored for abs prior to 3.2.0, but still okay to test it a little more. @pytest.mark.parametrize('data_gen', _no_overflow_multiply_gens + [float_gen, double_gen] + decimal_gens + decimal_128_gens, ids=idfn) def test_abs_ansi_no_overflow(data_gen): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr('abs(a)'), - conf=conf) + conf=ansi_enabled_conf) # Only run this test for Spark v3.2.0 and later to verify abs will # throw exceptions for overflow when ANSI mode is enabled. @@ -323,10 +302,9 @@ def test_abs_ansi_no_overflow(data_gen): (ShortType(), SHORT_MIN), (ByteType(), BYTE_MIN)], ids=idfn) def test_abs_ansi_overflow(data_type, value): - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.sql.ansi.enabled': 'true'}) assert_gpu_and_cpu_error( df_fun=lambda spark: _get_overflow_df(spark, [value], data_type, 'abs(a)').collect(), - conf=conf, + conf=ansi_enabled_conf, error_message='ArithmeticException') @approximate_float @@ -351,24 +329,22 @@ def test_hypot(data_gen): @pytest.mark.parametrize('data_gen', double_n_long_gens + decimal_gens + decimal_128_gens_no_neg, ids=idfn) def test_floor(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a)'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr('floor(a)')) @pytest.mark.parametrize('data_gen', double_n_long_gens + decimal_gens + decimal_128_gens_no_neg, ids=idfn) def test_ceil(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).selectExpr('ceil(a)'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).selectExpr('ceil(a)')) @pytest.mark.parametrize('data_gen', [decimal_gen_36_neg5, decimal_gen_38_neg10], ids=idfn) def test_floor_ceil_overflow(data_gen): assert_gpu_and_cpu_error( lambda spark: unary_op_df(spark, data_gen).selectExpr('floor(a)').collect(), - conf=allow_negative_scale_of_decimal_conf, + conf={}, error_message="ArithmeticException") assert_gpu_and_cpu_error( lambda spark: unary_op_df(spark, data_gen).selectExpr('ceil(a)').collect(), - conf=allow_negative_scale_of_decimal_conf, + conf={}, error_message="ArithmeticException") @pytest.mark.parametrize('data_gen', double_gens, ids=idfn) @@ -425,8 +401,7 @@ def test_decimal_bround(data_gen): 'bround(a, -1)', 'bround(a, 1)', 'bround(a, 2)', - 'bround(a, 10)'), - conf=allow_negative_scale_of_decimal_conf) + 'bround(a, 10)')) @incompat @approximate_float @@ -438,8 +413,7 @@ def test_decimal_round(data_gen): 'round(a, -1)', 'round(a, 1)', 'round(a, 2)', - 'round(a, 10)'), - conf=allow_negative_scale_of_decimal_conf) + 'round(a, 10)')) @incompat @approximate_float @@ -712,7 +686,7 @@ def test_least(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gen).select( - f.least(*command_args)), conf=allow_negative_scale_of_decimal_conf) + f.least(*command_args))) @pytest.mark.parametrize('data_gen', all_basic_gens + decimal_gens + decimal_128_gens, ids=idfn) def test_greatest(data_gen): @@ -726,7 +700,7 @@ def test_greatest(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gen).select( - f.greatest(*command_args)), conf=allow_negative_scale_of_decimal_conf) + f.greatest(*command_args))) def _test_div_by_zero(ansi_mode, expr): @@ -813,21 +787,20 @@ def test_div_overflow_no_exception_when_ansi(expr, ansi_enabled): @pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_add_overflow, ids=idfn) def test_add_overflow_with_ansi_enabled(data, tp, expr): - ansi_conf = {'spark.sql.ansi.enabled': 'true'} if isinstance(tp, IntegralType): assert_gpu_and_cpu_error( lambda spark: _get_overflow_df(spark, data, tp, expr).collect(), - conf=ansi_conf, + conf=ansi_enabled_conf, error_message='overflow') elif isinstance(tp, DecimalType): assert_gpu_and_cpu_error( lambda spark: _get_overflow_df(spark, data, tp, expr).collect(), - conf=ansi_conf, + conf=ansi_enabled_conf, error_message='') else: assert_gpu_and_cpu_are_equal_collect( func=lambda spark: _get_overflow_df(spark, data, tp, expr), - conf=ansi_conf) + conf=ansi_enabled_conf) _data_type_expr_for_sub_overflow = [ @@ -843,18 +816,17 @@ def test_add_overflow_with_ansi_enabled(data, tp, expr): @pytest.mark.parametrize('data,tp,expr', _data_type_expr_for_sub_overflow, ids=idfn) def test_subtraction_overflow_with_ansi_enabled(data, tp, expr): - ansi_conf = {'spark.sql.ansi.enabled': 'true'} if isinstance(tp, IntegralType): assert_gpu_and_cpu_error( lambda spark: _get_overflow_df(spark, data, tp, expr).collect(), - conf=ansi_conf, + conf=ansi_enabled_conf, error_message='overflow') elif isinstance(tp, DecimalType): assert_gpu_and_cpu_error( lambda spark: _get_overflow_df(spark, data, tp, expr).collect(), - conf=ansi_conf, + conf=ansi_enabled_conf, error_message='') else: assert_gpu_and_cpu_are_equal_collect( func=lambda spark: _get_overflow_df(spark, data, tp, expr), - conf=ansi_conf) + conf=ansi_enabled_conf) diff --git a/integration_tests/src/main/python/array_test.py b/integration_tests/src/main/python/array_test.py index 86d1ed1c99c..2522e24d99b 100644 --- a/integration_tests/src/main/python/array_test.py +++ b/integration_tests/src/main/python/array_test.py @@ -34,8 +34,7 @@ def test_array_item(data_gen): 'a[null]', 'a[3]', 'a[50]', - 'a[-1]'), - conf=allow_negative_scale_of_decimal_conf) + 'a[-1]')) # Once we support arrays as literals then we can support a[null] for # all array gens. See test_array_index for more info @@ -68,8 +67,7 @@ def test_orderby_array_unique(data_gen): assert_gpu_and_cpu_are_equal_sql( lambda spark : append_unique_int_col_to_df(spark, unary_op_df(spark, data_gen)), 'array_table', - 'select array_table.a, array_table.uniq_int from array_table order by uniq_int', - conf=allow_negative_scale_of_decimal_conf) + 'select array_table.a, array_table.uniq_int from array_table order by uniq_int') @pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10), @@ -128,8 +126,7 @@ def test_get_array_item_ansi_fail(data_gen): message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException" assert_gpu_and_cpu_error(lambda spark: unary_op_df( spark, data_gen).select(col('a')[100]).collect(), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}, + conf=ansi_enabled_conf, error_message=message) @pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range") @@ -137,16 +134,14 @@ def test_get_array_item_ansi_fail(data_gen): def test_get_array_item_ansi_not_fail(data_gen): assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df( spark, data_gen).select(col('a')[100]), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn) def test_array_element_at(data_gen): assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df( spark, data_gen).select(element_at(col('a'), 1), element_at(col('a'), -1)), - conf={'spark.sql.ansi.enabled':False, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf={'spark.sql.ansi.enabled':False}) @pytest.mark.skipif(is_before_spark_311(), reason="Only in Spark 3.1.1 + ANSI mode, array index throws on out of range indexes") @pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn) @@ -154,8 +149,7 @@ def test_array_element_at_ansi_fail(data_gen): message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException" assert_gpu_and_cpu_error(lambda spark: unary_op_df( spark, data_gen).select(element_at(col('a'), 100)).collect(), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}, + conf=ansi_enabled_conf, error_message=message) @pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range") @@ -163,8 +157,7 @@ def test_array_element_at_ansi_fail(data_gen): def test_array_element_at_ansi_not_fail(data_gen): assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df( spark, data_gen).select(element_at(col('a'), 100)), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf=ansi_enabled_conf) # This corner case is for both Spark 3.0.x and 3.1.x # CPU version will return `null` for null[100], not throwing an exception @@ -172,8 +165,7 @@ def test_array_element_at_ansi_not_fail(data_gen): def test_array_element_at_all_null_ansi_not_fail(data_gen): assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df( spark, data_gen).select(element_at(col('a'), 100)), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn) @@ -204,8 +196,7 @@ def do_it(spark): return two_col_df(spark, data_gen, byte_gen).selectExpr(columns) - assert_gpu_and_cpu_are_equal_collect(do_it, - conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_it) # TODO add back in string_gen when https://github.com/rapidsai/cudf/issues/9156 is fixed array_min_max_gens_no_nan = [byte_gen, short_gen, int_gen, long_gen, FloatGen(no_nans=True), DoubleGen(no_nans=True), @@ -216,9 +207,7 @@ def test_array_min(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr( 'array_min(a)'), - conf={ - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true', - 'spark.rapids.sql.hasNans': 'false'}) + conf=no_nans_conf) @pytest.mark.parametrize('data_gen', decimal_128_gens + decimal_gens, ids=idfn) @@ -226,9 +215,7 @@ def test_array_concat_decimal(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : debug_df(unary_op_df(spark, ArrayGen(data_gen)).selectExpr( 'concat(a, a)')), - conf={ - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true', - 'spark.rapids.sql.hasNans': 'false'}) + conf=no_nans_conf) @pytest.mark.parametrize('data_gen', array_min_max_gens_no_nan, ids=idfn) @@ -236,9 +223,7 @@ def test_array_max(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr( 'array_max(a)'), - conf={ - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true', - 'spark.rapids.sql.hasNans': 'false'}) + conf=no_nans_conf) # We add in several types of processing for foldable functions because the output # can be different types. @@ -253,5 +238,4 @@ def test_array_max(data_gen): 'array(array(struct(1 as a, 2 as b), struct(3 as a, 4 as b))) as a_a_s'], ids=idfn) def test_sql_array_scalars(query): assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.sql('SELECT {}'.format(query)), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + lambda spark : spark.sql('SELECT {}'.format(query))) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 023368f4e58..e79add2ed1e 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -66,8 +66,7 @@ def do_join(spark): cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() # populates cache return cached - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -84,8 +83,7 @@ def do_join(spark): cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() #populates the cache return cached.filter("a is not null") - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn) @@ -98,8 +96,7 @@ def do_join(spark): cached.count() return cached - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf) shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160", "spark.sql.join.preferSortMergeJoin": "false", @@ -115,8 +112,7 @@ def do_join(spark): cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() return cached - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn) @@ -128,8 +124,7 @@ def do_join(spark): cached = left.crossJoin(right.hint("broadcast")).cache() cached.count() return cached - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn) @@ -142,8 +137,7 @@ def op_df(spark, length=2048, seed=0): cached.count() # populate the cache return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b")) - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(op_df, conf = conf) + assert_gpu_and_cpu_are_equal_collect(op_df, conf=enable_vectorized_conf) @pytest.mark.parametrize('data_gen', [all_basic_struct_gen, StructGen([['child0', StructGen([['child1', byte_gen]])]]), ArrayGen( @@ -158,9 +152,8 @@ def partial_return(col): def partial_return_cache(spark): return two_col_df(spark, data_gen, string_gen).select(f.col("a"), f.col("b")).cache().limit(50).select(col) return partial_return_cache - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf) - assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf) + assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf=enable_vectorized_conf) + assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf=enable_vectorized_conf) @allow_non_gpu('CollectLimitExec') def test_cache_diff_req_order(spark_tmp_path): @@ -235,8 +228,7 @@ def func(spark): return df.selectExpr("a") - conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf) - assert_gpu_and_cpu_are_equal_collect(func, conf) + assert_gpu_and_cpu_are_equal_collect(func, conf=enable_vectorized_conf) @pytest.mark.parametrize('enable_vectorized', ['false', 'true'], ids=idfn) @pytest.mark.parametrize('with_x_session', [with_gpu_session, with_cpu_session]) @@ -308,10 +300,7 @@ def helper(spark): @pytest.mark.parametrize('batch_size', [{"spark.rapids.sql.batchSizeBytes": "100"}, {}], ids=idfn) @ignore_order def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size): - test_conf = copy_and_update(enable_vectorized_conf, - allow_negative_scale_of_decimal_conf, - batch_size) - + test_conf = copy_and_update(enable_vectorized_conf, batch_size) function_to_test_on_cached_df(with_x_session, lambda df: df.count(), data_gen, test_conf) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @@ -326,10 +315,7 @@ def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_siz # condition therefore we must allow it in all cases @allow_non_gpu('ColumnarToRowExec') def test_cache_multi_batch(data_gen, with_x_session, enable_vectorized_conf, batch_size): - test_conf = copy_and_update(enable_vectorized_conf, - allow_negative_scale_of_decimal_conf, - batch_size) - + test_conf = copy_and_update(enable_vectorized_conf, batch_size) function_to_test_on_cached_df(with_x_session, lambda df: df.collect(), data_gen, test_conf) @pytest.mark.parametrize('data_gen', all_basic_map_gens + single_level_array_gens_no_null, ids=idfn) diff --git a/integration_tests/src/main/python/cast_test.py b/integration_tests/src/main/python/cast_test.py index 07e514f8816..e51678d913b 100644 --- a/integration_tests/src/main/python/cast_test.py +++ b/integration_tests/src/main/python/cast_test.py @@ -95,8 +95,7 @@ def test_cast_string_timestamp_fallback(): def test_cast_decimal_to(data_gen, to_type): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})) + conf = {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}) @pytest.mark.parametrize('data_gen', [ DecimalGen(7, 1), @@ -115,8 +114,7 @@ def test_cast_decimal_to(data_gen, to_type): DecimalType(1, -1)], ids=meta_idfn('to:')) def test_cast_decimal_to_decimal(data_gen, to_type): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')), - conf = allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a'))) @pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen], ids=idfn) @pytest.mark.parametrize('to_type', [ @@ -136,26 +134,22 @@ def test_cast_integral_to_decimal(data_gen, to_type): def test_cast_byte_to_decimal_overflow(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, byte_gen).select( - f.col('a').cast(DecimalType(2, -1))), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + f.col('a').cast(DecimalType(2, -1)))) def test_cast_short_to_decimal_overflow(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, short_gen).select( - f.col('a').cast(DecimalType(4, -1))), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + f.col('a').cast(DecimalType(4, -1)))) def test_cast_int_to_decimal_overflow(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, int_gen).select( - f.col('a').cast(DecimalType(9, -1))), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + f.col('a').cast(DecimalType(9, -1)))) def test_cast_long_to_decimal_overflow(): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, long_gen).select( - f.col('a').cast(DecimalType(18, -1))), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + f.col('a').cast(DecimalType(18, -1)))) # casting these types to string should be passed basic_gens_for_cast_to_string = [ByteGen, ShortGen, IntegerGen, LongGen, StringGen, BooleanGen, DateGen, TimestampGen] @@ -204,8 +198,7 @@ def test_cast_array_to_string(data_gen, legacy): def test_cast_array_with_unmatched_element_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, - {"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true", - "spark.rapids.sql.castDecimalToString.enabled" : 'true', + {"spark.rapids.sql.castDecimalToString.enabled" : 'true', "spark.rapids.sql.castFloatToString.enabled" : "true", "spark.sql.legacy.castComplexTypesToString.enabled": legacy} ) @@ -216,7 +209,7 @@ def test_cast_array_with_unmatched_element_to_string(data_gen, legacy): def test_cast_map_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, - {"spark.rapids.sql.castDecimalToString.enabled" : 'true', + {"spark.rapids.sql.castDecimalToString.enabled" : 'true', "spark.sql.legacy.castComplexTypesToString.enabled": legacy}) @@ -226,9 +219,8 @@ def test_cast_map_to_string(data_gen, legacy): def test_cast_map_with_unmatched_element_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, - {"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true", - "spark.rapids.sql.castDecimalToString.enabled" : 'true', - "spark.rapids.sql.castFloatToString.enabled" : "true", + {"spark.rapids.sql.castDecimalToString.enabled" : 'true', + "spark.rapids.sql.castFloatToString.enabled" : "true", "spark.sql.legacy.castComplexTypesToString.enabled": legacy} ) @@ -282,8 +274,7 @@ def broken_df(spark): def test_cast_struct_with_unmatched_element_to_string(data_gen, legacy): _assert_cast_to_string_equal( data_gen, - {"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true", - "spark.rapids.sql.castDecimalToString.enabled" : 'true', + {"spark.rapids.sql.castDecimalToString.enabled" : 'true', "spark.rapids.sql.castFloatToString.enabled" : "true", "spark.sql.legacy.castComplexTypesToString.enabled": legacy} ) @@ -297,4 +288,4 @@ def is_neg_dec_scale_bug_version(): def test_cast_string_to_negative_scale_decimal(): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, StringGen("[0-9]{9}")).select( - f.col('a').cast(DecimalType(8, -3))), conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + f.col('a').cast(DecimalType(8, -3)))) diff --git a/integration_tests/src/main/python/cmp_test.py b/integration_tests/src/main/python/cmp_test.py index db54417886a..19f4ac71826 100644 --- a/integration_tests/src/main/python/cmp_test.py +++ b/integration_tests/src/main/python/cmp_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -from marks import incompat, approximate_float from spark_session import with_cpu_session from pyspark.sql.types import * import pyspark.sql.functions as f @@ -31,7 +30,7 @@ def test_eq(data_gen): s2 == f.col('b'), f.lit(None).cast(data_type) == f.col('a'), f.col('b') == f.lit(None).cast(data_type), - f.col('a') == f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') == f.col('b'))) @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens, ids=idfn) def test_eq_ns(data_gen): @@ -43,7 +42,7 @@ def test_eq_ns(data_gen): s2.eqNullSafe(f.col('b')), f.lit(None).cast(data_type).eqNullSafe(f.col('a')), f.col('b').eqNullSafe(f.lit(None).cast(data_type)), - f.col('a').eqNullSafe(f.col('b'))), conf=allow_negative_scale_of_decimal_conf) + f.col('a').eqNullSafe(f.col('b')))) @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens, ids=idfn) def test_ne(data_gen): @@ -55,7 +54,7 @@ def test_ne(data_gen): s2 != f.col('b'), f.lit(None).cast(data_type) != f.col('a'), f.col('b') != f.lit(None).cast(data_type), - f.col('a') != f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') != f.col('b'))) @pytest.mark.parametrize('data_gen', orderable_gens + decimal_128_gens, ids=idfn) def test_lt(data_gen): @@ -67,7 +66,7 @@ def test_lt(data_gen): s2 < f.col('b'), f.lit(None).cast(data_type) < f.col('a'), f.col('b') < f.lit(None).cast(data_type), - f.col('a') < f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') < f.col('b'))) @pytest.mark.parametrize('data_gen', orderable_gens + decimal_128_gens, ids=idfn) def test_lte(data_gen): @@ -79,7 +78,7 @@ def test_lte(data_gen): s2 <= f.col('b'), f.lit(None).cast(data_type) <= f.col('a'), f.col('b') <= f.lit(None).cast(data_type), - f.col('a') <= f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') <= f.col('b'))) @pytest.mark.parametrize('data_gen', orderable_gens + decimal_128_gens, ids=idfn) def test_gt(data_gen): @@ -91,7 +90,7 @@ def test_gt(data_gen): s2 > f.col('b'), f.lit(None).cast(data_type) > f.col('a'), f.col('b') > f.lit(None).cast(data_type), - f.col('a') > f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') > f.col('b'))) @pytest.mark.parametrize('data_gen', orderable_gens + decimal_128_gens, ids=idfn) def test_gte(data_gen): @@ -103,14 +102,13 @@ def test_gte(data_gen): s2 >= f.col('b'), f.lit(None).cast(data_type) >= f.col('a'), f.col('b') >= f.lit(None).cast(data_type), - f.col('a') >= f.col('b')), conf=allow_negative_scale_of_decimal_conf) + f.col('a') >= f.col('b'))) @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_isnull(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select( - f.isnull(f.col('a'))), - conf=allow_negative_scale_of_decimal_conf) + f.isnull(f.col('a')))) @pytest.mark.parametrize('data_gen', [FloatGen(), DoubleGen()], ids=idfn) def test_isnan(data_gen): @@ -121,28 +119,24 @@ def test_isnan(data_gen): @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_dropna_any(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).dropna(), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).dropna()) @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_dropna_all(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).dropna(how='all'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).dropna(how='all')) #dropna is really a filter along with a test for null, but lets do an explicit filter test too @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_filter(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a')), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : three_col_df(spark, BooleanGen(), data_gen, data_gen).filter(f.col('a'))) # coalesce batch happens after a filter, but only if something else happens on the GPU after that @pytest.mark.parametrize('data_gen', eq_gens_with_decimal_gen + decimal_128_gens + array_gens_sample + struct_gens_sample + map_gens_sample, ids=idfn) def test_filter_with_project(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2'), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : two_col_df(spark, BooleanGen(), data_gen).filter(f.col('a')).selectExpr('*', 'a as a2')) @pytest.mark.parametrize('expr', [f.lit(True), f.lit(False), f.lit(None).cast('boolean')], ids=idfn) def test_filter_with_lit(expr): @@ -156,11 +150,9 @@ def test_in(data_gen): # nulls are not supported for in on the GPU yet num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) - 1 # we have to make the scalars in a session so negative scales in decimals are supported - scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))), - conf=allow_negative_scale_of_decimal_conf) + scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))) assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars))) # Spark supports two different versions of 'IN', and it depends on the spark.sql.optimizer.inSetConversionThreshold conf # This is to test entries over that value. @@ -169,9 +161,6 @@ def test_in_set(data_gen): # nulls are not supported for in on the GPU yet num_entries = int(with_cpu_session(lambda spark: spark.conf.get('spark.sql.optimizer.inSetConversionThreshold'))) + 1 # we have to make the scalars in a session so negative scales in decimals are supported - scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen))), - conf=allow_negative_scale_of_decimal_conf) + scalars = with_cpu_session(lambda spark: list(gen_scalars(data_gen, num_entries, force_no_nulls=not isinstance(data_gen, NullGen)))) assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars)), - conf=allow_negative_scale_of_decimal_conf) - + lambda spark : unary_op_df(spark, data_gen).select(f.col('a').isin(scalars))) diff --git a/integration_tests/src/main/python/conditionals_test.py b/integration_tests/src/main/python/conditionals_test.py index d181bef8668..c596ef5f178 100644 --- a/integration_tests/src/main/python/conditionals_test.py +++ b/integration_tests/src/main/python/conditionals_test.py @@ -16,7 +16,6 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -from marks import incompat, approximate_float from pyspark.sql.types import * import pyspark.sql.functions as f @@ -65,8 +64,7 @@ def test_if_else_map(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : three_col_df(spark, boolean_gen, data_gen, data_gen).selectExpr( 'IF(TRUE, b, c)', - 'IF(a, b, c)'), - conf = allow_negative_scale_of_decimal_conf) + 'IF(a, b, c)')) @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed @pytest.mark.parametrize('data_gen', all_gens + all_nested_gens + single_array_gens_sample_with_decimal128 + decimal_128_gens, ids=idfn) @@ -97,8 +95,7 @@ def test_case_when(data_gen): f.when(f.col('_b0'), s1).when(f.lit(False), f.col('_c0')), f.when(f.col('_b0'), s1).when(f.lit(True), f.col('_c0')), f.when(f.col('_b0'), f.lit(None).cast(data_type)).otherwise(f.col('_c0')), - f.when(f.lit(False), f.col('_c0'))), - conf = allow_negative_scale_of_decimal_conf) + f.when(f.lit(False), f.col('_c0')))) @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn) def test_nanvl(data_gen): @@ -140,8 +137,7 @@ def test_coalesce(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( lambda spark : gen_df(spark, gen).select( - f.coalesce(*command_args)), - conf = allow_negative_scale_of_decimal_conf) + f.coalesce(*command_args))) def test_coalesce_constant_output(): # Coalesce can allow a constant value as output. Technically Spark should mark this @@ -191,7 +187,7 @@ def test_conditional_with_side_effects_col_col(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr( 'IF(a < 2147483647, a + 1, a)'), - conf = {'spark.sql.ansi.enabled':True}) + conf = ansi_enabled_conf) @pytest.mark.parametrize('data_gen', [IntegerGen().with_special_case(2147483647)], ids=idfn) def test_conditional_with_side_effects_col_scalar(data_gen): @@ -199,14 +195,14 @@ def test_conditional_with_side_effects_col_scalar(data_gen): lambda spark : unary_op_df(spark, data_gen).selectExpr( 'IF(a < 2147483647, a + 1, 2147483647)', 'IF(a >= 2147483646, 2147483647, a + 1)'), - conf = {'spark.sql.ansi.enabled':True}) + conf = ansi_enabled_conf) @pytest.mark.parametrize('data_gen', [mk_str_gen('[0-9]{1,20}')], ids=idfn) def test_conditional_with_side_effects_cast(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr( 'IF(a RLIKE "^[0-9]{1,5}\\z", CAST(a AS INT), 0)'), - conf = {'spark.sql.ansi.enabled':True}) + conf = ansi_enabled_conf) @pytest.mark.parametrize('data_gen', [mk_str_gen('[0-9]{1,9}')], ids=idfn) def test_conditional_with_side_effects_case_when(data_gen): @@ -216,4 +212,4 @@ def test_conditional_with_side_effects_case_when(data_gen): WHEN a RLIKE "^[0-9]{1,3}\\z" THEN CAST(a AS INT) \ WHEN a RLIKE "^[0-9]{4,6}\\z" THEN CAST(a AS INT) + 123 \ ELSE -1 END'), - conf = {'spark.sql.ansi.enabled':True}) \ No newline at end of file + conf = ansi_enabled_conf) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 6d637b2f834..069e1cec9a3 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -20,7 +20,6 @@ from pyspark.sql import Row from pyspark.sql.types import * import pyspark.sql.functions as f -import pytest import random from spark_session import is_tz_utc import sre_yield @@ -956,8 +955,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): MapGen(RepeatSeqGen(IntegerGen(nullable=False), 10), long_gen, max_length=10), MapGen(StringGen(pattern='key_[0-9]', nullable=False), simple_string_to_string_map_gen)] -allow_negative_scale_of_decimal_conf = {'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'} - +ansi_enabled_conf = {'spark.sql.ansi.enabled': 'true'} no_nans_conf = {'spark.rapids.sql.hasNans': 'false'} def copy_and_update(conf, *more_confs): diff --git a/integration_tests/src/main/python/explain_mode_test.py b/integration_tests/src/main/python/explain_mode_test.py index b05d07f3887..00860de99d8 100644 --- a/integration_tests/src/main/python/explain_mode_test.py +++ b/integration_tests/src/main/python/explain_mode_test.py @@ -22,7 +22,6 @@ _explain_mode_conf = {'spark.rapids.sql.mode': 'explainOnly', 'spark.sql.join.preferSortMergeJoin': 'True', 'spark.sql.shuffle.partitions': '2', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true' } def create_df(spark, data_gen, left_length, right_length): diff --git a/integration_tests/src/main/python/generate_expr_test.py b/integration_tests/src/main/python/generate_expr_test.py index 6fe94c284ad..6a19ce66df5 100644 --- a/integration_tests/src/main/python/generate_expr_test.py +++ b/integration_tests/src/main/python/generate_expr_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -48,8 +48,7 @@ def test_explode_litarray(data_gen): f.explode(array_lit))) # use a small `spark.rapids.sql.batchSizeBytes` to enforce input batches splitting up during explode -conf_to_enforce_split_input = {'spark.rapids.sql.batchSizeBytes': '8192', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'} +conf_to_enforce_split_input = {'spark.rapids.sql.batchSizeBytes': '8192'} @ignore_order(local=True) @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index c19611eb5c3..e290c15c617 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -356,7 +356,7 @@ def test_computation_in_grpby_columns(): def test_hash_grpby_sum(data_gen, conf): assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b')), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) + conf = conf) @pytest.mark.skipif(is_before_spark_311(), reason="SUM overflows for CPU were fixed in Spark 3.1.1") @shuffle_test @@ -368,7 +368,7 @@ def test_hash_grpby_sum(data_gen, conf): def test_hash_grpby_sum_full_decimal(data_gen, conf): assert_gpu_and_cpu_are_equal_collect( lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b')), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) + conf = conf) @approximate_float @ignore_order @@ -378,7 +378,7 @@ def test_hash_grpby_sum_full_decimal(data_gen, conf): def test_hash_reduction_sum(data_gen, conf): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen, length=100).selectExpr("SUM(a)"), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) + conf = conf) @pytest.mark.skipif(is_before_spark_311(), reason="SUM overflows for CPU were fixed in Spark 3.1.1") @approximate_float @@ -389,7 +389,7 @@ def test_hash_reduction_sum(data_gen, conf): def test_hash_reduction_sum_full_decimal(data_gen, conf): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen, length=100).selectExpr("SUM(a)"), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) + conf = conf) @approximate_float @ignore_order @@ -426,8 +426,7 @@ def test_hash_avg_nulls_partial_only(data_gen): @pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimalbig, ids=idfn) def test_intersectAll(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100))) @approximate_float @ignore_order @@ -435,8 +434,7 @@ def test_intersectAll(data_gen): @pytest.mark.parametrize('data_gen', _init_list_no_nans_with_decimalbig, ids=idfn) def test_exceptAll(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, data_gen, length=100).exceptAll(gen_df(spark, data_gen, length=100).filter('a != b')), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : gen_df(spark, data_gen, length=100).exceptAll(gen_df(spark, data_gen, length=100).filter('a != b'))) @approximate_float @ignore_order(local=True) @@ -449,7 +447,7 @@ def test_hash_grpby_pivot(data_gen, conf): .groupby('a') .pivot('b') .agg(f.sum('c')), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, conf)) + conf = conf) @approximate_float @ignore_order(local=True) @@ -599,8 +597,7 @@ def test_hash_reduction_pivot_without_nans(data_gen, conf): @pytest.mark.parametrize('data_gen', decimal_128_gens, ids=idfn) def test_decimal128_count_reduction(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).selectExpr('count(a)'), - conf = allow_negative_scale_of_decimal_conf) + lambda spark: unary_op_df(spark, data_gen).selectExpr('count(a)')) # very simple test for just a count on decimals 128 values until we can support more with them @ignore_order(local=True) @@ -609,16 +606,14 @@ def test_decimal128_count_group_by(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: two_col_df(spark, byte_gen, data_gen) .groupby('a') - .agg(f.count('b')), - conf = allow_negative_scale_of_decimal_conf) + .agg(f.count('b'))) # very simple test for just a min/max on decimals 128 values until we can support more with them @ignore_order(local=True) @pytest.mark.parametrize('data_gen', decimal_128_gens, ids=idfn) def test_decimal128_min_max_reduction(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark: unary_op_df(spark, data_gen).selectExpr('min(a)', 'max(a)'), - conf = allow_negative_scale_of_decimal_conf) + lambda spark: unary_op_df(spark, data_gen).selectExpr('min(a)', 'max(a)')) # very simple test for just a min/max on decimals 128 values until we can support more with them @ignore_order(local=True) @@ -627,8 +622,7 @@ def test_decimal128_min_max_group_by(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: two_col_df(spark, byte_gen, data_gen) .groupby('a') - .agg(f.min('b'), f.max('b')), - conf = allow_negative_scale_of_decimal_conf) + .agg(f.min('b'), f.max('b'))) # to avoid ordering issues with collect_list we do it all in a single task @ignore_order(local=True) @@ -640,7 +634,7 @@ def test_hash_groupby_collect_list(data_gen, use_obj_hash_agg): .groupby('a') .agg(f.collect_list('b')), conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower(), - 'spark.sql.shuffle.partitons': '1'}) + 'spark.sql.shuffle.partitions': '1'}) @approximate_float @ignore_order(local=True) @@ -1048,8 +1042,7 @@ def test_first_last_reductions_decimal_types(data_gen): # Coalesce and sort are to make sure that first and last, which are non-deterministic # become deterministic lambda spark: unary_op_df(spark, data_gen).coalesce(1).selectExpr( - 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'), - conf=allow_negative_scale_of_decimal_conf) + 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)')) @pytest.mark.parametrize('data_gen', _nested_gens, ids=idfn) def test_first_last_reductions_nested_types(data_gen): @@ -1057,8 +1050,7 @@ def test_first_last_reductions_nested_types(data_gen): # Coalesce and sort are to make sure that first and last, which are non-deterministic # become deterministic lambda spark: unary_op_df(spark, data_gen).coalesce(1).selectExpr( - 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'), - conf=allow_negative_scale_of_decimal_conf) + 'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)')) @pytest.mark.parametrize('data_gen', non_nan_all_basic_gens, ids=idfn) @pytest.mark.parametrize('parameterless', ['true', pytest.param('false', marks=pytest.mark.xfail( @@ -1124,8 +1116,7 @@ def test_groupby_first_last(data_gen): assert_gpu_and_cpu_are_equal_collect( # First and last are not deterministic when they are run in a real distributed setup. # We set parallelism 1 to prevent nondeterministic results because of distributed setup. - lambda spark: agg_fn(gen_df(spark, gen_fn, num_slices=1)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark: agg_fn(gen_df(spark, gen_fn, num_slices=1))) @ignore_order @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 5b08e1bad68..4ae9c777bed 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -80,7 +80,6 @@ _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1', 'spark.sql.join.preferSortMergeJoin': 'True', 'spark.sql.shuffle.partitions': '2', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true' } # For spark to insert a shuffled hash join it has to be enabled with @@ -93,7 +92,6 @@ _hash_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '160', 'spark.sql.join.preferSortMergeJoin': 'false', 'spark.sql.shuffle.partitions': '2', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true' } def create_df(spark, data_gen, left_length, right_length): @@ -128,7 +126,7 @@ def test_right_broadcast_nested_loop_join_without_condition_empty(join_type): def do_join(spark): left, right = create_df(spark, long_gen, 50, 0) return left.join(broadcast(right), how=join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -136,7 +134,7 @@ def test_left_broadcast_nested_loop_join_without_condition_empty(join_type): def do_join(spark): left, right = create_df(spark, long_gen, 0, 50) return left.join(broadcast(right), how=join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -144,7 +142,7 @@ def test_broadcast_nested_loop_join_without_condition_empty(join_type): def do_join(spark): left, right = create_df(spark, long_gen, 0, 0) return left.join(broadcast(right), how=join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) @pytest.mark.skipif(is_databricks_runtime(), @@ -155,9 +153,7 @@ def test_right_broadcast_nested_loop_join_without_condition_empty_small_batch(jo def do_join(spark): left, right = create_df(spark, long_gen, 50, 0) return left.join(broadcast(right), how=join_type) - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.sql.adaptive.enabled': 'true'}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': 'true'}) @ignore_order(local=True) @pytest.mark.skipif(is_databricks_runtime(), @@ -168,9 +164,7 @@ def test_empty_broadcast_hash_join(join_type): def do_join(spark): left, right = create_df(spark, long_gen, 50, 0) return left.join(right.hint("broadcast"), left.a == right.r_a, join_type) - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.sql.adaptive.enabled': 'true'}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.sql.adaptive.enabled': 'true'}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 @@ -238,7 +232,7 @@ def test_broadcast_join_right_table(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) @pytest.mark.parametrize('data_gen', basic_nested_gens + decimal_128_gens + single_array_gens_sample_with_decimal128, ids=idfn) @@ -249,7 +243,7 @@ def test_broadcast_join_right_table_ridealong(data_gen, join_type): def do_join(spark): left, right = create_ridealong_df(spark, short_gen, data_gen, 500, 500) return left.join(broadcast(right), left.key == right.r_key, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -263,7 +257,7 @@ def test_broadcast_join_right_table_with_job_group(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -276,8 +270,7 @@ def test_cartesian_join(data_gen, batch_size): def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return left.crossJoin(right) - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -290,8 +283,7 @@ def test_cartesian_join_special_case_count(batch_size): def do_join(spark): left, right = create_df(spark, int_gen, 50, 25) return left.crossJoin(right).selectExpr('COUNT(*)') - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -304,8 +296,7 @@ def test_cartesian_join_special_case_group_by_count(batch_size): def do_join(spark): left, right = create_df(spark, int_gen, 50, 25) return left.crossJoin(right).groupBy('a').count() - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -335,8 +326,7 @@ def test_broadcast_nested_loop_join(data_gen, batch_size): def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return left.crossJoin(broadcast(right)) - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -346,8 +336,7 @@ def test_broadcast_nested_loop_join_special_case_count(batch_size): def do_join(spark): left, right = create_df(spark, int_gen, 50, 25) return left.crossJoin(broadcast(right)).selectExpr('COUNT(*)') - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -359,8 +348,7 @@ def test_broadcast_nested_loop_join_special_case_group_by_count(batch_size): def do_join(spark): left, right = create_df(spark, int_gen, 50, 25) return left.crossJoin(broadcast(right)).groupBy('a').count() - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -377,8 +365,7 @@ def do_join(spark): # but these take a long time to verify so we run with smaller numbers by default # that do not expose the error return left.join(broadcast(right), (left.b >= right.r_b), join_type) - conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size}) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.batchSizeBytes': batch_size}) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -392,7 +379,7 @@ def do_join(spark): # but these take a long time to verify so we run with smaller numbers by default # that do not expose the error return broadcast(left).join(right, (left.b >= right.r_b), 'Right') - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -408,7 +395,7 @@ def do_join(spark): # that do not expose the error # AST does not support cast or logarithm yet, so this must be implemented as a post-filter return left.join(broadcast(right), left.a > f.log(right.r_a), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'Cast', 'GreaterThan', 'Log') @ignore_order(local=True) @@ -419,8 +406,7 @@ def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) # AST does not support cast or logarithm yet return broadcast(left).join(right, left.a > f.log(right.r_a), join_type) - conf = allow_negative_scale_of_decimal_conf - assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec', conf=conf) + assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec') @ignore_order(local=True) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @@ -435,8 +421,7 @@ def do_join(spark): # Compute the distinct of the join result to verify the join produces a proper dataframe # for downstream processing. return left.join(broadcast(right), how=join_type).distinct() - conf = allow_negative_scale_of_decimal_conf - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order(local=True) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @@ -451,8 +436,7 @@ def do_join(spark): # Compute the distinct of the join result to verify the join produces a proper dataframe # for downstream processing. return broadcast(left).join(right, how=join_type).distinct() - conf = allow_negative_scale_of_decimal_conf - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -460,8 +444,7 @@ def test_right_broadcast_nested_loop_join_condition_missing_count(data_gen, join def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return left.join(broadcast(right), how=join_type).selectExpr('COUNT(*)') - conf = allow_negative_scale_of_decimal_conf - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn) @pytest.mark.parametrize('join_type', ['Right'], ids=idfn) @@ -469,8 +452,7 @@ def test_left_broadcast_nested_loop_join_condition_missing_count(data_gen, join_ def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return broadcast(left).join(right, how=join_type).selectExpr('COUNT(*)') - conf = allow_negative_scale_of_decimal_conf - assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'GreaterThanOrEqual') @ignore_order(local=True) @@ -480,8 +462,7 @@ def test_broadcast_nested_loop_join_with_conditionals_build_left_fallback(data_g def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return broadcast(left).join(right, (left.b >= right.r_b), join_type) - conf = allow_negative_scale_of_decimal_conf - assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec', conf=conf) + assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec') @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'GreaterThanOrEqual') @ignore_order(local=True) @@ -491,8 +472,7 @@ def test_broadcast_nested_loop_with_conditionals_build_right_fallback(data_gen, def do_join(spark): left, right = create_df(spark, data_gen, 50, 25) return left.join(broadcast(right), (left.b >= right.r_b), join_type) - conf = allow_negative_scale_of_decimal_conf - assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec', conf=conf) + assert_gpu_fallback_collect(do_join, 'BroadcastNestedLoopJoinExec') # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -505,7 +485,7 @@ def test_broadcast_join_left_table(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 250, 500) return broadcast(left).join(right, left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -517,7 +497,7 @@ def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), (left.a == right.r_a) & (left.b >= right.r_b), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -531,9 +511,8 @@ def do_join(spark): # AST does not support cast or logarithm yet return left.join(broadcast(right), (left.a == right.r_a) & (left.b > f.log(right.r_b)), join_type) - conf = allow_negative_scale_of_decimal_conf exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec' - assert_gpu_fallback_collect(do_join, exec, conf=conf) + assert_gpu_fallback_collect(do_join, exec) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -547,9 +526,8 @@ def do_join(spark): # AST does not support cast or logarithm yet return left.join(broadcast(right), (left.a == right.r_a) & (left.b > right.r_b), join_type) - conf = allow_negative_scale_of_decimal_conf exec = 'SortMergeJoinExec' if join_type in ['Right', 'FullOuter'] else 'BroadcastHashJoinExec' - assert_gpu_fallback_collect(do_join, exec, conf=conf) + assert_gpu_fallback_collect(do_join, exec) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -561,7 +539,7 @@ def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), (left.a == right.r_a) & (left.b > right.r_b), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -614,7 +592,7 @@ def do_join(spark): right = gen_df(spark, _mixed_df2_with_nulls, length=500).withColumnRenamed("a", "r_a")\ .withColumnRenamed("b", "r_b").withColumnRenamed("c", "r_c") return left.join(broadcast(right), left.a.eqNullSafe(right.r_a), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) @ignore_order @allow_non_gpu('DataWritingCommandExec') @@ -725,7 +703,7 @@ def test_broadcast_join_right_struct_as_key(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this @@ -737,7 +715,7 @@ def do_join(spark): left = two_col_df(spark, data_gen, int_gen, length=500) right = two_col_df(spark, data_gen, int_gen, length=250) return left.join(broadcast(right), (left.a == right.a) & (left.b == right.b), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_join) # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 # After 3.1.0 is the min spark version we can drop this diff --git a/integration_tests/src/main/python/limit_test.py b/integration_tests/src/main/python/limit_test.py index c0dc44b23af..dc4a0f8bd35 100644 --- a/integration_tests/src/main/python/limit_test.py +++ b/integration_tests/src/main/python/limit_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,11 +14,8 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect -from spark_session import is_before_spark_311 +from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -from marks import ignore_order, allow_non_gpu -import pyspark.sql.functions as f @pytest.mark.parametrize('data_gen', all_basic_gens + decimal_128_gens + array_gens_sample + map_gens_sample + struct_gens_sample, ids=idfn) @@ -26,5 +23,4 @@ def test_simple_limit(data_gen): assert_gpu_and_cpu_are_equal_collect( # We need some processing after the limit to avoid a CollectLimitExec lambda spark : unary_op_df(spark, data_gen, num_slices=1).limit(10).repartition(1), - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.sql.execution.sortBeforeRepartition': 'false'})) + conf = {'spark.sql.execution.sortBeforeRepartition': 'false'}) diff --git a/integration_tests/src/main/python/map_test.py b/integration_tests/src/main/python/map_test.py index dae5c408cdd..515e90463d7 100644 --- a/integration_tests/src/main/python/map_test.py +++ b/integration_tests/src/main/python/map_test.py @@ -20,7 +20,6 @@ from spark_session import is_before_spark_311, is_before_spark_330 from pyspark.sql.types import * from pyspark.sql.types import IntegralType -import pyspark.sql.functions as f # Mark all tests in current file as premerge_ci_1 in order to be run in first k8s pod for parallel build premerge job pytestmark = pytest.mark.premerge_ci_1 @@ -39,8 +38,7 @@ def test_map_keys(data_gen): # but it works this way for now so lets see if we can maintain it. # Good thing too, because we cannot support sorting all of the types that could be # in here yet, and would need some special case code for checking equality - 'map_keys(a)'), - conf=allow_negative_scale_of_decimal_conf) + 'map_keys(a)')) @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_64_map_gens + decimal_128_map_gens, ids=idfn) def test_map_values(data_gen): @@ -50,8 +48,7 @@ def test_map_values(data_gen): # but it works this way for now so lets see if we can maintain it. # Good thing too, because we cannot support sorting all of the types that could be # in here yet, and would need some special case code for checking equality - 'map_values(a)'), - conf=allow_negative_scale_of_decimal_conf) + 'map_values(a)')) @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_64_map_gens + decimal_128_map_gens, ids=idfn) def test_map_entries(data_gen): @@ -61,8 +58,7 @@ def test_map_entries(data_gen): # but it works this way for now so lets see if we can maintain it. # Good thing too, because we cannot support sorting all of the types that could be # in here yet, and would need some special case code for checking equality - 'map_entries(a)'), - conf=allow_negative_scale_of_decimal_conf) + 'map_entries(a)')) @pytest.mark.parametrize('data_gen', [simple_string_to_string_map_gen], ids=idfn) def test_simple_get_map_value(data_gen): @@ -149,8 +145,7 @@ def test_simple_get_map_value_ansi_fail(data_gen): assert_gpu_and_cpu_error( lambda spark: unary_op_df(spark, data_gen).selectExpr( 'a["NOT_FOUND"]').collect(), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}, + conf=ansi_enabled_conf, error_message=message) @pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if key is not found") @@ -159,8 +154,7 @@ def test_map_get_map_value_ansi_not_fail(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen).selectExpr( 'a["NOT_FOUND"]'), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', [simple_string_to_string_map_gen], ids=idfn) def test_simple_element_at_map(data_gen): @@ -181,8 +175,7 @@ def test_map_element_at_ansi_fail(data_gen): assert_gpu_and_cpu_error( lambda spark: unary_op_df(spark, data_gen).selectExpr( 'element_at(a, "NOT_FOUND")').collect(), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}, + conf=ansi_enabled_conf, error_message=message) @pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if key is not found") @@ -191,8 +184,7 @@ def test_map_element_at_ansi_not_fail(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen).selectExpr( 'element_at(a, "NOT_FOUND")'), - conf={'spark.sql.ansi.enabled':True, - 'spark.sql.legacy.allowNegativeScaleOfDecimal': True}) + conf=ansi_enabled_conf) @pytest.mark.parametrize('data_gen', map_gens_sample, ids=idfn) def test_transform_values(data_gen): @@ -226,8 +218,7 @@ def do_it(spark): return two_col_df(spark, data_gen, byte_gen).selectExpr(columns) - assert_gpu_and_cpu_are_equal_collect(do_it, - conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_it) @pytest.mark.parametrize('data_gen', map_gens_sample + decimal_128_map_gens + decimal_64_map_gens, ids=idfn) @@ -244,8 +235,7 @@ def do_it(spark): return unary_op_df(spark, data_gen).selectExpr(columns) - assert_gpu_and_cpu_are_equal_collect(do_it, - conf=allow_negative_scale_of_decimal_conf) + assert_gpu_and_cpu_are_equal_collect(do_it) @pytest.mark.parametrize('data_gen', [simple_string_to_string_map_gen], ids=idfn) def test_transform_keys_null_fail(data_gen): @@ -280,5 +270,4 @@ def test_transform_keys_last_win_fallback(data_gen): 'map(1, sequence(1, 5)) as m'], ids=idfn) def test_sql_map_scalars(query): assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.sql('SELECT {}'.format(query)), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + lambda spark : spark.sql('SELECT {}'.format(query))) diff --git a/integration_tests/src/main/python/project_lit_alias_test.py b/integration_tests/src/main/python/project_lit_alias_test.py index 90fda642223..d191349dff8 100644 --- a/integration_tests/src/main/python/project_lit_alias_test.py +++ b/integration_tests/src/main/python/project_lit_alias_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,7 +16,6 @@ from asserts import assert_gpu_and_cpu_are_equal_collect from data_gen import * -from marks import incompat, approximate_float from pyspark.sql.types import * import pyspark.sql.functions as f @@ -27,7 +26,6 @@ def test_project_alias(data_gen): lambda spark : binary_op_df(spark, data_gen).select( f.col('a').alias('col1'), f.col('b').alias('col2'), - f.lit(dec)), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + f.lit(dec))) diff --git a/integration_tests/src/main/python/repart_test.py b/integration_tests/src/main/python/repart_test.py index 8b88ad19825..80b749fb477 100644 --- a/integration_tests/src/main/python/repart_test.py +++ b/integration_tests/src/main/python/repart_test.py @@ -92,8 +92,7 @@ def test_union_struct_missing_children(data_gen): # This tests union of two DFs of two cols each. The types of the left col and right col is the same def test_union(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).union(binary_op_df(spark, data_gen)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).union(binary_op_df(spark, data_gen))) @pytest.mark.parametrize('data_gen', all_gen + decimal_128_gens + map_gens + array_gens_sample_with_decimal128 + [all_basic_struct_gen, @@ -103,8 +102,7 @@ def test_union(data_gen): # This tests union of two DFs of two cols each. The types of the left col and right col is the same def test_unionAll(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).unionAll(binary_op_df(spark, data_gen)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).unionAll(binary_op_df(spark, data_gen))) @pytest.mark.parametrize('data_gen', all_gen + decimal_128_gens + map_gens + array_gens_sample_with_decimal128 + [all_basic_struct_gen, @@ -120,8 +118,7 @@ def test_unionAll(data_gen): def test_union_by_missing_col_name(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).withColumnRenamed("a", "x") - .unionByName(binary_op_df(spark, data_gen).withColumnRenamed("a", "y"), True), - conf=allow_negative_scale_of_decimal_conf) + .unionByName(binary_op_df(spark, data_gen).withColumnRenamed("a", "y"), True)) # the first number ('1' and '2') is the nest level @@ -161,8 +158,7 @@ def assert_union_equal(gen1, gen2): struct_of_maps], ids=idfn) def test_union_by_name(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).unionByName(binary_op_df(spark, data_gen)), - conf=allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).unionByName(binary_op_df(spark, data_gen))) @pytest.mark.parametrize('data_gen', [ @@ -173,8 +169,7 @@ def test_union_by_name(data_gen): ], ids=idfn) def test_coalesce_types(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark: gen_df(spark, data_gen).coalesce(2), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + lambda spark: gen_df(spark, data_gen).coalesce(2)) @pytest.mark.parametrize('num_parts', [1, 10, 100, 1000, 2000], ids=idfn) @pytest.mark.parametrize('length', [0, 2048, 4096], ids=idfn) @@ -182,8 +177,7 @@ def test_coalesce_df(num_parts, length): #This should change eventually to be more than just the basic gens gen_list = [('_c' + str(i), gen) for i, gen in enumerate(all_basic_gens + decimal_gens + decimal_128_gens)] assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, gen_list, length=length).coalesce(num_parts), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + lambda spark : gen_df(spark, gen_list, length=length).coalesce(num_parts)) @pytest.mark.parametrize('data_gen', [ pytest.param([('_c' + str(i), gen) for i, gen in enumerate(all_basic_gens + decimal_gens + decimal_128_gens)]), @@ -200,8 +194,7 @@ def test_repartition_df(data_gen, num_parts, length): # Add a computed column to avoid shuffle being optimized back to a CPU shuffle lambda spark : gen_df(spark, data_gen, length=length).withColumn('x', lit(1)).repartition(num_parts), # disable sort before shuffle so round robin works for arrays - conf = {'spark.sql.execution.sortBeforeRepartition': 'false', - 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'}) + conf = {'spark.sql.execution.sortBeforeRepartition': 'false'}) @allow_non_gpu('ShuffleExchangeExec', 'RoundRobinPartitioning') @pytest.mark.parametrize('data_gen', [[('ag', ArrayGen(string_gen))], @@ -274,8 +267,7 @@ def test_hash_repartition_exact(gen, num_parts): .repartition(num_parts, *part_on)\ .withColumn('id', f.spark_partition_id())\ .withColumn('hashed', f.hash(*part_on))\ - .selectExpr('*', 'pmod(hashed, {})'.format(num_parts)), - conf = allow_negative_scale_of_decimal_conf) + .selectExpr('*', 'pmod(hashed, {})'.format(num_parts))) # Test a query that should cause Spark to leverage getShuffleRDD @ignore_order(local=True) diff --git a/integration_tests/src/main/python/sample_test.py b/integration_tests/src/main/python/sample_test.py index 4aacf938aef..c678a103bbf 100644 --- a/integration_tests/src/main/python/sample_test.py +++ b/integration_tests/src/main/python/sample_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021, NVIDIA CORPORATION. +# Copyright (c) 2021-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -40,14 +40,12 @@ def test_sample_produce_empty_batch(data_gen): def test_sample(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen, num_slices = 10) - .sample(fraction = 0.9, seed = 1), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True} + .sample(fraction = 0.9, seed = 1) ) @pytest.mark.parametrize('data_gen', basic_gens + nested_gens, ids=idfn) def test_sample_with_replacement(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark: unary_op_df(spark, data_gen, num_slices = 10).sample( - withReplacement =True, fraction = 0.5, seed = 1), - conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True} + withReplacement =True, fraction = 0.5, seed = 1) ) diff --git a/integration_tests/src/main/python/sort_test.py b/integration_tests/src/main/python/sort_test.py index cf504c072ed..50ec9aac3a8 100644 --- a/integration_tests/src/main/python/sort_test.py +++ b/integration_tests/src/main/python/sort_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -47,8 +47,7 @@ def test_sort_nonbinary_carry_binary(data_gen): @pytest.mark.parametrize('order', [f.col('a').asc(), f.col('a').asc_nulls_last(), f.col('a').desc(), f.col('a').desc_nulls_first()], ids=idfn) def test_single_orderby(data_gen, order): assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen).orderBy(order), - conf = allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen).orderBy(order)) @pytest.mark.parametrize('shuffle_parts', [ pytest.param(1), @@ -78,11 +77,8 @@ def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order), conf = { - **allow_negative_scale_of_decimal_conf, - **{ - 'spark.sql.shuffle.partitions': shuffle_parts, - 'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE' - } + 'spark.sql.shuffle.partitions': shuffle_parts, + 'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE' }) # only default null ordering for direction is supported for nested types @@ -98,10 +94,7 @@ def test_single_nested_orderby_plain(data_gen, order, shuffle_parts, stable_sort def test_single_nested_orderby_fallback_for_nullorder(data_gen, order): assert_gpu_fallback_collect( lambda spark : unary_op_df(spark, data_gen).orderBy(order), - "SortExec", - conf = { - **allow_negative_scale_of_decimal_conf, - }) + "SortExec") # SPARK CPU itself has issue with negative scale for take ordered and project orderable_without_neg_decimal = [n for n in (orderable_gens + orderable_not_null_gen + decimal_128_gens) if not (isinstance(n, DecimalGen) and n.scale < 0)] @@ -146,8 +139,7 @@ def test_single_nested_orderby_with_limit_fallback(data_gen, order): def test_single_sort_in_part(data_gen, order): # We set `num_slices` to handle https://github.com/NVIDIA/spark-rapids/issues/2477 assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, num_slices=12).sortWithinPartitions(order), - conf = allow_negative_scale_of_decimal_conf) + lambda spark : unary_op_df(spark, data_gen, num_slices=12).sortWithinPartitions(order)) @pytest.mark.parametrize('data_gen', [ pytest.param(all_basic_struct_gen), @@ -170,7 +162,7 @@ def test_single_nested_sort_in_part(data_gen, order, stable_sort): sort_conf = {'spark.rapids.sql.stableSort.enabled': stable_sort == 'STABLE'} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen, num_slices=12).sortWithinPartitions(order), - conf={**allow_negative_scale_of_decimal_conf, **sort_conf}) + conf=sort_conf) orderable_gens_sort = [byte_gen, short_gen, int_gen, long_gen, pytest.param(float_gen, marks=pytest.mark.xfail(condition=is_before_spark_311(), @@ -181,8 +173,7 @@ def test_single_nested_sort_in_part(data_gen, order, stable_sort): @pytest.mark.parametrize('data_gen', orderable_gens_sort, ids=idfn) def test_multi_orderby(data_gen): assert_gpu_and_cpu_are_equal_collect( - lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc()), - conf = allow_negative_scale_of_decimal_conf) + lambda spark : binary_op_df(spark, data_gen).orderBy(f.col('a'), f.col('b').desc())) # SPARK CPU itself has issue with negative scale for take ordered and project orderable_gens_sort_without_neg_decimal = [n for n in orderable_gens_sort + decimal_128_gens if not (isinstance(n, DecimalGen) and n.scale < 0)] @@ -233,8 +224,7 @@ def test_single_orderby_with_skew(data_gen): .selectExpr('a', 'random(1) > 0.5 as b')\ .repartition(f.col('b'))\ .orderBy(f.col('a'))\ - .selectExpr('a'), - conf = allow_negative_scale_of_decimal_conf) + .selectExpr('a')) # We are not trying all possibilities, just doing a few with numbers so the query works. @@ -251,7 +241,7 @@ def test_single_nested_orderby_with_skew(data_gen, stable_sort): .repartition(f.col('b')) \ .orderBy(f.col('a')) \ .selectExpr('a'), - conf={**allow_negative_scale_of_decimal_conf, **sort_conf}) + conf=sort_conf) # This is primarily to test the out of core sort with multiple batches. For this we set the data size to @@ -306,4 +296,4 @@ def test_orderby_nested_ridealong_limit(data_gen): # results, especially on distributed clusters. assert_gpu_and_cpu_are_equal_collect( lambda spark : two_col_df(spark, LongRangeGen(), data_gen)\ - .orderBy(f.col('a').desc()).limit(100), conf=allow_negative_scale_of_decimal_conf) + .orderBy(f.col('a').desc()).limit(100)) diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 2d9a9f2bcd3..099b7b9b9b5 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -31,6 +31,10 @@ def _from_scala_map(scala_map): _orig_conf = _from_scala_map(_spark.conf._jconf.getAll()) _orig_conf_keys = _orig_conf.keys() +# Default settings that should apply to CPU and GPU sessions. +# These settings can be overridden by specific tests if necessary. +_default_conf = { 'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true' } + def is_tz_utc(spark=_spark): """ true if the tz is UTC else false @@ -42,7 +46,9 @@ def is_tz_utc(spark=_spark): return utc == sys_tz def _set_all_confs(conf): - for key, value in conf.items(): + newconf = _default_conf.copy() + newconf.update(conf) + for key, value in newconf.items(): if _spark.conf.get(key, None) != value: _spark.conf.set(key, value) diff --git a/integration_tests/src/main/python/struct_test.py b/integration_tests/src/main/python/struct_test.py index fa3798d57d5..e3fb6f50da6 100644 --- a/integration_tests/src/main/python/struct_test.py +++ b/integration_tests/src/main/python/struct_test.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021, NVIDIA CORPORATION. +# Copyright (c) 2020-2022, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,7 +15,6 @@ import pytest from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql -from conftest import is_dataproc_runtime from data_gen import * from pyspark.sql.types import * @@ -39,7 +38,7 @@ def test_struct_get_item(data_gen): lambda spark : unary_op_df(spark, data_gen).selectExpr( 'a.first', 'a.second', - 'a.third'), conf=allow_negative_scale_of_decimal_conf) + 'a.third')) @pytest.mark.parametrize('data_gen', all_basic_gens + [null_gen, decimal_gen_default, @@ -50,8 +49,7 @@ def test_make_struct(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : binary_op_df(spark, data_gen).selectExpr( 'struct(a, b)', - 'named_struct("foo", b, "m", map("a", "b"), "n", null, "bar", 5, "other", named_struct("z", "z"),"end", a)'), - conf = allow_negative_scale_of_decimal_conf) + 'named_struct("foo", b, "m", map("a", "b"), "n", null, "bar", 5, "other", named_struct("z", "z"),"end", a)')) @pytest.mark.parametrize('data_gen', [StructGen([["first", boolean_gen], ["second", byte_gen], ["third", float_gen]]), diff --git a/integration_tests/src/main/python/time_window_test.py b/integration_tests/src/main/python/time_window_test.py index 2cfe461471f..ff367b506fb 100644 --- a/integration_tests/src/main/python/time_window_test.py +++ b/integration_tests/src/main/python/time_window_test.py @@ -75,7 +75,6 @@ def test_sliding_window(data_gen): def test_just_window(data_gen): row_gen = StructGen([['ts', timestamp_gen],['data', data_gen]], nullable=False) assert_gpu_and_cpu_are_equal_collect( - lambda spark : gen_df(spark, row_gen).withColumn('time_bucket', f.window('ts', '5 hour', '1 hour')), - conf = allow_negative_scale_of_decimal_conf) + lambda spark : gen_df(spark, row_gen).withColumn('time_bucket', f.window('ts', '5 hour', '1 hour'))) diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index a6973c05387..67ebe129dfb 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -147,8 +147,7 @@ def test_decimal128_count_window(data_gen): ' count(c) over ' ' (partition by a order by b asc ' ' rows between 2 preceding and 10 following) as count_c_asc ' - 'from window_agg_table', - conf = allow_negative_scale_of_decimal_conf) + 'from window_agg_table') @ignore_order @pytest.mark.parametrize('data_gen', decimal_128_gens, ids=idfn) @@ -160,8 +159,7 @@ def test_decimal128_count_window_no_part(data_gen): ' count(b) over ' ' (order by a asc ' ' rows between 2 preceding and 10 following) as count_b_asc ' - 'from window_agg_table', - conf = allow_negative_scale_of_decimal_conf) + 'from window_agg_table') @ignore_order @pytest.mark.parametrize('data_gen', decimal_gens + decimal_128_gens, ids=idfn) @@ -173,8 +171,7 @@ def test_decimal_sum_window(data_gen): ' sum(c) over ' ' (partition by a order by b asc ' ' rows between 2 preceding and 10 following) as sum_c_asc ' - 'from window_agg_table', - conf = allow_negative_scale_of_decimal_conf) + 'from window_agg_table') @ignore_order @pytest.mark.parametrize('data_gen', decimal_gens + decimal_128_gens, ids=idfn) @@ -186,8 +183,7 @@ def test_decimal_sum_window_no_part(data_gen): ' sum(b) over ' ' (order by a asc ' ' rows between 2 preceding and 10 following) as sum_b_asc ' - 'from window_agg_table', - conf = allow_negative_scale_of_decimal_conf) + 'from window_agg_table') @ignore_order @@ -201,8 +197,7 @@ def test_decimal_running_sum_window(data_gen): ' (partition by a order by b asc ' ' rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_c_asc ' 'from window_agg_table', - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.batchSizeBytes': '100'})) + conf = {'spark.rapids.sql.batchSizeBytes': '100'}) @ignore_order @pytest.mark.parametrize('data_gen', decimal_gens + decimal_128_gens, ids=idfn) @@ -215,8 +210,7 @@ def test_decimal_running_sum_window_no_part(data_gen): ' (order by a asc ' ' rows between UNBOUNDED PRECEDING AND CURRENT ROW) as sum_b_asc ' 'from window_agg_table', - conf = copy_and_update(allow_negative_scale_of_decimal_conf, - {'spark.rapids.sql.batchSizeBytes': '100'})) + conf = {'spark.rapids.sql.batchSizeBytes': '100'}) @pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over order by byte column overflow " "(https://github.com/NVIDIA/spark-rapids/pull/2020#issuecomment-838127070)") @@ -1024,8 +1018,7 @@ def test_window_ride_along(ride_along): "window_agg_table", 'select *,' ' row_number() over (order by a) as row_num ' - 'from window_agg_table ', - conf = allow_negative_scale_of_decimal_conf) + 'from window_agg_table ') @approximate_float @ignore_order