Skip to content

Commit

Permalink
Refactor semi_anti_join (#11100)
Browse files Browse the repository at this point in the history
A (left) semi-join between the left and right tables returns the set of rows in the left table that have matching rows (i.e., rows that compare equal) in the right table. As such, for each row in the left table, it needs to check whether that row has a match in the right table.

Such a check is very generic and has applications in many other places, not just in semi-join. This PR exposes that functionality as a new `cudf::detail::contains(table_view, table_view)` function for internal use.

Closes #11037.

Depends on:
 * NVIDIA/cuCollections#175

Authors:
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Yunsong Wang (https://github.com/PointKernel)

URL: #11100
  • Loading branch information
ttnghia authored Jul 14, 2022
1 parent 9034b1b commit ed9355f
Show file tree
Hide file tree
Showing 10 changed files with 258 additions and 112 deletions.
3 changes: 2 additions & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ add_library(
src/round/round.cu
src/scalar/scalar.cpp
src/scalar/scalar_factories.cpp
src/search/contains.cu
src/search/contains_column.cu
src/search/contains_table.cu
src/search/contains_nested.cu
src/search/search_ordered.cu
src/sort/is_sorted.cu
Expand Down
27 changes: 21 additions & 6 deletions cpp/benchmarks/join/join_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,14 @@ static void BM_join(state_type& state, Join JoinFunc)
return cudf::detail::valid_if(validity, validity + size, thrust::identity<bool>{}).first;
};

std::unique_ptr<cudf::column> build_key_column = [&]() {
std::unique_ptr<cudf::column> build_key_column0 = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size,
build_random_null_mask(build_table_size))
: cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
build_table_size);
}();
std::unique_ptr<cudf::column> probe_key_column = [&]() {
std::unique_ptr<cudf::column> probe_key_column0 = [&]() {
return Nullable ? cudf::make_numeric_column(cudf::data_type(cudf::type_to_id<key_type>()),
probe_table_size,
build_random_null_mask(probe_table_size))
Expand All @@ -104,21 +104,36 @@ static void BM_join(state_type& state, Join JoinFunc)
}();

generate_input_tables<key_type, cudf::size_type>(
build_key_column->mutable_view().data<key_type>(),
build_key_column0->mutable_view().data<key_type>(),
build_table_size,
probe_key_column->mutable_view().data<key_type>(),
probe_key_column0->mutable_view().data<key_type>(),
probe_table_size,
selectivity,
multiplicity);

// Copy build_key_column0 and probe_key_column0 into new columns.
// If Nullable, the new columns will be assigned new nullmasks.
auto const build_key_column1 = [&]() {
auto col = std::make_unique<cudf::column>(build_key_column0->view());
if (Nullable) { col->set_null_mask(build_random_null_mask(build_table_size)); }
return col;
}();
auto const probe_key_column1 = [&]() {
auto col = std::make_unique<cudf::column>(probe_key_column0->view());
if (Nullable) { col->set_null_mask(build_random_null_mask(probe_table_size)); }
return col;
}();

auto init = cudf::make_fixed_width_scalar<payload_type>(static_cast<payload_type>(0));
auto build_payload_column = cudf::sequence(build_table_size, *init);
auto probe_payload_column = cudf::sequence(probe_table_size, *init);

CUDF_CHECK_CUDA(0);

cudf::table_view build_table({build_key_column->view(), *build_payload_column});
cudf::table_view probe_table({probe_key_column->view(), *probe_payload_column});
cudf::table_view build_table(
{build_key_column0->view(), build_key_column1->view(), *build_payload_column});
cudf::table_view probe_table(
{probe_key_column0->view(), probe_key_column1->view(), *probe_payload_column});

// Setup join parameters and result table
[[maybe_unused]] std::vector<cudf::size_type> columns_to_join = {0};
Expand Down
32 changes: 32 additions & 0 deletions cpp/include/cudf/detail/search.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <cudf/types.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_uvector.hpp>

namespace cudf::detail {
/**
Expand Down Expand Up @@ -65,6 +66,37 @@ std::unique_ptr<column> contains(column_view const& haystack,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);

/**
* @brief Check if rows in the given `needles` table exist in the `haystack` table.
*
* Given two tables, each row in the `needles` table is checked to see if there is any matching row
* (i.e., a row that compares equal to it) in the `haystack` table. The boolean search results are
* written into the corresponding rows of the output array, so the output has one element for each
* row of `needles`.
*
* @code{.pseudo}
* Example:
*
* haystack = { { 5, 4, 1, 2, 3 } }
* needles = { { 0, 1, 2 } }
* output = { false, true, true }
* @endcode
*
* @param haystack The table containing the search space
* @param needles A table of rows whose existence to check in the search space
* @param compare_nulls Control whether nulls should be compared as equal or not
* @param compare_nans Control whether floating-point NaN values should be compared as equal or not
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned vector (defaults to the current
*        device resource)
* @return A vector of bools indicating if each row in `needles` has matching rows in `haystack`
*/
rmm::device_uvector<bool> contains(
table_view const& haystack,
table_view const& needles,
null_equality compare_nulls,
nan_equality compare_nans,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
* @brief Check if the (unique) row of the `needle` column is contained in the `haystack` column.
*
Expand Down
1 change: 0 additions & 1 deletion cpp/include/cudf/join.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ std::unique_ptr<cudf::table> full_join(
* @code{.pseudo}
* TableA: {{0, 1, 2}}
* TableB: {{1, 2, 3}}
* right_on: {1}
* Result: {1, 2}
* @endcode
*
Expand Down
8 changes: 8 additions & 0 deletions cpp/include/cudf/table/table_view.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,14 @@ inline bool has_nested_nulls(table_view const& input)
});
}

/**
* @brief Collect all nullable columns at all nested levels in a given table.
*
* @param table The input table
* @return A vector containing all nullable columns in the input table
*/
std::vector<column_view> get_nullable_columns(table_view const& table);

/**
* @brief Checks if two `table_view`s have columns of same types
*
Expand Down
26 changes: 17 additions & 9 deletions cpp/src/join/join_common_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,28 @@ constexpr auto remap_sentinel_hash(H hash, S sentinel)
}

/**
* @brief Device functor to create a pair of hash value and index for a given row.
* @brief Device functor to create a pair of {hash_value, row_index} for a given row.
*
* @tparam T Type of row index, must be convertible to `size_type`.
* @tparam Hasher The type of internal hasher to compute row hash.
*/
template <typename Hasher, typename T = size_type>
class make_pair_function {
public:
CUDF_HOST_DEVICE make_pair_function(row_hash const& hash,
hash_value_type const empty_key_sentinel)
CUDF_HOST_DEVICE make_pair_function(Hasher const& hash, hash_value_type const empty_key_sentinel)
: _hash{hash}, _empty_key_sentinel{empty_key_sentinel}
{
}

__device__ __forceinline__ cudf::detail::pair_type operator()(size_type i) const noexcept
__device__ __forceinline__ auto operator()(size_type i) const noexcept
{
// Compute the hash value of row `i`
auto row_hash_value = remap_sentinel_hash(_hash(i), _empty_key_sentinel);
return cuco::make_pair(row_hash_value, i);
return cuco::make_pair(row_hash_value, T{i});
}

private:
row_hash _hash;
Hasher _hash;
hash_value_type const _empty_key_sentinel;
};

Expand Down Expand Up @@ -93,7 +96,10 @@ class row_is_valid {
* probe_row_hash == build_row_hash) and then using a row_equality_comparator
* to compare the contents of the row indices that are stored as the payload in
* the hash map.
*
* @tparam Comparator The row comparator type to perform row equality comparison from row indices.
*/
template <typename Comparator = row_equality>
class pair_equality {
public:
pair_equality(table_device_view lhs,
Expand All @@ -104,14 +110,16 @@ class pair_equality {
{
}

__device__ __forceinline__ bool operator()(const pair_type& lhs,
const pair_type& rhs) const noexcept
pair_equality(Comparator const d_eqcomp) : _check_row_equality{std::move(d_eqcomp)} {}

template <typename LhsPair, typename RhsPair>
__device__ __forceinline__ bool operator()(LhsPair const& lhs, RhsPair const& rhs) const noexcept
{
return lhs.first == rhs.first and _check_row_equality(rhs.second, lhs.second);
}

private:
row_equality _check_row_equality;
Comparator _check_row_equality;
};

/**
Expand Down
110 changes: 15 additions & 95 deletions cpp/src/join/semi_join.cu
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,12 @@
* limitations under the License.
*/

#include <join/join_common_utils.cuh>
#include <join/join_common_utils.hpp>

#include <cudf/column/column_factories.hpp>
#include <cudf/detail/gather.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/detail/null_mask.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/detail/search.hpp>
#include <cudf/dictionary/detail/update_keys.hpp>
#include <cudf/join.hpp>
#include <cudf/table/table.hpp>
Expand All @@ -37,26 +34,11 @@
#include <thrust/distance.h>
#include <thrust/iterator/counting_iterator.h>
#include <thrust/sequence.h>
#include <thrust/tuple.h>
#include <thrust/transform.h>

namespace cudf {
namespace detail {

namespace {
/**
* @brief Device functor to create a pair of hash value and index for a given row.
*/
struct make_pair_fn {
__device__ __forceinline__ cudf::detail::pair_type operator()(size_type i) const noexcept
{
// The value is irrelevant since we only ever use the hash map to check for
// membership of a particular row index.
return cuco::make_pair(static_cast<hash_value_type>(i), 0);
}
};

} // namespace

std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
join_kind const kind,
cudf::table_view const& left_keys,
Expand All @@ -78,90 +60,28 @@ std::unique_ptr<rmm::device_uvector<cudf::size_type>> left_semi_anti_join(
return result;
}

auto const left_num_rows = left_keys.num_rows();
auto const right_num_rows = right_keys.num_rows();

// flatten structs for the right and left and use that for the hash table
auto right_flattened_tables = structs::detail::flatten_nested_columns(
right_keys, {}, {}, structs::detail::column_nullability::FORCE);
auto left_flattened_tables = structs::detail::flatten_nested_columns(
left_keys, {}, {}, structs::detail::column_nullability::FORCE);

auto right_flattened_keys = right_flattened_tables.flattened_columns();
auto left_flattened_keys = left_flattened_tables.flattened_columns();

// Create hash table.
semi_map_type hash_table{compute_hash_table_size(right_num_rows),
cuco::sentinel::empty_key{std::numeric_limits<hash_value_type>::max()},
cuco::sentinel::empty_value{cudf::detail::JoinNoneValue},
hash_table_allocator_type{default_allocator<char>{}, stream},
stream.value()};

// Create hash table containing all keys found in right table
auto right_rows_d = table_device_view::create(right_flattened_keys, stream);
auto const right_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(right_flattened_keys)};
row_hash const hash_build{right_nulls, *right_rows_d};
row_equality equality_build{right_nulls, *right_rows_d, *right_rows_d, compare_nulls};
make_pair_fn pair_func_build{};

auto iter = cudf::detail::make_counting_transform_iterator(0, pair_func_build);

// skip rows that are null here.
if ((compare_nulls == null_equality::EQUAL) or (not nullable(right_keys))) {
hash_table.insert(iter, iter + right_num_rows, hash_build, equality_build, stream.value());
} else {
thrust::counting_iterator<size_type> stencil(0);
auto const [row_bitmask, _] = cudf::detail::bitmask_and(right_flattened_keys, stream);
row_is_valid pred{static_cast<bitmask_type const*>(row_bitmask.data())};

// insert valid rows
hash_table.insert_if(
iter, iter + right_num_rows, stencil, pred, hash_build, equality_build, stream.value());
}

// Now we have a hash table, we need to iterate over the rows of the left table
// and check to see if they are contained in the hash table
auto left_rows_d = table_device_view::create(left_flattened_keys, stream);
auto const left_nulls = cudf::nullate::DYNAMIC{cudf::has_nulls(left_flattened_keys)};
row_hash hash_probe{left_nulls, *left_rows_d};
// Note: This equality comparator violates symmetry of equality and is
// therefore relying on the implementation detail of the order in which its
// operator is invoked. If cuco makes no promises about the order of
// invocation this seems a bit unsafe.
row_equality equality_probe{left_nulls, *right_rows_d, *left_rows_d, compare_nulls};

// For semi join we want contains to be true, for anti join we want contains to be false
bool const join_type_boolean = (kind == join_kind::LEFT_SEMI_JOIN);

auto hash_table_view = hash_table.get_device_view();
// Materialize a `flagged` boolean array to generate a gather map.
// Previously, the gather map was generated directly, without this array, by calling
// `map.contains` inside the `thrust::copy_if` kernel. However, that increased register
// usage and reduced performance, as reported here: https://github.com/rapidsai/cudf/pull/10511.
auto const flagged =
cudf::detail::contains(right_keys, left_keys, compare_nulls, nan_equality::ALL_EQUAL, stream);

auto const left_num_rows = left_keys.num_rows();
auto gather_map =
std::make_unique<rmm::device_uvector<cudf::size_type>>(left_num_rows, stream, mr);

rmm::device_uvector<bool> flagged(left_num_rows, stream, mr);
auto flagged_d = flagged.data();

auto counting_iter = thrust::counting_iterator<size_type>(0);
thrust::for_each(
rmm::exec_policy(stream),
counting_iter,
counting_iter + left_num_rows,
[flagged_d, hash_table_view, join_type_boolean, hash_probe, equality_probe] __device__(
const size_type idx) {
flagged_d[idx] =
hash_table_view.contains(idx, hash_probe, equality_probe) == join_type_boolean;
});

// gather_map_end will be the end of valid data in gather_map
auto gather_map_end =
thrust::copy_if(rmm::exec_policy(stream),
counting_iter,
counting_iter + left_num_rows,
thrust::counting_iterator<size_type>(0),
thrust::counting_iterator<size_type>(left_num_rows),
gather_map->begin(),
[flagged_d] __device__(size_type const idx) { return flagged_d[idx]; });
[kind, d_flagged = flagged.begin()] __device__(size_type const idx) {
return *(d_flagged + idx) == (kind == join_kind::LEFT_SEMI_JOIN);
});

auto join_size = thrust::distance(gather_map->begin(), gather_map_end);
gather_map->resize(join_size, stream);
gather_map->resize(thrust::distance(gather_map->begin(), gather_map_end), stream);
return gather_map;
}

Expand Down
File renamed without changes.
Loading

0 comments on commit ed9355f

Please sign in to comment.