Skip to content

Commit

Permalink
nested struct support for rank using struct null pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
rwlee committed Sep 2, 2021
1 parent 4556b5b commit b3508ce
Show file tree
Hide file tree
Showing 8 changed files with 245 additions and 236 deletions.
68 changes: 21 additions & 47 deletions cpp/src/groupby/sort/group_dense_rank_scan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,65 +17,40 @@
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/table/row_operators.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/logical.h>
#include <structs/utilities.hpp>

namespace cudf {
namespace groupby {
namespace detail {
namespace {
template <bool has_nested_nulls>
std::unique_ptr<column> generate_dense_ranks(column_view const& order_by,
template <bool has_nulls>
std::unique_ptr<column> generate_dense_ranks(table_view const& order_by,
device_span<size_type const> group_labels,
device_span<size_type const> group_offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const flat_order =
order_by.type().id() == type_id::STRUCT
? table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}}
: table_view{{order_by}};
auto const d_flat_order = table_device_view::create(flat_order, stream);
row_equality_comparator<has_nested_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_by, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(
data_type{type_to_id<size_type>()}, order_by.size(), mask_state::UNALLOCATED, stream, mr);
data_type{type_to_id<size_type>()}, order_by.num_rows(), mask_state::UNALLOCATED, stream, mr);
auto mutable_ranks = ranks->mutable_view();

if (order_by.type().id() == type_id::STRUCT && order_by.has_nulls()) {
auto const d_col_order = column_device_view::create(order_by, stream);
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
d_col_order = *d_col_order,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
if (row_index == offsets[labels[row_index]]) { return true; }
bool const lhs_is_null{d_col_order.is_null(row_index)};
bool const rhs_is_null{d_col_order.is_null(row_index - 1)};
if (lhs_is_null && rhs_is_null) {
return false;
} else if (lhs_is_null != rhs_is_null) {
return true;
}
return !comparator(row_index, row_index - 1);
});

} else {
thrust::tabulate(
rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, labels = group_labels.data(), offsets = group_offsets.data()] __device__(
size_type row_index) {
return row_index == offsets[labels[row_index]] || !comparator(row_index, row_index - 1);
});
}
thrust::tabulate(
rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator, labels = group_labels.data(), offsets = group_offsets.data()] __device__(
size_type row_index) {
return row_index == offsets[labels[row_index]] || !comparator(row_index, row_index - 1);
});

thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
group_labels.begin(),
Expand All @@ -91,13 +66,12 @@ std::unique_ptr<column> dense_rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if ((order_by.type().id() == type_id::STRUCT &&
has_nested_nulls(
table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}})) ||
(order_by.type().id() != type_id::STRUCT && order_by.has_nulls())) {
return generate_dense_ranks<true>(order_by, group_labels, group_offsets, stream, mr);
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
if (has_nested_nulls(table_view{{order_by}})) {
return generate_dense_ranks<true>(order_table, group_labels, group_offsets, stream, mr);
}
return generate_dense_ranks<false>(order_by, group_labels, group_offsets, stream, mr);
return generate_dense_ranks<false>(order_table, group_labels, group_offsets, stream, mr);
}

} // namespace detail
Expand Down
77 changes: 25 additions & 52 deletions cpp/src/groupby/sort/group_rank_scan.cu
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,45 @@

#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/aggregation/aggregation.cuh>
#include <cudf/detail/utilities/device_operators.cuh>
#include <cudf/table/row_operators.cuh>
#include <cudf/types.hpp>
#include <cudf/utilities/span.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <thrust/logical.h>
#include <structs/utilities.hpp>

