
Support MinBy and MaxBy for non-float ordering #11371

Merged
22 commits merged into NVIDIA:branch-24.10 from the min_by_plugin_only branch on Aug 29, 2024

Conversation

@thirtiseven (Collaborator) commented Aug 21, 2024

Closes #10968

This PR adds support for the MinBy and MaxBy aggregations with non-float ordering; float ordering is not required by the customer.

The main idea is to pack the ordering and value columns into a struct column and run a min/max aggregation on it. When the ordering column is not unique, this PR will always return the minimal value, while Spark's result is non-deterministic.
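For illustration, the same packing trick can be expressed at the Spark API level (a minimal sketch of the equivalence, not the plugin's GPU implementation; df and the column names follow the perf test below):

import org.apache.spark.sql.functions._

// min_by(b, c) per group: pack (ordering, value) into a struct, take the
// struct min (structs compare field by field, so the ordering field c decides),
// then project the value field back out.
val viaStruct = df.groupBy("a")
  .agg(min(struct(col("c"), col("b"))).getField("b").as("min_by_b_c"))

When the ordering keys tie, the comparison falls through to the value field, which is why this approach always returns the minimal value for duplicate keys.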

So this PR does not use the previous argmin + remove-nulls + gather method from rapidsai/cudf#16163 and #11123, and it no longer depends on a cuDF-side PR.

Spark has special NaN handling that differs from cuDF's, so it would not be straightforward to implement float ordering for min_by/max_by with the current method. But that is a rare use case, and I think we don't need to support it for now.
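To make the NaN difference concrete, here is a small illustrative snippet (not part of this PR's tests; float ordering is exactly the case this PR does not handle on the GPU):

spark.sql("SELECT max_by(x, ord) FROM VALUES (1, CAST('NaN' AS DOUBLE)), (2, CAST(1.0 AS DOUBLE)) AS t(x, ord)").show()
// returns 1: Spark orders NaN above every other double (and NaN equals NaN),
// while a plain IEEE-754 comparison would treat NaN as unordered.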

Much of this work is from @firestarman.

Ran a simple perf test 3 times:

// (with big data gen)
import org.apache.spark.sql.tests.datagen._
val dataTable = DBGen().addTable("data", "a byte, b long, c long", 500000000)
dataTable.toDF(spark).write.mode("OVERWRITE").parquet("PERF")

// spark-rapids
import org.apache.spark.sql.functions._   // needed for col and min_by (Spark 3.3+)
val df = spark.read.parquet("PERF")
spark.time(df.groupBy("a").agg(min_by(col("b"), col("c")), min_by(col("c"), col("b"))).show(300))

on 500000000 rows of (a: byte, b: long, c: long)

results:

gpu: 3,484 ms
cpu: 5,844 ms
speedup: 68%

Another round with a complex value and a simple ordering key:

val dataTable = DBGen().addTable("data", "a byte, b struct<string, array<long>>, c int", 20000000)
spark.time(df.groupBy("a").agg(min_by(col("b"), col("c"))).show(300, false))

gpu: 1,049 ms
#11123: 1,019 ms
cpu: 2,776 ms
speedup: 2.65x

firestarman and others added 12 commits July 1, 2024 02:01
(the commits carry sign-off-only messages: Signed-off-by: Firestarman <firestarmanllc@gmail.com>, Signed-off-by: Haoyang Li <haoyangl@nvidia.com>)
@thirtiseven thirtiseven self-assigned this Aug 23, 2024
@thirtiseven thirtiseven marked this pull request as ready for review August 23, 2024 07:32
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven (Collaborator, Author)

build

@firestarman (Collaborator) commented Aug 26, 2024

Looks good to me, but it got failures in premerge.

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven (Collaborator, Author)

build

@revans2 (Collaborator) left a comment

I am also concerned with the performance numbers. I don't know how to reproduce those numbers because df is not included. A speedup of 36% over the CPU is not much, and I am very concerned that if the value of the aggregation is long and complicated while the ordering key is not, we will lose to the CPU because the CPU just has to do ordering for a very simple operation.

Review threads on generated tool files (outdated, resolved):
tools/generated_files/311/operatorsScore.csv
tools/generated_files/312/operatorsScore.csv
tools/generated_files/312/supportedExprs.csv
tools/generated_files/313/supportedExprs.csv
@@ -1339,6 +1339,43 @@ def test_generic_reductions(data_gen):
'count(1)'),
conf=local_conf)

# min_by and max_by are supported for pyspark since 3.3.0 so tested with sql
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_basic_gens + struct_gens_sample_with_decimal128 + array_gens_sample, ids=idfn)
Review comment (Collaborator):

The code says that maps are allowed for the value, not the order by key. Could we add some tests to verify that?

Reply from the PR author:
Added

Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
@thirtiseven (Collaborator, Author) commented Aug 27, 2024

> I am also concerned with the performance numbers. I don't know how to reproduce those numbers because df is not included. A speedup of 36% over the CPU is not much, and I am very concerned that if the value of the aggregation is long and complicated while the ordering key is not, we will lose to the CPU because the CPU just has to do ordering for a very simple operation.

Thanks for the review! I added the data generation part and updated the numbers in the PR description, plus another round of testing with "a byte, b struct<string, array<long>>, c int", which got a 2.65x speedup.

I think in most use cases the ordering key is unique, so the compare function will almost always stop early before getting into the complicated value part, so it won't be slower.

@sameerz sameerz added the feature request New feature or request label Aug 27, 2024
@firestarman (Collaborator)

build

@thirtiseven thirtiseven merged commit dbd92d2 into NVIDIA:branch-24.10 Aug 29, 2024
44 of 45 checks passed
@thirtiseven thirtiseven deleted the min_by_plugin_only branch August 29, 2024 13:49
@revans2 (Collaborator) commented Aug 29, 2024

@thirtiseven could you please file a follow-on issue to look at ways to improve the performance of these operators? I did my own benchmarking and this is far from an ideal solution. cuDF does not deal well with min/max on a struct of two values. I ran the following benchmarks.

I have 4 different data sets and 4 separate queries on those data sets. All of the tests were run on a 32-core Threadripper with Spark configured to use 32 cores. The GPU is an A6000, but limited to 16 GiB of GPU memory.

Dataset 1: (100 group by keys)

spark.range(10000000000L).selectExpr("id div 100000000 as a", "id div 1000000 as b", "id as c").createOrReplaceTempView("tbl")

Dataset 2: (100 group by keys, but made into structs before agg) The 2048 partitions is to eliminate spilling on the sort for the CPU. I also ran one with the GPU at 64 partitions to make it more comparable to other CPU jobs.

spark.range(1, 10000000000L, 1 , 2048).selectExpr("id div 100000000 as a", "named_struct('f', id div 1000000, 'o', id) as b", "named_struct('o', id, 'f', id div 1000000) as c").createOrReplaceTempView("tbl")

Dataset 3: (1 million group by keys)

spark.range(10000000000L).selectExpr("id div 10000 as a", "id div 100 as b", "id as c").createOrReplaceTempView("tbl")

Dataset 4: (1 million group by keys, but made into structs before agg) The 2048 partitions is to eliminate spilling on the sort for the CPU. I also ran one with the GPU at 64 partitions to make it more comparable to other CPU jobs.

spark.range(1, 10000000000L, 1 , 2048).selectExpr("id div 1000 as a", "named_struct('f', id div 100, 'o', id) as b", "named_struct('o', id, 'f', id div 100) as c").createOrReplaceTempView("tbl")

Query 1: (group by min_by and max_by)

spark.time(spark.sql("SELECT a, min_by(c,b), max_by(c,b), min_by(b, c), max_by(b, c) FROM tbl GROUP BY a ORDER BY a").show())

Query 2: (group by min and max)

spark.time(spark.sql("SELECT a, min(c), max(c), min(b), max(b) FROM tbl GROUP BY a ORDER BY a").show())

Query 3: (min_by and max_by reduction)

spark.time(spark.sql("SELECT min_by(c,b), max_by(c,b), min_by(b, c), max_by(b, c) FROM tbl").show())

Query 4: (min and max reduction)

spark.time(spark.sql("SELECT min(c), max(c), min(b), max(b) FROM tbl").show())

Results (times in ms):

| hardware | data set  | Q1      | Q2      | Q3      | Q4      |
|----------|-----------|---------|---------|---------|---------|
| GPU      | DS1       | 42,988  | 7,508   | 4,689   | 952     |
| CPU      | DS1       | 11,602  | 6,549   | 2,822   | 1,540   |
| GPU      | DS2(2048) | 56,256  | 44,122  | 23,232  | 7,053   |
| CPU      | DS2(2048) | 482,006 | 272,889 | 418,851 | 222,792 |
| GPU      | DS2(64)   | 50,855  | 41,485  | 9,438   | 4,074   |
| GPU      | DS3       | 42,686  | 3,698   | 4,518   | 953     |
| CPU      | DS3       | 10,837  | 7,073   | 2,645   | 1,540   |
| GPU      | DS4(2048) | 57,188  | 45,757  | 20,856  | 6,928   |
| CPU      | DS4(2048) | 492,970 | 289,821 | 418,500 | 221,631 |
| GPU      | DS4(64)   | 52,555  | 42,694  | 9,199   | 3,896   |

A few things to note from the results.

  1. A min/max on a long (DS1 - Q2) is about 5.7 times faster than on a struct(long, long) (DS1 - Q1), because in cuDF we are forced to fall back to a sort aggregate for the struct operation.
  2. The CPU saw less than a 2x speedup for the same change.
  3. For DS2(2048) and DS2(64) the GPU saw very little speedup because now both are sorted.
  4. For DS2(2048) the CPU saw a very similar speedup of a little less than 2x.
  5. We know that GPU has some issue with a small number of group by keys in aggregations compared to reductions. The CPU shows no such problems.

In general I think that if we went with argmin and then a gather, for many fixed-width types we would see a huge performance improvement. If they are not fixed width, then we would likely not see much improvement, but we might see a small amount, just because there would be less data to look at.
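To make the argmin + gather idea concrete, here is a tiny conceptual sketch in plain Scala over in-memory arrays for a single group (not cuDF or plugin API; the names are purely illustrative). The point is that the potentially wide value column is only touched once per group, after a cheap argmin over the ordering keys:

// ordering keys and values for one group
val order  = Array(5L, 2L, 9L)
val values = Array("x", "y", "z")            // could be wide structs in practice

// step 1: argmin over the ordering column only
val idx = order.indices.minBy(i => order(i))

// step 2: gather the value at that row index
val minByResult = values(idx)                // "y"
// (the real approach would also need the "remove nulls" step mentioned in the
//  PR description to skip null ordering keys)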

@thirtiseven (Collaborator, Author)

@revans2 Thanks for the testing and analysis! filed #11412

Labels: feature request
Linked issue: [FEA] support min_by function (#10968)