Add ability to request Parquet encodings on a per-column basis (#15081)
Allows users to request specific page encodings on a column-by-column basis. This is accomplished by adding an `encoding` property to the `column_in_metadata` struct. This is a necessary change before adding `DELTA_BYTE_ARRAY` encoding.
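
For illustration, a minimal sketch of requesting a per-column encoding through the new API. It assumes an existing `cudf::table_view` named `table` whose first column is INT64; the column name `"ids"` and output path `"out.parquet"` are placeholders:

```cpp
#include <cudf/io/parquet.hpp>
#include <cudf/io/types.hpp>

// Request DELTA_BINARY_PACKED for the first column. This is a request, not a
// guarantee: the writer validates it against the column's physical type and
// may fall back to its default encoding choice (see writer_impl.cu below).
cudf::io::table_input_metadata metadata(table);
metadata.column_metadata[0]
  .set_name("ids")
  .set_encoding(cudf::io::column_encoding::DELTA_BINARY_PACKED);

auto const options =
  cudf::io::parquet_writer_options::builder(cudf::io::sink_info{"out.parquet"}, table)
    .metadata(metadata)
    .build();
cudf::io::write_parquet(options);
```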

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)

URL: #15081
etseidl authored Mar 6, 2024
1 parent aabfd83 commit dbf7236
Showing 5 changed files with 276 additions and 10 deletions.
44 changes: 44 additions & 0 deletions cpp/include/cudf/io/types.hpp
@@ -99,6 +99,26 @@ enum statistics_freq {
STATISTICS_COLUMN = 3, ///< Full column and offset indices. Implies STATISTICS_ROWGROUP
};

/**
* @brief Valid encodings for use with `column_in_metadata::set_encoding()`
*/
enum class column_encoding {
// Common encodings:
USE_DEFAULT = -1, ///< No encoding has been requested, use default encoding
DICTIONARY, ///< Use dictionary encoding
// Parquet encodings:
PLAIN, ///< Use plain encoding
DELTA_BINARY_PACKED, ///< Use DELTA_BINARY_PACKED encoding (only valid for integer columns)
DELTA_LENGTH_BYTE_ARRAY, ///< Use DELTA_LENGTH_BYTE_ARRAY encoding (only
///< valid for BYTE_ARRAY columns)
DELTA_BYTE_ARRAY, ///< Use DELTA_BYTE_ARRAY encoding (only valid for
///< BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY columns)
// ORC encodings:
DIRECT, ///< Use DIRECT encoding
DIRECT_V2, ///< Use DIRECT_V2 encoding
DICTIONARY_V2, ///< Use DICTIONARY_V2 encoding
};

/**
* @brief Statistics about compression performed by a writer.
*/
@@ -585,6 +605,7 @@ class column_in_metadata {
std::optional<uint8_t> _decimal_precision;
std::optional<int32_t> _parquet_field_id;
std::vector<column_in_metadata> children;
column_encoding _encoding = column_encoding::USE_DEFAULT;

public:
column_in_metadata() = default;
@@ -701,6 +722,22 @@
return *this;
}

/**
* @brief Sets the encoding to use for this column.
*
* This is just a request, and the encoder may still choose to use a different encoding
* depending on resource constraints. Use the constants defined in the `column_encoding`
* enum.
*
* @param encoding The encoding to use
* @return this for chaining
*/
column_in_metadata& set_encoding(column_encoding encoding) noexcept
{
_encoding = encoding;
return *this;
}
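// Example (illustrative, not part of this change): request dictionary encoding
// for a column; per the note above, the writer may still fall back to another
// encoding if, e.g., the dictionary grows too large:
//   column_in_metadata meta;
//   meta.set_name("ids").set_encoding(column_encoding::DICTIONARY);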

/**
* @brief Get reference to a child of this column
*
@@ -806,6 +843,13 @@ class column_in_metadata {
* @return Boolean indicating whether to encode this column as binary data
*/
[[nodiscard]] bool is_enabled_output_as_binary() const noexcept { return _output_as_binary; }

/**
* @brief Get the encoding that was set for this column.
*
* @return The encoding that was set for this column
*/
[[nodiscard]] column_encoding get_encoding() const { return _encoding; }
};

/**
34 changes: 31 additions & 3 deletions cpp/src/io/parquet/page_enc.cu
Expand Up @@ -573,9 +573,13 @@ CUDF_KERNEL void __launch_bounds__(128)
// at the worst case number of bytes needed to encode.
auto const physical_type = col_g.physical_type;
auto const type_id = col_g.leaf_column->type().id();
-auto const is_use_delta =
-  write_v2_headers && !ck_g.use_dictionary &&
+auto const is_requested_delta =
+  col_g.requested_encoding == column_encoding::DELTA_BINARY_PACKED ||
+  col_g.requested_encoding == column_encoding::DELTA_LENGTH_BYTE_ARRAY;
+auto const is_fallback_to_delta =
+  !ck_g.use_dictionary && write_v2_headers &&
 (physical_type == INT32 || physical_type == INT64 || physical_type == BYTE_ARRAY);
+auto const is_use_delta = is_requested_delta || is_fallback_to_delta;

if (t < 32) {
uint32_t fragments_in_chunk = 0;
@@ -786,7 +790,31 @@ CUDF_KERNEL void __launch_bounds__(128)
if (t == 0) {
if (not pages.empty()) {
// set encoding
if (is_use_delta) {
if (col_g.requested_encoding != column_encoding::USE_DEFAULT) {
switch (col_g.requested_encoding) {
case column_encoding::PLAIN: page_g.kernel_mask = encode_kernel_mask::PLAIN; break;
case column_encoding::DICTIONARY:
// user may have requested dict, but we may not be able to use it
// TODO: when DELTA_BYTE_ARRAY is added, rework the fallback logic so there
// isn't duplicated code here and below.
if (ck_g.use_dictionary) {
page_g.kernel_mask = encode_kernel_mask::DICTIONARY;
} else if (is_fallback_to_delta) {
page_g.kernel_mask = physical_type == BYTE_ARRAY
? encode_kernel_mask::DELTA_LENGTH_BA
: encode_kernel_mask::DELTA_BINARY;
} else {
page_g.kernel_mask = encode_kernel_mask::PLAIN;
}
break;
case column_encoding::DELTA_BINARY_PACKED:
page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY;
break;
case column_encoding::DELTA_LENGTH_BYTE_ARRAY:
page_g.kernel_mask = encode_kernel_mask::DELTA_LENGTH_BA;
break;
default: break;  // any other value was reset to USE_DEFAULT during schema construction
}
} else if (is_use_delta) {
// TODO(ets): at some point make a more intelligent decision on this. DELTA_LENGTH_BA
// should always be preferred over PLAIN, but DELTA_BINARY is a different matter.
// If the delta encoding size is going to be close to 32 bits anyway, then plain
1 change: 1 addition & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Expand Up @@ -460,6 +460,7 @@ struct parquet_column_device_view : stats_column_desc {
//!< nullability of parent_column. May be different from
//!< col.nullable() in case of chunked writing.
bool output_as_byte_array; //!< Indicates this list column is being written as a byte array
column_encoding requested_encoding; //!< User-specified encoding for this column.
};

struct EncColumnChunk;
85 changes: 78 additions & 7 deletions cpp/src/io/parquet/writer_impl.cu
Expand Up @@ -267,11 +267,13 @@ bool is_col_fixed_width(column_view const& column)
* 2. stats_dtype: datatype for statistics calculation required for the data stream of a leaf node.
* 3. ts_scale: scale to multiply or divide timestamp by in order to convert timestamp to parquet
* supported types
* 4. requested_encoding: A user-provided encoding to use for the column.
*/
struct schema_tree_node : public SchemaElement {
cudf::detail::LinkedColPtr leaf_column;
statistics_dtype stats_dtype;
int32_t ts_scale;
column_encoding requested_encoding;

// TODO(fut): Think about making schema a class that holds a vector of schema_tree_nodes. The
// function construct_schema_tree could be its constructor. It can have method to get the per
@@ -588,7 +590,7 @@ std::vector<schema_tree_node> construct_schema_tree(

std::function<void(cudf::detail::LinkedColPtr const&, column_in_metadata&, size_t)> add_schema =
[&](cudf::detail::LinkedColPtr const& col, column_in_metadata& col_meta, size_t parent_idx) {
-bool col_nullable = is_col_nullable(col, col_meta, write_mode);
+bool const col_nullable = is_col_nullable(col, col_meta, write_mode);

auto set_field_id = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
@@ -604,6 +606,52 @@ std::vector<schema_tree_node> construct_schema_tree(
return child_col_type == type_id::UINT8;
};

// only call this after col_schema.type has been set
auto set_encoding = [&schema, parent_idx](schema_tree_node& s,
column_in_metadata const& col_meta) {
s.requested_encoding = column_encoding::USE_DEFAULT;

if (schema[parent_idx].name != "list" and
col_meta.get_encoding() != column_encoding::USE_DEFAULT) {
// do some validation
switch (col_meta.get_encoding()) {
case column_encoding::DELTA_BINARY_PACKED:
if (s.type != Type::INT32 && s.type != Type::INT64) {
CUDF_LOG_WARN(
"DELTA_BINARY_PACKED encoding is only supported for INT32 and INT64 columns; the "
"requested encoding will be ignored");
return;
}
break;

case column_encoding::DELTA_LENGTH_BYTE_ARRAY:
if (s.type != Type::BYTE_ARRAY) {
CUDF_LOG_WARN(
"DELTA_LENGTH_BYTE_ARRAY encoding is only supported for BYTE_ARRAY columns; the "
"requested encoding will be ignored");
return;
}
break;

// supported parquet encodings
case column_encoding::PLAIN:
case column_encoding::DICTIONARY: break;

// not yet supported for write (soon...)
case column_encoding::DELTA_BYTE_ARRAY: [[fallthrough]];
// all others
default:
CUDF_LOG_WARN(
"Unsupported page encoding requested: {}; the requested encoding will be ignored",
static_cast<int>(col_meta.get_encoding()));
return;
}

// requested encoding seems to be ok, set it
s.requested_encoding = col_meta.get_encoding();
}
};
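// For example (illustrative): requesting DELTA_BINARY_PACKED on a string column
// takes the first case above, logs the warning, and returns without touching
// s.requested_encoding, so the column keeps USE_DEFAULT and the writer chooses
// the encoding as before.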

// There is a special case for a list<int8> column with one byte column child. This column can
// have a special flag that indicates we write this out as binary instead of a list. This is a
// more efficient storage mechanism for a single-depth list of bytes, but is a departure from
@@ -626,6 +674,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
col_schema.output_as_byte_array = col_meta.is_enabled_output_as_binary();
schema.push_back(col_schema);
} else if (col->type().id() == type_id::STRUCT) {
@@ -761,6 +810,7 @@ std::vector<schema_tree_node> construct_schema_tree(
col_schema.parent_idx = parent_idx;
col_schema.leaf_column = col;
set_field_id(col_schema, col_meta);
set_encoding(col_schema, col_meta);
schema.push_back(col_schema);
}
};
@@ -947,9 +997,10 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream

desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 |
CompactProtocolReader::NumRequiredBits(max_def_level());
-desc.nullability = _d_nullability.data();
-desc.max_def_level = _max_def_level;
-desc.max_rep_level = _max_rep_level;
+desc.nullability        = _d_nullability.data();
+desc.max_def_level      = _max_def_level;
+desc.max_rep_level      = _max_rep_level;
+desc.requested_encoding = schema_node.requested_encoding;
return desc;
}

@@ -1169,9 +1220,15 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
std::vector<rmm::device_uvector<slot_type>> hash_maps_storage;
hash_maps_storage.reserve(h_chunks.size());
for (auto& chunk : h_chunks) {
-if (col_desc[chunk.col_desc_id].physical_type == Type::BOOLEAN ||
-    (col_desc[chunk.col_desc_id].output_as_byte_array &&
-     col_desc[chunk.col_desc_id].physical_type == Type::BYTE_ARRAY)) {
+auto const& chunk_col_desc = col_desc[chunk.col_desc_id];
+auto const is_requested_non_dict =
+  chunk_col_desc.requested_encoding != column_encoding::USE_DEFAULT &&
+  chunk_col_desc.requested_encoding != column_encoding::DICTIONARY;
+auto const is_type_non_dict =
+  chunk_col_desc.physical_type == Type::BOOLEAN ||
+  (chunk_col_desc.output_as_byte_array && chunk_col_desc.physical_type == Type::BYTE_ARRAY);
+
+if (is_type_non_dict || is_requested_non_dict) {
chunk.use_dictionary = false;
} else {
chunk.use_dictionary = true;
@@ -1191,6 +1248,7 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,
chunks.device_to_host_sync(stream);

// Make decision about which chunks have dictionary
bool cannot_honor_request = false;
for (auto& ck : h_chunks) {
if (not ck.use_dictionary) { continue; }
std::tie(ck.use_dictionary, ck.dict_rle_bits) = [&]() -> std::pair<bool, uint8_t> {
@@ -1217,6 +1275,19 @@ build_chunk_dictionaries(hostdevice_2dvector<EncColumnChunk>& chunks,

return {true, nbits};
}();
// If dictionary encoding was requested, but it cannot be used, then print a warning. It will
// actually be disabled in gpuInitPages.
if (not ck.use_dictionary) {
auto const& chunk_col_desc = col_desc[ck.col_desc_id];
if (chunk_col_desc.requested_encoding == column_encoding::DICTIONARY) {
cannot_honor_request = true;
}
}
}

// warn if we have to ignore requested encoding
if (cannot_honor_request) {
CUDF_LOG_WARN("DICTIONARY encoding was requested, but resource constraints prevent its use");
}

// TODO: (enh) Deallocate hash map storage for chunks that don't use dict and clear pointers.
