Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable RLE boolean encoding for v2 Parquet files #13886

Merged
merged 8 commits into from
Aug 17, 2023
122 changes: 80 additions & 42 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ namespace parquet {
namespace gpu {

namespace {
// Spark doesn't support RLE encoding for BOOLEANs
#ifdef ENABLE_BOOL_RLE
constexpr bool enable_bool_rle = true;
#else
constexpr bool enable_bool_rle = false;
#endif

using ::cudf::detail::device_2dspan;

Expand All @@ -70,6 +64,9 @@ constexpr uint32_t WARP_MASK = cudf::detail::warp_size - 1;
// currently 64k - 1
constexpr uint32_t MAX_GRID_Y_SIZE = (1 << 16) - 1;

// space needed for RLE length field
constexpr int RLE_LENGTH_FIELD_LEN = 4;

struct frag_init_state_s {
parquet_column_device_view col;
PageFragment frag;
Expand All @@ -78,6 +75,7 @@ struct frag_init_state_s {
struct page_enc_state_s {
uint8_t* cur; //!< current output ptr
uint8_t* rle_out; //!< current RLE write ptr
uint8_t* rle_len_pos; //!< position to write RLE length (for V2 boolean data)
uint32_t rle_run; //!< current RLE run
uint32_t run_val; //!< current RLE run value
uint32_t rle_pos; //!< RLE encoder positions
Expand Down Expand Up @@ -210,6 +208,27 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t)
}
}

Encoding __device__ determine_encoding(PageType page_type,
Type physical_type,
bool use_dictionary,
bool write_v2_headers)
{
// NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and
// RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and
// data pages (actual encoding is identical).
switch (page_type) {
case PageType::DATA_PAGE: return use_dictionary ? Encoding::PLAIN_DICTIONARY : Encoding::PLAIN;
case PageType::DATA_PAGE_V2:
// TODO need to work in delta encodings here when they're added
return physical_type == BOOLEAN ? Encoding::RLE
: use_dictionary ? Encoding::RLE_DICTIONARY
: Encoding::PLAIN;
case PageType::DICTIONARY_PAGE:
return write_v2_headers ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY;
default: CUDF_UNREACHABLE("unsupported page type");
}
}

} // anonymous namespace

// blockDim {512,1,1}
Expand Down Expand Up @@ -384,6 +403,11 @@ __global__ void __launch_bounds__(128)
num_pages = 1;
}
__syncwarp();

// page padding needed for RLE encoded boolean data
auto const rle_pad =
write_v2_headers && col_g.physical_type == BOOLEAN ? RLE_LENGTH_FIELD_LEN : 0;

// This loop goes over one page fragment at a time and adds it to page.
// When page size crosses a particular limit, then it moves on to the next page and then next
// page fragment gets added to that one.
Expand Down Expand Up @@ -427,12 +451,12 @@ __global__ void __launch_bounds__(128)
// override this_max_page_size if the requested size is smaller
this_max_page_size = min(this_max_page_size, max_page_size_bytes);

// subtract size of rep and def level vectors
auto num_vals = values_in_page + frag_g.num_values;
this_max_page_size =
underflow_safe_subtract(this_max_page_size,
max_RLE_page_size(col_g.num_def_level_bits(), num_vals) +
max_RLE_page_size(col_g.num_rep_level_bits(), num_vals));
// subtract size of rep and def level vectors and RLE length field
auto num_vals = values_in_page + frag_g.num_values;
this_max_page_size = underflow_safe_subtract(
this_max_page_size,
max_RLE_page_size(col_g.num_def_level_bits(), num_vals) +
max_RLE_page_size(col_g.num_rep_level_bits(), num_vals) + rle_pad);

// checks to see when we need to close the current page and start a new one
auto const is_last_chunk = num_rows >= ck_g.num_rows;
Expand Down Expand Up @@ -474,7 +498,7 @@ __global__ void __launch_bounds__(128)
page_g.num_values = values_in_page;
auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page);
auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page);
auto const max_data_size = page_size + def_level_size + rep_level_size;
auto const max_data_size = page_size + def_level_size + rep_level_size + rle_pad;
// page size must fit in 32-bit signed integer
if (max_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page size exceeds maximum for i32");
Expand Down Expand Up @@ -967,7 +991,8 @@ __global__ void __launch_bounds__(128, 8)
gpuEncodePages(device_span<gpu::EncPage> pages,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_results)
device_span<compression_result> comp_results,
bool write_v2_headers)
{
__shared__ __align__(8) page_enc_state_s state_g;
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
Expand All @@ -990,6 +1015,7 @@ __global__ void __launch_bounds__(128, 8)
s->page.def_lvl_bytes = 0;
s->page.rep_lvl_bytes = 0;
s->page.num_nulls = 0;
s->rle_len_pos = nullptr;
}
__syncthreads();

