Default integration test configs to allow negative decimal scale (#4812)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Feb 17, 2022
1 parent e00a3f7 commit c18b4e5
Showing 21 changed files with 203 additions and 361 deletions.
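Every file below follows the same pattern: tests that previously passed 'spark.sql.legacy.allowNegativeScaleOfDecimal' explicitly (usually via allow_negative_scale_of_decimal_conf) now rely on the integration test harness supplying it by default, so the per-test conf arguments shrink or disappear. The harness change itself is not among the files rendered here; the sketch below only illustrates the idea, and the names _default_conf and with_spark_session are assumptions rather than the project's actual helpers.

# Illustrative sketch, not part of this commit: a harness-level default
# conf merged underneath whatever an individual test passes in.
from pyspark.sql import SparkSession

_default_conf = {
    # Assumed new default, so individual tests no longer set it themselves.
    'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
}

def with_spark_session(func, conf=None):
    # Per-test conf entries override the defaults on key collisions.
    merged = {**_default_conf, **(conf or {})}
    builder = SparkSession.builder.appName('integration-test')
    for key, value in merged.items():
        builder = builder.config(key, value)
    spark = builder.getOrCreate()
    try:
        return func(spark)
    finally:
        spark.stop()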
102 changes: 37 additions & 65 deletions integration_tests/src/main/python/arithmetic_ops_test.py

Large diffs are not rendered by default.

42 changes: 13 additions & 29 deletions integration_tests/src/main/python/array_test.py
@@ -34,8 +34,7 @@ def test_array_item(data_gen):
'a[null]',
'a[3]',
'a[50]',
'a[-1]'),
conf=allow_negative_scale_of_decimal_conf)
'a[-1]'))

# Once we support arrays as literals then we can support a[null] for
# all array gens. See test_array_index for more info
@@ -68,8 +67,7 @@ def test_orderby_array_unique(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark : append_unique_int_col_to_df(spark, unary_op_df(spark, data_gen)),
'array_table',
'select array_table.a, array_table.uniq_int from array_table order by uniq_int',
conf=allow_negative_scale_of_decimal_conf)
'select array_table.a, array_table.uniq_int from array_table order by uniq_int')


@pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
@@ -128,52 +126,46 @@ def test_get_array_item_ansi_fail(data_gen):
message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException"
assert_gpu_and_cpu_error(lambda spark: unary_op_df(
spark, data_gen).select(col('a')[100]).collect(),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True},
conf=ansi_enabled_conf,
error_message=message)

@pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_get_array_item_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(col('a')[100]),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)

@pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn)
def test_array_element_at(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 1),
element_at(col('a'), -1)),
conf={'spark.sql.ansi.enabled':False,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf={'spark.sql.ansi.enabled':False})

@pytest.mark.skipif(is_before_spark_311(), reason="Only in Spark 3.1.1 + ANSI mode, array index throws on out of range indexes")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_array_element_at_ansi_fail(data_gen):
message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException"
assert_gpu_and_cpu_error(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)).collect(),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True},
conf=ansi_enabled_conf,
error_message=message)

@pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_array_element_at_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)

# This corner case is for both Spark 3.0.x and 3.1.x
# CPU version will return `null` for null[100], not throwing an exception
@pytest.mark.parametrize('data_gen', [ArrayGen(null_gen,all_null=True)], ids=idfn)
def test_array_element_at_all_null_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)


@pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn)
@@ -204,8 +196,7 @@ def do_it(spark):

return two_col_df(spark, data_gen, byte_gen).selectExpr(columns)

assert_gpu_and_cpu_are_equal_collect(do_it,
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_it)

# TODO add back in string_gen when https://github.com/rapidsai/cudf/issues/9156 is fixed
array_min_max_gens_no_nan = [byte_gen, short_gen, int_gen, long_gen, FloatGen(no_nans=True), DoubleGen(no_nans=True),
@@ -216,29 +207,23 @@ def test_array_min(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'array_min(a)'),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)


@pytest.mark.parametrize('data_gen', decimal_128_gens + decimal_gens, ids=idfn)
def test_array_concat_decimal(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : debug_df(unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'concat(a, a)')),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)


@pytest.mark.parametrize('data_gen', array_min_max_gens_no_nan, ids=idfn)
def test_array_max(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'array_max(a)'),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)

# We add in several types of processing for foldable functions because the output
# can be different types.
@@ -253,5 +238,4 @@ def test_array_max(data_gen):
'array(array(struct(1 as a, 2 as b), struct(3 as a, 4 as b))) as a_a_s'], ids=idfn)
def test_sql_array_scalars(query):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql('SELECT {}'.format(query)),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'})
lambda spark : spark.sql('SELECT {}'.format(query)))
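The rewritten array_test.py calls above use shorthand conf dictionaries such as ansi_enabled_conf and no_nans_conf instead of spelling the settings out inline. Their definitions live in the shared test modules and are not part of this diff; a plausible reading, offered only as an assumption, is:

# Assumed contents of the shared shorthands referenced above; the real
# definitions sit in the integration test helper modules, not shown here.
ansi_enabled_conf = {'spark.sql.ansi.enabled': True}
no_nans_conf = {'spark.rapids.sql.hasNans': 'false'}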
38 changes: 12 additions & 26 deletions integration_tests/src/main/python/cache_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -66,8 +66,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() # populates cache
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@@ -84,8 +83,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() #populates the cache
return cached.filter("a is not null")
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -98,8 +96,7 @@ def do_join(spark):
cached.count()
return cached

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
"spark.sql.join.preferSortMergeJoin": "false",
@@ -115,8 +112,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count()
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -128,8 +124,7 @@ def do_join(spark):
cached = left.crossJoin(right.hint("broadcast")).cache()
cached.count()
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -142,8 +137,7 @@ def op_df(spark, length=2048, seed=0):
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(op_df, conf = conf)
assert_gpu_and_cpu_are_equal_collect(op_df, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', [all_basic_struct_gen, StructGen([['child0', StructGen([['child1', byte_gen]])]]),
ArrayGen(
@@ -158,9 +152,8 @@ def partial_return(col):
def partial_return_cache(spark):
return two_col_df(spark, data_gen, string_gen).select(f.col("a"), f.col("b")).cache().limit(50).select(col)
return partial_return_cache
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf=enable_vectorized_conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf=enable_vectorized_conf)

@allow_non_gpu('CollectLimitExec')
def test_cache_diff_req_order(spark_tmp_path):
@@ -235,8 +228,7 @@ def func(spark):

return df.selectExpr("a")

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(func, conf)
assert_gpu_and_cpu_are_equal_collect(func, conf=enable_vectorized_conf)

@pytest.mark.parametrize('enable_vectorized', ['false', 'true'], ids=idfn)
@pytest.mark.parametrize('with_x_session', [with_gpu_session, with_cpu_session])
@@ -308,10 +300,7 @@ def helper(spark):
@pytest.mark.parametrize('batch_size', [{"spark.rapids.sql.batchSizeBytes": "100"}, {}], ids=idfn)
@ignore_order
def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = copy_and_update(enable_vectorized_conf,
allow_negative_scale_of_decimal_conf,
batch_size)

test_conf = copy_and_update(enable_vectorized_conf, batch_size)
function_to_test_on_cached_df(with_x_session, lambda df: df.count(), data_gen, test_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@@ -326,10 +315,7 @@ def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size):
# condition therefore we must allow it in all cases
@allow_non_gpu('ColumnarToRowExec')
def test_cache_multi_batch(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = copy_and_update(enable_vectorized_conf,
allow_negative_scale_of_decimal_conf,
batch_size)

test_conf = copy_and_update(enable_vectorized_conf, batch_size)
function_to_test_on_cached_df(with_x_session, lambda df: df.collect(), data_gen, test_conf)

@pytest.mark.parametrize('data_gen', all_basic_map_gens + single_level_array_gens_no_null, ids=idfn)
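The cache tests keep merging per-test options with copy_and_update, now simply without the negative-scale entry. A minimal sketch of how such a dictionary-merge helper can behave (an assumption about its semantics, not the project's implementation):

# Illustrative sketch, not part of this commit.
def copy_and_update(*confs):
    # Merge conf dicts left to right without mutating the inputs;
    # later dicts win on key collisions.
    merged = {}
    for conf in confs:
        merged.update(conf)
    return merged

# For example: test_conf = copy_and_update(enable_vectorized_conf, batch_size)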
33 changes: 12 additions & 21 deletions integration_tests/src/main/python/cast_test.py
@@ -95,8 +95,7 @@ def test_cast_string_timestamp_fallback():
def test_cast_decimal_to(data_gen, to_type):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')),
conf = copy_and_update(allow_negative_scale_of_decimal_conf,
{'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}))
conf = {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})

@pytest.mark.parametrize('data_gen', [
DecimalGen(7, 1),
@@ -115,8 +114,7 @@ def test_cast_decimal_to(data_gen, to_type):
DecimalType(1, -1)], ids=meta_idfn('to:'))
def test_cast_decimal_to_decimal(data_gen, to_type):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')),
conf = allow_negative_scale_of_decimal_conf)
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')))

@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen], ids=idfn)
@pytest.mark.parametrize('to_type', [
@@ -136,26 +134,22 @@ def test_cast_integral_to_decimal(data_gen, to_type):
def test_cast_byte_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, byte_gen).select(
f.col('a').cast(DecimalType(2, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(2, -1))))

def test_cast_short_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, short_gen).select(
f.col('a').cast(DecimalType(4, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(4, -1))))

def test_cast_int_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, int_gen).select(
f.col('a').cast(DecimalType(9, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(9, -1))))

def test_cast_long_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, long_gen).select(
f.col('a').cast(DecimalType(18, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(18, -1))))

# casting these types to string should be passed
basic_gens_for_cast_to_string = [ByteGen, ShortGen, IntegerGen, LongGen, StringGen, BooleanGen, DateGen, TimestampGen]
@@ -204,8 +198,7 @@ def test_cast_array_to_string(data_gen, legacy):
def test_cast_array_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)
@@ -216,7 +209,7 @@ def test_cast_map_to_string(data_gen, legacy):
def test_cast_map_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.sql.legacy.castComplexTypesToString.enabled": legacy})


@@ -226,9 +219,8 @@ def test_cast_map_with_unmatched_element_to_string(data_gen, legacy):
def test_cast_map_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)

@@ -282,8 +274,7 @@ def broken_df(spark):
def test_cast_struct_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)
@@ -297,4 +288,4 @@ def is_neg_dec_scale_bug_version():
def test_cast_string_to_negative_scale_decimal():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, StringGen("[0-9]{9}")).select(
f.col('a').cast(DecimalType(8, -3))), conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(8, -3))))
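The cast tests above exercise negative-scale decimals such as DecimalType(8, -3), which Spark only accepts when spark.sql.legacy.allowNegativeScaleOfDecimal is enabled, and that is exactly the setting this commit folds into the default test configuration. A standalone illustration of the behavior outside the test harness (the session setup here is assumed, not taken from the project):

# Illustrative sketch, not part of this commit.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import DecimalType

spark = (SparkSession.builder
         .config('spark.sql.legacy.allowNegativeScaleOfDecimal', 'true')
         .getOrCreate())

# Scale -3 rounds to the nearest thousand, so '123456789' becomes 1.23457E+8.
df = spark.createDataFrame([('123456789',)], ['a'])
df.select(col('a').cast(DecimalType(8, -3))).show()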
