[FEA] See if an intermediate merge helps reduce memory in hash aggregate #8390

Open
revans2 opened this issue May 24, 2023 · 2 comments

Labels: feature request (New feature or request), performance (A performance related task/issue), reliability (Features to improve reliability or bugs that severely impact the reliability of the plugin)

Comments

revans2 (Collaborator) commented May 24, 2023

Is your feature request related to a problem? Please describe.
Currently when doing a hash aggregate we do an initial aggregation per input batch and save away the resulting batch. When we are done with the input we look at the combined size of those intermediate batches, and if it is too large we do a sort + key partitioning pass on the intermediate data and then do the final aggregation pass per batch that comes out.
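For reference, a minimal sketch of that existing flow is below; the `Batch` type and the `preAggregate` / `mergeAggregate` / `sortAndPartitionByKey` helpers are placeholders standing in for the real cudf/plugin kernels, not the actual spark-rapids API.

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for a columnar batch; only the properties needed for the sketch.
case class Batch(rows: Long, sizeBytes: Long)

class HashAggSketch(targetBatchSize: Long) {
  private val pending = ArrayBuffer[Batch]()

  // Phase 1: aggregate each input batch on its own and save the result.
  def addInputBatch(input: Batch): Unit = {
    pending += preAggregate(input)
  }

  // Phase 2: once all input is consumed, look at the combined size of the
  // intermediate batches and pick how to produce the final result.
  def finish(): Iterator[Batch] = {
    val totalSize = pending.map(_.sizeBytes).sum
    if (totalSize <= targetBatchSize) {
      // Small enough: one final merge aggregation over everything.
      Iterator.single(mergeAggregate(pending.toSeq))
    } else {
      // Too large: sort + key partition the intermediate data, then run the
      // final merge aggregation per partitioned batch.
      sortAndPartitionByKey(pending.toSeq).map(b => mergeAggregate(Seq(b)))
    }
  }

  // Placeholders for the real kernels.
  private def preAggregate(b: Batch): Batch = b
  private def mergeAggregate(bs: Seq[Batch]): Batch =
    Batch(bs.map(_.rows).sum, bs.map(_.sizeBytes).sum)
  private def sortAndPartitionByKey(bs: Seq[Batch]): Iterator[Batch] = bs.iterator
}
```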

This generally works, but we can end up using far more memory to hold those intermediate results than we should, which can cause us to sort more often than necessary. This happens when we have a lot of input data spread across multiple batches, the per-batch aggregation reduces the size decently, but the keys are evenly distributed through all of the batches. It is rather common to have something like a GpuExpandExec, or a join that explodes the row count, right before an aggregation that drops the size of the data back down to something really small.

It would be great to test out what happens if we add a heuristic that merges intermediate batches before all of the input has been processed.

When a batch is added we would check if there is more than one pending batch and if the combined size of those batches is larger than 1/2 the target batch size. The decision to merge should probably not take into account batches that have already been merged once. If the check passes, we would concat all of the pending batches together, including ones that have already been merged previously (unless the combined size would be too large...), and do a merge aggregation on them, replacing the existing pending batches with the merged result. If the size of the output did not go down by at least some percentage (I am thinking 50%, but we should play with this), then we set a flag and stop trying to merge the intermediate results. This protects us against situations where it looks like nothing is going to combine at all.
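A rough sketch of what that heuristic could look like, assuming a placeholder `AggBatch` type and a stand-in `mergeAggregate`; the 1/2 target-size trigger and the 50% reduction threshold are just the starting values suggested above and would need tuning.

```scala
import scala.collection.mutable.ArrayBuffer

// Pre-aggregated intermediate batch; alreadyMerged marks results of a merge.
case class AggBatch(sizeBytes: Long, alreadyMerged: Boolean = false)

class IntermediateMerger(targetBatchSize: Long, minReduction: Double = 0.5) {
  private val pending = ArrayBuffer[AggBatch]()
  private var mergingEnabled = true

  def add(batch: AggBatch): Unit = {
    pending += batch
    maybeMerge()
  }

  def results: Seq[AggBatch] = pending.toSeq

  private def maybeMerge(): Unit = {
    if (!mergingEnabled) return
    // Base the merge decision only on batches that have never been merged.
    val fresh = pending.filter(!_.alreadyMerged)
    if (fresh.size > 1 && fresh.map(_.sizeBytes).sum > targetBatchSize / 2) {
      // Concat all pending batches, previously merged ones included, but stop
      // adding once the combined input would exceed the target batch size.
      val toMerge = ArrayBuffer[AggBatch]()
      val keep = ArrayBuffer[AggBatch]()
      var total = 0L
      pending.foreach { b =>
        if (total + b.sizeBytes <= targetBatchSize) { toMerge += b; total += b.sizeBytes }
        else keep += b
      }
      if (toMerge.size > 1) {
        val merged = mergeAggregate(toMerge.toSeq)
        pending.clear()
        pending ++= keep
        pending += merged.copy(alreadyMerged = true)
        // If the merge did not shrink the data by at least minReduction,
        // assume the keys are not combining and stop attempting merges.
        if (merged.sizeBytes > total * (1.0 - minReduction)) {
          mergingEnabled = false
        }
      }
    }
  }

  // Placeholder for the real merge aggregation kernel; here it pretends
  // nothing combined, so the output is the same size as the input.
  private def mergeAggregate(bs: Seq[AggBatch]): AggBatch =
    AggBatch(bs.map(_.sizeBytes).sum)
}
```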

This is related to #7404, where if it really looks like nothing is going to combine in a partial aggregate, it might be worth just passing the batches through to the shuffle. But we might want the heuristic for that to be a little higher than 50%. We would need to play with both of these to see what really works well. To be clear, #7404 should stay separate from this. I just think that if we do this and the numbers look good, it would be nice to see what happens when we add #7404 on top, because it should be fairly simple to do.

As for testing, I want to see a number of different situations covered from both a performance and a reliability standpoint. All of these would be from the standpoint of a lot of data going into a small number of tasks, as if we had way too few shuffle partitions for the size of the data. A lot of this is really about the cardinality and the ordering of the grouping keys.

I want to see what happens when the grouping key's:
1. cardinality is high and is highly grouped (the data is almost sorted by the key, should get good combining initially, but not after a first pass)
2. cardinality is high and is randomly distributed (should get almost no combining in the partial)
3. cardinality is low and is highly grouped
4. cardinality is low and is randomly distributed
5. cardinality is medium and is highly grouped
6. cardinality is medium and is randomly distributed

By high cardinality I mean each key shows up 2 to 3 times in the entire dataset, for medium 200 to 300 times, and for low 20,000 to 30,000 times. But we want enough data that a single task cannot hold all of it in GPU memory. At least a few hundred GiB of data.
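Something along these lines could generate the scenarios above; the genKeys helper, column names, and sizes are hypothetical and not part of any existing test suite. The repeat count controls the cardinality (2 to 3 for high, a few hundred for medium, tens of thousands for low), and the grouped flag controls whether the data arrives nearly sorted by key or randomly distributed.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object AggTestDataGen {
  // Generate `rows` rows whose grouping key repeats roughly `repeatsPerKey`
  // times, either nearly sorted by key ("grouped") or randomly distributed.
  def genKeys(spark: SparkSession, rows: Long, repeatsPerKey: Long,
              grouped: Boolean): DataFrame = {
    val numKeys = math.max(1L, rows / repeatsPerKey)
    val base = spark.range(rows)
    val keyed = if (grouped) {
      // Consecutive rows share a key, so per-batch pre-aggregation combines
      // well on the first pass but not much after that.
      base.withColumn("key", floor(col("id") / lit(repeatsPerKey)))
    } else {
      // Each key is spread across all batches, so per-batch pre-aggregation
      // combines almost nothing.
      base.withColumn("key", pmod(hash(col("id")), lit(numKeys)))
    }
    keyed.withColumn("value", rand())
  }
}

// Example: "medium" cardinality (~300 repeats per key), randomly distributed,
// with a row count large enough that one task cannot hold the data in GPU memory.
// val df = AggTestDataGen.genKeys(spark, 20L * 1000 * 1000 * 1000, 300, grouped = false)
// df.groupBy("key").agg(sum("value")).write.parquet("/tmp/agg_out")
```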

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify labels May 24, 2023
@mattahrens mattahrens added the reliability Features to improve reliability or bugs that severely impact the reliability of the plugin label May 25, 2023
@revans2 revans2 added the performance A performance related task/issue label May 25, 2023
@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label May 31, 2023
binmahone (Collaborator) commented:

Hi @revans2, is this issue the same thing as what we do in com.nvidia.spark.rapids.GpuMergeAggregateIterator#tryMergeAggregatedBatches today?

revans2 (Collaborator, Author) commented Jul 16, 2024

@binmahone sorry for the late reply. No, it is not exactly the same as what happens today with tryMergeAggregatedBatches. tryMergeAggregatedBatches happens after an initial aggregation pass through all of the input data is done. This issue is about being more aggressive with the merging: as soon as we hit a merge window size, try to merge that data instead of waiting until the end. If it reduces the size significantly, then we can go on and process more data the same way, but with a smaller footprint and less spill. If it does not reduce the size, then we might want to take that as feedback and decide whether we should fall back to hash repartitioning/sort instead.