Expand Down Expand Up @@ -1132,9 +1158,15 @@ __global__ void __launch_bounds__(128, 8)
s->rle_pos = 0;
s->rle_numvals = 0;
s->rle_out = dst;
s->page.encoding =
determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers);
if (dict_bits >= 0 && physical_type != BOOLEAN) {
dst[0] = dict_bits;
s->rle_out = dst + 1;
} else if (is_v2 && physical_type == BOOLEAN) {
// save space for RLE length. we don't know the total length yet.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is such adjustment only needed for bool RLE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So RLE is only used for 1) rep/def levels, 2) dictionary indexes, and 3) booleans. So booleans are the only data type that will be RLE encoded...everything else will be plain or dictionary (or eventually delta).

There's actually a table in the spec that lists when a length is prepended and when it is not.

+--------------+------------------------+-----------------+
| Page kind    | RLE-encoded data kind  | Prepend length? |
+--------------+------------------------+-----------------+
| Data page v1 | Definition levels      | Y               |
|              | Repetition levels      | Y               |
|              | Dictionary indices     | N               |
|              | Boolean values         | Y               |
+--------------+------------------------+-----------------+
| Data page v2 | Definition levels      | N               |
|              | Repetition levels      | N               |
|              | Dictionary indices     | N               |
|              | Boolean values         | Y               |
+--------------+------------------------+-----------------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I get it, thanks.
And this is something that does not make sense to include in fragment size?
I'm asking because this "max RLE page size" is implemented as a util function in the ORC writer, so this manual padding looked odd in comparison.

BTW, must be nice not having RLE in almost every data type (cries in ORC).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is something that does not make sense to include in fragment size?

yeah, because it's just once per page, not every fragment. although, yeah, maybe it should be included in the calculation at line 451. I'll ponder that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I think that fixed it. nice catch! That would have been a lot of fun tracking down later :P

s->rle_out = dst + RLE_LENGTH_FIELD_LEN;
s->rle_len_pos = dst;
}
s->page_start_val = row_to_value_idx(s->page.start_row, s->col);
s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col);
Expand Down Expand Up @@ -1188,7 +1220,7 @@ __global__ void __launch_bounds__(128, 8)
}
rle_numvals += rle_numvals_in_block;
__syncthreads();
if ((!enable_bool_rle) && (physical_type == BOOLEAN)) {
if (!is_v2 && physical_type == BOOLEAN) {
PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t);
} else {
RleEncode(s, rle_numvals, dict_bits, (cur_val_idx == s->page.num_leaf_values), t);
Expand Down Expand Up @@ -1345,29 +1377,45 @@ __global__ void __launch_bounds__(128, 8)

uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid);

// save RLE length if necessary
if (s->rle_len_pos != nullptr && t < 32) {
// size doesn't include the 4 bytes for the length
auto const rle_size = static_cast<uint32_t>(s->cur - s->rle_len_pos) - RLE_LENGTH_FIELD_LEN;
if (t < RLE_LENGTH_FIELD_LEN) { s->rle_len_pos[t] = rle_size >> (t * 8); }
__syncwarp();
}

// V2 does not compress rep and def level data
size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes;

