From 9705fab9a20487518b264f30cd9e2b5199e0a66a Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Wed, 9 Feb 2022 15:49:39 -0600
Subject: [PATCH] Trim join tests to improve runtime of tests (#4731)

Signed-off-by: Jason Lowe
---
 .../src/main/python/join_test.py | 103 ++++++++++--------
 1 file changed, 55 insertions(+), 48 deletions(-)

diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py
index e8414230b1b..5b08e1bad68 100644
--- a/integration_tests/src/main/python/join_test.py
+++ b/integration_tests/src/main/python/join_test.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import pytest
+from _pytest.mark.structures import ParameterSet
 from pyspark.sql.functions import broadcast
 from pyspark.sql.types import *
 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_cpu_and_gpu_are_equal_collect_with_capture
@@ -30,8 +31,7 @@
            BooleanGen(), DateGen(), TimestampGen(), null_gen,
            pytest.param(FloatGen(), marks=[incompat]),
            pytest.param(DoubleGen(), marks=[incompat]),
-           decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision,
-           decimal_gen_neg_scale, decimal_gen_64bit] + decimal_128_gens
+           decimal_gen_scale_precision, decimal_gen_neg_scale, decimal_gen_64bit, decimal_gen_38_0]
 
 all_gen_no_nulls = [StringGen(nullable=False), ByteGen(nullable=False),
                     ShortGen(nullable=False), IntegerGen(nullable=False), LongGen(nullable=False),
@@ -71,6 +71,12 @@
                  string_gen, null_gen, decimal_gen_default, decimal_gen_64bit ]
 
+# Types to use when running joins on small batches. Small batch joins can take a long time
+# to run and are mostly redundant with the normal batch size test, so we only run these on a
+# set of representative types rather than all types.
+join_small_batch_gens = [ StringGen(), IntegerGen(), decimal_gen_30_2 ]
+cartesian_join_small_batch_gens = join_small_batch_gens + [basic_struct_gen, ArrayGen(string_gen)]
+
 _sortmerge_join_conf = {'spark.sql.autoBroadcastJoinThreshold': '-1',
                         'spark.sql.join.preferSortMergeJoin': 'True',
                         'spark.sql.shuffle.partitions': '2',
@@ -104,35 +110,41 @@ def create_ridealong_df(spark, key_data_gen, data_gen, left_length, right_length
            .withColumnRenamed("b", "r_b")
     return left, right
 
+# Takes a sequence of (list-of-data-generators, batch-size) pairs and returns the test
+# parameters, pairing each data generator with its corresponding batch size setting.
+def join_batch_size_test_params(*args):
+    params = []
+    for (data_gens, batch_size) in args:
+        for obj in data_gens:
+            if isinstance(obj, ParameterSet):
+                params += [ pytest.param(v, batch_size, marks=obj.marks) for v in obj.values ]
+            else:
+                params += [ pytest.param(obj, batch_size) ]
+    return params
+
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_right_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_right_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 50, 0)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_left_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_left_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 0, 50)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn)
-def test_broadcast_nested_loop_join_without_condition_empty(join_type, batch_size):
+def test_broadcast_nested_loop_join_without_condition_empty(join_type):
     def do_join(spark):
         left, right = create_df(spark, long_gen, 0, 0)
         return left.join(broadcast(right), how=join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @ignore_order(local=True)
 @pytest.mark.skipif(is_databricks_runtime(),
@@ -164,9 +176,10 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen, '1g'),
+    (join_small_batch_gens, '1000')), ids=idfn)
 @pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
 def test_sortmerge_join(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 500, 500)
@@ -177,13 +190,11 @@ def do_join(spark):
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', basic_nested_gens + decimal_128_gens, ids=idfn)
 @pytest.mark.parametrize('join_type', all_join_types, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_sortmerge_join_ridealong(data_gen, join_type, batch_size):
+def test_sortmerge_join_ridealong(data_gen, join_type):
     def do_join(spark):
         left, right = create_ridealong_df(spark, short_gen, data_gen, 500, 500)
         return left.join(right, left.key == right.r_key, join_type)
-    conf = copy_and_update(_sortmerge_join_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf)
 
 # For floating point values the normalization is done using a higher order function. We could probably work around this,
 # but for now it falls back to the CPU
@@ -258,8 +269,9 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
-@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens + single_array_gens_sample_with_decimal128, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen + basic_nested_gens + single_array_gens_sample_with_decimal128, '1g'),
+    (cartesian_join_small_batch_gens, '100')), ids=idfn)
 def test_cartesian_join(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -273,11 +285,10 @@ def do_join(spark):
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_cartesian_join_special_case_count(data_gen, batch_size):
+def test_cartesian_join_special_case_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 500, 250)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(right).selectExpr('COUNT(*)')
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -288,11 +299,10 @@ def do_join(spark):
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_cartesian_join_special_case_group_by(data_gen, batch_size):
+def test_cartesian_join_special_case_group_by_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(right).groupBy('a').count()
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -301,8 +311,9 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.order(1) # at the head of xdist worker queue if pytest-order is installed
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen, '1g'),
+    (join_small_batch_gens, '100')), ids=idfn)
 def test_cartesian_join_with_condition(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -317,8 +328,9 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen + basic_nested_gens, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (all_gen + basic_nested_gens, '1g'),
+    (join_small_batch_gens, '100')), ids=idfn)
 def test_broadcast_nested_loop_join(data_gen, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -329,11 +341,10 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', all_gen + single_level_array_gens + single_array_gens_sample_with_decimal128, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_special_case_count(data_gen, batch_size):
+def test_broadcast_nested_loop_join_special_case_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(broadcast(right)).selectExpr('COUNT(*)')
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -343,11 +354,10 @@ def do_join(spark):
 @ignore_order(local=True)
 @pytest.mark.xfail(condition=is_databricks_runtime(),
                    reason='https://github.com/NVIDIA/spark-rapids/issues/334')
-@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_special_case_group_by(data_gen, batch_size):
+def test_broadcast_nested_loop_join_special_case_group_by_count(batch_size):
     def do_join(spark):
-        left, right = create_df(spark, data_gen, 50, 25)
+        left, right = create_df(spark, int_gen, 50, 25)
         return left.crossJoin(broadcast(right)).groupBy('a').count()
     conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
     assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
@@ -355,9 +365,10 @@ def do_join(spark):
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
-@pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
+@pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
+    (join_ast_gen, '1g'),
+    ([int_gen], '100')), ids=idfn)
 @pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'Cross'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
 def test_right_broadcast_nested_loop_join_with_ast_condition(data_gen, join_type, batch_size):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
@@ -373,8 +384,7 @@ def do_join(spark):
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', join_ast_gen, ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen, batch_size):
+def test_left_broadcast_nested_loop_join_with_ast_condition(data_gen):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
         # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
@@ -382,16 +392,14 @@ def do_join(spark):
         # but these take a long time to verify so we run with smaller numbers by default
         # that do not expose the error
         return broadcast(left).join(right, (left.b >= right.r_b), 'Right')
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 # local sort because of https://github.com/NVIDIA/spark-rapids/issues/84
 # After 3.1.0 is the min spark version we can drop this
 @ignore_order(local=True)
 @pytest.mark.parametrize('data_gen', [IntegerGen(), LongGen(), pytest.param(FloatGen(), marks=[incompat]),
                                       pytest.param(DoubleGen(), marks=[incompat])], ids=idfn)
 @pytest.mark.parametrize('join_type', ['Inner', 'Cross'], ids=idfn)
-@pytest.mark.parametrize('batch_size', ['100', '1g'], ids=idfn) # set the batch size so we can test multiple stream batches
-def test_broadcast_nested_loop_join_with_condition_post_filter(data_gen, join_type, batch_size):
+def test_broadcast_nested_loop_join_with_condition_post_filter(data_gen, join_type):
     def do_join(spark):
         left, right = create_df(spark, data_gen, 50, 25)
         # This test is impacted by https://github.com/NVIDIA/spark-rapids/issues/294
@@ -400,8 +408,7 @@ def do_join(spark):
         # that do not expose the error
         # AST does not support cast or logarithm yet, so this must be implemented as a post-filter
         return left.join(broadcast(right), left.a > f.log(right.r_a), join_type)
-    conf = copy_and_update(allow_negative_scale_of_decimal_conf, {'spark.rapids.sql.batchSizeBytes': batch_size})
-    assert_gpu_and_cpu_are_equal_collect(do_join, conf=conf)
+    assert_gpu_and_cpu_are_equal_collect(do_join, conf=allow_negative_scale_of_decimal_conf)
 
 @allow_non_gpu('BroadcastExchangeExec', 'BroadcastNestedLoopJoinExec', 'Cast', 'GreaterThan', 'Log')
 @ignore_order(local=True)
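
Note (illustration, not part of the patch): the heart of this change is join_batch_size_test_params, which folds the old standalone batch_size parametrize axis into the data_gen axis, so each generator runs at exactly one batch size instead of at every batch size. Below is a minimal, self-contained sketch of how the helper expands its arguments. The generator lists (all_gens, small_gens) and the test body are stand-ins invented for this sketch; the real suite uses the harness's data generators (StringGen, IntegerGen, ...) and GPU/CPU comparison asserts.

    import pytest
    from _pytest.mark.structures import ParameterSet

    def join_batch_size_test_params(*args):
        # Each argument is a (list-of-data-generators, batch-size) pair. Every
        # generator is paired with its batch size, and ParameterSet entries such
        # as pytest.param(..., marks=[incompat]) keep their marks through the
        # expansion.
        params = []
        for (data_gens, batch_size) in args:
            for obj in data_gens:
                if isinstance(obj, ParameterSet):
                    params += [ pytest.param(v, batch_size, marks=obj.marks) for v in obj.values ]
                else:
                    params += [ pytest.param(obj, batch_size) ]
        return params

    # Stand-in generator lists: the full list runs only at the default 1g batch
    # size, while a small representative subset also runs at a tiny batch size
    # to exercise multiple stream batches without multiplying the whole matrix.
    all_gens = [ 'string_gen', 'int_gen', pytest.param('float_gen', marks=pytest.mark.skip) ]
    small_gens = [ 'int_gen' ]

    @pytest.mark.parametrize('data_gen,batch_size', join_batch_size_test_params(
        (all_gens, '1g'),
        (small_gens, '100')))
    def test_example(data_gen, batch_size):
        assert batch_size in ('1g', '100')

Collecting this sketch with pytest yields four parameter tuples: ('string_gen', '1g'), ('int_gen', '1g'), ('float_gen', '1g') which is skipped via its preserved mark, and ('int_gen', '100').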