From dbf7236c4b30ee6f87223b728688cddf39453d14 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Tue, 5 Mar 2024 22:59:21 -0800 Subject: [PATCH] Add ability to request Parquet encodings on a per-column basis (#15081) Allows users to request specific page encodings to use on a column-by-column basis. This is accomplished by adding an `encoding` property to the `column_input_metadata` struct. This is a necessary change before adding `DELTA_BYTE_ARRAY` encoding. Authors: - Ed Seidl (https://github.com/etseidl) - Vukasin Milovanovic (https://github.com/vuule) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Yunsong Wang (https://github.com/PointKernel) - Nghia Truong (https://github.com/ttnghia) URL: https://github.com/rapidsai/cudf/pull/15081 --- cpp/include/cudf/io/types.hpp | 44 ++++++++++ cpp/src/io/parquet/page_enc.cu | 34 +++++++- cpp/src/io/parquet/parquet_gpu.hpp | 1 + cpp/src/io/parquet/writer_impl.cu | 85 +++++++++++++++++-- cpp/tests/io/parquet_writer_test.cpp | 122 +++++++++++++++++++++++++++ 5 files changed, 276 insertions(+), 10 deletions(-) diff --git a/cpp/include/cudf/io/types.hpp b/cpp/include/cudf/io/types.hpp index 3208a81cd63..64d627483e6 100644 --- a/cpp/include/cudf/io/types.hpp +++ b/cpp/include/cudf/io/types.hpp @@ -99,6 +99,26 @@ enum statistics_freq { STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP }; +/** + * @brief Valid encodings for use with `column_in_metadata::set_encoding()` + */ +enum class column_encoding { + // Common encodings: + USE_DEFAULT = -1, ///< No encoding has been requested, use default encoding + DICTIONARY, ///< Use dictionary encoding + // Parquet encodings: + PLAIN, ///< Use plain encoding + DELTA_BINARY_PACKED, ///< Use DELTA_BINARY_PACKED encoding (only valid for integer columns) + DELTA_LENGTH_BYTE_ARRAY, ///< Use DELTA_LENGTH_BYTE_ARRAY encoding (only + ///< valid for BYTE_ARRAY columns) + DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for + ///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns) + // ORC encodings: + DIRECT, ///< Use DIRECT encoding + DIRECT_V2, ///< Use DIRECT_V2 encoding + DICTIONARY_V2, ///< Use DICTIONARY_V2 encoding +}; + /** * @brief Statistics about compression performed by a writer. */ @@ -585,6 +605,7 @@ class column_in_metadata { std::optional _decimal_precision; std::optional _parquet_field_id; std::vector children; + column_encoding _encoding = column_encoding::USE_DEFAULT; public: column_in_metadata() = default; @@ -701,6 +722,22 @@ class column_in_metadata { return *this; } + /** + * @brief Sets the encoding to use for this column. + * + * This is just a request, and the encoder may still choose to use a different encoding + * depending on resource constraints. Use the constants defined in the `parquet_encoding` + * struct. + * + * @param encoding The encoding to use + * @return this for chaining + */ + column_in_metadata& set_encoding(column_encoding encoding) noexcept + { + _encoding = encoding; + return *this; + } + /** * @brief Get reference to a child of this column * @@ -806,6 +843,13 @@ class column_in_metadata { * @return Boolean indicating whether to encode this column as binary data */ [[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; } + + /** + * @brief Get the encoding that was set for this column. + * + * @return The encoding that was set for this column + */ + [[nodiscard]] column_encoding get_encoding() const { return _encoding; } }; /** diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 5aad31bd057..617cb1d0992 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -573,9 +573,13 @@ CUDF_KERNEL void __launch_bounds__(128) // at the worst case number of bytes needed to encode. auto const physical_type = col_g.physical_type; auto const type_id = col_g.leaf_column->type().id(); - auto const is_use_delta = - write_v2_headers && !ck_g.use_dictionary && + auto const is_requested_delta = + col_g.requested_encoding == column_encoding::DELTA_BINARY_PACKED || + col_g.requested_encoding == column_encoding::DELTA_LENGTH_BYTE_ARRAY; + auto const is_fallback_to_delta = + !ck_g.use_dictionary && write_v2_headers && (physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY); + auto const is_use_delta = is_requested_delta || is_fallback_to_delta; if (t < 32) { uint32_t fragments_in_chunk = 0; @@ -786,7 +790,31 @@ CUDF_KERNEL void __launch_bounds__(128) if (t == 0) { if (not pages.empty()) { // set encoding - if (is_use_delta) { + if (col_g.requested_encoding != column_encoding::USE_DEFAULT) { + switch (col_g.requested_encoding) { + case column_encoding::PLAIN: page_g.kernel_mask = encode_kernel_mask::PLAIN; break; + case column_encoding::DICTIONARY: + // user may have requested dict, but we may not be able to use it + // TODO: when DELTA_BYTE_ARRAY is added, rework the fallback logic so there + // isn't duplicated code here and below. + if (ck_g.use_dictionary) { + page_g.kernel_mask = encode_kernel_mask::DICTIONARY; + } else if (is_fallback_to_delta) { + page_g.kernel_mask = physical_type == BYTE_ARRAY + ? encode_kernel_mask::DELTA_LENGTH_BA + : encode_kernel_mask::DELTA_BINARY; + } else { + page_g.kernel_mask = encode_kernel_mask::PLAIN; + } + break; + case column_encoding::DELTA_BINARY_PACKED: + page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY; + break; + case column_encoding::DELTA_LENGTH_BYTE_ARRAY: + page_g.kernel_mask = encode_kernel_mask::DELTA_LENGTH_BA; + break; + } + } else if (is_use_delta) { // TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA // should always be preferred over PLAIN, but DELTA_BINARY is a different matter. // If the delta encoding size is going to be close to 32 bits anyway, then plain diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 86d6ec42c04..af9f1f1267e 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -460,6 +460,7 @@ struct parquet_column_device_view : stats_column_desc { //!< nullability of parent_column. May be different from //!< col.nullable() in case of chunked writing. bool output_as_byte_array; //!< Indicates this list column is being written as a byte array + column_encoding requested_encoding; //!< User specified encoding for this column. }; struct EncColumnChunk; diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index ecdbdd0fd5f..87c8b2f1611 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -267,11 +267,13 @@ bool is_col_fixed_width(column_view const& column) * 2. stats_dtype: datatype for statistics calculation required for the data stream of a leaf node. * 3. ts_scale: scale to multiply or divide timestamp by in order to convert timestamp to parquet * supported types + * 4. requested_encoding: A user provided encoding to use for the column. */ struct schema_tree_node : public SchemaElement { cudf::detail::LinkedColPtr leaf_column; statistics_dtype stats_dtype; int32_t ts_scale; + column_encoding requested_encoding; // TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The // function construct_schema_tree could be its constructor. It can have method to get the per @@ -588,7 +590,7 @@ std::vector construct_schema_tree( std::function add_schema = [&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) { - bool col_nullable = is_col_nullable(col, col_meta, write_mode); + bool const col_nullable = is_col_nullable(col, col_meta, write_mode); auto set_field_id = [&schema, parent_idx](schema_tree_node& s, column_in_metadata const& col_meta) { @@ -604,6 +606,52 @@ std::vector construct_schema_tree( return child_col_type == type_id::UINT8; }; + // only call this after col_schema.type has been set + auto set_encoding = [&schema, parent_idx](schema_tree_node& s, + column_in_metadata const& col_meta) { + s.requested_encoding = column_encoding::USE_DEFAULT; + + if (schema[parent_idx].name != "list" and + col_meta.get_encoding() != column_encoding::USE_DEFAULT) { + // do some validation + switch (col_meta.get_encoding()) { + case column_encoding::DELTA_BINARY_PACKED: + if (s.type != Type::INT32 && s.type != Type::INT64) { + CUDF_LOG_WARN( + "DELTA_BINARY_PACKED encoding is only supported for INT32 and INT64 columns; the " + "requested encoding will be ignored"); + return; + } + break; + + case column_encoding::DELTA_LENGTH_BYTE_ARRAY: + if (s.type != Type::BYTE_ARRAY) { + CUDF_LOG_WARN( + "DELTA_LENGTH_BYTE_ARRAY encoding is only supported for BYTE_ARRAY columns; the " + "requested encoding will be ignored"); + return; + } + break; + + // supported parquet encodings + case column_encoding::PLAIN: + case column_encoding::DICTIONARY: break; + + // not yet supported for write (soon...) + case column_encoding::DELTA_BYTE_ARRAY: [[fallthrough]]; + // all others + default: + CUDF_LOG_WARN( + "Unsupported page encoding requested: {}; the requested encoding will be ignored", + static_cast(col_meta.get_encoding())); + return; + } + + // requested encoding seems to be ok, set it + s.requested_encoding = col_meta.get_encoding(); + } + }; + // There is a special case for a list column with one byte column child. This column can // have a special flag that indicates we write this out as binary instead of a list. This is a // more efficient storage mechanism for a single-depth list of bytes, but is a departure from @@ -626,6 +674,7 @@ std::vector construct_schema_tree( col_schema.parent_idx = parent_idx; col_schema.leaf_column = col; set_field_id(col_schema, col_meta); + set_encoding(col_schema, col_meta); col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary(); schema.push_back(col_schema); } else if (col->type().id() == type_id::STRUCT) { @@ -761,6 +810,7 @@ std::vector construct_schema_tree( col_schema.parent_idx = parent_idx; col_schema.leaf_column = col; set_field_id(col_schema, col_meta); + set_encoding(col_schema, col_meta); schema.push_back(col_schema); } }; @@ -947,9 +997,10 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 | CompactProtocolReader::NumRequiredBits(max_def_level()); - desc.nullability = _d_nullability.data(); - desc.max_def_level = _max_def_level; - desc.max_rep_level = _max_rep_level; + desc.nullability = _d_nullability.data(); + desc.max_def_level = _max_def_level; + desc.max_rep_level = _max_rep_level; + desc.requested_encoding = schema_node.requested_encoding; return desc; } @@ -1169,9 +1220,15 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, std::vector> hash_maps_storage; hash_maps_storage.reserve(h_chunks.size()); for (auto& chunk : h_chunks) { - if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN || - (col_desc[chunk.col_desc_id].output_as_byte_array && - col_desc[chunk.col_desc_id].physical_type == Type::BYTE_ARRAY)) { + auto const& chunk_col_desc = col_desc[chunk.col_desc_id]; + auto const is_requested_non_dict = + chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT && + chunk_col_desc.requested_encoding != column_encoding::DICTIONARY; + auto const is_type_non_dict = + chunk_col_desc.physical_type == Type::BOOLEAN || + (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY); + + if (is_type_non_dict || is_requested_non_dict) { chunk.use_dictionary = false; } else { chunk.use_dictionary = true; @@ -1191,6 +1248,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, chunks.device_to_host_sync(stream); // Make decision about which chunks have dictionary + bool cannot_honor_request = false; for (auto& ck : h_chunks) { if (not ck.use_dictionary) { continue; } std::tie(ck.use_dictionary, ck.dict_rle_bits) = [&]() -> std::pair { @@ -1217,6 +1275,19 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, return {true, nbits}; }(); + // If dictionary encoding was requested, but it cannot be used, then print a warning. It will + // actually be disabled in gpuInitPages. + if (not ck.use_dictionary) { + auto const& chunk_col_desc = col_desc[ck.col_desc_id]; + if (chunk_col_desc.requested_encoding == column_encoding::DICTIONARY) { + cannot_honor_request = true; + } + } + } + + // warn if we have to ignore requested encoding + if (cannot_honor_request) { + CUDF_LOG_WARN("DICTIONARY encoding was requested, but resource constraints prevent its use"); } // TODO: (enh) Deallocate hash map storage for chunks that don't use dict and clear pointers. diff --git a/cpp/tests/io/parquet_writer_test.cpp b/cpp/tests/io/parquet_writer_test.cpp index 62a24bf0a73..f4da9f59b8c 100644 --- a/cpp/tests/io/parquet_writer_test.cpp +++ b/cpp/tests/io/parquet_writer_test.cpp @@ -1426,6 +1426,128 @@ TEST_F(ParquetWriterTest, RowGroupMetadata) static_cast(num_rows * sizeof(column_type))); } +TEST_F(ParquetWriterTest, UserRequestedDictFallback) +{ + constexpr int num_rows = 100; + constexpr char const* big_string = + "a " + "very very very very very very very very very very very very very very very very very very " + "very very very very very very very very very very very very very very very very very very " + "very very very very very very very very very very very very very very very very very very " + "very very very very very very very very very very very very very very very very very very " + "very very very very very very very very very very very very very very very very very very " + "very very very very very very very very very very very very very very very very very very " + "long string"; + + auto const max_dict_size = strlen(big_string) * num_rows / 2; + + auto elements1 = cudf::detail::make_counting_transform_iterator( + 0, [big_string](auto i) { return big_string + std::to_string(i); }); + auto const col1 = cudf::test::strings_column_wrapper(elements1, elements1 + num_rows); + auto const table = table_view({col1}); + + cudf::io::table_input_metadata table_metadata(table); + table_metadata.column_metadata[0] + .set_name("big_strings") + .set_encoding(cudf::io::column_encoding::DICTIONARY) + .set_nullability(false); + + auto const filepath = temp_env->get_temp_filepath("UserRequestedDictFallback.parquet"); + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) + .metadata(table_metadata) + .max_dictionary_size(max_dict_size); + cudf::io::write_parquet(opts); + + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::detail::FileMetaData fmd; + read_footer(source, &fmd); + + // encoding should have fallen back to PLAIN + EXPECT_EQ(fmd.row_groups[0].columns[0].meta_data.encodings[0], + cudf::io::parquet::detail::Encoding::PLAIN); +} + +TEST_F(ParquetWriterTest, UserRequestedEncodings) +{ + using cudf::io::column_encoding; + using cudf::io::parquet::detail::Encoding; + constexpr int num_rows = 500; + + auto const ones = thrust::make_constant_iterator(1); + auto const col = + cudf::test::fixed_width_column_wrapper{ones, ones + num_rows, no_nulls()}; + + auto const strings = thrust::make_constant_iterator("string"); + auto const string_col = + cudf::test::strings_column_wrapper(strings, strings + num_rows, no_nulls()); + + auto const table = table_view( + {col, col, col, col, col, string_col, string_col, string_col, string_col, string_col}); + + cudf::io::table_input_metadata table_metadata(table); + + auto const set_meta = [&table_metadata](int idx, std::string const& name, column_encoding enc) { + table_metadata.column_metadata[idx].set_name(name).set_encoding(enc); + }; + + set_meta(0, "int_plain", column_encoding::PLAIN); + set_meta(1, "int_dict", column_encoding::DICTIONARY); + set_meta(2, "int_db", column_encoding::DELTA_BINARY_PACKED); + set_meta(3, "int_dlba", column_encoding::DELTA_LENGTH_BYTE_ARRAY); + table_metadata.column_metadata[4].set_name("int_none"); + + set_meta(5, "string_plain", column_encoding::PLAIN); + set_meta(6, "string_dict", column_encoding::DICTIONARY); + set_meta(7, "string_dlba", column_encoding::DELTA_LENGTH_BYTE_ARRAY); + set_meta(8, "string_db", column_encoding::DELTA_BINARY_PACKED); + table_metadata.column_metadata[9].set_name("string_none"); + + for (auto& col_meta : table_metadata.column_metadata) { + col_meta.set_nullability(false); + } + + auto const filepath = temp_env->get_temp_filepath("UserRequestedEncodings.parquet"); + cudf::io::parquet_writer_options opts = + cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, table) + .metadata(table_metadata) + .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN) + .compression(cudf::io::compression_type::ZSTD); + cudf::io::write_parquet(opts); + + // check page headers to make sure each column is encoded with the appropriate encoder + auto const source = cudf::io::datasource::create(filepath); + cudf::io::parquet::detail::FileMetaData fmd; + read_footer(source, &fmd); + + // no nulls and no repetition, so the only encoding used should be for the data. + // since we're writing v1, both dict and data pages should use PLAIN_DICTIONARY. + auto const expect_enc = [&fmd](int idx, cudf::io::parquet::detail::Encoding enc) { + EXPECT_EQ(fmd.row_groups[0].columns[idx].meta_data.encodings[0], enc); + }; + + // requested plain + expect_enc(0, Encoding::PLAIN); + // requested dictionary + expect_enc(1, Encoding::PLAIN_DICTIONARY); + // requested delta_binary_packed + expect_enc(2, Encoding::DELTA_BINARY_PACKED); + // requested delta_length_byte_array, but should fall back to dictionary + expect_enc(3, Encoding::PLAIN_DICTIONARY); + // no request, should fall back to dictionary + expect_enc(4, Encoding::PLAIN_DICTIONARY); + // requested plain + expect_enc(5, Encoding::PLAIN); + // requested dictionary + expect_enc(6, Encoding::PLAIN_DICTIONARY); + // requested delta_length_byte_array + expect_enc(7, Encoding::DELTA_LENGTH_BYTE_ARRAY); + // requested delta_binary_packed, but should fall back to dictionary + expect_enc(8, Encoding::PLAIN_DICTIONARY); + // no request, should fall back to dictionary + expect_enc(9, Encoding::PLAIN_DICTIONARY); +} + TEST_F(ParquetWriterTest, DeltaBinaryStartsWithNulls) { // test that the DELTA_BINARY_PACKED writer can properly encode a column that begins with