[FEA] support for running window aggregations #8440

Closed
revans2 opened this issue Jun 4, 2021 · 10 comments · Fixed by #8652
Labels: 0 - Backlog (In queue waiting for assignment) · feature request (New feature or request) · libcudf (Affects libcudf (C++/CUDA) code) · Performance (Performance related issue) · Spark (Functionality that helps Spark RAPIDS)

Comments

@revans2 (Contributor) commented Jun 4, 2021

Is your feature request related to a problem? Please describe.
This is really a performance improvement, but it can be a very large and important one. There is a very specific kind of rolling window called a running window: unbounded preceding and no rows following. It is the default for Spark when no range is provided, and it is fairly commonly used. The problem is that the generic window processing scales very badly for large windows; it can easily take over the entire GPU for long periods of time. For example, the following are measurements I took on a V100 (16 GiB) doing a rolling window sum aggregation over the entire column.

| rows (millions) | op time (GPU, ms) | Spark time (CPU single core, ms) | Spark time (GPU, ms) | inclusive_scan time (ms) | predicted Spark with inclusive_scan (GPU, ms) | predicted speedup vs CPU | current speedup vs CPU |
|---|---|---|---|---|---|---|---|
| 1 | 336 | 329 | 411 | < 1 | 75 | 4.39 | 0.80 |
| 2.5 | 1926 | 591 | 2043 | < 1 | 117 | 5.05 | 0.29 |
| 4 | 4920 | 870 | 5073 | < 1 | 153 | 5.69 | 0.17 |
| 5 | 7662 | 1093 | 7814 | < 1 | 152 | 7.19 | 0.14 |
| 7.5 | 17364 | 1511 | 17506 | < 1 | 142 | 10.64 | 0.09 |
| 8 | 19722 | 1601 | 19930 | < 1 | 208 | 7.70 | 0.08 |
| 10 | 30825 | 2042 | 31014 | < 1 | 189 | 10.80 | 0.07 |
| 12.5 | 48239 | 2464 | 48289 | < 1 | 50 | 49.28 | 0.05 |
| 16 | 79113 | 3139 | 79241 | < 1 | 128 | 24.52 | 0.04 |
| 250 | 19369344.8 (predicted) | 51569 | 19369344.8 (predicted) | 5 | 255 | 202.23 | 0.00 |

[Chart: window sum time vs. rows]

To do a window of 250 million rows, the curve fit predicts it would take over 5 hours to complete. Yes, this is a really extreme case, but it is here just to illustrate why this is needed.

If I do a thrust::inclusive_scan on the same data (250 million rows), it takes 5 milliseconds. That is a speedup of nearly 4 million times over the predicted rolling-window time. Even for a million-row window, after we fix it all up to deal with nulls, groups, etc., that would be a noticeable performance improvement to an end user.
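For reference, a minimal sketch of the thrust::inclusive_scan approach mentioned above (the row count and dummy data are placeholders, not the benchmark setup): a single O(n) pass yields the running sum for every row instead of re-summing an ever-growing window per row.

```cpp
#include <cstdint>
#include <thrust/device_vector.h>
#include <thrust/scan.h>

int main() {
  // Placeholder size; use 250'000'000 to match the table above (needs ~4 GB device memory).
  int const n = 1 << 20;
  thrust::device_vector<std::int64_t> input(n, 1);  // dummy data standing in for the column
  thrust::device_vector<std::int64_t> running_sum(n);

  // One O(n) pass produces the "unbounded preceding, 0 following" sum for every row.
  thrust::inclusive_scan(input.begin(), input.end(), running_sum.begin());
  return 0;
}
```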

Now, how common is this? It is common enough that Spark has a special performance optimization just for it, or they would be in even worse shape than we are. Also, NDS query 67 has a rank window function that is very large (billions of rows; at the very least, hundreds of millions). count and row_number don't appear to suffer from this as much as sum does. That may just be because count only has to look at the validity data, which is much smaller than the actual data. But rank needs to look at all of the previous rows to see if they match the current row/each other so it can figure out what its rank is. That feels a lot more like sum than row_number, but I probably have to wait and see what the performance of rank really is like to be sure.

Describe the solution you'd like
Ideally this would be an optimization behind the scenes: I don't have to change anything, and cudf just sees that this is a running window operation and switches to a faster back end when it can support it. I would also be happy with a new API that I switch to when I need it.

Describe alternatives you've considered

  1. Keep it as is and see if anyone complains (if rank really is like sum, someone will complain very, very quickly).
  2. Detect the situation and slice the data up into much smaller batches so we can process it faster. (Not a perfect solution, because I cannot detect all of the situations, and even with batches of 1,000,000 rows we are slower than the CPU.)
  3. Write my own implementation for some special use cases.
@revans2 revans2 added feature request New feature or request Needs Triage Need team to review and classify Performance Performance related issue Spark Functionality that helps Spark RAPIDS labels Jun 4, 2021
@jrhemstad jrhemstad added libcudf Affects libcudf (C++/CUDA) code. 0 - Backlog In queue waiting for assignment and removed Needs Triage Need team to review and classify labels Jun 7, 2021
@revans2 (Contributor, Author) commented Jun 7, 2021

After talking with @jrhemstad I took a look at the scan and segmented scan APIs.

The scan API looks perfect for one part of the use case. I need to run some performance tests and also verify that the aggregations we need are there. The problems I have are with the segmented scan API. Spark guarantees that the input data is sorted by key and expects the processing to output the data in exactly the same order it came in; it makes assumptions about this so that it can chain some types of processing. The groupby API does not provide any such guarantee, even though internally it appears to do so. It would be really nice to have that guarantee documented and tested somewhere. The second issue is that if the output ordering is guaranteed, then we don't need to make a copy of the keys to output them.

I will try to put together a prototype based off of these and if the numbers look good I will file follow on issues to try to address these concerns.

@mythrocks (Contributor) commented:

Thanks for the tip on segmented scan(). I'll check what's to be done. It sounds like we'll need another kernel for RunningWindowExec, although I'm not yet completely clear on how the calculation is to be distributed.

@revans2 (Contributor, Author) commented Jun 8, 2021

@jrhemstad I have been trying out scan to see if I can make it work. I filed #8462 because the implementations are inconsistent. That is not exactly what is blocking me from using it, though; the real issue is still null handling.

For some operations like SUM a window will output a null if and only if all of the values that went into that sum were null. In all other cases the null is ignored.

input: [null, 1, 2, null, 3, 5, 8, 10]
sum output: [null, 1, 3, 3, 6, 11, 19, 29]

With the scan API, whether I include nulls or exclude them, there is no way to get what I need. I could replace nulls with something else, but I would have to replace all nulls with 0 except for the leading null/nulls, depending on how #8462 plays out.

Min and max are the same as sum, but row_number and count (which scan does not support yet) treat nulls very differently, and it is not clear how that should work.
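To make the desired null semantics concrete, here is a small host-side reference sketch (plain C++, not an existing cudf API) of the running sum described above: a row is null only while every value seen so far has been null; otherwise nulls are skipped.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Reference semantics only: running sum where nulls are ignored, and the
// output is null only if every input value up to that row was null.
std::vector<std::optional<std::int64_t>>
running_sum_null_aware(std::vector<std::optional<std::int64_t>> const& in) {
  std::vector<std::optional<std::int64_t>> out;
  out.reserve(in.size());
  std::int64_t sum = 0;
  bool seen_valid = false;
  for (auto const& v : in) {
    if (v.has_value()) {
      sum += *v;
      seen_valid = true;
    }
    out.push_back(seen_valid ? std::optional<std::int64_t>{sum}
                             : std::optional<std::int64_t>{});
  }
  return out;
}

int main() {
  // Matches the example above: [null, 1, 2, null, 3, 5, 8, 10]
  std::vector<std::optional<std::int64_t>> input{
      std::nullopt, 1, 2, std::nullopt, 3, 5, 8, 10};
  for (auto const& v : running_sum_null_aware(input)) {
    std::cout << (v ? std::to_string(*v) : "null") << ' ';
  }
  std::cout << '\n';  // prints: null 1 3 3 6 11 19 29
}
```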

@harrism harrism changed the title [FEA] support for running windows aggregations [FEA] support for running window aggregations Jun 9, 2021
@revans2 (Contributor, Author) commented Jun 9, 2021

@jrhemstad I looked at segmented scan too. It is self-consistent in how it handles nulls. I think that with it and replace_nulls I can make it work. It will not be as efficient as a purpose-built implementation, but I should be able to get it to work. I will try to come up with a prototype and file any follow-on issues that would make it more efficient. The main thing I think I will need is some kind of guarantee on ordering for both a sorted segmented scan and a sorted segmented replace_nulls.
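As an illustration of the plan described above, here is a small host-side sketch (plain C++, not the actual group_by.scan / replace_nulls APIs) of a per-group running max where nulls are effectively forward-filled within each group. The keys, values, and the max aggregation are illustrative assumptions.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

// Reference semantics only: per-group running max ("unbounded preceding,
// 0 following"), expressed as a segmented scan over the values combined with
// a segmented forward fill of nulls -- the scan + replace_nulls combination
// described above. Keys are assumed to be pre-sorted, as Spark guarantees.
std::vector<std::optional<std::int64_t>>
grouped_running_max(std::vector<std::int32_t> const& keys,
                    std::vector<std::optional<std::int64_t>> const& vals) {
  std::vector<std::optional<std::int64_t>> out(vals.size());
  std::optional<std::int64_t> running;  // running max within the current group
  for (std::size_t i = 0; i < vals.size(); ++i) {
    if (i == 0 || keys[i] != keys[i - 1]) running.reset();  // new segment starts
    if (vals[i]) running = running ? std::max(*running, *vals[i]) : *vals[i];
    out[i] = running;  // stays null until the group has seen a valid value
  }
  return out;
}

int main() {
  std::vector<std::int32_t> keys{0, 0, 0, 1, 1, 1};
  std::vector<std::optional<std::int64_t>> vals{std::nullopt, 4, 2, 7, std::nullopt, 9};
  for (auto const& v : grouped_running_max(keys, vals)) {
    std::cout << (v ? std::to_string(*v) : "null") << ' ';
  }
  std::cout << '\n';  // prints: null 4 4 7 7 9
}
```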

@revans2 (Contributor, Author) commented Jun 11, 2021

I was finally able to finish a really hacked-up prototype of a running window for max in Spark. It uses a combination of scan and replace_nulls when there is no partitioning, and group_by.scan and group_by.replace_nulls when there is partitioning.

The numbers look good.

For scan, the numbers were just always better than our current implementation. I started testing at 1 million rows. The new implementation was 7 times faster than the current one, and that included sorting the data, so the actual speedup for just the window portion was much higher. I tested through 10 million rows; there my GPU started to thermally throttle on the non-scan implementation. At 10 million rows the new version was 436x faster than the current GPU version and 30x faster than the CPU version. I tested through 1 billion rows for just the CPU vs. the new GPU version. The improvement over the CPU leveled out at about 150x for the complete query including sorting (partly because the CPU can only use a single core in these cases).

For the group_by.scan version I processed 1 billion rows, but I varied the number of rows in each window. Typical small windows showed no difference in run time at all. I had to get over approximately 1,000 rows per window before they diverged; after that the old GPU version got slower very quickly while the new GPU version changed very little. It was obvious that it scaled with the number of rows, not the size of the window. This was true for the CPU implementation too, until we started to hit data skew because there were fewer partitions than there were cores to process them.

Overall it looks really good.

@jrhemstad and @harrism

At this point I am trying to figure out the next steps. I can see two possible directions.

  1. We take the existing APIs and fix/work around a few issues so that the Spark plugin knows about running windows and does the right thing in those cases.
  2. We modify the window implementation so that it knows about running windows and uses an alternative way to calculate the result when it sees a running window that can be sped up this way.

The advantage of option 2 is that it ends up being transparent to anyone using the window APIs. Some cases just get faster, and no end user has to change their code.

The advantage of option 1 is that there are some operations, like rank and dense_rank, that don't really make sense in any context except a running window (Spark only supports them as part of a running window). So if we have separate implementations, then there does not need to be a kernel or implementation for them in the window implementation itself.

@harrism (Member) commented Jun 14, 2021

Does "modify the window implementation" mean "modify cudf::rolling in libcudf". I think this makes the most sense if it is indeed transparent to the user because then it benefits everyone.

@jrhemstad (Contributor) commented:

Running window is really just another (bad) name for a scan or a groupby scan. It's really not a "window" operation at all; it is mainly a nomenclature issue in Spark. Therefore, it doesn't make sense to try to hide it under the cudf::rolling implementation. How would we even detect that it's a "running" window vs. a "rolling" window? Introspect the data on the device? Add a new cudf::running_window API?

@mythrocks (Contributor) commented Jun 16, 2021

> How would we even detect that it's a "running" window vs. a "rolling" window?

We can find a better name for it in CUDF, but a window is a "running window" if the window grows for every consecutive row. In Spark, it is defined as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW.
In CUDF terms, that would be [preceding=∞, following=0].

> How would we even detect that it's a "running" window vs. a "rolling" window? Introspect the data on the device? Add a new cudf::running_window API?

```cpp
// Given the rolling-window bounds: cudf::window_bounds preceding, following;
// A "running window" is simply unbounded preceding with zero rows following:
auto is_running_window = preceding.is_unbounded && following.value == 0;
```
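For context, a hypothetical dispatch sketch (not an existing libcudf code path) of how a rolling entry point could branch on that check; the local window_bounds struct and the function name are stand-ins, not the real cudf types.

```cpp
#include <cstdint>
#include <iostream>

// Hypothetical, simplified stand-in for the bounds type discussed above;
// the real cudf::window_bounds API may differ.
struct window_bounds {
  bool is_unbounded;
  std::int32_t value;
};

// Illustrative dispatch only: choose a scan-based path when the bounds
// describe "unbounded preceding, 0 following", otherwise the generic path.
void do_window_aggregation(window_bounds preceding, window_bounds following) {
  bool const is_running_window = preceding.is_unbounded && following.value == 0;
  if (is_running_window) {
    std::cout << "dispatch to scan / segmented-scan based implementation\n";
  } else {
    std::cout << "dispatch to generic rolling-window implementation\n";
  }
}

int main() {
  do_window_aggregation(/*preceding=*/{true, 0}, /*following=*/{false, 0});   // running
  do_window_aggregation(/*preceding=*/{false, 3}, /*following=*/{false, 0});  // rolling
}
```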

@harrism (Member) commented Jun 16, 2021

Oh, I misunderstood. I thought that scan was just one step in this computation. Indeed, it sounds like it's just a [groupby] scan.

@revans2 (Contributor, Author) commented Jul 9, 2021

I am going to close this for now. I filed two follow-on issues, #8709 and #8710, to help with my implementation. Be aware that this same issue exists for something similar to a running window but for range-based queries instead of row-based queries. Spark has similar optimizations in place for them, but we do not have customers asking for that at this time. scan and groupby scan will not work for those types of queries, though.

@revans2 revans2 closed this as completed Jul 9, 2021
rapids-bot bot pushed a commit that referenced this issue Jul 22, 2021
Resolves #7208 and resolves #8440 (rank in a rolling window is functionally equivalent to a scan).
Replaces #8138 and #8506.

Adds functionality for the aggregation operators rank and dense_rank, supported by scan and groupby scan (segmented scan). This PR also includes Java support for the added aggregations.
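For reference, a minimal host-side sketch (plain C++, independent of this PR's actual implementation) of the rank and dense_rank semantics over a pre-sorted column, computed in a single running pass.

```cpp
#include <cstddef>
#include <iostream>
#include <utility>
#include <vector>

// Reference semantics only: rank and dense_rank over a column that is
// already sorted by the ordering key, computed like a running window
// (each row depends only on the rows before it).
std::pair<std::vector<std::size_t>, std::vector<std::size_t>>
rank_and_dense_rank(std::vector<int> const& sorted_keys) {
  std::vector<std::size_t> rank(sorted_keys.size());
  std::vector<std::size_t> dense(sorted_keys.size());
  for (std::size_t i = 0; i < sorted_keys.size(); ++i) {
    if (i == 0) {
      rank[i] = dense[i] = 1;
    } else if (sorted_keys[i] == sorted_keys[i - 1]) {
      rank[i]  = rank[i - 1];        // ties keep the same rank
      dense[i] = dense[i - 1];
    } else {
      rank[i]  = i + 1;              // rank skips positions taken by ties
      dense[i] = dense[i - 1] + 1;   // dense_rank does not skip
    }
  }
  return {rank, dense};
}

int main() {
  std::vector<int> keys{10, 10, 20, 20, 20, 30};
  auto [rank, dense] = rank_and_dense_rank(keys);
  for (auto r : rank) std::cout << r << ' ';   // 1 1 3 3 3 6
  std::cout << '\n';
  for (auto d : dense) std::cout << d << ' ';  // 1 1 2 2 2 3
  std::cout << '\n';
}
```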

Authors:
  - https://github.com/rwlee

Approvers:
  - Conor Hoekstra (https://github.com/codereport)
  - Robert (Bobby) Evans (https://github.com/revans2)
  - Nghia Truong (https://github.com/ttnghia)
  - Jake Hemstad (https://github.com/jrhemstad)

URL: #8652