Default integration test configs to allow negative decimal scale [databricks] #4812

Merged: 1 commit, Feb 17, 2022
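The harness change that makes the per-test conf removals below possible is not rendered in this view. As a rough sketch only (the file location and helper names are assumptions, not taken from this diff), defaulting the legacy negative-scale setting in the shared test session conf could look like this:

```python
# Hypothetical sketch -- the real spark-rapids test harness may organize this
# differently.  The idea behind the PR: put the legacy negative-scale-decimal
# flag into the default conf every integration test session starts from, so
# individual tests no longer have to pass allow_negative_scale_of_decimal_conf.
_default_conf = {
    'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',  # assumed new default
}

def _build_test_conf(overrides=None):
    """Merge per-test overrides on top of the shared defaults."""
    conf = dict(_default_conf)
    if overrides:
        conf.update(overrides)
    return conf
```

With something like that in place, the file diffs below are mostly mechanical: drop the explicit conf argument entirely, or shrink it to the settings the test actually cares about (ANSI mode, NaN handling, vectorized cache reader, batch size).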
102 changes: 37 additions & 65 deletions integration_tests/src/main/python/arithmetic_ops_test.py

Large diffs are not rendered by default.
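The arithmetic diff is collapsed, but judging from the pattern in the files below it presumably makes the same kind of change. A hypothetical before/after for one decimal arithmetic test (the test name, generator, and imports are illustrative, not copied from that file):

```python
# Illustrative only; not the actual contents of arithmetic_ops_test.py.
import pyspark.sql.functions as f
from asserts import assert_gpu_and_cpu_are_equal_collect  # assumed shared helper
from data_gen import binary_op_df, DecimalGen              # assumed shared generators

def test_decimal_addition_example():
    # Before this PR the call would also pass
    # conf=allow_negative_scale_of_decimal_conf; with the setting defaulted by
    # the harness, the explicit conf argument simply goes away.
    neg_scale_gen = DecimalGen(precision=7, scale=-3)
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: binary_op_df(spark, neg_scale_gen).select(
            f.col('a') + f.col('b')))
```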

42 changes: 13 additions & 29 deletions integration_tests/src/main/python/array_test.py
@@ -34,8 +34,7 @@ def test_array_item(data_gen):
'a[null]',
'a[3]',
'a[50]',
'a[-1]'),
conf=allow_negative_scale_of_decimal_conf)
'a[-1]'))

# Once we support arrays as literals then we can support a[null] for
# all array gens. See test_array_index for more info
@@ -68,8 +67,7 @@ def test_orderby_array_unique(data_gen):
assert_gpu_and_cpu_are_equal_sql(
lambda spark : append_unique_int_col_to_df(spark, unary_op_df(spark, data_gen)),
'array_table',
'select array_table.a, array_table.uniq_int from array_table order by uniq_int',
conf=allow_negative_scale_of_decimal_conf)
'select array_table.a, array_table.uniq_int from array_table order by uniq_int')


@pytest.mark.parametrize('data_gen', [ArrayGen(ArrayGen(short_gen, max_length=10), max_length=10),
@@ -128,52 +126,46 @@ def test_get_array_item_ansi_fail(data_gen):
message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException"
assert_gpu_and_cpu_error(lambda spark: unary_op_df(
spark, data_gen).select(col('a')[100]).collect(),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True},
conf=ansi_enabled_conf,
error_message=message)

@pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_get_array_item_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(col('a')[100]),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)

@pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn)
def test_array_element_at(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 1),
element_at(col('a'), -1)),
conf={'spark.sql.ansi.enabled':False,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf={'spark.sql.ansi.enabled':False})

@pytest.mark.skipif(is_before_spark_311(), reason="Only in Spark 3.1.1 + ANSI mode, array index throws on out of range indexes")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_array_element_at_ansi_fail(data_gen):
message = "org.apache.spark.SparkArrayIndexOutOfBoundsException" if not is_before_spark_330() else "java.lang.ArrayIndexOutOfBoundsException"
assert_gpu_and_cpu_error(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)).collect(),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True},
conf=ansi_enabled_conf,
error_message=message)

@pytest.mark.skipif(not is_before_spark_311(), reason="For Spark before 3.1.1 + ANSI mode, null will be returned instead of an exception if index is out of range")
@pytest.mark.parametrize('data_gen', array_gens_sample, ids=idfn)
def test_array_element_at_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)

# This corner case is for both Spark 3.0.x and 3.1.x
# CPU version will return `null` for null[100], not throwing an exception
@pytest.mark.parametrize('data_gen', [ArrayGen(null_gen,all_null=True)], ids=idfn)
def test_array_element_at_all_null_ansi_not_fail(data_gen):
assert_gpu_and_cpu_are_equal_collect(lambda spark: unary_op_df(
spark, data_gen).select(element_at(col('a'), 100)),
conf={'spark.sql.ansi.enabled':True,
'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
conf=ansi_enabled_conf)


@pytest.mark.parametrize('data_gen', array_gens_sample_with_decimal128, ids=idfn)
@@ -204,8 +196,7 @@ def do_it(spark):

return two_col_df(spark, data_gen, byte_gen).selectExpr(columns)

assert_gpu_and_cpu_are_equal_collect(do_it,
conf=allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_it)

# TODO add back in string_gen when https://github.com/rapidsai/cudf/issues/9156 is fixed
array_min_max_gens_no_nan = [byte_gen, short_gen, int_gen, long_gen, FloatGen(no_nans=True), DoubleGen(no_nans=True),
@@ -216,29 +207,23 @@ def test_array_min(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'array_min(a)'),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)


@pytest.mark.parametrize('data_gen', decimal_128_gens + decimal_gens, ids=idfn)
def test_array_concat_decimal(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : debug_df(unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'concat(a, a)')),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)


@pytest.mark.parametrize('data_gen', array_min_max_gens_no_nan, ids=idfn)
def test_array_max(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, ArrayGen(data_gen)).selectExpr(
'array_max(a)'),
conf={
'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
'spark.rapids.sql.hasNans': 'false'})
conf=no_nans_conf)

# We add in several types of processing for foldable functions because the output
# can be different types.
@@ -253,5 +238,4 @@ def test_array_max(data_gen):
'array(array(struct(1 as a, 2 as b), struct(3 as a, 4 as b))) as a_a_s'], ids=idfn)
def test_sql_array_scalars(query):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.sql('SELECT {}'.format(query)),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true'})
lambda spark : spark.sql('SELECT {}'.format(query)))
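The array tests above now reference two shared conf dictionaries, `ansi_enabled_conf` and `no_nans_conf`, instead of spelling the settings out inline. Their definitions are not part of this view; a minimal sketch inferred from the inline dicts they replace (module and exact values are assumptions) would be:

```python
# Inferred from the literal dicts these names replace in the diff above; the
# negative-scale key is gone because it now comes from the harness defaults.
ansi_enabled_conf = {'spark.sql.ansi.enabled': 'true'}

no_nans_conf = {'spark.rapids.sql.hasNans': 'false'}
```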
38 changes: 12 additions & 26 deletions integration_tests/src/main/python/cache_test.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020-2021, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -66,8 +66,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() # populates cache
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
@@ -84,8 +83,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count() #populates the cache
return cached.filter("a is not null")
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -98,8 +96,7 @@ def do_join(spark):
cached.count()
return cached

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160",
"spark.sql.join.preferSortMergeJoin": "false",
@@ -115,8 +112,7 @@ def do_join(spark):
cached = left.join(right, left.a == right.r_a, join_type).cache()
cached.count()
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -128,8 +124,7 @@ def do_join(spark):
cached = left.crossJoin(right.hint("broadcast")).cache()
cached.count()
return cached
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf = conf)
assert_gpu_and_cpu_are_equal_collect(do_join, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enable_vectorized_conf', enable_vectorized_confs, ids=idfn)
@@ -142,8 +137,7 @@ def op_df(spark, length=2048, seed=0):
cached.count() # populate the cache
return cached.rollup(f.col("a"), f.col("b")).agg(f.col("b"))

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(op_df, conf = conf)
assert_gpu_and_cpu_are_equal_collect(op_df, conf=enable_vectorized_conf)

@pytest.mark.parametrize('data_gen', [all_basic_struct_gen, StructGen([['child0', StructGen([['child1', byte_gen]])]]),
ArrayGen(
@@ -158,9 +152,8 @@ def partial_return(col):
def partial_return_cache(spark):
return two_col_df(spark, data_gen, string_gen).select(f.col("a"), f.col("b")).cache().limit(50).select(col)
return partial_return_cache
conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("a")), conf=enable_vectorized_conf)
assert_gpu_and_cpu_are_equal_collect(partial_return(f.col("b")), conf=enable_vectorized_conf)

@allow_non_gpu('CollectLimitExec')
def test_cache_diff_req_order(spark_tmp_path):
@@ -235,8 +228,7 @@ def func(spark):

return df.selectExpr("a")

conf = copy_and_update(enable_vectorized_conf, allow_negative_scale_of_decimal_conf)
assert_gpu_and_cpu_are_equal_collect(func, conf)
assert_gpu_and_cpu_are_equal_collect(func, conf=enable_vectorized_conf)

@pytest.mark.parametrize('enable_vectorized', ['false', 'true'], ids=idfn)
@pytest.mark.parametrize('with_x_session', [with_gpu_session, with_cpu_session])
@@ -308,10 +300,7 @@ def helper(spark):
@pytest.mark.parametrize('batch_size', [{"spark.rapids.sql.batchSizeBytes": "100"}, {}], ids=idfn)
@ignore_order
def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = copy_and_update(enable_vectorized_conf,
allow_negative_scale_of_decimal_conf,
batch_size)

test_conf = copy_and_update(enable_vectorized_conf, batch_size)
function_to_test_on_cached_df(with_x_session, lambda df: df.count(), data_gen, test_conf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@@ -326,10 +315,7 @@ def test_cache_count(data_gen, with_x_session, enable_vectorized_conf, batch_size):
# condition therefore we must allow it in all cases
@allow_non_gpu('ColumnarToRowExec')
def test_cache_multi_batch(data_gen, with_x_session, enable_vectorized_conf, batch_size):
test_conf = copy_and_update(enable_vectorized_conf,
allow_negative_scale_of_decimal_conf,
batch_size)

test_conf = copy_and_update(enable_vectorized_conf, batch_size)
function_to_test_on_cached_df(with_x_session, lambda df: df.collect(), data_gen, test_conf)

@pytest.mark.parametrize('data_gen', all_basic_map_gens + single_level_array_gens_no_null, ids=idfn)
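Several cache tests above still merge confs with `copy_and_update`, just without the negative-scale entry. The helper itself is defined elsewhere in the shared test utilities; a sketch consistent with how the call sites use it (the real implementation may differ) is:

```python
def copy_and_update(*confs):
    """Return a fresh dict built from the given conf dicts, later ones winning.

    Sketch of the shared helper referenced above; merging into a new dict means
    per-test overrides never mutate shared conf dictionaries such as
    enable_vectorized_conf.
    """
    merged = {}
    for conf in confs:
        merged.update(conf)
    return merged
```

For example, `copy_and_update(enable_vectorized_conf, batch_size)` yields the combined conf a single test passes to `assert_gpu_and_cpu_are_equal_collect`.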
33 changes: 12 additions & 21 deletions integration_tests/src/main/python/cast_test.py
@@ -95,8 +95,7 @@ def test_cast_string_timestamp_fallback():
def test_cast_decimal_to(data_gen, to_type):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')),
conf = copy_and_update(allow_negative_scale_of_decimal_conf,
{'spark.rapids.sql.castDecimalToFloat.enabled': 'true'}))
conf = {'spark.rapids.sql.castDecimalToFloat.enabled': 'true'})

@pytest.mark.parametrize('data_gen', [
DecimalGen(7, 1),
@@ -115,8 +114,7 @@ def test_cast_decimal_to(data_gen, to_type):
DecimalType(1, -1)], ids=meta_idfn('to:'))
def test_cast_decimal_to_decimal(data_gen, to_type):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')),
conf = allow_negative_scale_of_decimal_conf)
lambda spark : unary_op_df(spark, data_gen).select(f.col('a').cast(to_type), f.col('a')))

@pytest.mark.parametrize('data_gen', [byte_gen, short_gen, int_gen, long_gen], ids=idfn)
@pytest.mark.parametrize('to_type', [
@@ -136,26 +134,22 @@ def test_cast_integral_to_decimal(data_gen, to_type):
def test_cast_byte_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, byte_gen).select(
f.col('a').cast(DecimalType(2, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(2, -1))))

def test_cast_short_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, short_gen).select(
f.col('a').cast(DecimalType(4, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(4, -1))))

def test_cast_int_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, int_gen).select(
f.col('a').cast(DecimalType(9, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(9, -1))))

def test_cast_long_to_decimal_overflow():
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, long_gen).select(
f.col('a').cast(DecimalType(18, -1))),
conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(18, -1))))

# casting these types to string should be passed
basic_gens_for_cast_to_string = [ByteGen, ShortGen, IntegerGen, LongGen, StringGen, BooleanGen, DateGen, TimestampGen]
@@ -204,8 +198,7 @@ def test_cast_array_to_string(data_gen, legacy):
def test_cast_array_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)
@@ -216,7 +209,7 @@ def test_cast_array_with_unmatched_element_to_string(data_gen, legacy):
def test_cast_map_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.sql.legacy.castComplexTypesToString.enabled": legacy})


@@ -226,9 +219,8 @@ def test_cast_map_with_unmatched_element_to_string(data_gen, legacy):
def test_cast_map_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)

@@ -282,8 +274,7 @@ def broken_df(spark):
def test_cast_struct_with_unmatched_element_to_string(data_gen, legacy):
_assert_cast_to_string_equal(
data_gen,
{"spark.sql.legacy.allowNegativeScaleOfDecimal" : "true",
"spark.rapids.sql.castDecimalToString.enabled" : 'true',
{"spark.rapids.sql.castDecimalToString.enabled" : 'true',
"spark.rapids.sql.castFloatToString.enabled" : "true",
"spark.sql.legacy.castComplexTypesToString.enabled": legacy}
)
Expand All @@ -297,4 +288,4 @@ def is_neg_dec_scale_bug_version():
def test_cast_string_to_negative_scale_decimal():
assert_gpu_and_cpu_are_equal_collect(
lambda spark: unary_op_df(spark, StringGen("[0-9]{9}")).select(
f.col('a').cast(DecimalType(8, -3))), conf={'spark.sql.legacy.allowNegativeScaleOfDecimal': True})
f.col('a').cast(DecimalType(8, -3))))