Trim join tests to improve runtime of tests (#4731)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Feb 9, 2022
1 parent ffc18a0 commit 9705fab
Showing 1 changed file with 55 additions and 48 deletions.
integration_tests/src/main/python/join_test.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import pytest
+from _pytest.mark.structures import ParameterSet
 from pyspark.sql.functions import broadcast
 from pyspark.sql.types import *
 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
@@ -30,8 +31,7 @@
     BooleanGen(), DateGen(), TimestampGen(), null_gen,
     pytest.param(FloatGen(), marks=[incompat]),
     pytest.param(DoubleGen(), marks=[incompat]),
-    decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
-    decimal_gen_neg_scale, decimal_gen_64bit] + decimal_128_gens
+    decimal_gen_scale_precision, decimal_gen_neg_scale, decimal_gen_64bit, decimal_gen_38_0]
 
 all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False),
     ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
@@ -71,6 +71,12 @@
     string_gen, null_gen, decimal_gen_default, decimal_gen_64bit
 ]
 
+# Types to use when running joins on small batches. Small batch joins can take a long time
+# to run and are mostly redundant with the normal batch size test, so we only run these on a
+# set of representative types rather than all types.
+join_small_batch_gens = [ StringGen(), IntegerGen(), decimal_gen_30_2 ]
+cartesian_join_small_batch_gens = join_small_batch_gens + [basic_struct_gen, ArrayGen(string_gen)]
+
 _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
                         'spark.sql.join.preferSortMergeJoin': 'True',
                         'spark.sql.shuffle.partitions': '2',
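Aside: the batch-size overrides matter here because the plugin caps GPU batch sizes at spark.rapids.sql.batchSizeBytes, so a tiny limit forces a join to consume multiple stream batches. A minimal sketch of the override pattern the tests in this file use, with an illustrative '1000' standing in for the parametrized batch_size:

    # Merge a per-test override into the base join conf, then run the
    # CPU-vs-GPU comparison using the file's existing helpers.
    conf = copy_and_update(_sortmerge_join_conf,
                           {'spark.rapids.sql.batchSizeBytes': '1000'})
    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)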
@@ -104,35 +110,41 @@ def create_ridealong_df(spark, key_data_gen, data_gen, left_length, right_length
         .withColumnRenamed("b", "r_b")
     return left, right
 
+# Takes a sequence of list-of-generator and batch size string pairs and returns the
+# test parameters, using the batch size setting for each corresponding data generator.
+def join_batch_size_test_params(*args):
+    params = []
+    for (data_gens, batch_size) in args:
+        for obj in data_gens:
+            if isinstance(obj, ParameterSet):
+                params += [ pytest.param(v, batch_size, marks=obj.marks) for v in obj.values ]
+            else:
+                params += [ pytest.param(obj, batch_size) ]
+    return params
+
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_right_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_right_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 50, 0)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_left_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_left_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 0, 50)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 0, 0)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.skipif(is_databricks_runtime(),
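Aside: to make the new helper concrete, here is a hedged sketch (not part of the commit) of one expansion, assuming the generator classes and the incompat mark that this suite imports from its data_gen and marks modules. pytest.param entries are ParameterSet instances underneath, which is what lets their marks carry over:

    import pytest
    from data_gen import StringGen, IntegerGen, FloatGen  # assumed suite imports
    from marks import incompat                            # assumed suite import

    params = join_batch_size_test_params(
        ([StringGen(), pytest.param(FloatGen(), marks=[incompat])], '1g'),
        ([IntegerGen()], '1000'))
    # params is roughly:
    #   [pytest.param(StringGen(), '1g'),
    #    pytest.param(FloatGen(), '1g', marks=[incompat]),
    #    pytest.param(IntegerGen(), '1000')]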
@@ -164,9 +176,10 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen, '1g'),
+    (join_small_batch_gens, '1000')), ids=idfn)
 @pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
 def test_sortmerge_join(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 500, 500)
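Aside: per join type, the old parametrization ran 2 × |all_gen| data_gen/batch_size combinations (every type at both '1000' and '1g'); the new one runs |all_gen| + |join_small_batch_gens|, so only the three representative types repeat at the small batch size.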
@@ -177,13 +190,11 @@ def do_join(spark):
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', basic_nested_gens + decimal_128_gens, ids=idfn)
 @pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_sortmerge_join_ridealong(data_gen, join_type, batch_size):
+def test_sortmerge_join_ridealong(data_gen, join_type):
     def do_join(spark):
         left, right = create_ridealong_df(spark, short_gen, data_gen, 500, 500)
         return left.join(right, left.key == right.r_key, join_type)
-    conf = copy_and_update(_sortmerge_join_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
 
 # For floating point values the normalization is done using a higher order function. We could probably work around this
 # for now it falls back to the CPU
@@ -258,8 +269,9 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
-@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens + single_array_gens_sample_with_decimal128, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen + basic_nested_gens + single_array_gens_sample_with_decimal128, '1g'),
+    (join_small_batch_gens + [basic_struct_gen, ArrayGen(string_gen)], '100')), ids=idfn)
 def test_cartesian_join(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
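Aside: the second tuple here spells out inline the same list the commit binds to cartesian_join_small_batch_gens near the top of the file.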
@@ -273,11 +285,10 @@ def do_join(spark):
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_cartesian_join_special_case_count(data_gen, batch_size):
+def test_cartesian_join_special_case_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 500, 250)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(right).selectExpr('COUNT(*)')
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
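Aside: the data_gen parametrization disappears from this test and the similar special-case count/group-by tests below, presumably because a COUNT(*) or groupBy over a cross join exercises the join machinery rather than type-specific code paths, so a single int_gen type and the smaller 50×25 inputs suffice.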
@@ -288,11 +299,10 @@ def do_join(spark):
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_cartesian_join_special_case_group_by(data_gen, batch_size):
+def test_cartesian_join_special_case_group_by_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(right).groupBy('a').count()
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -301,8 +311,9 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen, '1g'),
+    (join_small_batch_gens, '100')), ids=idfn)
 def test_cartesian_join_with_condition(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -317,8 +328,9 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen + basic_nested_gens, '1g'),
+    (join_small_batch_gens, '100')), ids=idfn)
 def test_broadcast_nested_loop_join(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -329,11 +341,10 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_special_case_count(data_gen, batch_size):
+def test_broadcast_nested_loop_join_special_case_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(broadcast(right)).selectExpr('COUNT(*)')
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -343,21 +354,21 @@ def do_join(spark):
 @ignore_order(local=True)
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_special_case_group_by(data_gen, batch_size):
+def test_broadcast_nested_loop_join_special_case_group_by_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(broadcast(right)).groupBy('a').count()
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
 
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (join_ast_gen, '1g'),
+    ([int_gen], 100)), ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
 def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -373,25 +384,22 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen, batch_size):
+def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
         # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
         # if the sizes are large enough to have both 0.0 and -0.0 show up 500 and 250
         # but these take a long time to verify so we run with smaller numbers by default
         # that do not expose the error
         return broadcast(left).join(right, (left.b >= right.r_b), 'Right')
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen(), pytest.param(FloatGen(), marks=[incompat]), pytest.param(DoubleGen(), marks=[incompat])], ids=idfn)
 @pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_with_condition_post_filter(data_gen, join_type, batch_size):
+def test_broadcast_nested_loop_join_with_condition_post_filter(data_gen, join_type):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
         # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
@@ -400,8 +408,7 @@ def do_join(spark):
         # that do not expose the error
         # AST does not support cast or logarithm yet, so this must be implemented as a post-filter
         return left.join(broadcast(right), left.a > f.log(right.r_a), join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'Cast', 'GreaterThan', 'Log')
 @ignore_order(local=True)
