Skip to content

Commit

Permalink
Add partial and final only hash aggregate tests and fix nulls corner …
Browse files Browse the repository at this point in the history
…case for Average (#157)

* make config passing configurable in hash aggregate tests

Co-authored-by: Kuhu Shukla <kuhus@nvidia.com>
  • Loading branch information
Kuhu Shukla and kuhushukla authored Jun 12, 2020
1 parent cbd8371 commit 8c27f70
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 41 deletions.
168 changes: 128 additions & 40 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}

_no_nans_float_conf_partial = _no_nans_float_conf.copy()
_no_nans_float_conf_partial.update(
{'spark.rapids.sql.hashAgg.replaceMode': 'partial'})

_no_nans_float_conf_final = _no_nans_float_conf.copy()
_no_nans_float_conf_final.update({'spark.rapids.sql.hashAgg.replaceMode': 'final'})

# The input lists or schemas that are used by StructGen.

Expand Down Expand Up @@ -87,53 +93,90 @@
_grpkey_floats_with_nulls_and_nans]


def get_struct_gens(init_list=_init_list_no_nans, marked_params=[]):
def get_params(init_list, marked_params=[]):
"""
A method to build the structGen inputs along with their passed in markers to allow testing
specific params with their relevant markers.
:arg init_list list of schemas to be tested, defaults to _init_list_no_nans from above.
:arg marked_params A list of tuples of (schema, list of pytest markers)
Look at params_markers_for_avg_sum as an example.
A method to build the test inputs along with their passed in markers to allow testing
specific params with their relevant markers. Right now it is used to parametrize _confs with
allow_non_gpu which allows some operators to be whitelisted.
However, this can be used with any list of params to the test.
:arg init_list list of param values to be tested
:arg marked_params A list of tuples of (params, list of pytest markers)
Look at params_markers_for_confs as an example.
"""
list = init_list.copy()
for index in range(0, len(list)):
for test_case,marks in marked_params:
for test_case, marks in marked_params:
if list[index] == test_case:
list[index] = pytest.param(list[index], marks=marks)
return list


params_markers_for_avg_sum=[
(_grpkey_strings_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_dbls_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_floats_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_floats_with_nulls_and_nans, [pytest.mark.incompat, pytest.mark.approximate_float])]
# Run these tests with in 3 modes, all on the GPU, only partial aggregates on GPU and
# only final aggregates on the GPU
_confs = [_no_nans_float_conf, _no_nans_float_conf_final, _no_nans_float_conf_partial]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
_excluded_operators_marker = pytest.mark.allow_non_gpu(
'HashAggregateExec', 'AggregateExpression',
'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')

params_markers_for_confs = [
(_no_nans_float_conf_partial, [_excluded_operators_marker]),
(_no_nans_float_conf_final, [_excluded_operators_marker])
]


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_grpby_sum(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_sum(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b')),
conf=_no_nans_float_conf
conf=conf
)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(init_list=_init_list_with_nans_and_no_nans,
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_grpby_avg(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_nans_and_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.avg('b')),
conf=_no_nans_float_conf
conf=conf
)

# tracks https://github.com/NVIDIA/spark-rapids/issues/154
@approximate_float
@ignore_order
@incompat
@pytest.mark.allow_non_gpu(
'HashAggregateExec', 'AggregateExpression',
'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
'EqualTo', 'First', 'SortAggregateExec')
@pytest.mark.parametrize('data_gen', [
StructGen(children=[('a', int_gen), ('b', int_gen)],nullable=False,
special_cases=[((None, None), 400.0), ((None, -1542301795), 100.0)])], ids=idfn)
def test_hash_avg_nulls_partial_only(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=2).agg(f.avg('b')),
conf=_no_nans_float_conf_partial
)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_multiple_mode_query(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_multiple_mode_query(data_gen, conf):
print_params(data_gen)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
Expand All @@ -144,44 +187,92 @@ def test_hash_multiple_mode_query(data_gen):
f.countDistinct('b'),
f.sum('a'),
f.min('a'),
f.max('a'),
f.countDistinct('c')), conf=_no_nans_float_conf)
f.max('a')
# Add the following line back in
# after https://github.com/NVIDIA/spark-rapids/issues/153 is fixed.
# f.countDistinct('c')
), conf=conf)


# Remove this test and add back the countDistinct mentioned above
# once https://github.com/NVIDIA/spark-rapids/issues/153 is fixed.
@allow_non_gpu(
'HashAggregateExec', 'AggregateExpression',
'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
@ignore_order
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/153')
@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
def test_hash_multiple_distincts_fail(data_gen):
print_params(data_gen)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=3)
.groupby('a')
.agg(f.count('a'),
f.countDistinct('b'),
f.countDistinct('c')
), conf=_no_nans_float_conf_partial)


@approximate_float
@ignore_order
@incompat
@pytest.mark.parametrize('data_gen', get_struct_gens(), ids=idfn)
def test_hash_multiple_mode_query_avg_distincts(data_gen):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
ids=idfn)
def test_hash_multiple_mode_query_avg_distincts(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.selectExpr('avg(distinct a)', 'avg(distinct b)','avg(distinct c)'),
conf=_no_nans_float_conf)
conf=conf)


@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(), ids=idfn)
def test_hash_count_with_filter(data_gen):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_count_with_filter(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.selectExpr('count(a) filter (where c > 50)'),
conf=_no_nans_float_conf)
conf=conf)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_multiple_filters(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_multiple_filters(data_gen, conf):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=100))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select count(a) filter (where c > 50),' +
'count(b) filter (where c > 100),' +
'avg(b) filter (where b > 20),' +
# Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed
# 'avg(b) filter (where b > 20),' +
'min(a), max(b) filter (where c > 250) from hash_agg_table group by a'),
conf=_no_nans_float_conf)
conf=conf)


@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/155')
@ignore_order
@allow_non_gpu(
'HashAggregateExec', 'AggregateExpression',
'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
def test_hash_multiple_filters_fail(data_gen):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=100))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select avg(b) filter (where b > 20) from hash_agg_table group by a'),
conf=_no_nans_float_conf_partial)


@ignore_order
Expand All @@ -193,9 +284,6 @@ def test_hash_query_max_bug(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.max('b')))

# TODO: Make config a param for partial and final only testing
# TODO: Why limit to 100, go bigger - keeping 100 for now to make it easier to debug
# TODO: String datagen combos in struct_gens which make more sense.

# TODO: Literal tests
# TODO: Port over sort aggregate tests
# TODO: First and Last tests
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ case class GpuAverage(child: GpuExpression) extends GpuDeclarativeAggregate {
override lazy val inputProjection: Seq[GpuExpression] = Seq(
child match {
case literal: GpuLiteral => toDoubleLit(literal.value)
case _ => GpuCast(child, DoubleType)
case _ => GpuCoalesce(Seq(GpuCast(child, DoubleType), GpuLiteral(0D, DoubleType)))
},
child match {
case literal : GpuLiteral => GpuLiteral(if (literal.value != null) 1L else 0L, LongType)
Expand Down

0 comments on commit 8c27f70

Please sign in to comment.