Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable RLE boolean encoding for v2 Parquet files #13886

Merged
merged 8 commits into from
Aug 17, 2023
106 changes: 70 additions & 36 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,6 @@ namespace parquet {
namespace gpu {

namespace {
// Spark doesn't support RLE encoding for BOOLEANs
#ifdef ENABLE_BOOL_RLE
constexpr bool enable_bool_rle = true;
#else
constexpr bool enable_bool_rle = false;
#endif

using ::cudf::detail::device_2dspan;

Expand All @@ -78,6 +72,7 @@ struct frag_init_state_s {
struct page_enc_state_s {
uint8_t* cur; //!< current output ptr
uint8_t* rle_out; //!< current RLE write ptr
uint8_t* rle_len_pos; //!< position to write RLE length (for V2 boolean data)
uint32_t rle_run; //!< current RLE run
uint32_t run_val; //!< current RLE run value
uint32_t rle_pos; //!< RLE encoder positions
Expand Down Expand Up @@ -210,6 +205,27 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t)
}
}

Encoding __device__ determine_encoding(PageType page_type,
                                       Type physical_type,
                                       bool use_dictionary,
                                       bool write_v2_headers)
{
  // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and
  // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and
  // data pages (actual encoding is identical).
  if (page_type == PageType::DICTIONARY_PAGE) {
    // V2 writers label the dictionary page PLAIN; V1 writers label it PLAIN_DICTIONARY.
    return write_v2_headers ? Encoding::PLAIN : Encoding::PLAIN_DICTIONARY;
  }
  if (page_type == PageType::DATA_PAGE) {
    return use_dictionary ? Encoding::PLAIN_DICTIONARY : Encoding::PLAIN;
  }
  if (page_type == PageType::DATA_PAGE_V2) {
    // TODO need to work in delta encodings here when they're added
    if (physical_type == BOOLEAN) { return Encoding::RLE; }
    return use_dictionary ? Encoding::RLE_DICTIONARY : Encoding::PLAIN;
  }
  CUDF_UNREACHABLE("unsupported page type");
}

} // anonymous namespace

// blockDim {512,1,1}
Expand Down Expand Up @@ -474,7 +490,11 @@ __global__ void __launch_bounds__(128)
page_g.num_values = values_in_page;
auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page);
auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page);
auto const max_data_size = page_size + def_level_size + rep_level_size;
if (write_v2_headers && col_g.physical_type == BOOLEAN) {
// if RLE encoding booleans then will need an extra 4 bytes
vuule marked this conversation as resolved.
Show resolved Hide resolved
page_size += 4;
}
auto const max_data_size = page_size + def_level_size + rep_level_size;
// page size must fit in 32-bit signed integer
if (max_data_size > std::numeric_limits<int32_t>::max()) {
CUDF_UNREACHABLE("page size exceeds maximum for i32");
Expand Down Expand Up @@ -967,7 +987,8 @@ __global__ void __launch_bounds__(128, 8)
gpuEncodePages(device_span<gpu::EncPage> pages,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_results)
device_span<compression_result> comp_results,
bool write_v2_headers)
{
__shared__ __align__(8) page_enc_state_s state_g;
using block_reduce = cub::BlockReduce<uint32_t, block_size>;
Expand All @@ -990,6 +1011,7 @@ __global__ void __launch_bounds__(128, 8)
s->page.def_lvl_bytes = 0;
s->page.rep_lvl_bytes = 0;
s->page.num_nulls = 0;
s->rle_len_pos = nullptr;
}
__syncthreads();

