diff --git a/integration_tests/src/main/python/qa_nightly_select_test.py b/integration_tests/src/main/python/qa_nightly_select_test.py
index 3e8272daf5f..0e2e8aeb511 100644
--- a/integration_tests/src/main/python/qa_nightly_select_test.py
+++ b/integration_tests/src/main/python/qa_nightly_select_test.py
@@ -15,6 +15,7 @@
 from pyspark.sql.types import *
 from pyspark import SparkConf, SparkContext, SQLContext
+import pyspark.sql.functions as f
 import datetime
 from argparse import ArgumentParser
 from decimal import Decimal
@@ -123,7 +124,18 @@ def num_stringDf_first_last(spark, field_name):
         ("NVIDIASPARKTEAM", 0, 50, 0, -20, 2.012, 4.000013, -4.01, False, tm, dt),
         (None, 0, 500, -3200, 0, 0.0, 0.0, -4.01, False, tm, dt),
         ("phuoc", 30, 500, 3200, -20, 20.12, 4.000013, 4.01, False, tm, dt)]
-    df = spark.createDataFrame(data,schema=schema).repartition(1).orderBy(field_name)
+    # First/Last have a lot of odd issues that make these tests hard to pass.
+    # They are non-deterministic unless there is a single partition that is sorted,
+    # which is why we coalesce to a single partition and sort within that partition.
+    # Also, for sort aggregations (used when variable-width types like strings are
+    # in the output) Spark will re-sort the data based on the grouping key. Spark's
+    # sort appears to make no guarantee about being stable. In practice, sorting the
+    # data descending with nulls last has matched what Spark does, but there is no
+    # real guarantee that it will continue to work, so if the first/last tests fail
+    # on strings this might be the cause.
+    df = spark.createDataFrame(data,schema=schema)\
+        .coalesce(1)\
+        .sortWithinPartitions(f.col(field_name).desc_nulls_last())
     df.createOrReplaceTempView("test_table")
 
 def idfn(val):
@@ -133,9 +145,15 @@ def idfn(val):
     'spark.rapids.sql.variableFloatAgg.enabled': 'true',
     'spark.rapids.sql.hasNans': 'false',
     'spark.rapids.sql.castStringToFloat.enabled': 'true',
-    'spark.rapids.sql.castFloatToString.enabled': 'true',
+    'spark.rapids.sql.castFloatToString.enabled': 'true'
     }
 
+_first_last_qa_conf = _qa_conf.copy()
+_first_last_qa_conf.update({
+    # Some of the first/last tests need a single partition to work reliably when run on a large cluster.
+    'spark.sql.shuffle.partitions': '1'
+    })
+
 @approximate_float
 @incompat
 @qarun
@@ -185,7 +203,7 @@ def test_select_first_last(sql_query_line, pytestconfig):
     if sql_query:
         print(sql_query)
         with_cpu_session(lambda spark: num_stringDf_first_last(spark, sql_query_line[2]))
-        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query), conf=_qa_conf)
+        assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.sql(sql_query), conf=_first_last_qa_conf)
 
 @approximate_float(abs=1e-6)
 @incompat
diff --git a/integration_tests/src/main/python/qa_nightly_sql.py b/integration_tests/src/main/python/qa_nightly_sql.py
index 4f811ff0116..d48c7664b2c 100644
--- a/integration_tests/src/main/python/qa_nightly_sql.py
+++ b/integration_tests/src/main/python/qa_nightly_sql.py
@@ -765,7 +765,6 @@
 ("SELECT FIRST(floatF) as res FROM test_table GROUP BY intF", "FIRST(floatF) GROUP BY intF", "floatF"),
 ("SELECT FIRST(doubleF) as res FROM test_table GROUP BY intF", "FIRST(doubleF) GROUP BY intF", "doubleF"),
 ("SELECT FIRST(booleanF) as res FROM test_table GROUP BY intF", "FIRST(booleanF) GROUP BY intF", "booleanF"),
-("SELECT FIRST(strF) as res FROM test_table GROUP BY intF", "FIRST(strF) GROUP BY intF", "strF"),
 ("SELECT FIRST(dateF) as res FROM test_table GROUP BY intF", "FIRST(dateF) GROUP BY intF", "dateF"),
 ("SELECT FIRST(timestampF) as res FROM test_table GROUP BY intF", "FIRST(timestampF) GROUP BY intF", "timestampF"),
 ("SELECT FIRST(byteF) as res FROM test_table GROUP BY intF, shortF", "FIRST(byteF) GROUP BY intF, shortF", "byteF"),
@@ -778,12 +777,17 @@
 ("SELECT LAST(floatF) as res FROM test_table GROUP BY intF", "LAST(floatF) GROUP BY intF", "floatF"),
 ("SELECT LAST(doubleF) as res FROM test_table GROUP BY intF", "LAST(doubleF) GROUP BY intF", "doubleF"),
 ("SELECT LAST(booleanF) as res FROM test_table GROUP BY intF", "LAST(booleanF) GROUP BY intF", "booleanF"),
-("SELECT LAST(strF) as res FROM test_table GROUP BY intF", "LAST(strF) GROUP BY intF", "strF"),
 ("SELECT LAST(dateF) as res FROM test_table GROUP BY intF", "LAST(dateF) GROUP BY intF", "dateF"),
 ("SELECT LAST(timestampF) as res FROM test_table GROUP BY intF", "LAST(timestampF) GROUP BY intF", "timestampF"),
 ("SELECT byteF, SUM(byteF) OVER (PARTITION BY shortF ORDER BY intF ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING ) as res FROM test_table", "byteF, SUM(byteF) OVER (PARTITION BY shortF ORDER BY intF ROWS BETWEEN 2 PRECEDING AND 2 FOLLOWING ) as res", "byteF"),
 ("SELECT SUM(intF) OVER (PARTITION BY byteF ORDER BY byteF ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING ) as res FROM test_table", "SUM(intF) OVER (PARTITION BY byteF ORDER BY byteF ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING ) as res", "intF"),
+# Aggregations with variable-width outputs, like strings, are done using a sort
+# aggregation on the CPU. There are a number of issues with getting the GPU to
+# match this. If either of these queries fails, it is likely related to sorting
+# in Spark, and there may not be a lot that we can do to fix it.
+("SELECT LAST(strF) as res FROM test_table GROUP BY intF", "LAST(strF) GROUP BY intF", "strF"),
+("SELECT FIRST(strF) as res FROM test_table GROUP BY intF", "FIRST(strF) GROUP BY intF", "strF"),
 ]
 
 '''
 ("SELECT LAST(byteF) FROM test_table", "LAST(byteF)"),
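
For context on why the patch pins partitioning and ordering: FIRST and LAST are order-dependent aggregates, so their results change with the number of partitions and the row order inside each one. Below is a minimal, standalone PySpark sketch (not part of the patch; the local session, toy data, and column names are illustrative assumptions) showing the non-determinism with multiple partitions, and how coalescing to a single sorted partition, as the patch does, makes the answer reproducible:

from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.master("local[4]").getOrCreate()

# Toy stand-in for test_table: strF values grouped by intF.
df = spark.createDataFrame(
    [("nvidia", 1), ("spark", 1), (None, 1), ("phuoc", 2), ("rapids", 2)],
    ["strF", "intF"])

# With several partitions, FIRST simply returns whichever row each task
# happens to see first, so repeated runs can disagree for the same group.
df.repartition(4).groupBy("intF").agg(f.first("strF")).show()

# One partition with a fixed in-partition order (descending, nulls last,
# mirroring the patch) gives a stable, repeatable answer.
stable = df.coalesce(1).sortWithinPartitions(f.col("strF").desc_nulls_last())
stable.groupBy("intF").agg(f.first("strF"), f.last("strF")).show()

Setting 'spark.sql.shuffle.partitions' to '1' in _first_last_qa_conf extends the same idea past the input: it keeps any shuffle introduced by the aggregation itself from fanning the rows back out across tasks on a large cluster.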