From ddbb6ee280440bbd434a394845edd3aa718cd7d5 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 24 Jun 2020 21:07:14 -0700 Subject: [PATCH 01/13] Updated join tests for cache --- .../src/main/python/cache_test.py | 5 + .../src/main/python/join_test.py | 106 +++++++++++++----- 2 files changed, 82 insertions(+), 29 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 5ecd9b3b792..6362e48134d 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -28,3 +28,8 @@ def test_passing_gpuExpr_as_Expr(): .cache() .limit(50) ) + +def test_cache_table(): + spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)") + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.sql("select * from range5").limit(5)) diff --git a/integration_tests/src/main/python/join_test.py b/integration_tests/src/main/python/join_test.py index 1252dcecc7f..99a603af4c5 100644 --- a/integration_tests/src/main/python/join_test.py +++ b/integration_tests/src/main/python/join_test.py @@ -52,64 +52,112 @@ def do_join(spark): return left.join(right, left.a == right.r_a, join_type) assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) -# local sort becasue of https://github.com/NVIDIA/spark-rapids/issues/84 -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', - pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn) -def test_sortmerge_join(data_gen, join_type): +def get_sortmerge_join(join_type, data_gen, cached): def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) return left.join(right, left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join, conf=_sortmerge_join_conf) + def do_join_cached(spark): + return do_join(spark) + if cached == "true": + return do_join_cached + else: + return do_join -# Once https://github.com/NVIDIA/spark-rapids/issues/280 is fixed this test should be deleted +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 @ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen_no_nulls, ids=idfn) -@pytest.mark.parametrize('join_type', ['FullOuter'], ids=idfn) -def test_broadcast_join_no_nulls(data_gen, join_type): +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn) +@pytest.mark.parametrize('isCached', ["false", "true"]) +def test_sortmerge_join(data_gen, join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_sortmerge_join(join_type, data_gen, isCached), conf=_sortmerge_join_conf) + +def get_broadcast_join(join_type, data_gen, cached): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), left.a == right.r_a, join_type) - assert_gpu_and_cpu_are_equal_collect(do_join) + def do_join_cached(spark): + return do_join(spark).cache().limit(50) + + if cached == "true": + return do_join_cached + else: + return do_join # For tests which include broadcast joins, right table is broadcasted and hence it is # made smaller than left table. 
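# A hedged usage sketch (illustrative only; the literal arguments below are
# assumptions, not part of this patch): each get_*_join helper above returns a
# closure, so a parametrized test can hand either the plain or the cached
# variant to assert_gpu_and_cpu_are_equal_collect, which executes it once on
# the CPU and once on the GPU and compares the collected results.
#
#   do_it = get_broadcast_join('LeftSemi', LongGen(), "true")  # cached variant
#   assert_gpu_and_cpu_are_equal_collect(do_it)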
# local sort becasue of https://github.com/NVIDIA/spark-rapids/issues/84 @ignore_order(local=True) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', - pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn) -def test_broadcast_join(data_gen, join_type): +@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti', pytest.param('FullOuter', marks=[pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/280')])], ids=idfn) +@pytest.mark.parametrize('isCached', ["false", "true"]) +def test_broadcast_join(data_gen, join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_broadcast_join(join_type, data_gen, isCached)) + +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Inner']) +@pytest.mark.parametrize('isCached', ["false", pytest.param("true", marks=pytest.mark.xfail(reason="cache needs to be fixed"))], ids=idfn) +def test_broadcast_inner_join(data_gen, join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_broadcast_join(join_type, data_gen, isCached)) + +# Once https://github.com/NVIDIA/spark-rapids/issues/280 is fixed this test should be deleted +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen_no_nulls, ids=idfn) +@pytest.mark.parametrize('join_type', ['FullOuter'], ids=idfn) +def test_broadcast_join_no_nulls(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), left.a == right.r_a, join_type) assert_gpu_and_cpu_are_equal_collect(do_join) -# local sort becasue of https://github.com/NVIDIA/spark-rapids/issues/84 -@ignore_order(local=True) -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) -@pytest.mark.parametrize('join_type', ['Inner'], ids=idfn) -def test_broadcast_join_with_conditionals(data_gen, join_type): +def get_broadcast_join_with_conditionals(join_type, data_gen, cached): def do_join(spark): left, right = create_df(spark, data_gen, 500, 250) return left.join(broadcast(right), (left.a == right.r_a) & (left.b >= right.r_b), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join) + def do_join_cached(spark): + return do_join(spark).cache().limit(50) + if cached == "true": + return do_join_cached + else: + return do_join -_mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)), - ('b', IntegerGen()), ('c', LongGen())] -_mixed_df2_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)), - ('b', StringGen()), ('c', BooleanGen())] +# local sort because of https://github.com/NVIDIA/spark-rapids/issues/84 +@ignore_order(local=True) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Inner'], ids=idfn) +@pytest.mark.parametrize('isCached', ["false", pytest.param("true", marks=pytest.mark.xfail(reason="cache needs to be fixed"))], ids=idfn) +def test_broadcast_join_with_conditionals(data_gen, join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_broadcast_join_with_conditionals(join_type, data_gen, isCached)) -@ignore_order -@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'LeftSemi', 'LeftAnti', 'FullOuter'], ids=idfn) -def test_broadcast_join_mixed(join_type): +def get_broadcast_join_mixed_df(join_type, cached): + _mixed_df1_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 
10)), + ('b', IntegerGen()), ('c', LongGen())] + _mixed_df2_with_nulls = [('a', RepeatSeqGen(LongGen(nullable=(True, 20.0)), length= 10)), + ('b', StringGen()), ('c', BooleanGen())] def do_join(spark): left = gen_df(spark, _mixed_df1_with_nulls, length=500) right = gen_df(spark, _mixed_df2_with_nulls, length=500).withColumnRenamed("a", "r_a")\ .withColumnRenamed("b", "r_b").withColumnRenamed("c", "r_c") return left.join(broadcast(right), left.a.eqNullSafe(right.r_a), join_type) - assert_gpu_and_cpu_are_equal_collect(do_join) + def do_join_cached(spark): + return do_join(spark).cache().limit(50) + + if cached == "true": + return do_join_cached + else: + return do_join + +@ignore_order +@pytest.mark.parametrize('join_type', ['LeftSemi', 'LeftAnti'], ids=idfn) +@pytest.mark.parametrize('isCached', ["false", "true"]) +def test_broadcast_joins_mixed(join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_broadcast_join_mixed_df(join_type, isCached)) + +@ignore_order +@pytest.mark.parametrize('isCached', ["false", pytest.param("true", marks=pytest.mark.xfail(reason="cache needs to be fixed"))], ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Inner', 'FullOuter'], ids=idfn) +def test_broadcast_join_mixed_failing_cache(join_type, isCached): + assert_gpu_and_cpu_are_equal_collect(get_broadcast_join_mixed_df(join_type, isCached)) From 616d7e3a92ac1833d6c50149793b9c136b6be68c Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 26 Jun 2020 19:24:54 -0700 Subject: [PATCH 02/13] Better cache test Tests that cache a df, then use the cached df to execute another operation --- .../src/main/python/cache_test.py | 43 ++++++++++++++++++- .../src/main/python/spark_session.py | 2 + 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 6362e48134d..a5238a7d861 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -14,9 +14,12 @@ import pytest -from asserts import assert_gpu_and_cpu_are_equal_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal from data_gen import * import pyspark.sql.functions as f +from spark_session import with_cpu_session, with_gpu_session +from join_test import create_df +from marks import ignore_order def test_passing_gpuExpr_as_Expr(): assert_gpu_and_cpu_are_equal_collect( @@ -33,3 +36,41 @@ def test_cache_table(): spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)") assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.sql("select * from range5").limit(5)) + +gen=[StringGen(nullable=False), DateGen(nullable=False), TimestampGen(nullable=False)] +@ignore_order +@pytest.mark.parametrize('data_gen', gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_cached_join(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 500) + return left.join(right, left.a == right.r_a, join_type).cache() + cached_df_cpu = with_cpu_session(do_join) + from_cpu = cached_df_cpu.collect() + cached_df_gpu = with_gpu_session(do_join) + from_gpu = cached_df_gpu.collect() + assert_equal(from_cpu, from_gpu) + +gen_filter=[(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), (DateGen(nullable=False), "a > '1/21/2012'"), (TimestampGen(nullable=False), "a > '1/21/2012'")] +@ignore_order +@pytest.mark.parametrize('data_gen', gen_filter, ids=idfn) 
+@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_cached_join_filter(data_gen, join_type): + data, filter = data_gen + def do_join(spark): + left, right = create_df(spark, data, 500, 500) + return left.join(right, left.a == right.r_a, join_type).cache() + cached_df_cpu = with_cpu_session(do_join) + join_from_cpu = cached_df_cpu.collect() + filter_from_cpu = cached_df_cpu.filter(filter) + + cached_df_gpu = with_gpu_session(do_join) + join_from_gpu = cached_df_gpu.collect() + filter_from_gpu = cached_df_gpu.filter(filter) + + assert_equal(join_from_cpu, join_from_gpu) + + assert_equal(filter_from_cpu, filter_from_gpu) + + + diff --git a/integration_tests/src/main/python/spark_session.py b/integration_tests/src/main/python/spark_session.py index 5dd91cba61a..87b8e9ecd08 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -48,6 +48,8 @@ def _set_all_confs(conf): def reset_spark_session_conf(): """Reset all of the configs for a given spark session.""" _set_all_confs(_orig_conf) + """We should clear the cache""" + spark.catalog.clearCache() # Have to reach into a private member to get access to the API we need current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys() for key in current_keys: From a0e90038b0adfc4b5b478a86e26c2bbd7e8fc24a Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Sun, 28 Jun 2020 20:37:37 -0700 Subject: [PATCH 03/13] added xfail to the failing tests --- integration_tests/src/main/python/cache_test.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index a5238a7d861..26c4cab4237 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -39,6 +39,7 @@ def test_cache_table(): gen=[StringGen(nullable=False), DateGen(nullable=False), TimestampGen(nullable=False)] @ignore_order +@pytest.mark.xfail(reason="TODO: github issue") @pytest.mark.parametrize('data_gen', gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_cached_join(data_gen, join_type): @@ -53,6 +54,7 @@ def do_join(spark): gen_filter=[(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), (DateGen(nullable=False), "a > '1/21/2012'"), (TimestampGen(nullable=False), "a > '1/21/2012'")] @ignore_order +@pytest.mark.xfail(reason="TODO: github issue") @pytest.mark.parametrize('data_gen', gen_filter, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_cached_join_filter(data_gen, join_type): From d1379b92700cc7fb9926d06e56e930adfb80b502 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 1 Jul 2020 23:15:35 -0700 Subject: [PATCH 04/13] more tests --- .../src/main/python/cache_test.py | 56 ++++++++++--------- .../src/main/python/spark_session.py | 2 +- 2 files changed, 31 insertions(+), 27 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 26c4cab4237..2983e135463 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -19,7 +19,8 @@ import pyspark.sql.functions as f from spark_session import with_cpu_session, with_gpu_session from join_test import create_df -from marks import ignore_order +from marks import incompat, allow_non_gpu +from join_test 
import all_gen_no_nulls def test_passing_gpuExpr_as_Expr(): assert_gpu_and_cpu_are_equal_collect( @@ -32,47 +33,50 @@ def test_passing_gpuExpr_as_Expr(): .limit(50) ) -def test_cache_table(): - spark.sql("CACHE TABLE range5 AS SELECT * FROM range(5)") - assert_gpu_and_cpu_are_equal_collect( - lambda spark : spark.sql("select * from range5").limit(5)) - -gen=[StringGen(nullable=False), DateGen(nullable=False), TimestampGen(nullable=False)] -@ignore_order +conf={"spark.rapids.sql.explain":"ALL"} @pytest.mark.xfail(reason="TODO: github issue") -@pytest.mark.parametrize('data_gen', gen, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen_no_nulls, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -def test_cached_join(data_gen, join_type): +def test_cache_join(data_gen, join_type): + from pyspark.sql.functions import asc def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join) - from_cpu = cached_df_cpu.collect() - cached_df_gpu = with_gpu_session(do_join) - from_gpu = cached_df_gpu.collect() + cached_df_cpu = with_cpu_session(do_join, conf) + from_cpu = debug_df(cached_df_cpu.sort(asc("a")).collect()) + cached_df_gpu = with_gpu_session(do_join, conf) + from_gpu = debug_df(cached_df_gpu.sort(asc("a")).collect()) assert_equal(from_cpu, from_gpu) -gen_filter=[(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), (DateGen(nullable=False), "a > '1/21/2012'"), (TimestampGen(nullable=False), "a > '1/21/2012'")] -@ignore_order +all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), + (ByteGen(nullable=False), "a < 100"), + (ShortGen(nullable=False), "a < 100"), + (IntegerGen(nullable=False), "a < 1000"), + (LongGen(nullable=False), "a < 1000"), + (BooleanGen(nullable=False), "a == false"), + (DateGen(nullable=False), "a > '1/21/2012'"), + (TimestampGen(nullable=False), "a > '1/21/2012'"), + pytest.param((FloatGen(nullable=False), "a < 1000"), marks=[incompat]), + pytest.param((DoubleGen(nullable=False),"a < 1000"), marks=[incompat])] + @pytest.mark.xfail(reason="TODO: github issue") -@pytest.mark.parametrize('data_gen', gen_filter, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +@allow_non_gpu('InMemoryTableScanExec', 'RDDScanExec') def test_cached_join_filter(data_gen, join_type): + from pyspark.sql.functions import asc data, filter = data_gen def do_join(spark): left, right = create_df(spark, data, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join) - join_from_cpu = cached_df_cpu.collect() - filter_from_cpu = cached_df_cpu.filter(filter) + cached_df_cpu = with_cpu_session(do_join, conf) + join_from_cpu = cached_df_cpu.sort(asc("a")).collect() + filter_from_cpu = cached_df_cpu.filter(filter).collect() - cached_df_gpu = with_gpu_session(do_join) - join_from_gpu = cached_df_gpu.collect() - filter_from_gpu = cached_df_gpu.filter(filter) + cached_df_gpu = with_gpu_session(do_join, conf) + join_from_gpu = cached_df_gpu.sort(asc("a")).collect() + filter_from_gpu = cached_df_gpu.filter(filter).collect() assert_equal(join_from_cpu, join_from_gpu) assert_equal(filter_from_cpu, filter_from_gpu) - - - diff --git a/integration_tests/src/main/python/spark_session.py 
b/integration_tests/src/main/python/spark_session.py index 87b8e9ecd08..cbaa74fa01c 100644 --- a/integration_tests/src/main/python/spark_session.py +++ b/integration_tests/src/main/python/spark_session.py @@ -48,7 +48,7 @@ def _set_all_confs(conf): def reset_spark_session_conf(): """Reset all of the configs for a given spark session.""" _set_all_confs(_orig_conf) - """We should clear the cache""" + #We should clear the cache spark.catalog.clearCache() # Have to reach into a private member to get access to the API we need current_keys = _from_scala_map(spark.conf._jconf.getAll()).keys() From 260326edda2c8de8c42567dce2c839e3e48c06bd Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 6 Jul 2020 14:32:15 -0700 Subject: [PATCH 05/13] updated tests --- .../src/main/python/cache_test.py | 25 +++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 2983e135463..453fd76e0d1 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -43,9 +43,16 @@ def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() cached_df_cpu = with_cpu_session(do_join, conf) - from_cpu = debug_df(cached_df_cpu.sort(asc("a")).collect()) + if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): + sort = [asc("a"), asc("b")] + else: + sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + + from_cpu = cached_df_cpu.sort(sort).collect() + print('COLLECTED\n{}'.format(from_cpu)) cached_df_gpu = with_gpu_session(do_join, conf) - from_gpu = debug_df(cached_df_gpu.sort(asc("a")).collect()) + from_gpu = cached_df_gpu.sort(sort).collect() + print('COLLECTED\n{}'.format(from_gpu)) assert_equal(from_cpu, from_gpu) all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), @@ -70,13 +77,17 @@ def do_join(spark): left, right = create_df(spark, data, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() cached_df_cpu = with_cpu_session(do_join, conf) - join_from_cpu = cached_df_cpu.sort(asc("a")).collect() - filter_from_cpu = cached_df_cpu.filter(filter).collect() + if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): + sort_columns = [asc("a"), asc("b")] + else: + sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + + join_from_cpu = cached_df_cpu.sort(sort_columns).collect() + filter_from_cpu = cached_df_cpu.filter(filter).sort(sort_columns).collect() cached_df_gpu = with_gpu_session(do_join, conf) - join_from_gpu = cached_df_gpu.sort(asc("a")).collect() - filter_from_gpu = cached_df_gpu.filter(filter).collect() + join_from_gpu = cached_df_gpu.sort(sort_columns).collect() + filter_from_gpu = cached_df_gpu.filter(filter).sort(sort_columns).collect() assert_equal(join_from_cpu, join_from_gpu) - assert_equal(filter_from_cpu, filter_from_gpu) From 03440ff37f95bf6ae1cccd7f00e421082d525998 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Thu, 9 Jul 2020 08:24:11 -0700 Subject: [PATCH 06/13] allowing ops to run on the CPU --- .../src/main/python/cache_test.py | 38 ++++++++++++------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 453fd76e0d1..0bc751239e5 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -20,7 +20,6 @@ from spark_session 
import with_cpu_session, with_gpu_session from join_test import create_df from marks import incompat, allow_non_gpu -from join_test import all_gen_no_nulls def test_passing_gpuExpr_as_Expr(): assert_gpu_and_cpu_are_equal_collect( @@ -32,15 +31,36 @@ def test_passing_gpuExpr_as_Expr(): .cache() .limit(50) ) +#creating special cases to just remove -0.0 +double_special_cases = [ + DoubleGen._make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), + DoubleGen._make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), + DoubleGen._make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), + DoubleGen._make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), + 0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'), + NEG_DOUBLE_NAN_MAX_VALUE +] + +all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), + (ByteGen(nullable=False), "a < 100"), + (ShortGen(nullable=False), "a < 100"), + (IntegerGen(nullable=False), "a < 1000"), + (LongGen(nullable=False), "a < 1000"), + (BooleanGen(nullable=False), "a == false"), + (DateGen(nullable=False), "a > '1/21/2012'"), + (TimestampGen(nullable=False), "a > '1/21/2012'"), + pytest.param((FloatGen(nullable=False, special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]), + pytest.param((DoubleGen(nullable=False, special_cases=double_special_cases),"a < 1000"), marks=[incompat])] conf={"spark.rapids.sql.explain":"ALL"} @pytest.mark.xfail(reason="TODO: github issue") -@pytest.mark.parametrize('data_gen', all_gen_no_nulls, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_cache_join(data_gen, join_type): from pyspark.sql.functions import asc + data, filter = data_gen # we are not using the filter def do_join(spark): - left, right = create_df(spark, data_gen, 500, 500) + left, right = create_df(spark, data, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() cached_df_cpu = with_cpu_session(do_join, conf) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): @@ -55,21 +75,11 @@ def do_join(spark): print('COLLECTED\n{}'.format(from_gpu)) assert_equal(from_cpu, from_gpu) -all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), - (ByteGen(nullable=False), "a < 100"), - (ShortGen(nullable=False), "a < 100"), - (IntegerGen(nullable=False), "a < 1000"), - (LongGen(nullable=False), "a < 1000"), - (BooleanGen(nullable=False), "a == false"), - (DateGen(nullable=False), "a > '1/21/2012'"), - (TimestampGen(nullable=False), "a > '1/21/2012'"), - pytest.param((FloatGen(nullable=False), "a < 1000"), marks=[incompat]), - pytest.param((DoubleGen(nullable=False),"a < 1000"), marks=[incompat])] @pytest.mark.xfail(reason="TODO: github issue") @pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -@allow_non_gpu('InMemoryTableScanExec', 'RDDScanExec') +@allow_non_gpu(any=True) def test_cached_join_filter(data_gen, join_type): from pyspark.sql.functions import asc data, filter = data_gen From a1f43dd21f53c68c6e5709ba2b2d2d3e6f96eab7 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 13 Jul 2020 15:44:56 -0700 Subject: [PATCH 07/13] Added more Exec tests for cache --- .../src/main/python/cache_test.py | 153 +++++++++++++++--- 1 file changed, 134 insertions(+), 19 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py 
b/integration_tests/src/main/python/cache_test.py index 0bc751239e5..a53ec314b0e 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -19,7 +19,9 @@ import pyspark.sql.functions as f from spark_session import with_cpu_session, with_gpu_session from join_test import create_df -from marks import incompat, allow_non_gpu +from generate_expr_test import four_op_df +from marks import incompat, allow_non_gpu, ignore_order +from pyspark.sql.functions import asc def test_passing_gpuExpr_as_Expr(): assert_gpu_and_cpu_are_equal_collect( @@ -41,26 +43,29 @@ def test_passing_gpuExpr_as_Expr(): NEG_DOUBLE_NAN_MAX_VALUE ] -all_gen_no_nulls_filters = [(StringGen(nullable=False), "rlike(a, '^(?=.{1,5}$).*')"), - (ByteGen(nullable=False), "a < 100"), - (ShortGen(nullable=False), "a < 100"), - (IntegerGen(nullable=False), "a < 1000"), - (LongGen(nullable=False), "a < 1000"), - (BooleanGen(nullable=False), "a == false"), - (DateGen(nullable=False), "a > '1/21/2012'"), - (TimestampGen(nullable=False), "a > '1/21/2012'"), - pytest.param((FloatGen(nullable=False, special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]), - pytest.param((DoubleGen(nullable=False, special_cases=double_special_cases),"a < 1000"), marks=[incompat])] +all_gen = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), + pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), BooleanGen(), DateGen(), TimestampGen()] + +all_gen_filters = [(StringGen(), "rlike(a, '^(?=.{1,5}$).*')"), + (ByteGen(), "a < 100"), + (ShortGen(), "a < 100"), + (IntegerGen(), "a < 1000"), + (LongGen(), "a < 1000"), + (BooleanGen(), "a == false"), + (DateGen(), "a > '1/21/2012'"), + (TimestampGen(), "a > '1/21/2012'"), + pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]), + pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])] conf={"spark.rapids.sql.explain":"ALL"} -@pytest.mark.xfail(reason="TODO: github issue") -@pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_cache_join(data_gen, join_type): + if data_gen.data_type == BooleanType(): + pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") from pyspark.sql.functions import asc - data, filter = data_gen # we are not using the filter def do_join(spark): - left, right = create_df(spark, data, 500, 500) + left, right = create_df(spark, data_gen, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() cached_df_cpu = with_cpu_session(do_join, conf) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): @@ -69,20 +74,22 @@ def do_join(spark): sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] from_cpu = cached_df_cpu.sort(sort).collect() - print('COLLECTED\n{}'.format(from_cpu)) cached_df_gpu = with_gpu_session(do_join, conf) from_gpu = cached_df_gpu.sort(sort).collect() - print('COLLECTED\n{}'.format(from_gpu)) assert_equal(from_cpu, from_gpu) -@pytest.mark.xfail(reason="TODO: github issue") -@pytest.mark.parametrize('data_gen', all_gen_no_nulls_filters, ids=idfn) +@pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 
'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +# We are OK running everything on CPU until we complete 'git issue' +# because we have an explicit check in our code that disallows InMemoryTableScan to have anything other than +# AttributeReference @allow_non_gpu(any=True) def test_cached_join_filter(data_gen, join_type): from pyspark.sql.functions import asc data, filter = data_gen + if data.data_type == BooleanType(): + pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") def do_join(spark): left, right = create_df(spark, data, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() @@ -101,3 +108,111 @@ def do_join(spark): assert_equal(join_from_cpu, join_from_gpu) assert_equal(filter_from_cpu, filter_from_gpu) + +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_cache_broadcast_hash_join(data_gen, join_type): + if data_gen.data_type == BooleanType(): + pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") + from pyspark.sql.functions import asc + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 500) + return left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache() + cached_df_cpu = with_cpu_session(do_join, conf) + if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): + sort = [asc("a"), asc("b")] + else: + sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + + from_cpu = cached_df_cpu.sort(sort).collect() + cached_df_gpu = with_gpu_session(do_join, conf) + from_gpu = cached_df_gpu.sort(sort).collect() + assert_equal(from_cpu, from_gpu) + + +shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160", + "spark.sql.join.preferSortMergeJoin": "false", + "spark.sql.shuffle.partitions": "2", + "spark.rapids.sql.explain": "ALL", + "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"} + +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +def test_cache_shuffled_hash_join(data_gen, join_type): + if data_gen.data_type == BooleanType(): + pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") + from pyspark.sql.functions import asc + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 500) + return left.join(right, left.a == right.r_a, join_type).cache() + cached_df_cpu = with_cpu_session(do_join, shuffled_conf) + if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): + sort = [asc("a"), asc("b")] + else: + sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + + from_cpu = cached_df_cpu.sort(sort).collect() + cached_df_gpu = with_gpu_session(do_join, shuffled_conf) + from_gpu = cached_df_gpu.sort(sort).collect() + assert_equal(from_cpu, from_gpu) + + +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +@pytest.mark.skip(reason="this isn't calling the broadcastNestedLoopJoin, come back to it") +def test_cache_broadcast_nested_loop_join(data_gen, join_type): + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 50) + return left.join(right, left.a == left.a, join_type).cache() + cached_df_cpu = with_cpu_session(do_join, shuffled_conf) + if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): + sort = [asc("a"), asc("b")] + else: + sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + + from_cpu = cached_df_cpu.sort(sort).collect() + 
cached_df_gpu = with_gpu_session(do_join, shuffled_conf) + from_gpu = cached_df_gpu.sort(sort).collect() + assert_equal(from_cpu, from_gpu) + +#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 +#This is a copy of a test from generate_expr_test.py except for the fact that we are caching the df +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec') +def test_cache_posexplode_makearray(spark_tmp_path, data_gen): + if data_gen.data_type == BooleanType(): + pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") + data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU' + def posExplode(spark): + return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache() + cached_df_cpu = with_cpu_session(posExplode, conf) + cached_df_cpu.write.parquet(data_path_cpu) + from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu)) + + data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU' + cached_df_gpu = with_gpu_session(posExplode, conf) + cached_df_gpu.write.parquet(data_path_gpu) + from_gpu = with_gpu_session(lambda spark: spark.read.parquet(data_path_gpu)) + + sort_col = [asc("pos"), asc("col"), asc("a")] + assert_equal(cached_df_cpu.sort(sort_col).collect(), cached_df_gpu.sort(sort_col).collect()) + assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect()) + +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +def test_cache_expand_exec(data_gen): + def op_df(spark, length=2048, seed=0): + return gen_df(spark, StructGen([ + ('a', data_gen), + ('b', IntegerGen())], nullable=False), length=length, seed=seed) + + cached_df_cpu = with_cpu_session(op_df, conf).cache() + from_cpu = with_cpu_session(lambda spark: cached_df_cpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))) + + cached_df_gpu = with_gpu_session(op_df, conf).cache() + from_gpu = with_cpu_session(lambda spark: cached_df_gpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))) + + sort_col = [asc("a"), asc("b"), asc("count(b)")] + assert_equal(cached_df_cpu.sort(asc("a"), asc("b")).collect(), cached_df_gpu.sort(asc("a"), asc("b")).collect()) + assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect()) + + From 54572b1d8929811ebf93bc60545ff363fe4e214b Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 13 Jul 2020 16:59:07 -0700 Subject: [PATCH 08/13] removed conf --- .../src/main/python/cache_test.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index a53ec314b0e..1e865c109ff 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -57,7 +57,6 @@ def test_passing_gpuExpr_as_Expr(): pytest.param((FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), "a < 1000"), marks=[incompat]), pytest.param((DoubleGen(special_cases=double_special_cases),"a < 1000"), marks=[incompat])] -conf={"spark.rapids.sql.explain":"ALL"} @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) def test_cache_join(data_gen, join_type): @@ -67,14 +66,14 @@ def test_cache_join(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = 
with_cpu_session(do_join, conf) + cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join, conf) + cached_df_gpu = with_gpu_session(do_join) from_gpu = cached_df_gpu.sort(sort).collect() assert_equal(from_cpu, from_gpu) @@ -93,7 +92,7 @@ def test_cached_join_filter(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data, 500, 500) return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join, conf) + cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort_columns = [asc("a"), asc("b")] else: @@ -102,7 +101,7 @@ def do_join(spark): join_from_cpu = cached_df_cpu.sort(sort_columns).collect() filter_from_cpu = cached_df_cpu.filter(filter).sort(sort_columns).collect() - cached_df_gpu = with_gpu_session(do_join, conf) + cached_df_gpu = with_gpu_session(do_join) join_from_gpu = cached_df_gpu.sort(sort_columns).collect() filter_from_gpu = cached_df_gpu.filter(filter).sort(sort_columns).collect() @@ -118,14 +117,14 @@ def test_cache_broadcast_hash_join(data_gen, join_type): def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) return left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join, conf) + cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join, conf) + cached_df_gpu = with_gpu_session(do_join) from_gpu = cached_df_gpu.sort(sort).collect() assert_equal(from_cpu, from_gpu) @@ -133,7 +132,6 @@ def do_join(spark): shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160", "spark.sql.join.preferSortMergeJoin": "false", "spark.sql.shuffle.partitions": "2", - "spark.rapids.sql.explain": "ALL", "spark.rapids.sql.exec.BroadcastNestedLoopJoinExec": "true"} @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @@ -185,12 +183,12 @@ def test_cache_posexplode_makearray(spark_tmp_path, data_gen): data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU' def posExplode(spark): return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache() - cached_df_cpu = with_cpu_session(posExplode, conf) + cached_df_cpu = with_cpu_session(posExplode) cached_df_cpu.write.parquet(data_path_cpu) from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu)) data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU' - cached_df_gpu = with_gpu_session(posExplode, conf) + cached_df_gpu = with_gpu_session(posExplode) cached_df_gpu.write.parquet(data_path_gpu) from_gpu = with_gpu_session(lambda spark: spark.read.parquet(data_path_gpu)) @@ -205,10 +203,10 @@ def op_df(spark, length=2048, seed=0): ('a', data_gen), ('b', IntegerGen())], nullable=False), length=length, seed=seed) - cached_df_cpu = with_cpu_session(op_df, conf).cache() + cached_df_cpu = with_cpu_session(op_df).cache() from_cpu = with_cpu_session(lambda spark: cached_df_cpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))) - cached_df_gpu = with_gpu_session(op_df, conf).cache() + cached_df_gpu = with_gpu_session(op_df).cache() from_gpu = with_cpu_session(lambda spark: cached_df_gpu.rollup(f.col("a"), 
f.col("b")).agg(f.count(f.col("b")))) sort_col = [asc("a"), asc("b"), asc("count(b)")] From c3db5a63f9815cf57b5064779d067cc1986b6e81 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 14 Jul 2020 12:05:54 -0700 Subject: [PATCH 09/13] addressed review comments --- .../src/main/python/cache_test.py | 105 +++++++----------- integration_tests/src/main/python/data_gen.py | 12 +- 2 files changed, 48 insertions(+), 69 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 1e865c109ff..8eac9e10f9b 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -35,10 +35,10 @@ def test_passing_gpuExpr_as_Expr(): ) #creating special cases to just remove -0.0 double_special_cases = [ - DoubleGen._make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), - DoubleGen._make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), - DoubleGen._make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), - DoubleGen._make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), + DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), + DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), + DoubleGen.make_from(1, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), + DoubleGen.make_from(0, DOUBLE_MIN_EXP, DOUBLE_MAX_FRACTION), 0.0, 1.0, -1.0, float('inf'), float('-inf'), float('nan'), NEG_DOUBLE_NAN_MAX_VALUE ] @@ -63,20 +63,18 @@ def test_cache_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") from pyspark.sql.functions import asc - def do_join(spark): - left, right = create_df(spark, data_gen, 500, 500) - return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] - from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join) - from_gpu = cached_df_gpu.sort(sort).collect() - assert_equal(from_cpu, from_gpu) + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 500) + cached = left.join(right, left.a == right.r_a, join_type).cache() + cached.count() # populates cache + return cached.sort(sort) + assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -89,24 +87,17 @@ def test_cached_join_filter(data_gen, join_type): data, filter = data_gen if data.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") - def do_join(spark): - left, right = create_df(spark, data, 500, 500) - return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort_columns = [asc("a"), asc("b")] else: sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + def do_join(spark): + left, right = create_df(spark, data, 500, 500) + cached = left.join(right, left.a == right.r_a, join_type).cache() + cached.count() #populates the cache + return cached.sort(sort_columns) - join_from_cpu = cached_df_cpu.sort(sort_columns).collect() - filter_from_cpu = cached_df_cpu.filter(filter).sort(sort_columns).collect() - - cached_df_gpu = with_gpu_session(do_join) - join_from_gpu = cached_df_gpu.sort(sort_columns).collect() - filter_from_gpu = 
cached_df_gpu.filter(filter).sort(sort_columns).collect() - - assert_equal(join_from_cpu, join_from_gpu) - assert_equal(filter_from_cpu, filter_from_gpu) + assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @@ -114,20 +105,17 @@ def test_cache_broadcast_hash_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") from pyspark.sql.functions import asc - def do_join(spark): - left, right = create_df(spark, data_gen, 500, 500) - return left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + def do_join(spark): + left, right = create_df(spark, data_gen, 500, 500) + cached = left.join(right.hint("broadcast"), left.a == right.r_a, join_type).cache() + cached.count() + return cached.sort(sort) - from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join) - from_gpu = cached_df_gpu.sort(sort).collect() - assert_equal(from_cpu, from_gpu) - + assert_gpu_and_cpu_are_equal_collect(do_join) shuffled_conf = {"spark.sql.autoBroadcastJoinThreshold": "160", "spark.sql.join.preferSortMergeJoin": "false", @@ -140,38 +128,34 @@ def test_cache_shuffled_hash_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") from pyspark.sql.functions import asc - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 500) - return left.join(right, left.a == right.r_a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join, shuffled_conf) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] - from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join, shuffled_conf) - from_gpu = cached_df_gpu.sort(sort).collect() - assert_equal(from_cpu, from_gpu) - + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 500) + cached = left.join(right, left.a == right.r_a, join_type).cache() + cached.count() + return cached.sort(sort) + assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) @pytest.mark.skip(reason="this isn't calling the broadcastNestedLoopJoin, come back to it") def test_cache_broadcast_nested_loop_join(data_gen, join_type): - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 50) - return left.join(right, left.a == left.a, join_type).cache() - cached_df_cpu = with_cpu_session(do_join, shuffled_conf) if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): sort = [asc("a"), asc("b")] else: sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] - from_cpu = cached_df_cpu.sort(sort).collect() - cached_df_gpu = with_gpu_session(do_join, shuffled_conf) - from_gpu = cached_df_gpu.sort(sort).collect() - assert_equal(from_cpu, from_gpu) + def do_join(spark): + left, right = create_df(spark, data_gen, 50, 50) + cached = left.join(right, left.a == left.a, join_type).cache() + cached.count() + return cached.sort(sort) + + assert_gpu_and_cpu_are_equal_collect(do_join) #sort locally because of 
https://github.com/NVIDIA/spark-rapids/issues/84 #This is a copy of a test from generate_expr_test.py except for the fact that we are caching the df @@ -183,6 +167,7 @@ def test_cache_posexplode_makearray(spark_tmp_path, data_gen): data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU' def posExplode(spark): return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache() + cached_df_cpu = with_cpu_session(posExplode) cached_df_cpu.write.parquet(data_path_cpu) from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu)) @@ -198,19 +183,13 @@ def posExplode(spark): @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) def test_cache_expand_exec(data_gen): + sort_col = [asc("a"), asc("b"), asc("count(b)")] def op_df(spark, length=2048, seed=0): - return gen_df(spark, StructGen([ + cached = gen_df(spark, StructGen([ ('a', data_gen), - ('b', IntegerGen())], nullable=False), length=length, seed=seed) - - cached_df_cpu = with_cpu_session(op_df).cache() - from_cpu = with_cpu_session(lambda spark: cached_df_cpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))) - - cached_df_gpu = with_gpu_session(op_df).cache() - from_gpu = with_cpu_session(lambda spark: cached_df_gpu.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b")))) - - sort_col = [asc("a"), asc("b"), asc("count(b)")] - assert_equal(cached_df_cpu.sort(asc("a"), asc("b")).collect(), cached_df_gpu.sort(asc("a"), asc("b")).collect()) - assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect()) + ('b', IntegerGen())], nullable=False), length=length, seed=seed).cache() + cached.count() # populate the cache + return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))).sort(sort_col) + assert_gpu_and_cpu_are_equal_collect(op_df) diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 0710d7f943e..0997403c55b 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -293,10 +293,10 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False self._use_full_range = (self._min_exp == DOUBLE_MIN_EXP) and (self._max_exp == DOUBLE_MAX_EXP) if special_cases is None: special_cases = [ - self._make_from(1, self._max_exp, DOUBLE_MAX_FRACTION), - self._make_from(0, self._max_exp, DOUBLE_MAX_FRACTION), - self._make_from(1, self._min_exp, DOUBLE_MAX_FRACTION), - self._make_from(0, self._min_exp, DOUBLE_MAX_FRACTION) + self.make_from(1, self._max_exp, DOUBLE_MAX_FRACTION), + self.make_from(0, self._max_exp, DOUBLE_MAX_FRACTION), + self.make_from(1, self._min_exp, DOUBLE_MAX_FRACTION), + self.make_from(0, self._min_exp, DOUBLE_MAX_FRACTION) ] if self._min_exp <= 0 and self._max_exp >= 0: special_cases.append(0.0) @@ -312,7 +312,7 @@ def __init__(self, min_exp=DOUBLE_MIN_EXP, max_exp=DOUBLE_MAX_EXP, no_nans=False super().__init__(DoubleType(), nullable=nullable, special_cases=special_cases) @staticmethod - def _make_from(sign, exp, fraction): + def make_from(sign, exp, fraction): sign = sign & 1 # 1 bit exp = (exp + 1023) & 0x7FF # add bias and 11 bits fraction = fraction & DOUBLE_MAX_FRACTION @@ -338,7 +338,7 @@ def gen_part_double(): sign = rand.getrandbits(1) exp = rand.randint(self._min_exp, self._max_exp) fraction = rand.getrandbits(52) - return self._fixup_nans(self._make_from(sign, exp, fraction)) + return self._fixup_nans(self.make_from(sign, exp, fraction)) self._start(rand, gen_part_double) class BooleanGen(DataGen): From 
1f1203807f814ca8757f72d929da0661160be90a Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 15 Jul 2020 10:38:06 -0700 Subject: [PATCH 10/13] addressed review comments --- .../src/main/python/cache_test.py | 81 +++++++------------ 1 file changed, 29 insertions(+), 52 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 8eac9e10f9b..95612a6a63d 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -33,7 +33,7 @@ def test_passing_gpuExpr_as_Expr(): .cache() .limit(50) ) -#creating special cases to just remove -0.0 +# creating special cases to just remove -0.0 because of https://github.com/NVIDIA/spark-rapids/issues/84 double_special_cases = [ DoubleGen.make_from(1, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), DoubleGen.make_from(0, DOUBLE_MAX_EXP, DOUBLE_MAX_FRACTION), @@ -59,61 +59,51 @@ def test_passing_gpuExpr_as_Expr(): @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +@ignore_order def test_cache_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") - from pyspark.sql.functions import asc - if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): - sort = [asc("a"), asc("b")] - else: - sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() # populates cache - return cached.sort(sort) + return cached assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen_filters, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -# We are OK running everything on CPU until we complete 'git issue' +# We are OK running everything on CPU until we complete 'https://github.com/NVIDIA/spark-rapids/issues/360' # because we have an explicit check in our code that disallows InMemoryTableScan to have anything other than # AttributeReference @allow_non_gpu(any=True) +@ignore_order def test_cached_join_filter(data_gen, join_type): - from pyspark.sql.functions import asc data, filter = data_gen if data.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") - if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): - sort_columns = [asc("a"), asc("b")] - else: - sort_columns = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + def do_join(spark): left, right = create_df(spark, data, 500, 500) cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() #populates the cache - return cached.sort(sort_columns) + return cached assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +@ignore_order def test_cache_broadcast_hash_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") - from pyspark.sql.functions import asc - if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): - sort = [asc("a"), asc("b")] - else: - sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] + def do_join(spark): left, right = create_df(spark, data_gen, 500, 500) cached = left.join(right.hint("broadcast"), left.a == 
right.r_a, join_type).cache() cached.count() - return cached.sort(sort) + return cached assert_gpu_and_cpu_are_equal_collect(do_join) @@ -124,38 +114,29 @@ def do_join(spark): @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) +@ignore_order def test_cache_shuffled_hash_join(data_gen, join_type): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") - from pyspark.sql.functions import asc - if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): - sort = [asc("a"), asc("b")] - else: - sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] def do_join(spark): left, right = create_df(spark, data_gen, 50, 500) cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() - return cached.sort(sort) + return cached assert_gpu_and_cpu_are_equal_collect(do_join) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('join_type', ['Left', 'Right', 'Inner', 'LeftSemi', 'LeftAnti'], ids=idfn) -@pytest.mark.skip(reason="this isn't calling the broadcastNestedLoopJoin, come back to it") +@ignore_order def test_cache_broadcast_nested_loop_join(data_gen, join_type): - if (join_type == 'LeftAnti' or join_type == 'LeftSemi'): - sort = [asc("a"), asc("b")] - else: - sort = [asc("a"), asc("b"), asc("r_a"), asc("r_b")] - def do_join(spark): - left, right = create_df(spark, data_gen, 50, 50) - cached = left.join(right, left.a == left.a, join_type).cache() + left, right = create_df(spark, data_gen, 50, 25) + cached = left.crossJoin(right.hint("broadcast")).cache() cached.count() - return cached.sort(sort) + return cached - assert_gpu_and_cpu_are_equal_collect(do_join) + assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'}) #sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 #This is a copy of a test from generate_expr_test.py except for the fact that we are caching the df @@ -165,31 +146,27 @@ def test_cache_posexplode_makearray(spark_tmp_path, data_gen): if data_gen.data_type == BooleanType(): pytest.xfail("https://github.com/NVIDIA/spark-rapids/issues/350") data_path_cpu = spark_tmp_path + '/PARQUET_DATA_CPU' - def posExplode(spark): - return four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache() - - cached_df_cpu = with_cpu_session(posExplode) - cached_df_cpu.write.parquet(data_path_cpu) - from_cpu = with_cpu_session(lambda spark: spark.read.parquet(data_path_cpu)) - data_path_gpu = spark_tmp_path + '/PARQUET_DATA_GPU' - cached_df_gpu = with_gpu_session(posExplode) - cached_df_gpu.write.parquet(data_path_gpu) - from_gpu = with_gpu_session(lambda spark: spark.read.parquet(data_path_gpu)) - - sort_col = [asc("pos"), asc("col"), asc("a")] - assert_equal(cached_df_cpu.sort(sort_col).collect(), cached_df_gpu.sort(sort_col).collect()) - assert_equal(from_cpu.sort(sort_col).collect(), from_gpu.sort(sort_col).collect()) + def write_posExplode(data_path): + def posExplode(spark): + cached = four_op_df(spark, data_gen).selectExpr('posexplode(array(b, c, d))', 'a').cache() + cached.count() + cached.write.parquet(data_path) + spark.read.parquet(data_path) + return posExplode + from_cpu = with_cpu_session(write_posExplode(data_path_cpu)) + from_gpu = with_gpu_session(write_posExplode(data_path_gpu)) + assert_equal(from_cpu, from_gpu) @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +@ignore_order def 
test_cache_expand_exec(data_gen): - sort_col = [asc("a"), asc("b"), asc("count(b)")] def op_df(spark, length=2048, seed=0): cached = gen_df(spark, StructGen([ ('a', data_gen), ('b', IntegerGen())], nullable=False), length=length, seed=seed).cache() cached.count() # populate the cache - return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))).sort(sort_col) + return cached.rollup(f.col("a"), f.col("b")).agg(f.count(f.col("b"))) assert_gpu_and_cpu_are_equal_collect(op_df) From a6146dd8e0953371687c811cfdb6dd508e1e972c Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 15 Jul 2020 10:44:50 -0700 Subject: [PATCH 11/13] removed unnecessary import --- integration_tests/src/main/python/cache_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 95612a6a63d..3c8448fc5db 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -21,7 +21,6 @@ from join_test import create_df from generate_expr_test import four_op_df from marks import incompat, allow_non_gpu, ignore_order -from pyspark.sql.functions import asc def test_passing_gpuExpr_as_Expr(): assert_gpu_and_cpu_are_equal_collect( From d1bf608448a5fd83928ae16608e00fc986dd4534 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 15 Jul 2020 21:54:12 -0700 Subject: [PATCH 12/13] added filter back --- integration_tests/src/main/python/cache_test.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 3c8448fc5db..62ad3de60e2 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -87,7 +87,7 @@ def do_join(spark): left, right = create_df(spark, data, 500, 500) cached = left.join(right, left.a == right.r_a, join_type).cache() cached.count() #populates the cache - return cached + return cached.filter(filter) assert_gpu_and_cpu_are_equal_collect(do_join) @@ -137,7 +137,6 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'}) -#sort locally because of https://github.com/NVIDIA/spark-rapids/issues/84 #This is a copy of a test from generate_expr_test.py except for the fact that we are caching the df @pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec') From 50a2801b8c70fa9d6e69655b45bcd018ad7fc1fc Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 20 Jul 2020 13:47:35 -0700 Subject: [PATCH 13/13] restricted date past 1582-10-15 --- integration_tests/src/main/python/cache_test.py | 14 ++++++++++++-- integration_tests/src/main/python/data_gen.py | 2 ++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 62ad3de60e2..96d17d21d54 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -16,6 +16,7 @@ from asserts import assert_gpu_and_cpu_are_equal_collect, assert_equal from data_gen import * +from datetime import date import pyspark.sql.functions as f from spark_session import with_cpu_session, with_gpu_session from join_test import create_df @@ -137,8 +138,17 @@ def do_join(spark): assert_gpu_and_cpu_are_equal_collect(do_join, 
conf={'spark.rapids.sql.exec.BroadcastNestedLoopJoinExec': 'true'}) -#This is a copy of a test from generate_expr_test.py except for the fact that we are caching the df -@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) +all_gen_restricting_dates = [StringGen(), ByteGen(), ShortGen(), IntegerGen(), LongGen(), + pytest.param(FloatGen(special_cases=[FLOAT_MIN, FLOAT_MAX, 0.0, 1.0, -1.0]), marks=[incompat]), + pytest.param(DoubleGen(special_cases=double_special_cases), marks=[incompat]), + BooleanGen(), + # due to backward compatibility we are avoiding writing dates prior to 1582-10-15 + # For more detail please look at SPARK-31404 + # This issue is tracked by https://github.com/NVIDIA/spark-rapids/issues/133 in the plugin + DateGen(start=date(1582, 10, 15)), + TimestampGen()] + +@pytest.mark.parametrize('data_gen', all_gen_restricting_dates, ids=idfn) @allow_non_gpu('InMemoryTableScanExec', 'DataWritingCommandExec') def test_cache_posexplode_makearray(spark_tmp_path, data_gen): if data_gen.data_type == BooleanType(): diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 0997403c55b..cdd637ca122 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -416,6 +416,8 @@ def _guess_leap_year(t): y = int(math.ceil(t/4.0)) * 4 if ((y % 100) == 0) and ((y % 400) != 0): y = y + 4 + if (y == 10000): + y = y - 4 return y _epoch = date(1970, 1, 1)
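For reference, a minimal self-contained sketch of the year clamp that PATCH 13 adds to _guess_leap_year: the function body is reproduced from the hunk context above, while the asserts are illustrative assumptions showing why the clamp matters (Python's datetime.date cannot represent year 10000).

import math
from datetime import date

def _guess_leap_year(t):
    # next multiple of 4 at or above t, skipping century years that are not
    # leap years, and clamping away from year 10000
    y = int(math.ceil(t/4.0)) * 4
    if ((y % 100) == 0) and ((y % 400) != 0):
        y = y + 4
    if (y == 10000):
        y = y - 4
    return y

assert _guess_leap_year(9999) == 9996   # would otherwise land on year 10000
assert date(9996, 2, 29).year == 9996   # 9996 is a valid leap year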