Enable approx percentile tests #3770

Merged

Changes from 16 commits
110 changes: 69 additions & 41 deletions integration_tests/src/main/python/hash_aggregate_test.py
@@ -1124,51 +1124,67 @@ def do_it(spark):
         return df.groupBy('a').agg(f.min(df.b[1]["a"]))
     assert_gpu_and_cpu_are_equal_collect(do_it)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_repeated_keys():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_byte_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
+    compare_percentile_approx(
+        lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
+                                     ('v', ByteGen())], length=100),
+        0.5, conf)
+
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_repeated_keys(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', RepeatSeqGen(LongGen(), length=20)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_long_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_long_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', LongRangeGen())], length=100),
-        0.5)
+        0.5, conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        [0.05, 0.25, 0.5, 0.75, 0.95])
+        [0.05, 0.25, 0.5, 0.75, 0.95], conf)
 
-@pytest.mark.skip(reason="https://github.com/NVIDIA/spark-rapids/issues/3692")
-@ignore_order(local=True)
-def test_hash_groupby_approx_percentile_double_scalar():
+@pytest.mark.parametrize('aqe_enabled', ['false', 'true'], ids=idfn)
+def test_hash_groupby_approx_percentile_double_scalar(aqe_enabled):
+    conf = copy_and_update(_approx_percentile_conf, {'spark.sql.adaptive.enabled': aqe_enabled})
     compare_percentile_approx(
         lambda spark: gen_df(spark, [('k', StringGen(nullable=False)),
                                      ('v', DoubleGen())], length=100),
-        0.05)
+        0.05, conf)
 
 # The percentile approx tests differ from other tests because we do not expect the CPU and GPU to produce the same
 # results due to the different algorithms being used. Instead we compute an exact percentile on the CPU and then
 # compute approximate percentiles on CPU and GPU and assert that the GPU numbers are accurate within some percentage
 # of the CPU numbers
-def compare_percentile_approx(df_fun, percentiles):
+def compare_percentile_approx(df_fun, percentiles, conf):
 
     # create SQL statements for exact and approx percentiles
     p_exact_sql = create_percentile_sql("percentile", percentiles)
@@ -1190,32 +1206,44 @@ def run_approx(spark):
     # run approx_percentile on CPU and GPU
     approx_cpu, approx_gpu = run_with_cpu_and_gpu(run_approx, 'COLLECT', _approx_percentile_conf)
 
-    for result in zip(exact, approx_cpu, approx_gpu):
+    assert len(exact) == len(approx_cpu)
+    assert len(exact) == len(approx_gpu)
+
+    for i in range(len(exact)):
+        cpu_exact_result = exact[i]
+        cpu_approx_result = approx_cpu[i]
+        gpu_approx_result = approx_gpu[i]
+
         # assert that keys match
-        assert result[0]['k'] == result[1]['k']
-        assert result[1]['k'] == result[2]['k']
-
-        exact = result[0]['the_percentile']
-        cpu = result[1]['the_percentile']
-        gpu = result[2]['the_percentile']
-
-        if exact is not None:
-            if isinstance(exact, list):
-                for x in zip(exact, cpu, gpu):
-                    exact = x[0]
-                    cpu = x[1]
-                    gpu = x[2]
-                    gpu_delta = abs(float(gpu) - float(exact))
-                    cpu_delta = abs(float(cpu) - float(exact))
+        assert cpu_exact_result['k'] == cpu_approx_result['k']
+        assert cpu_exact_result['k'] == gpu_approx_result['k']
+
+        # extract the percentile result column
+        exact_percentile = cpu_exact_result['the_percentile']
+        cpu_approx_percentile = cpu_approx_result['the_percentile']
+        gpu_approx_percentile = gpu_approx_result['the_percentile']
+
+        if exact_percentile is None:
+            assert cpu_approx_percentile is None
+            assert gpu_approx_percentile is None
+        else:
+            assert cpu_approx_percentile is not None
+            assert gpu_approx_percentile is not None
+            if isinstance(exact_percentile, list):
+                for j in range(len(exact_percentile)):
+                    assert cpu_approx_percentile[j] is not None
+                    assert gpu_approx_percentile[j] is not None
+                    gpu_delta = abs(float(gpu_approx_percentile[j]) - float(exact_percentile[j]))
+                    cpu_delta = abs(float(cpu_approx_percentile[j]) - float(exact_percentile[j]))
                     if gpu_delta > cpu_delta:
                         # GPU is less accurate so make sure we are within some tolerance
                         if gpu_delta == 0:
                             assert abs(gpu_delta / cpu_delta) - 1 < 0.001
                         else:
                             assert abs(cpu_delta / gpu_delta) - 1 < 0.001
             else:
-                gpu_delta = abs(float(gpu) - float(exact))
-                cpu_delta = abs(float(cpu) - float(exact))
+                gpu_delta = abs(float(gpu_approx_percentile) - float(exact_percentile))
+                cpu_delta = abs(float(cpu_approx_percentile) - float(exact_percentile))
                 if gpu_delta > cpu_delta:
                     # GPU is less accurate so make sure we are within some tolerance
                     if gpu_delta == 0:
@@ -1225,10 +1253,10 @@
 
 def create_percentile_sql(func_name, percentiles):
     if isinstance(percentiles, list):
-        return """select k, {}(v, array({})) as the_percentile from t group by k""".format(
+        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
             func_name, ",".join(str(i) for i in percentiles))
     else:
-        return """select k, {}(v, {}) as the_percentile from t group by k""".format(
+        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
             func_name, percentiles)
 
 @ignore_order
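
Editor's note: the create_percentile_sql change is easiest to see by running the new version standalone. The sketch below copies the function verbatim from the diff and prints the SQL it generates; the added "order by k" makes row order deterministic, which is what lets the rewritten comparison loop zip the exact, CPU-approx, and GPU-approx results by index (and pairs with dropping the @ignore_order decorators above).

# Standalone sketch: create_percentile_sql copied from the diff above.
def create_percentile_sql(func_name, percentiles):
    if isinstance(percentiles, list):
        return """select k, {}(v, array({})) as the_percentile from t group by k order by k""".format(
            func_name, ",".join(str(i) for i in percentiles))
    else:
        return """select k, {}(v, {}) as the_percentile from t group by k order by k""".format(
            func_name, percentiles)

print(create_percentile_sql("percentile", [0.05, 0.5, 0.95]))
# select k, percentile(v, array(0.05,0.5,0.95)) as the_percentile from t group by k order by k
print(create_percentile_sql("approx_percentile", 0.5))
# select k, approx_percentile(v, 0.5) as the_percentile from t group by k order by k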
@@ -28,46 +28,60 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
 
   val DEFAULT_PERCENTILES = Array(0.005, 0.05, 0.25, 0.45, 0.5, 0.55, 0.75, 0.95, 0.995)
 
+  test("null handling") {
+    val func = spark => salariesWithNull(spark)
+    doTest(func, delta = Some(100))
+  }
+
   test("1 row per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 1, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = Some(100))
   }
 
   test("5 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 5, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 5)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = Some(100))
   }
 
   test("2500 rows per group, delta 100, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 2500, delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 2500)
+    doTest(func, delta = Some(100))
   }
 
   test("250 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, delta = None)
   }
 
   test("25000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 25000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 25000)
+    doTest(func, delta = None)
   }
 
   test("50000 rows per group, default delta, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = None)
   }
 
   // test with a threshold just below the default level of 10000
   test("50000 rows per group, delta 9999, doubles") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 50000, delta = Some(9999))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 50000)
+    doTest(func, delta = Some(9999))
   }
 
   test("empty input set") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 0, delta = None)
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 1)
+    doTest(func, delta = None)
   }
 
   test("scalar percentile") {
-    doTest(DataTypes.DoubleType, rowsPerGroup = 250,
-      percentileArg = Left(0.5), delta = Some(100))
+    val func = spark => salaries(spark, DataTypes.DoubleType, rowsPerDept = 250)
+    doTest(func, percentileArg = Left(0.5), delta = Some(100))
   }
 
   test("empty percentile array fall back to CPU") {
@@ -105,25 +119,25 @@ class ApproximatePercentileSuite extends SparkQueryCompareTestSuite {
     }, conf)
   }
 
-  private def doTest(dataType: DataType,
-      rowsPerGroup: Int,
-      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
-      delta: Option[Int]) {
+  private def doTest(
+      func: SparkSession => DataFrame,
+      percentileArg: Either[Double, Array[Double]] = Right(DEFAULT_PERCENTILES),
+      delta: Option[Int]) {
 
     val percentiles = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta,
+      calcPercentiles(spark, func, percentileArg, delta,
         approx = false)
     }
 
     val approxPercentilesCpu = withCpuSparkSession { spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     }
 
     val conf = new SparkConf()
       .set("spark.rapids.sql.expression.ApproximatePercentile", "true")
 
     val approxPercentilesGpu = withGpuSparkSession(spark =>
-      calcPercentiles(spark, dataType, rowsPerGroup, percentileArg, delta, approx = true)
+      calcPercentiles(spark, func, percentileArg, delta, approx = true)
     , conf)
 
     val keys = percentiles.keySet ++ approxPercentilesCpu.keySet ++ approxPercentilesGpu.keySet
@@ -165,14 +179,13 @@
 
   private def calcPercentiles(
       spark: SparkSession,
-      dataType: DataType,
-      rowsPerDept: Int,
+      dfFunc: SparkSession => DataFrame,
       percentilesArg: Either[Double, Array[Double]],
       delta: Option[Int],
       approx: Boolean
     ): Map[String, Array[Double]] = {
 
-    val df = salaries(spark, dataType, rowsPerDept)
+    val df = dfFunc(spark)
 
     val percentileArg = percentilesArg match {
       case Left(n) => s"$n"
@@ -210,6 +223,14 @@
     }).toMap
   }
 
+  private def salariesWithNull(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    Seq(("a", null), ("b", null), ("b", "123456.78")).toDF("dept", "x")
+      .withColumn("salary", expr("CAST(x AS double)"))
+      .drop("x")
+      .repartition(2)
+  }
+
   private def salaries(
       spark: SparkSession,
       salaryDataType: DataType, rowsPerDept: Int): DataFrame = {
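
Editor's note: judging by the "threshold just below the default level of 10000" comment above, the delta parameter threaded through this Scala suite corresponds to the optional accuracy argument of Spark's approx_percentile, whose default is 10000. The sketch below is a hypothetical standalone PySpark session, not part of the suite, illustrating the semantics under test, including the all-null group exercised by the new salariesWithNull fixture; the actual suite runs the same query shape through the RAPIDS plugin with spark.rapids.sql.expression.ApproximatePercentile enabled.

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

# Made-up rows in the suite's dept/salary shape; dept "a" has only nulls.
df = spark.createDataFrame(
    [("a", None), ("b", None), ("b", 123456.78), ("b", 100000.0)],
    "dept string, salary double")
df.createOrReplaceTempView("salaries")

# approx_percentile(col, percentage[, accuracy]): accuracy defaults to 10000
# and plays the role of the suite's `delta` (delta = None means the default).
spark.sql("""
    select dept,
           approx_percentile(salary, array(0.5, 0.95), 100) as p
    from salaries
    group by dept
    order by dept
""").show(truncate=False)
# Nulls are ignored by the aggregate, so the all-null group yields NULL;
# that is the behavior the new "null handling" test checks for CPU/GPU agreement.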