Skip to content

Commit

Permalink
Support nested structs in rank and dense rank (#8962)
Browse files Browse the repository at this point in the history
Follow on to #8652 for nested struct support using, partially removing the need for #8683.

This change simplifies the rank algorithm by assuming `superimpose_parent_nulls` has been ran on the struct column. This removes the need for separate logic that ensures we are not comparing elements covered by a parent column's null mask.

Authors:
  - https://github.com/rwlee

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - https://github.com/nvdbaranec
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Mark Harris (https://github.com/harrism)

URL: #8962
  • Loading branch information
rwlee authored Sep 14, 2021
1 parent eab2486 commit eae76cf
Show file tree
Hide file tree
Showing 9 changed files with 301 additions and 318 deletions.
1 change: 0 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,6 @@ add_library(cudf
src/groupby/sort/group_sum.cu
src/groupby/sort/scan.cpp
src/groupby/sort/group_count_scan.cu
src/groupby/sort/group_dense_rank_scan.cu
src/groupby/sort/group_max_scan.cu
src/groupby/sort/group_min_scan.cu
src/groupby/sort/group_rank_scan.cu
Expand Down
105 changes: 0 additions & 105 deletions cpp/src/groupby/sort/group_dense_rank_scan.cu

This file was deleted.

141 changes: 87 additions & 54 deletions cpp/src/groupby/sort/group_rank_scan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,96 +16,129 @@

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/table/row_operators.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/logical.h>
#include <structs/utilities.hpp>

namespace cudf {
namespace groupby {
namespace detail {
namespace {
template <bool has_nested_nulls>
std::unique_ptr<column> generate_ranks(column_view const& order_by,
/**
* @brief generate grouped row ranks or dense ranks using a row comparison then scan the results
*
* @tparam has_nulls if the order_by column has nulls
* @tparam value_resolver flag value resolver function with boolean first and row number arguments
* @tparam scan_operator scan function ran on the flag values
* @param order_by input column to generate ranks for
* @param group_labels ID of group that the corresponding value belongs to
* @param group_offsets group index offsets with group ID indices
* @param resolver flag value resolver
* @param scan_op scan operation ran on the flag results
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned column's device memory
* @return std::unique_ptr<column> rank values
*/
template <bool has_nulls, typename value_resolver, typename scan_operator>
std::unique_ptr<column> rank_generator(column_view const& order_by,
device_span<size_type const> group_labels,
device_span<size_type const> group_offsets,
value_resolver resolver,
scan_operator scan_op,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const flat_order =
order_by.type().id() == type_id::STRUCT
? table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}}
: table_view{{order_by}};
auto const d_flat_order = table_device_view::create(flat_order, stream);
row_equality_comparator<has_nested_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(
data_type{type_to_id<size_type>()}, order_by.size(), mask_state::UNALLOCATED, stream, mr);
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_table, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(data_type{type_to_id<size_type>()},
order_table.num_rows(),
mask_state::UNALLOCATED,
stream,
mr);
auto mutable_ranks = ranks->mutable_view();

if (order_by.type().id() == type_id::STRUCT && order_by.has_nulls()) {
auto const d_col_order = column_device_view::create(order_by, stream);
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
d_col_order = *d_col_order,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
auto group_start = offsets[labels[row_index]];
if (row_index == group_start) { return 1; }
bool const lhs_is_null{d_col_order.is_null(row_index)};
bool const rhs_is_null{d_col_order.is_null(row_index - 1)};
if (lhs_is_null && rhs_is_null) {
return 0;
} else if (lhs_is_null != rhs_is_null) {
return row_index - group_start + 1;
}
return comparator(row_index, row_index - 1) ? 0
: row_index - group_start + 1;
});
} else {
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
auto group_start = offsets[labels[row_index]];
return row_index != group_start && comparator(row_index, row_index - 1)
? 0
: row_index - group_start + 1;
});
}
thrust::tabulate(
rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, resolver, labels = group_labels.data(), offsets = group_offsets.data()] __device__(
size_type row_index) {
auto group_start = offsets[labels[row_index]];
return resolver(row_index == group_start || !comparator(row_index, row_index - 1),
row_index - group_start);
});

thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
group_labels.begin(),
group_labels.end(),
mutable_ranks.begin<size_type>(),
mutable_ranks.begin<size_type>(),
thrust::equal_to<size_type>{},
DeviceMax{});
scan_op);

return ranks;
}
} // namespace

std::unique_ptr<column> rank_scan(column_view const& order_by,
device_span<size_type const> group_labels,
device_span<size_type const> group_offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if ((order_by.type().id() == type_id::STRUCT &&
has_nested_nulls(
table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}})) ||
(order_by.type().id() != type_id::STRUCT && order_by.has_nulls())) {
return generate_ranks<true>(order_by, group_labels, group_offsets, stream, mr);
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
group_labels,
group_offsets,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}
return rank_generator<false>(
order_by,
group_labels,
group_offsets,
[] __device__(bool equality, auto row_index) { return equality ? row_index + 1 : 0; },
DeviceMax{},
stream,
mr);
}

std::unique_ptr<column> dense_rank_scan(column_view const& order_by,
device_span<size_type const> group_labels,
device_span<size_type const> group_offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if (has_nested_nulls(table_view{{order_by}})) {
return rank_generator<true>(
order_by,
group_labels,
group_offsets,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}
return generate_ranks<false>(order_by, group_labels, group_offsets, stream, mr);
return rank_generator<false>(
order_by,
group_labels,
group_offsets,
[] __device__(bool equality, auto row_index) { return equality; },
DeviceSum{},
stream,
mr);
}

} // namespace detail
Expand Down
14 changes: 4 additions & 10 deletions cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#include <rmm/cuda_stream_view.hpp>

#include <structs/utilities.hpp>

#include <memory>

namespace cudf {
Expand Down Expand Up @@ -114,12 +116,8 @@ void scan_result_functor::operator()<aggregation::RANK>(aggregation const& agg)
CUDF_EXPECTS(helper.is_presorted(),
"Rank aggregate in groupby scan requires the keys to be presorted");
auto const order_by = get_grouped_values();
CUDF_EXPECTS(order_by.type().id() != type_id::LIST,
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in grouped rank scan.");
CUDF_EXPECTS(std::none_of(order_by.child_begin(),
order_by.child_end(),
[](auto const& col) { return is_nested(col.type()); }),
"Unsupported nested columns in grouped rank scan.");

cache.add_result(
col_idx,
Expand All @@ -135,12 +133,8 @@ void scan_result_functor::operator()<aggregation::DENSE_RANK>(aggregation const&
CUDF_EXPECTS(helper.is_presorted(),
"Dense rank aggregate in groupby scan requires the keys to be presorted");
auto const order_by = get_grouped_values();
CUDF_EXPECTS(order_by.type().id() != type_id::LIST,
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(order_by),
"Unsupported list type in grouped dense_rank scan.");
CUDF_EXPECTS(std::none_of(order_by.child_begin(),
order_by.child_end(),
[](auto const& col) { return is_nested(col.type()); }),
"Unsupported nested columns in grouped dense_rank scan.");

cache.add_result(
col_idx,
Expand Down
Loading

0 comments on commit eae76cf

Please sign in to comment.