Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Testing stream pool implementation #14437

Merged
merged 12 commits into from
Dec 19, 2023
56 changes: 56 additions & 0 deletions cpp/include/cudf/detail/utilities/stream_pool.hpp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a copy-paste of the declarations from stream_pool.cpp, right? It's a good change, just want to make sure I'm not missing any other changes.

Copy link
Contributor Author

@shrshi shrshi Dec 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have moved the parent class declaration and create_global_cuda_stream_pool from stream_pool.cpp to the header file so that test_cuda_stream_pool in identify_stream_usage.cpp can include it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Notes from an offline discussion - cuda_stream_pool was "hidden" in the source file on purpose; we want the pool usage to be limited to fork_streams/join_streams. The change here exposes more of the stream pool than we'd like.
However, we don't see a better solution, since the current approach at least does not require additional APIs in libcudf.

Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,62 @@

namespace cudf::detail {

class cuda_stream_pool {
public:
// matching type used in rmm::cuda_stream_pool::get_stream(stream_id)
using stream_id_type = std::size_t;

virtual ~cuda_stream_pool() = default;

/**
* @brief Get a `cuda_stream_view` of a stream in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return Stream view.
*/
virtual rmm::cuda_stream_view get_stream() = 0;

/**
* @brief Get a `cuda_stream_view` of the stream associated with `stream_id`.
*
* Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream.
* This function is thread safe with respect to other calls to the same function.
*
* @param stream_id Unique identifier for the desired stream
* @return Requested stream view.
*/
virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0;

/**
* @brief Get a set of `cuda_stream_view` objects from the pool.
*
* An attempt is made to ensure that the returned vector does not contain duplicate
* streams, but this cannot be guaranteed if `count` is greater than the value returned by
* `get_stream_pool_size()`.
*
* This function is thread safe with respect to other calls to the same function.
*
* @param count The number of stream views to return.
* @return Vector containing `count` stream views.
*/
virtual std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) = 0;

/**
* @brief Get the number of unique stream objects in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return the number of stream objects in the pool
*/
virtual std::size_t get_stream_pool_size() const = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The name is a bit redundant. We know that this is a stream pool.

Suggested change
virtual std::size_t get_stream_pool_size() const = 0;
virtual std::size_t get_pool_size() const = 0;

or just size().

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_pool_size() is consistent with rmm::cuda_stream_pool. Surprised this wasn't caught before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, should be get_pool_size. Not a change for this PR, IMO.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, but then please address it in a follow up work.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for pointing this out! I'll work on this change in a follow-up PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't this a change for this PR, if all the code in this section is new in this PR?

};

/**
* @brief Initialize global stream pool.
*/
cuda_stream_pool* create_global_cuda_stream_pool();

/**
* @brief Acquire a set of `cuda_stream_view` objects and synchronize them to an event on another
* stream.
Expand Down
59 changes: 0 additions & 59 deletions cpp/src/utilities/stream_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@

namespace cudf::detail {

namespace {

// TODO: what is a good number here. what's the penalty for making it larger?
// Dave Baranec rule of thumb was max_streams_needed * num_concurrent_threads,
// where num_concurrent_threads was estimated to be 4. so using 32 will allow
Expand Down Expand Up @@ -58,57 +56,6 @@ std::size_t constexpr STREAM_POOL_SIZE = 32;
} while (0)
#endif

class cuda_stream_pool {
public:
// matching type used in rmm::cuda_stream_pool::get_stream(stream_id)
using stream_id_type = std::size_t;

virtual ~cuda_stream_pool() = default;

/**
* @brief Get a `cuda_stream_view` of a stream in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return Stream view.
*/
virtual rmm::cuda_stream_view get_stream() = 0;

/**
* @brief Get a `cuda_stream_view` of the stream associated with `stream_id`.
*
* Equivalent values of `stream_id` return a `cuda_stream_view` to the same underlying stream.
* This function is thread safe with respect to other calls to the same function.
*
* @param stream_id Unique identifier for the desired stream
* @return Requested stream view.
*/
virtual rmm::cuda_stream_view get_stream(stream_id_type stream_id) = 0;

/**
* @brief Get a set of `cuda_stream_view` objects from the pool.
*
* An attempt is made to ensure that the returned vector does not contain duplicate
* streams, but this cannot be guaranteed if `count` is greater than the value returned by
* `get_stream_pool_size()`.
*
* This function is thread safe with respect to other calls to the same function.
*
* @param count The number of stream views to return.
* @return Vector containing `count` stream views.
*/
virtual std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) = 0;

/**
* @brief Get the number of stream objects in the pool.
*
* This function is thread safe with respect to other calls to the same function.
*
* @return the number of stream objects in the pool
*/
virtual std::size_t get_stream_pool_size() const = 0;
};

/**
* @brief Implementation of `cuda_stream_pool` that wraps an `rmm::cuda_stram_pool`.
*/
Expand Down Expand Up @@ -157,13 +104,9 @@ class debug_cuda_stream_pool : public cuda_stream_pool {
std::size_t get_stream_pool_size() const override { return 1UL; }
};

