Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added streams to CSV reader and writer api #14340

Merged
merged 12 commits into from
Nov 14, 2023
Merged
4 changes: 4 additions & 0 deletions cpp/include/cudf/io/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1307,13 +1307,15 @@ class csv_reader_options_builder {
* @endcode
*
* @param options Settings for controlling reading behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource used to allocate device memory of the table in the returned
* table_with_metadata
*
* @return The set of columns along with metadata
*/
table_with_metadata read_csv(
csv_reader_options options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down Expand Up @@ -1715,9 +1717,11 @@ class csv_writer_options_builder {
* @endcode
*
* @param options Settings for controlling writing behavior
* @param stream CUDA stream used for device memory operations and kernel launches
* @param mr Device memory resource to use for device memory allocation
*/
void write_csv(csv_writer_options const& options,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource());

/** @} */ // end of group
Expand Down
1 change: 0 additions & 1 deletion cpp/include/cudf/io/detail/csv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <cudf/io/csv.hpp>
#include <cudf/utilities/default_stream.hpp>

#include <rmm/cuda_stream_view.hpp>

Expand Down
16 changes: 11 additions & 5 deletions cpp/include/cudf_test/column_wrapper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ class strings_column_wrapper : public detail::column_wrapper {
offsets, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource());
auto d_bitmask = cudf::detail::make_device_uvector_sync(
null_mask, cudf::test::get_default_stream(), rmm::mr::get_current_device_resource());
wrapped = cudf::make_strings_column(d_chars, d_offsets, d_bitmask, null_count);
wrapped = cudf::make_strings_column(
d_chars, d_offsets, d_bitmask, null_count, cudf::test::get_default_stream());
Comment on lines +806 to +807
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this include changes from other PR? The additional stream parameter here and below seems unrelated to this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My guess is that CSV stream test is exercising this code path while no prior stream test was, so before this PR we didn't observe that there was a missing stream argument here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this change is needed for the string column in the tests - make_strings_column is called with the default stream otherwise.

}

/**
Expand Down Expand Up @@ -1846,7 +1847,8 @@ class structs_column_wrapper : public detail::column_wrapper {
child_column_wrappers.end(),
std::back_inserter(child_columns),
[&](auto const& column_wrapper) {
return std::make_unique<cudf::column>(column_wrapper.get());
return std::make_unique<cudf::column>(column_wrapper.get(),
cudf::test::get_default_stream());
});
init(std::move(child_columns), validity);
}
Expand Down Expand Up @@ -1882,7 +1884,8 @@ class structs_column_wrapper : public detail::column_wrapper {
child_column_wrappers.end(),
std::back_inserter(child_columns),
[&](auto const& column_wrapper) {
return std::make_unique<cudf::column>(column_wrapper.get());
return std::make_unique<cudf::column>(column_wrapper.get(),
cudf::test::get_default_stream());
});
init(std::move(child_columns), validity_iter);
}
Expand All @@ -1906,8 +1909,11 @@ class structs_column_wrapper : public detail::column_wrapper {
return cudf::test::detail::make_null_mask(validity.begin(), validity.end());
}();

wrapped = cudf::make_structs_column(
num_rows, std::move(child_columns), null_count, std::move(null_mask));
wrapped = cudf::make_structs_column(num_rows,
std::move(child_columns),
null_count,
std::move(null_mask),
cudf::test::get_default_stream());
}

template <typename V>
Expand Down
41 changes: 26 additions & 15 deletions cpp/src/io/csv/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ struct column_to_strings_fn {
{
}

~column_to_strings_fn() = default;
column_to_strings_fn(column_to_strings_fn const&) = delete;
column_to_strings_fn& operator=(column_to_strings_fn const&) = delete;
column_to_strings_fn(column_to_strings_fn&&) = delete;
column_to_strings_fn& operator=(column_to_strings_fn&&) = delete;
PointKernel marked this conversation as resolved.
Show resolved Hide resolved

// Note: `null` replacement with `na_rep` deferred to `concatenate()`
// instead of column-wise; might be faster
//
Expand All @@ -160,8 +166,9 @@ struct column_to_strings_fn {
std::enable_if_t<std::is_same_v<column_type, bool>, std::unique_ptr<column>> operator()(
column_view const& column) const
{
return cudf::strings::detail::from_booleans(
column, options_.get_true_value(), options_.get_false_value(), stream_, mr_);
string_scalar true_string(options_.get_true_value(), true, stream_, mr_);
string_scalar false_string(options_.get_false_value(), true, stream_, mr_);
return cudf::strings::detail::from_booleans(column, true_string, false_string, stream_, mr_);
}

// strings:
Expand All @@ -175,7 +182,7 @@ struct column_to_strings_fn {
}

// handle special characters: {delimiter, '\n', "} in row:
string_scalar delimiter{std::string{options_.get_inter_column_delimiter()}, true, stream_};
string_scalar delimiter{std::string{options_.get_inter_column_delimiter()}, true, stream_, mr_};
shrshi marked this conversation as resolved.
Show resolved Hide resolved

auto d_column = column_device_view::create(column_v, stream_);
escape_strings_fn fn{*d_column, delimiter.value(stream_)};
Expand Down Expand Up @@ -367,10 +374,10 @@ void write_chunked(data_sink* out_sink,

CUDF_EXPECTS(str_column_view.size() > 0, "Unexpected empty strings column.");

cudf::string_scalar newline{options.get_line_terminator()};
cudf::string_scalar newline(options.get_line_terminator(), true, stream);
shrshi marked this conversation as resolved.
Show resolved Hide resolved
auto p_str_col_w_nl = cudf::strings::detail::join_strings(str_column_view,
newline,
string_scalar("", false),
string_scalar("", false, stream),
stream,
rmm::mr::get_current_device_resource());
strings_column_view strings_column{p_str_col_w_nl->view()};
Expand Down Expand Up @@ -455,12 +462,14 @@ void write_csv(data_sink* out_sink,

// populate vector of string-converted columns:
//
std::transform(sub_view.begin(),
sub_view.end(),
std::back_inserter(str_column_vec),
[converter](auto const& current_col) {
return cudf::type_dispatcher(current_col.type(), converter, current_col);
});
std::transform(
sub_view.begin(),
sub_view.end(),
std::back_inserter(str_column_vec),
[&converter = std::as_const(converter)](auto const& current_col) {
return cudf::type_dispatcher<cudf::id_to_type_impl, column_to_strings_fn const&>(
current_col.type(), converter, current_col);
});

// create string table view from str_column_vec:
//
Expand All @@ -470,18 +479,20 @@ void write_csv(data_sink* out_sink,
// concatenate columns in each row into one big string column
// (using null representation and delimiter):
//
std::string delimiter_str{options.get_inter_column_delimiter()};
// std::string delimiter_str{options.get_inter_column_delimiter()};
shrshi marked this conversation as resolved.
Show resolved Hide resolved
auto str_concat_col = [&] {
cudf::string_scalar delimiter_str(
std::string{options.get_inter_column_delimiter()}, true, stream);
cudf::string_scalar options_narep(options.get_na_rep(), true, stream);
if (str_table_view.num_columns() > 1)
return cudf::strings::detail::concatenate(str_table_view,
delimiter_str,
options.get_na_rep(),
options_narep,
strings::separator_on_nulls::YES,
stream,
rmm::mr::get_current_device_resource());
cudf::string_scalar narep{options.get_na_rep()};
return cudf::strings::detail::replace_nulls(
str_table_view.column(0), narep, stream, rmm::mr::get_current_device_resource());
str_table_view.column(0), options_narep, stream, rmm::mr::get_current_device_resource());
}();

write_chunked(out_sink, str_concat_col->view(), options, stream, mr);
Expand Down
12 changes: 8 additions & 4 deletions cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,9 @@ void write_json(json_writer_options const& options,
mr);
}

table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_resource* mr)
table_with_metadata read_csv(csv_reader_options options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
CUDF_FUNC_RANGE();

Expand All @@ -245,12 +247,14 @@ table_with_metadata read_csv(csv_reader_options options, rmm::mr::device_memory_
return cudf::io::detail::csv::read_csv( //
std::move(datasources[0]),
options,
cudf::get_default_stream(),
stream,
mr);
}

// Freeform API wraps the detail writer class API
void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resource* mr)
void write_csv(csv_writer_options const& options,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
using namespace cudf::io::detail;

Expand All @@ -262,7 +266,7 @@ void write_csv(csv_writer_options const& options, rmm::mr::device_memory_resourc
options.get_table(),
options.get_names(),
options,
cudf::get_default_stream(),
stream,
mr);
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -629,10 +629,12 @@ ConfigureTest(
ConfigureTest(STREAM_BINARYOP_TEST streams/binaryop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_CONCATENATE_TEST streams/concatenate_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_COPYING_TEST streams/copying_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_CSVIO_TEST streams/io/csv_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_FILLING_TEST streams/filling_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_GROUPBY_TEST streams/groupby_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_HASHING_TEST streams/hash_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_INTEROP_TEST streams/interop_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_NULL_MASK_TEST streams/null_mask_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_REPLACE_TEST streams/replace_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_SEARCH_TEST streams/search_test.cpp STREAM_MODE testing)
Expand All @@ -658,7 +660,6 @@ ConfigureTest(
STREAM_TEXT_TEST streams/text/ngrams_test.cpp streams/text/tokenize_test.cpp STREAM_MODE testing
)
ConfigureTest(STREAM_LISTS_TEST streams/lists_test.cpp STREAM_MODE testing)
ConfigureTest(STREAM_JSONIO_TEST streams/io/json_test.cpp STREAM_MODE testing)

# ##################################################################################################
# Install tests ####################################################################################
Expand Down
104 changes: 104 additions & 0 deletions cpp/tests/streams/io/csv_test.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <cudf/io/csv.hpp>
#include <cudf/io/detail/csv.hpp>
#include <cudf/table/table.hpp>
#include <cudf/table/table_view.hpp>
#include <cudf/types.hpp>

#include <cudf_test/base_fixture.hpp>
#include <cudf_test/column_wrapper.hpp>
#include <cudf_test/default_stream.hpp>
#include <cudf_test/iterator_utilities.hpp>

#include <random>
shrshi marked this conversation as resolved.
Show resolved Hide resolved
#include <sstream>
#include <string>
#include <vector>

auto const temp_env = static_cast<cudf::test::TempDirTestEnvironment*>(
::testing::AddGlobalTestEnvironment(new cudf::test::TempDirTestEnvironment));

class CSVTest : public cudf::test::BaseFixture {};

TEST_F(CSVTest, CSVWriter)
{
constexpr auto num_rows = 10;

std::vector<size_t> zeros(num_rows, 0);
std::vector<size_t> ones(num_rows, 1);
auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{12}};
});
auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{-12}};
});

cudf::test::fixed_width_column_wrapper<bool> col0(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int8_t> col1(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int16_t> col2(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int32_t> col3(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<float> col4(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<double> col5(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col6(col6_data, col6_data + num_rows);
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col7(col7_data, col7_data + num_rows);

std::vector<std::string> col8_data(num_rows, "rapids");
cudf::test::strings_column_wrapper col8(col8_data.begin(), col8_data.end());

cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8});

auto const filepath = temp_env->get_temp_dir() + "multicolumn.csv";
auto w_options = cudf::io::csv_writer_options::builder(cudf::io::sink_info{filepath}, tab)
.include_header(false)
.inter_column_delimiter(',');
cudf::io::write_csv(w_options.build(), cudf::test::get_default_stream());
}

TEST_F(CSVTest, CSVReader)
{
constexpr auto num_rows = 10;

std::vector<size_t> zeros(num_rows, 0);
std::vector<size_t> ones(num_rows, 1);
auto col6_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{12}};
});
auto col7_data = cudf::detail::make_counting_transform_iterator(0, [&](auto i) {
return numeric::decimal128{ones[i], numeric::scale_type{-12}};
});

cudf::test::fixed_width_column_wrapper<bool> col0(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int8_t> col1(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int16_t> col2(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<int32_t> col3(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<float> col4(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<double> col5(zeros.begin(), zeros.end());
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col6(col6_data, col6_data + num_rows);
cudf::test::fixed_width_column_wrapper<numeric::decimal128> col7(col7_data, col7_data + num_rows);

std::vector<std::string> col8_data(num_rows, "rapids");
cudf::test::strings_column_wrapper col8(col8_data.begin(), col8_data.end());

cudf::table_view tab({col0, col1, col2, col3, col4, col5, col6, col7, col8});

auto const filepath = temp_env->get_temp_dir() + "multicolumn.csv";
auto w_options = cudf::io::csv_writer_options::builder(cudf::io::sink_info{filepath}, tab)
.include_header(false)
.inter_column_delimiter(',');
cudf::io::write_csv(w_options.build(), cudf::test::get_default_stream());
}
Loading