Implement groupby aggregations M2 and MERGE_M2 (#8605)
This PR adds the following new groupby aggregations:
 * `M2`: the sum of squared differences from the group mean, which is essentially an unscaled version of variance
 * `MERGE_M2`: merges `M2` values that were computed separately in a distributed fashion

These auxiliary aggregations produce the intermediate results needed to compute variance and standard deviation in a distributed, numerically stable way.

Below is an overview of the algorithm for computing the `VARIANCE` aggregation in a distributed fashion:
 * Partition data into batches and compute the groupby `COUNT_VALID`, `MEAN` and `M2` aggregations for each batch as partial results.
 * Vertically concatenate results of `COUNT_VALID`, `MEAN` and `M2`, then assemble a structs column with these values in separate children columns.
 * The new structs column is given as the input values column to `MERGE_M2` aggregation to produce new merged values for those partial results, following the algorithm described here: https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm. As such, the output of `MERGE_M2` is also a structs column similar to its input.
 * Those partial results (`COUNT_VALID`, `MEAN`, and `M2`) may be merged in several merging steps.
 * After the last merging step, we have the final `M2` values. Then, a finalizing step will compute variance/std for each key group using cudf binops (`variance_pop = M2 / group_size`, and `variance = M2 / (group_size - 1)`).
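
The merging step described above can be sketched on the host for a single key group. This is only an illustration of the pairwise update from the referenced parallel algorithm, not the code added by this PR: the `partial_m2` struct and the functions `compute_partial`, `merge_partials`, `sample_variance` and `population_variance` are hypothetical names, and the real implementation operates on cudf columns on the GPU.

```cpp
#include <cassert>
#include <cmath>
#include <cstdint>
#include <vector>

// Hypothetical holder for one group's partial result: the three values that
// COUNT_VALID, MEAN and M2 would produce for a single batch.
struct partial_m2 {
  std::int64_t count;  // number of valid values
  double mean;         // arithmetic mean of the values
  double m2;           // sum of squared differences from the mean
};

// Compute the partial result for one batch of values (two-pass, for clarity).
partial_m2 compute_partial(std::vector<double> const& values)
{
  partial_m2 p{0, 0.0, 0.0};
  if (values.empty()) { return p; }

  double sum = 0.0;
  for (double v : values) { sum += v; }
  p.count = static_cast<std::int64_t>(values.size());
  p.mean  = sum / static_cast<double>(p.count);

  for (double v : values) {
    double const diff = v - p.mean;
    p.m2 += diff * diff;
  }
  return p;
}

// Merge two partial results into the result for the union of both batches,
// using the pairwise update of the parallel variance algorithm.
partial_m2 merge_partials(partial_m2 const& a, partial_m2 const& b)
{
  if (a.count == 0) { return b; }
  if (b.count == 0) { return a; }

  std::int64_t const n = a.count + b.count;
  double const na      = static_cast<double>(a.count);
  double const nb      = static_cast<double>(b.count);
  double const delta   = b.mean - a.mean;

  double const mean = (na * a.mean + nb * b.mean) / static_cast<double>(n);
  double const m2   = a.m2 + b.m2 + delta * delta * na * nb / static_cast<double>(n);
  return partial_m2{n, mean, m2};
}

// Finalizing step: sample variance divides by (count - 1),
// population variance divides by count.
double sample_variance(partial_m2 const& p)
{
  return p.m2 / static_cast<double>(p.count - 1);
}

double population_variance(partial_m2 const& p)
{
  return p.m2 / static_cast<double>(p.count);
}
```

Because merging two partial results yields another partial result of the same shape, the merge can be applied repeatedly across any number of batches before the finalizing division.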
 
 
Reference: https://www.wikiwand.com/en/Algorithms_for_calculating_variance#/Parallel_algorithm

This PR also cleans up and reorders code in several related files.

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Jake Hemstad (https://github.com/jrhemstad)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #8605
ttnghia authored Jul 8, 2021
1 parent f0f2170 commit 53b3c16
Showing 11 changed files with 1,439 additions and 154 deletions.
8 changes: 5 additions & 3 deletions cpp/CMakeLists.txt
@@ -194,14 +194,16 @@ add_library(cudf
src/filling/sequence.cu
src/groupby/groupby.cu
src/groupby/hash/groupby.cu
src/groupby/sort/aggregate.cpp
src/groupby/sort/group_argmax.cu
src/groupby/sort/group_argmin.cu
src/groupby/sort/aggregate.cpp
src/groupby/sort/group_collect.cu
src/groupby/sort/group_merge_lists.cu
src/groupby/sort/group_count.cu
src/groupby/sort/group_m2.cu
src/groupby/sort/group_max.cu
src/groupby/sort/group_min.cu
src/groupby/sort/group_merge_lists.cu
src/groupby/sort/group_merge_m2.cu
src/groupby/sort/group_nth_element.cu
src/groupby/sort/group_nunique.cu
src/groupby/sort/group_product.cu
@@ -272,7 +274,7 @@ add_library(cudf
src/join/join.cu
src/join/semi_join.cu
src/lists/contains.cu
src/lists/combine/concatenate_list_elements.cu
src/lists/combine/concatenate_list_elements.cu
src/lists/combine/concatenate_rows.cu
src/lists/copying/concatenate.cu
src/lists/copying/copying.cu
77 changes: 54 additions & 23 deletions cpp/include/cudf/aggregation.hpp
@@ -67,8 +67,9 @@ class aggregation {
ALL, ///< all reduction
SUM_OF_SQUARES, ///< sum of squares reduction
MEAN, ///< arithmetic mean reduction
VARIANCE, ///< groupwise variance
STD, ///< groupwise standard deviation
M2, ///< sum of squares of differences from the mean
VARIANCE, ///< variance
STD, ///< standard deviation
MEDIAN, ///< median reduction
QUANTILE, ///< compute specified quantile(s)
ARGMAX, ///< Index of max element
@@ -78,12 +79,13 @@ class aggregation {
ROW_NUMBER, ///< get row-number of current index (relative to rolling window)
COLLECT_LIST, ///< collect values into a list
COLLECT_SET, ///< collect values into a list without duplicate entries
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
LEAD, ///< window function, accesses row at specified offset following current row
LAG, ///< window function, accesses row at specified offset preceding current row
PTX, ///< PTX UDF based reduction
CUDA ///< CUDA UDF based reduction
CUDA, ///< CUDA UDF based reduction
MERGE_LISTS, ///< merge multiple lists values into one list
MERGE_SETS, ///< merge multiple lists values into one list then drop duplicate entries
MERGE_M2 ///< merge partial values of M2 aggregation
};

aggregation() = delete;
@@ -159,6 +161,20 @@ std::unique_ptr<Base> make_sum_of_squares_aggregation();
template <typename Base = aggregation>
std::unique_ptr<Base> make_mean_aggregation();

/**
* @brief Factory to create a M2 aggregation
*
* A M2 aggregation is sum of squares of differences from the mean. That is:
* `M2 = SUM((x - MEAN) * (x - MEAN))`.
*
* This aggregation produces the intermediate values that are used to compute variance and standard
* deviation across multiple discrete sets. See
* `https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance#Parallel_algorithm` for more
* detail.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_m2_aggregation();

/**
* @brief Factory to create a VARIANCE aggregation
*
@@ -271,11 +287,33 @@ std::unique_ptr<Base> make_collect_set_aggregation(null_policy null_handling = n
null_equality nulls_equal = null_equality::EQUAL,
nan_equality nans_equal = nan_equality::UNEQUAL);

/// Factory to create a LAG aggregation
template <typename Base = aggregation>
std::unique_ptr<Base> make_lag_aggregation(size_type offset);

/// Factory to create a LEAD aggregation
template <typename Base = aggregation>
std::unique_ptr<Base> make_lead_aggregation(size_type offset);

/**
* @brief Factory to create an aggregation base on UDF for PTX or CUDA
*
* @param[in] type: either udf_type::PTX or udf_type::CUDA
* @param[in] user_defined_aggregator A string containing the aggregator code
* @param[in] output_type expected output type
*
* @return aggregation unique pointer housing user_defined_aggregator string.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
data_type output_type);

/**
* @brief Factory to create a MERGE_LISTS aggregation.
*
* Given a lists column, this aggregation merges all the lists corresponding to the same key value
* into one list. It is designed specificly to merge the partial results of multiple (distributed)
* into one list. It is designed specifically to merge the partial results of multiple (distributed)
* groupby `COLLECT_LIST` aggregations into a final `COLLECT_LIST` result. As such, it requires the
* input lists column to be non-nullable (the child column containing list entries is not subjected
* to this requirement).
@@ -290,7 +328,7 @@ std::unique_ptr<Base> make_merge_lists_aggregation();
* value into one list, then it drops all the duplicate entries in each lists, producing a lists
* column containing non-repeated entries.
*
* This aggregation is designed specificly to merge the partial results of multiple (distributed)
* This aggregation is designed specifically to merge the partial results of multiple (distributed)
* groupby `COLLECT_LIST` or `COLLECT_SET` aggregations into a final `COLLECT_SET` result. As such,
* it requires the input lists column to be non-nullable (the child column containing list entries
* is not subjected to this requirement).
@@ -308,27 +346,20 @@ template <typename Base = aggregation>
std::unique_ptr<Base> make_merge_sets_aggregation(null_equality nulls_equal = null_equality::EQUAL,
nan_equality nans_equal = nan_equality::UNEQUAL);

/// Factory to create a LAG aggregation
template <typename Base = aggregation>
std::unique_ptr<Base> make_lag_aggregation(size_type offset);

/// Factory to create a LEAD aggregation
template <typename Base = aggregation>
std::unique_ptr<Base> make_lead_aggregation(size_type offset);

/**
* @brief Factory to create an aggregation base on UDF for PTX or CUDA
* @brief Factory to create a MERGE_M2 aggregation
*
* @param[in] type: either udf_type::PTX or udf_type::CUDA
* @param[in] user_defined_aggregator A string containing the aggregator code
* @param[in] output_type expected output type
* Merges the results of `M2` aggregations on independent sets into a new `M2` value equivalent to
* if a single `M2` aggregation was done across all of the sets at once. This aggregation is only
* valid on structs whose members are the result of the `COUNT_VALID`, `MEAN`, and `M2` aggregations
* on the same sets. The output of this aggregation is a struct containing the merged `COUNT_VALID`,
* `MEAN`, and `M2` aggregations.
*
* @return aggregation unique pointer housing user_defined_aggregator string.
* The input `M2` aggregation values are expected to be all non-negative numbers, since they
* were output from `M2` aggregation.
*/
template <typename Base = aggregation>
std::unique_ptr<Base> make_udf_aggregation(udf_type type,
std::string const& user_defined_aggregator,
data_type output_type);
std::unique_ptr<Base> make_merge_m2_aggregation();

/** @} */ // end of group
} // namespace cudf