Expand Down Expand Up @@ -1132,9 +1154,15 @@ __global__ void __launch_bounds__(128, 8)
s->rle_pos = 0;
s->rle_numvals = 0;
s->rle_out = dst;
s->page.encoding =
determine_encoding(s->page.page_type, physical_type, s->ck.use_dictionary, write_v2_headers);
if (dict_bits >= 0 && physical_type != BOOLEAN) {
dst[0] = dict_bits;
s->rle_out = dst + 1;
} else if (is_v2 && physical_type == BOOLEAN) {
// save space for RLE length. we don't know the total length yet.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is such adjustment only needed for bool RLE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RLE is only used for 1) rep/def levels, 2) dictionary indexes, and 3) booleans. That makes booleans the only data type that will be RLE encoded; everything else will be plain or dictionary (or eventually delta).

There's actually a table in the spec that lists when a length is prepended and when it is not.

+--------------+------------------------+-----------------+
| Page kind    | RLE-encoded data kind  | Prepend length? |
+--------------+------------------------+-----------------+
| Data page v1 | Definition levels      | Y               |
|              | Repetition levels      | Y               |
|              | Dictionary indices     | N               |
|              | Boolean values         | Y               |
+--------------+------------------------+-----------------+
| Data page v2 | Definition levels      | N               |
|              | Repetition levels      | N               |
|              | Dictionary indices     | N               |
|              | Boolean values         | Y               |
+--------------+------------------------+-----------------+

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I get it, thanks.
And this is something that does not make sense to include in fragment size?
I'm asking because this "max RLE page size" is implemented as a util function in the ORC writer, so this manual padding looked odd in comparison.

BTW, must be nice not having RLE in almost every data type (cries in ORC).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this is something that does not make sense to include in fragment size?

yeah, because it's just once per page, not every fragment. although, yeah, maybe it should be included in the calculation at line 451. I'll ponder that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I think that fixed it. nice catch! That would have been a lot of fun tracking down later :P

s->rle_out = dst + 4;
vuule marked this conversation as resolved.
Show resolved Hide resolved
s->rle_len_pos = dst;
}
s->page_start_val = row_to_value_idx(s->page.start_row, s->col);
s->chunk_start_val = row_to_value_idx(s->ck.start_row, s->col);
Expand Down Expand Up @@ -1188,7 +1216,7 @@ __global__ void __launch_bounds__(128, 8)
}
rle_numvals += rle_numvals_in_block;
__syncthreads();
if ((!enable_bool_rle) && (physical_type == BOOLEAN)) {
if ((!is_v2) && (physical_type == BOOLEAN)) {
bdice marked this conversation as resolved.
Show resolved Hide resolved
PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t);
} else {
RleEncode(s, rle_numvals, dict_bits, (cur_val_idx == s->page.num_leaf_values), t);
Expand Down Expand Up @@ -1345,29 +1373,45 @@ __global__ void __launch_bounds__(128, 8)

uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid);

// save RLE length if necessary
if (s->rle_len_pos != nullptr && t < 32) {
// size doesn't include the 4 bytes for the length
auto const rle_size = static_cast<uint32_t>(s->cur - s->rle_len_pos) - 4;
vuule marked this conversation as resolved.
Show resolved Hide resolved
if (t < 4) { s->rle_len_pos[t] = rle_size >> (t * 8); }
__syncwarp();
}

// V2 does not compress rep and def level data
size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes;

if (t == 0) {
s->page.num_nulls = s->page.num_values - valid_count;
uint8_t* base = s->page.page_data + s->page.max_hdr_size;
auto actual_data_size = static_cast<uint32_t>(s->cur - base);
s->page.num_nulls = s->page.num_values - valid_count;
uint8_t* const base = s->page.page_data + s->page.max_hdr_size;
auto const actual_data_size = static_cast<uint32_t>(s->cur - base);
if (actual_data_size > s->page.max_data_size) {
CUDF_UNREACHABLE("detected possible page data corruption");
}
s->page.max_data_size = actual_data_size;
if (not comp_in.empty()) {
// V2 does not compress rep and def level data
size_t const skip_comp_size = s->page.def_lvl_bytes + s->page.rep_lvl_bytes;
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_in[blockIdx.x] = {base + skip_comp_size, actual_data_size - skip_comp_size};
comp_out[blockIdx.x] = {s->page.compressed_data + s->page.max_hdr_size + skip_comp_size,
0}; // size is unused
// copy uncompressed bytes over
memcpy(s->page.compressed_data + s->page.max_hdr_size, base, skip_comp_size);
}
pages[blockIdx.x] = s->page;
if (not comp_results.empty()) {
comp_results[blockIdx.x] = {0, compression_status::FAILURE};
pages[blockIdx.x].comp_res = &comp_results[blockIdx.x];
}
}

// copy over uncompressed data
if (skip_comp_size != 0 && not comp_in.empty()) {
uint8_t const* const src = s->page.page_data + s->page.max_hdr_size;
uint8_t* const dst = s->page.compressed_data + s->page.max_hdr_size;
for (int i = t; i < skip_comp_size; i += block_size) {
dst[i] = src[i];
}
}
}

constexpr int decide_compression_warps_in_block = 4;
Expand Down Expand Up @@ -1865,28 +1909,16 @@ __global__ void __launch_bounds__(128)
}
header_encoder encoder(hdr_start);
PageType page_type = page_g.page_type;
// NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and
// RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and
// data pages (actual encoding is identical).
Encoding encoding;
if (enable_bool_rle) {
encoding = (col_g.physical_type == BOOLEAN) ? Encoding::RLE
: (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary)
? Encoding::PLAIN_DICTIONARY
: Encoding::PLAIN;
} else {
encoding = (page_type == PageType::DICTIONARY_PAGE || page_g.chunk->use_dictionary)
? Encoding::PLAIN_DICTIONARY
: Encoding::PLAIN;
}

encoder.field_int32(1, page_type);
encoder.field_int32(2, uncompressed_page_size);
encoder.field_int32(3, compressed_page_size);

