Struct binary comparison op functionality for spark rapids (#11153)
Replaces #9452
Takes the core cudf work from #9452 and integrates it into the JNI, while dropping struct support from cudf's public binary operator API. Includes scalar comparison support in addition to vector-to-vector comparison support.

Follow-on work needed:
  - Add null_equal operator support
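
A minimal sketch (not part of this change) of driving the new detail entry point directly for a vector-to-vector comparison; the Spark plugin reaches it through the JNI bindings instead. The cudf test column wrappers and the function name struct_less_example are illustrative assumptions:

```cpp
// Sketch only: exercises apply_sorting_struct_binary_op on two single-child
// struct columns. Column wrappers come from cudf's test utilities; the
// function name struct_less_example is made up for illustration.
#include <cudf/binaryop.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf_test/column_wrapper.hpp>

std::unique_ptr<cudf::column> struct_less_example()
{
  cudf::test::fixed_width_column_wrapper<int32_t> lhs_child{1, 2, 3};
  cudf::test::fixed_width_column_wrapper<int32_t> rhs_child{1, 3, 2};
  cudf::test::structs_column_wrapper lhs{{lhs_child}};
  cudf::test::structs_column_wrapper rhs{{rhs_child}};

  // Comparisons produce a BOOL8 column with one row per input row.
  auto out      = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8}, 3);
  auto out_view = out->mutable_view();

  cudf::binops::compiled::detail::apply_sorting_struct_binary_op(out_view,
                                                                 lhs,
                                                                 rhs,
                                                                 /*is_lhs_scalar=*/false,
                                                                 /*is_rhs_scalar=*/false,
                                                                 cudf::binary_operator::LESS);
  return out;  // expected contents: {false, true, false}
}
```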

Authors:
  - Ryan Lee (https://github.com/rwlee)
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Raza Jafri (https://github.com/razajafri)
  - Bradley Dice (https://github.com/bdice)
  - Nghia Truong (https://github.com/ttnghia)

URL: #11153
rwlee authored Jul 8, 2022
1 parent fb871fb commit e6ff641
Showing 9 changed files with 432 additions and 49 deletions.
42 changes: 42 additions & 0 deletions cpp/include/cudf/binaryop.hpp
@@ -214,5 +214,47 @@ cudf::data_type binary_operation_fixed_point_output_type(binary_operator op,
cudf::data_type const& lhs,
cudf::data_type const& rhs);

namespace binops {

/**
* @brief Computes output valid mask for op between a column and a scalar
*
* @param col Column to compute the valid mask from
* @param s Scalar to compute the valid mask from
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate the returned valid mask
* @return Computed validity mask
*/
std::pair<rmm::device_buffer, size_type> scalar_col_valid_mask_and(
column_view const& col,
scalar const& s,
rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

namespace compiled {
namespace detail {

/**
* @brief struct binary operation using `NaN` aware sorting physical element comparators
*
* @param out mutable view of output column
* @param lhs view of left operand column
* @param rhs view of right operand column
* @param is_lhs_scalar true if @p lhs is a single element column representing a scalar
* @param is_rhs_scalar true if @p rhs is a single element column representing a scalar
* @param op binary operator identifier
* @param stream CUDA stream used for device memory operations
*/
void apply_sorting_struct_binary_op(mutable_column_view& out,
column_view const& lhs,
column_view const& rhs,
bool is_lhs_scalar,
bool is_rhs_scalar,
binary_operator op,
rmm::cuda_stream_view stream = cudf::default_stream_value);
} // namespace detail
} // namespace compiled
} // namespace binops

/** @} */ // end of group
} // namespace cudf
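
The pair return type bundles the null count with the mask, so callers no longer need cudf::UNKNOWN_NULL_COUNT (see the binaryop.cpp changes below). A minimal caller-side sketch; the function name make_bool_output is illustrative:

```cpp
// Sketch only: consuming the new (mask, null_count) pair. The cases are:
//   empty column                    -> {empty buffer, 0}
//   null scalar                     -> {ALL_NULL mask, col.size()}
//   valid scalar, nullable column   -> {copy of col's bitmask, col.null_count()}
//   valid scalar, no nulls          -> {empty buffer, 0}
#include <cudf/binaryop.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/scalar/scalar.hpp>

std::unique_ptr<cudf::column> make_bool_output(cudf::column_view const& col, cudf::scalar const& s)
{
  auto [new_mask, new_null_count] = cudf::binops::scalar_col_valid_mask_and(col, s);
  return cudf::make_fixed_width_column(
    cudf::data_type{cudf::type_id::BOOL8}, col.size(), std::move(new_mask), new_null_count);
}
```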
26 changes: 14 additions & 12 deletions cpp/src/binaryop/binaryop.cpp
@@ -52,19 +52,21 @@ namespace binops {
 /**
  * @brief Computes output valid mask for op between a column and a scalar
  */
-rmm::device_buffer scalar_col_valid_mask_and(column_view const& col,
-                                             scalar const& s,
-                                             rmm::cuda_stream_view stream,
-                                             rmm::mr::device_memory_resource* mr)
+std::pair<rmm::device_buffer, size_type> scalar_col_valid_mask_and(
+  column_view const& col,
+  scalar const& s,
+  rmm::cuda_stream_view stream,
+  rmm::mr::device_memory_resource* mr)
 {
-  if (col.is_empty()) return rmm::device_buffer{0, stream, mr};
+  if (col.is_empty()) return std::pair(rmm::device_buffer{0, stream, mr}, 0);

   if (not s.is_valid(stream)) {
-    return cudf::detail::create_null_mask(col.size(), mask_state::ALL_NULL, stream, mr);
+    return std::pair(cudf::detail::create_null_mask(col.size(), mask_state::ALL_NULL, stream, mr),
+                     col.size());
   } else if (s.is_valid(stream) and col.nullable()) {
-    return cudf::detail::copy_bitmask(col, stream, mr);
+    return std::pair(cudf::detail::copy_bitmask(col, stream, mr), col.null_count());
   } else {
-    return rmm::device_buffer{0, stream, mr};
+    return std::pair(rmm::device_buffer{0, stream, mr}, 0);
   }
 }

@@ -253,9 +255,9 @@ std::unique_ptr<column> make_fixed_width_column_for_output(scalar const& lhs,
   if (binops::is_null_dependent(op)) {
     return make_fixed_width_column(output_type, rhs.size(), mask_state::ALL_VALID, stream, mr);
   } else {
-    auto new_mask = binops::scalar_col_valid_mask_and(rhs, lhs, stream, mr);
+    auto [new_mask, new_null_count] = binops::scalar_col_valid_mask_and(rhs, lhs, stream, mr);
     return make_fixed_width_column(
-      output_type, rhs.size(), std::move(new_mask), cudf::UNKNOWN_NULL_COUNT, stream, mr);
+      output_type, rhs.size(), std::move(new_mask), new_null_count, stream, mr);
   }
 };

@@ -280,9 +282,9 @@ std::unique_ptr<column> make_fixed_width_column_for_output(column_view const& lhs,
   if (binops::is_null_dependent(op)) {
     return make_fixed_width_column(output_type, lhs.size(), mask_state::ALL_VALID, stream, mr);
   } else {
-    auto new_mask = binops::scalar_col_valid_mask_and(lhs, rhs, stream, mr);
+    auto [new_mask, new_null_count] = binops::scalar_col_valid_mask_and(lhs, rhs, stream, mr);
     return make_fixed_width_column(
-      output_type, lhs.size(), std::move(new_mask), cudf::UNKNOWN_NULL_COUNT, stream, mr);
+      output_type, lhs.size(), std::move(new_mask), new_null_count, stream, mr);
   }
};

88 changes: 86 additions & 2 deletions cpp/src/binaryop/compiled/binary_ops.cu
@@ -16,9 +16,11 @@

#include "binary_ops.hpp"
#include "operation.cuh"
#include "struct_binary_ops.cuh"

#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_factories.hpp>
#include <cudf/detail/structs/utilities.hpp>
#include <cudf/scalar/scalar_device_view.cuh>
#include <cudf/strings/detail/utilities.cuh>

@@ -44,15 +46,15 @@ namespace {
  */
 struct scalar_as_column_view {
   using return_type = typename std::pair<column_view, std::unique_ptr<column>>;
-  template <typename T, std::enable_if_t<(is_fixed_width<T>())>* = nullptr>
+  template <typename T, CUDF_ENABLE_IF(is_fixed_width<T>())>
   return_type operator()(scalar const& s, rmm::cuda_stream_view, rmm::mr::device_memory_resource*)
   {
     auto& h_scalar_type_view = static_cast<cudf::scalar_type_t<T>&>(const_cast<scalar&>(s));
     auto col_v =
       column_view(s.type(), 1, h_scalar_type_view.data(), (bitmask_type const*)s.validity_data());
     return std::pair{col_v, std::unique_ptr<column>(nullptr)};
   }
-  template <typename T, std::enable_if_t<(!is_fixed_width<T>())>* = nullptr>
+  template <typename T, CUDF_ENABLE_IF(!is_fixed_width<T>())>
   return_type operator()(scalar const&, rmm::cuda_stream_view, rmm::mr::device_memory_resource*)
   {
     CUDF_FAIL("Unsupported type");
@@ -85,6 +87,15 @@ scalar_as_column_view::return_type scalar_as_column_view::operator()<cudf::string_view>(
return std::pair{col_v, std::move(offsets_column)};
}

// specializing for struct column
template <>
scalar_as_column_view::return_type scalar_as_column_view::operator()<cudf::struct_view>(
scalar const& s, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr)
{
auto col = make_column_from_scalar(s, 1, stream, mr);
return std::pair{col->view(), std::move(col)};
}

/**
* @brief Converts scalar to column_view with single element.
*
@@ -375,6 +386,79 @@ void binary_operation(mutable_column_view& out,
auto [rhsv, aux] = scalar_to_column_view(rhs, stream);
operator_dispatcher(out, lhs, rhsv, false, true, op, stream);
}

namespace detail {
void apply_sorting_struct_binary_op(mutable_column_view& out,
column_view const& lhs,
column_view const& rhs,
bool is_lhs_scalar,
bool is_rhs_scalar,
binary_operator op,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS(lhs.type().id() == type_id::STRUCT && rhs.type().id() == type_id::STRUCT,
"Both columns must be struct columns");
CUDF_EXPECTS(!cudf::structs::detail::is_or_has_nested_lists(lhs) and
!cudf::structs::detail::is_or_has_nested_lists(rhs),
"List type is not supported");
// Struct child column type and structure mismatches are caught within the two_table_comparator
switch (op) {
case binary_operator::EQUAL: [[fallthrough]];
case binary_operator::NOT_EQUAL:
detail::apply_struct_equality_op(
out,
lhs,
rhs,
is_lhs_scalar,
is_rhs_scalar,
op,
cudf::experimental::row::equality::nan_equal_physical_equality_comparator{},
stream);
break;
case binary_operator::LESS:
detail::apply_struct_binary_op<ops::Less>(
out,
lhs,
rhs,
is_lhs_scalar,
is_rhs_scalar,
cudf::experimental::row::lexicographic::sorting_physical_element_comparator{},
stream);
break;
case binary_operator::GREATER:
detail::apply_struct_binary_op<ops::Greater>(
out,
lhs,
rhs,
is_lhs_scalar,
is_rhs_scalar,
cudf::experimental::row::lexicographic::sorting_physical_element_comparator{},
stream);
break;
case binary_operator::LESS_EQUAL:
detail::apply_struct_binary_op<ops::LessEqual>(
out,
lhs,
rhs,
is_lhs_scalar,
is_rhs_scalar,
cudf::experimental::row::lexicographic::sorting_physical_element_comparator{},
stream);
break;
case binary_operator::GREATER_EQUAL:
detail::apply_struct_binary_op<ops::GreaterEqual>(
out,
lhs,
rhs,
is_lhs_scalar,
is_rhs_scalar,
cudf::experimental::row::lexicographic::sorting_physical_element_comparator{},
stream);
break;
default: CUDF_FAIL("Unsupported operator for structs");
}
}
} // namespace detail
} // namespace compiled
} // namespace binops
} // namespace cudf
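
For scalar operands, the struct path mirrors the scalar_as_column_view specialization added above: the scalar is materialized as a one-row column and the matching is_*_scalar flag pins that side to row 0 for every output row. A rough sketch; the function name is illustrative and output null-mask handling is omitted for brevity:

```cpp
// Sketch only: compare every row of a struct column against a single struct
// scalar. Output null-mask handling (scalar_col_valid_mask_and) is omitted
// here to keep the example short.
#include <cudf/binaryop.hpp>
#include <cudf/column/column_factories.hpp>
#include <cudf/scalar/scalar.hpp>

std::unique_ptr<cudf::column> greater_than_scalar(cudf::column_view const& struct_col,
                                                  cudf::scalar const& struct_scalar)
{
  // Materialize the scalar as a single-row column, as the struct_view
  // specialization of scalar_as_column_view does.
  auto rhs = cudf::make_column_from_scalar(struct_scalar, 1);

  auto out      = cudf::make_fixed_width_column(cudf::data_type{cudf::type_id::BOOL8},
                                                struct_col.size());
  auto out_view = out->mutable_view();

  cudf::binops::compiled::detail::apply_sorting_struct_binary_op(out_view,
                                                                 struct_col,
                                                                 rhs->view(),
                                                                 /*is_lhs_scalar=*/false,
                                                                 /*is_rhs_scalar=*/true,
                                                                 cudf::binary_operator::GREATER);
  return out;
}
```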
5 changes: 1 addition & 4 deletions cpp/src/binaryop/compiled/binary_ops.cuh
@@ -272,10 +272,7 @@ void for_each(rmm::cuda_stream_view stream, cudf::size_type size, Functor f)
const int grid_size = util::div_rounding_up_safe(size, 2 * block_size);
for_each_kernel<<<grid_size, block_size, 0, stream.value()>>>(size, std::forward<Functor&&>(f));
}
-namespace detail {
-template <class T, class... Ts>
-inline constexpr bool is_any_v = std::disjunction<std::is_same<T, Ts>...>::value;
-}

template <class BinaryOperator>
void apply_binary_op(mutable_column_view& out,
column_view const& lhs,
Expand Down
139 changes: 139 additions & 0 deletions cpp/src/binaryop/compiled/struct_binary_ops.cuh
@@ -0,0 +1,139 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include "binary_ops.hpp"
#include "operation.cuh"

#include <cudf/binaryop.hpp>
#include <cudf/column/column_device_view.cuh>
#include <cudf/column/column_view.hpp>
#include <cudf/detail/iterator.cuh>
#include <cudf/table/experimental/row_operators.cuh>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

namespace cudf::binops::compiled::detail {
template <class T, class... Ts>
inline constexpr bool is_any_v = std::disjunction<std::is_same<T, Ts>...>::value;

template <typename OptionalIterator, typename DeviceComparator>
struct device_comparison_functor {
// Explicit constructor definition required to avoid a "no instance of constructor" compilation
// error
device_comparison_functor(OptionalIterator const optional_iter,
bool const is_lhs_scalar,
bool const is_rhs_scalar,
DeviceComparator const& comparator)
: _optional_iter(optional_iter),
_is_lhs_scalar(is_lhs_scalar),
_is_rhs_scalar(is_rhs_scalar),
_comparator(comparator)
{
}

bool __device__ operator()(size_type i)
{
return _optional_iter[i].has_value() &&
_comparator(cudf::experimental::row::lhs_index_type{_is_lhs_scalar ? 0 : i},
cudf::experimental::row::rhs_index_type{_is_rhs_scalar ? 0 : i});
}

OptionalIterator const _optional_iter;
bool const _is_lhs_scalar;
bool const _is_rhs_scalar;
DeviceComparator const _comparator;
};

template <class BinaryOperator,
typename PhysicalElementComparator =
cudf::experimental::row::lexicographic::sorting_physical_element_comparator>
void apply_struct_binary_op(mutable_column_view& out,
column_view const& lhs,
column_view const& rhs,
bool is_lhs_scalar,
bool is_rhs_scalar,
PhysicalElementComparator comparator = {},
rmm::cuda_stream_view stream = cudf::default_stream_value)
{
auto const compare_orders = std::vector<order>(
lhs.size(),
is_any_v<BinaryOperator, ops::Greater, ops::GreaterEqual> ? order::DESCENDING
: order::ASCENDING);
auto const tlhs = table_view{{lhs}};
auto const trhs = table_view{{rhs}};
auto const table_comparator = cudf::experimental::row::lexicographic::two_table_comparator{
tlhs, trhs, compare_orders, {}, stream};
auto outd = column_device_view::create(out, stream);
auto optional_iter =
cudf::detail::make_optional_iterator<bool>(*outd, nullate::DYNAMIC{out.has_nulls()});
auto const comparator_nulls = nullate::DYNAMIC{has_nested_nulls(tlhs) || has_nested_nulls(trhs)};

auto tabulate_device_operator = [&](auto device_comparator) {
thrust::tabulate(
rmm::exec_policy(stream),
out.begin<bool>(),
out.end<bool>(),
device_comparison_functor{optional_iter, is_lhs_scalar, is_rhs_scalar, device_comparator});
};
is_any_v<BinaryOperator, ops::LessEqual, ops::GreaterEqual>
? tabulate_device_operator(table_comparator.less_equivalent(comparator_nulls, comparator))
: tabulate_device_operator(table_comparator.less(comparator_nulls, comparator));
}

template <typename PhysicalEqualityComparator =
cudf::experimental::row::equality::physical_equality_comparator>
void apply_struct_equality_op(mutable_column_view& out,
column_view const& lhs,
column_view const& rhs,
bool is_lhs_scalar,
bool is_rhs_scalar,
binary_operator op,
PhysicalEqualityComparator comparator = {},
rmm::cuda_stream_view stream = cudf::default_stream_value)
{
CUDF_EXPECTS(op == binary_operator::EQUAL || op == binary_operator::NOT_EQUAL,
"Unsupported operator for these types");

auto tlhs = table_view{{lhs}};
auto trhs = table_view{{rhs}};
auto table_comparator =
cudf::experimental::row::equality::two_table_comparator{tlhs, trhs, stream};
auto device_comparator =
table_comparator.equal_to(nullate::DYNAMIC{has_nested_nulls(tlhs) || has_nested_nulls(trhs)},
null_equality::EQUAL,
comparator);

auto outd = column_device_view::create(out, stream);
auto optional_iter =
cudf::detail::make_optional_iterator<bool>(*outd, nullate::DYNAMIC{out.has_nulls()});
thrust::tabulate(rmm::exec_policy(stream),
out.begin<bool>(),
out.end<bool>(),
[optional_iter,
is_lhs_scalar,
is_rhs_scalar,
preserve_output = (op != binary_operator::NOT_EQUAL),
device_comparator] __device__(size_type i) {
auto lhs = cudf::experimental::row::lhs_index_type{is_lhs_scalar ? 0 : i};
auto rhs = cudf::experimental::row::rhs_index_type{is_rhs_scalar ? 0 : i};
return optional_iter[i].has_value() and
(device_comparator(lhs, rhs) == preserve_output);
});
}
} // namespace cudf::binops::compiled::detail
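
Note that apply_struct_binary_op never builds a dedicated "greater" comparator: GREATER and GREATER_EQUAL flip every column to order::DESCENDING and reuse the two-table less / less_equivalent comparators, while NOT_EQUAL in apply_struct_equality_op simply checks the equality result against preserve_output. A self-contained, plain-C++ illustration of the ordering trick (not cudf code):

```cpp
// Plain C++ analogy for the DESCENDING-order trick used above: lexicographic
// "greater" is "less" once every key's direction is reversed, so a single
// less/less_equivalent comparator covers LESS, LESS_EQUAL, GREATER, and
// GREATER_EQUAL.
#include <cassert>
#include <tuple>

int main()
{
  auto a = std::make_tuple(1, 5);
  auto b = std::make_tuple(1, 3);

  // Reversing the direction of a numeric key is the same as negating it.
  auto reversed = [](auto t) { return std::make_tuple(-std::get<0>(t), -std::get<1>(t)); };

  assert((a > b) == (reversed(a) < reversed(b)));    // GREATER       via "less"
  assert((a >= b) == (reversed(a) <= reversed(b)));  // GREATER_EQUAL via "less_equivalent"
  return 0;
}
```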
2 changes: 1 addition & 1 deletion java/src/main/native/include/maps_column_view.hpp
@@ -141,7 +141,7 @@ class maps_column_view {
*/

std::unique_ptr<column>
-  contains(column_view const &key, rmm::cuda_stream_view stream = rmm::cuda_stream_default,
+  contains(column_view const &key, rmm::cuda_stream_view stream = cudf::default_stream_value,
rmm::mr::device_memory_resource *mr = rmm::mr::get_current_device_resource()) const;

private: