Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add partial and final only hash aggregate tests and fix nulls corner case for Average #157

Merged
merged 7 commits into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 128 additions & 40 deletions integration_tests/src/main/python/hash_aggregate_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
'spark.rapids.sql.castStringToFloat.enabled': 'true'
}

# Variant of _no_nans_float_conf with the hash aggregate replace mode set to 'partial'.
_no_nans_float_conf_partial = _no_nans_float_conf.copy()
_no_nans_float_conf_partial['spark.rapids.sql.hashAgg.replaceMode'] = 'partial'

# Variant of _no_nans_float_conf with the hash aggregate replace mode set to 'final'.
_no_nans_float_conf_final = _no_nans_float_conf.copy()
_no_nans_float_conf_final['spark.rapids.sql.hashAgg.replaceMode'] = 'final'

# The input lists or schemas that are used by StructGen.

Expand Down Expand Up @@ -87,53 +93,90 @@
_grpkey_floats_with_nulls_and_nans]


def get_struct_gens(init_list=_init_list_no_nans, marked_params=[]):
def get_params(init_list, marked_params=None):
    """
    A method to build the test inputs along with their passed in markers to allow testing
    specific params with their relevant markers. Right now it is used to parametrize _confs with
    allow_non_gpu, which allows some operators to stay on the CPU.
    However, this can be used with any list of params to the test.

    Every value of init_list that compares equal to the params item of an entry in
    marked_params is replaced by pytest.param(value, marks=marks); the first matching entry
    wins. All other values are passed through untouched. init_list itself is never modified.

    :arg init_list list (or any iterable) of param values to be tested
    :arg marked_params A list of tuples of (params, list of pytest markers).
        Look at params_markers_for_confs as an example. Defaults to no markers at all.
    :return a new list holding the plain and the pytest.param-wrapped values, in input order
    """
    # Snapshot the markers once: this avoids a shared mutable default argument and lets
    # callers hand in any iterable, even a one-shot generator.
    marker_table = tuple(marked_params or ())

    def _with_marks(value):
        # Wrap value when a marker entry targets it, otherwise hand it back as is.
        for test_case, marks in marker_table:
            if value == test_case:
                return pytest.param(value, marks=marks)
        return value

    return [_with_marks(value) for value in init_list]


params_markers_for_avg_sum=[
(_grpkey_strings_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_dbls_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_floats_with_nulls, [pytest.mark.incompat, pytest.mark.approximate_float]),
(_grpkey_floats_with_nulls_and_nans, [pytest.mark.incompat, pytest.mark.approximate_float])]
# Run these tests in 3 modes: everything on the GPU, only partial aggregates on the GPU,
# and only final aggregates on the GPU.
_confs = [
    _no_nans_float_conf,
    _no_nans_float_conf_final,
    _no_nans_float_conf_partial,
]

# Pytest marker for list of operators allowed to run on the CPU,
# esp. useful in partial and final only modes.
_excluded_operators_marker = pytest.mark.allow_non_gpu(
    'HashAggregateExec', 'AggregateExpression',
    'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
    'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
    'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')

# (conf, markers) pairs consumed by get_params: the partial-only and final-only confs
# both carry the CPU fallback marker; the all-GPU conf is intentionally not listed.
params_markers_for_confs = [
    (partial_or_final_conf, [_excluded_operators_marker])
    for partial_or_final_conf in (_no_nans_float_conf_partial, _no_nans_float_conf_final)
]


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_grpby_sum(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_sum(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.sum('b')),
conf=_no_nans_float_conf
conf=conf
)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(init_list=_init_list_with_nans_and_no_nans,
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_grpby_avg(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_with_nans_and_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_grpby_avg(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.avg('b')),
conf=_no_nans_float_conf
conf=conf
)

# Tracks https://github.com/NVIDIA/spark-rapids/issues/154
@approximate_float
@ignore_order
@incompat
@pytest.mark.allow_non_gpu(
    'HashAggregateExec', 'AggregateExpression',
    'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
    'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
    'EqualTo', 'First', 'SortAggregateExec')
@pytest.mark.parametrize(
    'data_gen',
    [StructGen(children=[('a', int_gen), ('b', int_gen)],
               nullable=False,
               special_cases=[((None, None), 400.0), ((None, -1542301795), 100.0)])],
    ids=idfn)
def test_hash_avg_nulls_partial_only(data_gen):
    # Un-grouped avg('b') over a frame generated with length=2, run with the partial-only
    # conf. The special cases put None into 'a' and 'b'; the trailing floats are
    # presumably selection weights -- confirm against StructGen.
    def average_of_b(spark):
        generated = gen_df(spark, data_gen, length=2)
        return generated.agg(f.avg('b'))

    assert_gpu_and_cpu_are_equal_collect(
        average_of_b,
        conf=_no_nans_float_conf_partial)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_multiple_mode_query(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_multiple_mode_query(data_gen, conf):
print_params(data_gen)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
Expand All @@ -144,44 +187,92 @@ def test_hash_multiple_mode_query(data_gen):
f.countDistinct('b'),
f.sum('a'),
f.min('a'),
f.max('a'),
f.countDistinct('c')), conf=_no_nans_float_conf)
f.max('a')
# Add the following line back in
# after https://github.com/NVIDIA/spark-rapids/issues/153 is fixed.
# f.countDistinct('c')
), conf=conf)


