Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement mixed equality/conditional joins #9917

Merged
Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
c2ba04d
Add APIs for mixed equality/inequality joins.
vyasr Nov 23, 2021
c1eec27
Add new implementations for mixed joins.
vyasr Nov 24, 2021
bcc2c7b
Move mixed joins to separate file to avoid introducing hash join rela…
vyasr Nov 24, 2021
06c26ea
Start implementing map logic in new join, add notes.
vyasr Nov 25, 2021
061c74c
Create new kernels for mixed joins.
vyasr Nov 25, 2021
db11c2b
Pass hash table device view to kernel.
vyasr Nov 29, 2021
342a8d1
Revert change to build_join_hash_table allowing empty tables and filt…
vyasr Nov 29, 2021
1fdce0f
Version of mixed kernel with correct logic, but currently requires to…
vyasr Dec 2, 2021
9b173b0
Fix some bugs and reduce shared memory usage to the point of compilat…
vyasr Dec 7, 2021
a77fbaf
Create new array to hold per-row matches and define new comparator fo…
vyasr Dec 8, 2021
13c30d2
Compiling version of new code that computes per-row matches.
vyasr Dec 9, 2021
9c3c3a7
Change size APIs to return per-row counts and verify results.
vyasr Dec 9, 2021
ae446db
Change mixed join APIs to accept the offsets.
vyasr Dec 9, 2021
df173af
First attempt at using offsets column in join.
vyasr Dec 9, 2021
5b8bf2d
Fix seg fault, now just a buggy impl.
vyasr Dec 9, 2021
636a604
Fix bugs, implementation now works as expected for simple test.
vyasr Dec 11, 2021
cb14a53
Fix a bug and add one more extensive test.
vyasr Dec 13, 2021
890fd6f
Start addressing TODOs.
vyasr Dec 13, 2021
e693960
Address more TODOs.
vyasr Dec 14, 2021
6b39bdd
Start setting up scaffolding for more systematic testing.
vyasr Dec 14, 2021
444a140
Change APIs to accept separate table views for the hash join.
vyasr Dec 14, 2021
5515daa
Add compare_nulls parameter and various additional internal error che…
vyasr Dec 14, 2021
40ea881
Update tests to only use the subset of necessary columns for the cond…
vyasr Dec 14, 2021
9e54706
Move equality tables before conditional tables.
vyasr Dec 14, 2021
2b2c6cc
Plumb compare_nulls all the way through.
vyasr Dec 14, 2021
273be21
Start defining helper functions for generalized mixed join testing.
vyasr Dec 14, 2021
c34de6a
Move mixed join tests to separate TU.
vyasr Dec 14, 2021
473efdb
Define standard testing function for mixed inner joins.
vyasr Dec 15, 2021
bd87a3d
Update to use new cuco API with 4 iterators instead of two.
vyasr Dec 15, 2021
c5f8f9a
Add proper support for non-inner joins.
vyasr Dec 15, 2021
e11e84e
Add left join APIs and tests.
vyasr Dec 15, 2021
07b48c1
Add full join APIs and tests.
vyasr Dec 15, 2021
c74af29
Add tests of and fix bug with asymmetric joins.
vyasr Dec 15, 2021
8a4df8e
Clean up some TODOs.
vyasr Dec 15, 2021
f9739fc
Add tests for and fix null support.
vyasr Dec 15, 2021
80a268d
Rewrite docstrings.
vyasr Dec 16, 2021
b240cc5
Change size APIs to return columns instead of device_uvectors.
vyasr Dec 16, 2021
2bb25a3
Change join APIs to use column_views instead of device_spans.
vyasr Dec 16, 2021
5f49458
Merge remote-tracking branch 'origin/branch-22.02' into feature/mixed…
vyasr Dec 16, 2021
d724647
Update cuco.
vyasr Dec 16, 2021
ec5d601
Fix unnecessary extra line.
vyasr Dec 16, 2021
15a6d52
Apply suggestions from code review
vyasr Dec 16, 2021
1dc63d8
Fix swapping logic in operator in conjunction with proper handling of…
vyasr Dec 18, 2021
78dd48d
Simplify operator logic and add comments.
vyasr Dec 18, 2021
9041b99
Merge remote-tracking branch 'origin/branch-22.02' into feature/mixed…
vyasr Dec 21, 2021
5726c5e
Fix style.
vyasr Dec 21, 2021
ecb77ca
Remove addressed TODOs.
vyasr Dec 22, 2021
a7f9d15
Merge remote-tracking branch 'origin/branch-22.02' into feature/mixed…
vyasr Jan 6, 2022
b7accc3
Address simple PR comments.
vyasr Jan 6, 2022
0daaacd
Switch raw pointers to device spans for matches_per_row.
vyasr Jan 6, 2022
0915bff
Remove unsupported full join size calculation.
vyasr Jan 6, 2022
d7ab5a7
Merge remote-tracking branch 'origin/branch-22.02' into feature/mixed…
vyasr Jan 11, 2022
0442cd3
Construct a row equality comparator on the host and pass it to the ke…
vyasr Jan 11, 2022
ad41f9d
Some minor cleanup.
vyasr Jan 11, 2022
4d4f4e6
Change the offsets to an rmm::device_uvector<size_type> instead of a …
vyasr Jan 11, 2022
c0e2297
Fix APIs to accept const spans instead of non-const ones.
vyasr Jan 11, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,7 @@ add_library(
src/jit/parser.cpp
src/jit/type.cpp
src/join/conditional_join.cu
src/join/mixed_join.cu
src/join/cross_join.cu
src/join/hash_join.cu
src/join/join.cu
Expand Down
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_cucollections.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ function(find_and_configure_cucollections)
cuco 0.0
GLOBAL_TARGETS cuco::cuco
CPM_ARGS GITHUB_REPOSITORY NVIDIA/cuCollections
GIT_TAG 6433e8ad7571f14cc5384051b049029c60dd1ce0
GIT_TAG 193de1aa74f5721717f991ca757dc610c852bb17
OPTIONS "BUILD_TESTS OFF" "BUILD_BENCHMARKS OFF" "BUILD_EXAMPLES OFF"
)

Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/ast/detail/expression_evaluator.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ struct expression_evaluator {
template <typename ResultSubclass, typename T, bool result_has_nulls>
CUDF_DFI void evaluate(expression_result<ResultSubclass, T, result_has_nulls>& output_object,
cudf::size_type const row_index,
IntermediateDataType<has_nulls>* thread_intermediate_storage)
IntermediateDataType<has_nulls>* thread_intermediate_storage) const
{
evaluate(output_object, row_index, row_index, row_index, thread_intermediate_storage);
}
Expand All @@ -455,7 +455,7 @@ struct expression_evaluator {
cudf::size_type const left_row_index,
cudf::size_type const right_row_index,
cudf::size_type const output_row_index,
IntermediateDataType<has_nulls>* thread_intermediate_storage)
IntermediateDataType<has_nulls>* thread_intermediate_storage) const
{
cudf::size_type operator_source_index{0};
for (cudf::size_type operator_index = 0; operator_index < plan.operators.size();
Expand Down
307 changes: 306 additions & 1 deletion cpp/include/cudf/join.hpp

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions cpp/include/cudf/table/row_operators.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,11 @@ template <template <typename> class hash_function, typename Nullate>
class row_hasher {
public:
row_hasher() = delete;
row_hasher(Nullate has_nulls, table_device_view t) : _table{t}, _has_nulls{has_nulls} {}
row_hasher(Nullate has_nulls, table_device_view t, uint32_t seed)
CUDA_HOST_DEVICE_CALLABLE row_hasher(Nullate has_nulls, table_device_view t)
: _table{t}, _has_nulls{has_nulls}
{
}
CUDA_HOST_DEVICE_CALLABLE row_hasher(Nullate has_nulls, table_device_view t, uint32_t seed)
: _table{t}, _seed(seed), _has_nulls{has_nulls}
{
}
Expand Down
21 changes: 14 additions & 7 deletions cpp/src/join/conditional_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,11 @@ conditional_join(table_view const& left,
// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d config(swap_tables ? right_num_rows : left_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;
join_kind kernel_join_type = join_type == join_kind::FULL_JOIN ? join_kind::LEFT_JOIN : join_type;
join_kind const kernel_join_type =
join_type == join_kind::FULL_JOIN ? join_kind::LEFT_JOIN : join_type;

// If the join size was not provided as an input, compute it here.
std::size_t join_size;
Expand Down Expand Up @@ -197,6 +199,13 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
// Until we add logic to handle the number of non-matches in the right table,
// full joins are not supported in this function. Note that this does not
// prevent actually performing full joins since we do that by calculating the
// left join and then concatenating the complementary right indices.
CUDF_EXPECTS(join_type != join_kind::FULL_JOIN,
"Size estimation is not available for full joins.");

// We can immediately filter out cases where one table is empty. In
// some cases, we return all the rows of the other table with a corresponding
// null index for the empty table; in others, we return an empty output.
Expand Down Expand Up @@ -232,8 +241,7 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// If none of the input columns actually contain nulls, we can still use the
// non-nullable version of the expression evaluation code path for
// performance, so we capture that information as well.
auto const nullable = cudf::nullable(left) || cudf::nullable(right);
auto const has_nulls = nullable && (cudf::has_nulls(left) || cudf::has_nulls(right));
auto const has_nulls = binary_predicate.may_evaluate_null(left, right, stream);

auto const parser =
ast::detail::expression_parser{binary_predicate, left, right, has_nulls, stream, mr};
Expand All @@ -246,11 +254,10 @@ std::size_t compute_conditional_join_output_size(table_view const& left,
// For inner joins we support optimizing the join by launching one thread for
// whichever table is larger rather than always using the left table.
auto swap_tables = (join_type == join_kind::INNER_JOIN) && (right_num_rows > left_num_rows);
detail::grid_1d config(swap_tables ? right_num_rows : left_num_rows, DEFAULT_JOIN_BLOCK_SIZE);
detail::grid_1d const config(swap_tables ? right_num_rows : left_num_rows,
DEFAULT_JOIN_BLOCK_SIZE);
auto const shmem_size_per_block = parser.shmem_per_thread * config.num_threads_per_block;

assert(join_type != join_kind::FULL_JOIN);

// Allocate storage for the counter used to get the size of the join output
rmm::device_scalar<std::size_t> size(0, stream, mr);
CHECK_CUDA(stream.value());
Expand Down
59 changes: 0 additions & 59 deletions cpp/src/join/hash_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

#include <cudf/copying.hpp>
#include <cudf/detail/concatenate.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/structs/utilities.hpp>

#include <rmm/cuda_stream_view.hpp>
Expand All @@ -34,26 +33,6 @@
namespace cudf {
namespace detail {

namespace {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

} // anonymous namespace

std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table(
table_view const& probe, table_view const& build)
{
Expand All @@ -62,44 +41,6 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table
return std::make_pair(std::move(empty_probe), std::move(empty_build));
}

/**
* @brief Builds the hash table based on the given `build_table`.
*
* @param build Table of columns used to build join hash.
* @param hash_table Build hash table.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
void build_join_hash_table(cudf::table_view const& build,
multimap_type& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
{
auto build_table_ptr = cudf::table_device_view::create(build, stream);

CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table_ptr->num_rows(), "Build side table has no rows");

row_hash hash_build{nullate::YES{}, *build_table_ptr};
auto const empty_key_sentinel = hash_table.get_empty_key_sentinel();
make_pair_function pair_func{hash_build, empty_key_sentinel};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func);

size_type const build_table_num_rows{build_table_ptr->num_rows()};
if ((compare_nulls == null_equality::EQUAL) or (not nullable(build))) {
hash_table.insert(iter, iter + build_table_num_rows, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first;
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value());
}
}

/**
* @brief Probes the `hash_table` built from `build_table` for tuples in `probe_table`,
* and returns the output indices of `build_table` and `probe_table` as a combined table.
Expand Down
58 changes: 57 additions & 1 deletion cpp/src/join/hash_join.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <cudf/detail/concatenate.cuh>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/join.hpp>
Expand All @@ -39,6 +40,22 @@
namespace cudf {
namespace detail {

/**
* @brief Device functor to determine if a row is valid.
*/
class row_is_valid {
public:
row_is_valid(bitmask_type const* row_bitmask) : _row_bitmask{row_bitmask} {}

__device__ __inline__ bool operator()(const size_type& i) const noexcept
{
return cudf::bit_is_set(_row_bitmask, i);
}

private:
bitmask_type const* _row_bitmask;
};

/**
* @brief Remaps a hash value to a new value if it is equal to the specified sentinel value.
*
Expand All @@ -57,7 +74,8 @@ constexpr auto remap_sentinel_hash(H hash, S sentinel)
*/
class make_pair_function {
public:
make_pair_function(row_hash const& hash, hash_value_type const empty_key_sentinel)
CUDA_HOST_DEVICE_CALLABLE make_pair_function(row_hash const& hash,
hash_value_type const empty_key_sentinel)
: _hash{hash}, _empty_key_sentinel{empty_key_sentinel}
{
}
Expand Down Expand Up @@ -141,6 +159,44 @@ std::pair<std::unique_ptr<table>, std::unique_ptr<table>> get_empty_joined_table
std::unique_ptr<cudf::table> combine_table_pair(std::unique_ptr<cudf::table>&& left,
std::unique_ptr<cudf::table>&& right);

/**
* @brief Builds the hash table based on the given `build_table`.
*
vyasr marked this conversation as resolved.
Show resolved Hide resolved
* @param build Table of columns used to build join hash.
* @param hash_table Build hash table.
* @param compare_nulls Controls whether null join-key values should match or not.
* @param stream CUDA stream used for device memory operations and kernel launches.
*
*/
template <typename MultimapType>
void build_join_hash_table(cudf::table_view const& build,
MultimapType& hash_table,
null_equality compare_nulls,
rmm::cuda_stream_view stream)
{
auto build_table_ptr = cudf::table_device_view::create(build, stream);

CUDF_EXPECTS(0 != build_table_ptr->num_columns(), "Selected build dataset is empty");
CUDF_EXPECTS(0 != build_table_ptr->num_rows(), "Build side table has no rows");

row_hash hash_build{nullate::YES{}, *build_table_ptr};
auto const empty_key_sentinel = hash_table.get_empty_key_sentinel();
make_pair_function pair_func{hash_build, empty_key_sentinel};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func);

size_type const build_table_num_rows{build_table_ptr->num_rows()};
if ((compare_nulls == null_equality::EQUAL) or (not nullable(build))) {
hash_table.insert(iter, iter + build_table_num_rows, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const row_bitmask = cudf::detail::bitmask_and(build, stream).first;
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(iter, iter + build_table_num_rows, stencil, pred, stream.value());
}
}
} // namespace detail

struct hash_join::hash_join_impl {
Expand Down
11 changes: 11 additions & 0 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@ namespace detail {

/**
* @brief Device functor to determine if two pairs are identical.
*
* This equality comparator is designed for use with cuco::static_multimap's
* pair* APIs, which will compare equality based on comparing (key, value)
* pairs. In the context of joins, these pairs are of the form
* (row_hash, row_id). A hash probe hit indicates that hash of a probe row's hash is
* equal to the hash of the hash of some row in the multimap, at which point we need an
* equality comparator that will check whether the contents of the rows are
* identical. This comparator does so by verifying key equality (i.e. that
* probe_row_hash == build_row_hash) and then using a row_equality_comparator
* to compare the contents of the row indices that are stored as the payload in
* the hash map.
*/
class pair_equality {
public:
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/join/join_common_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,15 @@ using multimap_type =
hash_table_allocator_type,
cuco::double_hashing<DEFAULT_JOIN_CG_SIZE, hash_type, hash_type>>;

// Multimap type used for mixed joins. TODO: This is a temporary alias used
// until the mixed joins are converted to using CGs properly. Right now it's
// using a cooperative group of size 1.
using mixed_multimap_type = cuco::static_multimap<hash_value_type,
size_type,
cuda::thread_scope_device,
default_allocator<char>,
vyasr marked this conversation as resolved.
Show resolved Hide resolved
cuco::double_hashing<1, hash_type, hash_type>>;

using row_hash = cudf::row_hasher<default_hash, cudf::nullate::YES>;

using row_equality = cudf::row_equality_comparator<cudf::nullate::YES>;
Expand Down
Loading