
[BUG] test_window_aggs_for_batched_finite_row_windows_partitioned failed on Scala 2.13 with DATAGEN_SEED=1704033145 #10134

Closed
sameerz opened this issue Dec 31, 2023 · 3 comments · Fixed by #10143
Labels: bug (Something isn't working), test (Only impacts tests)

Comments

sameerz (Collaborator) commented Dec 31, 2023

Describe the bug
test_window_aggs_for_batched_finite_row_windows_partitioned failed on Scala 2.13 with DATAGEN_SEED=1704033145

[2023-12-31T15:58:13.289Z] FAILED ../../src/main/python/window_function_test.py::test_window_aggs_for_batched_finite_row_windows_partitioned[[('a', RepeatSeq(Integer)), ('b', Long), ('c', Integer)]-1000][DATAGEN_SEED=1704033145, IGNORE_ORDER({'local': True})] - AssertionError: GPU and CPU int values are different at [1653, 'max_c_desc']
[2023-12-31T15:58:13.287Z] ----------------------------- Captured stdout call -----------------------------
[2023-12-31T15:58:13.287Z] ### CPU RUN ###
[2023-12-31T15:58:13.287Z] ### GPU RUN ###
[2023-12-31T15:58:13.287Z] ### COLLECT: GPU TOOK 1.149477481842041 CPU TOOK 0.23086333274841309 ###
[2023-12-31T15:58:13.287Z] --- CPU OUTPUT
[2023-12-31T15:58:13.287Z] +++ GPU OUTPUT
[2023-12-31T15:58:13.287Z] @@ -1651,8 +1651,8 @@
[2023-12-31T15:58:13.287Z]  Row(count_1_asc=101, count_c_asc=13, count_c_negative=48, count_1_negative=5, sum_c_asc=-1522337086, avg_c_asc=57755395.9, max_c_desc=-180178850, min_c_asc=-699204090, lag_c_30_asc=None, lead_c_40_asc=-986110483)
[2023-12-31T15:58:13.287Z]  Row(count_1_asc=101, count_c_asc=14, count_c_negative=48, count_1_negative=6, sum_c_asc=-823132997, avg_c_asc=139202163.65, max_c_desc=-57826625, min_c_asc=-585127520, lag_c_30_asc=None, lead_c_40_asc=-590133661)
[2023-12-31T15:58:13.287Z]  Row(count_1_asc=101, count_c_asc=15, count_c_negative=48, count_1_negative=7, sum_c_asc=-119061324, avg_c_asc=216602118.025, max_c_desc=-1, min_c_asc=-180178850, lag_c_30_asc=None, lead_c_40_asc=-102784352)
[2023-12-31T15:58:13.288Z] -Row(count_1_asc=101, count_c_asc=16, count_c_negative=48, count_1_negative=8, sum_c_asc=426316276, avg_c_asc=280922573.15, max_c_desc=118944153, min_c_asc=-57826625, lag_c_30_asc=None, lead_c_40_asc=2147483647)
[2023-12-31T15:58:13.288Z] -Row(count_1_asc=101, count_c_asc=17, count_c_negative=48, count_1_negative=9, sum_c_asc=974775197, avg_c_asc=375564252.025, max_c_desc=-1, min_c_asc=-1, lag_c_30_asc=None, lead_c_40_asc=1148650438)
[2023-12-31T15:58:13.288Z] +Row(count_1_asc=101, count_c_asc=16, count_c_negative=48, count_1_negative=8, sum_c_asc=426316276, avg_c_asc=280922573.15, max_c_desc=-1, min_c_asc=-57826625, lag_c_30_asc=None, lead_c_40_asc=2147483647)
[2023-12-31T15:58:13.288Z] +Row(count_1_asc=101, count_c_asc=17, count_c_negative=48, count_1_negative=9, sum_c_asc=974775197, avg_c_asc=375564252.025, max_c_desc=118944153, min_c_asc=-1, lag_c_30_asc=None, lead_c_40_asc=1148650438)
[2023-12-31T15:58:13.288Z]  Row(count_1_asc=101, count_c_asc=18, count_c_negative=48, count_1_negative=10, sum_c_asc=1795054366, avg_c_asc=419351579.925, max_c_desc=365198750, min_c_asc=-1, lag_c_30_asc=None, lead_c_40_asc=-1986725033)
[2023-12-31T15:58:13.288Z]  Row(count_1_asc=101, count_c_asc=19, count_c_negative=48, count_1_negative=11, sum_c_asc=2851217903, avg_c_asc=457881349.85, max_c_desc=490632296, min_c_asc=118944153, lag_c_30_asc=None, lead_c_40_asc=1364594864)
[2023-12-31T15:58:13.288Z]  Row(count_1_asc=101, count_c_asc=20, count_c_negative=48, count_1_negative=12, sum_c_asc=3878924604, avg_c_asc=488059336.45, max_c_desc=820279168, min_c_asc=365198750, lag_c_30_asc=None, lead_c_40_asc=None)
 -- Python 3.9.18 /opt/conda/bin/python
[2023-12-31T15:58:13.286Z] 
[2023-12-31T15:58:13.286Z] data_gen = [('a', RepeatSeq(Integer)), ('b', Long), ('c', Integer)]
[2023-12-31T15:58:13.286Z] batch_size = '1000'
[2023-12-31T15:58:13.286Z] 
[2023-12-31T15:58:13.286Z]     @ignore_order(local=True)
[2023-12-31T15:58:13.286Z]     @pytest.mark.parametrize('batch_size', ['1000', '1g'], ids=idfn)
[2023-12-31T15:58:13.286Z]     @pytest.mark.parametrize('data_gen', [
[2023-12-31T15:58:13.286Z]         _grpkey_short_with_nulls,
[2023-12-31T15:58:13.286Z]         _grpkey_int_with_nulls,
[2023-12-31T15:58:13.286Z]         _grpkey_long_with_nulls,
[2023-12-31T15:58:13.286Z]         _grpkey_date_with_nulls,
[2023-12-31T15:58:13.286Z]     ], ids=idfn)
[2023-12-31T15:58:13.286Z]     def test_window_aggs_for_batched_finite_row_windows_partitioned(data_gen, batch_size):
[2023-12-31T15:58:13.286Z]         conf = {'spark.rapids.sql.batchSizeBytes': batch_size}
[2023-12-31T15:58:13.286Z] >       assert_gpu_and_cpu_are_equal_sql(
[2023-12-31T15:58:13.286Z]             lambda spark: gen_df(spark, data_gen, length=2048),
[2023-12-31T15:58:13.286Z]             'window_agg_table',
[2023-12-31T15:58:13.286Z]             """
[2023-12-31T15:58:13.286Z]             SELECT
[2023-12-31T15:58:13.286Z]               COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                              ROWS BETWEEN CURRENT ROW AND 100 FOLLOWING) AS count_1_asc,
[2023-12-31T15:58:13.286Z]               COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                              ROWS BETWEEN 100 PRECEDING AND CURRENT ROW) AS count_c_asc,
[2023-12-31T15:58:13.286Z]               COUNT(c) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                              ROWS BETWEEN -50 PRECEDING AND 100 FOLLOWING) AS count_c_negative,
[2023-12-31T15:58:13.286Z]               COUNT(1) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                              ROWS BETWEEN 50 PRECEDING AND -10 FOLLOWING) AS count_1_negative,
[2023-12-31T15:58:13.286Z]               SUM(c) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                            ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS sum_c_asc,
[2023-12-31T15:58:13.286Z]               AVG(c) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                            ROWS BETWEEN 10 PRECEDING AND 30 FOLLOWING) AS avg_c_asc,
[2023-12-31T15:58:13.286Z]               MAX(c) OVER (PARTITION BY a ORDER BY b,c DESC
[2023-12-31T15:58:13.286Z]                            ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS max_c_desc,
[2023-12-31T15:58:13.286Z]               MIN(c) OVER (PARTITION BY a ORDER BY b,c ASC
[2023-12-31T15:58:13.286Z]                            ROWS BETWEEN 1 PRECEDING AND 3 FOLLOWING) AS min_c_asc,
[2023-12-31T15:58:13.286Z]               LAG(c, 30) OVER (PARTITION BY a ORDER BY b,c ASC) AS lag_c_30_asc,
[2023-12-31T15:58:13.286Z]               LEAD(c, 40) OVER (PARTITION BY a ORDER BY b,c ASC) AS lead_c_40_asc
[2023-12-31T15:58:13.286Z]             FROM window_agg_table
[2023-12-31T15:58:13.286Z]             """,
[2023-12-31T15:58:13.286Z]             validate_execs_in_gpu_plan=['GpuBatchedBoundedWindowExec'],
[2023-12-31T15:58:13.286Z]             conf=conf)
[2023-12-31T15:58:13.286Z] 
[2023-12-31T15:58:13.286Z] ../../src/main/python/window_function_test.py:1830: 
[2023-12-31T15:58:13.286Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2023-12-31T15:58:13.286Z] ../../src/main/python/asserts.py:637: in assert_gpu_and_cpu_are_equal_sql
[2023-12-31T15:58:13.286Z]     assert_gpu_and_cpu_are_equal_collect(do_it_all, conf, is_cpu_first=is_cpu_first)
[2023-12-31T15:58:13.286Z] ../../src/main/python/asserts.py:595: in assert_gpu_and_cpu_are_equal_collect
[2023-12-31T15:58:13.286Z]     _assert_gpu_and_cpu_are_equal(func, 'COLLECT', conf=conf, is_cpu_first=is_cpu_first, result_canonicalize_func_before_compare=result_canonicalize_func_before_compare)
[2023-12-31T15:58:13.286Z] ../../src/main/python/asserts.py:517: in _assert_gpu_and_cpu_are_equal
[2023-12-31T15:58:13.286Z]     assert_equal(from_cpu, from_gpu)
[2023-12-31T15:58:13.286Z] ../../src/main/python/asserts.py:107: in assert_equal
[2023-12-31T15:58:13.286Z]     _assert_equal(cpu, gpu, float_check=get_float_check(), path=[])
[2023-12-31T15:58:13.287Z] ../../src/main/python/asserts.py:43: in _assert_equal
[2023-12-31T15:58:13.287Z]     _assert_equal(cpu[index], gpu[index], float_check, path + [index])
[2023-12-31T15:58:13.287Z] ../../src/main/python/asserts.py:36: in _assert_equal
[2023-12-31T15:58:13.287Z]     _assert_equal(cpu[field], gpu[field], float_check, path + [field])
[2023-12-31T15:58:13.287Z] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
[2023-12-31T15:58:13.287Z] 
[2023-12-31T15:58:13.287Z] cpu = 118944153, gpu = -1
[2023-12-31T15:58:13.287Z] float_check = <function get_float_check.<locals>.<lambda> at 0x7fc14404b0d0>
[2023-12-31T15:58:13.287Z] path = [1653, 'max_c_desc']
[2023-12-31T15:58:13.287Z] 
[2023-12-31T15:58:13.287Z]     def _assert_equal(cpu, gpu, float_check, path):
[2023-12-31T15:58:13.287Z]         t = type(cpu)
[2023-12-31T15:58:13.287Z]         if (t is Row):
[2023-12-31T15:58:13.287Z]             assert len(cpu) == len(gpu), "CPU and GPU row have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
[2023-12-31T15:58:13.287Z]             if hasattr(cpu, "__fields__") and hasattr(gpu, "__fields__"):
[2023-12-31T15:58:13.287Z]                 assert cpu.__fields__ == gpu.__fields__, "CPU and GPU row have different fields at {} CPU: {} GPU: {}".format(path, cpu.__fields__, gpu.__fields__)
[2023-12-31T15:58:13.287Z]                 for field in cpu.__fields__:
[2023-12-31T15:58:13.287Z]                     _assert_equal(cpu[field], gpu[field], float_check, path + [field])
[2023-12-31T15:58:13.287Z]             else:
[2023-12-31T15:58:13.287Z]                 for index in range(len(cpu)):
[2023-12-31T15:58:13.287Z]                     _assert_equal(cpu[index], gpu[index], float_check, path + [index])
[2023-12-31T15:58:13.287Z]         elif (t is list):
[2023-12-31T15:58:13.287Z]             assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
[2023-12-31T15:58:13.287Z]             for index in range(len(cpu)):
[2023-12-31T15:58:13.287Z]                 _assert_equal(cpu[index], gpu[index], float_check, path + [index])
[2023-12-31T15:58:13.287Z]         elif (t is tuple):
[2023-12-31T15:58:13.287Z]             assert len(cpu) == len(gpu), "CPU and GPU list have different lengths at {} CPU: {} GPU: {}".format(path, len(cpu), len(gpu))
[2023-12-31T15:58:13.287Z]             for index in range(len(cpu)):
[2023-12-31T15:58:13.287Z]                 _assert_equal(cpu[index], gpu[index], float_check, path + [index])
[2023-12-31T15:58:13.287Z]         elif (t is pytypes.GeneratorType):
[2023-12-31T15:58:13.287Z]             index = 0
[2023-12-31T15:58:13.287Z]             # generator has no zip :( so we have to do this the hard way
[2023-12-31T15:58:13.287Z]             done = False
[2023-12-31T15:58:13.287Z]             while not done:
[2023-12-31T15:58:13.287Z]                 sub_cpu = None
[2023-12-31T15:58:13.287Z]                 sub_gpu = None
[2023-12-31T15:58:13.287Z]                 try:
[2023-12-31T15:58:13.287Z]                     sub_cpu = next(cpu)
[2023-12-31T15:58:13.287Z]                 except StopIteration:
[2023-12-31T15:58:13.287Z]                     done = True
[2023-12-31T15:58:13.287Z]     
[2023-12-31T15:58:13.287Z]                 try:
[2023-12-31T15:58:13.287Z]                     sub_gpu = next(gpu)
[2023-12-31T15:58:13.287Z]                 except StopIteration:
[2023-12-31T15:58:13.287Z]                     done = True
[2023-12-31T15:58:13.287Z]     
[2023-12-31T15:58:13.287Z]                 if done:
[2023-12-31T15:58:13.287Z]                     assert sub_cpu == sub_gpu and sub_cpu == None, "CPU and GPU generators have different lengths at {}".format(path)
[2023-12-31T15:58:13.287Z]                 else:
[2023-12-31T15:58:13.287Z]                     _assert_equal(sub_cpu, sub_gpu, float_check, path + [index])
[2023-12-31T15:58:13.287Z]     
[2023-12-31T15:58:13.287Z]                 index = index + 1
[2023-12-31T15:58:13.287Z]         elif (t is dict):
[2023-12-31T15:58:13.287Z]             # The order of key/values is not guaranteed in python dicts, nor are they guaranteed by Spark
[2023-12-31T15:58:13.287Z]             # so sort the items to do our best with ignoring the order of dicts
[2023-12-31T15:58:13.287Z]             cpu_items = list(cpu.items()).sort(key=_RowCmp)
[2023-12-31T15:58:13.287Z]             gpu_items = list(gpu.items()).sort(key=_RowCmp)
[2023-12-31T15:58:13.287Z]             _assert_equal(cpu_items, gpu_items, float_check, path + ["map"])
[2023-12-31T15:58:13.287Z]         elif (t is int):
[2023-12-31T15:58:13.287Z] >           assert cpu == gpu, "GPU and CPU int values are different at {}".format(path)
[2023-12-31T15:58:13.287Z] E           AssertionError: GPU and CPU int values are different at [1653, 'max_c_desc']
[2023-12-31T15:58:13.287Z] 
[2023-12-31T15:58:13.287Z] ../../src/main/python/asserts.py:78: AssertionError

Steps/Code to reproduce bug
Run the integration tests with Scala 2.13 and DATAGEN_SEED=1704033145.

Expected behavior
Tests pass

Environment details (please complete the following information)

  • Environment location: Standalone
  • Spark configuration settings related to the issue

Additional context

@sameerz sameerz added bug Something isn't working ? - Needs Triage Need team to review and classify labels Dec 31, 2023
revans2 (Collaborator) commented Jan 2, 2024

This is rather scary. I was able to reproduce it locally and to reduce the test down to just the first two COUNT operations and the MAX. I am going to debug a bit more to see what I can come up with.

revans2 (Collaborator) commented Jan 2, 2024

Actually, I have dug into it, and the failure is a test problem, not a plugin problem. The issue is that we order by both columns b and c, but no effort has been made to make the ordering unambiguous. We got unlucky with this data seed and drew two input rows that are exactly the same: (null, null, -1). The test actually performs two different window operations, because the sort order differs (max_c_desc orders by b,c DESC while the other aggregations order ASC). The second sort ended up switching the order of these two rows relative to the CPU, which produced a different value for the max and for the columns that came from the first window operation.

We just need a way to make the ordering unambiguous.
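The diagnosis above can be reproduced without Spark. The sketch below is illustrative: the helper, row layout, and frame bounds are made up for the example (the two clashing values are borrowed from the diff above), and Python's stable sort stands in for the two engines breaking the tie differently.

```python
def max_over_rows_frame(rows, key, preceding, following):
    """MAX(value) over ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING,
    after sorting `rows` (pairs of (sort_key, value)) by `key`.
    Ties in `key` leave the sorted order ambiguous."""
    ordered = sorted(rows, key=key)  # stable: ties keep their input order
    out = []
    for i in range(len(ordered)):
        lo = max(0, i - preceding)
        hi = min(len(ordered), i + following + 1)
        out.append(max(v for _, v in ordered[lo:hi]))
    return out

# Two rows share the same sort key, so either relative order is a valid sort.
# The input order stands in for how each engine happened to break the tie.
rows_cpu = [((1,), -1), ((1,), 118944153), ((2,), 5)]
rows_gpu = [((1,), 118944153), ((1,), -1), ((2,), 5)]

frame = lambda rs: max_over_rows_frame(rs, key=lambda r: r[0], preceding=1, following=0)
print(frame(rows_cpu))  # → [-1, 118944153, 118944153]
print(frame(rows_gpu))  # → [118944153, 118944153, 5]
```

Both sorts are correct, yet the row-based frames capture different rows, so the MAX values diverge even though the input data is identical.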

@mattahrens mattahrens added test Only impacts tests and removed ? - Needs Triage Need team to review and classify labels Jan 2, 2024
mythrocks (Collaborator) commented

We just need a way to make the ordering unambiguous.

I suspected that might be the case. I'm looking into it now.

mythrocks added a commit to mythrocks/spark-rapids that referenced this issue Jan 2, 2024
…lure.

Fixes NVIDIA#10134.

This commit fixes test failures in `test_window_aggs_for_batched_finite_row_windows_partitioned` that result from ambiguous
ordering of the window function input.
The failing tests partition by `a` and order by `b,c`. When `b,c` contains repeated values, the result of the window
function execution is indeterminate.
This commit changes the definition of the aggregation column `c` (which is also included in the order-by clause) to use
unique long values, guaranteeing deterministic output.

Signed-off-by: MithunR <mythrocks@gmail.com>
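The fix in the commit above can be illustrated with a small, Spark-free sketch. The tuples and helper names below are illustrative stand-ins, not the plugin's actual data generators: with duplicate `(b, c)` keys, a stable sort's output depends on the input order (a proxy for two engines breaking ties differently), while unique `c` values admit exactly one sorted order.

```python
# Row layout here is (payload, b, c) and is purely illustrative.
sort_key = lambda row: (row[1], row[2])

tied = [("x", 10, -1), ("y", 10, -1)]    # identical (b, c): ambiguous order
unique = [("x", 10, 1), ("y", 10, 2)]    # unique c: exactly one valid order

# Python's sort is stable, so sorting the same rows fed in a different order
# stands in for two engines that break ties differently.
assert sorted(tied, key=sort_key) != sorted(reversed(tied), key=sort_key)
assert sorted(unique, key=sort_key) == sorted(reversed(unique), key=sort_key)
```

This is why generating `c` with unique values makes the CPU and GPU outputs comparable row-for-row, without changing what the test exercises.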
mythrocks added a commit that referenced this issue Jan 4, 2024
…lure. (#10143)

Fixes #10134.
