Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow initial value for cudf::reduce and cudf::segmented_reduce. #11137

Merged
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 22 additions & 15 deletions cpp/include/cudf/detail/null_mask.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -508,17 +508,20 @@ std::vector<size_type> segmented_null_count(bitmask_type const* bitmask,
* validity of any/all elements of segments of an input null mask.
*
* @tparam OffsetIterator Random-access input iterator type.
* @param bitmask Null mask residing in device memory whose segments will be
* reduced into a new mask.
* @param first_bit_indices_begin Random-access input iterator to the beginning
* of a sequence of indices of the first bit in each segment (inclusive).
* @param first_bit_indices_end Random-access input iterator to the end of a
* sequence of indices of the first bit in each segment (inclusive).
* @param last_bit_indices_begin Random-access input iterator to the beginning
* of a sequence of indices of the last bit in each segment (exclusive).
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment
* must be valid for the reduced value to be valid. If `null_policy::EXCLUDE`,
* the reduction is valid if any element in the segment is valid.
* @param bitmask Null mask residing in device memory whose segments will be reduced into a new
* mask.
* @param first_bit_indices_begin Random-access input iterator to the beginning of a sequence of
* indices of the first bit in each segment (inclusive).
* @param first_bit_indices_end Random-access input iterator to the end of a sequence of indices of
* the first bit in each segment (inclusive).
* @param last_bit_indices_begin Random-access input iterator to the beginning of a sequence of
* indices of the last bit in each segment (exclusive).
* @param null_handling If `null_policy::INCLUDE`, all elements in a segment must be valid for the
* reduced value to be valid. If `null_policy::EXCLUDE`, the reduction is valid if any element in
* the segment is valid.
* @param valid_initial_value Indicates whether a valid initial value was provided to the reduction.
* True indicates a valid initial value, false indicates a null initial value, and null indicates no
* initial value was provided.
* @param stream CUDA stream used for device memory operations and kernel launches.
* @param mr Device memory resource used to allocate the returned buffer's device memory.
* @return A pair containing the reduced null mask and number of nulls.
Expand All @@ -530,6 +533,7 @@ std::pair<rmm::device_buffer, size_type> segmented_null_mask_reduction(
OffsetIterator first_bit_indices_end,
OffsetIterator last_bit_indices_begin,
null_policy null_handling,
std::optional<bool> valid_initial_value,
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -549,7 +553,9 @@ std::pair<rmm::device_buffer, size_type> segmented_null_mask_reduction(
return cudf::detail::valid_if(
segment_length_iterator,
segment_length_iterator + num_segments,
[] __device__(auto const& length) { return length > 0; },
[valid_initial_value] __device__(auto const& length) {
return valid_initial_value.value_or(length > 0);
},
stream,
mr);
}
Expand All @@ -567,11 +573,12 @@ std::pair<rmm::device_buffer, size_type> segmented_null_mask_reduction(
return cudf::detail::valid_if(
length_and_valid_count,
length_and_valid_count + num_segments,
[null_handling] __device__(auto const& length_and_valid_count) {
[null_handling, valid_initial_value] __device__(auto const& length_and_valid_count) {
auto const length = thrust::get<0>(length_and_valid_count);
auto const valid_count = thrust::get<1>(length_and_valid_count);
return (length > 0) and
((null_handling == null_policy::EXCLUDE) ? valid_count > 0 : valid_count == length);
return (null_handling == null_policy::EXCLUDE)
? (valid_initial_value.value_or(false) || valid_count > 0)
bdice marked this conversation as resolved.
Show resolved Hide resolved
: (valid_initial_value.value_or(length > 0) && valid_count == length);
},
stream,
mr);
Expand Down
57 changes: 32 additions & 25 deletions cpp/include/cudf/detail/reduction.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ namespace detail {
* @param[in] d_in the begin iterator
* @param[in] num_items the number of items
* @param[in] op the reduction operator
* @param[in] init Optional initial value of the reduction
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
* @param[in] mr Device memory resource used to allocate the returned scalar's device
* memory
Expand All @@ -58,12 +59,13 @@ template <typename Op,
std::unique_ptr<scalar> reduce(InputIterator d_in,
cudf::size_type num_items,
op::simple_op<Op> sop,
std::optional<OutputType> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto binary_op = sop.get_binary_op();
auto identity = sop.template get_identity<OutputType>();
auto dev_result = rmm::device_scalar<OutputType>{identity, stream, mr};
auto binary_op = sop.get_binary_op();
auto initial_value = init.value_or(sop.template get_identity<OutputType>());
auto dev_result = rmm::device_scalar<OutputType>{initial_value, stream, mr};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
Expand All @@ -74,7 +76,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -85,7 +87,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());

// only for string_view, data is copied
Expand All @@ -100,6 +102,7 @@ template <typename Op,
std::unique_ptr<scalar> reduce(InputIterator d_in,
cudf::size_type num_items,
op::simple_op<Op> sop,
std::optional<OutputType> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
Expand All @@ -116,12 +119,13 @@ template <typename Op,
std::unique_ptr<scalar> reduce(InputIterator d_in,
cudf::size_type num_items,
op::simple_op<Op> sop,
std::optional<OutputType> init,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto binary_op = sop.get_binary_op();
auto identity = sop.template get_identity<OutputType>();
auto dev_result = rmm::device_scalar<OutputType>{identity, stream};
auto binary_op = sop.get_binary_op();
auto initial_value = init.value_or(sop.template get_identity<OutputType>());
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
auto dev_result = rmm::device_scalar<OutputType>{initial_value, stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
Expand All @@ -132,7 +136,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -143,7 +147,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
dev_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());

using ScalarType = cudf::scalar_type_t<OutputType>;
Expand All @@ -154,13 +158,14 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
/**
* @brief compute reduction by the compound operator (reduce and transform)
*
* @param[in] d_in the begin iterator
* @param[in] num_items the number of items
* @param[in] op the reduction operator
* @param[in] valid_count the intermediate operator argument 1
* @param[in] ddof the intermediate operator argument 2
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
* @param[in] mr Device memory resource used to allocate the returned scalar's device
* @param[in] d_in the begin iterator
* @param[in] num_items the number of items
* @param[in] op the reduction operator
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] valid_count the intermediate operator argument 1
* @param[in] ddof the intermediate operator argument 2
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] init Optional initial value of the reduction.
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
* @param[in] mr Device memory resource used to allocate the returned scalar's device
* memory
* @returns Output scalar in device memory
*
Expand All @@ -184,9 +189,10 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
auto binary_op = cop.get_binary_op();
IntermediateType identity = cop.template get_identity<IntermediateType>();
rmm::device_scalar<IntermediateType> intermediate_result{identity, stream};
auto binary_op = cop.get_binary_op();
auto initial_value = cop.template get_identity<IntermediateType>();

rmm::device_scalar<IntermediateType> intermediate_result{initial_value, stream};

// Allocate temporary storage
rmm::device_buffer d_temp_storage;
Expand All @@ -197,7 +203,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -208,7 +214,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
intermediate_result.data(),
num_items,
binary_op,
identity,
initial_value,
stream.value());

// compute the result value from intermediate value in device
Expand Down Expand Up @@ -240,6 +246,7 @@ std::unique_ptr<scalar> reduce(InputIterator d_in,
* @param[out] d_out the begin iterator to output
* @param[in] binary_op the reduction operator
* @param[in] identity the identity element of the reduction operator
* @param[in] initial_value Initial value of the reduction.
SrikarVanavasam marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] stream CUDA stream used for device memory operations and kernel launches.
*
*/
Expand All @@ -255,7 +262,7 @@ void segmented_reduce(InputIterator d_in,
OffsetIterator d_offset_end,
OutputIterator d_out,
BinaryOp binary_op,
OutputType identity,
OutputType initial_value,
rmm::cuda_stream_view stream)
{
auto num_segments = static_cast<size_type>(std::distance(d_offset_begin, d_offset_end)) - 1;
Expand All @@ -271,7 +278,7 @@ void segmented_reduce(InputIterator d_in,
d_offset_begin,
d_offset_begin + 1,
binary_op,
identity,
initial_value,
stream.value());
d_temp_storage = rmm::device_buffer{temp_storage_bytes, stream};

Expand All @@ -284,7 +291,7 @@ void segmented_reduce(InputIterator d_in,
d_offset_begin,
d_offset_begin + 1,
binary_op,
identity,
initial_value,
stream.value());
}

Expand Down
Loading