Support MinBy and MaxBy for non-float ordering #11371
Conversation
Signed-off-by: Firestarman <firestarmanllc@gmail.com>
Signed-off-by: Haoyang Li <haoyangl@nvidia.com>
build
Looks good to me, but it hit failures in premerge.
build
I am also concerned with the performance numbers. I don't know how to reproduce them because the df is not included. A 36% speedup over the CPU is not much, and I am very concerned that if the aggregated value is long and complicated while the ordering key is not, we will lose to the CPU, because the CPU only has to do the ordering on a very simple key.
@@ -1339,6 +1339,43 @@ def test_generic_reductions(data_gen):
                'count(1)'),
        conf=local_conf)

# min_by and max_by are supported for pyspark since 3.3.0 so tested with sql
@ignore_order(local=True)
@pytest.mark.parametrize('data_gen', all_basic_gens + struct_gens_sample_with_decimal128 + array_gens_sample, ids=idfn)
The code says that maps are allowed for the value, not the order by key. Could we add some tests to verify that?
Added
Thanks for the review! I added the data-generation part, updated the numbers in the PR description, and ran another round of tests. I think in most use cases the ordering key is unique, so the comparison function will almost always stop early, before reaching the complicated value part, and it won't be slower.
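The short-circuiting argument above can be illustrated in plain Python (a hypothetical stand-in, not the plugin or cuDF code): lexicographic comparison of (ordering key, value) pairs decides on the key whenever keys differ, so the value part is never compared when keys are unique.

```python
# Illustrative only: emulate min_by as a min over (ordering_key, value) pairs.
# Python tuple comparison is lexicographic and short-circuits, so when the
# first elements differ, the (potentially expensive) values are never compared.

class NoisyValue:
    """A value wrapper that counts how often it is compared."""
    comparisons = 0

    def __init__(self, payload):
        self.payload = payload

    def __lt__(self, other):
        NoisyValue.comparisons += 1
        return self.payload < other.payload

    def __eq__(self, other):
        NoisyValue.comparisons += 1
        return self.payload == other.payload

# Unique integer keys, "complicated" wrapped values.
rows = [(k, NoisyValue(("big", "complex", k))) for k in range(1000)]

best_key, best_value = min(rows)  # acts like min_by(value, key)
assert best_key == 0
# With unique keys, the comparison is settled on the key alone:
assert NoisyValue.comparisons == 0
```

When keys collide, the comparison does fall through to the value, which is exactly the case where the tie-breaking behavior (and cost) of the struct approach differs from the CPU.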
build
@thirtiseven could you please file a follow-on issue to look at ways to improve the performance of these operators? I did my own benchmarking, and this is far from an ideal solution: cuDF does not deal well with min/max on a struct of two values. I ran the following benchmarks: 4 different data sets and 4 separate queries on those data sets. All of the tests were run on a 32-core Threadripper with Spark configured to use 32 cores. The GPU is an A6000, but limited to 16 GiB of GPU memory. Dataset 1: (100 group by keys)
Dataset 2: (100 group by keys, but made into structs before agg) The 2048 partitions are there to eliminate spilling on the sort for the CPU. I also ran one with the GPU at 64 partitions to make it more comparable to the other CPU jobs.
Dataset 3: (1 million group by keys)
Dataset 4: (1 million group by keys, but made into structs before agg) The 2048 partitions are there to eliminate spilling on the sort for the CPU. I also ran one with the GPU at 64 partitions to make it more comparable to the other CPU jobs.
Query 1: (group by min_by and max_by)
Query 2: (group by min and max)
Query 3: (min_by and max_by reduction)
Query 4: (min and max reduction)
A few things to note from the results.
In general I think if we went with the
Closes #10968
This PR supports the MinBy and MaxBy aggregations for non-float ordering keys; float ordering is not required by the customer.
The main idea is to pack the ordering and value columns into a struct column and do a min/max aggregation on it. When the ordering column is not unique, this PR will always return the minimal value, whereas Spark's result is non-deterministic.
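The pack-into-struct idea can be sketched in plain Python (an illustration, not the actual cuDF/Scala code; the function names are made up): the struct min/max orders by the first field, then the second, which yields the min_by/max_by value and explains the deterministic tie-breaking.

```python
# Sketch: min_by(value, ord) computed as a min over (ord, value) "structs",
# then extracting the value field. On ties in the ordering key, the minimal
# value is chosen, unlike Spark's non-deterministic pick.

def gpu_style_min_by(pairs):
    """pairs: iterable of (ordering_key, value); returns value at the min key."""
    ord_key, value = min(pairs)   # lexicographic min over the packed struct
    return value

def gpu_style_max_by(pairs):
    ord_key, value = max(pairs)
    return value

data = [(3, "c"), (1, "a"), (2, "b"), (1, "a0")]
assert gpu_style_min_by(data) == "a"   # key 1 is minimal; tie picks "a" < "a0"
assert gpu_style_max_by(data) == "c"
```

The tie case `(1, "a")` vs `(1, "a0")` shows the deterministic behavior described above: the struct comparison falls through to the value column and keeps the smaller one.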
So this PR does not use the previous argmin + remove-nulls + gather approach from rapidsai/cudf#16163 and #11123, and it no longer depends on a cuDF PR.
Spark has special NaN handling that differs from cuDF's, so implementing float-ordering min_by/max_by with the current method would not be straightforward. But it is a rare use case, so I think we don't need to support it for now.
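The semantic gap can be shown in plain Python (an assumed illustration; `spark_double_key` is a made-up helper approximating Spark's total order for doubles, where NaN sorts greater than every other value, while cuDF follows IEEE-754 comparisons):

```python
import math

nan = float("nan")

# IEEE-754 semantics (what cuDF comparisons follow):
# every comparison involving NaN is false.
assert not (nan < 1.0) and not (nan > 1.0) and not (nan == nan)

# Spark's total order for doubles treats NaN as the largest value.
# A key function sketching that order:
def spark_double_key(x):
    return (1, 0.0) if math.isnan(x) else (0, x)

values = [2.0, nan, 1.0]
assert sorted(values, key=spark_double_key) == [1.0, 2.0, nan]
assert math.isnan(max(values, key=spark_double_key))  # NaN wins under Spark order
```

A struct-based min/max on a float ordering key would inherit the IEEE behavior, which is why float ordering keys are left unsupported here.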
Much of this work is from @firestarman
Ran a simple perf test 3 times
on 500,000,000 rows of (a: byte, b: long, c: long)
results:
Another round with a complex value and simple order: