From 2a931269af84a04a920f1d2d89b68f6dd2ac85e5 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 3 Nov 2023 21:07:04 +0000 Subject: [PATCH 1/8] added streams to parquet api --- cpp/include/cudf/io/parquet.hpp | 6 +- cpp/include/cudf_test/column_wrapper.hpp | 16 +- cpp/src/io/functions.cpp | 10 +- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/io/parquet_test.cpp | 145 ++++++++++++++++++ cpp/tests/utilities/identify_stream_usage.cpp | 4 +- 6 files changed, 171 insertions(+), 11 deletions(-) create mode 100644 cpp/tests/streams/io/parquet_test.cpp diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index ea18da74d5a..aac643eb4dc 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -401,6 +401,7 @@ class parquet_reader_options_builder { * @endcode * * @param options Settings for controlling reading behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource used to allocate device memory of the table in the returned * table_with_metadata * @@ -408,6 +409,7 @@ class parquet_reader_options_builder { */ table_with_metadata read_parquet( parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -1163,11 +1165,13 @@ class parquet_writer_options_builder { * @endcode * * @param options Settings for controlling writing behavior + * @param stream CUDA stream used for device memory operations and kernel launches * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if * requested in parquet_writer_options (empty blob otherwise). */ -std::unique_ptr> write_parquet(parquet_writer_options const& options); +std::unique_ptr> write_parquet( + parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Merges multiple raw metadata blobs that were previously created by write_parquet diff --git a/cpp/include/cudf_test/column_wrapper.hpp b/cpp/include/cudf_test/column_wrapper.hpp index e94dfea9dcf..b9f2e0d9868 100644 --- a/cpp/include/cudf_test/column_wrapper.hpp +++ b/cpp/include/cudf_test/column_wrapper.hpp @@ -803,7 +803,8 @@ class strings_column_wrapper : public detail::column_wrapper { offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); auto d_bitmask = cudf::detail::make_device_uvector_sync( null_mask, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource()); - wrapped = cudf::make_strings_column(d_chars, d_offsets, d_bitmask, null_count); + wrapped = cudf::make_strings_column( + d_chars, d_offsets, d_bitmask, null_count, cudf::test::get_default_stream()); } /** @@ -1846,7 +1847,8 @@ class structs_column_wrapper : public detail::column_wrapper { child_column_wrappers.end(), std::back_inserter(child_columns), [&](auto const& column_wrapper) { - return std::make_unique(column_wrapper.get()); + return std::make_unique(column_wrapper.get(), + cudf::test::get_default_stream()); }); init(std::move(child_columns), validity); } @@ -1882,7 +1884,8 @@ class structs_column_wrapper : public detail::column_wrapper { child_column_wrappers.end(), std::back_inserter(child_columns), [&](auto const& column_wrapper) { - return std::make_unique(column_wrapper.get()); + return std::make_unique(column_wrapper.get(), + cudf::test::get_default_stream()); }); init(std::move(child_columns), validity_iter); } @@ -1906,8 +1909,11 @@ class structs_column_wrapper : public detail::column_wrapper { return cudf::test::detail::make_null_mask(validity.begin(), validity.end()); }(); - wrapped = cudf::make_structs_column( - num_rows, std::move(child_columns), null_count, std::move(null_mask)); + wrapped = cudf::make_structs_column(num_rows, + std::move(child_columns), + null_count, + std::move(null_mask), + cudf::test::get_default_stream()); } template diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 00d56008611..7ca919e362c 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -478,13 +478,14 @@ using namespace cudf::io::parquet::detail; namespace detail_parquet = cudf::io::parquet::detail; table_with_metadata read_parquet(parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { CUDF_FUNC_RANGE(); auto datasources = make_datasources(options.get_source()); - auto reader = std::make_unique( - std::move(datasources), options, cudf::get_default_stream(), mr); + auto reader = + std::make_unique(std::move(datasources), options, stream, mr); return reader->read(options); } @@ -544,7 +545,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata) /** * @copydoc cudf::io::write_parquet */ -std::unique_ptr> write_parquet(parquet_writer_options const& options) +std::unique_ptr> write_parquet(parquet_writer_options const& options, + rmm::cuda_stream_view stream) { namespace io_detail = cudf::io::detail; @@ -552,7 +554,7 @@ std::unique_ptr> write_parquet(parquet_writer_options const auto sinks = make_datasinks(options.get_sink()); auto writer = std::make_unique( - std::move(sinks), options, io_detail::single_write_mode::YES, cudf::get_default_stream()); + std::move(sinks), options, io_detail::single_write_mode::YES, stream); writer->write(options.get_table(), options.get_partitions()); diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index e966ef3fb04..ae0056612f2 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -659,6 +659,7 @@ ConfigureTest( ) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing) # ################################################################################################## # Install tests #################################################################################### diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp new file mode 100644 index 00000000000..9576238437e --- /dev/null +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +// Global environment for temporary files +auto const temp_env = static_cast( + ::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment)); + +class ParquetTest : public cudf::test::BaseFixture {}; + +TEST_F(ParquetTest, ParquetWriter) +{ + constexpr auto num_rows = 10; + + std::vector zeros(num_rows, 0); + std::vector ones(num_rows, 1); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{12}}; + }); + auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{-12}}; + }); + + cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); + cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); + + cudf::test::lists_column_wrapper col8{ + {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; + + cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); + cudf::test::structs_column_wrapper col9{child_col}; + + std::vector col10_data(num_rows, "rapids"); + cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); + + cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); + + cudf::io::table_input_metadata tab_metadata(tab); + tab_metadata.column_metadata[0].set_name("bools"); + tab_metadata.column_metadata[1].set_name("int8s"); + tab_metadata.column_metadata[2].set_name("int16s"); + tab_metadata.column_metadata[3].set_name("int32s"); + tab_metadata.column_metadata[4].set_name("floats"); + tab_metadata.column_metadata[5].set_name("doubles"); + tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); + tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); + tab_metadata.column_metadata[8].set_name("lists"); + tab_metadata.column_metadata[9].set_name("structs"); + tab_metadata.column_metadata[10].set_name("strings"); + + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab) + .metadata(tab_metadata); + cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); +} + +TEST_F(ParquetTest, ParquetReader) +{ + constexpr auto num_rows = 10; + + std::vector zeros(num_rows, 0); + std::vector ones(num_rows, 1); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{12}}; + }); + auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{-12}}; + }); + + cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); + cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); + + cudf::test::lists_column_wrapper col8{ + {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; + + cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); + cudf::test::structs_column_wrapper col9{child_col}; + + std::vector col10_data(num_rows, "rapids"); + cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); + + cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); + + cudf::io::table_input_metadata tab_metadata(tab); + tab_metadata.column_metadata[0].set_name("bools"); + tab_metadata.column_metadata[1].set_name("int8s"); + tab_metadata.column_metadata[2].set_name("int16s"); + tab_metadata.column_metadata[3].set_name("int32s"); + tab_metadata.column_metadata[4].set_name("floats"); + tab_metadata.column_metadata[5].set_name("doubles"); + tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); + tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); + tab_metadata.column_metadata[8].set_name("lists"); + tab_metadata.column_metadata[9].set_name("structs"); + tab_metadata.column_metadata[10].set_name("strings"); + + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab) + .metadata(tab_metadata); + cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream()); +} diff --git a/cpp/tests/utilities/identify_stream_usage.cpp b/cpp/tests/utilities/identify_stream_usage.cpp index ab2a85a0842..3caef2dc509 100644 --- a/cpp/tests/utilities/identify_stream_usage.cpp +++ b/cpp/tests/utilities/identify_stream_usage.cpp @@ -15,6 +15,7 @@ */ #include +#include #include #include @@ -72,7 +73,8 @@ bool stream_is_invalid(cudaStream_t stream) { #ifdef STREAM_MODE_TESTING // In this mode the _only_ valid stream is the one returned by cudf::test::get_default_stream. - return (stream != cudf::test::get_default_stream().value()); + return (stream == cudf::get_default_stream().value() || stream == cudaStreamDefault || + stream == cudaStreamLegacy || stream == cudaStreamPerThread); #else // We explicitly list the possibilities rather than using // `cudf::get_default_stream().value()` because there is no guarantee that From 1316b4b99cc3b58ea00a7bd57b90b5be34d130fc Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 3 Nov 2023 22:50:48 +0000 Subject: [PATCH 2/8] streams for chunked operations --- cpp/include/cudf/io/parquet.hpp | 8 +++- cpp/src/io/functions.cpp | 17 ++++---- cpp/tests/streams/io/parquet_test.cpp | 62 +++++++++++++++++++++++++++ 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index aac643eb4dc..106d522f244 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -440,11 +440,13 @@ class chunked_parquet_reader { * @param chunk_read_limit Limit on total number of bytes to be returned per read, * or `0` if there is no limit * @param options The options used to read Parquet file + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ chunked_parquet_reader( std::size_t chunk_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -463,12 +465,14 @@ class chunked_parquet_reader { * @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or * `0` if there is no limit * @param options The options used to read Parquet file + * @param stream CUDA stream used for device memory operations and kernel launches * @param mr Device memory resource to use for device memory allocation */ chunked_parquet_reader( std::size_t chunk_read_limit, std::size_t pass_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream(), rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); /** @@ -1782,8 +1786,10 @@ class parquet_chunked_writer { * @brief Constructor with chunked writer options * * @param[in] options options used to write table + * @param[in] stream CUDA stream used for device memory operations and kernel launches */ - parquet_chunked_writer(chunked_parquet_writer_options const& options); + parquet_chunked_writer(chunked_parquet_writer_options const& options, + rmm::cuda_stream_view stream = cudf::get_default_stream()); /** * @brief Writes table to output. diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 7ca919e362c..34c6270a030 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -566,13 +566,10 @@ std::unique_ptr> write_parquet(parquet_writer_options const */ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) - : reader{std::make_unique(chunk_read_limit, - 0, - make_datasources(options.get_source()), - options, - cudf::get_default_stream(), - mr)} + : reader{std::make_unique( + chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)} { } @@ -582,12 +579,13 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit, std::size_t pass_read_limit, parquet_reader_options const& options, + rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) : reader{std::make_unique(chunk_read_limit, pass_read_limit, make_datasources(options.get_source()), options, - cudf::get_default_stream(), + stream, mr)} { } @@ -620,14 +618,15 @@ table_with_metadata chunked_parquet_reader::read_chunk() const /** * @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer */ -parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options) +parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options, + rmm::cuda_stream_view stream) { namespace io_detail = cudf::io::detail; auto sinks = make_datasinks(options.get_sink()); writer = std::make_unique( - std::move(sinks), options, io_detail::single_write_mode::NO, cudf::get_default_stream()); + std::move(sinks), options, io_detail::single_write_mode::NO, stream); } /** diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp index 9576238437e..a12b289e2c3 100644 --- a/cpp/tests/streams/io/parquet_test.cpp +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -142,4 +142,66 @@ TEST_F(ParquetTest, ParquetReader) cudf::io::parquet_reader_options in_opts = cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream()); + auto meta = cudf::io::read_parquet_metadata(cudf::io::source_info{filepath}); +} + +TEST_F(ParquetTest, ChunkedOperations) +{ + constexpr auto num_rows = 10; + + std::vector zeros(num_rows, 0); + std::vector ones(num_rows, 1); + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{12}}; + }); + auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{-12}}; + }); + + cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); + cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); + cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); + + cudf::test::lists_column_wrapper col8{ + {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; + + cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); + cudf::test::structs_column_wrapper col9{child_col}; + + std::vector col10_data(num_rows, "rapids"); + cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); + + cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); + + cudf::io::table_input_metadata tab_metadata(tab); + tab_metadata.column_metadata[0].set_name("bools"); + tab_metadata.column_metadata[1].set_name("int8s"); + tab_metadata.column_metadata[2].set_name("int16s"); + tab_metadata.column_metadata[3].set_name("int32s"); + tab_metadata.column_metadata[4].set_name("floats"); + tab_metadata.column_metadata[5].set_name("doubles"); + tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); + tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); + tab_metadata.column_metadata[8].set_name("lists"); + tab_metadata.column_metadata[9].set_name("structs"); + tab_metadata.column_metadata[10].set_name("strings"); + + auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); + cudf::io::chunked_parquet_writer_options out_opts = + cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath}) + .metadata(tab_metadata); + cudf::io::parquet_chunked_writer(out_opts, cudf::test::get_default_stream()).write(tab); + + auto reader = cudf::io::chunked_parquet_reader( + 1L << 31, + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}), + cudf::test::get_default_stream()); + while (reader.has_next()) { + auto chunk = reader.read_chunk(); + } } From e31ce02cd757951dbe0b9c30703c3c8d56faede2 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 6 Nov 2023 18:44:18 +0000 Subject: [PATCH 3/8] Updated cudf default stream lookup --- cpp/tests/utilities/identify_stream_usage.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/tests/utilities/identify_stream_usage.cpp b/cpp/tests/utilities/identify_stream_usage.cpp index 3caef2dc509..e908a95a028 100644 --- a/cpp/tests/utilities/identify_stream_usage.cpp +++ b/cpp/tests/utilities/identify_stream_usage.cpp @@ -14,8 +14,8 @@ * limitations under the License. */ +#include #include -#include #include #include @@ -73,7 +73,7 @@ bool stream_is_invalid(cudaStream_t stream) { #ifdef STREAM_MODE_TESTING // In this mode the _only_ valid stream is the one returned by cudf::test::get_default_stream. - return (stream == cudf::get_default_stream().value() || stream == cudaStreamDefault || + return (stream == cudf::detail::default_stream_value.value() || stream == cudaStreamDefault || stream == cudaStreamLegacy || stream == cudaStreamPerThread); #else // We explicitly list the possibilities rather than using From 09ebf8e38b78d597eb56104ec6478b2742e854c8 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Mon, 6 Nov 2023 20:16:05 +0000 Subject: [PATCH 4/8] compare with rmm stream directly --- cpp/tests/utilities/identify_stream_usage.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cpp/tests/utilities/identify_stream_usage.cpp b/cpp/tests/utilities/identify_stream_usage.cpp index e908a95a028..d2ba9dfcc13 100644 --- a/cpp/tests/utilities/identify_stream_usage.cpp +++ b/cpp/tests/utilities/identify_stream_usage.cpp @@ -14,7 +14,6 @@ * limitations under the License. */ -#include #include #include @@ -73,7 +72,12 @@ bool stream_is_invalid(cudaStream_t stream) { #ifdef STREAM_MODE_TESTING // In this mode the _only_ valid stream is the one returned by cudf::test::get_default_stream. - return (stream == cudf::detail::default_stream_value.value() || stream == cudaStreamDefault || +#if defined(CUDF_USE_PER_THREAD_DEFAULT_STREAM) + rmm::cuda_stream_view default_stream_value{rmm::cuda_stream_per_thread}; +#else + rmm::cuda_stream_view default_stream_value{}; +#endif + return (stream == default_stream_value.value() || stream == cudaStreamDefault || stream == cudaStreamLegacy || stream == cudaStreamPerThread); #else // We explicitly list the possibilities rather than using From 23427a1ec3ec6b194a8305446590fd2ad04c57d8 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Jan 2024 18:31:58 +0000 Subject: [PATCH 5/8] removed relaxation to test stream relaxation; test cleanup --- cpp/tests/CMakeLists.txt | 1 + cpp/tests/streams/io/parquet_test.cpp | 129 +++--------------- cpp/tests/utilities/identify_stream_usage.cpp | 8 +- 3 files changed, 21 insertions(+), 117 deletions(-) diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt index d0abcc225d1..1b391a4d088 100644 --- a/cpp/tests/CMakeLists.txt +++ b/cpp/tests/CMakeLists.txt @@ -647,6 +647,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing) +ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing) ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing) ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing) diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp index a12b289e2c3..067c61baafd 100644 --- a/cpp/tests/streams/io/parquet_test.cpp +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -34,8 +34,14 @@ auto const temp_env = static_cast( class ParquetTest : public cudf::test::BaseFixture {}; -TEST_F(ParquetTest, ParquetWriter) -{ +template +std::vector> make_uniqueptrs_vector(UniqPtrs&& ... uniqptrs) { + std::vector> ptrsvec; + (ptrsvec.push_back(std::forward(uniqptrs)), ...); + return ptrsvec; +} + +cudf::table construct_table() { constexpr auto num_rows = 10; std::vector zeros(num_rows, 0); @@ -65,78 +71,25 @@ TEST_F(ParquetTest, ParquetWriter) std::vector col10_data(num_rows, "rapids"); cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); - cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); - - cudf::io::table_input_metadata tab_metadata(tab); - tab_metadata.column_metadata[0].set_name("bools"); - tab_metadata.column_metadata[1].set_name("int8s"); - tab_metadata.column_metadata[2].set_name("int16s"); - tab_metadata.column_metadata[3].set_name("int32s"); - tab_metadata.column_metadata[4].set_name("floats"); - tab_metadata.column_metadata[5].set_name("doubles"); - tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); - tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); - tab_metadata.column_metadata[8].set_name("lists"); - tab_metadata.column_metadata[9].set_name("structs"); - tab_metadata.column_metadata[10].set_name("strings"); + auto colsptr = make_uniqueptrs_vector(col0.release(), col1.release(), col2.release(), col3.release(), col4.release(), col5.release(), col6.release(), col7.release(), col8.release(), col9.release(), col10.release()); + return cudf::table(std::move(colsptr)); +} +TEST_F(ParquetTest, ParquetWriter) +{ + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab) - .metadata(tab_metadata); + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); } TEST_F(ParquetTest, ParquetReader) { - constexpr auto num_rows = 10; - - std::vector zeros(num_rows, 0); - std::vector ones(num_rows, 1); - auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{12}}; - }); - auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{-12}}; - }); - - cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); - cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); - - cudf::test::lists_column_wrapper col8{ - {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; - - cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); - cudf::test::structs_column_wrapper col9{child_col}; - - std::vector col10_data(num_rows, "rapids"); - cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); - - cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); - - cudf::io::table_input_metadata tab_metadata(tab); - tab_metadata.column_metadata[0].set_name("bools"); - tab_metadata.column_metadata[1].set_name("int8s"); - tab_metadata.column_metadata[2].set_name("int16s"); - tab_metadata.column_metadata[3].set_name("int32s"); - tab_metadata.column_metadata[4].set_name("floats"); - tab_metadata.column_metadata[5].set_name("doubles"); - tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); - tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); - tab_metadata.column_metadata[8].set_name("lists"); - tab_metadata.column_metadata[9].set_name("structs"); - tab_metadata.column_metadata[10].set_name("strings"); - + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::parquet_writer_options out_opts = - cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab) - .metadata(tab_metadata); + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); cudf::io::write_parquet(out_opts, cudf::test::get_default_stream()); cudf::io::parquet_reader_options in_opts = @@ -147,54 +100,10 @@ TEST_F(ParquetTest, ParquetReader) TEST_F(ParquetTest, ChunkedOperations) { - constexpr auto num_rows = 10; - - std::vector zeros(num_rows, 0); - std::vector ones(num_rows, 1); - auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{12}}; - }); - auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{-12}}; - }); - - cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col2(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); - cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); - - cudf::test::lists_column_wrapper col8{ - {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; - - cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); - cudf::test::structs_column_wrapper col9{child_col}; - - std::vector col10_data(num_rows, "rapids"); - cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); - - cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8, col9, col10}); - - cudf::io::table_input_metadata tab_metadata(tab); - tab_metadata.column_metadata[0].set_name("bools"); - tab_metadata.column_metadata[1].set_name("int8s"); - tab_metadata.column_metadata[2].set_name("int16s"); - tab_metadata.column_metadata[3].set_name("int32s"); - tab_metadata.column_metadata[4].set_name("floats"); - tab_metadata.column_metadata[5].set_name("doubles"); - tab_metadata.column_metadata[6].set_name("decimal_pos_scale"); - tab_metadata.column_metadata[7].set_name("decimal_neg_scale"); - tab_metadata.column_metadata[8].set_name("lists"); - tab_metadata.column_metadata[9].set_name("structs"); - tab_metadata.column_metadata[10].set_name("strings"); - + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::chunked_parquet_writer_options out_opts = - cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath}) - .metadata(tab_metadata); + cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath}); cudf::io::parquet_chunked_writer(out_opts, cudf::test::get_default_stream()).write(tab); auto reader = cudf::io::chunked_parquet_reader( diff --git a/cpp/tests/utilities/identify_stream_usage.cpp b/cpp/tests/utilities/identify_stream_usage.cpp index 23e950fa47f..bdc338d2c92 100644 --- a/cpp/tests/utilities/identify_stream_usage.cpp +++ b/cpp/tests/utilities/identify_stream_usage.cpp @@ -105,13 +105,7 @@ bool stream_is_invalid(cudaStream_t stream) { #ifdef STREAM_MODE_TESTING // In this mode the _only_ valid stream is the one returned by cudf::test::get_default_stream. -#if defined(CUDF_USE_PER_THREAD_DEFAULT_STREAM) - rmm::cuda_stream_view default_stream_value{rmm::cuda_stream_per_thread}; -#else - rmm::cuda_stream_view default_stream_value{}; -#endif - return (stream == default_stream_value.value() || stream == cudaStreamDefault || - stream == cudaStreamLegacy || stream == cudaStreamPerThread); + return (stream != cudf::test::get_default_stream().value()); #else // We explicitly list the possibilities rather than using // `cudf::get_default_stream().value()` because there is no guarantee that From c9fe587319af135207f39f3b44f546cb3006a786 Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Jan 2024 18:39:38 +0000 Subject: [PATCH 6/8] oops, pre-commit check --- cpp/include/cudf/io/parquet.hpp | 2 +- cpp/src/io/functions.cpp | 2 +- cpp/tests/streams/io/parquet_test.cpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 106d522f244..dc035db8d39 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 5b411f4eeb3..ff3148e1a30 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023, NVIDIA CORPORATION. + * Copyright (c) 2019-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp index 067c61baafd..8011d179726 100644 --- a/cpp/tests/streams/io/parquet_test.cpp +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 6d89a40c481147d56b449c1077f44e48f6e60cde Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Jan 2024 18:46:58 +0000 Subject: [PATCH 7/8] more formatting fixes --- cpp/tests/streams/io/parquet_test.cpp | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp index 8011d179726..b8b89c3846f 100644 --- a/cpp/tests/streams/io/parquet_test.cpp +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -34,14 +34,16 @@ auto const temp_env = static_cast( class ParquetTest : public cudf::test::BaseFixture {}; -template -std::vector> make_uniqueptrs_vector(UniqPtrs&& ... uniqptrs) { +template +std::vector> make_uniqueptrs_vector(UniqPtrs&&... uniqptrs) +{ std::vector> ptrsvec; (ptrsvec.push_back(std::forward(uniqptrs)), ...); return ptrsvec; } -cudf::table construct_table() { +cudf::table construct_table() +{ constexpr auto num_rows = 10; std::vector zeros(num_rows, 0); @@ -71,13 +73,23 @@ cudf::table construct_table() { std::vector col10_data(num_rows, "rapids"); cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); - auto colsptr = make_uniqueptrs_vector(col0.release(), col1.release(), col2.release(), col3.release(), col4.release(), col5.release(), col6.release(), col7.release(), col8.release(), col9.release(), col10.release()); + auto colsptr = make_uniqueptrs_vector(col0.release(), + col1.release(), + col2.release(), + col3.release(), + col4.release(), + col5.release(), + col6.release(), + col7.release(), + col8.release(), + col9.release(), + col10.release()); return cudf::table(std::move(colsptr)); } TEST_F(ParquetTest, ParquetWriter) { - auto tab = construct_table(); + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); @@ -86,7 +98,7 @@ TEST_F(ParquetTest, ParquetWriter) TEST_F(ParquetTest, ParquetReader) { - auto tab = construct_table(); + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::parquet_writer_options out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab); @@ -100,7 +112,7 @@ TEST_F(ParquetTest, ParquetReader) TEST_F(ParquetTest, ChunkedOperations) { - auto tab = construct_table(); + auto tab = construct_table(); auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet"); cudf::io::chunked_parquet_writer_options out_opts = cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath}); From 79ca7b41814941020b3e1512f3620956bfb72a1a Mon Sep 17 00:00:00 2001 From: Shruti Shivakumar Date: Fri, 5 Jan 2024 22:15:21 +0000 Subject: [PATCH 8/8] minor fixes --- cpp/tests/streams/io/parquet_test.cpp | 34 +++++++++++++++++---------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/cpp/tests/streams/io/parquet_test.cpp b/cpp/tests/streams/io/parquet_test.cpp index b8b89c3846f..c6d531bc376 100644 --- a/cpp/tests/streams/io/parquet_test.cpp +++ b/cpp/tests/streams/io/parquet_test.cpp @@ -48,12 +48,6 @@ cudf::table construct_table() std::vector zeros(num_rows, 0); std::vector ones(num_rows, 1); - auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{12}}; - }); - auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { - return numeric::decimal128{ones[i], numeric::scale_type{-12}}; - }); cudf::test::fixed_width_column_wrapper col0(zeros.begin(), zeros.end()); cudf::test::fixed_width_column_wrapper col1(zeros.begin(), zeros.end()); @@ -61,17 +55,33 @@ cudf::table construct_table() cudf::test::fixed_width_column_wrapper col3(zeros.begin(), zeros.end()); cudf::test::fixed_width_column_wrapper col4(zeros.begin(), zeros.end()); cudf::test::fixed_width_column_wrapper col5(zeros.begin(), zeros.end()); - cudf::test::fixed_width_column_wrapper col6(col6_data, col6_data + num_rows); - cudf::test::fixed_width_column_wrapper col7(col7_data, col7_data + num_rows); + cudf::test::fixed_width_column_wrapper col6 = [&ones, num_rows] { + auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{12}}; + }); + return cudf::test::fixed_width_column_wrapper(col6_data, + col6_data + num_rows); + }(); + cudf::test::fixed_width_column_wrapper col7 = [&ones, num_rows] { + auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) { + return numeric::decimal128{ones[i], numeric::scale_type{-12}}; + }); + return cudf::test::fixed_width_column_wrapper(col7_data, + col7_data + num_rows); + }(); cudf::test::lists_column_wrapper col8{ {1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}}; - cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); - cudf::test::structs_column_wrapper col9{child_col}; + cudf::test::structs_column_wrapper col9 = [&ones] { + cudf::test::fixed_width_column_wrapper child_col(ones.begin(), ones.end()); + return cudf::test::structs_column_wrapper{child_col}; + }(); - std::vector col10_data(num_rows, "rapids"); - cudf::test::strings_column_wrapper col10(col10_data.begin(), col10_data.end()); + cudf::test::strings_column_wrapper col10 = [] { + std::vector col10_data(num_rows, "rapids"); + return cudf::test::strings_column_wrapper(col10_data.begin(), col10_data.end()); + }(); auto colsptr = make_uniqueptrs_vector(col0.release(), col1.release(),