# Remove this test and restore f.countDistinct('c') in test_hash_multiple_mode_query
# once https://github.com/NVIDIA/spark-rapids/issues/153 is fixed.
@allow_non_gpu(
    'HashAggregateExec', 'AggregateExpression',
    'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
    'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
    'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
@ignore_order
@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/153')
@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
def test_hash_multiple_distincts_fail(data_gen):
    print_params(data_gen)

    # One plain count next to two distinct counts, grouped by 'a', in partial-only mode;
    # marked xfail above until the linked issue is resolved.
    def count_with_two_distincts(spark):
        grouped = gen_df(spark, data_gen, length=3).groupby('a')
        return grouped.agg(
            f.count('a'),
            f.countDistinct('b'),
            f.countDistinct('c'))

    assert_gpu_and_cpu_are_equal_collect(
        count_with_two_distincts,
        conf=_no_nans_float_conf_partial)


@approximate_float
@ignore_order
@incompat
@pytest.mark.parametrize('data_gen', get_struct_gens(), ids=idfn)
def test_hash_multiple_mode_query_avg_distincts(data_gen):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs),
ids=idfn)
def test_hash_multiple_mode_query_avg_distincts(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.selectExpr('avg(distinct a)', 'avg(distinct b)','avg(distinct c)'),
conf=_no_nans_float_conf)
conf=conf)


@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(), ids=idfn)
def test_hash_count_with_filter(data_gen):
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_count_with_filter(data_gen, conf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100)
.selectExpr('count(a) filter (where c > 50)'),
conf=_no_nans_float_conf)
conf=conf)


@approximate_float
@ignore_order
@pytest.mark.parametrize('data_gen', get_struct_gens(
marked_params=params_markers_for_avg_sum), ids=idfn)
def test_hash_multiple_filters(data_gen):
@incompat
@pytest.mark.parametrize('data_gen', _init_list_no_nans, ids=idfn)
@pytest.mark.parametrize('conf', get_params(_confs, params_markers_for_confs), ids=idfn)
def test_hash_multiple_filters(data_gen, conf):
df = with_cpu_session(
lambda spark : gen_df(spark, data_gen, length=100))
df.createOrReplaceTempView("hash_agg_table")
assert_gpu_and_cpu_are_equal_collect(
lambda spark: spark.sql(
'select count(a) filter (where c > 50),' +
'count(b) filter (where c > 100),' +
'avg(b) filter (where b > 20),' +
# Uncomment after https://github.com/NVIDIA/spark-rapids/issues/155 is fixed
# 'avg(b) filter (where b > 20),' +
'min(a), max(b) filter (where c > 250) from hash_agg_table group by a'),
conf=_no_nans_float_conf)
conf=conf)


@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/155')
@ignore_order
@allow_non_gpu(
    'HashAggregateExec', 'AggregateExpression',
    'AttributeReference', 'Alias', 'Sum', 'Count', 'Max', 'Min', 'Average', 'Cast',
    'KnownFloatingPointNormalized', 'NormalizeNaNAndZero', 'GreaterThan', 'Literal', 'If',
    'EqualTo', 'First', 'SortAggregateExec', 'Coalesce')
@pytest.mark.parametrize('data_gen', [_longs_with_nulls], ids=idfn)
def test_hash_multiple_filters_fail(data_gen):
    # Build the input once on the CPU session and expose it to SQL under a fixed view name.
    source_df = with_cpu_session(lambda spark: gen_df(spark, data_gen, length=100))
    source_df.createOrReplaceTempView("hash_agg_table")

    # Only the filtered average is queried here, with the partial-only conf;
    # the test is marked xfail above until the linked issue is resolved.
    filtered_avg_query = 'select avg(b) filter (where b > 20) from hash_agg_table group by a'
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.sql(filtered_avg_query),
        conf=_no_nans_float_conf_partial)


@ignore_order
Expand All @@ -193,9 +284,6 @@ def test_hash_query_max_bug(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark: gen_df(spark, data_gen, length=100).groupby('a').agg(f.max('b')))

# TODO: Make config a param for partial and final only testing
# TODO: Why limit to 100, go bigger - keeping 100 for now to make it easier to debug
# TODO: String datagen combos in struct_gens which make more sense.

# TODO: Literal tests
# TODO: Port over sort aggregate tests
# TODO: First and Last tests
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ case class GpuAverage(child: GpuExpression) extends GpuDeclarativeAggregate {
override lazy val inputProjection: Seq[GpuExpression] = Seq(
child match {
case literal: GpuLiteral => toDoubleLit(literal.value)
case _ => GpuCast(child, DoubleType)
case _ => GpuCoalesce(Seq(GpuCast(child, DoubleType), GpuLiteral(0D, DoubleType)))
},
child match {
case literal : GpuLiteral => GpuLiteral(if (literal.value != null) 1L else 0L, LongType)
Expand Down