Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor distinct with hashset-based algorithms #15984

Merged
merged 17 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 67 additions & 79 deletions cpp/src/stream_compaction/distinct.cu
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,70 @@
#include "distinct_helpers.hpp"

#include <cudf/column/column_view.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>
#include <rmm/mr/device/per_device_resource.hpp>
#include <rmm/resource_ref.hpp>

#include <cuco/static_set.cuh>
#include <cuda/functional>
#include <thrust/copy.h>
#include <thrust/distance.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/iterator/discard_iterator.h>

#include <atomic>
srinivasyadav18 marked this conversation as resolved.
Show resolved Hide resolved
#include <utility>
#include <vector>

namespace cudf {
namespace detail {
namespace {
/**
 * @brief Invokes the given `func` with the desired row equality comparator
 *
 * A device row-equality comparator is created from `row_equal`, using a value comparator chosen
 * by `compare_nans`: `nan_equal_physical_equality_comparator` when `compare_nans` is
 * `nan_equality::ALL_EQUAL`, and `physical_equality_comparator` otherwise. That comparator is
 * then passed to `func`, whose result is returned to the caller.
 *
 * Both branches are instantiated, so `func` must be callable with the comparator produced by
 * either of them.
 *
 * @tparam HasNested Flag indicating whether there are nested columns in the input
 * @tparam Func Type of the helper function doing `distinct` check
 *
 * @param compare_nulls Control whether nulls should be compared as equal or not
 * @param compare_nans Control whether floating-point NaN values should be compared as equal or not
 * @param has_nulls Flag indicating whether the input has nulls or not
 * @param row_equal Self table comparator
 * @param func The input functor to invoke
 * @return The value returned by `func`
 */
template <bool HasNested, typename Func>
rmm::device_uvector<cudf::size_type> dispatch_hash_set(
  null_equality compare_nulls,
  nan_equality compare_nans,
  bool has_nulls,
  cudf::experimental::row::equality::self_comparator row_equal,
  Func&& func)
{
  if (compare_nans == nan_equality::ALL_EQUAL) {
    // NaNs are treated as equal to each other.
    auto const d_equal = row_equal.equal_to<HasNested>(
      nullate::DYNAMIC{has_nulls},
      compare_nulls,
      cudf::experimental::row::equality::nan_equal_physical_equality_comparator{});
    return func(d_equal);
  } else {
    // Plain physical equality comparison of values.
    auto const d_equal = row_equal.equal_to<HasNested>(
      nullate::DYNAMIC{has_nulls},
      compare_nulls,
      cudf::experimental::row::equality::physical_equality_comparator{});
    return func(d_equal);
  }
}
} // namespace

rmm::device_uvector<size_type> distinct_indices(table_view const& input,
duplicate_keep_option keep,
Expand All @@ -47,97 +89,43 @@ rmm::device_uvector<size_type> distinct_indices(table_view const& input,
rmm::cuda_stream_view stream,
rmm::device_async_resource_ref mr)
{
if (input.num_rows() == 0 or input.num_columns() == 0) {
auto const num_rows = input.num_rows();

if (num_rows == 0 or input.num_columns() == 0) {
return rmm::device_uvector<size_type>(0, stream, mr);
}

auto map = hash_map_type{compute_hash_table_size(input.num_rows()),
cuco::empty_key{-1},
cuco::empty_value{std::numeric_limits<size_type>::min()},
cudf::detail::cuco_allocator{stream},
stream.value()};

auto const preprocessed_input =
cudf::experimental::row::hash::preprocessed_table::create(input, stream);
auto const has_nulls = nullate::DYNAMIC{cudf::has_nested_nulls(input)};
auto const has_nested_columns = cudf::detail::has_nested_columns(input);

auto const row_hasher = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const key_hasher = row_hasher.device_hasher(has_nulls);

auto const row_comp = cudf::experimental::row::equality::self_comparator(preprocessed_input);

auto const pair_iter = cudf::detail::make_counting_transform_iterator(
size_type{0},
cuda::proclaim_return_type<cuco::pair<size_type, size_type>>(
[] __device__(size_type const i) { return cuco::make_pair(i, i); }));

auto const insert_keys = [&](auto const value_comp) {
if (has_nested_columns) {
auto const key_equal = row_comp.equal_to<true>(has_nulls, nulls_equal, value_comp);
map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
} else {
auto const key_equal = row_comp.equal_to<false>(has_nulls, nulls_equal, value_comp);
map.insert(pair_iter, pair_iter + input.num_rows(), key_hasher, key_equal, stream.value());
}
auto const row_hash = cudf::experimental::row::hash::row_hasher(preprocessed_input);
auto const d_hash = row_hash.device_hasher(has_nulls);

auto const row_equal = cudf::experimental::row::equality::self_comparator(preprocessed_input);
srinivasyadav18 marked this conversation as resolved.
Show resolved Hide resolved

auto const helper_func = [&](auto const& d_equal) {
using RowHasher = std::decay_t<decltype(d_equal)>;
auto set = hash_set_type<RowHasher>{num_rows,
0.5, // desired load factor
cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL},
d_equal,
{d_hash},
srinivasyadav18 marked this conversation as resolved.
Show resolved Hide resolved
{},
{},
cudf::detail::cuco_allocator{stream},
stream.value()};
auto const iter = thrust::counting_iterator<cudf::size_type>{0};
auto const size = set.insert(iter, iter + num_rows, stream.value());
return detail::process_keep_option(set, size, num_rows, keep, stream, mr);
};

if (nans_equal == nan_equality::ALL_EQUAL) {
using nan_equal_comparator =
cudf::experimental::row::equality::nan_equal_physical_equality_comparator;
insert_keys(nan_equal_comparator{});
if (cudf::detail::has_nested_columns(input)) {
return dispatch_hash_set<true>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
} else {
using nan_unequal_comparator = cudf::experimental::row::equality::physical_equality_comparator;
insert_keys(nan_unequal_comparator{});
return dispatch_hash_set<false>(nulls_equal, nans_equal, has_nulls, row_equal, helper_func);
}

auto output_indices = rmm::device_uvector<size_type>(map.get_size(), stream, mr);

// If we don't care about order, just gather indices of distinct keys taken from map.
if (keep == duplicate_keep_option::KEEP_ANY) {
map.retrieve_all(output_indices.begin(), thrust::make_discard_iterator(), stream.value());
return output_indices;
}

// For other keep options, reduce by row on rows that compare equal.
auto const reduction_results = reduce_by_row(map,
std::move(preprocessed_input),
input.num_rows(),
has_nulls,
has_nested_columns,
keep,
nulls_equal,
nans_equal,
stream,
rmm::mr::get_current_device_resource());

// Extract the desired output indices from reduction results.
auto const map_end = [&] {
if (keep == duplicate_keep_option::KEEP_NONE) {
// Reduction results with `KEEP_NONE` are either group sizes of equal rows, or `0`.
// Thus, we only output index of the rows in the groups having group size of `1`.
return thrust::copy_if(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(input.num_rows()),
output_indices.begin(),
[reduction_results = reduction_results.begin()] __device__(
auto const idx) { return reduction_results[idx] == size_type{1}; });
}

// Reduction results with `KEEP_FIRST` and `KEEP_LAST` are row indices of the first/last row in
// each group of equal rows (which are the desired output indices), or the value given by
// `reduction_init_value()`.
return thrust::copy_if(rmm::exec_policy(stream),
reduction_results.begin(),
reduction_results.end(),
output_indices.begin(),
[init_value = reduction_init_value(keep)] __device__(auto const idx) {
return idx != init_value;
});
}();

output_indices.resize(thrust::distance(output_indices.begin(), map_end), stream);
return output_indices;
}

std::unique_ptr<table> distinct(table_view const& input,
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/stream_compaction/distinct_count.cu
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,17 @@
*/

#include "stream_compaction_common.cuh"
#include "stream_compaction_common.hpp"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/cuco_helpers.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/sorting.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/hashing/detail/helper_functions.cuh>
#include <cudf/stream_compaction.hpp>
#include <cudf/table/experimental/row_operators.cuh>
#include <cudf/table/table_view.hpp>
Expand Down
Loading
Loading