if (t == 0) {
s->page.num_nulls = s->page.num_values - valid_count;
uint8_t* base = s->page.page_data + s->page.max_hdr_size;
auto actual_data_size = static_cast<uint32_t>(s->cur - base);
s->page.num_nulls = s->page.num_values - valid_count;
uint8_t* const base = s->page.page_data + s->page.max_hdr_size;
auto const actual_data_size = static_cast<uint32_t>(s->cur - base);
if (actual_data_size > s->page.max_data_size) {
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
if (not comp_in.empty()) {
// V2 does not compress rep and def level data
size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size,
0}; // size is unused
// copy uncompressed bytes over
memcpy(s->page.compressed_data + s->page.max_hdr_size, base, skip_comp_size);
}
pages[blockIdx.x] = s->page;
if (not comp_results.empty()) {
comp_results[blockIdx.x] = {0, compression_status::FAILURE};
pages[blockIdx.x].comp_res = &comp_results[blockIdx.x];
}
}

// copy over uncompressed data
if (skip_comp_size != 0 && not comp_in.empty()) {
uint8_t const* const src = s->page.page_data + s->page.max_hdr_size;
uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size;
for (int i = t; i < skip_comp_size; i += block_size) {
dst[i] = src[i];
}
}
}

constexpr int decide_compression_warps_in_block = 4;
Expand Down Expand Up @@ -1865,28 +1913,16 @@ __global__ void __launch_bounds__(128)
}
header_encoder encoder(hdr_start);
PageType page_type = page_g.page_type;
// NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and
// RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and
// data pages (actual encoding is identical).
Encoding encoding;
if (enable_bool_rle) {
encoding = (col_g.physical_type == BOOLEAN) ? Encoding::RLE
: (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary)
? Encoding::PLAIN_DICTIONARY
: Encoding::PLAIN;
} else {
encoding = (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary)
? Encoding::PLAIN_DICTIONARY
: Encoding::PLAIN;
}

encoder.field_int32(1, page_type);
encoder.field_int32(2, uncompressed_page_size);
encoder.field_int32(3, compressed_page_size);

if (page_type == PageType::DATA_PAGE) {
// DataPageHeader
encoder.field_struct_begin(5);
encoder.field_int32(1, page_g.num_values); // NOTE: num_values != num_rows for list types
encoder.field_int32(2, encoding); // encoding
encoder.field_int32(2, page_g.encoding); // encoding
encoder.field_int32(3, Encoding::RLE); // definition_level_encoding
encoder.field_int32(4, Encoding::RLE); // repetition_level_encoding
// Optionally encode page-level statistics
Expand All @@ -1898,11 +1934,12 @@ __global__ void __launch_bounds__(128)
}
encoder.field_struct_end(5);
} else if (page_type == PageType::DATA_PAGE_V2) {
// DataPageHeaderV2
encoder.field_struct_begin(8);
encoder.field_int32(1, page_g.num_values);
encoder.field_int32(2, page_g.num_nulls);
encoder.field_int32(3, page_g.num_rows);
encoder.field_int32(4, encoding);
encoder.field_int32(4, page_g.encoding);
encoder.field_int32(5, page_g.def_lvl_bytes);
encoder.field_int32(6, page_g.rep_lvl_bytes);
encoder.field_bool(7, ck_g.is_compressed); // TODO can compress at page level now
Expand All @@ -1918,7 +1955,7 @@ __global__ void __launch_bounds__(128)
// DictionaryPageHeader
encoder.field_struct_begin(7);
encoder.field_int32(1, ck_g.num_dict_entries); // number of values in dictionary
encoder.field_int32(2, encoding);
encoder.field_int32(2, page_g.encoding);
encoder.field_struct_end(7);
}
encoder.end(&hdr_end, false);
Expand Down Expand Up @@ -2237,6 +2274,7 @@ void InitEncoderPages(device_2dspan<EncColumnChunk> chunks,
}

