
Heuristic to speed up partial aggregates that get larger #8618

Merged 5 commits into NVIDIA:branch-23.08 on Jun 29, 2023

Conversation

@revans2 (Collaborator) commented Jun 27, 2023

This is my first PR for an optimization for aggregations where there are a lot of aggregations, like hundreds of them. The problem I ran into when trying to do lots of aggregations is that the size of the aggregation data can be larger than the size of the input data. We try to cache the aggregation data to match Spark exactly, so that no matter where a key shows up in the input we can make sure it is merged with every other row for that key and the output has no duplicate keys in it. Combine those two things and you end up caching a lot of data.

What this PR does is try to detect the situation where an aggregation might grow in size. If it sees that the agg would grow, it switches over to doing a sort aggregation instead. Why a sort aggregation? It is a compromise. We could just take the input a batch at a time and return it. That would be something like #7404, but if the input is large (lots of batches) we might end up shuffling a lot of extra data around. By sorting the input we still end up caching all of the data, but we are caching the smaller amount of data (in this case the input data), and the number of duplicate keys we produce (the amount of extra data being shuffled) is only on the order of the number of batches we process.
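To make the decision concrete, here is a minimal sketch of the idea, under the assumption that we can estimate the sizes of the first batch before and after a partial agg; the names are made up and this is not the plugin's actual code:

final case class BatchEstimate(inputBytes: Long, estimatedAggBytes: Long)

// If the partial agg output for the first batch is estimated to be larger
// than the input it came from, caching hash-agg results is a net loss, so
// fall back to sorting the input and doing a single pass-through partial agg.
def useSinglePassSortAgg(first: BatchEstimate): Boolean = {
  val estimatedGrowth = first.estimatedAggBytes.toDouble / math.max(1L, first.inputBytes)
  estimatedGrowth > 1.0
}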

This is not perfect. Sort can be very expensive, especially if there are multiple columns or the columns are complex. For now I am limiting this heuristic to just a single aggregation column, but I plan to file a follow-on issue to better understand the cost of sort so we can hopefully expand this to something more appropriate.

This also only works on partial aggregates. Why? Final aggregates don't really grow in size by adding new columns or larger columns; the data already grew in the partial. And I just have not spent much time on complete aggs. I'll file a follow-on issue for that too.

It turns out that most (probably all) aggregations are faster when the data is already sorted. So if the data is already sorted we just do the pass-through partial agg.

At some point there are so many aggs that sorting the data and then doing the aggs is cheaper than just doing the hash aggs. I don't have cost models for that yet, so I will file a separate follow-on for it too.

And finally, this does not fully address #8398. It does not look at the idea of splitting the data by column instead of by row; this still splits by row. That will be another follow-on.

I need to run some more benchmarks at scale, but here is an initial result for 300 aggregations with a simple key (using TPC-DS data at scale factor 200). The query was

val query_str = s"""SELECT CAST(ss_store_sk as $dt), ${permute_cols.combinations(4).take(num_aggs).map(f => f.mkString("SUM(", " * ", s") as sum_${f.mkString("")}")).mkString(",\n")} FROM agg_test GROUP BY CAST(ss_store_sk as $dt)"""

where agg_test is really store_sales with some columns renamed, the query sped up from 1,030 seconds to 430 seconds.
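For anyone curious what that generator expands to, here is a self-contained sketch with hypothetical stand-in column names (the real permute_cols are the renamed store_sales columns):

val dt = "int"
val num_aggs = 2
// Hypothetical stand-ins for the renamed store_sales columns.
val permute_cols = Seq("c1", "c2", "c3", "c4", "c5")
val aggs = permute_cols.combinations(4).take(num_aggs).map { f =>
  f.mkString("SUM(", " * ", s") as sum_${f.mkString("")}")
}.mkString(",\n")
println(s"SELECT CAST(ss_store_sk as $dt), $aggs FROM agg_test GROUP BY CAST(ss_store_sk as $dt)")
// Prints something like:
//   SELECT CAST(ss_store_sk as int), SUM(c1 * c2 * c3 * c4) as sum_c1c2c3c4,
//   SUM(c1 * c2 * c3 * c5) as sum_c1c2c3c5 FROM agg_test GROUP BY CAST(ss_store_sk as int)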

Most of the code changes are a refactoring of aggregate so I could split parts of it into different iterators that I can choose from to build the agg operation I want. A lot of whitespace changed too, so diffing with -b to ignore whitespace changes makes it a lot simpler to follow what happened.

Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
@revans2 (Collaborator, Author) commented Jun 27, 2023

build

@revans2 (Collaborator, Author) commented Jun 28, 2023

build

@revans2 (Collaborator, Author) commented Jun 28, 2023

Hmm, the markdown check failure was for a PGP key link that is fine when I go to it, so I think it was just a one-off. I'll try to trigger the build again.

@revans2 (Collaborator, Author) commented Jun 28, 2023

build

@abellina (Collaborator) previously approved these changes Jun 29, 2023 and left a comment:

The changes look great to me. I had minor nits that could be ignored.


val FORCE_SINGLE_PASS_PARTIAL_SORT_AGG: ConfEntryWithDefault[Boolean] =
conf("spark.rapids.sql.agg.forceSinglePassPartialSort")
.doc("Force a single pass partial sort agg to happen in all cases that it could, " +
@abellina (Collaborator): nit, indentation is off here.
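For anyone experimenting with this heuristic, a usage sketch of turning the conf above on (assuming a SparkSession named spark with the RAPIDS plugin enabled):

// Force the single-pass partial sort agg whenever it is possible to use it.
spark.conf.set("spark.rapids.sql.agg.forceSinglePassPartialSort", "true")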

}

// Partial mode:
// 1. boundInputReferences: picks column from raw input
@abellina (Collaborator): not related to your change, but boundInputReferences is no longer here :( we should remove it from the comments (I missed this)

sbIter.map { sb =>
  withResource(new NvtxWithMetrics("finalize agg", NvtxColor.DARK_GREEN, aggTime,
      opTime)) { _ =>
    val finalBatch = boundExpressions.boundFinalProjections.map { exprs =>
@abellina (Collaborator): nit, since this is the final step for a partial (pre-shuffle), should we call it batchToShuffle? That would distinguish it from the batch that is merged and ready for a parent to process after a shuffle.

A comment around this function would also help call out that it is to be used in the single-pass agg case.

@revans2 (Collaborator, Author): But it is not guaranteed to be that way. This same code is also used for a complete or final aggregation. This is the "final" step of any of the aggregations. Should I rename it so that it is a "last" pass instead of a "final" pass, so that the terms don't conflict with the aggregation types?

@revans2 (Collaborator, Author): And if I do, then I would want to rename boundFinalProjections too, right?

@jlowe previously approved these changes Jun 29, 2023

@revans2 dismissed stale reviews from jlowe and abellina via 3c75b69 June 29, 2023 15:09
@revans2 (Collaborator, Author) commented Jun 29, 2023

build

@revans2 (Collaborator, Author) commented Jun 29, 2023

@jlowe and @abellina please take another look

@revans2 merged commit 33c63fd into NVIDIA:branch-23.08 Jun 29, 2023
@revans2 deleted the large_agg_opt branch June 29, 2023 18:12
@sameerz added the reliability label Jun 30, 2023
@sameerz added the performance label Jun 30, 2023
(math.max(minPreGrowth, estimatedPreGrowth) * cardinality) / numRows
}
val wrappedIter = Seq(cb).toIterator ++ cbIter
(wrappedIter, estimatedGrowthAfterAgg > 1.0)
@binmahone (Collaborator) commented Jun 12, 2024:

Is 1.0 too small as a threshold? Currently a simple query like the one below will take the single-pass path:

spark.time(spark.range(0, 1000000000L, 1, 2).selectExpr("id + 1 as key", "id as value").groupBy("key").agg(avg(col("value")).alias("avg_v")).orderBy(desc("avg_v"), col("key")).show())

The reason is simply that the avg aggregate uses two buffers (count and sum), so the total size after the aggregate is 1.5x the input.

Since you mentioned "This is my first PR for an optimization for aggregations where there are a lot of aggregations", the example query obviously can't be counted as "a lot of aggregations".
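To spell out the arithmetic behind that (a worked sketch with assumed per-row byte sizes; the keys in the query above are unique, so cardinality is roughly numRows and the heuristic reduces to per-row growth):

// Input row: (key: Long, value: Long) = 16 bytes.
// Partial avg output row: (key: Long, sum: Double, count: Long) = 24 bytes.
val inputRowBytes = 8 + 8
val partialAvgRowBytes = 8 + 8 + 8
val estimatedGrowthAfterAgg = partialAvgRowBytes.toDouble / inputRowBytes
// 24.0 / 16 = 1.5 > 1.0, so the heuristic picks the single-pass sort path
// even though the query has only one aggregate.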

@revans2 (Collaborator, Author):

The heuristic is simply looking at the cost of sorting now vs. sorting later, and 1.0 is the correct cutoff for that decision. What is not correct is the assumption that a sort will happen later, nor the assumption that a single batch is representative of the entire task's data. But we don't have the information we need to know whether that sort is going to happen. We don't know how much data is coming later on. This could be the only batch, in which case the sort is useless, or there could be 100 more batches coming. If so, sorting the data now will be faster than sorting it later on because there is less data to spill (even if it is just a little less data). Also, we don't have a good way to start sorting later after we have already done part of the operation.

If we want to fix this we really want to fix all of aggregation and have unified heuristics to decide what to do and when.

#11042 describes a starting point of moving to HyperLogLog for some operations. That includes updating this code to do an approximate count distinct instead of a full count distinct (assuming the performance is good enough).

We also have #8622, which is to replace the sort here with a hash re-partition operation; #11042 also talks about doing hash re-partitioning instead of sorting.

To me what we want is a somewhat lazy aggregation operation. I'll update #11042 to reflect some ideas I have for this.
