Support duplicate_keep_option in cudf::distinct #11052

Merged
96 commits merged on Jun 22, 2022
Changes from 6 commits
Commits
96 commits
c22b11c
Test passed
ttnghia Jun 4, 2022
5a6602c
Cleanup
ttnghia Jun 4, 2022
79fa051
Add bench
ttnghia Jun 4, 2022
a7e3463
Add comments
ttnghia Jun 4, 2022
3b7124e
Add unique bench
ttnghia Jun 4, 2022
4f933bf
Rewrite benchs
ttnghia Jun 4, 2022
bf2a717
Add parameter to the API
ttnghia Jun 7, 2022
11f9dd1
Fix compile errors
ttnghia Jun 7, 2022
b8832f7
Switch to use the old atomic ops
ttnghia Jun 7, 2022
f12a509
Add simple tests
ttnghia Jun 7, 2022
edcb612
Revert "Add simple tests"
ttnghia Jun 7, 2022
5b40152
Misc
ttnghia Jun 7, 2022
f2dc1eb
Change doxygen
ttnghia Jun 7, 2022
8a800e9
Cleanup
ttnghia Jun 7, 2022
c0144b3
Revert "Change doxygen"
ttnghia Jun 8, 2022
7d89f4c
Implement `distinct` by `distinct_map`
ttnghia Jun 8, 2022
ae891b3
Rewrite doxygen
ttnghia Jun 8, 2022
1786dbb
Add detail declaration of `distinct_map`
ttnghia Jun 8, 2022
6dfb4f4
Fix bug
ttnghia Jun 8, 2022
3352967
Alias `KEEP_ANY` to `KEEP_FIRST`
ttnghia Jun 8, 2022
52e29b1
Cleanup headers
ttnghia Jun 8, 2022
819a669
Rewrite comments and reorganize code
ttnghia Jun 8, 2022
2e5d41a
Rewrite comments
ttnghia Jun 8, 2022
8393809
Reverse tests
ttnghia Jun 8, 2022
5cdefa3
Fix old tests
ttnghia Jun 8, 2022
3626efb
Add a new overload for `cudf::distinct`
ttnghia Jun 8, 2022
bcc4abe
Reverse back breaking changes
ttnghia Jun 8, 2022
edbcc78
Fix compile error
ttnghia Jun 8, 2022
2f1ce5a
Reverse benchmark
ttnghia Jun 8, 2022
882a67a
Complete `StringKeyColumn` tests
ttnghia Jun 8, 2022
cdae2ac
Fix tests
ttnghia Jun 8, 2022
5b21d88
Fix tests
ttnghia Jun 8, 2022
3f18057
Rename function
ttnghia Jun 9, 2022
21456e7
Add `NonNullTable` tests
ttnghia Jun 9, 2022
8a17581
Add `SlicedNonNullTable` tests
ttnghia Jun 9, 2022
e05ad48
Add `InputWithNulls` tests
ttnghia Jun 9, 2022
03fb093
Change variable
ttnghia Jun 9, 2022
3c12942
Refactor
ttnghia Jun 9, 2022
6ab9673
Add `BasicList` tests
ttnghia Jun 9, 2022
37dfdcb
Add `NullableLists` tests
ttnghia Jun 9, 2022
b78cf5b
Add `ListsOfStructs` tests
ttnghia Jun 9, 2022
8de0948
Add `SlicedStructsOfLists` tests
ttnghia Jun 9, 2022
9e8c4a5
Misc
ttnghia Jun 9, 2022
7fa65ee
Add `ListsOfEmptyStructs` tests
ttnghia Jun 9, 2022
ff6e03e
Modify `EmptyDeepList` tests
ttnghia Jun 9, 2022
374545a
Add `StructsOfLists` tests
ttnghia Jun 9, 2022
9bf540a
Use `distinct` in Cython
ttnghia Jun 9, 2022
e1c3cd5
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 9, 2022
70d3164
Fix Python style
ttnghia Jun 9, 2022
bba15c2
Revert "Fix Python style"
ttnghia Jun 9, 2022
d895f48
Revert "Use `distinct` in Cython"
ttnghia Jun 9, 2022
56e791c
Fix compiling errors due to merging
ttnghia Jun 10, 2022
6ffc9b0
Fix doxygen
ttnghia Jun 10, 2022
dd8c845
Rewrite comment and rename variable
ttnghia Jun 10, 2022
d9c0ab9
Address review comments
ttnghia Jun 15, 2022
7770265
Remove one overload
ttnghia Jun 15, 2022
96a36c4
Fix benchmark
ttnghia Jun 16, 2022
0210228
Rename struct, and use CTAD
ttnghia Jun 16, 2022
65190cc
Add comment
ttnghia Jun 16, 2022
a4db720
Rename variable
ttnghia Jun 16, 2022
df05dc8
Misc
ttnghia Jun 16, 2022
5f7d778
WIP
ttnghia Jun 16, 2022
154645a
Rewrite doxygen
ttnghia Jun 16, 2022
8f04d50
Remove `keys` parameter from `get_distinct_indices`
ttnghia Jun 16, 2022
d806278
Rewrite doxygen
ttnghia Jun 16, 2022
3734344
Use another version of `gather`
ttnghia Jun 16, 2022
f731d35
Fix wrong doxygen
ttnghia Jun 16, 2022
e44c85d
Fix wrong doxygen again
ttnghia Jun 16, 2022
a74f71e
Misc
ttnghia Jun 16, 2022
f9de181
Update doxygen
ttnghia Jun 16, 2022
6cec1eb
Define hash_map and add todo
ttnghia Jun 16, 2022
661400a
Rename variable
ttnghia Jun 17, 2022
700e465
Fix doxygen
ttnghia Jun 17, 2022
1c783e8
Rename tests
ttnghia Jun 17, 2022
47c5eec
Fix a bug when comparing nulls as unequal
ttnghia Jun 18, 2022
4db34db
Add `InputWithNullsUnequal` tests
ttnghia Jun 19, 2022
fab367b
Add `ListsWithNullsUnequal` tests
ttnghia Jun 19, 2022
9ec27af
Rewrite doxygen
ttnghia Jun 19, 2022
1359ee0
Rewrite doxygen for `duplicate_keep_option` and add back performance …
ttnghia Jun 19, 2022
aa0a4ed
Remove redundant docsc
ttnghia Jun 19, 2022
01e03b6
Rename functor
ttnghia Jun 20, 2022
cdc3000
Modify comments
ttnghia Jun 20, 2022
45dec2a
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 20, 2022
cba4759
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 20, 2022
7247101
Attempt to split files, not yet cleanup
ttnghia Jun 20, 2022
120377b
Cleanup
ttnghia Jun 20, 2022
68133d4
Change functor name
ttnghia Jun 20, 2022
aefdadf
Add doxygen
ttnghia Jun 20, 2022
faf6778
Reorganize code
ttnghia Jun 20, 2022
e839323
Fix headers
ttnghia Jun 20, 2022
f5646b3
Fix header
ttnghia Jun 20, 2022
538ff08
Fix `mr` usage, and rewrite some comments
ttnghia Jun 21, 2022
a755bea
Pass `std::shared_ptr` by value
ttnghia Jun 21, 2022
f0ee266
Fix doxygen and change function name
ttnghia Jun 22, 2022
0b35671
Update doxygen
ttnghia Jun 22, 2022
1ac6501
Merge branch 'branch-22.08' into refactor_stream_compaction
ttnghia Jun 22, 2022
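The PR's subject, `duplicate_keep_option` in `cudf::distinct`, controls which row of each group of equal keys survives deduplication (with `KEEP_ANY` aliased to `KEEP_FIRST`, per the commit above). A minimal host-side sketch of the intended semantics in plain C++ — no cudf, and the function name `distinct_indices` here is illustrative, not the library API:

```cpp
#include <algorithm>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

enum class duplicate_keep_option { KEEP_ANY, KEEP_FIRST, KEEP_LAST, KEEP_NONE };

// Return the sorted indices of the rows that survive deduplication of `keys`.
std::vector<int> distinct_indices(std::vector<std::string> const& keys,
                                  duplicate_keep_option keep)
{
  // key -> {surviving row index, occurrence count}
  std::unordered_map<std::string, std::pair<int, int>> groups;
  for (int i = 0; i < static_cast<int>(keys.size()); ++i) {
    auto [it, inserted] = groups.try_emplace(keys[i], i, 0);
    ++it->second.second;
    // KEEP_LAST: every later occurrence overwrites the surviving index.
    if (keep == duplicate_keep_option::KEEP_LAST) it->second.first = i;
  }
  std::vector<int> out;
  for (auto const& [key, v] : groups) {
    // KEEP_NONE: drop every key that occurs more than once.
    if (keep == duplicate_keep_option::KEEP_NONE && v.second > 1) continue;
    out.push_back(v.first);
  }
  std::sort(out.begin(), out.end());  // hash-map iteration order is unspecified
  return out;
}
```

`KEEP_ANY`/`KEEP_FIRST` keep the index recorded at first insertion; only the other two options need extra bookkeeping, which is why the hash-based `distinct` could previously only offer "any".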
4 changes: 2 additions & 2 deletions cpp/benchmarks/CMakeLists.txt
@@ -144,8 +144,8 @@ ConfigureBench(APPLY_BOOLEAN_MASK_BENCH stream_compaction/apply_boolean_mask.cpp

# ##################################################################################################
# * stream_compaction benchmark -------------------------------------------------------------------
ConfigureNVBench(
STREAM_COMPACTION_NVBENCH stream_compaction/distinct.cpp stream_compaction/unique.cpp
ConfigureBench(
STREAM_COMPACTION_BENCH stream_compaction/distinct.cpp stream_compaction/unique.cpp
)

# ##################################################################################################
129 changes: 49 additions & 80 deletions cpp/benchmarks/stream_compaction/distinct.cpp
@@ -1,98 +1,67 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if 0

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/rmm_pool_raii.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf/column/column_view.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/lists/list_view.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/types.hpp>

#include <nvbench/nvbench.cuh>
namespace {
static constexpr cudf::size_type num_struct_members = 8;
static constexpr cudf::size_type max_str_length = 32;

NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_ms, "cudf::timestamp_ms", "cudf::timestamp_ms");

template <typename Type>
void nvbench_distinct(nvbench::state& state, nvbench::type_list<Type>)
static auto create_random_structs_column(cudf::size_type n_rows)
{
cudf::rmm_pool_raii pool_raii;
data_profile table_profile;
table_profile.set_distribution_params(cudf::type_id::INT32, distribution_id::UNIFORM, 0, n_rows);
table_profile.set_distribution_params(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);

// The first two struct members are int32 and string.
// The first column is also used as keys in groupby.
// The subsequent struct members are int32 and string again.
auto table = create_random_table(
cycle_dtypes({cudf::type_id::INT32, cudf::type_id::STRING}, num_struct_members),
row_count{n_rows},
table_profile);
return cudf::make_structs_column(n_rows, table->release(), 0, {});
}

cudf::size_type const num_rows = state.get_int64("NumRows");
void BM_fn(benchmark::State& state)
{
auto const size{static_cast<cudf::size_type>(state.range(0))};

data_profile profile;
profile.set_null_frequency(0.01);
profile.set_cardinality(0);
profile.set_distribution_params<Type>(cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, 100);
constexpr cudf::size_type repeat_times = 4; // <25% unique rows
// constexpr cudf::size_type repeat_times = 2; // <50% unique rows
// constexpr int repeat_times = 1; // <100% unique rows

auto source_table =
create_random_table(cycle_dtypes({cudf::type_to_id<Type>()}, 1), row_count{num_rows}, profile);
auto input = create_random_structs_column(size / repeat_times);
auto const input0 = std::make_unique<cudf::column>(*input);

auto input_column = cudf::column_view(source_table->get_column(0));
auto input_table = cudf::table_view({input_column, input_column, input_column, input_column});
for (int i = 0; i < repeat_times - 1; ++i) {
input = cudf::concatenate(std::vector<cudf::column_view>{input0->view(), input->view()});
}

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::distinct(input_table, {0}, cudf::null_equality::EQUAL, stream_view);
});
for ([[maybe_unused]] auto _ : state) {
[[maybe_unused]] auto const timer = cuda_event_timer(state, true);
auto const result = cudf::distinct(cudf::table_view{{*input}}, std::vector<int>{0});
}
}

using data_type = nvbench::type_list<bool, int8_t, int32_t, int64_t, float, cudf::timestamp_ms>;
} // namespace

NVBENCH_BENCH_TYPES(nvbench_distinct, NVBENCH_TYPE_AXES(data_type))
.set_name("distinct")
.set_type_axes_names({"Type"})
.add_int64_axis("NumRows", {10'000, 100'000, 1'000'000, 10'000'000});
class Distinct : public cudf::benchmark {
};

template <typename Type>
void nvbench_distinct_list(nvbench::state& state, nvbench::type_list<Type>)
{
cudf::rmm_pool_raii pool_raii;
BENCHMARK_DEFINE_F(Distinct, BM)(::benchmark::State& state) { BM_fn(state); }

auto const size = state.get_int64("ColumnSize");
auto const dtype = cudf::type_to_id<Type>();
double const null_frequency = state.get_float64("null_frequency");

data_profile table_data_profile;
if (dtype == cudf::type_id::LIST) {
table_data_profile.set_distribution_params(dtype, distribution_id::UNIFORM, 0, 4);
table_data_profile.set_distribution_params(
cudf::type_id::INT32, distribution_id::UNIFORM, 0, 4);
table_data_profile.set_list_depth(1);
} else {
// We're comparing distinct() on a non-nested column to that on a list column with the same
// number of distinct rows. The max list size is 4 and the number of distinct values in the
// list's child is 5. So the number of distinct rows in the list = 1 + 5 + 5^2 + 5^3 + 5^4 = 781
// We want this column to also have 781 distinct values.
table_data_profile.set_distribution_params(dtype, distribution_id::UNIFORM, 0, 781);
}
table_data_profile.set_null_frequency(null_frequency);

auto const table = create_random_table(
{dtype}, table_size_bytes{static_cast<size_t>(size)}, table_data_profile, 0);

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result = cudf::detail::distinct(*table, {0}, cudf::null_equality::EQUAL, stream_view);
});
}
BENCHMARK_REGISTER_F(Distinct, BM)
->RangeMultiplier(8)
->Ranges({{1 << 10, 1 << 26}})
->UseManualTime()
->Unit(benchmark::kMillisecond);

NVBENCH_BENCH_TYPES(nvbench_distinct_list,
NVBENCH_TYPE_AXES(nvbench::type_list<int32_t, cudf::list_view>))
.set_name("distinct_list")
.set_type_axes_names({"Type"})
.add_float64_axis("null_frequency", {0.0, 0.1})
.add_int64_axis("ColumnSize", {100'000'000});
#endif
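The rewritten benchmark above controls the duplicate ratio by building a base column of `size / repeat_times` rows and concatenating it onto the input `repeat_times - 1` times, so roughly `1 / repeat_times` of the rows are unique. The same construction with `std::vector` standing in for cudf columns — the function name and the integer stand-in for `create_random_structs_column` are illustrative:

```cpp
#include <vector>

// Build an input of `size` rows of which roughly `size / repeat_times` are
// unique, mirroring the benchmark's cudf::concatenate loop.
std::vector<int> make_duplicated_input(int size, int repeat_times)
{
  int const base_size = size / repeat_times;
  std::vector<int> base(base_size);
  for (int i = 0; i < base_size; ++i) base[i] = i;  // stand-in for a random base column

  std::vector<int> input = base;
  for (int i = 0; i < repeat_times - 1; ++i) {
    // cudf::concatenate analogue: append another copy of the base.
    input.insert(input.end(), base.begin(), base.end());
  }
  return input;
}
```

Note the commented-out `repeat_times` alternatives in the diff exist precisely to sweep this ratio (25%, 50%, 100% unique rows) during performance tuning.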
118 changes: 52 additions & 66 deletions cpp/benchmarks/stream_compaction/unique.cpp
@@ -1,83 +1,69 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#if 1

#include <benchmarks/common/generate_input.hpp>
#include <benchmarks/fixture/rmm_pool_raii.hpp>
#include <benchmarks/fixture/benchmark_fixture.hpp>
#include <benchmarks/synchronization/synchronization.hpp>

#include <cudf/column/column_view.hpp>
#include <cudf/detail/stream_compaction.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/concatenate.hpp>
#include <cudf/sorting.hpp>
#include <cudf/stream_compaction.hpp>
#include <cudf/types.hpp>

#include <nvbench/nvbench.cuh>
static constexpr cudf::size_type num_struct_members = 8;
static constexpr cudf::size_type max_str_length = 32;

// necessary for custom enum types
// see: https://github.com/NVIDIA/nvbench/blob/main/examples/enums.cu
NVBENCH_DECLARE_ENUM_TYPE_STRINGS(
// Enum type:
cudf::duplicate_keep_option,
// Callable to generate input strings:
[](cudf::duplicate_keep_option option) {
switch (option) {
case cudf::duplicate_keep_option::KEEP_FIRST: return "KEEP_FIRST";
case cudf::duplicate_keep_option::KEEP_LAST: return "KEEP_LAST";
case cudf::duplicate_keep_option::KEEP_NONE: return "KEEP_NONE";
default: return "ERROR";
}
},
// Callable to generate descriptions:
[](auto) { return std::string{}; })
namespace {
static auto create_random_structs_column(cudf::size_type n_rows)
{
data_profile table_profile;
table_profile.set_distribution_params(cudf::type_id::INT32, distribution_id::UNIFORM, 0, n_rows);
table_profile.set_distribution_params(
cudf::type_id::STRING, distribution_id::NORMAL, 0, max_str_length);

NVBENCH_DECLARE_TYPE_STRINGS(cudf::timestamp_ms, "cudf::timestamp_ms", "cudf::timestamp_ms");
// The first two struct members are int32 and string.
// The first column is also used as keys in groupby.
// The subsequent struct members are int32 and string again.
auto table = create_random_table(
cycle_dtypes({cudf::type_id::INT32, cudf::type_id::STRING}, num_struct_members),
row_count{n_rows},
table_profile);
return cudf::make_structs_column(n_rows, table->release(), 0, {});
}

template <typename Type, cudf::duplicate_keep_option Keep>
void nvbench_unique(nvbench::state& state, nvbench::type_list<Type, nvbench::enum_type<Keep>>)
void BM_fn(benchmark::State& state)
{
if constexpr (not std::is_same_v<Type, int32_t> and
Keep != cudf::duplicate_keep_option::KEEP_FIRST) {
state.skip("Skip unwanted benchmarks.");
}
auto const size{static_cast<cudf::size_type>(state.range(0))};

cudf::rmm_pool_raii pool_raii;
constexpr cudf::size_type repeat_times = 4; // <25% unique rows
// constexpr cudf::size_type repeat_times = 2; // <50% unique rows
// constexpr int repeat_times = 1; // <100% unique rows

cudf::size_type const num_rows = state.get_int64("NumRows");
auto input = create_random_structs_column(size / repeat_times);
auto const input0 = std::make_unique<cudf::column>(*input);

data_profile profile;
profile.set_null_frequency(0.01);
profile.set_cardinality(0);
profile.set_distribution_params<Type>(cudf::type_to_id<Type>(), distribution_id::UNIFORM, 0, 100);
for (int i = 0; i < repeat_times - 1; ++i) {
input = cudf::concatenate(std::vector<cudf::column_view>{input0->view(), input->view()});
}

auto source_table =
create_random_table(cycle_dtypes({cudf::type_to_id<Type>()}, 1), row_count{num_rows}, profile);
for ([[maybe_unused]] auto _ : state) {
[[maybe_unused]] auto const timer = cuda_event_timer(state, true);
auto const sorted_input = cudf::sort(cudf::table_view{{*input}});
auto const result = cudf::unique(
sorted_input->view(), std::vector<int>{0}, cudf::duplicate_keep_option::KEEP_FIRST);
}
}
} // namespace

auto input_column = cudf::column_view(source_table->get_column(0));
auto input_table = cudf::table_view({input_column, input_column, input_column, input_column});
class Unique : public cudf::benchmark {
};

state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
rmm::cuda_stream_view stream_view{launch.get_stream()};
auto result =
cudf::detail::unique(input_table, {0}, Keep, cudf::null_equality::EQUAL, stream_view);
});
}
BENCHMARK_DEFINE_F(Unique, BM)(::benchmark::State& state) { BM_fn(state); }

using data_type = nvbench::type_list<bool, int8_t, int32_t, int64_t, float, cudf::timestamp_ms>;
using keep_option = nvbench::enum_type_list<cudf::duplicate_keep_option::KEEP_FIRST,
cudf::duplicate_keep_option::KEEP_LAST,
cudf::duplicate_keep_option::KEEP_NONE>;
BENCHMARK_REGISTER_F(Unique, BM)
->RangeMultiplier(8)
->Ranges({{1 << 10, 1 << 26}})
->UseManualTime()
->Unit(benchmark::kMillisecond);

NVBENCH_BENCH_TYPES(nvbench_unique, NVBENCH_TYPE_AXES(data_type, keep_option))
.set_name("unique")
.set_type_axes_names({"Type", "KeepOption"})
.add_int64_axis("NumRows", {10'000, 100'000, 1'000'000, 10'000'000});
#endif
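The `unique` benchmark sorts before deduplicating because `cudf::unique`, like `std::unique`, drops only *consecutive* duplicate rows, whereas `cudf::distinct` deduplicates globally via hashing. A host-side contrast of the two semantics (function names are illustrative):

```cpp
#include <algorithm>
#include <vector>

// cudf::unique analogue: remove only adjacent duplicates.
std::vector<int> dedup_consecutive(std::vector<int> v)
{
  v.erase(std::unique(v.begin(), v.end()), v.end());
  return v;
}

// cudf::distinct-like result via sort + unique: sorting makes equal rows
// adjacent, so consecutive dedup becomes global dedup (at the cost of order).
std::vector<int> dedup_global(std::vector<int> v)
{
  std::sort(v.begin(), v.end());
  return dedup_consecutive(std::move(v));
}
```

This is why the benchmark times `cudf::sort` together with `cudf::unique`: that pair is the fair baseline to compare against a single hash-based `cudf::distinct` call.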
44 changes: 43 additions & 1 deletion cpp/src/stream_compaction/distinct.cu
@@ -21,6 +21,7 @@
#include <cudf/column/column_factories.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/copy.hpp>
#include <cudf/detail/gather.cuh>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/nvtx/ranges.hpp>
@@ -80,21 +81,62 @@ std::unique_ptr<table> distinct(table_view const& input,
// insert distinct indices into the map.
key_map.insert(iter, iter + num_rows, hash_key, key_equal, stream.value());

#if 0
auto const output_size{key_map.get_size()};
auto distinct_indices = cudf::make_numeric_column(
data_type{type_id::INT32}, output_size, mask_state::UNALLOCATED, stream, mr);
// write distinct indices to a numeric column
key_map.retrieve_all(distinct_indices->mutable_view().begin<cudf::size_type>(),
thrust::make_discard_iterator(),
stream.value());

// run gather operation to establish new order
return detail::gather(input,
distinct_indices->view(),
out_of_bounds_policy::DONT_CHECK,
detail::negative_index_policy::NOT_ALLOWED,
stream,
mr);
#else

auto const key_size{keys_view.num_rows()};
auto distinct_indices = rmm::device_uvector<size_type>(key_size, stream);

// Fill with `key_size` (an out-of-range sentinel) for keep_first.
// A keep_last variant would fill with `INT_MIN` instead.
thrust::uninitialized_fill(rmm::exec_policy(stream),
distinct_indices.begin(),
distinct_indices.begin() + key_size,
key_size);

auto const d_map = key_map.get_device_view();
thrust::for_each(
rmm::exec_policy(stream),
thrust::make_counting_iterator<size_type>(0),
thrust::make_counting_iterator<size_type>(key_size),
[distinct_indices = distinct_indices.begin(), d_map, hash_key, key_equal] __device__(auto key) {
      // The find() call should always succeed, because all keys have been inserted.
      // Here `idx` is the index of the unique element that was inserted into the map.
      // As such, calling `find` on any of the duplicate keys returns the same `idx` value.
auto const idx =
d_map.find(key, hash_key, key_equal)->second.load(cuda::std::memory_order_relaxed);

// Store the minimum index of all keys that are equal.
atomicMin(&distinct_indices[idx], key);
});

// Filter out the invalid indices, which are indices of the duplicate keys
// (except the first one that has valid index being written in the previous step).
auto gather_map = rmm::device_uvector<size_type>(key_size, stream);
auto const gather_map_end =
thrust::copy_if(rmm::exec_policy(stream),
distinct_indices.begin(),
distinct_indices.end(),
gather_map.begin(),
[key_size] __device__(auto const idx) { return idx != key_size; });

return cudf::detail::gather(
input, gather_map.begin(), gather_map_end, out_of_bounds_policy::DONT_CHECK, stream, mr);
#endif
}

} // namespace detail
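The new `#else` branch above implements KEEP_FIRST in three steps: fill an index array with `key_size` as an out-of-range sentinel, atomically take the minimum row index per hash-map slot, then `copy_if` away the slots still holding the sentinel to form the gather map. The same flow on the host, with `std::unordered_map` standing in for the cuco hash map and `std::min` for `atomicMin` — the function name is illustrative:

```cpp
#include <algorithm>
#include <iterator>
#include <string>
#include <unordered_map>
#include <vector>

std::vector<int> keep_first_gather_map(std::vector<std::string> const& keys)
{
  int const key_size = static_cast<int>(keys.size());

  // Insert pass: assign each unique key a slot index, as key_map.insert would.
  std::unordered_map<std::string, int> key_map;
  for (int i = 0; i < key_size; ++i) {
    key_map.try_emplace(keys[i], static_cast<int>(key_map.size()));
  }

  // Sentinel fill (thrust::uninitialized_fill analogue): `key_size` is out of range.
  std::vector<int> distinct_indices(key_size, key_size);

  // for_each analogue: every row finds its slot and takes the minimum row index.
  for (int row = 0; row < key_size; ++row) {
    int const idx = key_map.at(keys[row]);                        // d_map.find(...)
    distinct_indices[idx] = std::min(distinct_indices[idx], row); // atomicMin analogue
  }

  // copy_if analogue: drop slots that never received a valid index.
  std::vector<int> gather_map;
  std::copy_if(distinct_indices.begin(), distinct_indices.end(),
               std::back_inserter(gather_map),
               [key_size](int i) { return i != key_size; });
  return gather_map;
}
```

The sentinel trick lets the same pass serve as both "record the first index" (via min) and "mark slot as used" — untouched slots survive only for keys beyond the number of unique rows and are filtered out before the gather.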