Test utility to compare SQL query results between CPU and GPU
mythrocks committed Jul 17, 2020
1 parent 10668cd commit ec67f3e
Showing 2 changed files with 72 additions and 63 deletions.
15 changes: 15 additions & 0 deletions integration_tests/src/main/python/asserts.py
@@ -299,3 +299,18 @@ def assert_gpu_and_cpu_are_equal_iterator(func, conf={}):
so any amount of data can work, just be careful about how long it might take.
"""
_assert_gpu_and_cpu_are_equal(func, False, conf=conf)


+def assert_gpu_and_cpu_are_equal_sql(df, tableName, sql, conf=None):
+    """
+    Assert that the specified SQL query produces equal results on CPU and GPU.
+    :param df: Input dataframe
+    :param tableName: Name of the temporary view to be created from the dataframe
+    :param sql: SQL query to be run against the specified table
+    :param conf: Any user-specified confs. Empty by default.
+    :raises: An assertion error if the results from CPU and GPU do not match.
+    """
+    if conf is None:
+        conf = {}
+    df.createOrReplaceTempView(tableName)
+    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql), conf)
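
For context, a minimal usage sketch of the new helper (not part of the commit). It assumes, per this suite's conventions, that with_cpu_session lives in spark_session and that gen_df and long_gen come from data_gen; the test name, table name, and query are illustrative only:

    from asserts import assert_gpu_and_cpu_are_equal_sql
    from data_gen import gen_df, long_gen   # long_gen is an assumption here
    from spark_session import with_cpu_session  # assumed import path

    def test_example_sum_matches():
        # Build the input once on the CPU session; the helper registers it as
        # a temp view, runs the query under both CPU and GPU configs, and
        # compares the collected results.
        df = with_cpu_session(lambda spark: gen_df(spark, [('a', long_gen)], length=64))
        assert_gpu_and_cpu_are_equal_sql(
            df,
            "example_table",  # temp view name (illustrative)
            "select sum(a) from example_table")
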
120 changes: 57 additions & 63 deletions integration_tests/src/main/python/window_function_test.py
Expand Up @@ -14,7 +14,7 @@

import pytest

-from asserts import assert_gpu_and_cpu_are_equal_collect
+from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from pyspark.sql.types import *
from marks import *
@@ -47,23 +47,21 @@
_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_rows(data_gen):
-    df = with_cpu_session(
-        lambda spark : gen_df(spark, data_gen, length=2048))
-    df.createOrReplaceTempView("window_agg_table")
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.sql(
-            'select '
-            ' sum(c) over '
-            ' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
-            ' max(c) over '
-            ' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
-            ' min(c) over '
-            ' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
-            ' count(1) over '
-            ' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
-            ' row_number() over '
-            ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
-            'from window_agg_table '))
+    assert_gpu_and_cpu_are_equal_sql(
+        with_cpu_session(lambda spark : gen_df(spark, data_gen, length=2048)),
+        "window_agg_table",
+        'select '
+        ' sum(c) over '
+        ' (partition by a order by b,c asc rows between 1 preceding and 1 following) as sum_c_asc, '
+        ' max(c) over '
+        ' (partition by a order by b desc, c desc rows between 2 preceding and 1 following) as max_c_desc, '
+        ' min(c) over '
+        ' (partition by a order by b,c rows between 2 preceding and current row) as min_c_asc, '
+        ' count(1) over '
+        ' (partition by a order by b,c rows between UNBOUNDED preceding and UNBOUNDED following) as count_1, '
+        ' row_number() over '
+        ' (partition by a order by b,c rows between UNBOUNDED preceding and CURRENT ROW) as row_num '
+        'from window_agg_table ')


# Test for RANGE queries, with timestamp order-by expressions.
@@ -73,62 +73,58 @@ def test_window_aggs_for_rows(data_gen):
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps,
_grpkey_longs_with_nullable_timestamps], ids=idfn)
def test_window_aggs_for_ranges(data_gen):
-    df = with_cpu_session(
-        lambda spark : gen_df(spark, data_gen, length=2048))
-    df.createOrReplaceTempView("window_agg_table")
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.sql(
-            'select '
-            ' sum(c) over '
-            ' (partition by a order by cast(b as timestamp) asc '
-            ' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
-            ' max(c) over '
-            ' (partition by a order by cast(b as timestamp) desc '
-            ' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
-            ' min(c) over '
-            ' (partition by a order by cast(b as timestamp) asc '
-            ' range between interval 2 days preceding and current row) as min_c_asc, '
-            ' count(1) over '
-            ' (partition by a order by cast(b as timestamp) asc '
-            ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
-            ' sum(c) over '
-            ' (partition by a order by cast(b as timestamp) asc '
-            ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
-            ' max(c) over '
-            ' (partition by a order by cast(b as timestamp) asc '
-            ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
-            'from window_agg_table'))
+    assert_gpu_and_cpu_are_equal_sql(
+        with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
+        "window_agg_table",
+        'select '
+        ' sum(c) over '
+        ' (partition by a order by cast(b as timestamp) asc '
+        ' range between interval 1 day preceding and interval 1 day following) as sum_c_asc, '
+        ' max(c) over '
+        ' (partition by a order by cast(b as timestamp) desc '
+        ' range between interval 2 days preceding and interval 1 days following) as max_c_desc, '
+        ' min(c) over '
+        ' (partition by a order by cast(b as timestamp) asc '
+        ' range between interval 2 days preceding and current row) as min_c_asc, '
+        ' count(1) over '
+        ' (partition by a order by cast(b as timestamp) asc '
+        ' range between CURRENT ROW and UNBOUNDED following) as count_1_asc, '
+        ' sum(c) over '
+        ' (partition by a order by cast(b as timestamp) asc '
+        ' range between UNBOUNDED preceding and CURRENT ROW) as sum_c_unbounded, '
+        ' max(c) over '
+        ' (partition by a order by cast(b as timestamp) asc '
+        ' range between UNBOUNDED preceding and UNBOUNDED following) as max_c_unbounded '
+        'from window_agg_table')


@pytest.mark.xfail(reason="[UNSUPPORTED] Ranges over non-timestamp columns "
"(https://github.com/NVIDIA/spark-rapids/issues/216)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_timestamps], ids=idfn)
def test_window_aggs_for_ranges_of_dates(data_gen):
-    df = with_cpu_session(
-        lambda spark : gen_df(spark, data_gen, length=2048))
-    df.createOrReplaceTempView("window_agg_table")
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.sql(
-            'select '
-            ' sum(c) over '
-            ' (partition by a order by b asc '
-            ' range between 1 preceding and 1 following) as sum_c_asc '
-            'from window_agg_table'))
+    assert_gpu_and_cpu_are_equal_sql(
+        with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
+        "window_agg_table",
+        'select '
+        ' sum(c) over '
+        ' (partition by a order by b asc '
+        ' range between 1 preceding and 1 following) as sum_c_asc '
+        'from window_agg_table'
+    )


@pytest.mark.xfail(reason="[BUG] `COUNT(x)` should not count null values of `x` "
"(https://github.com/NVIDIA/spark-rapids/issues/218)")
@ignore_order
@pytest.mark.parametrize('data_gen', [_grpkey_longs_with_no_nulls], ids=idfn)
def test_window_aggs_for_rows_count_non_null(data_gen):
-    df = with_cpu_session(
-        lambda spark : gen_df(spark, data_gen, length=2048))
-    df.createOrReplaceTempView("window_agg_table")
-    assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: spark.sql(
-            'select '
-            ' count(c) over '
-            ' (partition by a order by b,c '
-            ' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
-            'from window_agg_table '))
+    assert_gpu_and_cpu_are_equal_sql(
+        with_cpu_session(lambda spark: gen_df(spark, data_gen, length=2048)),
+        "window_agg_table",
+        'select '
+        ' count(c) over '
+        ' (partition by a order by b,c '
+        ' rows between UNBOUNDED preceding and UNBOUNDED following) as count_non_null '
+        'from window_agg_table '
+    )
