Clean up cudf device atomic with cuda::atomic_ref (#13583)
Contributes to #13575

Depends on #13574, #13578

This PR cleans up the custom atomic implementations in libcudf by using `cuda::atomic_ref` where possible. It also removes the custom atomic bitwise operations (`and`, `or`, and `xor`), since libcudacxx already provides proper replacements.
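As a quick illustration of the pattern, below is a minimal sketch (hypothetical kernel and variable names, not code from this diff) of how a legacy `atomicOr`/`atomicAdd` call maps onto `cuda::atomic_ref`:

```cpp
// Minimal sketch of the migration pattern used in this PR. The kernel and
// variable names are illustrative only; the cuda::atomic_ref usage mirrors
// the hunks below.
#include <cuda/atomic>

#include <cstdint>

__global__ void or_and_count(std::uint32_t const* masks,
                             std::size_t n,
                             std::uint32_t* bit_union,
                             int* set_count)
{
  auto const tid = blockIdx.x * blockDim.x + threadIdx.x;
  if (tid >= n) { return; }

  // Before: atomicOr(bit_union, masks[tid]);
  cuda::atomic_ref<std::uint32_t, cuda::thread_scope_device> union_ref{*bit_union};
  union_ref.fetch_or(masks[tid], cuda::std::memory_order_relaxed);

  // Before: atomicAdd(set_count, __popc(masks[tid]));
  cuda::atomic_ref<int, cuda::thread_scope_device> count_ref{*set_count};
  count_ref.fetch_add(__popc(masks[tid]), cuda::std::memory_order_relaxed);
}
```

Relaxed ordering is used throughout because the legacy CUDA atomics being replaced do not provide memory-ordering guarantees either.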

Authors:
  - Yunsong Wang (https://github.com/PointKernel)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - David Wendt (https://github.com/davidwendt)

URL: #13583
PointKernel authored Jul 10, 2023
1 parent 964925a commit 67deda0
Showing 19 changed files with 122 additions and 315 deletions.
14 changes: 10 additions & 4 deletions cpp/include/cudf/detail/copy_if.cuh
@@ -22,7 +22,6 @@
#include <cudf/detail/gather.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/string_view.cuh>
#include <cudf/table/table.hpp>
@@ -44,6 +43,8 @@

#include <cub/cub.cuh>

#include <cuda/atomic>

#include <algorithm>

namespace cudf {
@@ -181,7 +182,9 @@ __launch_bounds__(block_size) __global__
if (wid > 0 && wid < last_warp)
output_valid[valid_index] = valid_warp;
else {
atomicOr(&output_valid[valid_index], valid_warp);
cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref{
output_valid[valid_index]};
ref.fetch_or(valid_warp, cuda::std::memory_order_relaxed);
}
}

@@ -190,7 +193,9 @@ __launch_bounds__(block_size) __global__
uint32_t valid_warp = __ballot_sync(0xffff'ffffu, temp_valids[block_size + threadIdx.x]);
if (lane == 0 && valid_warp != 0) {
tmp_warp_valid_counts += __popc(valid_warp);
atomicOr(&output_valid[valid_index + num_warps], valid_warp);
cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref{
output_valid[valid_index + num_warps]};
ref.fetch_or(valid_warp, cuda::std::memory_order_relaxed);
}
}
}
@@ -206,7 +211,8 @@ __launch_bounds__(block_size) __global__
cudf::detail::single_lane_block_sum_reduce<block_size, leader_lane>(warp_valid_counts);

if (threadIdx.x == 0) { // one thread computes and adds to null count
atomicAdd(output_null_count, block_sum - block_valid_count);
cuda::atomic_ref<size_type, cuda::thread_scope_device> ref{*output_null_count};
ref.fetch_add(block_sum - block_valid_count, cuda::std::memory_order_relaxed);
}
}

124 changes: 1 addition & 123 deletions cpp/include/cudf/detail/utilities/device_atomics.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@
* cudf::duration_us, cudf::duration_ns and bool
* where CUDA atomic operations are, `atomicAdd`, `atomicMin`, `atomicMax`,
* `atomicCAS`.
* `atomicAnd`, `atomicOr`, `atomicXor` are also supported for integer data types.
* Also provides `cudf::genericAtomicOperation` which performs atomic operation
* with the given binary operator.
*/
@@ -161,7 +160,6 @@ struct genericAtomicOperationImpl<T, Op, 8> {
// specialized functions for operators
// `atomicAdd` supports int32, float, double (signed int64 is not supported.)
// `atomicMin`, `atomicMax` support int32_t, int64_t
// `atomicAnd`, `atomicOr`, `atomicXor` support int32_t, int64_t
template <>
struct genericAtomicOperationImpl<float, DeviceSum, 4> {
using T = float;
@@ -252,63 +250,6 @@ struct genericAtomicOperationImpl<int64_t, DeviceMax, 8> {
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceAnd, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceAnd op)
{
return atomicAnd(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceAnd, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceAnd op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicAnd(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceOr, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceOr op)
{
return atomicOr(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceOr, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceOr op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicOr(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceXor, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceXor op)
{
return atomicXor(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceXor, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceXor op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicXor(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};
// -----------------------------------------------------------------------
// the implementation of `typesAtomicCASImpl`
template <typename T, size_t N = sizeof(T)>
@@ -598,66 +539,3 @@ __forceinline__ __device__ T atomicCAS(T* address, T compare, T val)
{
return cudf::detail::typesAtomicCASImpl<T>()(address, compare, val);
}

/**
* @brief Overloads for `atomicAnd`
* reads the `old` located at the `address` in global or shared memory,
* computes (old & val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicAnd` are:
* singed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicAnd(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceAnd{});
}

/**
* @brief Overloads for `atomicOr`
* reads the `old` located at the `address` in global or shared memory,
* computes (old | val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicOr` are:
* singed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicOr(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceOr{});
}

/**
* @brief Overloads for `atomicXor`
* reads the `old` located at the `address` in global or shared memory,
* computes (old ^ val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicXor` are:
* singed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicXor(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceXor{});
}
33 changes: 0 additions & 33 deletions cpp/include/cudf/detail/utilities/device_operators.cuh
@@ -230,39 +230,6 @@ struct DeviceProduct {
}
};

/**
* @brief binary `and` operator
*/
struct DeviceAnd {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs & rhs)
{
return (lhs & rhs);
}
};

/**
* @brief binary `or` operator
*/
struct DeviceOr {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs | rhs)
{
return (lhs | rhs);
}
};

/**
* @brief binary `xor` operator
*/
struct DeviceXor {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs ^ rhs)
{
return (lhs ^ rhs);
}
};

/**
* @brief Operator for calculating Lead/Lag window function.
*/
8 changes: 5 additions & 3 deletions cpp/src/groupby/hash/multi_pass_kernels.cuh
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,11 +20,12 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/utilities/type_dispatcher.hpp>

#include <cuda/atomic>

#include <cmath>

namespace cudf {
@@ -86,7 +87,8 @@ struct var_hash_functor {
auto x = static_cast<Target>(source.element<Source>(source_index));
auto mean = static_cast<Target>(sum.element<SumType>(target_index)) / group_size;
Target result = (x - mean) * (x - mean) / (group_size - ddof);
atomicAdd(&target.element<Target>(target_index), result);
cuda::atomic_ref<Target, cuda::thread_scope_device> ref{target.element<Target>(target_index)};
ref.fetch_add(result, cuda::std::memory_order_relaxed);
// STD sqrt is applied in finalize()

if (target_has_nulls and target.is_null(target_index)) { target.set_valid(target_index); }
40 changes: 25 additions & 15 deletions cpp/src/hash/concurrent_unordered_map.cuh
@@ -21,7 +21,6 @@
#include <hash/managed.cuh>

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
@@ -35,6 +34,8 @@
#include <limits>
#include <type_traits>

#include <cuda/atomic>

namespace {
template <std::size_t N>
struct packed {
@@ -91,8 +92,8 @@ union pair_packer;
template <typename pair_type>
union pair_packer<pair_type, std::enable_if_t<is_packable<pair_type>()>> {
using packed_type = packed_t<pair_type>;
packed_type const packed;
pair_type const pair;
packed_type packed;
pair_type pair;

__device__ pair_packer(pair_type _pair) : pair{_pair} {}

@@ -268,16 +269,21 @@ class concurrent_unordered_map {
__device__ std::enable_if_t<is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
pair_packer<pair_type> const unused{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> const new_pair{insert_pair};
pair_packer<pair_type> const old{
atomicCAS(reinterpret_cast<typename pair_packer<pair_type>::packed_type*>(insert_location),
unused.packed,
new_pair.packed)};
pair_packer<pair_type> expected{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> desired{insert_pair};

using packed_type = typename pair_packer<pair_type>::packed_type;

if (old.packed == unused.packed) { return insert_result::SUCCESS; }
auto* insert_ptr = reinterpret_cast<packed_type*>(insert_location);
cuda::atomic_ref<packed_type, cuda::thread_scope_device> ref{*insert_ptr};
auto const success =
ref.compare_exchange_strong(expected.packed, desired.packed, cuda::std::memory_order_relaxed);

if (m_equal(old.pair.first, insert_pair.first)) { return insert_result::DUPLICATE; }
if (success) {
return insert_result::SUCCESS;
} else if (m_equal(expected.pair.first, insert_pair.first)) {
return insert_result::DUPLICATE;
}
return insert_result::CONTINUE;
}

@@ -292,16 +298,20 @@
__device__ std::enable_if_t<not is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
key_type const old_key{atomicCAS(&(insert_location->first), m_unused_key, insert_pair.first)};
auto expected = m_unused_key;
cuda::atomic_ref<key_type, cuda::thread_scope_device> ref{insert_location->first};
auto const key_success =
ref.compare_exchange_strong(expected, insert_pair.first, cuda::std::memory_order_relaxed);

// Hash bucket empty
if (m_unused_key == old_key) {
if (key_success) {
insert_location->second = insert_pair.second;
return insert_result::SUCCESS;
}

// Key already exists
if (m_equal(old_key, insert_pair.first)) { return insert_result::DUPLICATE; }
else if (m_equal(expected, insert_pair.first)) {
return insert_result::DUPLICATE;
}

return insert_result::CONTINUE;
}
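For context, the same replacement of `atomicCAS` with `compare_exchange_strong` can be sketched in isolation (hypothetical slot type and names; `attempt_insert` above applies the idiom to the packed key/value pair and to the key alone):

```cpp
// Minimal sketch of the CAS idiom used in attempt_insert above; the slot type
// and empty sentinel are hypothetical stand-ins for the packed pair / key.
#include <cuda/atomic>

#include <cstdint>

__device__ bool try_claim_slot(std::uint64_t* slot,
                               std::uint64_t empty_sentinel,
                               std::uint64_t desired)
{
  // Before: success was inferred from atomicCAS(slot, empty_sentinel, desired)
  // returning empty_sentinel. compare_exchange_strong returns success directly
  // and, on failure, writes the value it observed back into `expected`.
  auto expected = empty_sentinel;
  cuda::atomic_ref<std::uint64_t, cuda::thread_scope_device> ref{*slot};
  return ref.compare_exchange_strong(expected, desired, cuda::std::memory_order_relaxed);
}
```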
