Clean up cudf device atomic with cuda::atomic_ref #13583

Merged (11 commits, Jul 10, 2023)
cpp/include/cudf/detail/copy_if.cuh: 14 changes (10 additions, 4 deletions)
@@ -22,7 +22,6 @@
#include <cudf/detail/gather.hpp>
#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/cuda.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/null_mask.hpp>
#include <cudf/strings/string_view.cuh>
#include <cudf/table/table.hpp>
@@ -44,6 +43,8 @@

#include <cub/cub.cuh>

#include <cuda/atomic>

#include <algorithm>

namespace cudf {
@@ -181,7 +182,9 @@ __launch_bounds__(block_size) __global__
if (wid > 0 && wid < last_warp)
output_valid[valid_index] = valid_warp;
else {
atomicOr(&output_valid[valid_index], valid_warp);
cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref{
output_valid[valid_index]};
ref.fetch_or(valid_warp, cuda::std::memory_order_relaxed);
}
}

@@ -190,7 +193,9 @@ __launch_bounds__(block_size) __global__
uint32_t valid_warp = __ballot_sync(0xffff'ffffu, temp_valids[block_size + threadIdx.x]);
if (lane == 0 && valid_warp != 0) {
tmp_warp_valid_counts += __popc(valid_warp);
atomicOr(&output_valid[valid_index + num_warps], valid_warp);
cuda::atomic_ref<cudf::bitmask_type, cuda::thread_scope_device> ref{
output_valid[valid_index + num_warps]};
ref.fetch_or(valid_warp, cuda::std::memory_order_relaxed);
}
}
}
@@ -206,7 +211,8 @@ __launch_bounds__(block_size) __global__
cudf::detail::single_lane_block_sum_reduce<block_size, leader_lane>(warp_valid_counts);

if (threadIdx.x == 0) { // one thread computes and adds to null count
atomicAdd(output_null_count, block_sum - block_valid_count);
cuda::atomic_ref<size_type, cuda::thread_scope_device> ref{*output_null_count};
ref.fetch_add(block_sum - block_valid_count, cuda::std::memory_order_relaxed);
}
}
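
The changes in copy_if.cuh above, and the similar ones in the files below, all follow one pattern: a raw CUDA intrinsic such as `atomicOr` or `atomicAdd` is replaced by a `cuda::atomic_ref` from libcu++ with an explicit thread scope and a relaxed memory order. A minimal standalone sketch of that idiom, using hypothetical kernel and buffer names that are not part of this PR:

#include <cuda/atomic>

__global__ void mark_and_count(unsigned int* bitmask, int* count, unsigned int my_bits)
{
  // Previously: atomicOr(&bitmask[blockIdx.x], my_bits);
  cuda::atomic_ref<unsigned int, cuda::thread_scope_device> mask_ref{bitmask[blockIdx.x]};
  mask_ref.fetch_or(my_bits, cuda::std::memory_order_relaxed);

  // Previously: atomicAdd(count, 1);
  cuda::atomic_ref<int, cuda::thread_scope_device> count_ref{*count};
  count_ref.fetch_add(1, cuda::std::memory_order_relaxed);
}

Relaxed ordering matches the unordered semantics of the legacy intrinsics, and `cuda::thread_scope_device` keeps each operation visible to every thread on the device, as the replaced device-wide atomics did.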

cpp/include/cudf/detail/utilities/device_atomics.cuh: 124 changes (1 addition, 123 deletions)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,6 @@
* cudf::duration_us, cudf::duration_ns and bool
* where CUDA atomic operations are, `atomicAdd`, `atomicMin`, `atomicMax`,
* `atomicCAS`.
* `atomicAnd`, `atomicOr`, `atomicXor` are also supported for integer data types.
* Also provides `cudf::genericAtomicOperation` which performs atomic operation
* with the given binary operator.
*/
@@ -161,7 +160,6 @@ struct genericAtomicOperationImpl<T, Op, 8> {
// specialized functions for operators
// `atomicAdd` supports int32, float, double (signed int64 is not supported.)
// `atomicMin`, `atomicMax` support int32_t, int64_t
// `atomicAnd`, `atomicOr`, `atomicXor` support int32_t, int64_t
template <>
struct genericAtomicOperationImpl<float, DeviceSum, 4> {
using T = float;
@@ -252,63 +250,6 @@ struct genericAtomicOperationImpl<int64_t, DeviceMax, 8> {
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceAnd, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceAnd op)
{
return atomicAnd(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceAnd, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceAnd op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicAnd(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceOr, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceOr op)
{
return atomicOr(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceOr, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceOr op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicOr(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceXor, 4> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceXor op)
{
return atomicXor(addr, update_value);
}
};

template <typename T>
struct genericAtomicOperationImpl<T, DeviceXor, 8> {
__forceinline__ __device__ T operator()(T* addr, T const& update_value, DeviceXor op)
{
using T_int = long long int;
static_assert(sizeof(T) == sizeof(T_int));
T ret = atomicXor(reinterpret_cast<T_int*>(addr), type_reinterpret<T_int, T>(update_value));
return ret;
}
};
// -----------------------------------------------------------------------
// the implementation of `typesAtomicCASImpl`
template <typename T, size_t N = sizeof(T)>
@@ -598,66 +539,3 @@ __forceinline__ __device__ T atomicCAS(T* address, T compare, T val)
{
return cudf::detail::typesAtomicCASImpl<T>()(address, compare, val);
}

/**
* @brief Overloads for `atomicAnd`
* reads the `old` located at the `address` in global or shared memory,
* computes (old & val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicAnd` are:
* signed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicAnd(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceAnd{});
}

/**
* @brief Overloads for `atomicOr`
* reads the `old` located at the `address` in global or shared memory,
* computes (old | val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicOr` are:
* signed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicOr(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceOr{});
}

/**
* @brief Overloads for `atomicXor`
* reads the `old` located at the `address` in global or shared memory,
* computes (old ^ val), and stores the result back to memory at the same
* address. These three operations are performed in one atomic transaction.
*
* The supported types for `atomicXor` are:
* signed/unsigned integer 8/16/32/64 bits
* Cuda natively supports `sint32`, `uint32`, `sint64`, `uint64`.
*
* @param[in] address The address of old value in global or shared memory
* @param[in] val The value to be computed
*
* @returns The old value at `address`
*/
template <typename T, std::enable_if_t<std::is_integral_v<T>, T>* = nullptr>
__forceinline__ __device__ T atomicXor(T* address, T val)
{
return cudf::genericAtomicOperation(address, val, cudf::DeviceXor{});
}
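
After this cleanup the header still provides `cudf::genericAtomicOperation`, which applies a binary operator atomically (falling back to a CAS loop when no native intrinsic exists). The same behaviour can be written directly on top of `cuda::atomic_ref`; the sketch below is only an illustration under relaxed ordering, not code from this PR:

#include <cuda/atomic>

template <typename T, typename BinaryOp>
__device__ T generic_atomic_update(T* addr, T update, BinaryOp op)
{
  cuda::atomic_ref<T, cuda::thread_scope_device> ref{*addr};
  T expected = ref.load(cuda::std::memory_order_relaxed);
  // On failure, compare_exchange_weak writes the freshly observed value into
  // `expected`, so each retry recomputes the desired value from current data.
  while (!ref.compare_exchange_weak(
    expected, op(expected, update), cuda::std::memory_order_relaxed)) {
  }
  return expected;  // the old value, matching the atomicCAS-based contract
}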
cpp/include/cudf/detail/utilities/device_operators.cuh: 33 changes (0 additions, 33 deletions)
@@ -230,39 +230,6 @@ struct DeviceProduct {
}
};

/**
* @brief binary `and` operator
*/
struct DeviceAnd {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs & rhs)
{
return (lhs & rhs);
}
};

/**
* @brief binary `or` operator
*/
struct DeviceOr {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs | rhs)
{
return (lhs | rhs);
}
};

/**
* @brief binary `xor` operator
*/
struct DeviceXor {
template <typename T, std::enable_if_t<std::is_integral_v<T>>* = nullptr>
CUDF_HOST_DEVICE inline auto operator()(T const& lhs, T const& rhs) -> decltype(lhs ^ rhs)
{
return (lhs ^ rhs);
}
};

/**
* @brief Operator for calculating Lead/Lag window function.
*/
cpp/src/groupby/hash/multi_pass_kernels.cuh: 8 changes (5 additions, 3 deletions)
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,11 +20,12 @@
#include <cudf/column/column_device_view.cuh>
#include <cudf/detail/aggregation/aggregation.hpp>
#include <cudf/detail/utilities/assert.cuh>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/dictionary/dictionary_column_view.hpp>
#include <cudf/table/table_device_view.cuh>
#include <cudf/utilities/type_dispatcher.hpp>

#include <cuda/atomic>

#include <cmath>

namespace cudf {
@@ -86,7 +87,8 @@ struct var_hash_functor {
auto x = static_cast<Target>(source.element<Source>(source_index));
auto mean = static_cast<Target>(sum.element<SumType>(target_index)) / group_size;
Target result = (x - mean) * (x - mean) / (group_size - ddof);
atomicAdd(&target.element<Target>(target_index), result);
cuda::atomic_ref<Target, cuda::thread_scope_device> ref{target.element<Target>(target_index)};
ref.fetch_add(result, cuda::std::memory_order_relaxed);
// STD sqrt is applied in finalize()

if (target_has_nulls and target.is_null(target_index)) { target.set_valid(target_index); }
cpp/src/hash/concurrent_unordered_map.cuh: 40 changes (25 additions, 15 deletions)
@@ -21,7 +21,6 @@
#include <hash/managed.cuh>

#include <cudf/detail/nvtx/ranges.hpp>
#include <cudf/detail/utilities/device_atomics.cuh>
#include <cudf/detail/utilities/hash_functions.cuh>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/error.hpp>
@@ -35,6 +34,8 @@
#include <limits>
#include <type_traits>

#include <cuda/atomic>

namespace {
template <std::size_t N>
struct packed {
@@ -91,8 +92,8 @@ union pair_packer;
template <typename pair_type>
union pair_packer<pair_type, std::enable_if_t<is_packable<pair_type>()>> {
using packed_type = packed_t<pair_type>;
packed_type const packed;
pair_type const pair;
packed_type packed;
pair_type pair;

__device__ pair_packer(pair_type _pair) : pair{_pair} {}

@@ -268,16 +269,21 @@ class concurrent_unordered_map {
__device__ std::enable_if_t<is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
pair_packer<pair_type> const unused{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> const new_pair{insert_pair};
pair_packer<pair_type> const old{
atomicCAS(reinterpret_cast<typename pair_packer<pair_type>::packed_type*>(insert_location),
unused.packed,
new_pair.packed)};
pair_packer<pair_type> expected{thrust::make_pair(m_unused_key, m_unused_element)};
pair_packer<pair_type> desired{insert_pair};

using packed_type = typename pair_packer<pair_type>::packed_type;

if (old.packed == unused.packed) { return insert_result::SUCCESS; }
auto* insert_ptr = reinterpret_cast<packed_type*>(insert_location);
cuda::atomic_ref<packed_type, cuda::thread_scope_device> ref{*insert_ptr};
auto const success =
ref.compare_exchange_strong(expected.packed, desired.packed, cuda::std::memory_order_relaxed);

if (m_equal(old.pair.first, insert_pair.first)) { return insert_result::DUPLICATE; }
if (success) {
return insert_result::SUCCESS;
} else if (m_equal(expected.pair.first, insert_pair.first)) {
return insert_result::DUPLICATE;
}
return insert_result::CONTINUE;
}

@@ -292,16 +298,20 @@
__device__ std::enable_if_t<not is_packable<pair_type>(), insert_result> attempt_insert(
value_type* const __restrict__ insert_location, value_type const& insert_pair)
{
key_type const old_key{atomicCAS(&(insert_location->first), m_unused_key, insert_pair.first)};
auto expected = m_unused_key;
cuda::atomic_ref<key_type, cuda::thread_scope_device> ref{insert_location->first};
auto const key_success =
ref.compare_exchange_strong(expected, insert_pair.first, cuda::std::memory_order_relaxed);

// Hash bucket empty
if (m_unused_key == old_key) {
if (key_success) {
insert_location->second = insert_pair.second;
return insert_result::SUCCESS;
}

// Key already exists
if (m_equal(old_key, insert_pair.first)) { return insert_result::DUPLICATE; }
else if (m_equal(expected, insert_pair.first)) {
return insert_result::DUPLICATE;
}

return insert_result::CONTINUE;
}
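
Both `attempt_insert` overloads above replace the old `atomicCAS` idiom, which returns the previously stored value, with `compare_exchange_strong`, which returns a bool and writes the observed value back into `expected`; that write-back is also why the `pair_packer` union members lose their `const` qualifiers earlier in this diff. A small sketch of the two idioms side by side, using hypothetical names that do not come from this PR:

#include <cuda/atomic>

__device__ bool try_claim_slot(unsigned long long* slot,
                               unsigned long long empty_sentinel,
                               unsigned long long key)
{
  // Legacy idiom: the observed value is the return value.
  //   auto const old = atomicCAS(slot, empty_sentinel, key);
  //   return old == empty_sentinel;

  // cuda::atomic_ref idiom: on failure, the observed value is written into
  // `expected`, which is what lets attempt_insert check it for a duplicate key.
  auto expected = empty_sentinel;
  cuda::atomic_ref<unsigned long long, cuda::thread_scope_device> ref{*slot};
  return ref.compare_exchange_strong(expected, key, cuda::std::memory_order_relaxed);
}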