diff --git a/cpp/include/cudf/detail/utilities/stream_pool.hpp b/cpp/include/cudf/detail/utilities/stream_pool.hpp index 95384a9d73e..19ef26a10cb 100644 --- a/cpp/include/cudf/detail/utilities/stream_pool.hpp +++ b/cpp/include/cudf/detail/utilities/stream_pool.hpp @@ -25,6 +25,62 @@ namespace cudf::detail { +class cuda_stream_pool { + public: + // matching type used in rmm::cuda_stream_pool::get_stream(stream_id) + using stream_id_type = std::size_t; + + virtual ~cuda_stream_pool() = default; + + /** + * @brief Get a `cuda_stream_view` of a stream in the pool. + * + * This function is thread safe with respect to other calls to the same function. + * + * @return Stream view. + */ + virtual rmm::cuda_stream_view get_stream() = 0; + + /** + * @brief Get a `cuda_stream_view` of the stream associated with `stream_id`. + * + * Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream. + * This function is thread safe with respect to other calls to the same function. + * + * @param stream_id Unique identifier for the desired stream + * @return Requested stream view. + */ + virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0; + + /** + * @brief Get a set of `cuda_stream_view` objects from the pool. + * + * An attempt is made to ensure that the returned vector does not contain duplicate + * streams, but this cannot be guaranteed if `count` is greater than the value returned by + * `get_stream_pool_size()`. + * + * This function is thread safe with respect to other calls to the same function. + * + * @param count The number of stream views to return. + * @return Vector containing `count` stream views. + */ + virtual std::vector get_streams(std::size_t count) = 0; + + /** + * @brief Get the number of unique stream objects in the pool. + * + * This function is thread safe with respect to other calls to the same function. + * + * @return the number of stream objects in the pool + */ + virtual std::size_t get_stream_pool_size() const = 0; +}; + +/** + * @brief Initialize global stream pool. + */ +cuda_stream_pool* create_global_cuda_stream_pool(); + /** * @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another * stream. diff --git a/cpp/src/utilities/stream_pool.cpp b/cpp/src/utilities/stream_pool.cpp index b3b20889ef8..121873ad44b 100644 --- a/cpp/src/utilities/stream_pool.cpp +++ b/cpp/src/utilities/stream_pool.cpp @@ -29,8 +29,6 @@ namespace cudf::detail { -namespace { - // TODO: what is a good number here. what's the penalty for making it larger? // Dave Baranec rule of thumb was max_streams_needed * num_concurrent_threads, // where num_concurrent_threads was estimated to be 4. so using 32 will allow @@ -58,57 +56,6 @@ std::size_t constexpr STREAM_POOL_SIZE = 32; } while (0) #endif -class cuda_stream_pool { - public: - // matching type used in rmm::cuda_stream_pool::get_stream(stream_id) - using stream_id_type = std::size_t; - - virtual ~cuda_stream_pool() = default; - - /** - * @brief Get a `cuda_stream_view` of a stream in the pool. - * - * This function is thread safe with respect to other calls to the same function. - * - * @return Stream view. - */ - virtual rmm::cuda_stream_view get_stream() = 0; - - /** - * @brief Get a `cuda_stream_view` of the stream associated with `stream_id`. - * - * Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream. - * This function is thread safe with respect to other calls to the same function. - * - * @param stream_id Unique identifier for the desired stream - * @return Requested stream view. - */ - virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0; - - /** - * @brief Get a set of `cuda_stream_view` objects from the pool. - * - * An attempt is made to ensure that the returned vector does not contain duplicate - * streams, but this cannot be guaranteed if `count` is greater than the value returned by - * `get_stream_pool_size()`. - * - * This function is thread safe with respect to other calls to the same function. - * - * @param count The number of stream views to return. - * @return Vector containing `count` stream views. - */ - virtual std::vector get_streams(std::size_t count) = 0; - - /** - * @brief Get the number of stream objects in the pool. - * - * This function is thread safe with respect to other calls to the same function. - * - * @return the number of stream objects in the pool - */ - virtual std::size_t get_stream_pool_size() const = 0; -}; - /** * @brief Implementation of `cuda_stream_pool` that wraps an `rmm::cuda_stram_pool`. */ @@ -157,13 +104,9 @@ class debug_cuda_stream_pool : public cuda_stream_pool { std::size_t get_stream_pool_size() const override { return 1UL; } }; -/** - * @brief Initialize global stream pool. - */ cuda_stream_pool* create_global_cuda_stream_pool() { if (getenv("LIBCUDF_USE_DEBUG_STREAM_POOL")) return new debug_cuda_stream_pool(); - return new rmm_cuda_stream_pool(); } @@ -231,8 +174,6 @@ cuda_stream_pool& global_cuda_stream_pool() return *pools[device_id.value()]; } -} // anonymous namespace - std::vector fork_streams(rmm::cuda_stream_view stream, std::size_t count) { auto const streams = global_cuda_stream_pool().get_streams(count); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index b35c72b9e9d..af4ab8c2485 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -644,6 +644,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing) ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing) diff --git a/cpp/tests/streams/pool_test.cu b/cpp/tests/streams/pool_test.cu new file mode 100644 index 00000000000..0f92e1c0c2b --- /dev/null +++ b/cpp/tests/streams/pool_test.cu @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +class StreamPoolTest : public cudf::test::BaseFixture {}; + +__global__ void do_nothing_kernel() {} + +TEST_F(StreamPoolTest, ForkStreams) +{ + auto streams = cudf::detail::fork_streams(cudf::test::get_default_stream(), 2); + for (auto& stream : streams) { + do_nothing_kernel<<<1, 32, 0, stream.value()>>>(); + } +} diff --git a/cpp/tests/utilities/identify_stream_usage.cpp b/cpp/tests/utilities/identify_stream_usage.cpp index ab2a85a0842..bdc338d2c92 100644 --- a/cpp/tests/utilities/identify_stream_usage.cpp +++ b/cpp/tests/utilities/identify_stream_usage.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include @@ -31,10 +32,14 @@ #include #include +// This file is compiled into a separate library that is dynamically loaded with LD_PRELOAD at +// runtime to libcudf to override some stream-related symbols in libcudf. The goal of such a library +// is to verify if the stream/stream pool is being correctly forwarded between API calls. +// // We control whether to override cudf::test::get_default_stream or -// cudf::get_default_stream with a compile-time flag. Thesee are the two valid -// options: -// 1. STREAM_MODE_TESTING=OFF: In this mode, cudf::get_default_stream will +// cudf::get_default_stream with a compile-time flag. The behaviour of tests +// depend on whether STREAM_MODE_TESTING is defined: +// 1. If STREAM_MODE_TESTING is not defined, cudf::get_default_stream will // return a custom stream and stream_is_invalid will return true if any CUDA // API is called using any of CUDA's default stream constants // (cudaStreamLegacy, cudaStreamDefault, or cudaStreamPerThread). This check @@ -44,7 +49,7 @@ // is not sufficient to guarantee a stream-ordered API because it will not // identify places in the code that use cudf::get_default_stream instead of // properly forwarding along a user-provided stream. -// 2. STREAM_MODE_TESTING=ON: In this mode, cudf::test::get_default_stream +// 2. If STREAM_MODE_TESTING compiler option is defined, cudf::test::get_default_stream // returns a custom stream and stream_is_invalid returns true if any CUDA // API is called using any stream other than cudf::test::get_default_stream. // This is a necessary and sufficient condition to ensure that libcudf is @@ -66,6 +71,34 @@ rmm::cuda_stream_view const get_default_stream() } // namespace test #endif +#ifdef STREAM_MODE_TESTING +namespace detail { + +/** + * @brief Implementation of `cuda_stream_pool` that always returns the + * `cudf::test::get_default_stream()` + */ +class test_cuda_stream_pool : public cuda_stream_pool { + public: + rmm::cuda_stream_view get_stream() override { return cudf::test::get_default_stream(); } + [[maybe_unused]] rmm::cuda_stream_view get_stream(stream_id_type stream_id) override + { + return cudf::test::get_default_stream(); + } + + std::vector get_streams(std::size_t count) override + { + return std::vector(count, cudf::test::get_default_stream()); + } + + std::size_t get_stream_pool_size() const override { return 1UL; } +}; + +cuda_stream_pool* create_global_cuda_stream_pool() { return new test_cuda_stream_pool(); } + +} // namespace detail +#endif + } // namespace cudf bool stream_is_invalid(cudaStream_t stream)