namespace cudf {
namespace groupby {
namespace detail {
namespace {
template <bool has_nested_nulls>
std::unique_ptr<column> generate_ranks(column_view const& order_by,
template <bool has_nulls>
std::unique_ptr<column> generate_ranks(table_view const& order_by,
device_span<size_type const> group_labels,
device_span<size_type const> group_offsets,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto const flat_order =
order_by.type().id() == type_id::STRUCT
? table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}}
: table_view{{order_by}};
auto const d_flat_order = table_device_view::create(flat_order, stream);
row_equality_comparator<has_nested_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto const flattener = cudf::structs::detail::flatten_nested_columns(
order_by, {}, {}, structs::detail::column_nullability::MATCH_INCOMING);
auto const d_flat_order = table_device_view::create(std::get<0>(flattener), stream);
row_equality_comparator<has_nulls> comparator(*d_flat_order, *d_flat_order, true);
auto ranks = make_fixed_width_column(
data_type{type_to_id<size_type>()}, order_by.size(), mask_state::UNALLOCATED, stream, mr);
data_type{type_to_id<size_type>()}, order_by.num_rows(), mask_state::UNALLOCATED, stream, mr);
auto mutable_ranks = ranks->mutable_view();

if (order_by.type().id() == type_id::STRUCT && order_by.has_nulls()) {
auto const d_col_order = column_device_view::create(order_by, stream);
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
d_col_order = *d_col_order,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
auto group_start = offsets[labels[row_index]];
if (row_index == group_start) { return 1; }
bool const lhs_is_null{d_col_order.is_null(row_index)};
bool const rhs_is_null{d_col_order.is_null(row_index - 1)};
if (lhs_is_null && rhs_is_null) {
return 0;
} else if (lhs_is_null != rhs_is_null) {
return row_index - group_start + 1;
}
return comparator(row_index, row_index - 1) ? 0
: row_index - group_start + 1;
});
} else {
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
auto group_start = offsets[labels[row_index]];
return row_index != group_start && comparator(row_index, row_index - 1)
? 0
: row_index - group_start + 1;
});
}
thrust::tabulate(rmm::exec_policy(stream),
mutable_ranks.begin<size_type>(),
mutable_ranks.end<size_type>(),
[comparator,
labels = group_labels.data(),
offsets = group_offsets.data()] __device__(size_type row_index) {
auto group_start = offsets[labels[row_index]];
return row_index != group_start && comparator(row_index, row_index - 1)
? 0
: row_index - group_start + 1;
});

thrust::inclusive_scan_by_key(rmm::exec_policy(stream),
group_labels.begin(),
Expand All @@ -99,13 +73,12 @@ std::unique_ptr<column> rank_scan(column_view const& order_by,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
if ((order_by.type().id() == type_id::STRUCT &&
has_nested_nulls(
table_view{std::vector<column_view>{order_by.child_begin(), order_by.child_end()}})) ||
(order_by.type().id() != type_id::STRUCT && order_by.has_nulls())) {
return generate_ranks<true>(order_by, group_labels, group_offsets, stream, mr);
auto const superimposed = structs::detail::superimpose_parent_nulls(order_by, stream, mr);
table_view const order_table{{std::get<0>(superimposed)}};
if (has_nested_nulls(table_view{{order_by}})) {
return generate_ranks<true>(order_table, group_labels, group_offsets, stream, mr);
}
return generate_ranks<false>(order_by, group_labels, group_offsets, stream, mr);
return generate_ranks<false>(order_table, group_labels, group_offsets, stream, mr);
}

} // namespace detail
Expand Down
14 changes: 4 additions & 10 deletions cpp/src/groupby/sort/scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

#include <rmm/cuda_stream_view.hpp>

#include <structs/utilities.hpp>

#include <memory>

namespace cudf {
Expand Down Expand Up @@ -114,12 +116,8 @@ void scan_result_functor::operator()<aggregation::RANK>(aggregation const& agg)
CUDF_EXPECTS(helper.is_presorted(),
"Rank aggregate in groupby scan requires the keys to be presorted");
auto const order_by = get_grouped_values();
CUDF_EXPECTS(order_by.type().id() != type_id::LIST,
CUDF_EXPECTS(!cudf::structs::detail::contains_list(order_by),
"Unsupported list type in grouped rank scan.");
CUDF_EXPECTS(std::none_of(order_by.child_begin(),
order_by.child_end(),
[](auto const& col) { return is_nested(col.type()); }),
"Unsupported nested columns in grouped rank scan.");

cache.add_result(
col_idx,
Expand All @@ -135,12 +133,8 @@ void scan_result_functor::operator()<aggregation::DENSE_RANK>(aggregation const&
CUDF_EXPECTS(helper.is_presorted(),
"Dense rank aggregate in groupby scan requires the keys to be presorted");
auto const order_by = get_grouped_values();
CUDF_EXPECTS(order_by.type().id() != type_id::LIST,
CUDF_EXPECTS(!cudf::structs::detail::contains_list(order_by),
"Unsupported list type in grouped dense_rank scan.");
CUDF_EXPECTS(std::none_of(order_by.child_begin(),
order_by.child_end(),
[](auto const& col) { return is_nested(col.type()); }),
"Unsupported nested columns in grouped dense_rank scan.");

cache.add_result(
col_idx,
Expand Down
Loading

0 comments on commit b3508ce

Please sign in to comment.