diff --git a/integration_tests/src/main/python/asserts.py b/integration_tests/src/main/python/asserts.py index e86c2a00355..bc564e8fa3f 100644 --- a/integration_tests/src/main/python/asserts.py +++ b/integration_tests/src/main/python/asserts.py @@ -299,3 +299,18 @@ def assert_gpu_and_cpu_are_equal_iterator(func, conf={}): so any amount of data can work, just be careful about how long it might take. """ _assert_gpu_and_cpu_are_equal(func, False, conf=conf) + + +def assert_gpu_and_cpu_are_equal_sql(df, tableName, sql, conf=None): + """ + Assert that the specified SQL query produces equal results on CPU and GPU. + :param df: Input dataframe + :param tableName: Name of table to be created with the dataframe + :param sql: SQL query to be run on the specified table + :param conf: Any user-specified confs. Empty by default. + :return: Assertion failure, if results from CPU and GPU do not match. + """ + if conf is None: + conf = {} + df.createOrReplaceTempView(tableName) + assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql), conf) diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py index 60f7212b335..8f106084f61 100644 --- a/integration_tests/src/main/python/hash_aggregate_test.py +++ b/integration_tests/src/main/python/hash_aggregate_test.py @@ -14,7 +14,7 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql from data_gen import * from pyspark.sql.types import * from marks import * @@ -275,17 +275,15 @@ def test_hash_count_with_filter(data_gen, conf): @pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn) @pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn) def test_hash_multiple_filters(data_gen, conf): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=100)) - df.createOrReplaceTempView("hash_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select count(a) filter (where c > 50),' + - 'count(b) filter (where c > 100),' + - # Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed - # 'avg(b) filter (where b > 20),' + - 'min(a), max(b) filter (where c > 250) from hash_agg_table group by a'), - conf=conf) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark : gen_df(spark, data_gen, length=100)), + "hash_agg_table", + 'select count(a) filter (where c > 50),' + + 'count(b) filter (where c > 100),' + + # Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed + # 'avg(b) filter (where b > 20),' + + 'min(a), max(b) filter (where c > 250) from hash_agg_table group by a', + conf) @pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/155') @@ -297,13 +295,11 @@ def test_hash_multiple_filters(data_gen, conf): 'EqualTo', 'First', 'SortAggregateExec', 'Coalesce') @pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn) def test_hash_multiple_filters_fail(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=100)) - df.createOrReplaceTempView("hash_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select avg(b) filter (where b > 20) from hash_agg_table group by a'), - conf=_no_nans_float_conf_partial) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark : gen_df(spark, data_gen, length=100)), + "hash_agg_table", + 'select avg(b) filter (where b > 20) from hash_agg_table group by a', + _no_nans_float_conf_partial) @ignore_order @@ -321,21 +317,19 @@ def test_hash_query_max_bug(data_gen): @pytest.mark.parametrize('data_gen', [_grpkey_floats_with_nan_zero_grouping_keys, _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn) def test_hash_agg_with_nan_keys(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=1024)) - df.createOrReplaceTempView("hash_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select a, ' - 'count(*) as count_stars, ' - 'count(b) as count_bees, ' - 'sum(b) as sum_of_bees, ' - 'max(c) as max_seas, ' - 'min(c) as min_seas, ' - 'count(distinct c) as count_distinct_cees, ' - 'avg(c) as average_seas ' - 'from hash_agg_table group by a'), - conf=_no_nans_float_conf) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark : gen_df(spark, data_gen, length=1024)), + "hash_agg_table", + 'select a, ' + 'count(*) as count_stars, ' + 'count(b) as count_bees, ' + 'sum(b) as sum_of_bees, ' + 'max(c) as max_seas, ' + 'min(c) as min_seas, ' + 'count(distinct c) as count_distinct_cees, ' + 'avg(c) as average_seas ' + 'from hash_agg_table group by a', + _no_nans_float_conf) @pytest.mark.xfail( @@ -347,15 +341,11 @@ def test_hash_agg_with_nan_keys(data_gen): @ignore_order @pytest.mark.parametrize('data_gen', [ _grpkey_doubles_with_nan_zero_grouping_keys], ids=idfn) def test_count_distinct_with_nan_floats(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=1024)) - df.createOrReplaceTempView("hash_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select a, ' - 'count(distinct b) as count_distinct_bees ' - 'from hash_agg_table group by a'), - conf=_no_nans_float_conf) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark : gen_df(spark, data_gen, length=1024)), + "hash_agg_table", + 'select a, count(distinct b) as count_distinct_bees from hash_agg_table group by a', + _no_nans_float_conf) # TODO: Literal tests # TODO: First and Last tests diff --git a/integration_tests/src/main/python/window_function_test.py b/integration_tests/src/main/python/window_function_test.py index 909a866cfa1..d3e52021460 100644 --- a/integration_tests/src/main/python/window_function_test.py +++ b/integration_tests/src/main/python/window_function_test.py @@ -14,7 +14,7 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql from data_gen import * from pyspark.sql.types import * from marks import * @@ -47,23 +47,21 @@ _grpkey_longs_with_timestamps, _grpkey_longs_with_nullable_timestamps], ids=idfn) def test_window_aggs_for_rows(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=2048)) - df.createOrReplaceTempView("window_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select ' - ' sum(c) over ' - ' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, ' - ' max(c) over ' - ' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, ' - ' min(c) over ' - ' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, ' - ' count(1) over ' - ' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, ' - ' row_number() over ' - ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num ' - 'from window_agg_table ')) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark : gen_df(spark, data_gen, length=2048)), + "window_agg_table", + 'select ' + ' sum(c) over ' + ' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, ' + ' max(c) over ' + ' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, ' + ' min(c) over ' + ' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, ' + ' count(1) over ' + ' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, ' + ' row_number() over ' + ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num ' + 'from window_agg_table ') # Test for RANGE queries, with timestamp order-by expressions. @@ -73,31 +71,29 @@ def test_window_aggs_for_rows(data_gen): @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps, _grpkey_longs_with_nullable_timestamps], ids=idfn) def test_window_aggs_for_ranges(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=2048)) - df.createOrReplaceTempView("window_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select ' - ' sum(c) over ' - ' (partition by a order by cast(b as timestamp) asc ' - ' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, ' - ' max(c) over ' - ' (partition by a order by cast(b as timestamp) desc ' - ' range between interval 2 days preceding and interval 1 days following) as max_c_desc, ' - ' min(c) over ' - ' (partition by a order by cast(b as timestamp) asc ' - ' range between interval 2 days preceding and current row) as min_c_asc, ' - ' count(1) over ' - ' (partition by a order by cast(b as timestamp) asc ' - ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, ' - ' sum(c) over ' - ' (partition by a order by cast(b as timestamp) asc ' - ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, ' - ' max(c) over ' - ' (partition by a order by cast(b as timestamp) asc ' - ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded ' - 'from window_agg_table')) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)), + "window_agg_table", + 'select ' + ' sum(c) over ' + ' (partition by a order by cast(b as timestamp) asc ' + ' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, ' + ' max(c) over ' + ' (partition by a order by cast(b as timestamp) desc ' + ' range between interval 2 days preceding and interval 1 days following) as max_c_desc, ' + ' min(c) over ' + ' (partition by a order by cast(b as timestamp) asc ' + ' range between interval 2 days preceding and current row) as min_c_asc, ' + ' count(1) over ' + ' (partition by a order by cast(b as timestamp) asc ' + ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, ' + ' sum(c) over ' + ' (partition by a order by cast(b as timestamp) asc ' + ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, ' + ' max(c) over ' + ' (partition by a order by cast(b as timestamp) asc ' + ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded ' + 'from window_agg_table') @pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns " @@ -105,16 +101,15 @@ def test_window_aggs_for_ranges(data_gen): @ignore_order @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn) def test_window_aggs_for_ranges_of_dates(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=2048)) - df.createOrReplaceTempView("window_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select ' - ' sum(c) over ' - ' (partition by a order by b asc ' - ' range between 1 preceding and 1 following) as sum_c_asc ' - 'from window_agg_table')) + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)), + "window_agg_table", + 'select ' + ' sum(c) over ' + ' (partition by a order by b asc ' + ' range between 1 preceding and 1 following) as sum_c_asc ' + 'from window_agg_table' + ) @pytest.mark.xfail(reason="[BUG] `COUNT(x)` should not count null values of `x` " @@ -122,13 +117,12 @@ def test_window_aggs_for_ranges_of_dates(data_gen): @ignore_order @pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls], ids=idfn) def test_window_aggs_for_rows_count_non_null(data_gen): - df = with_cpu_session( - lambda spark : gen_df(spark, data_gen, length=2048)) - df.createOrReplaceTempView("window_agg_table") - assert_gpu_and_cpu_are_equal_collect( - lambda spark: spark.sql( - 'select ' - ' count(c) over ' - ' (partition by a order by b,c ' - ' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null ' - 'from window_agg_table ')) \ No newline at end of file + assert_gpu_and_cpu_are_equal_sql( + with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)), + "window_agg_table", + 'select ' + ' count(c) over ' + ' (partition by a order by b,c ' + ' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null ' + 'from window_agg_table ' + )