From 7a0ff7ba8c53373c1a33a95788017a85a0beeae9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 4 Oct 2021 14:39:28 -0600
Subject: [PATCH 01/13] Enable some approx percentile tests

Signed-off-by: Andy Grove
---
 .../src/main/python/hash_aggregate_test.py | 59 +++++++++++--------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 6c9ac6cf007..326461a236f 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1088,7 +1088,6 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_repeated_keys():
     compare_percentile_approx(
@@ -1096,7 +1095,6 @@ def test_hash_groupby_approx_percentile_long_repeated_keys():
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long():
     compare_percentile_approx(
@@ -1104,7 +1102,6 @@ def test_hash_groupby_approx_percentile_long():
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_scalar():
     compare_percentile_approx(
@@ -1112,7 +1109,7 @@ def test_hash_groupby_approx_percentile_long_scalar():
                                      ('v', LongRangeGen())], length=100),
         0.5)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
+@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double():
     compare_percentile_approx(
@@ -1120,7 +1117,7 @@ def test_hash_groupby_approx_percentile_double():
                                      ('v', DoubleGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
+@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double_scalar():
     compare_percentile_approx(
@@ -1153,24 +1150,36 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
     approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
 
+    print(approx_cpu)
+    print(approx_gpu)
+
+    assert len(exact) == len(approx_cpu)
+    assert len(exact) == len(approx_gpu)
+
+    for i in range(len(exact)):
+        cpu_exact_result = exact[i]
+        cpu_approx_result = approx_cpu[i]
+        gpu_approx_result = approx_gpu[i]
-    for result in zip(exact, approx_cpu, approx_gpu):
         # assert that keys match
-        assert result[0]['k'] == result[1]['k']
-        assert result[1]['k'] == result[2]['k']
-
-        exact = result[0]['the_percentile']
-        cpu = result[1]['the_percentile']
-        gpu = result[2]['the_percentile']
-
-        if exact is not None:
-            if isinstance(exact, list):
-                for x in zip(exact, cpu, gpu):
-                    exact = x[0]
-                    cpu = x[1]
-                    gpu = x[2]
-                    gpu_delta = abs(float(gpu) - float(exact))
-                    cpu_delta = abs(float(cpu) - float(exact))
+        assert cpu_exact_result['k'] == cpu_approx_result['k']
+        assert cpu_exact_result['k'] == gpu_approx_result['k']
+
+        # extract the percentile result column
+        exact_percentile = cpu_exact_result['the_percentile']
+        cpu_approx_percentile = cpu_approx_result['the_percentile']
+        gpu_approx_percentile = gpu_approx_result['the_percentile']
+
+        if exact_percentile is None:
+            assert cpu_approx_percentile is None
+            assert gpu_approx_percentile is None
+        else:
+            assert cpu_approx_percentile is not None
+            assert gpu_approx_percentile is not None
+            if isinstance(exact_percentile, list):
+                for j in range(len(exact_percentile)):
+                    gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
+                    cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
                         # GPU is less accurate so make sure we are within some tolerance
                         if gpu_delta == 0:
@@ -1178,8 +1187,8 @@
                         else:
                             assert abs(cpu_delta / gpu_delta) - 1 < 0.001
             else:
-                gpu_delta = abs(float(gpu) - float(exact))
-                cpu_delta = abs(float(cpu) - float(exact))
+                gpu_delta = abs(float(gpu_approx_percentile) - float(exact_percentile))
+                cpu_delta = abs(float(cpu_approx_percentile) - float(exact_percentile))
                 if gpu_delta > cpu_delta:
                     # GPU is less accurate so make sure we are within some tolerance
                     if gpu_delta == 0:
@@ -1189,10 +1198,10 @@
 
 def create_percentile_sql(func_name, percentiles):
     if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k""".format(
+        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
            func_name, ",".join(str(i) for i in percentiles))
     else:
-        return """select k, {}(v, {}) as the_percentile from t group by k""".format(
+        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
            func_name, percentiles)
 
 @ignore_order
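The tolerance check introduced in the hunk above is easier to read outside diff markup. A minimal standalone sketch that mirrors the patch's logic — the helper name and the sample values are illustrative, not part of the patch:

    def check_tolerance(exact, cpu_approx, gpu_approx):
        # deltas of each approximate result from the exact percentile
        gpu_delta = abs(float(gpu_approx) - float(exact))
        cpu_delta = abs(float(cpu_approx) - float(exact))
        if gpu_delta > cpu_delta:
            # GPU is less accurate, so require it to be within a small
            # tolerance of the CPU's accuracy (same expression as the patch)
            assert gpu_delta == 0 or abs(cpu_delta / gpu_delta) - 1 < 0.001

    check_tolerance(exact=50.0, cpu_approx=49.9, gpu_approx=50.2)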
From 5bee18cce37a1f49b770d540049fd16dc66c08a6 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 4 Oct 2021 14:39:28 -0600
Subject: [PATCH 02/13] Enable some approx percentile tests

Signed-off-by: Andy Grove
---
 .../src/main/python/hash_aggregate_test.py | 59 +++++++++++--------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index faab4c2f5ec..a49eb9c5d24 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1124,7 +1124,6 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_repeated_keys():
     compare_percentile_approx(
@@ -1132,7 +1131,6 @@ def test_hash_groupby_approx_percentile_long_repeated_keys():
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long():
     compare_percentile_approx(
@@ -1140,7 +1138,6 @@ def test_hash_groupby_approx_percentile_long():
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_scalar():
     compare_percentile_approx(
@@ -1148,7 +1145,7 @@ def test_hash_groupby_approx_percentile_long_scalar():
                                      ('v', LongRangeGen())], length=100),
         0.5)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
+@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double():
     compare_percentile_approx(
@@ -1156,7 +1153,7 @@ def test_hash_groupby_approx_percentile_double():
                                      ('v', DoubleGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
+@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double_scalar():
     compare_percentile_approx(
@@ -1189,24 +1186,36 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
     approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
 
+    print(approx_cpu)
+    print(approx_gpu)
+
+    assert len(exact) == len(approx_cpu)
+    assert len(exact) == len(approx_gpu)
+
+    for i in range(len(exact)):
+        cpu_exact_result = exact[i]
+        cpu_approx_result = approx_cpu[i]
+        gpu_approx_result = approx_gpu[i]
-    for result in zip(exact, approx_cpu, approx_gpu):
         # assert that keys match
-        assert result[0]['k'] == result[1]['k']
-        assert result[1]['k'] == result[2]['k']
-
-        exact = result[0]['the_percentile']
-        cpu = result[1]['the_percentile']
-        gpu = result[2]['the_percentile']
-
-        if exact is not None:
-            if isinstance(exact, list):
-                for x in zip(exact, cpu, gpu):
-                    exact = x[0]
-                    cpu = x[1]
-                    gpu = x[2]
-                    gpu_delta = abs(float(gpu) - float(exact))
-                    cpu_delta = abs(float(cpu) - float(exact))
+        assert cpu_exact_result['k'] == cpu_approx_result['k']
+        assert cpu_exact_result['k'] == gpu_approx_result['k']
+
+        # extract the percentile result column
+        exact_percentile = cpu_exact_result['the_percentile']
+        cpu_approx_percentile = cpu_approx_result['the_percentile']
+        gpu_approx_percentile = gpu_approx_result['the_percentile']
+
+        if exact_percentile is None:
+            assert cpu_approx_percentile is None
+            assert gpu_approx_percentile is None
+        else:
+            assert cpu_approx_percentile is not None
+            assert gpu_approx_percentile is not None
+            if isinstance(exact_percentile, list):
+                for j in range(len(exact_percentile)):
+                    gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
+                    cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
                         # GPU is less accurate so make sure we are within some tolerance
                         if gpu_delta == 0:
@@ -1214,8 +1223,8 @@
                         else:
                             assert abs(cpu_delta / gpu_delta) - 1 < 0.001
             else:
-                gpu_delta = abs(float(gpu) - float(exact))
-                cpu_delta = abs(float(cpu) - float(exact))
+                gpu_delta = abs(float(gpu_approx_percentile) - float(exact_percentile))
+                cpu_delta = abs(float(cpu_approx_percentile) - float(exact_percentile))
                 if gpu_delta > cpu_delta:
                     # GPU is less accurate so make sure we are within some tolerance
                     if gpu_delta == 0:
@@ -1225,10 +1234,10 @@
 
 def create_percentile_sql(func_name, percentiles):
     if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k""".format(
+        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
            func_name, ",".join(str(i) for i in percentiles))
     else:
-        return """select k, {}(v, {}) as the_percentile from t group by k""".format(
+        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
            func_name, percentiles)
 
 @ignore_order

From 7f5674f108cd2e808bb91cad681e27d787c72f78 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 4 Oct 2021 15:30:05 -0600
Subject: [PATCH 03/13] enable tests and add more tests

---
 .../src/main/python/hash_aggregate_test.py  |  2 -
 .../rapids/ApproximatePercentileSuite.scala | 64 +++++++++++++------
 2 files changed, 43 insertions(+), 23 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index a49eb9c5d24..2eadb49ad94 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1145,7 +1145,6 @@ def test_hash_groupby_approx_percentile_long_scalar():
                                      ('v', LongRangeGen())], length=100),
         0.5)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double():
     compare_percentile_approx(
@@ -1153,7 +1152,6 @@ def test_hash_groupby_approx_percentile_double():
                                      ('v', DoubleGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3706")
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double_scalar():
     compare_percentile_approx(
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
index f26d85b9d13..95ef870ccd6 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
@@ -28,46 +28,61 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   val DEFAULT_PERCENTILES = Array(0.005, 0.05, 0.25, 0.45, 0.5, 0.55, 0.75, 0.95, 0.995)
 
+  // cudaErrorIllegalAddress: an illegal memory access was encountered
+  test("null handling") {
+    val func = spark => salariesWithNull(spark)
+    doTest(func, delta = Some(100))
+  }
+
   test("1 row per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 1, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = Some(100))
   }
 
   test("5 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 5, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 5)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = Some(100))
  }
 
   test("2500 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 2500, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 2500)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = None)
   }
 
   test("25000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 25000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 25000)
+    doTest(func, delta = None)
   }
 
   test("50000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = None)
   }
 
   // test with a threshold just below the default level of 10000
   test("50000 rows per group, delta 9999, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = Some(9999))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = Some(9999))
   }
 
   test("empty input set") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 0, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = None)
   }
 
   test("scalar percentile") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250,
-      percentileArg = Left(0.5), delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, percentileArg = Left(0.5), delta = Some(100))
   }
 
   test("empty percentile array fall back to CPU") {
@@ -105,25 +120,25 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }, conf)
   }
 
-  private def doTest(dataType: DataType,
-      rowsPerGroup: Int,
-      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
-      delta: Option[Int]) {
+  private def doTest(
+      func: SparkSession => DataFrame,
+      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
+      delta: Option[Int]) {
 
     val percentiles = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta,
+      calcPercentiles(spark, func, percentileArg, delta,
         approx = false)
     }
 
     val approxPercentilesCpu = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     }
 
     val conf = new SparkConf()
      .set("spark.rapids.sql.expression.ApproximatePercentile", "true")
 
     val approxPercentilesGpu = withGpuSparkSession(spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     , conf)
 
     val keys = percentiles.keySet ++ approxPercentilesCpu.keySet ++ approxPercentilesGpu.keySet
@@ -165,14 +180,13 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   private def calcPercentiles(
       spark: SparkSession,
-      dataType: DataType,
-      rowsPerDept: Int,
+      dfFunc: SparkSession => DataFrame,
      percentilesArg: Either[Double, Array[Double]],
      delta: Option[Int],
      approx: Boolean
   ): Map[String, Array[Double]] = {
 
-    val df = salaries(spark, dataType, rowsPerDept)
+    val df = dfFunc(spark)
 
     val percentileArg = percentilesArg match {
       case Left(n) => s"$n"
@@ -210,6 +224,14 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }).toMap
   }
 
+  private def salariesWithNull(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    Seq(("a", null), ("b", "123456.78")).toDF("dept", "x")
+      .withColumn("salary", expr("CAST(x AS double)"))
+      .drop("x")
+      .repartition(2)
+  }
+
   private def salaries(
       spark: SparkSession,
      salaryDataType: DataType, rowsPerDept: Int): DataFrame = {
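Patch 3's `salariesWithNull` helper pins the null-handling case down with a tiny two-row input. A rough PySpark equivalent, assuming an already-running session (the authoritative helper is the Scala code in the diff above):

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import expr

    spark = SparkSession.builder.getOrCreate()
    # two departments, one null salary; repartition(2) forces more than one batch
    salaries_with_null = (spark.createDataFrame([("a", None), ("b", "123456.78")], ["dept", "x"])
                          .withColumn("salary", expr("CAST(x AS double)"))
                          .drop("x")
                          .repartition(2))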
From e5fb1d05bf6ce2ee363825e529b1ee4f431232f9 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 5 Oct 2021 09:49:38 -0600
Subject: [PATCH 04/13] improve null test

---
 .../com/nvidia/spark/rapids/ApproximatePercentileSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
index 95ef870ccd6..b6f77c83048 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
@@ -226,7 +226,7 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   private def salariesWithNull(spark: SparkSession): DataFrame = {
     import spark.implicits._
-    Seq(("a", null), ("b", "123456.78")).toDF("dept", "x")
+    Seq(("a", null), ("b", null), ("b", "123456.78")).toDF("dept", "x")
      .withColumn("salary", expr("CAST(x AS double)"))
      .drop("x")
      .repartition(2)

From 3a2c4967036bf1e88fe8113797887c4c62ebc557 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 7 Oct 2021 10:14:49 -0600
Subject: [PATCH 05/13] add tests for byte input

---
 .../src/main/python/hash_aggregate_test.py | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 2eadb49ad94..f16d92d7d12 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1131,6 +1131,20 @@ def test_hash_groupby_approx_percentile_long_repeated_keys():
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95])
 
+@ignore_order(local=True)
+def test_hash_groupby_approx_percentile_byte():
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95])
+
+@ignore_order(local=True)
+def test_hash_groupby_approx_percentile_byte_scalar():
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        0.5)
+
 @ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long():
     compare_percentile_approx(

From 8f906a88488380acb9500dcbc00f2c8560053b6b Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 7 Oct 2021 19:04:16 -0600
Subject: [PATCH 06/13] remove temp debug print

---
 integration_tests/src/main/python/hash_aggregate_test.py | 2 --
 1 file changed, 2 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index f16d92d7d12..8413768518d 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1198,8 +1198,6 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
     approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
 
-    print(approx_cpu)
-    print(approx_gpu)
 
     assert len(exact) == len(approx_cpu)
     assert len(exact) == len(approx_gpu)

From 40c4644135822e1e952dc08184551137e84fb1d3 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Thu, 7 Oct 2021 19:07:04 -0600
Subject: [PATCH 07/13] Remove comment

Signed-off-by: Andy Grove
---
 .../com/nvidia/spark/rapids/ApproximatePercentileSuite.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
index b6f77c83048..3636126d264 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ApproximatePercentileSuite.scala
@@ -28,7 +28,6 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   val DEFAULT_PERCENTILES = Array(0.005, 0.05, 0.25, 0.45, 0.5, 0.55, 0.75, 0.95, 0.995)
 
-  // cudaErrorIllegalAddress: an illegal memory access was encountered
   test("null handling") {
     val func = spark => salariesWithNull(spark)
     doTest(func, delta = Some(100))

From 163df724f8b1877f16e6d0f60bfd05b734696253 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Tue, 12 Oct 2021 10:51:40 -0600
Subject: [PATCH 08/13] update documentation

---
 docs/compatibility.md | 6 +-----
 1 file changed, 1 insertion(+), 5 deletions(-)

diff --git a/docs/compatibility.md b/docs/compatibility.md
index fc1419db4c3..a1caf20525d 100644
--- a/docs/compatibility.md
+++ b/docs/compatibility.md
@@ -567,8 +567,4 @@ The GPU implementation of `approximate_percentile` uses
 [t-Digests](https://arxiv.org/abs/1902.04023) which have high accuracy, particularly near the
 tails of a distribution. Because the results are not bit-for-bit identical with the Apache Spark
 implementation of `approximate_percentile`, this feature is disabled by default and can be enabled by setting
-`spark.rapids.approxPercentileEnabled=true`.
-
-There are known issues with the approximate percentile implementation
-([#3706](https://github.com/NVIDIA/spark-rapids/issues/3706),
-[#3692](https://github.com/NVIDIA/spark-rapids/issues/3692)) and the feature should be considered experimental.
\ No newline at end of file
+`spark.rapids.sql.expression.ApproximatePercentile=true`.
\ No newline at end of file
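The documentation change above corrects the enable key. A hedged sketch of turning the GPU implementation on from PySpark — it assumes the RAPIDS Accelerator jar is already deployed, and the session settings shown are illustrative rather than prescribed by the patch:

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  # assumes the plugin jar is on the classpath
             .config("spark.rapids.sql.expression.ApproximatePercentile", "true")
             .getOrCreate())
    spark.sql("SELECT k, approx_percentile(v, array(0.05, 0.5, 0.95)) FROM t GROUP BY k")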
From d2f5acbe539a3ef5c32a7d8db76b937ada038173 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 15 Oct 2021 11:47:25 -0600
Subject: [PATCH 09/13] run approx percentile tests with and without AQE

Signed-off-by: Andy Grove
---
 .../src/main/python/hash_aggregate_test.py | 52 ++++++++++++-------
 1 file changed, 33 insertions(+), 19 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 8413768518d..b7f1bf42ef7 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1124,60 +1124,74 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_repeated_keys():
+def test_hash_groupby_approx_percentile_byte(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
-        lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
-                                     ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_byte():
+def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', ByteGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        0.5, conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_byte_scalar():
+def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
-        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
-                                     ('v', ByteGen())], length=100),
-        0.5)
+        lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
+                                     ('v', LongRangeGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long():
+def test_hash_groupby_approx_percentile_long(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_scalar():
+def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        0.5)
+        0.5, conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double():
+def test_hash_groupby_approx_percentile_double(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
 @ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double_scalar():
+def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        0.05)
+        0.05, conf)
 
 # The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
 # of the CPU numbers
-def compare_percentile_approx(df_fun, percentiles):
+def compare_percentile_approx(df_fun, percentiles, conf):
 
     # create SQL statements for exact and approx percentiles
     p_exact_sql = create_percentile_sql("percentile", percentiles)

From ea908eddd58c76724f57dd3fa2be10b0aea322d5 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 15 Oct 2021 16:56:37 -0600
Subject: [PATCH 10/13] Add test for split CPU/GPU approx_percentile and
 implement fix

---
 .../src/main/python/hash_aggregate_test.py   | 25 +++++++++++++++++++
 .../com/nvidia/spark/rapids/RapidsMeta.scala |  5 ++++
 .../execution/GpuBroadcastExchangeExec.scala |  2 +-
 .../GpuShuffleExchangeExecBase.scala         |  4 +--
 4 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index b7f1bf42ef7..8a480ad9ee9 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1187,6 +1187,31 @@ def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
                                      ('v', DoubleGen())], length=100),
         0.05, conf)
 
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+@ignore_order(local=True)
+@allow_non_gpu('TakeOrderedAndProjectExec', 'Alias', 'Cast', 'ObjectHashAggregateExec', 'AggregateExpression',
+               'ApproximatePercentile', 'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'CollectLimitExec')
+def test_hash_groupby_approx_percentile_partial_fallback_to_cpu(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {
+        'spark.sql.adaptive.enabled': aqe_enabled,
+        'spark.rapids.sql.explain': 'ALL'
+    })
+
+    def create_and_show_df(spark):
+        df = gen_df(spark, [('k', StringGen(nullable=False)),
+                            ('v', DoubleGen())], length=100)
+        df.createOrReplaceTempView("t")
+        df2 = spark.sql("SELECT k, approx_percentile(v, array(0.1, 0.2)) from t group by k")
+
+        # the "show" introduces a `CAST(approx_percentile(...) AS string)` on the final aggregate and this is
+        # not supported on GPU so falls back to CPU and the purpose of this test is to make sure that the
+        # partial aggregate also falls back to CPU
+        df2.show()
+
+        return df2
+
+    run_with_cpu_and_gpu(create_and_show_df, 'COLLECT', conf)
+
 # The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
index 82d78c2b081..3f89e42ae01 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -627,6 +627,11 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
     }
   }
 
+  def recursivelyCheckTags() {
+    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+    childPlans.foreach(_.recursivelyCheckTags)
+  }
+
   /**
    * Run rules that happen for the entire tree after it has been tagged initially.
    */
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index 48e3524de30..b88dcdb8e2c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -253,7 +253,7 @@ class GpuBroadcastMeta(
     }
     // when AQE is enabled and we are planning a new query stage, we need to look at meta-data
     // previously stored on the spark plan to determine whether this exchange can run on GPU
-    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+    recursivelyCheckTags()
   }
 
   override def convertToGpu(): GpuExec = {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index 00220ea160d..c5b9089bbd1 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -18,11 +18,9 @@ package org.apache.spark.sql.rapids.execution
 
 import scala.collection.AbstractIterator
 import scala.concurrent.Future
-
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode
-
 import org.apache.spark.{MapOutputStatistics, ShuffleDependency}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer
@@ -81,7 +79,7 @@ class GpuShuffleMeta(
   override def tagPlanForGpu(): Unit = {
     // when AQE is enabled and we are planning a new query stage, we need to look at meta-data
     // previously stored on the spark plan to determine whether this exchange can run on GPU
-    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
+    recursivelyCheckTags()
 
     shuffle.outputPartitioning match {
       case _: RoundRobinPartitioning
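The comment inside the new test states the trigger concisely: `df2.show()` wraps the final aggregate in a `CAST(approx_percentile(...) AS string)`, which is not GPU-supported, so the final aggregate falls back to CPU, and `recursivelyCheckTags` makes the partial aggregate follow it rather than splitting the aggregation across CPU and GPU. A sketch of the triggering query, assuming a session configured as in the example earlier (data and names illustrative):

    df = spark.range(100).selectExpr("CAST(id % 10 AS string) AS k", "CAST(id AS double) AS v")
    df.createOrReplaceTempView("t")
    # show() adds the string cast on the final aggregate, forcing the CPU fallback path
    spark.sql("SELECT k, approx_percentile(v, array(0.1, 0.2)) FROM t GROUP BY k").show()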
From 45683a8af7a76374306b4b70731d6be4e1bfb22a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 18 Oct 2021 17:05:00 -0600
Subject: [PATCH 11/13] scalastyle

---
 .../src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala     | 2 +-
 .../spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
index 3f89e42ae01..e5f5e00c370 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -629,7 +629,7 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
 
   def recursivelyCheckTags() {
     wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
-    childPlans.foreach(_.recursivelyCheckTags)
+    childPlans.foreach(_.recursivelyCheckTags())
   }
 
   /**
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index c5b9089bbd1..2e5980b67a9 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -18,9 +18,11 @@ package org.apache.spark.sql.rapids.execution
 
 import scala.collection.AbstractIterator
 import scala.concurrent.Future
+
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.shims.v2.ShimUnaryExecNode
+
 import org.apache.spark.{MapOutputStatistics, ShuffleDependency}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.serializer.Serializer

From f5c633af557b3fbae05e142f1af5e4316fa34326 Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Mon, 18 Oct 2021 18:07:39 -0600
Subject: [PATCH 12/13] Revert fix for issue 3770

---
 .../src/main/python/hash_aggregate_test.py   | 25 -------------------
 .../com/nvidia/spark/rapids/RapidsMeta.scala |  5 ----
 .../execution/GpuBroadcastExchangeExec.scala |  2 +-
 .../GpuShuffleExchangeExecBase.scala         |  2 +-
 4 files changed, 2 insertions(+), 32 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index 8a480ad9ee9..b7f1bf42ef7 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1187,31 +1187,6 @@ def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
                                      ('v', DoubleGen())], length=100),
         0.05, conf)
 
-@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
-@allow_non_gpu('TakeOrderedAndProjectExec', 'Alias', 'Cast', 'ObjectHashAggregateExec', 'AggregateExpression',
-               'ApproximatePercentile', 'Literal', 'ShuffleExchangeExec', 'HashPartitioning', 'CollectLimitExec')
-def test_hash_groupby_approx_percentile_partial_fallback_to_cpu(aqe_enabled):
-    conf = copy_and_update(_approx_percentile_conf, {
-        'spark.sql.adaptive.enabled': aqe_enabled,
-        'spark.rapids.sql.explain': 'ALL'
-    })
-
-    def create_and_show_df(spark):
-        df = gen_df(spark, [('k', StringGen(nullable=False)),
-                            ('v', DoubleGen())], length=100)
-        df.createOrReplaceTempView("t")
-        df2 = spark.sql("SELECT k, approx_percentile(v, array(0.1, 0.2)) from t group by k")
-
-        # the "show" introduces a `CAST(approx_percentile(...) AS string)` on the final aggregate and this is
-        # not supported on GPU so falls back to CPU and the purpose of this test is to make sure that the
-        # partial aggregate also falls back to CPU
-        df2.show()
-
-        return df2
-
-    run_with_cpu_and_gpu(create_and_show_df, 'COLLECT', conf)
-
 # The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
index e5f5e00c370..82d78c2b081 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -627,11 +627,6 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
     }
   }
 
-  def recursivelyCheckTags() {
-    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
-    childPlans.foreach(_.recursivelyCheckTags())
-  }
-
   /**
    * Run rules that happen for the entire tree after it has been tagged initially.
   */
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
index b88dcdb8e2c..48e3524de30 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuBroadcastExchangeExec.scala
@@ -253,7 +253,7 @@ class GpuBroadcastMeta(
     }
     // when AQE is enabled and we are planning a new query stage, we need to look at meta-data
     // previously stored on the spark plan to determine whether this exchange can run on GPU
-    recursivelyCheckTags()
+    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
   }
 
   override def convertToGpu(): GpuExec = {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
index 8e50e9798ef..463b471281d 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuShuffleExchangeExecBase.scala
@@ -81,7 +81,7 @@ class GpuShuffleMeta(
   override def tagPlanForGpu(): Unit = {
     // when AQE is enabled and we are planning a new query stage, we need to look at meta-data
     // previously stored on the spark plan to determine whether this exchange can run on GPU
-    recursivelyCheckTags()
+    wrapped.getTagValue(gpuSupportedTag).foreach(_.foreach(willNotWorkOnGpu))
 
     shuffle.outputPartitioning match {
       case _: RoundRobinPartitioning
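The `order by k` added to `create_percentile_sql` back in patch 1 is what lets the final patch drop the `@ignore_order` decorators: the rows now come back deterministically sorted. For reference, the SQL the helper generates — this is a standalone copy of the helper from the patch, with illustrative inputs:

    def create_percentile_sql(func_name, percentiles):
        if isinstance(percentiles, list):
            return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
                func_name, ",".join(str(i) for i in percentiles))
        else:
            return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
                func_name, percentiles)

    print(create_percentile_sql("approx_percentile", [0.05, 0.5, 0.95]))
    # select k, approx_percentile(v, array(0.05,0.5,0.95)) as the_percentile from t group by k order by k
    print(create_percentile_sql("approx_percentile", 0.5))
    # select k, approx_percentile(v, 0.5) as the_percentile from t group by k order by k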
From 13039fe6987f07629ee3cff8d703c352bceb722a Mon Sep 17 00:00:00 2001
From: Andy Grove
Date: Fri, 22 Oct 2021 10:07:27 -0600
Subject: [PATCH 13/13] address PR feedback

---
 integration_tests/src/main/python/hash_aggregate_test.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)

diff --git a/integration_tests/src/main/python/hash_aggregate_test.py b/integration_tests/src/main/python/hash_aggregate_test.py
index b7f1bf42ef7..af7722414b3 100644
--- a/integration_tests/src/main/python/hash_aggregate_test.py
+++ b/integration_tests/src/main/python/hash_aggregate_test.py
@@ -1125,7 +1125,6 @@ def do_it(spark):
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_byte(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1134,7 +1133,6 @@ def test_hash_groupby_approx_percentile_byte(aqe_enabled):
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1143,7 +1141,6 @@ def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1161,7 +1157,6 @@ def test_hash_groupby_approx_percentile_long(aqe_enabled):
                                      ('v', LongRangeGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1170,7 +1165,6 @@ def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
                                      ('v', LongRangeGen())], length=100),
         0.5, conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1179,7 +1173,6 @@ def test_hash_groupby_approx_percentile_double(aqe_enabled):
                                      ('v', DoubleGen())], length=100),
         [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
 @pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
-@ignore_order(local=True)
 def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
     conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
@@ -1238,6 +1231,8 @@ def run_approx(spark):
             assert gpu_approx_percentile is not None
             if isinstance(exact_percentile, list):
                 for j in range(len(exact_percentile)):
+                    assert cpu_approx_percentile[j] is not None
+                    assert gpu_approx_percentile[j] is not None
                     gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
                     cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
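The AQE parametrization throughout the series leans on the integration suite's `copy_and_update` helper. A minimal sketch of its apparent behavior, inferred from how the tests call it — the real helper lives in the test utilities and may differ in detail:

    def copy_and_update(base_conf, updates):
        # non-destructive overlay: copy the shared conf dict, then apply overrides
        conf = dict(base_conf)
        conf.update(updates)
        return conf

    conf = copy_and_update({'spark.rapids.sql.expression.ApproximatePercentile': 'true'},
                           {'spark.sql.adaptive.enabled': 'true'})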