diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index d1cd70aa43c..ea429d4533c 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -29,8 +29,7 @@
 pytestmark = pytest.mark.nightly_resource_consuming_test
 
 _float_conf = {'spark.rapids.sql.variableFloatAgg.enabled': 'true',
-               'spark.rapids.sql.castStringToFloat.enabled': 'true'
-               }
+               'spark.rapids.sql.castStringToFloat.enabled': 'true'}
 
 _float_smallbatch_conf = copy_and_update(_float_conf,
         {'spark.rapids.sql.batchSizeBytes' : '250'})
@@ -306,6 +305,7 @@ def get_params(init_list, marked_params=[]):
 #Any smaller precision takes way too long to process on the CPU
 # or results in using too much memory on the GPU
 @nightly_gpu_mem_consuming_case
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('precision', [38, 37, 36, 35, 34, 33, 32, 31], ids=idfn)
 def test_hash_reduction_decimal_overflow_sum(precision):
     constant = '9' * precision
@@ -320,6 +320,8 @@ def test_hash_reduction_decimal_overflow_sum(precision):
             # some optimizations are conspiring against us.
             conf = {'spark.rapids.sql.batchSizeBytes': '128m'})
 
+
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
 @pytest.mark.parametrize('override_split_until_size', [None, 1], ids=idfn)
 @pytest.mark.parametrize('override_batch_size_bytes', [None, 1], ids=idfn)
@@ -343,6 +345,7 @@ def test_hash_grpby_list_min_max(data_gen):
         lambda spark: gen_df(spark, data_gen, length=100).coalesce(1).groupby('a').agg(f.min('b'), f.max('b'))
     )
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
 def test_hash_reduction_sum_count_action(data_gen):
     assert_gpu_and_cpu_row_counts_equal(
@@ -351,6 +354,7 @@
 
 # Make sure that we can do computation in the group by columns
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_computation_in_grpby_columns():
     conf = {'spark.rapids.sql.batchSizeBytes' : '250'}
     data_gen = [
@@ -364,6 +368,7 @@
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list_with_decimalbig, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_grpby_sum(data_gen, conf):
@@ -375,6 +380,7 @@ def test_hash_grpby_sum(data_gen, conf):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_short_sum_full_decimals, _grpkey_short_sum_full_neg_scale_decimals], ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_grpby_sum_full_decimal(data_gen, conf):
@@ -396,6 +402,7 @@ def test_hash_reduction_sum(data_gen, conf):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens + [
     DecimalGen(precision=38, scale=0), DecimalGen(precision=38, scale=-10)], ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@@ -407,6 +414,7 @@ def test_hash_reduction_sum_full_decimal(data_gen, conf):
 
 @approximate_float
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @incompat
 @pytest.mark.parametrize('data_gen', _init_list + [_grpkey_short_mid_decimals,
     _grpkey_short_big_decimals, _grpkey_short_very_big_decimals, _grpkey_short_sum_full_decimals], ids=idfn)
@@ -421,6 +429,7 @@ def test_hash_grpby_avg(data_gen, conf):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.allow_non_gpu(
     'HashAggregateExec', 'AggregateExpression', 'AttributeReference', 'Alias', 'Sum',
     'Count', 'Max', 'Min', 'Average', 'Cast',
@@ -439,14 +448,16 @@ def test_hash_avg_nulls_partial_only(data_gen):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list_with_decimalbig, ids=idfn)
-def test_intersectAll(data_gen):
+def test_intersect_all(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
         lambda spark : gen_df(spark, data_gen, length=100).intersectAll(gen_df(spark, data_gen, length=100)))
 
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list_with_decimalbig, ids=idfn)
 def test_exceptAll(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -469,9 +480,12 @@ def test_exceptAll(data_gen):
     _grpkey_small_decimals, _pivot_big_decimals,
     _grpkey_short_mid_decimals, _pivot_short_big_decimals,
     _grpkey_short_very_big_decimals, _grpkey_short_very_big_neg_scale_decimals]
+
+
 @approximate_float
 @ignore_order(local=True)
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _pivot_gens_with_decimals, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_grpby_pivot(data_gen, conf):
@@ -485,6 +499,7 @@ def test_hash_grpby_pivot(data_gen, conf):
 @approximate_float
 @ignore_order(local=True)
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 @datagen_overrides(seed=0, reason='https://github.com/NVIDIA/spark-rapids/issues/10062')
@@ -499,6 +514,7 @@ def test_hash_multiple_grpby_pivot(data_gen, conf):
 @approximate_float
 @ignore_order(local=True)
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_reduction_pivot(data_gen, conf):
@@ -509,11 +525,13 @@ def test_hash_reduction_pivot(data_gen, conf):
             .agg(f.sum('c')),
         conf=conf)
 
+
 @approximate_float
 @ignore_order(local=True)
 @allow_non_gpu('HashAggregateExec', 'PivotFirst', 'AggregateExpression', 'Alias', 'GetArrayItem',
         'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'NormalizeNaNAndZero')
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nulls_and_nans], ids=idfn)
 def test_hash_pivot_groupby_duplicates_fallback(data_gen):
     # PivotFirst will not work on the GPU when pivot_values has duplicates
@@ -590,6 +608,7 @@ def test_hash_pivot_groupby_duplicates_fallback(data_gen):
 
 # very simple test for just a count on decimals 128 values until we can support more with them
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_reduction(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -597,6 +616,7 @@
 
 # very simple test for just a count on decimals 128 values until we can support more with them
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [decimal_gen_128bit], ids=idfn)
 def test_decimal128_count_group_by(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -662,7 +682,9 @@ def doit(spark):
         doit,
         conf={'spark.sql.execution.useObjectHashAggregateExec': str(use_obj_hash_agg).lower()})
 
+
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_hash_groupby_collect_set(data_gen):
@@ -702,6 +724,7 @@ def do_it(spark):
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_hash_reduction_collect_set(data_gen):
@@ -736,7 +759,9 @@ def do_it(spark):
     assert_gpu_and_cpu_are_equal_collect(do_it, conf=conf)
 
+
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _full_gen_data_for_collect_op, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_hash_groupby_collect_with_single_distinct(data_gen):
@@ -749,18 +774,15 @@ def test_hash_groupby_collect_with_single_distinct(data_gen):
             f.countDistinct('c'),
             f.count('c')))
 
-@ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
-@allow_non_gpu(*non_utc_allow)
-def test_hash_groupby_single_distinct_collect(data_gen):
-    # test distinct collect
+
+def hash_groupby_single_distinct_collect_impl(data_gen, conf):
     sql = """select a,
             sort_array(collect_list(distinct b)),
             sort_array(collect_set(distinct b))
         from tbl group by a"""
     assert_gpu_and_cpu_are_equal_sql(
         df_fun=lambda spark: gen_df(spark, data_gen, length=100),
-        table_name="tbl", sql=sql)
+        table_name="tbl", sql=sql, conf=conf)
 
     # test distinct collect with nonDistinct aggregations
     sql = """select a,
@@ -771,11 +793,38 @@ def test_hash_groupby_single_distinct_collect(data_gen):
         from tbl group by a"""
     assert_gpu_and_cpu_are_equal_sql(
         df_fun=lambda spark: gen_df(spark, data_gen, length=100),
-        table_name="tbl", sql=sql)
+        table_name="tbl", sql=sql, conf=conf)
+
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
+def test_hash_groupby_single_distinct_collect(data_gen):
+    """
+    Tests distinct collect, with ANSI disabled.
+    The corresponding ANSI-enabled condition is tested in
+    test_hash_groupby_single_distinct_collect_ansi_enabled
+    """
+    ansi_disabled_conf = {'spark.sql.ansi.enabled': False}
+    hash_groupby_single_distinct_collect_impl(data_gen=data_gen, conf=ansi_disabled_conf)
+
+
+@ignore_order(local=True)
+@pytest.mark.parametrize('data_gen', [_gen_data_for_collect_op[0]], ids=idfn)
+@allow_non_gpu(*non_utc_allow)
+@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec')
+def test_hash_groupby_single_distinct_collect_ansi_enabled(data_gen):
+    """
+    Tests distinct collect, with ANSI enabled.
+    Enabling ANSI mode causes the plan to include ObjectHashAggregateExec, which runs on CPU.
+    """
+    hash_groupby_single_distinct_collect_impl(data_gen=data_gen, conf=ansi_enabled_conf)
+
+
+@ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
+@pytest.mark.parametrize('data_gen', _gen_data_for_collect_op, ids=idfn)
+@allow_non_gpu(*non_utc_allow)
 def test_hash_groupby_collect_with_multi_distinct(data_gen):
     def spark_fn(spark_session):
         return gen_df(spark_session, data_gen, length=100).groupby('a').agg(
@@ -831,6 +880,7 @@ def test_hash_groupby_collect_partial_replace_fallback(data_gen,
         non_exist_classes=','.join(non_exist_clz),
         conf=conf)
 
+
 _replace_modes_single_distinct = [
     # Spark: CPU -> CPU -> GPU(PartialMerge) -> GPU(Partial)
     # Databricks runtime: CPU(Final and Complete) -> GPU(PartialMerge)
@@ -839,7 +889,10 @@
     # Databricks runtime: GPU(Final&Complete) -> CPU(PartialMerge)
     'final|partialMerge&partial|final&complete',
 ]
+
+
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @allow_non_gpu('ObjectHashAggregateExec', 'SortAggregateExec',
     'ShuffleExchangeExec', 'HashPartitioning', 'SortExec',
     'SortArray', 'Alias', 'Literal', 'Count', 'CollectList', 'CollectSet',
@@ -1062,6 +1115,7 @@ def test_hash_groupby_typed_imperative_agg_without_gpu_implementation_fallback()
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_multiple_mode_query(data_gen, conf):
@@ -1085,6 +1139,7 @@ def test_hash_multiple_mode_query(data_gen, conf):
 
 @ignore_order
 @incompat
 @datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/10234")
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
@@ -1099,6 +1154,7 @@ def test_hash_multiple_mode_query_avg_distincts(data_gen, conf):
 @ignore_order
 @incompat
 @datagen_overrides(seed=0, reason="https://github.com/NVIDIA/spark-rapids/issues/10388")
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
@@ -1122,6 +1178,7 @@ def test_hash_query_multiple_distincts_with_non_distinct(data_gen, conf):
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_query_max_with_multiple_distincts(data_gen, conf):
@@ -1136,6 +1193,7 @@
         conf=local_conf)
 
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list, ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_count_with_filter(data_gen, conf):
@@ -1148,6 +1206,7 @@
 @approximate_float
 @ignore_order
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', _init_list + [_grpkey_short_mid_decimals, _grpkey_short_big_decimals], ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_multiple_filters(data_gen, conf):
@@ -1162,6 +1221,7 @@
 
 @approximate_float
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys,
                                       _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn)
 def test_hash_agg_with_nan_keys(data_gen):
@@ -1182,6 +1242,7 @@
         local_conf)
 
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_structs_with_non_nested_children,
                                       _grpkey_nested_structs], ids=idfn)
 def test_hash_agg_with_struct_keys(data_gen):
@@ -1227,6 +1288,7 @@ def test_hash_agg_with_struct_of_array_fallback(data_gen):
 
 @approximate_float
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [ _grpkey_floats_with_nulls_and_nans ], ids=idfn)
 def test_count_distinct_with_nan_floats(data_gen):
     assert_gpu_and_cpu_are_equal_sql(
@@ -1259,6 +1321,7 @@ def test_first_last_reductions_nested_types(data_gen):
             'first(a)', 'last(a)', 'first(a, true)', 'last(a, true)'))
 
 @pytest.mark.parametrize('data_gen', _all_basic_gens_with_all_nans_cases, ids=idfn)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @allow_non_gpu(*non_utc_allow)
 def test_generic_reductions(data_gen):
     local_conf = copy_and_update(_float_conf, {'spark.sql.legacy.allowParameterlessCount': 'true'})
@@ -1277,6 +1340,7 @@
         conf=local_conf)
 
 @pytest.mark.parametrize('data_gen', all_gen + _nested_gens, ids=idfn)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @allow_non_gpu(*non_utc_allow)
 def test_count(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1288,6 +1352,7 @@
             'count(1)'),
         conf = {'spark.sql.legacy.allowParameterlessCount': 'true'})
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
 @allow_non_gpu(*non_utc_allow)
 def test_distinct_count_reductions(data_gen):
@@ -1295,6 +1360,7 @@
         lambda spark : binary_op_df(spark, data_gen).selectExpr(
             'count(DISTINCT a)'))
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [float_gen, double_gen], ids=idfn)
 def test_distinct_float_count_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1302,6 +1368,7 @@ def test_distinct_float_count_reductions(data_gen):
         'count(DISTINCT a)'))
 
 @approximate_float
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', numeric_gens + [decimal_gen_64bit, decimal_gen_128bit], ids=idfn)
 def test_arithmetic_reductions(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
@@ -1376,6 +1443,7 @@ def test_sorted_groupby_first_last(data_gen):
 # Spark has a sorting bug with decimals, see https://issues.apache.org/jira/browse/SPARK-40129.
 # Have pytest do the sorting rather than Spark as a workaround.
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('count_func', [f.count, f.countDistinct])
 @allow_non_gpu(*non_utc_allow)
@@ -1406,10 +1474,12 @@ def subquery_create_temp_views(spark, expr):
     spark.sql(t2).createOrReplaceTempView("t2")
     return spark.sql(expr)
 
+
 # Adding these tests as they were added in SPARK-31620, and were shown to break in
 # SPARK-32031, but our GPU hash aggregate does not seem to exhibit the same failure.
 # The tests are being added more as a sanity check.
 # Adaptive is being turned on and off so we invoke re-optimization at the logical plan level.
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('adaptive', ["true", "false"])
 @pytest.mark.parametrize('expr', [
     "select sum(if(c > (select a from t1), d, 0)) as csum from t2",
@@ -1460,6 +1530,7 @@ def group_by_count(spark):
 
     assert_gpu_and_cpu_are_equal_collect(group_by_count)
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('cast_struct_tostring', ['LEGACY', 'SPARK311+'])
 @pytest.mark.parametrize('key_data_gen', [
     StructGen([
@@ -1502,6 +1573,7 @@ def _count_distinct_by_struct(spark):
 
     assert_gpu_and_cpu_are_equal_collect(_count_distinct_by_struct)
 
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('cast_struct_tostring', ['LEGACY', 'SPARK311+'])
 @pytest.mark.parametrize('key_data_gen', [
     StructGen([
@@ -1524,6 +1596,8 @@ def _count_distinct_by_struct(spark):
             'spark.sql.legacy.castComplexTypesToString.enabled': cast_struct_tostring == 'LEGACY'
         })
 
+
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 def test_reduction_nested_struct():
     def do_it(spark):
@@ -1531,6 +1605,8 @@ def do_it(spark):
         return df.agg(f.sum(df.a.aa.aaa))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
+
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 def test_reduction_nested_array():
     def do_it(spark):
@@ -1538,7 +1614,9 @@ def do_it(spark):
         return df.agg(f.sum(df.a[1].aa))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
+
 # The map here is a child not a top level, because we only support GetMapValue on String to String maps.
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @ignore_order(local=True)
 def test_reduction_nested_map():
     def do_it(spark):
@@ -1547,6 +1625,7 @@ def do_it(spark):
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_struct():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), StructGen([('aa', StructGen([('aaa', IntegerGen(min_val=0, max_val=4))]))]))
@@ -1554,6 +1633,7 @@ def do_it(spark):
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_array():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(StructGen([('aa', IntegerGen(min_val=0, max_val=4))])))
@@ -1562,6 +1642,7 @@ def do_it(spark):
 
 # The map here is a child not a top level, because we only support GetMapValue on String to String maps.
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_agg_nested_map():
     def do_it(spark):
         df = two_col_df(spark, StringGen('k{1,5}'), ArrayGen(MapGen(StringGen('a{1,5}', nullable=False), StringGen('[ab]{1,5}'))))
@@ -1602,6 +1683,7 @@ def test_hash_groupby_approx_percentile_byte(aqe_enabled):
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @incompat
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/11198
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
     conf = {'spark.sql.adaptive.enabled': aqe_enabled}
@@ -1629,6 +1711,7 @@ def test_hash_groupby_approx_percentile_long(aqe_enabled):
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @incompat
+@disable_ansi_mode # ANSI mode is tested in test_hash_groupby_approx_percentile_long_single_ansi
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 def test_hash_groupby_approx_percentile_long_single(aqe_enabled):
     conf = {'spark.sql.adaptive.enabled': aqe_enabled}
@@ -1637,6 +1720,24 @@ def test_hash_groupby_approx_percentile_long_single(aqe_enabled):
             ('v', UniqueLongGen())], length=100),
         0.5, conf)
 
+
+@incompat
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec')
+def test_hash_groupby_approx_percentile_long_single_ansi(aqe_enabled):
+    """
+    Tests approx_percentile with ANSI mode enabled.
+    Note: In ANSI mode, the test query exercises ObjectHashAggregateExec and ShuffleExchangeExec,
+    which fall back to CPU.
+    """
+    conf = {'spark.sql.adaptive.enabled': aqe_enabled}
+    conf.update(ansi_enabled_conf)
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', UniqueLongGen())], length=100),
+        0.5, conf)
+
+
 @incompat
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 def test_hash_groupby_approx_percentile_double(aqe_enabled):
@@ -1682,14 +1783,27 @@ def test_hash_groupby_approx_percentile_decimal32():
             ('v', DecimalGen(6, 2))]),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
+
 @incompat
 @ignore_order(local=True)
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 def test_hash_groupby_approx_percentile_decimal32_single():
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(ByteGen(nullable=False), length=2)),
                                      ('v', DecimalGen(6, 2))]),
         0.05)
 
+
+@incompat
+@ignore_order(local=True)
+@allow_non_gpu('ObjectHashAggregateExec', 'ShuffleExchangeExec')
+def test_hash_groupby_approx_percentile_decimal_single_ansi():
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', RepeatSeqGen(ByteGen(nullable=False), length=2)),
+                                     ('v', DecimalGen(6, 2))]),
+        0.05, conf=ansi_enabled_conf)
+
+
 @incompat
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_decimal64():
@@ -1699,6 +1813,7 @@ def test_hash_groupby_approx_percentile_decimal64():
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
 @incompat
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_decimal64_single():
     compare_percentile_approx(
@@ -1715,6 +1830,7 @@ def test_hash_groupby_approx_percentile_decimal128():
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
 @incompat
+@disable_ansi_mode # ANSI mode is tested with test_hash_groupby_approx_percentile_decimal_single_ansi.
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_decimal128_single():
     compare_percentile_approx(
@@ -1812,6 +1928,7 @@ def create_percentile_sql(func_name, percentiles, reduction):
 
 @ignore_order
+@disable_ansi_mode # ANSI mode is tested in test_hash_grpby_avg_nulls_ansi
 @pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_grpby_avg_nulls(data_gen, conf):
@@ -1836,6 +1953,7 @@ def test_hash_grpby_avg_nulls_ansi(data_gen, conf):
     )
 
 @ignore_order
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('data_gen', [_grpkey_strings_with_extra_nulls], ids=idfn)
 @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
 def test_hash_reduction_avg_nulls(data_gen, conf):
@@ -2066,6 +2184,7 @@ def test_min_max_in_groupby_and_reduction(data_gen):
 # Some Spark implementations will optimize this aggregation as a
 # complete aggregation (i.e.: only one aggregation node in the plan)
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 def test_hash_aggregate_complete_with_grouping_expressions():
     assert_gpu_and_cpu_are_equal_sql(
         lambda spark : spark.range(10).withColumn("id2", f.col("id")),
@@ -2073,6 +2192,7 @@
         "select id, avg(id) from hash_agg_complete_table group by id, id2 + 1")
 
 @ignore_order(local=True)
+@disable_ansi_mode # https://github.com/NVIDIA/spark-rapids/issues/5114
 @pytest.mark.parametrize('cast_key_to', ["byte", "short", "int", "long", "string", "DECIMAL(38,5)"], ids=idfn)
 def test_hash_agg_force_pre_sort(cast_key_to):
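
Reviewer note, not part of the patch: the diff leans on two ANSI helpers defined elsewhere in the test harness, the `disable_ansi_mode` marker and the `ansi_enabled_conf` dict. Their real definitions are not shown here; the following is only a minimal, hypothetical sketch of how such helpers could be wired up, inferred from their uses above (the `disable_ansi_mode` marker would additionally need a conftest hook that applies `spark.sql.ansi.enabled=false` to marked tests).

    import pytest

    # Hypothetical: a custom pytest marker; a conftest hook is assumed to run
    # tests carrying this mark with 'spark.sql.ansi.enabled' forced to 'false'.
    disable_ansi_mode = pytest.mark.disable_ansi_mode

    # Hypothetical counterpart of the ansi_disabled_conf literal visible in the
    # diff ({'spark.sql.ansi.enabled': False}), used by the *_ansi tests above.
    ansi_enabled_conf = {'spark.sql.ansi.enabled': 'true'}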