Skip to content

Commit

Permalink
Adding binary read/write as options for parquet (#11160)
Browse files Browse the repository at this point in the history
There are a couple of issues (#11044 and #10778) revolving around adding support for binary writes and reads to parquet. The desire is to be able to write strings and lists of int8 values as binary. This PR adds support for strings to be written as binary and for binary data to be read back as either binary or strings. I have left the default behavior of reading binary data as strings to prevent any surprises upon upgrade.

Single-depth list columns of int8 and uint8 values are not written as binary with this change. That will be another PR after discussions about the possible impact of the change.

Closes #11044 
Issue #10778

Authors:
  - Mike Wilson (https://github.com/hyperbolic2346)

Approvers:
  - Karthikeyan (https://github.com/karthikeyann)
  - MithunR (https://github.com/mythrocks)
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)
  - https://github.com/nvdbaranec
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: #11160
  • Loading branch information
hyperbolic2346 authored Jul 29, 2022
1 parent c11b780 commit d94d761
Show file tree
Hide file tree
Showing 12 changed files with 298 additions and 36 deletions.
39 changes: 39 additions & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class parquet_reader_options {
bool _use_pandas_metadata = true;
// Cast timestamp columns to a specific type
data_type _timestamp_type{type_id::EMPTY};
// Whether to store binary data as a string column
std::optional<std::vector<bool>> _convert_binary_to_strings{std::nullopt};

/**
* @brief Constructor from source info.
Expand Down Expand Up @@ -118,6 +120,19 @@ class parquet_reader_options {
*/
[[nodiscard]] bool is_enabled_use_pandas_metadata() const { return _use_pandas_metadata; }

/**
* @brief Returns optional vector of true/false values depending on whether binary data should be
* converted to strings or not.
*
* @return vector with ith value `true` if binary data should be converted to strings for the ith
* column. Will return std::nullopt if the user did not set this option, which defaults to all
* binary data being converted to strings.
*/
[[nodiscard]] std::optional<std::vector<bool>> get_convert_binary_to_strings() const
{
return _convert_binary_to_strings;
}

/**
* @brief Returns number of rows to skip from the start.
*
Expand Down Expand Up @@ -191,6 +206,17 @@ class parquet_reader_options {
*/
void enable_use_pandas_metadata(bool val) { _use_pandas_metadata = val; }

/**
 * @brief Sets the per-column binary-to-string conversion behavior.
 *
 * @param val For each column, `true` converts binary data to a string column and `false`
 * leaves it as binary. When this option is never set, all binary data is converted to strings.
 */
void set_convert_binary_to_strings(std::vector<bool> val)
{
  // Sink parameter: take by value, move into the optional member.
  _convert_binary_to_strings = std::move(val);
}

/**
* @brief Sets number of rows to skip.
*
Expand Down Expand Up @@ -296,6 +322,19 @@ class parquet_reader_options_builder {
return *this;
}

/**
 * @brief Enables/disables conversion of binary data to strings, per column.
 *
 * @param val For each column, `true` converts binary data to a string column and `false`
 * leaves it as binary. The default (option unset) converts all binary data to strings.
 * @return this for chaining
 */
parquet_reader_options_builder& convert_binary_to_strings(std::vector<bool> val)
{
  // Delegate to the public setter so the builder and the options class stay in sync.
  options.set_convert_binary_to_strings(std::move(val));
  return *this;
}

/**
* @brief Sets number of rows to skip.
*
Expand Down
23 changes: 22 additions & 1 deletion cpp/include/cudf/io/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ class column_in_metadata {
thrust::optional<bool> _nullable;
bool _list_column_is_map = false;
bool _use_int96_timestamp = false;
// bool _output_as_binary = false;
bool _output_as_binary = false;
thrust::optional<uint8_t> _decimal_precision;
thrust::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
Expand Down Expand Up @@ -489,6 +489,20 @@ class column_in_metadata {
return *this;
}

/**
 * @brief Specifies whether this column should be written out as binary or string data.
 *
 * Only valid for string columns.
 *
 * @param binary `true` writes the column as a binary type, `false` as a string type
 * @return this for chaining
 */
column_in_metadata& set_output_as_binary(bool binary)
{
  _output_as_binary = binary;
  return *this;
}

/**
* @brief Get reference to a child of this column
*
Expand Down Expand Up @@ -581,6 +595,13 @@ class column_in_metadata {
* @return The number of children of this column
*/
[[nodiscard]] size_type num_children() const { return children.size(); }

/**
 * @brief Queries whether this column is to be encoded as binary rather than string data.
 *
 * @return `true` if the column is to be encoded as binary data
 */
[[nodiscard]] bool is_enabled_output_as_binary() const { return _output_as_binary; }
};

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ struct SchemaElement {
int32_t decimal_scale = 0;
int32_t decimal_precision = 0;
thrust::optional<int32_t> field_id = thrust::nullopt;
bool output_as_byte_array = false;

// The following fields are filled in later during schema initialization
int max_definition_level = 0;
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/parquet/parquet_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ enum Type : int8_t {
*/
enum ConvertedType {
UNKNOWN = -1, // No type information present
UTF8 = 0, // a BYTE_ARRAY actually contains UTF8 encoded chars
UTF8 = 0, // a BYTE_ARRAY may contain UTF8 encoded chars
MAP = 1, // a map is converted as an optional field containing a repeated key/value pair
MAP_KEY_VALUE = 2, // a key/value pair is converted into a group of two fields
LIST =
Expand Down
23 changes: 14 additions & 9 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ struct parquet_column_device_view : stats_column_desc {
uint8_t const* nullability; //!< Array of nullability of each nesting level. e.g. nullable[0] is
//!< nullability of parent_column. May be different from
//!< col.nullable() in case of chunked writing.
bool output_as_byte_array; //!< Indicates this list column is being written as a byte array
};

constexpr int max_page_fragment_size = 5000; //!< Max number of rows in a page fragment
Expand Down Expand Up @@ -300,15 +301,19 @@ inline uint32_t __device__ int32_logical_len(type_id id)
inline size_type __device__ row_to_value_idx(size_type idx,
parquet_column_device_view const& parquet_col)
{
auto col = *parquet_col.parent_column;
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
if (col.type().id() == type_id::STRUCT) {
idx += col.offset();
col = col.child(0);
} else {
auto list_col = cudf::detail::lists_column_device_view(col);
idx = list_col.offset_at(idx);
col = list_col.child();
// with a byte array, we can't go all the way down to the leaf node, but instead we want to leave
// the size at the parent level because we are writing out parent row byte arrays.
if (!parquet_col.output_as_byte_array) {
auto col = *parquet_col.parent_column;
while (col.type().id() == type_id::LIST or col.type().id() == type_id::STRUCT) {
if (col.type().id() == type_id::STRUCT) {
idx += col.offset();
col = col.child(0);
} else {
auto list_col = cudf::detail::lists_column_device_view(col);
idx = list_col.offset_at(idx);
col = list_col.child();
}
}
}
return idx;
Expand Down
23 changes: 22 additions & 1 deletion cpp/src/io/parquet/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1593,6 +1593,9 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>>&& sources,
// Strings may be returned as either string or categorical columns
_strings_to_categorical = options.is_enabled_convert_strings_to_categories();

// Binary columns can be read as binary or strings
_force_binary_columns_as_strings = options.get_convert_binary_to_strings();

// Select only columns required by the options
std::tie(_input_columns, _output_columns, _output_column_schemas) =
_metadata->select_columns(options.get_columns(),
Expand Down Expand Up @@ -1762,10 +1765,28 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// decoding of column data itself
decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows);

// Builds the final output cudf column for output column `i`. Normally this is just
// make_column, but when the user asked for this column to stay binary
// (should_write_byte_array) and the Parquet schema has no converted type
// (parquet::UNKNOWN), the decoded string column is re-assembled into a lists
// column built from the string's offsets and chars children.
auto make_output_column = [&](column_buffer& buf, column_name_info* schema_info, int i) {
  auto col = make_column(buf, schema_info, _stream, _mr);
  if (should_write_byte_array(i)) {
    auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
    if (schema.converted_type == parquet::UNKNOWN) {
      auto const num_rows = col->size();
      // Take ownership of the string column's contents so its children can be
      // reused directly as the offsets/child columns of the lists column.
      auto data = col->release();
      // NOTE(review): the decoded column's null count is discarded here
      // (UNKNOWN_NULL_COUNT forces a lazy recount); also assumes release()
      // always yields a non-null `null_mask` buffer — confirm both.
      return make_lists_column(
        num_rows,
        std::move(data.children[strings_column_view::offsets_column_index]),
        std::move(data.children[strings_column_view::chars_column_index]),
        UNKNOWN_NULL_COUNT,
        std::move(*data.null_mask));
    }
  }
  return col;
};

// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
column_name_info& col_name = out_metadata.schema_info.emplace_back("");
out_columns.emplace_back(make_column(_output_columns[i], &col_name, _stream, _mr));
out_columns.emplace_back(make_output_column(_output_columns[i], &col_name, i));
}
}
}
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,20 @@ class reader::impl {
size_t min_row,
size_t total_rows);

/**
* @brief Indicates if a column should be written as a byte array
*
* @param col column to check
* @return true if the column should be written as a byte array
* @return false if the column should be written as normal for that type
*/
bool should_write_byte_array(int col)
{
return _output_columns[col].type.id() == type_id::STRING &&
_force_binary_columns_as_strings.has_value() &&
!_force_binary_columns_as_strings.value()[col];
}

private:
rmm::cuda_stream_view _stream;
rmm::mr::device_memory_resource* _mr = nullptr;
Expand All @@ -203,6 +217,7 @@ class reader::impl {
std::vector<int> _output_column_schemas;

bool _strings_to_categorical = false;
std::optional<std::vector<bool>> _force_binary_columns_as_strings;
data_type _timestamp_type{type_id::EMPTY};
};

Expand Down
11 changes: 8 additions & 3 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -337,9 +337,14 @@ struct leaf_schema_fn {
// Strings are always written as Parquet BYTE_ARRAY; the converted type and stats
// dtype distinguish UTF8 text from raw binary requested via the column metadata.
template <typename T>
std::enable_if_t<std::is_same_v<T, cudf::string_view>, void> operator()()
{
  col_schema.type           = Type::BYTE_ARRAY;
  auto const as_binary      = col_meta.is_enabled_output_as_binary();
  col_schema.converted_type = as_binary ? ConvertedType::UNKNOWN : ConvertedType::UTF8;
  col_schema.stats_dtype =
    as_binary ? statistics_dtype::dtype_byte_array : statistics_dtype::dtype_string;
}

template <typename T>
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/statistics/column_statistics.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ struct calculate_group_statistics_functor {
*/
template <typename T,
std::enable_if_t<detail::statistics_type_category<T, IO>::include_extrema and
(IO == detail::io_file_format::ORC or
(IO != detail::io_file_format::PARQUET or
!std::is_same_v<T, list_view>)>* = nullptr>
__device__ void operator()(stats_state_s& s, uint32_t t)
{
Expand Down
Loading

0 comments on commit d94d761

Please sign in to comment.