From 9bb6c6a6496355cabc33dbab24022e1b47cdef3a Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 25 Apr 2024 10:40:05 -0700 Subject: [PATCH 1/2] round trip fixed_len_byte_array data properly --- cpp/include/cudf/io/types.hpp | 59 ++++++++++++++- cpp/src/io/functions.cpp | 2 + cpp/src/io/parquet/page_enc.cu | 32 +++++--- cpp/src/io/parquet/parquet_gpu.hpp | 1 + cpp/src/io/parquet/reader_impl.cpp | 16 ++-- cpp/src/io/parquet/reader_impl_helpers.cpp | 7 +- cpp/src/io/parquet/writer_impl.cu | 13 +++- cpp/src/io/utilities/column_buffer.cpp | 5 ++ cpp/tests/io/parquet_writer_test.cpp | 86 ++++++++++++++++++++++ 9 files changed, 198 insertions(+), 23 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index b3dea0ab280..150e997f533 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -236,6 +236,8 @@ enum dictionary_policy { struct column_name_info { std::string name; ///< Column name std::optional is_nullable; ///< Column nullability + std::optional is_binary; ///< Column is binary (i.e. not a list) + std::optional type_length; ///< Byte width of data (for fixed length data) std::vector children; ///< Child column names /** @@ -243,9 +245,12 @@ struct column_name_info { * * @param _name Column name * @param _is_nullable True if column is nullable + * @param _is_binary True if column is binary data */ - column_name_info(std::string const& _name, std::optional _is_nullable = std::nullopt) - : name(_name), is_nullable(_is_nullable) + column_name_info(std::string const& _name, + std::optional _is_nullable = std::nullopt, + std::optional _is_binary = std::nullopt) + : name(_name), is_nullable(_is_nullable), is_binary(_is_binary) { } @@ -606,6 +611,7 @@ class column_in_metadata { bool _skip_compression = false; std::optional _decimal_precision; std::optional _parquet_field_id; + std::optional _type_length; std::vector children; column_encoding _encoding = column_encoding::USE_DEFAULT; @@ -693,6 +699,19 @@ class column_in_metadata { return *this; } + /** + * @brief Set the data length of the column. Only valid if this column is a + * fixed-length byte array. + * + * @param length The data length to set for this column + * @return this for chaining + */ + column_in_metadata& set_type_length(int32_t length) noexcept + { + _type_length = length; + return *this; + } + /** * @brief Set the parquet field id of this column. * @@ -826,6 +845,22 @@ class column_in_metadata { */ [[nodiscard]] uint8_t get_decimal_precision() const { return _decimal_precision.value(); } + /** + * @brief Get whether type length has been set for this column + * + * @return Boolean indicating whether type length has been set for this column + */ + [[nodiscard]] bool is_type_length_set() const noexcept { return _type_length.has_value(); } + + /** + * @brief Get the type length that was set for this column. + * + * @throws std::bad_optional_access If type length was not set for this + * column. Check using `is_type_length_set()` first. + * @return The decimal precision that was set for this column + */ + [[nodiscard]] uint8_t get_type_length() const { return _type_length.value(); } + /** * @brief Get whether parquet field id has been set for this column. * @@ -932,6 +967,7 @@ struct partition_info { class reader_column_schema { // Whether to read binary data as a string column bool _convert_binary_to_strings{true}; + int32_t _type_length{0}; std::vector children; @@ -997,6 +1033,18 @@ class reader_column_schema { return *this; } + /** + * @brief Sets the length of fixed length data. + * + * @param type_length Size of the data type in bytes + * @return this for chaining + */ + reader_column_schema& set_type_length(int32_t type_length) + { + _type_length = type_length; + return *this; + } + /** * @brief Get whether to encode this column as binary or string data * @@ -1007,6 +1055,13 @@ class reader_column_schema { return _convert_binary_to_strings; } + /** + * @brief Get the length in bytes of this fixed length data. + * + * @return The length in bytes of the data type + */ + [[nodiscard]] int32_t get_type_length() const { return _type_length; } + /** * @brief Get the number of child objects * diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 12059dffa4e..7287d6ba786 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -534,6 +534,8 @@ table_input_metadata::table_input_metadata(table_metadata const& metadata) [&](column_name_info const& name) { auto col_meta = column_in_metadata{name.name}; if (name.is_nullable.has_value()) { col_meta.set_nullability(name.is_nullable.value()); } + if (name.is_binary.value_or(false)) { col_meta.set_output_as_binary(true); } + if (name.type_length.has_value()) { col_meta.set_type_length(name.type_length.value()); } std::transform(name.children.begin(), name.children.end(), std::back_inserter(col_meta.children), diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 227f13db60e..1af2166351d 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -109,10 +109,10 @@ using rle_page_enc_state_s = page_enc_state_s; /** * @brief Returns the size of the type in the Parquet file. */ -constexpr uint32_t physical_type_len(Type physical_type, type_id id) +constexpr uint32_t physical_type_len(Type physical_type, type_id id, int type_length) { - if (physical_type == FIXED_LEN_BYTE_ARRAY and id == type_id::DECIMAL128) { - return sizeof(__int128_t); + if (physical_type == FIXED_LEN_BYTE_ARRAY) { + return id == type_id::DECIMAL128 ? sizeof(__int128_t) : type_length; } switch (physical_type) { case INT96: return 12u; @@ -183,7 +183,7 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) auto const physical_type = s->col.physical_type; auto const leaf_type = s->col.leaf_column->type().id(); - auto const dtype_len = physical_type_len(physical_type, leaf_type); + auto const dtype_len = physical_type_len(physical_type, leaf_type, s->col.type_length); auto const nvals = s->frag.num_leaf_values; auto const start_value_idx = s->frag.start_value_idx; @@ -541,7 +541,8 @@ __device__ size_t delta_data_len(Type physical_type, size_t page_size, encode_kernel_mask encoding) { - auto const dtype_len_out = physical_type_len(physical_type, type_id); + // dtype_len_out is for the lengths, rather than the char data, so pass sizeof(int32_t) + auto const dtype_len_out = physical_type_len(physical_type, type_id, sizeof(int32_t)); auto const dtype_len = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -1662,7 +1663,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) __syncthreads(); auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -1837,6 +1838,19 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) thrust::make_reverse_iterator(v_char_ptr), dst + pos); } + } else { + auto const elem = + get_element(*(s->col.leaf_column), val_idx); + if (len != 0 and elem.data() != nullptr) { + if (is_split_stream) { + auto const v_char_ptr = reinterpret_cast(elem.data()); + for (int i = 0; i < dtype_len_out; i++, pos += stride) { + dst[pos] = v_char_ptr[i]; + } + } else { + memcpy(dst + pos, elem.data(), len); + } + } } } break; } @@ -1884,7 +1898,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // Encode data values auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -2016,7 +2030,7 @@ CUDF_KERNEL void __launch_bounds__(block_size, 8) // Encode data values auto const physical_type = s->col.physical_type; auto const type_id = s->col.leaf_column->type().id(); - auto const dtype_len_out = physical_type_len(physical_type, type_id); + auto const dtype_len_out = physical_type_len(physical_type, type_id, s->col.type_length); auto const dtype_len_in = [&]() -> uint32_t { if (physical_type == INT32) { return int32_logical_len(type_id); } if (physical_type == INT96) { return sizeof(int64_t); } @@ -3215,7 +3229,7 @@ __device__ int32_t calculate_boundary_order(statistics_chunk const* s, } // align ptr to an 8-byte boundary. address returned will be <= ptr. -constexpr __device__ void* align8(void* ptr) +inline __device__ void* align8(void* ptr) { // it's ok to round down because we have an extra 7 bytes in the buffer auto algn = 3 & reinterpret_cast(ptr); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index c06fb63acda..cc33fa24043 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -470,6 +470,7 @@ struct chunk_page_info { struct parquet_column_device_view : stats_column_desc { Type physical_type; //!< physical data type ConvertedType converted_type; //!< logical data type + int32_t type_length; //!< length of fixed_length_byte_array data uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) //!< levels constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; } diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index b7172f5ba67..e358da26563 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -497,14 +497,18 @@ table_with_metadata reader::impl::read_chunk_internal( // Create the final output cudf columns. for (size_t i = 0; i < _output_buffers.size(); ++i) { - auto metadata = _reader_column_schema.has_value() - ? std::make_optional((*_reader_column_schema)[i]) - : std::nullopt; - auto const& schema = _metadata->get_schema(_output_column_schemas[i]); - // FIXED_LEN_BYTE_ARRAY never read as string - if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) { + auto metadata = _reader_column_schema.has_value() + ? std::make_optional((*_reader_column_schema)[i]) + : std::nullopt; + auto const& schema = _metadata->get_schema(_output_column_schemas[i]); + auto const logical_type = schema.logical_type.value_or(LogicalType{}); + // FIXED_LEN_BYTE_ARRAY never read as string. + // TODO: if we ever decide that the default reader behavior is to treat unannotated BINARY as + // binary and not strings, this test needs to change. + if (schema.type == FIXED_LEN_BYTE_ARRAY and logical_type.type != LogicalType::DECIMAL) { metadata = std::make_optional(); metadata->set_convert_binary_to_strings(false); + metadata->set_type_length(schema.type_length); } // Only construct `out_metadata` if `_output_metadata` has not been cached. if (!_output_metadata) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 402ccef7a15..3ef89f1b378 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -165,9 +165,10 @@ type_id to_type_id(SchemaElement const& schema, case FLOAT: return type_id::FLOAT32; case DOUBLE: return type_id::FLOAT64; case BYTE_ARRAY: - case FIXED_LEN_BYTE_ARRAY: - // Can be mapped to INT32 (32-bit hash) or STRING - return strings_to_categorical ? type_id::INT32 : type_id::STRING; + // strings can be mapped to a 32-bit hash + if (strings_to_categorical) { return type_id::INT32; } + [[fallthrough]]; + case FIXED_LEN_BYTE_ARRAY: return type_id::STRING; case INT96: return (timestamp_type_id != type_id::EMPTY) ? timestamp_type_id : type_id::TIMESTAMP_NANOSECONDS; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 5509a33f9f0..98bc37f689e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -704,7 +704,14 @@ std::vector construct_schema_tree( } schema_tree_node col_schema{}; - col_schema.type = Type::BYTE_ARRAY; + // test if this should this be output as FIXED_LEN_BYTE_ARRAY + if (col_meta.is_type_length_set()) { + col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; + col_schema.type_length = col_meta.get_type_length(); + } else { + col_schema.type = Type::BYTE_ARRAY; + } + col_schema.converted_type = thrust::nullopt; col_schema.stats_dtype = statistics_dtype::dtype_byte_array; col_schema.repetition_type = col_nullable ? OPTIONAL : REQUIRED; @@ -1024,6 +1031,7 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream auto desc = parquet_column_device_view{}; // Zero out all fields desc.stats_dtype = schema_node.stats_dtype; desc.ts_scale = schema_node.ts_scale; + desc.type_length = schema_node.type_length; if (is_list()) { desc.level_offsets = _dremel_offsets.data(); @@ -1266,8 +1274,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT && chunk_col_desc.requested_encoding != column_encoding::DICTIONARY; auto const is_type_non_dict = - chunk_col_desc.physical_type == Type::BOOLEAN || - (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY); + chunk_col_desc.physical_type == Type::BOOLEAN || chunk_col_desc.output_as_byte_array; if (is_type_non_dict || is_requested_non_dict) { chunk.use_dictionary = false; diff --git a/cpp/src/io/utilities/column_buffer.cpp b/cpp/src/io/utilities/column_buffer.cpp index 5dc2291abdc..c01311ad0c7 100644 --- a/cpp/src/io/utilities/column_buffer.cpp +++ b/cpp/src/io/utilities/column_buffer.cpp @@ -198,6 +198,11 @@ std::unique_ptr make_column(column_buffer_base& buffer, if (schema_info != nullptr) { schema_info->children.push_back(column_name_info{"offsets"}); schema_info->children.push_back(column_name_info{"binary"}); + // cuDF type will be list, but remember it was originally binary data + schema_info->is_binary = true; + if (schema.has_value() and schema->get_type_length() > 0) { + schema_info->type_length = schema->get_type_length(); + } } return make_lists_column( diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index a16b3d63177..2a3bb40a49f 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1857,6 +1857,92 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit) test_durations([](auto i) { return false; }, true); } +TEST_F(ParquetWriterTest, WriteFixedLenByteArray) +{ + using cudf::io::parquet::detail::Encoding; + constexpr int fixed_width = 16; + constexpr cudf::size_type num_rows = fixed_width * fixed_width; + std::vector data(num_rows * fixed_width, 0); + std::vector offsets(num_rows + 1); + + for (int i = 0; i < fixed_width; i++) { + for (int j = 0; j < fixed_width; j++) { + auto const rowid = i * fixed_width + j; + auto const off = rowid * fixed_width; + offsets[rowid] = off; + data[off + fixed_width - 2] = i; + data[off + fixed_width - 1] = j; + } + } + offsets[num_rows] = num_rows * fixed_width; + + auto data_child = cudf::test::fixed_width_column_wrapper(data.begin(), data.end()); + auto off_child = cudf::test::fixed_width_column_wrapper(offsets.begin(), offsets.end()); + auto col = cudf::make_lists_column(num_rows, off_child.release(), data_child.release(), 0, {}); + + auto expected = table_view{{*col, *col, *col, *col}}; + cudf::io::table_input_metadata expected_metadata(expected); + + expected_metadata.column_metadata[0] + .set_name("flba_plain") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::PLAIN) + .set_output_as_binary(true); + expected_metadata.column_metadata[1] + .set_name("flba_split") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::BYTE_STREAM_SPLIT) + .set_output_as_binary(true); + expected_metadata.column_metadata[2] + .set_name("flba_delta") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::DELTA_BYTE_ARRAY) + .set_output_as_binary(true); + expected_metadata.column_metadata[3] + .set_name("flba_dict") + .set_type_length(fixed_width) + .set_encoding(cudf::io::column_encoding::DICTIONARY) + .set_output_as_binary(true); + + auto filepath = temp_env->get_temp_filepath("WriteFixedLenByteArray.parquet"); + cudf::io::parquet_writer_options out_opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) + .metadata(expected_metadata); + cudf::io::write_parquet(out_opts); + + cudf::io::parquet_reader_options in_opts = + cudf::io::parquet_reader_options::builder(cudf::io::source_info{filepath}); + auto result = cudf::io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + + // check page headers to make sure each column is encoded with the appropriate encoder + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::detail::FileMetaData fmd; + read_footer(source, &fmd); + + // check that the schema retains the FIXED_LEN_BYTE_ARRAY type + for (int i = 1; i <= 4; i++) { + EXPECT_EQ(fmd.schema[i].type, cudf::io::parquet::detail::Type::FIXED_LEN_BYTE_ARRAY); + EXPECT_EQ(fmd.schema[i].type_length, fixed_width); + } + + // no nulls and no repetition, so the only encoding used should be for the data. + auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) { + EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc); + }; + + // requested plain + expect_enc(0, Encoding::PLAIN); + // requested byte_stream_split + expect_enc(1, Encoding::BYTE_STREAM_SPLIT); + // requested delta_byte_array + expect_enc(2, Encoding::DELTA_BYTE_ARRAY); + // requested dictionary, but should fall back to plain + // TODO: update if we get FLBA working with dictionary encoding + expect_enc(3, Encoding::PLAIN); +} + ///////////////////////////////////////////////////////////// // custom mem mapped data sink that supports device writes template From b4750842e6cc98f492245a60631e5edfb370c95c Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 25 Apr 2024 21:00:32 +0000 Subject: [PATCH 2/2] address review comments --- cpp/src/io/parquet/writer_impl.cu | 2 +- cpp/tests/io/parquet_writer_test.cpp | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 98bc37f689e..cf0d7c18d95 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -704,7 +704,7 @@ std::vector construct_schema_tree( } schema_tree_node col_schema{}; - // test if this should this be output as FIXED_LEN_BYTE_ARRAY + // test if this should be output as FIXED_LEN_BYTE_ARRAY if (col_meta.is_type_length_set()) { col_schema.type = Type::FIXED_LEN_BYTE_ARRAY; col_schema.type_length = col_meta.get_type_length(); diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 2a3bb40a49f..3996321a066 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1859,22 +1859,22 @@ TEST_F(ParquetWriterTest, DurationByteStreamSplit) TEST_F(ParquetWriterTest, WriteFixedLenByteArray) { + srand(31337); using cudf::io::parquet::detail::Encoding; constexpr int fixed_width = 16; - constexpr cudf::size_type num_rows = fixed_width * fixed_width; - std::vector data(num_rows * fixed_width, 0); + constexpr cudf::size_type num_rows = 200; + std::vector data(num_rows * fixed_width); std::vector offsets(num_rows + 1); - for (int i = 0; i < fixed_width; i++) { - for (int j = 0; j < fixed_width; j++) { - auto const rowid = i * fixed_width + j; - auto const off = rowid * fixed_width; - offsets[rowid] = off; - data[off + fixed_width - 2] = i; - data[off + fixed_width - 1] = j; + // fill a num_rows X fixed_width array with random numbers and populate offsets array + int cur_offset = 0; + for (int i = 0; i < num_rows; i++) { + offsets[i] = cur_offset; + for (int j = 0; j < fixed_width; j++, cur_offset++) { + data[cur_offset] = rand() & 0xff; } } - offsets[num_rows] = num_rows * fixed_width; + offsets[num_rows] = cur_offset; auto data_child = cudf::test::fixed_width_column_wrapper(data.begin(), data.end()); auto off_child = cudf::test::fixed_width_column_wrapper(offsets.begin(), offsets.end());