
[FEA] Explore using HyperLogLog to estimate whether we should continue with a partial aggregation #11042

Open
revans2 opened this issue Jun 11, 2024 · 6 comments

Comments

@revans2
Collaborator

revans2 commented Jun 11, 2024

Is your feature request related to a problem? Please describe.

In Spark, if we are doing a distributed partial aggregation we technically don't need to complete the aggregation within the task to get the correct answer; the answer is still correct after the shuffle and final aggregation are done. The partial aggregation really is a performance optimization to try and reduce the size of the data before shuffling it. But there are cases where there is no point in trying to combine all of the data together, because it will not reduce the size of the shuffled data: enough of the keys within a task are unique that any computation we do will just be wasted.

#10950 is an attempt to address this, but it has a serious flaw: it assumes that keys within a task that can be combined will appear close to each other in the task's data. I think a lot of the time this is true, but not all of the time. Unless the window we are looking at covers the entire task, we have no way to guarantee that combining more data would or would not be good. If we know the total size of the data we can make some statistical assumptions, but we don't always know how large the data will be until we have seen it. And if we wait until the end of processing before we make a decision, we will have already done a lot of processing and might have spilled a lot of data too.

#5199, rapidsai/cudf#10652, and NVIDIA/cuCollections#429 are issues to add a HyperLogLog++ implementation on the GPU to give us a very fast and efficient approximate count distinct operation. With that we should be able to make an informed decision, at any point in time, about whether the batches of data we have seen so far would combine into something smaller if we finished aggregating/combining them.
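For intuition, here is a minimal CPU-side sketch of the data structure involved (the class name, the stand-in hash, and the simplified estimator are mine for illustration only; the real GPU implementations are what the issues above track). The key property is that a per-batch sketch is just a small register array, merging two sketches is an element-wise max, and an estimate can be read out at any time:

```scala
// Minimal HyperLogLog sketch (HyperLogLog++ adds bias correction and a sparse
// representation, both omitted here). 2^p byte registers per sketch, so a
// per-batch sketch is tiny and merging two sketches is just an element-wise max.
final class HllSketch(val p: Int) {           // p in roughly [7, 18]; alpha below assumes m >= 128
  private val m = 1 << p
  val registers = new Array[Byte](m)

  private def hash64(x: Long): Long = {       // splitmix64 finalizer as a stand-in hash
    var z = x + 0x9E3779B97F4A7C15L
    z = (z ^ (z >>> 30)) * 0xBF58476D1CE4E5B9L
    z = (z ^ (z >>> 27)) * 0x94D049BB133111EBL
    z ^ (z >>> 31)
  }

  def add(value: Long): Unit = {
    val h = hash64(value)
    val idx = (h >>> (64 - p)).toInt          // top p bits choose the register
    val rank = math.min(java.lang.Long.numberOfLeadingZeros(h << p) + 1, 64 - p + 1)
    if (rank > registers(idx)) registers(idx) = rank.toByte
  }

  def merge(other: HllSketch): HllSketch = {  // union of the two key sets
    require(other.p == p)
    val out = new HllSketch(p)
    var i = 0
    while (i < m) { out.registers(i) = math.max(registers(i), other.registers(i)).toByte; i += 1 }
    out
  }

  def estimate: Double = {                    // raw HLL estimate, no range corrections
    val alpha = 0.7213 / (1.0 + 1.079 / m)
    alpha * m * m / registers.map(r => math.pow(2.0, -r)).sum
  }
}
```

With one such sketch per pending batch, re-estimating how many distinct keys any set of batches would collapse to is just a chain of merges plus one estimate call, so the decision can be revisited cheaply every time a new batch shows up.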

This does not solve all of the issues though.

  • In the common case we will only see a single input batch, or a small number of batches that likely combine together efficiently. We don't want to do extra computation unless we need to.
  • We also don't want to have to do a full first pass through all of the data before making some kind of a decision. For very large inputs with little to no chance of combining results we might generate a lot of memory pressure trying to keep everything in memory which can lead to a lot of spilling.
  • We also need to think about hash partitioning the intermediate data instead of sorting it ([FEA] Have a repartition fallback for hash aggregates instead of sort #10370).
  • And finally we need to think about the optimization that was put in to avoid memory pressure when there are more output columns from an aggregation than input columns to it. In that case we may sort the input data proactively instead of taking the risk of sorting the intermediate data, which appears like it would be larger.

Describe the solution you'd like
I propose that we keep some things the same as today. If we know that the input data is sorted, we start doing a first pass aggregation like we do today and release the batches as they are produced.

We also keep the sort fallback and the heuristic for pre-sorting the data. The heuristic would be updated to do an approximate count distinct instead of a full count distinct though. I think those can/should be addressed separately as a way to possibly reduce memory pressure and spill.

With the current code we do a full pass through all of the input data and put the results for each aggregation into a queue. Once that pass is done, if there were multiple input batches, then we will try and combine them together through some merge aggregations. We will take groups of intermediate batches that add up to a size we know will fit in our GPU memory budget. We will concat them into a single batch, and then do the merge aggregation. This will happen in a loop until we either have a single batch or no two consecutive batches could be combined together. They have to be consecutive batches for first/last to work properly. If there are still multiple batches after this second merge phase, then we will sort the data by the grouping keys and finally do a merge aggregation pass before outputting the results. This can result in three full passes through all of the input data, all of which is kept in memory, and might be spilled.
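As a rough model of that second (concat + merge) phase, something like the following captures its shape (hypothetical names, operating only on batch sizes; the real code works on columnar batches and tracks a lot more state):

```scala
// Simplified model of one concat + merge pass. Batches are grouped *consecutively*
// (required for first/last), each group limited by the GPU memory budget, then each
// multi-batch group is concatenated and merge-aggregated.
def mergePass[B](batches: Seq[B], sizeOf: B => Long, budget: Long)
                (concatAndMergeAgg: Seq[B] => B): Seq[B] = {
  val groups = scala.collection.mutable.ArrayBuffer(Seq.empty[B])
  var groupBytes = 0L
  for (b <- batches) {
    if (groups.last.nonEmpty && groupBytes + sizeOf(b) > budget) {
      groups += Seq.empty[B]                 // start a new group once the budget is hit
      groupBytes = 0L
    }
    groups(groups.size - 1) = groups.last :+ b
    groupBytes += sizeOf(b)
  }
  groups.filter(_.nonEmpty)
        .map(g => if (g.size == 1) g.head else concatAndMergeAgg(g))
        .toSeq
}
```

The loop described above repeats a pass like this until a single batch remains or no group shrank, and only then falls back to sorting by the grouping keys plus one more merge pass.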

Instead I propose that we start the initial pass through the data like we do today, but instead of stopping only after the first pass is complete, we stop once we have accumulated enough data that it looks worth trying to combine it. If that happens, then we combine the results and calculate a HyperLogLog++ sketch on the result. We also start to calculate a HyperLogLog++ sketch for each subsequent batch we see. With these we can quickly estimate how much of a size reduction we would see if we combined multiple batches together, by merging the sketches and estimating the unique count. If it looks like we can combine things and stay under our limit, then we keep trying to combine them and update the HyperLogLog++ buffers for the newly produced batches. If it looks like we cannot combine things and stay under our budget, then things start to be difficult, as we have a few choices:

  1. We could sort the intermediate data + any new intermediate data that might be produced and output the batches from that. This might be replaced with hash partitioning in the future. [FEA] Have a repartition fallback for hash aggregates instead of sort #10370
  2. We could just output the intermediate data we have seen so far with no further processing, and then start over with aggregating the rest of the data.
  3. We could finish doing an initial pass through all of the data + combining as needed before we make a choice. That choice will be made using the HyperLogLog++ estimates. Then we will choose option 1 or 2.

The upside of option 1 is that we produce a full output like Spark on the CPU would. The downside is that this is mostly what we do today, and there would be little point in calculating the HyperLogLog++ sketches if we go this route. It just might reduce the amount of memory pressure we are under in a few cases by more aggressively combining batches early instead of waiting until the first pass is complete.

The upside of option 2 is that we don't do a sort at all, which can be really expensive in terms of memory pressure and computation. Even with #10370, not doing the work at all is going to be faster than doing it more efficiently. The downside is that there is an unknown probability that the size of the shuffle data will be larger.

The upside of option 3 is that we now have complete, or nearly complete, information before we make a choice. The downside is that we have to have done a complete pass through the data, which can lead to increased memory pressure.

To balance these I would like to see us start with option 3, as the memory pressure is no worse than it is today and it might get to be better if we start to combine things aggressively. If we know that there are no first or last aggregations, we could even try to combine them out of order if the sketches indicate that would be good.

If the data indicates that we have a lot of things that could be combined together, based off of the ratio of approx_count_distinct vs the actual number of rows we have pending, then we sort/re-partition to combine. If the data indicates that there is little if anything to combine, then we just start to release the partial aggregations that we have done so far.
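Concretely, that ratio check could look something like this (illustrative names and threshold only, building on the `HllSketch` sketch earlier in the issue):

```scala
// Decide whether further combining is worth it. pendingRows is the total row count
// across the pending aggregated batches; sketches holds one HyperLogLog++ sketch per
// pending batch. The threshold is picked purely for illustration.
def shouldKeepCombining(pendingRows: Long,
                        sketches: Seq[HllSketch],
                        combineRatio: Double = 0.75): Boolean = {
  require(sketches.nonEmpty && pendingRows > 0)
  val estimatedDistinct = sketches.reduce(_.merge(_)).estimate
  // Low ratio: many duplicate keys remain, so sorting/re-partitioning and merging
  // should shrink the shuffle. High ratio: most pending rows are already unique keys,
  // so just release the partial aggregations we already have.
  estimatedDistinct / pendingRows.toDouble < combineRatio
}
```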

@revans2 revans2 added feature request New feature or request ? - Needs Triage Need team to review and classify performance A performance related task/issue labels Jun 11, 2024
@revans2
Collaborator Author

revans2 commented Jun 12, 2024

I do want to add a few things.

  1. If we know the total input size, like if the data is coming directly from a shuffle along with some AQE updates, or if it is being read from files with no predicate push down or column pruning, we might be able to use some statistics to decide to stop looking at the data early. Those would have to be based on the assumption that the distribution of the data we have not seen yet is the same as what we have seen. In reality we don't know the distribution of the data, but even then that assumption is not a bad one. The problem is that it is super rare for an aggregation to be in a position to know the input data size ahead of time. Because of that we decided not to go this route, but it might be worth looking into anyways, even without the total input size available; for example, we could guess that, in the absence of any other information, we will see one more batch. I don't know; it is something we need to think about.
  2. We also thought about using spill as a trigger to start releasing data instead of asking for more data. This has a few downsides. First, we are in a memory constrained situation when we decide that we need to spill. By releasing data we are going to potentially try to pull data back into memory (we need to process data in order for consistent first/last operations). We could detect that we are not doing a first/last and we could update the code to only use batches that are already in memory to begin with. The second downside is that we are likely to be in an inconsistent state and be part of the problem. In the common case there are two threads running on the GPU. That means 50% of the time, possibly more, we are the ones that were in the middle of an operation and asked for more memory. So we would have to treat this like an interrupt and not do any processing in the callback. We would just flip a bit, and the next time we get a chance to run we would start to release data already in memory instead of trying harder to combine things (see the sketch after this list). But the memory pressure might have changed drastically by that point. It might still be worth trying to see what happens.
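For the second point, the "flip a bit" approach might look roughly like this (hypothetical names; the real spill callback plumbing in the plugin is more involved):

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch only: the two callbacks are injected stand-ins for the real work.
class SpillAwareAggregator(releaseInMemoryBatches: () => Unit,
                           pullNextBatchAndAggregate: () => Unit) {
  // Set from the spill/allocation callback; acted on only from the task thread.
  private val releasePending = new AtomicBoolean(false)

  // Called when the framework asks us to free memory: do no real work here, since
  // we may be the very thread that was mid-operation and asked for more memory.
  def onSpillRequested(): Unit = releasePending.set(true)

  // Checked the next time the task thread gets a chance to run.
  def nextStep(): Unit =
    if (releasePending.getAndSet(false)) releaseInMemoryBatches()
    else pullNextBatchAndAggregate()
}
```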

@revans2
Collaborator Author

revans2 commented Jun 12, 2024

A comment from @binmahone triggered another idea for me about a possible optimization that we could do with HyperLogLog++ sketches for each batch. We could have a custom GPU kernel that could evaluate in parallel all, or at least a very large number of, combinations of batches. This would let us very quickly decide which batches, if any, would be worth combining and which might have no overlap with other batches. Essentially this is a clustering algorithm for the batches based on the estimated overlap between them. It would need to know whether we have to maintain order or not (first/last), but that should not be too hard to deal with. Any batch with effectively no overlap at all could be released as soon as it is ready (again, subject to first/last). I am not sure how common it is for overlaps to not occur between batches, but I can see it happening. For example, if specific events occur in bursts, we could end up with things that combine well for a few batches, but not for others.
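The per-pair overlap such a kernel would compute can come from the sketches alone via inclusion-exclusion. A CPU-side version of the math, again using the illustrative `HllSketch` from earlier:

```scala
// Estimated number of grouping keys two batches share, from their sketches alone:
// |A ∩ B| ≈ est(A) + est(B) − est(A ∪ B), where the union sketch is just a register merge.
def estimatedOverlap(a: HllSketch, b: HllSketch): Double =
  math.max(0.0, a.estimate + b.estimate - a.merge(b).estimate)
```

A GPU kernel could evaluate this for every pair of per-batch sketches in parallel; batches whose overlap with everything else is effectively zero are candidates to release immediately, while groups with large mutual overlap are worth combining.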

@revans2
Collaborator Author

revans2 commented Jun 13, 2024

Another comment from @binmahone has spurred some more ideas about how we can/should integrate the pre-sort optimization from #8618 into this work.

This issue describes doing a first aggregation pass over all of the data, with some more aggressive merging of data when specific size limits are met. It also talks about doing a full pass over the data before deciding what to do next. I don't see that changing with respect to #8618; what I do see changing is when we do the first pass aggregation.

I would say that we start off just like what is described here and do a first pass aggregation on the input batches until we hit an intermediate result that would go over the limit we have set. At that point we would calculate the HyperLogLog++ for the output of that aggregation result. If the high level plan heuristic from #8618 indicates that it is worth checking whether the input will be smaller than the output, then we first calculate the HyperLogLog++ of the input batch. If it looks like there is very little that could be combined (the input size is smaller than the output size), then we keep the batch in a pre-aggregated state. The HyperLogLog++ sketch for this batch could still be combined with the sketches from batches that have been aggregated to make decisions about how to combine batches together. If we decide that it is worth combining batches together, then we can lazily do the first pass aggregation on the batches that need it before doing a merge aggregation afterwards.

We also could look at doing hash partitioning of the data after doing a full pass over the data. A full pass would not necessarily involve doing a first pass aggregation. It would just involve having read in and cached all of the data for the task so we can make an informed decision on what to do next. That might involve re-partitioning the data before we do any aggregation on it. We can also use the approximate distinct count for all of the data to decide how many partitions we would need before doing that.
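As a rough sketch of that per-batch decision (all helper names and the ratio are illustrative stand-ins, not real spark-rapids APIs; a distinct-key ratio is used here as a proxy for the input-vs-output size comparison from the #8618 heuristic):

```scala
// What we remember about each pending batch.
case class PendingEntry[B](batch: B, sketch: HllSketch, isAggregated: Boolean)

// Per-batch handling once the intermediate size limit has been hit.
def handleBatch[B](input: B,
                   rowCount: B => Long,
                   sketchOf: B => HllSketch,        // HyperLogLog++ over the grouping keys
                   firstPassAgg: B => B,
                   checkInputVsOutput: Boolean,     // high-level plan heuristic from #8618
                   keepRawRatio: Double = 0.9): PendingEntry[B] = {
  if (checkInputVsOutput) {
    val inSketch = sketchOf(input)
    // Nearly every key is unique: the first pass aggregation would not shrink the batch
    // (and with more output columns than input columns it could even grow), so keep the
    // batch pre-aggregated but keep its sketch for later combine/re-partition decisions.
    if (inSketch.estimate >= rowCount(input) * keepRawRatio) {
      return PendingEntry(input, inSketch, isAggregated = false)
    }
  }
  val out = firstPassAgg(input)
  PendingEntry(out, sketchOf(out), isAggregated = true)
}
```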

@mattahrens mattahrens removed the ? - Needs Triage Need team to review and classify label Jun 13, 2024
@binmahone
Collaborator

binmahone commented Jun 14, 2024

quick question: why are you highlighting "They have to be consecutive batches for first/last to work properly" while agg functions like first are documented as non-deterministic in https://docs.databricks.com/en/sql/language-manual/functions/first.html ? @revans2

@revans2
Collaborator Author

revans2 commented Jun 14, 2024

quick question: why are you highlighting "They have to be consecutive batches for first/last to work properly" while agg functions like first are documented as non-deterministic in https://docs.databricks.com/en/sql/language-manual/functions/first.html ? @revans2

tldr; Technically yes you are 100% correct.

But here is the long answer, which is not perfect because there are a lot of caveats with it. Technically, yes, they don't have to be consecutive. But the reality is that they are close enough to deterministic that users start to assume they are deterministic, especially when running small tests in local mode. Also, we can match the CPU behavior, so why not? If we see a huge performance improvement from not matching the CPU we can put that behavior under a flag and have it be fast by default. For example, in local mode without any failures the following will produce the same result all of the time.

df.groupBy("groupingKey").agg(first("b"))

This is because FIRST is deterministic on the CPU except when a shuffle is involved. The order of local batches is deterministic, so when all batches are local batches the result is deterministic.

If we want to abuse Spark a little with some knowledge about how it works, we could even do:

df.repartition(col("groupingKey")).sortWithinPartitions("a").groupBy("groupingKey").agg(first("b"))

and it will produce the same result every single time.

@liurenjie1024
Collaborator

Thanks @revans2 for raising this, it's an interesting idea to determine whether we should do a partial agg at runtime. There are also other approaches to deciding whether to do a partial aggregation in other systems:

  1. History based optimizer. In traditional database systems a cost-based optimizer decides whether to add an extra partial aggregation. That is not practical in big data systems, since we usually don't have enough statistics and the cardinality estimation is not accurate enough. But it is feasible to use statistics collected from historical runs of SQL queries, since they don't change much for daily-run ETL queries; this is what we can learn from prestodb.

  2. Another approach is simpler: limit the hash table size of the partial aggregation and stop processing rows in the partial aggregation phase once the hash table grows beyond that size (see the sketch below). This is how duckdb works, and IIRC, snowflake has similar optimizations.
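For reference, a toy model of that second approach (illustrative only, not how DuckDB or Snowflake actually implement it):

```scala
import scala.collection.mutable

// Partial aggregation (here a per-key sum) with a capped hash table. Keys already
// in the table keep combining; once the table is full, rows with new keys are
// passed through un-aggregated and left for the final aggregation after the shuffle.
def cappedPartialSum(rows: Iterator[(String, Long)],
                     maxGroups: Int): Iterator[(String, Long)] = {
  val table = mutable.HashMap.empty[String, Long]
  val passThrough = mutable.ArrayBuffer.empty[(String, Long)]
  rows.foreach { case (key, value) =>
    if (table.contains(key)) table(key) += value
    else if (table.size < maxGroups) table(key) = value
    else passThrough += ((key, value))       // table full: skip partial aggregation
  }
  table.iterator ++ passThrough.iterator
}
```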