/**
* @brief Initialize global stream pool.
*/
cuda_stream_pool* create_global_cuda_stream_pool()
{
if (getenv("LIBCUDF_USE_DEBUG_STREAM_POOL")) return new debug_cuda_stream_pool();

return new rmm_cuda_stream_pool();
}

Expand Down Expand Up @@ -231,8 +174,6 @@ cuda_stream_pool& global_cuda_stream_pool()
return *pools[device_id.value()];
}

} // anonymous namespace

std::vector<rmm::cuda_stream_view> fork_streams(rmm::cuda_stream_view stream, std::size_t count)
{
auto const streams = global_cuda_stream_pool().get_streams(count);
Expand Down
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SORTING_TEST streams/sorting_test.cpp STREAM_MODE testing)
Expand Down
33 changes: 33 additions & 0 deletions cpp/tests/streams/pool_test.cu
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/default_stream.hpp>
shrshi marked this conversation as resolved.
Show resolved Hide resolved

#include <cudf/detail/utilities/stream_pool.hpp>
#include <rmm/cuda_stream_view.hpp>

class StreamPoolTest : public cudf::test::BaseFixture {};

__global__ void do_nothing_kernel() {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if this kernel without any code will be optimized out so the for loop below will never be executed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing this out! I printed the GPU trace from the nsys profiler to check this and it shows that the do_nothing_kernel is being executed.


TEST_F(StreamPoolTest, ForkStreams)
vuule marked this conversation as resolved.
Show resolved Hide resolved
{
auto streams = cudf::detail::fork_streams(cudf::test::get_default_stream(), 2);
for (auto& stream : streams) {
do_nothing_kernel<<<1, 32, 0, stream.value()>>>();
}
}
41 changes: 37 additions & 4 deletions cpp/tests/utilities/identify_stream_usage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include <cudf/detail/utilities/stacktrace.hpp>
#include <cudf/detail/utilities/stream_pool.hpp>

#include <rmm/cuda_stream.hpp>
#include <rmm/cuda_stream_view.hpp>
Expand All @@ -31,10 +32,14 @@
#include <string>
#include <unordered_map>

// This file is compiled into a separate library that is dynamically loaded with LD_PRELOAD at
// runtime to libcudf to override some stream-related symbols in libcudf. The goal of such a library
// is to verify if the stream/stream pool is being correctly forwarded between API calls.
//
// We control whether to override cudf::test::get_default_stream or
// cudf::get_default_stream with a compile-time flag. Thesee are the two valid
// options:
// 1. STREAM_MODE_TESTING=OFF: In this mode, cudf::get_default_stream will
// cudf::get_default_stream with a compile-time flag. The behaviour of tests
// depend on whether STREAM_MODE_TESTING is defined:
// 1. If STREAM_MODE_TESTING is not defined, cudf::get_default_stream will
// return a custom stream and stream_is_invalid will return true if any CUDA
// API is called using any of CUDA's default stream constants
// (cudaStreamLegacy, cudaStreamDefault, or cudaStreamPerThread). This check
Expand All @@ -44,7 +49,7 @@
// is not sufficient to guarantee a stream-ordered API because it will not
// identify places in the code that use cudf::get_default_stream instead of
// properly forwarding along a user-provided stream.
// 2. STREAM_MODE_TESTING=ON: In this mode, cudf::test::get_default_stream
// 2. If STREAM_MODE_TESTING compiler option is defined, cudf::test::get_default_stream
// returns a custom stream and stream_is_invalid returns true if any CUDA
// API is called using any stream other than cudf::test::get_default_stream.
// This is a necessary and sufficient condition to ensure that libcudf is
Expand All @@ -66,6 +71,34 @@ rmm::cuda_stream_view const get_default_stream()
} // namespace test
vyasr marked this conversation as resolved.
Show resolved Hide resolved
#endif

#ifdef STREAM_MODE_TESTING
namespace detail {

/**
* @brief Implementation of `cuda_stream_pool` that always returns the
* `cudf::test::get_default_stream()`
*/
class test_cuda_stream_pool : public cuda_stream_pool {
public:
rmm::cuda_stream_view get_stream() override { return cudf::test::get_default_stream(); }
[[maybe_unused]] rmm::cuda_stream_view get_stream(stream_id_type stream_id) override
{
return cudf::test::get_default_stream();
}

std::vector<rmm::cuda_stream_view> get_streams(std::size_t count) override
{
return std::vector<rmm::cuda_stream_view>(count, cudf::test::get_default_stream());
}

std::size_t get_stream_pool_size() const override { return 1UL; }
vyasr marked this conversation as resolved.
Show resolved Hide resolved
};

cuda_stream_pool* create_global_cuda_stream_pool() { return new test_cuda_stream_pool(); }
harrism marked this conversation as resolved.
Show resolved Hide resolved

} // namespace detail
#endif

} // namespace cudf

bool stream_is_invalid(cudaStream_t stream)
Expand Down