void EncodePages(device_span<gpu::EncPage> pages,
bool write_v2_headers,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_results,
Expand All @@ -2245,8 +2283,8 @@ void EncodePages(device_span<gpu::EncPage> pages,
auto num_pages = pages.size();
// A page is part of one column. This is launching 1 block per page. 1 block will exclusively
// deal with one datatype.
gpuEncodePages<128>
<<<num_pages, 128, 0, stream.value()>>>(pages, comp_in, comp_out, comp_results);
gpuEncodePages<128><<<num_pages, 128, 0, stream.value()>>>(
pages, comp_in, comp_out, comp_results, write_v2_headers);
}

void DecideCompression(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ struct EncPage {
uint8_t* compressed_data; //!< Ptr to compressed page
uint16_t num_fragments; //!< Number of fragments in page
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
EncColumnChunk* chunk; //!< Chunk that this page belongs to
uint32_t chunk_id; //!< Index in chunk array
uint32_t hdr_size; //!< Size of page header
Expand Down Expand Up @@ -672,13 +673,18 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
/**
* @brief Launches kernel for packing column data into parquet pages
*
* If compression is to be used, `comp_in`, `comp_out`, and `comp_res` will be initialized for
* use in subsequent compression operations.
*
* @param[in,out] pages Device array of EncPages (unordered)
* @param[in] write_v2_headers True if V2 page headers should be written
etseidl marked this conversation as resolved.
Show resolved Hide resolved
* @param[out] comp_in Compressor input buffers
* @param[out] comp_in Compressor output buffers
* @param[out] comp_stats Compressor results
* @param[in] stream CUDA stream to use, default 0
* @param[out] comp_out Compressor output buffers
* @param[out] comp_res Compressor results
* @param[in] stream CUDA stream to use
*/
void EncodePages(device_span<EncPage> pages,
bool write_v2_headers,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_res,
Expand All @@ -688,18 +694,18 @@ void EncodePages(device_span<EncPage> pages,
* @brief Launches kernel to make the compressed vs uncompressed chunk-level decision
*
* @param[in,out] chunks Column chunks (updated with actual compressed/uncompressed sizes)
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void DecideCompression(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream);

/**
* @brief Launches kernel to encode page headers
*
* @param[in,out] pages Device array of EncPages
* @param[in] comp_stats Compressor status
* @param[in] comp_res Compressor status
* @param[in] page_stats Optional page-level statistics to be included in page header
* @param[in] chunk_stats Optional chunk-level statistics to be encoded
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void EncodePageHeaders(device_span<EncPage> pages,
device_span<compression_result const> comp_res,
Expand All @@ -712,7 +718,7 @@ void EncodePageHeaders(device_span<EncPage> pages,
*
* @param[in,out] chunks Column chunks
* @param[in] pages Device array of EncPages
* @param[in] stream CUDA stream to use, default 0
* @param[in] stream CUDA stream to use
*/
void GatherPages(device_span<EncColumnChunk> chunks,
device_span<gpu::EncPage const> pages,
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,7 @@ void init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
* @param comp_stats optional compression statistics (nullopt if none)
* @param compression compression format
* @param column_index_truncate_length maximum length of min or max values in column index, in bytes
* @param write_v2_headers True if V2 page headers should be written
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
Expand All @@ -1280,6 +1281,7 @@ void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
std::optional<writer_compression_statistics>& comp_stats,
Compression compression,
int32_t column_index_truncate_length,
bool write_v2_headers,
rmm::cuda_stream_view stream)
{
auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch);
Expand All @@ -1300,7 +1302,7 @@ void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
comp_res.end(),
compression_result{0, compression_status::FAILURE});

gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream);
gpu::EncodePages(batch_pages, write_v2_headers, comp_in, comp_out, comp_res, stream);
switch (compression) {
case parquet::Compression::SNAPPY:
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
Expand Down Expand Up @@ -1926,6 +1928,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
comp_stats,
compression,
column_index_truncate_length,
write_v2_headers,
stream);

bool need_sync{false};
Expand Down