From bbf7ad4a6ce26d48bf0510e6231cf8410244ac8e Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 11:19:48 -0400 Subject: [PATCH 01/17] Add hash_join::inner_join_size API --- cpp/src/join/hash_join.cuh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index e6df2b58b15..00ca7072784 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -253,6 +253,10 @@ struct hash_join::hash_join_impl { rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; + std::size_t inner_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + private: template std::pair>, From b3d2781153db273e3f59382bfc68edcf65f7cef3 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 11:35:51 -0400 Subject: [PATCH 02/17] Add left_join_size & full_join_size APIs in hash_join class --- cpp/src/join/hash_join.cuh | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index 00ca7072784..f639747f192 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -257,6 +257,14 @@ struct hash_join::hash_join_impl { null_equality compare_nulls = null_equality::EQUAL, rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + std::size_t left_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + + std::size_t full_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + private: template std::pair>, From e08be996b41d2e2c64173a5d28587de59e52e591 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 12:12:41 -0400 Subject: [PATCH 03/17] Add detail::compute_join_output_size function --- cpp/src/join/hash_join.cuh | 81 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index f639747f192..fb3d25590eb 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -38,6 +38,87 @@ namespace cudf { namespace detail { +/** + * @brief Calculates the exact size of the join output produced when + * joining two tables together. + * + * @throw cudf::logic_error if JoinKind is not INNER_JOIN or LEFT_JOIN + * @throw cudf::logic_error if the exact size overflows cudf::size_type + * + * @tparam JoinKind The type of join to be performed + * @tparam multimap_type The type of the hash table + * + * @param build_table The right hand table + * @param probe_table The left hand table + * @param hash_table A hash table built on the build table that maps the index + * of every row to the hash value of that row. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return The exact size of the output of the join operation + */ +template +std::size_t compute_join_output_size(table_device_view build_table, + table_device_view probe_table, + multimap_type const& hash_table, + null_equality compare_nulls, + rmm::cuda_stream_view stream) +{ + const size_type build_table_num_rows{build_table.num_rows()}; + const size_type probe_table_num_rows{probe_table.num_rows()}; + + // If the build table is empty, we know exactly how large the output + // will be for the different types of joins and can return immediately + if (0 == build_table_num_rows) { + switch (JoinKind) { + // Inner join with an empty table will have no output + case join_kind::INNER_JOIN: return 0; + + // Left join with an empty table will have an output of NULL rows + // equal to the number of rows in the probe table + case join_kind::LEFT_JOIN: return probe_table_num_rows; + + default: CUDF_FAIL("Unsupported join type"); + } + } + + // Allocate storage for the counter used to get the size of the join output + std::size_t h_size{0}; + rmm::device_scalar d_size(0, stream); + + CHECK_CUDA(stream.value()); + + constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; + int numBlocks{-1}; + + CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( + &numBlocks, compute_join_output_size, block_size, 0)); + + int dev_id{-1}; + CUDA_TRY(cudaGetDevice(&dev_id)); + + int num_sms{-1}; + CUDA_TRY(cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, dev_id)); + + row_hash hash_probe{probe_table}; + row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; + // Probe the hash table without actually building the output to simply + // find what the size of the output will be. + compute_join_output_size + <<>>(hash_table, + build_table, + probe_table, + hash_probe, + equality, + probe_table_num_rows, + d_size.data()); + + CHECK_CUDA(stream.value()); + h_size = d_size.value(stream); + + return h_size; +} + /** * @brief Gives an estimate of the size of the join output produced when * joining two tables together. From 5213eae2aebbac9e93eba07c1c05a9e406030872 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 12:39:34 -0400 Subject: [PATCH 04/17] Implement inner/left/full_join_size functions for hash_join class --- cpp/src/join/hash_join.cu | 42 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 2624ea68629..9fa62903abe 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -352,6 +352,48 @@ hash_join::hash_join_impl::full_join(cudf::table_view const &probe, return compute_hash_join(probe, compare_nulls, stream, mr); } +std::size_t hash_join::hash_join_impl::inner_join_size(cudf::table_view const &probe, + null_equality compare_nulls, + rmm::cuda_stream_view stream) const +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); + + auto build_table = cudf::table_device_view::create(_build, stream); + auto probe_table = cudf::table_device_view::create(probe, stream); + + return cudf::detail::compute_join_output_size( + *build_table, *probe_table, *_hash_table, compare_nulls, stream); +} + +std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const &probe, + null_equality compare_nulls, + rmm::cuda_stream_view stream) const +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); + + auto build_table = cudf::table_device_view::create(_build, stream); + auto probe_table = cudf::table_device_view::create(probe, stream); + + return cudf::detail::compute_join_output_size( + *build_table, *probe_table, *_hash_table, compare_nulls, stream); +} + +std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const &probe, + null_equality compare_nulls, + rmm::cuda_stream_view stream) const +{ + CUDF_FUNC_RANGE(); + CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); + + auto build_table = cudf::table_device_view::create(_build, stream); + auto probe_table = cudf::table_device_view::create(probe, stream); + + return cudf::detail::compute_join_output_size( + *build_table, *probe_table, *_hash_table, compare_nulls, stream); +} + template std::pair>, std::unique_ptr>> From 59c55c8d52f4c35a3283f2625476cb224449396a Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 12:48:57 -0400 Subject: [PATCH 05/17] Add default parameters to the existing join APIs --- cpp/src/join/hash_join.cuh | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index fb3d25590eb..6a491643371 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -316,23 +316,23 @@ struct hash_join::hash_join_impl { std::pair>, std::unique_ptr>> inner_join(cudf::table_view const& probe, - null_equality compare_nulls, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) const; + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; std::pair>, std::unique_ptr>> left_join(cudf::table_view const& probe, - null_equality compare_nulls, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) const; + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; std::pair>, std::unique_ptr>> full_join(cudf::table_view const& probe, - null_equality compare_nulls, - rmm::cuda_stream_view stream, - rmm::mr::device_memory_resource* mr) const; + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; std::size_t inner_join_size(cudf::table_view const& probe, null_equality compare_nulls = null_equality::EQUAL, From 1a41ea699c5f565d76265f60b96365102ea8418f Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 13:53:21 -0400 Subject: [PATCH 06/17] Add optional output_size argument to external join APIs --- cpp/include/cudf/join.hpp | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 428a4195bf8..984f5b7f06e 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -526,9 +526,10 @@ class hash_join { * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. + * @param output_size Optional value which allows users to specify the exact output size. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device * memory. - * @param stream CUDA stream used for device memory operations and kernel launches * * @return A pair of columns [`left_indices`, `right_indices`] that can be used to construct * the result of performing an inner join between two tables with `build` and `probe` @@ -537,9 +538,10 @@ class hash_join { std::pair>, std::unique_ptr>> inner_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls = null_equality::EQUAL, + std::optional output_size = {}, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; /** * Returns the row indices that can be used to construct the result of performing @@ -547,9 +549,10 @@ class hash_join { * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. + * @param output_size Optional value which allows users to specify the exact output size. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device * memory. - * @param stream CUDA stream used for device memory operations and kernel launches * * @return A pair of columns [`left_indices`, `right_indices`] that can be used to construct * the result of performing a left join between two tables with `build` and `probe` @@ -558,9 +561,10 @@ class hash_join { std::pair>, std::unique_ptr>> left_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls = null_equality::EQUAL, + std::optional output_size = {}, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; /** * Returns the row indices that can be used to construct the result of performing @@ -568,9 +572,10 @@ class hash_join { * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. + * @param output_size Optional value which allows users to specify the exact output size. + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate the returned table and columns' device * memory. - * @param stream CUDA stream used for device memory operations and kernel launches * * @return A pair of columns [`left_indices`, `right_indices`] that can be used to construct * the result of performing a full join between two tables with `build` and `probe` @@ -579,9 +584,10 @@ class hash_join { std::pair>, std::unique_ptr>> full_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls = null_equality::EQUAL, + std::optional output_size = {}, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; private: struct hash_join_impl; From e0df4f6390feb8925ec2e4c9fd5d008e78e8e586 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 14:20:40 -0400 Subject: [PATCH 07/17] Add *_join_size APIs in the join header file + doc updates --- cpp/include/cudf/join.hpp | 52 ++++++++++++++++++++++++++++++++++++--- cpp/src/join/join.cu | 31 ++++++++++++++++++++--- 2 files changed, 77 insertions(+), 6 deletions(-) diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 984f5b7f06e..30b5e9c9a89 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -522,7 +522,8 @@ class hash_join { /** * Returns the row indices that can be used to construct the result of performing - * an inner join between two tables. @see cudf::inner_join(). + * an inner join between two tables. @see cudf::inner_join(). Behavior is undefined if the + * provided `output_size` is smaller than the actual output size. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. @@ -545,7 +546,8 @@ class hash_join { /** * Returns the row indices that can be used to construct the result of performing - * a left join between two tables. @see cudf::left_join(). + * a left join between two tables. @see cudf::left_join(). Behavior is undefined if the + * provided `output_size` is smaller than the actual output size. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. @@ -568,7 +570,8 @@ class hash_join { /** * Returns the row indices that can be used to construct the result of performing - * a full join between two tables. @see cudf::full_join(). + * a full join between two tables. @see cudf::full_join(). Behavior is undefined if the + * provided `output_size` is smaller than the actual output size. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. @@ -589,6 +592,49 @@ class hash_join { rmm::cuda_stream_view stream = rmm::cuda_stream_default, rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + /** + * Returns the exact number of output when performing an inner join with the specified probe + * table. + * + * @param probe The probe table, from which the tuples are probed. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return The exact number of output when performing an inner join between two tables with + * `build` and `probe` as the the join keys . + */ + std::size_t inner_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + + /** + * Returns the exact number of output when performing a left join with the specified probe table. + * + * @param probe The probe table, from which the tuples are probed. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return The exact number of output when performing a left join between two tables with `build` + * and `probe` as the the join keys . + */ + std::size_t left_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + + /** + * Returns the exact number of output when performing a full join with the specified probe table. + * + * @param probe The probe table, from which the tuples are probed. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches + * + * @return The exact number of output when performing a full join between two tables with `build` + * and `probe` as the the join keys . + */ + std::size_t full_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + private: struct hash_join_impl; const std::unique_ptr impl; diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index f2e4bab02c6..93df523b4be 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -234,30 +234,55 @@ std::pair>, std::unique_ptr>> hash_join::inner_join(cudf::table_view const& probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const { - return impl->inner_join(probe, compare_nulls, stream, mr); + return impl->inner_join(probe, compare_nulls, output_size, stream, mr); } std::pair>, std::unique_ptr>> hash_join::left_join(cudf::table_view const& probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const { - return impl->left_join(probe, compare_nulls, stream, mr); + return impl->left_join(probe, compare_nulls, output_size, stream, mr); } std::pair>, std::unique_ptr>> hash_join::full_join(cudf::table_view const& probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const { - return impl->full_join(probe, compare_nulls, stream, mr); + return impl->full_join(probe, compare_nulls, output_size, stream, mr); +} + +std::size_t hash_join::inner_join_size( + cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const +{ + return impl->inner_join_size(probe, compare_nulls, stream); +} + +std::size_t hash_join::left_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const +{ + return impl->left_join_size(probe, compare_nulls, stream); +} + +std::size_t hash_join::full_join_size(cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default) const +{ + return impl->full_join_size(probe, compare_nulls, stream); } // external APIs From 27be6d619047af7c2dcf5c40ab3b71659f6e1755 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 15:33:00 -0400 Subject: [PATCH 08/17] Updates: - Add optional output_size to join APIs and refactor related functions - Remove default arguments for the hash_join_impl APIs - Update docs --- cpp/include/cudf/join.hpp | 1 + cpp/src/join/hash_join.cu | 101 +++++++++++++++++++------------------ cpp/src/join/hash_join.cuh | 39 ++++++++------ cpp/src/join/join.cu | 29 ++++++----- 4 files changed, 92 insertions(+), 78 deletions(-) diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 30b5e9c9a89..94ab0ed239b 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -22,6 +22,7 @@ #include #include +#include #include namespace cudf { diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 9fa62903abe..86204b82c17 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -84,6 +84,7 @@ struct valid_range { * @param left_table_row_count Number of rows of left table * @param right_table_row_count Number of rows of right table * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned vectors. * * @return Pair of vectors containing the left join indices complement */ @@ -208,6 +209,7 @@ std::unique_ptr> build_join_ /** * @brief Probes the `hash_table` built from `build_table` for tuples in `probe_table`, * and returns the output indices of `build_table` and `probe_table` as a combined table. + * Behavior is undefined if the provided `output_size` is smaller than the actual output size. * * @tparam JoinKind The type of join to be performed. * @@ -215,7 +217,9 @@ std::unique_ptr> build_join_ * @param probe_table Table of probe side columns to join. * @param hash_table Hash table built from `build_table`. * @param compare_nulls Controls whether null join-key values should match or not. + * @param output_size Optional value which allows users to specify the exact output size. * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the returned vectors. * * @return Join output indices vector pair. */ @@ -226,60 +230,48 @@ probe_join_hash_table(cudf::table_device_view build_table, cudf::table_device_view probe_table, multimap_type const &hash_table, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { - std::size_t estimated_size = estimate_join_output_size( - build_table, probe_table, hash_table, compare_nulls, stream); + std::size_t join_size; + // Use output size directly if provided + if (output_size.has_value()) { + join_size = output_size.value(); + } + // Otherwise, compute the exact output size + else { + join_size = compute_join_output_size( + build_table, probe_table, hash_table, compare_nulls, stream); + } - // If the estimated output size is zero, return immediately - if (estimated_size == 0) { + // If output size is zero, return immediately + if (join_size == 0) { return std::make_pair(std::make_unique>(0, stream, mr), std::make_unique>(0, stream, mr)); } - // Because we are approximating the number of joined elements, our approximation - // might be incorrect and we might have underestimated the number of joined elements. - // As such we will need to de-allocate memory and re-allocate memory to ensure - // that the final output is correct. rmm::device_scalar write_index(0, stream); - std::size_t join_size{0}; - - auto left_indices = std::make_unique>(0, stream, mr); - auto right_indices = std::make_unique>(0, stream, mr); - - auto current_estimated_size = estimated_size; - do { - left_indices->resize(estimated_size, stream); - right_indices->resize(estimated_size, stream); - - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - detail::grid_1d config(probe_table.num_rows(), block_size); - write_index.set_value_zero(stream); - - row_hash hash_probe{probe_table}; - row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - probe_hash_table - <<>>( - hash_table, - build_table, - probe_table, - hash_probe, - equality, - left_indices->data(), - right_indices->data(), - write_index.data(), - estimated_size); - - CHECK_CUDA(stream.value()); - - join_size = write_index.value(stream); - current_estimated_size = estimated_size; - estimated_size *= 2; - } while ((current_estimated_size < join_size)); - - left_indices->resize(join_size, stream); - right_indices->resize(join_size, stream); + + auto left_indices = std::make_unique>(join_size, stream, mr); + auto right_indices = std::make_unique>(join_size, stream, mr); + + constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; + detail::grid_1d config(probe_table.num_rows(), block_size); + + row_hash hash_probe{probe_table}; + row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; + probe_hash_table + <<>>(hash_table, + build_table, + probe_table, + hash_probe, + equality, + left_indices->data(), + right_indices->data(), + write_index.data(), + join_size); + return std::make_pair(std::move(left_indices), std::move(right_indices)); } @@ -323,33 +315,39 @@ std::pair>, std::unique_ptr>> hash_join::hash_join_impl::inner_join(cudf::table_view const &probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, compare_nulls, stream, mr); + return compute_hash_join( + probe, compare_nulls, output_size, stream, mr); } std::pair>, std::unique_ptr>> hash_join::hash_join_impl::left_join(cudf::table_view const &probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, compare_nulls, stream, mr); + return compute_hash_join( + probe, compare_nulls, output_size, stream, mr); } std::pair>, std::unique_ptr>> hash_join::hash_join_impl::full_join(cudf::table_view const &probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) const { CUDF_FUNC_RANGE(); - return compute_hash_join(probe, compare_nulls, stream, mr); + return compute_hash_join( + probe, compare_nulls, output_size, stream, mr); } std::size_t hash_join::hash_join_impl::inner_join_size(cudf::table_view const &probe, @@ -399,6 +397,7 @@ std::pair>, std::unique_ptr>> hash_join::hash_join_impl::compute_hash_join(cudf::table_view const &probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) const { @@ -425,7 +424,8 @@ hash_join::hash_join_impl::compute_hash_join(cudf::table_view const &probe, [](const auto &b, const auto &p) { return b.type() == p.type(); }), "Mismatch in joining column data types"); - return probe_join_indices(flattened_probe_table, compare_nulls, stream, mr); + return probe_join_indices( + flattened_probe_table, compare_nulls, output_size, stream, mr); } template @@ -433,6 +433,7 @@ std::pair>, std::unique_ptr>> hash_join::hash_join_impl::probe_join_indices(cudf::table_view const &probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) const { @@ -450,7 +451,7 @@ hash_join::hash_join_impl::probe_join_indices(cudf::table_view const &probe, ? cudf::detail::join_kind::LEFT_JOIN : JoinKind; auto join_indices = cudf::detail::probe_join_hash_table( - *build_table, *probe_table, *_hash_table, compare_nulls, stream, mr); + *build_table, *probe_table, *_hash_table, compare_nulls, output_size, stream, mr); if (JoinKind == cudf::detail::join_kind::FULL_JOIN) { auto complement_indices = detail::get_left_join_indices_complement( diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index 6a491643371..99b9bc2419c 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -316,35 +316,38 @@ struct hash_join::hash_join_impl { std::pair>, std::unique_ptr>> inner_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls, + std::optional output_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const; std::pair>, std::unique_ptr>> left_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls, + std::optional output_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const; std::pair>, std::unique_ptr>> full_join(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default, - rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; + null_equality compare_nulls, + std::optional output_size, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const; std::size_t inner_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + null_equality compare_nulls, + rmm::cuda_stream_view stream) const; std::size_t left_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + null_equality compare_nulls, + rmm::cuda_stream_view stream) const; std::size_t full_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + null_equality compare_nulls, + rmm::cuda_stream_view stream) const; private: template @@ -352,13 +355,15 @@ struct hash_join::hash_join_impl { std::unique_ptr>> compute_hash_join(cudf::table_view const& probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; /** * @brief Probes the `_hash_table` built from `_build` for tuples in `probe_table`, * and returns the output indices of `build_table` and `probe_table` as a combined table, - * i.e. if full join is specified as the join type then left join is called. + * i.e. if full join is specified as the join type then left join is called. Behavior + * is undefined if the provided `output_size` is smaller than the actual output size. * * @throw cudf::logic_error if hash table is null. * @@ -366,6 +371,7 @@ struct hash_join::hash_join_impl { * * @param probe_table Table of probe side columns to join. * @param compare_nulls Controls whether null join-key values should match or not. + * @param output_size Optional value which allows users to specify the exact output size. * @param stream CUDA stream used for device memory operations and kernel launches. * @param mr Device memory resource used to allocate the returned vectors. * @@ -376,6 +382,7 @@ struct hash_join::hash_join_impl { std::unique_ptr>> probe_join_indices(cudf::table_view const& probe, null_equality compare_nulls, + std::optional output_size, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) const; }; diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index 93df523b4be..d5d083496e7 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -45,16 +45,18 @@ inner_join(table_view const& left_input, auto const left = matched.second.front(); auto const right = matched.second.back(); + std::optional output_size; + // For `inner_join`, we can freely choose either the `left` or `right` table to use for // building/probing the hash map. Because building is typically more expensive than probing, we // build the hash map from the smaller table. if (right.num_rows() > left.num_rows()) { cudf::hash_join hj_obj(left, compare_nulls, stream); - auto result = hj_obj.inner_join(right, compare_nulls, stream, mr); + auto result = hj_obj.inner_join(right, compare_nulls, output_size, stream, mr); return std::make_pair(std::move(result.second), std::move(result.first)); } else { cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.inner_join(left, compare_nulls, stream, mr); + return hj_obj.inner_join(left, compare_nulls, output_size, stream, mr); } } @@ -111,8 +113,10 @@ left_join(table_view const& left_input, table_view const left = matched.second.front(); table_view const right = matched.second.back(); + std::optional output_size; + cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.left_join(left, compare_nulls, stream, mr); + return hj_obj.left_join(left, compare_nulls, output_size, stream, mr); } std::unique_ptr left_join(table_view const& left_input, @@ -174,8 +178,10 @@ full_join(table_view const& left_input, table_view const left = matched.second.front(); table_view const right = matched.second.back(); + std::optional output_size; + cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.full_join(left, compare_nulls, stream, mr); + return hj_obj.full_join(left, compare_nulls, output_size, stream, mr); } std::unique_ptr
full_join(table_view const& left_input, @@ -263,24 +269,23 @@ hash_join::full_join(cudf::table_view const& probe, return impl->full_join(probe, compare_nulls, output_size, stream, mr); } -std::size_t hash_join::inner_join_size( - cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const +std::size_t hash_join::inner_join_size(cudf::table_view const& probe, + null_equality compare_nulls, + rmm::cuda_stream_view stream) const { return impl->inner_join_size(probe, compare_nulls, stream); } std::size_t hash_join::left_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + null_equality compare_nulls, + rmm::cuda_stream_view stream) const { return impl->left_join_size(probe, compare_nulls, stream); } std::size_t hash_join::full_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const + null_equality compare_nulls, + rmm::cuda_stream_view stream) const { return impl->full_join_size(probe, compare_nulls, stream); } From 85b109e7c07da894ef49c8698ea290f4fd988274 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 15:48:11 -0400 Subject: [PATCH 09/17] Remove the deprecated estimate_join_output_size function --- cpp/src/join/hash_join.cuh | 129 ------------------------------------- 1 file changed, 129 deletions(-) diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index 99b9bc2419c..cdfb8f79ba3 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -119,135 +119,6 @@ std::size_t compute_join_output_size(table_device_view build_table, return h_size; } -/** - * @brief Gives an estimate of the size of the join output produced when - * joining two tables together. - * - * If the two tables are of relatively equal size, then the returned output - * size will be the exact output size. However, if the probe table is - * significantly larger than the build table, then we attempt to estimate the - * output size by using only a subset of the rows in the probe table. - * - * @throw cudf::logic_error if JoinKind is not INNER_JOIN or LEFT_JOIN - * @throw cudf::logic_error if the estimated size overflows cudf::size_type - * - * @tparam JoinKind The type of join to be performed - * @tparam multimap_type The type of the hash table - * - * @param build_table The right hand table - * @param probe_table The left hand table - * @param hash_table A hash table built on the build table that maps the index - * of every row to the hash value of that row. - * @param compare_nulls Controls whether null join-key values should match or not. - * @param stream CUDA stream used for device memory operations and kernel launches - * - * @return An estimate of the size of the output of the join operation - */ -template -std::size_t estimate_join_output_size(table_device_view build_table, - table_device_view probe_table, - multimap_type const& hash_table, - null_equality compare_nulls, - rmm::cuda_stream_view stream) -{ - const size_type build_table_num_rows{build_table.num_rows()}; - const size_type probe_table_num_rows{probe_table.num_rows()}; - - // If the probe table is significantly larger (5x) than the build table, - // then we attempt to only use a subset of the probe table rows to compute an - // estimate of the join output size. - size_type probe_to_build_ratio{0}; - if (build_table_num_rows > 0) { - probe_to_build_ratio = static_cast( - std::ceil(static_cast(probe_table_num_rows) / build_table_num_rows)); - } else { - // If the build table is empty, we know exactly how large the output - // will be for the different types of joins and can return immediately - switch (JoinKind) { - // Inner join with an empty table will have no output - case join_kind::INNER_JOIN: return 0; - - // Left join with an empty table will have an output of NULL rows - // equal to the number of rows in the probe table - case join_kind::LEFT_JOIN: return probe_table_num_rows; - - default: CUDF_FAIL("Unsupported join type"); - } - } - - size_type sample_probe_num_rows{probe_table_num_rows}; - constexpr size_type MAX_RATIO{5}; - if (probe_to_build_ratio > MAX_RATIO) { sample_probe_num_rows = build_table_num_rows; } - - // Allocate storage for the counter used to get the size of the join output - std::size_t h_size_estimate{0}; - rmm::device_scalar size_estimate(0, stream); - - CHECK_CUDA(stream.value()); - - constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; - int numBlocks{-1}; - - CUDA_TRY(cudaOccupancyMaxActiveBlocksPerMultiprocessor( - &numBlocks, compute_join_output_size, block_size, 0)); - - int dev_id{-1}; - CUDA_TRY(cudaGetDevice(&dev_id)); - - int num_sms{-1}; - CUDA_TRY(cudaDeviceGetAttribute(&num_sms, cudaDevAttrMultiProcessorCount, dev_id)); - - // Continue probing with a subset of the probe table until either: - // a non-zero output size estimate is found OR - // all of the rows in the probe table have been sampled - do { - sample_probe_num_rows = std::min(sample_probe_num_rows, probe_table_num_rows); - - size_estimate.set_value_zero(stream); - - row_hash hash_probe{probe_table}; - row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - // Probe the hash table without actually building the output to simply - // find what the size of the output will be. - compute_join_output_size - <<>>(hash_table, - build_table, - probe_table, - hash_probe, - equality, - sample_probe_num_rows, - size_estimate.data()); - CHECK_CUDA(stream.value()); - - // Only in case subset of probe table is chosen, - // increase the estimated output size by a factor of the ratio between the - // probe and build tables - if (sample_probe_num_rows < probe_table_num_rows) { - h_size_estimate = size_estimate.value(stream) * probe_to_build_ratio; - } else { - h_size_estimate = size_estimate.value(stream); - } - - // If the size estimate is non-zero, then we have a valid estimate and can break - // If sample_probe_num_rows >= probe_table_num_rows, then we've sampled the entire - // probe table, in which case the estimate is exact and we can break - if ((h_size_estimate > 0) || (sample_probe_num_rows >= probe_table_num_rows)) { break; } - - // If the size estimate is zero, then double the number of sampled rows in the probe - // table. Reduce the ratio of the number of probe rows sampled to the number of rows - // in the build table by the same factor - if (0 == h_size_estimate) { - constexpr size_type GROW_RATIO{2}; - sample_probe_num_rows *= GROW_RATIO; - probe_to_build_ratio = - static_cast(std::ceil(static_cast(probe_to_build_ratio) / GROW_RATIO)); - } - - } while (true); - - return h_size_estimate; -} - /** * @brief Computes the trivial left join operation for the case when the * right table is empty. In this case all the valid indices of the left table From f35d43cb02d138fc5d6bcb20441122a324ec5707 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 15:58:00 -0400 Subject: [PATCH 10/17] Use std::nullopt instead of uninitialized std::optional variable --- cpp/src/join/join.cu | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index d5d083496e7..a660941e352 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -45,18 +45,16 @@ inner_join(table_view const& left_input, auto const left = matched.second.front(); auto const right = matched.second.back(); - std::optional output_size; - // For `inner_join`, we can freely choose either the `left` or `right` table to use for // building/probing the hash map. Because building is typically more expensive than probing, we // build the hash map from the smaller table. if (right.num_rows() > left.num_rows()) { cudf::hash_join hj_obj(left, compare_nulls, stream); - auto result = hj_obj.inner_join(right, compare_nulls, output_size, stream, mr); + auto result = hj_obj.inner_join(right, compare_nulls, std::nullopt, stream, mr); return std::make_pair(std::move(result.second), std::move(result.first)); } else { cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.inner_join(left, compare_nulls, output_size, stream, mr); + return hj_obj.inner_join(left, compare_nulls, std::nullopt, stream, mr); } } @@ -113,10 +111,8 @@ left_join(table_view const& left_input, table_view const left = matched.second.front(); table_view const right = matched.second.back(); - std::optional output_size; - cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.left_join(left, compare_nulls, output_size, stream, mr); + return hj_obj.left_join(left, compare_nulls, std::nullopt, stream, mr); } std::unique_ptr
left_join(table_view const& left_input, @@ -178,10 +174,8 @@ full_join(table_view const& left_input, table_view const left = matched.second.front(); table_view const right = matched.second.back(); - std::optional output_size; - cudf::hash_join hj_obj(right, compare_nulls, stream); - return hj_obj.full_join(left, compare_nulls, output_size, stream, mr); + return hj_obj.full_join(left, compare_nulls, std::nullopt, stream, mr); } std::unique_ptr
full_join(table_view const& left_input, From a56f2e48685c3fc7a391162efc15ebafad3269fd Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 16:57:36 -0400 Subject: [PATCH 11/17] Update join unit tests for new inner/left/full_join_size APIs --- cpp/tests/join/join_tests.cpp | 23 ++++++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/cpp/tests/join/join_tests.cpp b/cpp/tests/join/join_tests.cpp index b0a2149d50f..212458d5118 100644 --- a/cpp/tests/join/join_tests.cpp +++ b/cpp/tests/join/join_tests.cpp @@ -1225,8 +1225,13 @@ TEST_F(JoinTest, HashJoinSequentialProbes) Table t0(std::move(cols0)); - auto result = hash_join.full_join(t0); + auto output_size = hash_join.full_join_size(t0); + std::optional optional_size = output_size; + std::size_t const size_gold = 9; + EXPECT_EQ(output_size, size_gold); + + auto result = hash_join.full_join(t0, cudf::null_equality::EQUAL, optional_size); auto result_table = cudf::table_view({cudf::column_view{cudf::data_type{cudf::type_id::INT32}, static_cast(result.first->size()), @@ -1258,7 +1263,13 @@ TEST_F(JoinTest, HashJoinSequentialProbes) Table t0(std::move(cols0)); - auto result = hash_join.left_join(t0); + auto output_size = hash_join.left_join_size(t0); + std::optional optional_size = output_size; + + std::size_t const size_gold = 5; + EXPECT_EQ(output_size, size_gold); + + auto result = hash_join.left_join(t0, cudf::null_equality::EQUAL, optional_size); auto result_table = cudf::table_view({cudf::column_view{cudf::data_type{cudf::type_id::INT32}, static_cast(result.first->size()), @@ -1290,7 +1301,13 @@ TEST_F(JoinTest, HashJoinSequentialProbes) Table t0(std::move(cols0)); - auto result = hash_join.inner_join(t0); + auto output_size = hash_join.inner_join_size(t0); + std::optional optional_size = output_size; + + std::size_t const size_gold = 3; + EXPECT_EQ(output_size, size_gold); + + auto result = hash_join.inner_join(t0, cudf::null_equality::EQUAL, optional_size); auto result_table = cudf::table_view({cudf::column_view{cudf::data_type{cudf::type_id::INT32}, static_cast(result.first->size()), From 52d4f4e59970ed53989de83465471205ce3fbeda Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Fri, 4 Jun 2021 18:15:34 -0400 Subject: [PATCH 12/17] Early exit for trivial left join cases --- cpp/src/join/hash_join.cu | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 86204b82c17..3b5279647af 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -369,6 +369,9 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const &pr rmm::cuda_stream_view stream) const { CUDF_FUNC_RANGE(); + + // Trivial left join case - exit early + if (!_hash_table) { return probe.num_rows(); } CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); auto build_table = cudf::table_device_view::create(_build, stream); @@ -383,6 +386,9 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const &pr rmm::cuda_stream_view stream) const { CUDF_FUNC_RANGE(); + + // Trivial left join case - exit early + if (!_hash_table) { return probe.num_rows(); } CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); auto build_table = cudf::table_device_view::create(_build, stream); From 24f4bfd75d1a36ee50e7ff345f31595e7e58b901 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 7 Jun 2021 16:44:12 -0400 Subject: [PATCH 13/17] Fix bugs in full_join_size: create get_full_join_size function as a temporary solution --- cpp/include/cudf/join.hpp | 10 ++- cpp/src/join/hash_join.cu | 132 ++++++++++++++++++++++++++++++++++--- cpp/src/join/hash_join.cuh | 3 +- cpp/src/join/join.cu | 5 +- 4 files changed, 134 insertions(+), 16 deletions(-) diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 94ab0ed239b..2dbef24144b 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -628,13 +628,17 @@ class hash_join { * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. * @param stream CUDA stream used for device memory operations and kernel launches + * @param mr Device memory resource used to allocate the intermediate table and columns' device + * memory. * * @return The exact number of output when performing a full join between two tables with `build` * and `probe` as the the join keys . */ - std::size_t full_join_size(cudf::table_view const& probe, - null_equality compare_nulls = null_equality::EQUAL, - rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; + std::size_t full_join_size( + cudf::table_view const& probe, + null_equality compare_nulls = null_equality::EQUAL, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; private: struct hash_join_impl; diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 3b5279647af..70b35a6f71b 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -241,7 +241,10 @@ probe_join_hash_table(cudf::table_device_view build_table, } // Otherwise, compute the exact output size else { - join_size = compute_join_output_size( + constexpr cudf::detail::join_kind ProbeJoinKind = + (JoinKind == cudf::detail::join_kind::FULL_JOIN) ? cudf::detail::join_kind::LEFT_JOIN + : JoinKind; + join_size = compute_join_output_size( build_table, probe_table, hash_table, compare_nulls, stream); } @@ -261,7 +264,82 @@ probe_join_hash_table(cudf::table_device_view build_table, row_hash hash_probe{probe_table}; row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; - probe_hash_table + if constexpr (JoinKind == cudf::detail::join_kind::FULL_JOIN) { + probe_hash_table + <<>>( + hash_table, + build_table, + probe_table, + hash_probe, + equality, + left_indices->data(), + right_indices->data(), + write_index.data(), + join_size); + auto const actual_size = write_index.value(); + left_indices->resize(actual_size, stream); + right_indices->resize(actual_size, stream); + } else { + probe_hash_table + <<>>( + hash_table, + build_table, + probe_table, + hash_probe, + equality, + left_indices->data(), + right_indices->data(), + write_index.data(), + join_size); + } + return std::make_pair(std::move(left_indices), std::move(right_indices)); +} + +/** + * @brief Probes the `hash_table` built from `build_table` for tuples in `probe_table` twice, + * and returns the output size of a full join operation between `build_table` and `probe_table`. + * TODO: this is a temporary solution as part of `full_join_size`. To be refactored during + * cuco integration. + * + * @param build_table Table of build side columns to join. + * @param probe_table Table of probe side columns to join. + * @param hash_table Hash table built from `build_table`. + * @param compare_nulls Controls whether null join-key values should match or not. + * @param stream CUDA stream used for device memory operations and kernel launches. + * @param mr Device memory resource used to allocate the intermediate vectors. + * + * @return Output size of full join. + */ +std::size_t get_full_join_size(cudf::table_device_view build_table, + cudf::table_device_view probe_table, + multimap_type const &hash_table, + null_equality compare_nulls, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) +{ + std::size_t join_size = compute_join_output_size( + build_table, probe_table, hash_table, compare_nulls, stream); + + // If output size is zero, return immediately + if (join_size == 0) { return join_size; } + + rmm::device_scalar write_index(0, stream); + + auto left_indices = std::make_unique>(join_size, stream, mr); + auto right_indices = std::make_unique>(join_size, stream, mr); + + constexpr int block_size{DEFAULT_JOIN_BLOCK_SIZE}; + detail::grid_1d config(probe_table.num_rows(), block_size); + + row_hash hash_probe{probe_table}; + row_equality equality{probe_table, build_table, compare_nulls == null_equality::EQUAL}; + probe_hash_table <<>>(hash_table, build_table, probe_table, @@ -271,8 +349,45 @@ probe_join_hash_table(cudf::table_device_view build_table, right_indices->data(), write_index.data(), join_size); + // Rlease intermediate memory alloation + left_indices->resize(0, stream); - return std::make_pair(std::move(left_indices), std::move(right_indices)); + auto const left_table_row_count = probe_table.num_rows(); + auto const right_table_row_count = build_table.num_rows(); + + std::size_t left_join_complement_size; + + // If left table is empty then all rows of the right table should be represented in the joined + // indices. + if (left_table_row_count == 0) { + left_join_complement_size = right_table_row_count; + } else { + // Assume all the indices in invalid_index_map are invalid + auto invalid_index_map = + std::make_unique>(right_table_row_count, stream); + thrust::uninitialized_fill( + rmm::exec_policy(stream), invalid_index_map->begin(), invalid_index_map->end(), int32_t{1}); + + // Functor to check for index validity since left joins can create invalid indices + valid_range valid(0, right_table_row_count); + + // invalid_index_map[index_ptr[i]] = 0 for i = 0 to right_table_row_count + // Thus specifying that those locations are valid + thrust::scatter_if(rmm::exec_policy(stream), + thrust::make_constant_iterator(0), + thrust::make_constant_iterator(0) + right_indices->size(), + right_indices->begin(), // Index locations + right_indices->begin(), // Stencil - Check if index location is valid + invalid_index_map->begin(), // Output indices + valid); // Stencil Predicate + + // Create list of indices that have been marked as invalid + left_join_complement_size = thrust::count_if(rmm::exec_policy(stream), + invalid_index_map->begin(), + invalid_index_map->end(), + thrust::identity()); + } + return join_size + left_join_complement_size; } std::unique_ptr combine_table_pair(std::unique_ptr &&left, @@ -383,7 +498,8 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const &pr std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const &probe, null_equality compare_nulls, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource *mr) const { CUDF_FUNC_RANGE(); @@ -394,8 +510,7 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const &pr auto build_table = cudf::table_device_view::create(_build, stream); auto probe_table = cudf::table_device_view::create(probe, stream); - return cudf::detail::compute_join_output_size( - *build_table, *probe_table, *_hash_table, compare_nulls, stream); + return get_full_join_size(*build_table, *probe_table, *_hash_table, compare_nulls, stream, mr); } template @@ -453,10 +568,7 @@ hash_join::hash_join_impl::probe_join_indices(cudf::table_view const &probe, auto build_table = cudf::table_device_view::create(_build, stream); auto probe_table = cudf::table_device_view::create(probe, stream); - constexpr cudf::detail::join_kind ProbeJoinKind = (JoinKind == cudf::detail::join_kind::FULL_JOIN) - ? cudf::detail::join_kind::LEFT_JOIN - : JoinKind; - auto join_indices = cudf::detail::probe_join_hash_table( + auto join_indices = cudf::detail::probe_join_hash_table( *build_table, *probe_table, *_hash_table, compare_nulls, output_size, stream, mr); if (JoinKind == cudf::detail::join_kind::FULL_JOIN) { diff --git a/cpp/src/join/hash_join.cuh b/cpp/src/join/hash_join.cuh index cdfb8f79ba3..f9ccbd68c74 100644 --- a/cpp/src/join/hash_join.cuh +++ b/cpp/src/join/hash_join.cuh @@ -218,7 +218,8 @@ struct hash_join::hash_join_impl { std::size_t full_join_size(cudf::table_view const& probe, null_equality compare_nulls, - rmm::cuda_stream_view stream) const; + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const; private: template diff --git a/cpp/src/join/join.cu b/cpp/src/join/join.cu index a660941e352..6cb04cadcac 100644 --- a/cpp/src/join/join.cu +++ b/cpp/src/join/join.cu @@ -279,9 +279,10 @@ std::size_t hash_join::left_join_size(cudf::table_view const& probe, std::size_t hash_join::full_join_size(cudf::table_view const& probe, null_equality compare_nulls, - rmm::cuda_stream_view stream) const + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) const { - return impl->full_join_size(probe, compare_nulls, stream); + return impl->full_join_size(probe, compare_nulls, stream, mr); } // external APIs From 34e4ac4c9284f063f2eaa93a38754ca851d00fff Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Mon, 7 Jun 2021 20:24:03 -0400 Subject: [PATCH 14/17] Use optional::value_or to get rid of if-else branches --- cpp/src/join/hash_join.cu | 19 ++++++------------- 1 file changed, 6 insertions(+), 13 deletions(-) diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 70b35a6f71b..19991b7a0f9 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -234,19 +234,12 @@ probe_join_hash_table(cudf::table_device_view build_table, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource *mr) { - std::size_t join_size; - // Use output size directly if provided - if (output_size.has_value()) { - join_size = output_size.value(); - } - // Otherwise, compute the exact output size - else { - constexpr cudf::detail::join_kind ProbeJoinKind = - (JoinKind == cudf::detail::join_kind::FULL_JOIN) ? cudf::detail::join_kind::LEFT_JOIN - : JoinKind; - join_size = compute_join_output_size( - build_table, probe_table, hash_table, compare_nulls, stream); - } + // Use the output size directly if provided. Otherwise, compute the exact output size + constexpr cudf::detail::join_kind ProbeJoinKind = (JoinKind == cudf::detail::join_kind::FULL_JOIN) + ? cudf::detail::join_kind::LEFT_JOIN + : JoinKind; + std::size_t const join_size = output_size.value_or(compute_join_output_size( + build_table, probe_table, hash_table, compare_nulls, stream)); // If output size is zero, return immediately if (join_size == 0) { From 9a2d99e6d2cd885779c8ab4f70cdfc0b78ef2397 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 8 Jun 2021 16:32:56 -0400 Subject: [PATCH 15/17] Minor doc updates --- cpp/include/cudf/join.hpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/cpp/include/cudf/join.hpp b/cpp/include/cudf/join.hpp index 2dbef24144b..1f9ed71ce8c 100644 --- a/cpp/include/cudf/join.hpp +++ b/cpp/include/cudf/join.hpp @@ -594,8 +594,8 @@ class hash_join { rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()) const; /** - * Returns the exact number of output when performing an inner join with the specified probe - * table. + * Returns the exact number of matches (rows) when performing an inner join with the specified + * probe table. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. @@ -609,7 +609,8 @@ class hash_join { rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; /** - * Returns the exact number of output when performing a left join with the specified probe table. + * Returns the exact number of matches (rows) when performing a left join with the specified probe + * table. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. @@ -623,7 +624,8 @@ class hash_join { rmm::cuda_stream_view stream = rmm::cuda_stream_default) const; /** - * Returns the exact number of output when performing a full join with the specified probe table. + * Returns the exact number of matches (rows) when performing a full join with the specified probe + * table. * * @param probe The probe table, from which the tuples are probed. * @param compare_nulls Controls whether null join-key values should match or not. From 7c7366c945d971b99c01f25b74c483600d385796 Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Tue, 8 Jun 2021 18:41:34 -0400 Subject: [PATCH 16/17] Minor update: pass stream to device_scalar::value --- cpp/src/join/hash_join.cu | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index 19991b7a0f9..c7dccdbbe36 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -272,7 +272,7 @@ probe_join_hash_table(cudf::table_device_view build_table, right_indices->data(), write_index.data(), join_size); - auto const actual_size = write_index.value(); + auto const actual_size = write_index.value(stream); left_indices->resize(actual_size, stream); right_indices->resize(actual_size, stream); } else { From 8d2c8b14a49619ed24cb3bf4c6d235bc1109bc5c Mon Sep 17 00:00:00 2001 From: Yunsong Wang Date: Wed, 9 Jun 2021 11:10:19 -0400 Subject: [PATCH 17/17] Remove redundant CUDF_EXPECTS --- cpp/src/join/hash_join.cu | 2 -- 1 file changed, 2 deletions(-) diff --git a/cpp/src/join/hash_join.cu b/cpp/src/join/hash_join.cu index c7dccdbbe36..dfe3231e897 100644 --- a/cpp/src/join/hash_join.cu +++ b/cpp/src/join/hash_join.cu @@ -480,7 +480,6 @@ std::size_t hash_join::hash_join_impl::left_join_size(cudf::table_view const &pr // Trivial left join case - exit early if (!_hash_table) { return probe.num_rows(); } - CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); auto build_table = cudf::table_device_view::create(_build, stream); auto probe_table = cudf::table_device_view::create(probe, stream); @@ -498,7 +497,6 @@ std::size_t hash_join::hash_join_impl::full_join_size(cudf::table_view const &pr // Trivial left join case - exit early if (!_hash_table) { return probe.num_rows(); } - CUDF_EXPECTS(_hash_table, "Hash table of hash join is null."); auto build_table = cudf::table_device_view::create(_build, stream); auto probe_table = cudf::table_device_view::create(probe, stream);