Expose streams in Parquet reader and writer APIs #14359

Merged Jan 11, 2024 (14 commits)
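The new overloads below let callers pass an explicit rmm::cuda_stream_view instead of implicitly using cudf::get_default_stream(). A minimal sketch of the intended call pattern; the helper name, path, and table are illustrative, and the caller owns the stream and synchronizes it before inspecting results:

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream.hpp>

#include <string>

void roundtrip_on_stream(cudf::table_view const& table, std::string const& path)
{
  // Caller-owned, non-default stream.
  rmm::cuda_stream stream;

  // Write on the explicit stream (the new optional parameter).
  auto const out_opts =
    cudf::io::parquet_writer_options::builder(cudf::io::sink_info{path}, table).build();
  cudf::io::write_parquet(out_opts, stream.view());

  // Read back on the same stream; the memory resource keeps its default.
  auto const in_opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();
  auto result = cudf::io::read_parquet(in_opts, stream.view());

  // Synchronize before host code uses the returned table.
  stream.synchronize();
}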
16 changes: 13 additions & 3 deletions cpp/include/cudf/io/parquet.hpp
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -401,13 +401,15 @@ class parquet_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_parquet(
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
@@ -438,11 +440,13 @@ class chunked_parquet_reader {
* @param chunk_read_limit Limit on total number of bytes to be returned per read,
* or `0` if there is no limit
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
@@ -461,12 +465,14 @@ class chunked_parquet_reader {
* @param pass_read_limit Limit on the amount of memory used for reading and decompressing data or
* `0` if there is no limit
* @param options The options used to read Parquet file
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
chunked_parquet_reader(
std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/**
@@ -1163,11 +1169,13 @@ class parquet_writer_options_builder {
* @endcode
*
* @param options Settings for controlling writing behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @return A blob that contains the file metadata (parquet FileMetadata thrift message) if
* requested in parquet_writer_options (empty blob otherwise).
*/

std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options);
std::unique_ptr<std::vector<uint8_t>> write_parquet(
parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Merges multiple raw metadata blobs that were previously created by write_parquet
@@ -1778,8 +1786,10 @@ class parquet_chunked_writer {
* @brief Constructor with chunked writer options
*
* @param[in] options options used to write table
* @param[in] stream CUDA stream used for device memory operations and kernel launches
*/
parquet_chunked_writer(chunked_parquet_writer_options const& options);
parquet_chunked_writer(chunked_parquet_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream());

/**
* @brief Writes table to output.
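Both chunked_parquet_reader constructors above gain the same optional stream argument. A short sketch of driving the chunked reader on a caller-provided stream, with an arbitrary 1 GiB chunk limit (helper name and path are illustrative):

#include <cudf/io/parquet.hpp>
#include <rmm/cuda_stream.hpp>

#include <string>

void read_in_chunks(std::string const& path)
{
  rmm::cuda_stream stream;

  auto const opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{path}).build();

  // chunk_read_limit of 1 GiB per returned table; 0 would mean no limit.
  cudf::io::chunked_parquet_reader reader(std::size_t{1} << 30, opts, stream.view());

  while (reader.has_next()) {
    auto chunk = reader.read_chunk();  // table_with_metadata for this chunk
    // ... consume chunk.tbl on the same stream ...
  }
  stream.synchronize();
}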
27 changes: 14 additions & 13 deletions cpp/src/io/functions.cpp
@@ -479,13 +479,14 @@ using namespace cudf::io::parquet::detail;
namespace detail_parquet = cudf::io::parquet::detail;

table_with_metadata read_parquet(parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

auto datasources = make_datasources(options.get_source());
auto reader = std::make_unique<detail_parquet::reader>(
std::move(datasources), options, cudf::get_default_stream(), mr);
auto reader =
std::make_unique<detail_parquet::reader>(std::move(datasources), options, stream, mr);

return reader->read(options);
}
@@ -545,15 +546,16 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata)
/**
* @copydoc cudf::io::write_parquet
*/
std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options)
std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const& options,
rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

CUDF_FUNC_RANGE();

auto sinks = make_datasinks(options.get_sink());
auto writer = std::make_unique<detail_parquet::writer>(
std::move(sinks), options, io_detail::single_write_mode::YES, cudf::get_default_stream());
std::move(sinks), options, io_detail::single_write_mode::YES, stream);

writer->write(options.get_table(), options.get_partitions());

@@ -565,13 +567,10 @@ std::unique_ptr<std::vector<uint8_t>> write_parquet(parquet_writer_options const
*/
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
0,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
mr)}
: reader{std::make_unique<detail_parquet::chunked_reader>(
chunk_read_limit, 0, make_datasources(options.get_source()), options, stream, mr)}
{
}

@@ -581,12 +580,13 @@ chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
chunked_parquet_reader::chunked_parquet_reader(std::size_t chunk_read_limit,
std::size_t pass_read_limit,
parquet_reader_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
: reader{std::make_unique<detail_parquet::chunked_reader>(chunk_read_limit,
pass_read_limit,
make_datasources(options.get_source()),
options,
cudf::get_default_stream(),
stream,
mr)}
{
}
@@ -619,14 +619,15 @@ table_with_metadata chunked_parquet_reader::read_chunk() const
/**
* @copydoc cudf::io::parquet_chunked_writer::parquet_chunked_writer
*/
parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options)
parquet_chunked_writer::parquet_chunked_writer(chunked_parquet_writer_options const& options,
rmm::cuda_stream_view stream)
{
namespace io_detail = cudf::io::detail;

auto sinks = make_datasinks(options.get_sink());

writer = std::make_unique<detail_parquet::writer>(
std::move(sinks), options, io_detail::single_write_mode::NO, cudf::get_default_stream());
std::move(sinks), options, io_detail::single_write_mode::NO, stream);
}

/**
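The chunked writer follows the same pattern, with its constructor now forwarding the caller's stream to the detail writer. A brief sketch of appending two tables through one caller-owned stream (helper name, table views, and path are placeholders):

#include <cudf/io/parquet.hpp>
#include <cudf/table/table_view.hpp>
#include <rmm/cuda_stream.hpp>

#include <string>

void write_in_chunks(cudf::table_view const& part_a,
                     cudf::table_view const& part_b,
                     std::string const& path)
{
  rmm::cuda_stream stream;

  auto const opts =
    cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{path}).build();

  // The constructor takes the stream; the writes and final flush use it.
  cudf::io::parquet_chunked_writer writer(opts, stream.view());
  writer.write(part_a);
  writer.write(part_b);
  writer.close();

  stream.synchronize();
}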
1 change: 1 addition & 0 deletions cpp/tests/CMakeLists.txt
@@ -647,6 +647,7 @@ ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_PARQUETIO_TEST streams/io/parquet_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_POOL_TEST streams/pool_test.cu STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
138 changes: 138 additions & 0 deletions cpp/tests/streams/io/parquet_test.cpp
Contributor comment:
The PQ reader launches a separate kernel for many different encoding types; in theory we need to cover all those kernels. But, I think we can ignore this for now, since it requires carefully crafting tests that lead to specific encodings.
CC @nvdbaranec @etseidl for viz (short for wizardry)
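One way such a test could be crafted, sketched below as a hypothetical follow-up rather than anything in this PR, is to pin the encoding on the write side, for example by disabling dictionary encoding; this assumes the existing dictionary_policy writer option, and the kernels actually exercised still depend on the writer's encoding heuristics:

// Hypothetical sketch reusing this file's construct_table() and temp_env helpers.
auto tab      = construct_table();
auto filepath = temp_env->get_temp_filepath("PlainEncoded.parquet");

cudf::io::parquet_writer_options out_opts =
  cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab)
    .dictionary_policy(cudf::io::dictionary_policy::NEVER)  // assumed writer option
    .build();
cudf::io::write_parquet(out_opts, cudf::test::get_default_stream());

cudf::io::parquet_reader_options in_opts =
  cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream());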

@@ -0,0 +1,138 @@
/*
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/detail/parquet.hpp>
#include <cudf/io/parquet.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <string>
#include <vector>

// Global environment for temporary files
auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

class ParquetTest : public cudf::test::BaseFixture {};

template <typename... UniqPtrs>
std::vector<std::unique_ptr<cudf::column>> make_uniqueptrs_vector(UniqPtrs&&... uniqptrs)
{
std::vector<std::unique_ptr<cudf::column>> ptrsvec;
(ptrsvec.push_back(std::forward<UniqPtrs>(uniqptrs)), ...);
Contributor comment: omg, a fold expression, very cool

return ptrsvec;
}

cudf::table construct_table()
{
constexpr auto num_rows = 10;

std::vector<size_t> zeros(num_rows, 0);
std::vector<size_t> ones(num_rows, 1);

cudf::test::fixed_width_column_wrapper<bool> col0(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int8_t> col1(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int16_t> col2(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int32_t> col3(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<float> col4(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<double> col5(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col6 = [&ones, num_rows] {
auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col6_data,
col6_data + num_rows);
}();
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col7 = [&ones, num_rows] {
auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{-12}};
});
return cudf::test::fixed_width_column_wrapper<numeric::decimal128>(col7_data,
col7_data + num_rows);
}();

cudf::test::lists_column_wrapper<int64_t> col8{
{1, 1}, {1, 1, 1}, {}, {1}, {1, 1, 1, 1}, {1, 1, 1, 1, 1}, {}, {1, -1}, {}, {-1, -1}};

cudf::test::structs_column_wrapper col9 = [&ones] {
cudf::test::fixed_width_column_wrapper<int32_t> child_col(ones.begin(), ones.end());
return cudf::test::structs_column_wrapper{child_col};
}();

cudf::test::strings_column_wrapper col10 = [] {
std::vector<std::string> col10_data(num_rows, "rapids");
return cudf::test::strings_column_wrapper(col10_data.begin(), col10_data.end());
}();

auto colsptr = make_uniqueptrs_vector(col0.release(),
col1.release(),
col2.release(),
col3.release(),
col4.release(),
col5.release(),
col6.release(),
col7.release(),
col8.release(),
col9.release(),
col10.release());
return cudf::table(std::move(colsptr));
}

TEST_F(ParquetTest, ParquetWriter)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_parquet(out_opts, cudf::test::get_default_stream());
}

TEST_F(ParquetTest, ParquetReader)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::parquet_writer_options out_opts =
cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, tab);
cudf::io::write_parquet(out_opts, cudf::test::get_default_stream());

cudf::io::parquet_reader_options in_opts =
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath});
auto result = cudf::io::read_parquet(in_opts, cudf::test::get_default_stream());
auto meta = cudf::io::read_parquet_metadata(cudf::io::source_info{filepath});
}

TEST_F(ParquetTest, ChunkedOperations)
{
auto tab = construct_table();
auto filepath = temp_env->get_temp_filepath("MultiColumn.parquet");
cudf::io::chunked_parquet_writer_options out_opts =
cudf::io::chunked_parquet_writer_options::builder(cudf::io::sink_info{filepath});
cudf::io::parquet_chunked_writer(out_opts, cudf::test::get_default_stream()).write(tab);

auto reader = cudf::io::chunked_parquet_reader(
1L << 31,
cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}),
cudf::test::get_default_stream());
while (reader.has_next()) {
auto chunk = reader.read_chunk();
}
}