[FEA] Do a hash based re-partition instead of a sort based fallback for hash aggregate #8391
Labels

- feature request: New feature or request
- performance: A performance related task/issue
- reliability: Features to improve reliability or bugs that severely impact the reliability of the plugin
Is your feature request related to a problem? Please describe.
Currently if the intermediate data for a hash aggregate is too large we fall back to sorting the data, splitting it on key boundaries and then merging the intermediate results.
See spark-rapids/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala, lines 745 to 772 at commit 9d9d491.
This works, but we know that sort is very expensive, especially compared to doing hash partitioning.
#8390 was filed to see if we can find a cheaper way to avoid spilling as much memory as we build up batches, and also as a way to avoid doing the sort/key partitioning.
This is here to try and replace the sort/key partitioning entirely, and may even replace some of #8390 assuming that it works out well.
Describe the solution you'd like
I would like a patch that replaces the `outOfCoreIter` and `keyBatchingIter` with something closer to what happens in the `GpuSubPartitionHashJoin`:
https://github.com/NVIDIA/spark-rapids/blob/branch-23.06/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/GpuSubPartitionHashJoin.scala
The idea would be to partition each intermediate batch using a hash seed that is different from the one used to partition the data for shuffle hash partitioning. Unlike join, I think we could use a different partitioning implementation, because we don't need to join keys of possibly different types together, but it might be nice to keep the code common if possible.
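To make the seeded re-partition idea concrete, here is a minimal CPU-side sketch, not the plugin's actual API: rows are bucketed by a seeded Murmur3 hash of the grouping key, where `RepartitionSeed` is an assumed constant chosen only so that it differs from whatever seed the shuffle used.

```scala
import scala.util.hashing.MurmurHash3

// Hypothetical sketch of hash-based re-partitioning of an intermediate batch.
// RepartitionSeed is an assumption for illustration; the only requirement is
// that it differ from the seed used by the upstream shuffle partitioning, so
// that keys colliding into one shuffle partition spread out here.
object SeededRepartition {
  val RepartitionSeed = 0x5EED // assumed constant, not from the plugin

  def partition(keys: Seq[String], numParts: Int, seed: Int): Map[Int, Seq[String]] =
    keys.groupBy { k =>
      // Math.floorMod keeps the bucket index non-negative for negative hashes.
      Math.floorMod(MurmurHash3.stringHash(k, seed), numParts)
    }
}
```

Because bucketing is a pure function of the key and seed, all rows with the same grouping key always land in the same partition, which is what lets each partition be merged independently.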
I was thinking that as soon as we see that the intermediate batches are larger than the target batch size, and we are not going to just output the data without a merge because of #7404, we would start to re-partition the intermediate results.
In the first version of this we would do it just like join: one pass through the data, partitioning it 16 ways. After that first pass we can combine small partitions together and process them to produce a result that we can output. If there are any partitions larger than the target batch size we can split them further, with a heuristic based on the size of the data that should be fairly accurate, but again it should use a different seed than the previous passes. Then we can merge them and output them from the second pass.
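The second-pass planning described above can be sketched as two small helpers. This is an assumed shape, not the real implementation: `coalesce` greedily packs adjacent first-pass partitions whose combined byte size stays under the target, and `subPartitionCount` is the size-based heuristic for how many ways to split an oversized partition.

```scala
// Hypothetical planning helpers for the second pass over the 16 first-pass
// partitions; names and the greedy first-fit policy are assumptions.
object RepartitionPlan {
  // Group partition indices so each group's total size stays under `target`.
  // A single partition already over `target` ends up alone in its group and
  // should then be split via subPartitionCount with a fresh hash seed.
  def coalesce(sizes: Seq[Long], target: Long): Seq[Seq[Int]] =
    sizes.zipWithIndex.foldLeft(List.empty[(Long, List[Int])]) {
      case ((total, ids) :: rest, (sz, idx)) if total + sz <= target =>
        (total + sz, idx :: ids) :: rest
      case (acc, (sz, idx)) =>
        (sz, List(idx)) :: acc
    }.map(_._2.reverse).reverse

  // Heuristic: ceiling division so each sub-partition should fit the target.
  def subPartitionCount(size: Long, target: Long): Int =
    math.max(1, ((size + target - 1) / target).toInt)
}
```

For example, with partition sizes `Seq(100L, 100L, 300L, 50L)` and a target of `250L`, the first two partitions coalesce into one group, the 300-byte partition stands alone (and would be split 2 ways), and the trailing 50-byte partition starts a new group.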
If #8390 looks good it might be worth thinking about playing some games with merging intermediate results after we do the partitioning, instead of doing a second pass at partitioning. But at that point we likely have 4 GiB of data cached (512 MiB per partition with 16 partitions), and I don't know how likely it is that all of that has spilled, nor, if it has, whether the cost of reading that data back in is worth the savings in merging it.
For now I would say just stick with something simple and then we can modify it as we see needs.
As for testing, I want to see a number of different situations tested from both a performance and a reliability standpoint. All of these would involve a lot of data going into a small number of tasks, as if we had far too few shuffle partitions for the size of the data. A lot of this is really going to be about the cardinality and ordering of the grouping keys.
I want to see what happens when the key's
1. cardinality is high and is highly grouped (the data is almost sorted by the key, should get good combining initially, but not after a first pass)
2. cardinality is high and is randomly distributed (should get almost no combining in the partial)
3. cardinality is low and is highly grouped
4. cardinality is low and is randomly distributed
5. cardinality is medium and is highly grouped
6. cardinality is medium and is randomly distributed
By high cardinality I mean each key shows up 2 to 3 times in the entire dataset; for medium, 200 to 300 times; and for low, 20,000 to 30,000 times. But we want enough data that a single task cannot hold all of it in GPU memory, at least a few hundred GiB of data.
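A generator for the test matrix above might look like the following sketch (all names are assumptions, not existing test utilities): `cardinality` fixes the number of distinct keys, and `grouped` chooses between near-sorted output (equal keys adjacent) and a seeded random shuffle.

```scala
import scala.util.Random

// Hypothetical key generator for the cardinality/ordering scenarios above.
// totalRows / cardinality gives the repeat count per key: ~2-3 for "high"
// cardinality, ~200-300 for "medium", ~20,000-30,000 for "low".
object KeyGen {
  def keys(totalRows: Int, cardinality: Int, grouped: Boolean, seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    val repeats = totalRows / cardinality
    // Near-sorted layout: each key's repeats are contiguous.
    val ordered = (0 until cardinality).flatMap(k => Seq.fill(repeats)(k))
    if (grouped) ordered else rng.shuffle(ordered)
  }
}
```

The same key population is used for both orderings, so only the grouping of the data changes between the paired scenarios, which isolates the effect of ordering on how much combining the partial aggregate achieves.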
With this I would also like to see how it behaves with large numbers of aggregations. That can increase the size of the input data by adding lots of new columns, but also, when combined with #8382, we might get lots of batches with relatively few rows in each.