if (page_type == PageType::DATA_PAGE) {
// DataPageHeader
encoder.field_struct_begin(5);
encoder.field_int32(1, page_g.num_values); // NOTE: num_values != num_rows for list types
encoder.field_int32(2, encoding); // encoding
encoder.field_int32(2, page_g.encoding); // encoding
encoder.field_int32(3, Encoding::RLE); // definition_level_encoding
encoder.field_int32(4, Encoding::RLE); // repetition_level_encoding
// Optionally encode page-level statistics
Expand All @@ -1898,11 +1930,12 @@ __global__ void __launch_bounds__(128)
}
encoder.field_struct_end(5);
} else if (page_type == PageType::DATA_PAGE_V2) {
// DataPageHeaderV2
encoder.field_struct_begin(8);
encoder.field_int32(1, page_g.num_values);
encoder.field_int32(2, page_g.num_nulls);
encoder.field_int32(3, page_g.num_rows);
encoder.field_int32(4, encoding);
encoder.field_int32(4, page_g.encoding);
encoder.field_int32(5, page_g.def_lvl_bytes);
encoder.field_int32(6, page_g.rep_lvl_bytes);
encoder.field_bool(7, ck_g.is_compressed); // TODO can compress at page level now
Expand All @@ -1918,7 +1951,7 @@ __global__ void __launch_bounds__(128)
// DictionaryPageHeader
encoder.field_struct_begin(7);
encoder.field_int32(1, ck_g.num_dict_entries); // number of values in dictionary
encoder.field_int32(2, encoding);
encoder.field_int32(2, page_g.encoding);
encoder.field_struct_end(7);
}
encoder.end(&hdr_end, false);
Expand Down Expand Up @@ -2240,13 +2273,14 @@ void EncodePages(device_span<gpu::EncPage> pages,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_results,
bool write_v2_headers,
rmm::cuda_stream_view stream)
{
auto num_pages = pages.size();
// A page is part of one column. This is launching 1 block per page. 1 block will exclusively
// deal with one datatype.
gpuEncodePages<128>
<<<num_pages, 128, 0, stream.value()>>>(pages, comp_in, comp_out, comp_results);
gpuEncodePages<128><<<num_pages, 128, 0, stream.value()>>>(
pages, comp_in, comp_out, comp_results, write_v2_headers);
}

void DecideCompression(device_span<EncColumnChunk> chunks, rmm::cuda_stream_view stream)
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ struct EncPage {
uint8_t* compressed_data; //!< Ptr to compressed page
uint16_t num_fragments; //!< Number of fragments in page
PageType page_type; //!< Page type
Encoding encoding; //!< Encoding used for page data
EncColumnChunk* chunk; //!< Chunk that this page belongs to
uint32_t chunk_id; //!< Index in chunk array
uint32_t hdr_size; //!< Size of page header
Expand Down Expand Up @@ -676,12 +677,14 @@ void InitEncoderPages(cudf::detail::device_2dspan<EncColumnChunk> chunks,
* @param[out] comp_in Compressor input buffers
* @param[out] comp_in Compressor output buffers
bdice marked this conversation as resolved.
Show resolved Hide resolved
* @param[out] comp_stats Compressor results
bdice marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] write_v2_headers True if V2 page headers should be written
etseidl marked this conversation as resolved.
Show resolved Hide resolved
* @param[in] stream CUDA stream to use, default 0
bdice marked this conversation as resolved.
Show resolved Hide resolved
*/
void EncodePages(device_span<EncPage> pages,
device_span<device_span<uint8_t const>> comp_in,
device_span<device_span<uint8_t>> comp_out,
device_span<compression_result> comp_res,
bool write_v2_headers,
rmm::cuda_stream_view stream);

/**
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1266,6 +1266,7 @@ void init_encoder_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
* @param comp_stats optional compression statistics (nullopt if none)
* @param compression compression format
* @param column_index_truncate_length maximum length of min or max values in column index, in bytes
* @param write_v2_headers True if V2 page headers should be written
* @param stream CUDA stream used for device memory operations and kernel launches
*/
void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
Expand All @@ -1280,6 +1281,7 @@ void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
std::optional<writer_compression_statistics>& comp_stats,
Compression compression,
int32_t column_index_truncate_length,
bool write_v2_headers,
rmm::cuda_stream_view stream)
{
auto batch_pages = pages.subspan(first_page_in_batch, pages_in_batch);
Expand All @@ -1300,7 +1302,7 @@ void encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
comp_res.end(),
compression_result{0, compression_status::FAILURE});

gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream);
gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, write_v2_headers, stream);
switch (compression) {
case parquet::Compression::SNAPPY:
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
Expand Down Expand Up @@ -1926,6 +1928,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta,
comp_stats,
compression,
column_index_truncate_length,
write_v2_headers,
stream);

bool need_sync{false};
Expand Down