Add a heuristic to skip second or third agg pass #10950
Please correct me if I get anything wrong about the algorithm.

It looks like we do a first pass over the data and place the results in the `aggregatedBatches` queue. We keep track of the total number of rows that survived that initial aggregation pass, along with the number of rows that were input to the aggregation. If the first agg pass reduced the number of rows by at least X%, where X is (1 - `spark.rapids.sql.agg.skipAggPassReductionRatio`), then we continue trying to combine the results together. Otherwise we set `shouldSkipThirdPassAgg`, which is confusing because we are not doing a third pass (at this point it would be a second aggregation pass), but whatever.
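To make the threshold concrete, here is a minimal sketch of the reduction check in Python (the real implementation is Scala/GPU code; the config name is real, the helper name is hypothetical):

```python
def should_skip_merge_pass(input_rows: int, rows_after_first_pass: int,
                           skip_agg_pass_reduction_ratio: float) -> bool:
    """Return True when the first aggregation pass did not shrink the data
    enough to justify another pass.

    skip_agg_pass_reduction_ratio models
    spark.rapids.sql.agg.skipAggPassReductionRatio: the pass must leave at
    most ratio * input_rows rows, i.e. remove at least
    (1 - ratio) * 100 percent of the rows, to continue aggregating.
    """
    return rows_after_first_pass > input_rows * skip_agg_pass_reduction_ratio

# With a ratio of 0.9, a pass must remove more than 10% of the rows:
print(should_skip_merge_pass(1000, 950, 0.9))  # True: only 5% reduction
print(should_skip_merge_pass(1000, 400, 0.9))  # False: 60% reduction
```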
Combining the results together is done by calling `tryMergeAggregatedBatches()`. This code tries to concatenate batches together up to a target input size and then merge those batches. It stops when we get down to a single output batch, or when we hit a situation where no two batches would fit within the target merge batch size. At that point, if we did not reduce the number of rows by X% again, we will also set `shouldSkipThirdPassAgg`.
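The concatenate-and-merge loop, as I understand it, looks roughly like this sketch (all names and the size model are hypothetical; batches are modeled as plain sizes and `merge` stands in for the real GPU merge aggregation):

```python
from collections import deque

def try_merge_aggregated_batches(batch_sizes, target_size, merge):
    """Repeatedly combine adjacent batches that fit within target_size.

    Stops when a single batch remains, or when a full pass over the queue
    finds no pair of adjacent batches that would fit the target merge size.
    Only adjacent batches are combined, so batch order is preserved.
    """
    q = deque(batch_sizes)
    made_progress = True
    while len(q) > 1 and made_progress:
        made_progress = False
        out = deque()
        while q:
            b = q.popleft()
            # Greedily absorb following batches while staying under target.
            while q and b + q[0] <= target_size:
                b = merge(b, q.popleft())
                made_progress = True
            out.append(b)
        q = out
    return list(q)

# Four small batches collapse into one; oversized batches are left alone.
concat = lambda a, b: a + b  # toy merge: sizes just add up
print(try_merge_aggregated_batches([2, 2, 2, 2], 8, concat))   # [8]
print(try_merge_aggregated_batches([3, 3, 3, 10], 8, concat))  # [6, 3, 10]
```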
At this point either the data is aggregated into a single batch that we can output, or we have multiple batches to deal with. If we are going to skip the third-pass agg (and this is not a final aggregation), then we just output what we have so far. Otherwise we fall back to the sort-based aggregation iterator.
That looks okay to me, but I am curious whether you think we are being aggressive enough with this, and whether we want follow-on work to see if we can make it more aggressive. I am just a little concerned that we might do an entire first pass over multiple batches when it is clear from the very first batch that this is just not going to work. We have to be very careful that we don't change the order of the batches, so that things like first and last still work correctly, but that should not be too difficult.
What if we read an input batch and check whether it reduced the number of rows by X%? If not, we just instantly return it; no need to cache that batch at all. We let downstream deal with the large batch we just saw. We keep doing that until we hit a batch that did reduce in size. Then we start to cache them, so that we don't mess up the order. I am concerned about spilling and memory pressure. A map-side aggregation is almost always going to feed into a shuffle, and if we can release some of the batches quickly we reduce the GPU memory pressure, because the data should be pulled back off of the GPU without us needing to cache anything.
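The proposal above can be sketched as a generator: emit non-reducing batches straight through, and only start caching once a batch actually shrinks, which keeps output order intact. This is purely illustrative (not the actual spark-rapids code); `agg_first_pass` stands in for the real pre-projection plus initial aggregation:

```python
def streaming_agg(batches, ratio, agg_first_pass):
    """batches: iterable of (input_row_count, batch) pairs.

    Yields non-reducing batches immediately so they can leave the GPU,
    then switches to caching once any batch shows a real reduction, so
    that order-sensitive aggs like first/last still see batches in order.
    """
    caching = False
    cached = []
    for rows_in, batch in batches:
        out = agg_first_pass(batch)
        if not caching and len(out) > rows_in * ratio:
            # Aggregation did not shrink this batch enough: hand it straight
            # downstream instead of caching (and potentially spilling) it.
            yield out
        else:
            # Saw a real reduction: cache from here on to preserve order.
            caching = True
            cached.append(out)
    # A downstream merge pass would consume the cached batches.
    yield from cached

# Toy agg: deduplicate values. The first batch has no duplicates, so it is
# passed through immediately; the second reduces, so it is cached.
dedupe = lambda b: sorted(set(b))
print(list(streaming_agg([(4, [1, 2, 3, 4]), (4, [1, 1, 2, 2])],
                         0.9, dedupe)))
```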
But it looks like we ask `firstPassIterator` whether there is more to process. `firstPassIterator` reads the original input data, applies the pre-projection, and does an initial aggregation. `aggregateInputBatches` reads those aggregated batches and places them into a queue.
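The pipeline shape described above would look something like this sketch (names mirror the real Scala methods, but the bodies are hypothetical stand-ins):

```python
from collections import deque

def first_pass_iterator(input_batches, pre_project, initial_agg):
    """Lazily apply the pre-projection and initial aggregation per batch."""
    for batch in input_batches:
        yield initial_agg(pre_project(batch))

def aggregate_input_batches(first_pass_iter):
    """Drain the first-pass iterator into the aggregatedBatches queue."""
    aggregated_batches = deque()
    for b in first_pass_iter:
        aggregated_batches.append(b)
    return aggregated_batches

# Toy pipeline: identity pre-projection, dedupe as the initial aggregation.
fp = first_pass_iterator([[1, 1, 2], [3]], lambda b: b,
                         lambda b: sorted(set(b)))
print(list(aggregate_input_batches(fp)))  # [[1, 2], [3]]
```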
The naming should be correct: the second-pass agg is `tryMergeAggregatedBatches()`, and the third-pass agg is `buildSortFallbackIterator`.