Implement groupby aggregations M2 and MERGE_M2 #8605

Merged
merged 22 commits into rapidsai:branch-21.08 from ttnghia:merge_std_variance on Jul 8, 2021

Conversation

ttnghia
Contributor

@ttnghia ttnghia commented Jun 24, 2021

This PR adds the following new groupby aggregations:

  • M2: the sum of squares of differences from the group mean, which is essentially an unscaled version of variance
  • MERGE_M2: merges M2 values that were computed in a distributed manner

These are auxiliary aggregations that generate intermediate results for numerically stable, distributed computation of variance and standard deviation.

Below is an overview of the algorithm for computing the VARIANCE aggregation in a distributed setting:

  • Partition the data into batches and compute partial groupby COUNT_VALID, MEAN, and M2 aggregations for each batch.
  • Vertically concatenate the COUNT_VALID, MEAN, and M2 results, then assemble a structs column with these values as separate child columns.
  • The new structs column is given as the input values column to the MERGE_M2 aggregation, which merges those partial results following the algorithm described here: https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm. As such, the output of MERGE_M2 is also a structs column of the same form as its input.
  • The partial results (COUNT_VALID, MEAN, and M2) may be merged over several merging steps.
  • After the last merging step, we have the final M2 values. A finalizing step then computes variance/std for each key group using cudf binops (`variance_pop = M2 / group_size` and `variance_samp = M2 / (group_size - 1)`); a sketch follows the reference link below.

Reference: https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm
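For reference, the merge step of that parallel algorithm, together with the final division, can be sketched in plain C++ as follows (this is an illustration, not the libcudf implementation; the struct and function names are made up for the example):

```cpp
#include <cstdint>

// Per-group partial state produced by one batch: COUNT_VALID, MEAN, and M2.
struct m2_partial {
  int64_t count;  // number of valid rows of the group in this batch
  double mean;    // mean of the group's values in this batch
  double m2;      // sum of squared differences from the batch mean
};

// Merge two partial results of the same key group (pairwise update from the
// parallel algorithm referenced above).
m2_partial merge_m2(m2_partial const& a, m2_partial const& b)
{
  if (a.count == 0) { return b; }
  if (b.count == 0) { return a; }
  int64_t const n    = a.count + b.count;
  double const delta = b.mean - a.mean;
  double const mean  = a.mean + delta * b.count / n;
  double const m2    = a.m2 + b.m2 + delta * delta * a.count * b.count / n;
  return {n, mean, m2};
}

// Finalizing step after the last merge:
//   variance_pop  = m2 / n
//   variance_samp = m2 / (n - 1)
// and the standard deviations are the square roots of these.
```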

This PR also includes some clean-up and reordering of code in several related files.

@ttnghia ttnghia added feature request New feature or request 2 - In Progress Currently a work in progress libcudf Affects libcudf (C++/CUDA) code. Spark Functionality that helps Spark RAPIDS non-breaking Non-breaking change labels Jun 24, 2021
@ttnghia ttnghia self-assigned this Jun 24, 2021
@github-actions github-actions bot added the CMake CMake build issue label Jun 24, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jun 25, 2021
@revans2
Contributor

revans2 commented Jun 25, 2021

What is the difference between "MERGE_VARIANCE" and "MERGE_STDS"? For Spark we really just want something that will take the M2, MEAN, and COUNT_VALID and produce a merged M2, MEAN, and COUNT_VALID. We may do multiple intermediate merges along the way, and if the schema changes as a part of the MERGE step we cannot do that any more.

@ttnghia
Contributor Author

ttnghia commented Jun 25, 2021

> What is the difference between "MERGE_VARIANCE" and "MERGE_STDS"? For Spark we really just want something that will take the M2, MEAN, and COUNT_VALID and produce a merged M2, MEAN, and COUNT_VALID. We may do multiple intermediate merges along the way, and if the schema changes as a part of the MERGE step we cannot do that any more.

Both MERGE_VARIANCE and MERGE_STDS do the following:

  • Merge M2, MEAN, and COUNT_VALID, and
  • Merge VARIANCE/STD

If a merge is the final merge, we can directly use the computed final values of VARIANCE/STD (the second step above). If it is just an intermediate merge, we use the merged values of M2, MEAN, and COUNT_VALID and discard the results of the second step.

Yes, always doing the second step wastes some computation when its results may not be used, unless we know exactly when the final merge happens and can then specify whether to do it when calling the merging aggs.

Spark uses a 3-step approach: computing + merging + finalizing. In libcudf, so far we only implement computing + merging aggregations. Thus, we have to combine the merging and finalizing steps within the merging aggs.

Adding more aggs such as FINALIZE_VARIANCES and FINALIZE_STDS would resolve this concern. However, I don't think that is the best idea.

I'm thinking of refactoring the entire set of groupby aggs to give them 3 APIs that support a distributed computing environment similar to Spark's: compute + merge + finalize. That would be a breaking change, so I'm not sure how practical it is.

@revans2
Contributor

revans2 commented Jun 25, 2021

That should be fine. I didn't see any implementation or doxygen comments to really understand what was proposed. I personally would be fine with just a MERGE_M2; then we could use binary ops to compute the sample standard deviation, population standard deviation, population variance, or sample variance depending on what operation we are doing. With the current proposal we would end up doing that anyway for at least 2 out of the 4 operations.

@jrhemstad
Contributor

I'm trying to understand why we need these new "merge" aggregations at all.

Looking at the Dask algorithm, it uses the relation that the variance of a set X is the sum of the squares minus the square of the sums divided by the sample size, e.g.,

Var(X) = (Σ xᵢ² − (Σ xᵢ)² / n) / n = E[X²] − (E[X])²

So a distributed groupby variance can be done as a groupby sum plus a groupby sum of squares, with a division at the end, without any new aggregations needing to be added.
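For what it's worth, that approach can be sketched in plain C++ as follows (illustrative names only): each batch keeps (n, sum, sum of squares) per group, merging is just element-wise addition, and the variance falls out of a division at the end.

```cpp
#include <cstdint>

// Per-group partial state for the "sum and sum of squares" approach.
struct sums_partial {
  int64_t n    = 0;  // COUNT_VALID
  double sum   = 0;  // SUM(x)
  double sumsq = 0;  // SUM(x * x)
};

// Merging partial results from two batches is just element-wise addition.
sums_partial merge(sums_partial const& a, sums_partial const& b)
{
  return {a.n + b.n, a.sum + b.sum, a.sumsq + b.sumsq};
}

// Final division: Var(X) = E[X^2] - (E[X])^2.
double variance_pop(sums_partial const& s)
{
  double const mean = s.sum / s.n;
  return s.sumsq / s.n - mean * mean;
}
```

As noted in the next comment, this form is prone to catastrophic cancellation when the mean is large relative to the variance, which is what motivates the M2-based formulation.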

@jrhemstad
Contributor

Apparently that algorithm has numerical stability issues: https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Na%C3%AFve_algorithm

But that page also lists several alternative algorithms with better stability that should be explored.

I would prefer to find a solution that doesn't require adding several new, specialized "merge" aggregation types.

@ttnghia
Contributor Author

ttnghia commented Jun 25, 2021

Thanks Jake. As Bobby said, we may just need the M2 and MERGE_M2 aggregations to implement a stable merge. Variance and std can then be computed from the merged results using binops.

Yes, by having just MERGE_M2 we will not output redundant data on each merge (data that would be discarded if the merge is not the final one).

Update: I have removed MERGE_VARIANCES and MERGE_STDS, and added MERGE_M2.
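For illustration, here is a rough sketch of such a finalizing step with cudf binops, assuming the merged structs column has already been split into its COUNT_VALID and M2 children (the helper name, ddof choice, and column handling below are assumptions for the example, not this PR's actual code):

```cpp
#include <cudf/binaryop.hpp>
#include <cudf/column/column.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/scalar/scalar.hpp>
#include <cudf/types.hpp>
#include <cudf/unary.hpp>

#include <memory>

// counts: the merged COUNT_VALID child column; m2: the merged M2 child column.
std::unique_ptr<cudf::column> sample_stddev(cudf::column_view const& counts,
                                            cudf::column_view const& m2)
{
  auto const out_type = cudf::data_type{cudf::type_id::FLOAT64};

  // n - 1 (ddof = 1, i.e. the sample variance)
  auto const one       = cudf::numeric_scalar<int32_t>{1};
  auto const n_minus_1 = cudf::binary_operation(counts, one, cudf::binary_operator::SUB, out_type);

  // variance_samp = M2 / (n - 1)
  auto const var = cudf::binary_operation(m2, n_minus_1->view(), cudf::binary_operator::DIV, out_type);

  // stddev_samp = sqrt(variance_samp)
  return cudf::unary_operation(var->view(), cudf::unary_operator::SQRT);
}
```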

@rapidsai rapidsai deleted a comment from codecov bot Jun 25, 2021
@ttnghia ttnghia changed the title Implement groupby aggregations M2, MERGE_VARIANCES, and MERGE_STDS Implement groupby aggregations M2, MERGE_M2 Jun 25, 2021
@ttnghia ttnghia changed the title Implement groupby aggregations M2, MERGE_M2 Implement groupby aggregations M2 and MERGE_M2 Jun 25, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jun 25, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jun 30, 2021
@jrhemstad
Contributor

jrhemstad commented Jun 30, 2021

@ttnghia could you describe the algorithm being used here to compute a distributed variance? An example would be especially helpful.

@rapidsai rapidsai deleted a comment from codecov bot Jun 30, 2021
@ttnghia
Contributor Author

ttnghia commented Jun 30, 2021

> @ttnghia could you describe the algorithm being used here to compute a distributed variance? An example would be especially helpful.

Sure. I have added more details about the algorithm in this PR's description, and plan to add several usage examples in the unit tests, which will be pushed up later.

@rapidsai rapidsai deleted a comment from codecov bot Jun 30, 2021
@ttnghia
Contributor Author

ttnghia commented Jun 30, 2021

@jrhemstad Regarding the algorithm I described in the top post, one more thing I'm still concerned about is the merging step for COUNT_VALID. In groupby, the results of SUM (for integer inputs) are stored as int64_t. Thus, I may have to cast the result back from int64_t to int32_t after each merge. A better way would be to add another aggregation, MERGE_COUNT_VALID, but I'm reluctant to do that.

Maybe I will just need to rewrite my MERGE_M2 implementation, using type_dispatcher to allow it to operate on both int32_t and int64_t COUNT_VALID values?
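As a side note on that option, the type_dispatcher pattern would look roughly like this (a minimal sketch; the functor only inspects the count column's type and is not the actual MERGE_M2 code):

```cpp
#include <cudf/types.hpp>
#include <cudf/utilities/type_dispatcher.hpp>

#include <cstdint>
#include <type_traits>

// Functor invoked by cudf::type_dispatcher with the static type corresponding
// to the column's runtime data_type. Here it only reports whether the type is
// one of the two count representations MERGE_M2 would need to handle.
struct is_supported_count_type {
  template <typename T>
  bool operator()() const
  {
    return std::is_same_v<T, int32_t> or std::is_same_v<T, int64_t>;
  }
};

bool counts_supported(cudf::data_type count_type)
{
  return cudf::type_dispatcher(count_type, is_supported_count_type{});
}
```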

@rapidsai rapidsai deleted a comment from codecov bot Jul 6, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jul 6, 2021
@jrhemstad
Contributor

> I may have to cast the result back from int64_t to int32_t after each merge

Why would you need to cast back to int32_t? Just to save memory or something?

> I'm going to change the MERGE_M2 aggregation to have its input and output match (nearly exactly) the algorithm used in Spark

How is Spark's algorithm different from what you listed here? https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm

@rapidsai rapidsai deleted a comment from codecov bot Jul 6, 2021
@ttnghia
Contributor Author

ttnghia commented Jul 6, 2021

>> I may have to cast the result back from int64_t to int32_t after each merge

> Why would you need to cast back to int32_t? Just to save memory or something?

>> I'm going to change the MERGE_M2 aggregation to have its input and output match (nearly exactly) the algorithm used in Spark

> How is Spark's algorithm different from what you listed here? https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm

  1. The output from COUNT_VALID is of type int32_t. Merging COUNT_VALID with a SUM aggregation produces int64_t output. If we don't cast back to int32_t, the algorithm must be able to operate on both int32_t and int64_t, which requires a call to type_dispatcher.

  2. Spark uses this algorithm (sketched below): https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala#L81

Note that 1) is no longer applicable, as I have changed the implementation of MERGE_M2 to output the merged values of COUNT_VALID (and also MEAN). This is much better, because those values have to be computed while merging M2 anyway, so I just output them.
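For context, the per-row update in Spark's CentralMomentAgg is essentially Welford's online algorithm; a plain C++ sketch for a single group is below (illustrative only), and its merge step is the same pairwise formula sketched in the PR description:

```cpp
#include <cstdint>

// Running (count, mean, M2) state for one group, updated one value at a time.
struct moment_state {
  int64_t n   = 0;
  double mean = 0.0;
  double m2   = 0.0;

  void update(double x)
  {
    ++n;
    double const delta = x - mean;
    mean += delta / n;
    m2 += delta * (x - mean);  // uses the updated mean
  }
};
```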

@rapidsai rapidsai deleted a comment from codecov bot Jul 7, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jul 7, 2021
@rapidsai rapidsai deleted a comment from codecov bot Jul 7, 2021
@vuule vuule added 5 - Ready to Merge Testing and reviews complete, ready to merge and removed 3 - Ready for Review Ready for review by team labels Jul 7, 2021
@harrism
Member

harrism commented Jul 8, 2021

@gpucibot merge

@rapids-bot rapids-bot bot merged commit 53b3c16 into rapidsai:branch-21.08 Jul 8, 2021
@ttnghia ttnghia deleted the merge_std_variance branch October 13, 2021 21:06