From 302861dcc3eb499a8a1de151aeaf8cc12cc6322a Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Wed, 6 Dec 2023 13:48:51 -0800 Subject: [PATCH] PARQUET-2261 Size Statistics (#14000) Adds Parquet size statistics introduced in https://github.com/apache/parquet-format/pull/197. Authors: - Ed Seidl (https://github.com/etseidl) - Nghia Truong (https://github.com/ttnghia) Approvers: - Vukasin Milovanovic (https://github.com/vuule) - Nghia Truong (https://github.com/ttnghia) - Yunsong Wang (https://github.com/PointKernel) URL: https://github.com/rapidsai/cudf/pull/14000 --- .../io/parquet/compact_protocol_reader.cpp | 31 +- .../io/parquet/compact_protocol_reader.hpp | 1 + .../io/parquet/compact_protocol_writer.cpp | 38 +- .../io/parquet/compact_protocol_writer.hpp | 5 + cpp/src/io/parquet/page_enc.cu | 341 +++++++++++++----- cpp/src/io/parquet/parquet.hpp | 95 +++-- cpp/src/io/parquet/parquet_gpu.hpp | 36 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 23 ++ cpp/src/io/parquet/writer_impl.cu | 177 ++++++++- cpp/tests/io/parquet_test.cpp | 120 +++++- 10 files changed, 717 insertions(+), 150 deletions(-) diff --git a/cpp/src/io/parquet/compact_protocol_reader.cpp b/cpp/src/io/parquet/compact_protocol_reader.cpp index 5a2b8aa8f2a..e0b2471b30e 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.cpp +++ b/cpp/src/io/parquet/compact_protocol_reader.cpp @@ -289,7 +289,7 @@ class parquet_field_union_struct : public parquet_field { inline bool operator()(CompactProtocolReader* cpr, int field_type) { T v; - bool const res = parquet_field_struct(field(), v).operator()(cpr, field_type); + bool const res = parquet_field_struct{field(), v}(cpr, field_type); if (!res) { val = v; enum_val = static_cast(field()); @@ -424,7 +424,7 @@ class parquet_field_optional : public parquet_field { inline bool operator()(CompactProtocolReader* cpr, int field_type) { T v; - bool const res = FieldFunctor(field(), v).operator()(cpr, field_type); + bool const res = FieldFunctor{field(), v}(cpr, field_type); if (!res) { val = v; } return res; } @@ -631,6 +631,8 @@ bool CompactProtocolReader::read(ColumnChunk* c) bool CompactProtocolReader::read(ColumnChunkMetaData* c) { + using optional_size_statistics = + parquet_field_optional>; auto op = std::make_tuple(parquet_field_enum(1, c->type), parquet_field_enum_list(2, c->encodings), parquet_field_string_list(3, c->path_in_schema), @@ -641,7 +643,8 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c) parquet_field_int64(9, c->data_page_offset), parquet_field_int64(10, c->index_page_offset), parquet_field_int64(11, c->dictionary_page_offset), - parquet_field_struct(12, c->statistics)); + parquet_field_struct(12, c->statistics), + optional_size_statistics(16, c->size_statistics)); return function_builder(this, op); } @@ -700,17 +703,35 @@ bool CompactProtocolReader::read(PageLocation* p) bool CompactProtocolReader::read(OffsetIndex* o) { - auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations)); + using optional_list_i64 = parquet_field_optional, parquet_field_int64_list>; + + auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations), + optional_list_i64(2, o->unencoded_byte_array_data_bytes)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(SizeStatistics* s) +{ + using optional_i64 = parquet_field_optional; + using optional_list_i64 = parquet_field_optional, parquet_field_int64_list>; + + auto op = std::make_tuple(optional_i64(1, s->unencoded_byte_array_data_bytes), + optional_list_i64(2, 
s->repetition_level_histogram), + optional_list_i64(3, s->definition_level_histogram)); return function_builder(this, op); } bool CompactProtocolReader::read(ColumnIndex* c) { + using optional_list_i64 = parquet_field_optional, parquet_field_int64_list>; + auto op = std::make_tuple(parquet_field_bool_list(1, c->null_pages), parquet_field_binary_list(2, c->min_values), parquet_field_binary_list(3, c->max_values), parquet_field_enum(4, c->boundary_order), - parquet_field_int64_list(5, c->null_counts)); + parquet_field_int64_list(5, c->null_counts), + optional_list_i64(6, c->repetition_level_histogram), + optional_list_i64(7, c->definition_level_histogram)); return function_builder(this, op); } diff --git a/cpp/src/io/parquet/compact_protocol_reader.hpp b/cpp/src/io/parquet/compact_protocol_reader.hpp index cbb4161b138..bd4fa7f01ca 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.hpp +++ b/cpp/src/io/parquet/compact_protocol_reader.hpp @@ -116,6 +116,7 @@ class CompactProtocolReader { bool read(KeyValue* k); bool read(PageLocation* p); bool read(OffsetIndex* o); + bool read(SizeStatistics* s); bool read(ColumnIndex* c); bool read(Statistics* s); bool read(ColumnOrder* c); diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index fbeda7f1099..f857b75f707 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -182,6 +182,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s) if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); } if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); } c.field_struct(12, s.statistics); + if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); } return c.value(); } @@ -210,6 +211,24 @@ size_t CompactProtocolWriter::write(OffsetIndex const& s) { CompactProtocolFieldWriter c(*this); c.field_struct_list(1, s.page_locations); + if (s.unencoded_byte_array_data_bytes.has_value()) { + c.field_int_list(2, s.unencoded_byte_array_data_bytes.value()); + } + return c.value(); +} + +size_t CompactProtocolWriter::write(SizeStatistics const& s) +{ + CompactProtocolFieldWriter c(*this); + if (s.unencoded_byte_array_data_bytes.has_value()) { + c.field_int(1, s.unencoded_byte_array_data_bytes.value()); + } + if (s.repetition_level_histogram.has_value()) { + c.field_int_list(2, s.repetition_level_histogram.value()); + } + if (s.definition_level_histogram.has_value()) { + c.field_int_list(3, s.definition_level_histogram.value()); + } return c.value(); } @@ -286,13 +305,26 @@ inline void CompactProtocolFieldWriter::field_int(int field, int64_t val) current_field_value = field; } +template <> +inline void CompactProtocolFieldWriter::field_int_list(int field, + std::vector const& val) +{ + put_field_header(field, current_field_value, ST_FLD_LIST); + put_byte(static_cast((std::min(val.size(), 0xfUL) << 4) | ST_FLD_I64)); + if (val.size() >= 0xfUL) { put_uint(val.size()); } + for (auto const v : val) { + put_int(v); + } + current_field_value = field; +} + template inline void CompactProtocolFieldWriter::field_int_list(int field, std::vector const& val) { put_field_header(field, current_field_value, ST_FLD_LIST); - put_byte((uint8_t)((std::min(val.size(), (size_t)0xfu) << 4) | ST_FLD_I32)); - if (val.size() >= 0xf) put_uint(val.size()); - for (auto& v : val) { + put_byte(static_cast((std::min(val.size(), 0xfUL) << 4) | ST_FLD_I32)); + if (val.size() >= 0xfUL) { 
put_uint(val.size()); } + for (auto const& v : val) { put_int(static_cast(v)); } current_field_value = field; diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index 4849a814b14..a2ed0f1f4dc 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -51,6 +51,7 @@ class CompactProtocolWriter { size_t write(Statistics const&); size_t write(PageLocation const&); size_t write(OffsetIndex const&); + size_t write(SizeStatistics const&); size_t write(ColumnOrder const&); protected: @@ -113,4 +114,8 @@ class CompactProtocolFieldWriter { inline void set_current_field(int const& field); }; +template <> +inline void CompactProtocolFieldWriter::field_int_list(int field, + std::vector const& val); + } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index ba751548e3f..976559a8b38 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -180,13 +180,15 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) auto const nvals = s->frag.num_leaf_values; auto const start_value_idx = s->frag.start_value_idx; + uint32_t num_valid = 0; + uint32_t len = 0; for (uint32_t i = 0; i < nvals; i += block_size) { auto const val_idx = start_value_idx + i + t; auto const is_valid = i + t < nvals && val_idx < s->col.leaf_column->size() && s->col.leaf_column->is_valid(val_idx); - uint32_t len; if (is_valid) { - len = dtype_len; + num_valid++; + len += dtype_len; if (physical_type == BYTE_ARRAY) { switch (leaf_type) { case type_id::STRING: { @@ -201,17 +203,22 @@ void __device__ calculate_frag_size(frag_init_state_s* const s, int t) default: CUDF_UNREACHABLE("Unsupported data type for leaf column"); } } - } else { - len = 0; } + } + __syncthreads(); + auto const total_len = block_reduce(reduce_storage).Sum(len); + auto const total_valid = block_reduce(reduce_storage).Sum(num_valid); - len = block_reduce(reduce_storage).Sum(len); - if (t == 0) { s->frag.fragment_data_size += len; } - __syncthreads(); - // page fragment size must fit in a 32-bit signed integer - if (s->frag.fragment_data_size > std::numeric_limits::max()) { - CUDF_UNREACHABLE("page fragment size exceeds maximum for i32"); - } + if (t == 0) { + s->frag.fragment_data_size = total_len; + s->frag.num_valid = total_valid; + } + + __syncthreads(); + // page fragment size must fit in a 32-bit signed integer + if (s->frag.fragment_data_size > static_cast(std::numeric_limits::max())) { + // TODO need to propagate this error back to the host + CUDF_UNREACHABLE("page fragment size exceeds maximum for i32"); } } @@ -241,6 +248,86 @@ Encoding __device__ determine_encoding(PageType page_type, } } +/** + * @brief Generate level histogram for a page. + * + * For definition levels, the histogram values h(0)...h(max_def-1) represent nulls at + * various levels of the hierarchy, and h(max_def) is the number of non-null values (num_valid). + * If the leaf level is nullable, then num_leaf_values is h(max_def-1) + h(max_def), + * and h(max_def-1) is num_leaf_values - num_valid. h(0) is derivable as num_values - + * sum(h(1)..h(max_def)). + * + * For repetition levels, h(0) equals the number of rows. Here we can calculate + * h(1)..h(max_rep-1), set h(0) directly, and then obtain h(max_rep) in the same way as + * for the definition levels. 
+ * + * @param hist Pointer to the histogram (size is max_level + 1) + * @param s Page encode state + * @param lvl_data Pointer to the global repetition or definition level data + * @param lvl_end Last element of the histogram to encode (exclusive) + */ +template +void __device__ +generate_page_histogram(uint32_t* hist, state_buf const* s, uint8_t const* lvl_data, int lvl_end) +{ + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage temp_storage; + + auto const t = threadIdx.x; + auto const page_first_val_idx = s->col.level_offsets[s->page.start_row]; + auto const col_last_val_idx = s->col.level_offsets[s->col.num_rows]; + + // h(0) is always derivable, so start at 1 + for (int lvl = 1; lvl < lvl_end; lvl++) { + int nval_in_level = 0; + for (int i = 0; i < s->page.num_values; i += block_size) { + auto const lidx = i + t; + auto const gidx = page_first_val_idx + lidx; + if (lidx < s->page.num_values && gidx < col_last_val_idx && lvl_data[gidx] == lvl) { + nval_in_level++; + } + } + __syncthreads(); + auto const lvl_sum = block_reduce(temp_storage).Sum(nval_in_level); + if (t == 0) { hist[lvl] = lvl_sum; } + } +} + +/** + * @brief Generate definition level histogram for a block of values. + * + * This is used when the max repetition level is 0 (no lists) and the definition + * level data is not calculated in advance for the entire column. + * + * @param hist Pointer to the histogram (size is max_def_level + 1) + * @param s Page encode state + * @param nrows Number of rows to process + * @param rle_numvals Index (relative to start of page) of the first level value + * @param maxlvl Last element of the histogram to encode (exclusive) + */ +template +void __device__ generate_def_level_histogram(uint32_t* hist, + rle_page_enc_state_s const* s, + uint32_t nrows, + uint32_t rle_numvals, + uint32_t maxlvl) +{ + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage temp_storage; + auto const t = threadIdx.x; + + // Do a block sum for each level rather than each thread trying an atomicAdd. + // This way is much faster. + auto const mylvl = s->vals[rolling_index(rle_numvals + t)]; + // We can start at 1 because hist[0] can be derived. + for (uint32_t lvl = 1; lvl < maxlvl; lvl++) { + uint32_t const is_yes = t < nrows and mylvl == lvl; + auto const lvl_sum = block_reduce(temp_storage).Sum(is_yes); + if (t == 0) { hist[lvl] += lvl_sum; } + __syncthreads(); + } +} + // operator to use with warp_reduce. stolen from cub::Sum struct BitwiseOr { /// Binary OR operator, returns a | b @@ -254,17 +341,14 @@ struct BitwiseOr { // PT is the parquet physical type (INT32 or INT64). // I is the column type from the input table. 
template -__device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, - uint32_t valid_count, - uint64_t* buffer, - void* temp_space) +__device__ uint8_t const* delta_encode(page_enc_state_s<0>* s, uint64_t* buffer, void* temp_space) { using output_type = std::conditional_t; __shared__ delta_binary_packer packer; auto const t = threadIdx.x; if (t == 0) { - packer.init(s->cur, valid_count, reinterpret_cast(buffer), temp_space); + packer.init(s->cur, s->page.num_valid, reinterpret_cast(buffer), temp_space); } __syncthreads(); @@ -457,7 +541,9 @@ __global__ void __launch_bounds__(128) uint32_t rows_in_page = 0; uint32_t values_in_page = 0; uint32_t leaf_values_in_page = 0; + uint32_t num_valid = 0; size_t page_size = 0; + size_t var_bytes_size = 0; uint32_t num_pages = 0; uint32_t num_rows = 0; uint32_t page_start = 0; @@ -604,6 +690,7 @@ __global__ void __launch_bounds__(128) page_g.num_rows = rows_in_page; page_g.num_leaf_values = leaf_values_in_page; page_g.num_values = values_in_page; + page_g.num_valid = num_valid; auto const def_level_size = max_RLE_page_size(col_g.num_def_level_bits(), values_in_page); auto const rep_level_size = max_RLE_page_size(col_g.num_rep_level_bits(), values_in_page); // get a different bound if using delta encoding @@ -616,6 +703,12 @@ __global__ void __launch_bounds__(128) if (max_data_size > std::numeric_limits::max()) { CUDF_UNREACHABLE("page size exceeds maximum for i32"); } + // if byte_array then save the variable bytes size + if (ck_g.col_desc->physical_type == BYTE_ARRAY) { + // Page size is the sum of frag sizes, and frag sizes for strings includes the + // 4-byte length indicator, so subtract that. + page_g.var_bytes_size = var_bytes_size; + } page_g.max_data_size = static_cast(max_data_size); pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; @@ -632,6 +725,7 @@ __global__ void __launch_bounds__(128) __syncwarp(); if (t == 0) { if (not pages.empty()) { + // set encoding if (is_use_delta) { page_g.kernel_mask = encode_kernel_mask::DELTA_BINARY; } else if (ck_g.use_dictionary || physical_type == BOOLEAN) { @@ -639,6 +733,16 @@ __global__ void __launch_bounds__(128) } else { page_g.kernel_mask = encode_kernel_mask::PLAIN; } + // need space for the chunk histograms plus data page histograms + auto const num_histograms = num_pages - ck_g.num_dict_pages(); + if (ck_g.def_histogram_data != nullptr && col_g.max_def_level > 0) { + page_g.def_histogram = + ck_g.def_histogram_data + num_histograms * (col_g.max_def_level + 1); + } + if (ck_g.rep_histogram_data != nullptr && col_g.max_rep_level > 0) { + page_g.rep_histogram = + ck_g.rep_histogram_data + num_histograms * (col_g.max_rep_level + 1); + } pages[ck_g.first_page + num_pages] = page_g; } if (not page_sizes.empty()) { @@ -649,18 +753,23 @@ __global__ void __launch_bounds__(128) num_pages++; page_size = 0; + var_bytes_size = 0; rows_in_page = 0; values_in_page = 0; leaf_values_in_page = 0; + num_valid = 0; page_start = fragments_in_chunk; max_stats_len = 0; } max_stats_len = max(max_stats_len, minmax_len); num_dict_entries += frag_g.num_dict_vals; page_size += fragment_data_size; + // fragment_data_size includes the length indicator...remove it + var_bytes_size += frag_g.fragment_data_size - frag_g.num_valid * sizeof(size_type); rows_in_page += frag_g.num_rows; values_in_page += frag_g.num_values; leaf_values_in_page += frag_g.num_leaf_values; + num_valid += frag_g.num_valid; num_rows += frag_g.num_rows; fragments_in_chunk++; } while 
(frag_g.num_rows != 0); @@ -1195,6 +1304,13 @@ __global__ void __launch_bounds__(block_size, 8) gpuEncodePageLevels(device_span }(); s->vals[rolling_idx(rle_numvals + t)] = def_lvl; __syncthreads(); + // if max_def <= 1, then the histogram is trivial to calculate + if (s->page.def_histogram != nullptr and s->col.max_def_level > 1) { + // Only calculate up to max_def_level...the last entry is valid_count and will be filled + // in later. + generate_def_level_histogram( + s->page.def_histogram, s, nrows, rle_numvals, s->col.max_def_level); + } rle_numvals += nrows; RleEncode(s, rle_numvals, def_lvl_bits, (rle_numvals == s->page.num_rows), t); __syncthreads(); @@ -1267,7 +1383,6 @@ __global__ void __launch_bounds__(block_size, 8) gpuEncodePageLevels(device_span template __device__ void finish_page_encode(state_buf* s, - uint32_t valid_count, uint8_t const* end_ptr, device_span pages, device_span> comp_in, @@ -1277,13 +1392,67 @@ __device__ void finish_page_encode(state_buf* s, { auto const t = threadIdx.x; + // returns sum of histogram values from [1..max_level) + auto histogram_sum = [](uint32_t* const hist, int max_level) { + auto const hist_start = hist + 1; + auto const hist_end = hist + max_level; + return thrust::reduce(thrust::seq, hist_start, hist_end, 0U); + }; + // V2 does not compress rep and def level data size_t const skip_comp_size = write_v2_headers ? s->page.def_lvl_bytes + s->page.rep_lvl_bytes : 0; + // this will be true if max_rep > 0 (i.e. there are lists) + if (s->page.rep_histogram != nullptr) { + // for repetition we get hist[0] from num_rows, and can derive hist[max_rep_level] + if (s->col.max_rep_level > 1) { + generate_page_histogram( + s->page.rep_histogram, s, s->col.rep_values, s->col.max_rep_level); + } + + if (t == 0) { + // rep_hist[0] is num_rows, we have rep_hist[1..max_rep_level) calculated, so + // rep_hist[max_rep_level] is num_values minus the sum of the preceding values. + s->page.rep_histogram[0] = s->page.num_rows; + s->page.rep_histogram[s->col.max_rep_level] = + s->page.num_values - s->page.num_rows - + histogram_sum(s->page.rep_histogram, s->col.max_rep_level); + } + __syncthreads(); + + if (s->page.def_histogram != nullptr) { + // For definition, we know `hist[max_def_level] = num_valid`. If the leaf level is + // nullable, then `hist[max_def_level - 1] = num_leaf_values - num_valid`. Finally, + // hist[0] can be derived as `num_values - sum(hist[1]..hist[max_def_level])`. + bool const is_leaf_nullable = s->col.leaf_column->nullable(); + auto const last_lvl = is_leaf_nullable ? 
s->col.max_def_level - 1 : s->col.max_def_level; + if (last_lvl > 1) { + generate_page_histogram(s->page.def_histogram, s, s->col.def_values, last_lvl); + } + + if (t == 0) { + s->page.def_histogram[s->col.max_def_level] = s->page.num_valid; + if (is_leaf_nullable) { + s->page.def_histogram[last_lvl] = s->page.num_leaf_values - s->page.num_valid; + } + s->page.def_histogram[0] = s->page.num_values - s->page.num_leaf_values - + histogram_sum(s->page.def_histogram, last_lvl); + } + } + } else if (s->page.def_histogram != nullptr) { + // finish off what was started in generate_def_level_histogram + if (t == 0) { + // `hist[max_def_level] = num_valid`, and the values for hist[1..max_def_level) are known + s->page.def_histogram[s->col.max_def_level] = s->page.num_valid; + s->page.def_histogram[0] = s->page.num_values - s->page.num_valid - + histogram_sum(s->page.def_histogram, s->col.max_def_level); + } + } + if (t == 0) { // only need num_nulls for v2 data page headers - if (write_v2_headers) { s->page.num_nulls = s->page.num_values - valid_count; } + if (write_v2_headers) { s->page.num_nulls = s->page.num_values - s->page.num_valid; } uint8_t const* const base = s->page.page_data + s->page.max_hdr_size; auto const actual_data_size = static_cast(end_ptr - base); if (actual_data_size > s->page.max_data_size) { @@ -1324,12 +1493,8 @@ __global__ void __launch_bounds__(block_size, 8) bool write_v2_headers) { __shared__ __align__(8) page_enc_state_s<0> state_g; - using block_reduce = cub::BlockReduce; - using block_scan = cub::BlockScan; - __shared__ union { - typename block_reduce::TempStorage reduce_storage; - typename block_scan::TempStorage scan_storage; - } temp_storage; + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage scan_storage; auto* const s = &state_g; uint32_t t = threadIdx.x; @@ -1377,7 +1542,6 @@ __global__ void __launch_bounds__(block_size, 8) } __syncthreads(); - uint32_t num_valid = 0; for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); uint32_t len, pos; @@ -1403,7 +1567,6 @@ __global__ void __launch_bounds__(block_size, 8) return std::make_tuple(is_valid, val_idx); }(); - if (is_valid) { num_valid++; } cur_val_idx += nvals; // Non-dictionary encoding @@ -1423,7 +1586,7 @@ __global__ void __launch_bounds__(block_size, 8) len = 0; } uint32_t total_len = 0; - block_scan(temp_storage.scan_storage).ExclusiveSum(len, pos, total_len); + block_scan(scan_storage).ExclusiveSum(len, pos, total_len); __syncthreads(); if (t == 0) { s->cur = dst + total_len; } if (is_valid) { @@ -1550,10 +1713,8 @@ __global__ void __launch_bounds__(block_size, 8) __syncthreads(); } - uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid); - finish_page_encode( - s, valid_count, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); + s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } // DICTIONARY page data encoder @@ -1567,12 +1728,8 @@ __global__ void __launch_bounds__(block_size, 8) bool write_v2_headers) { __shared__ __align__(8) rle_page_enc_state_s state_g; - using block_reduce = cub::BlockReduce; - using block_scan = cub::BlockScan; - __shared__ union { - typename block_reduce::TempStorage reduce_storage; - typename block_scan::TempStorage scan_storage; - } temp_storage; + using block_scan = cub::BlockScan; + __shared__ typename block_scan::TempStorage scan_storage; auto* const s = &state_g; uint32_t t = 
threadIdx.x; @@ -1633,7 +1790,6 @@ __global__ void __launch_bounds__(block_size, 8) } __syncthreads(); - uint32_t num_valid = 0; for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); @@ -1651,7 +1807,6 @@ __global__ void __launch_bounds__(block_size, 8) return std::make_tuple(is_valid, val_idx); }(); - if (is_valid) { num_valid++; } cur_val_idx += nvals; // Dictionary encoding @@ -1659,7 +1814,7 @@ __global__ void __launch_bounds__(block_size, 8) uint32_t rle_numvals; uint32_t rle_numvals_in_block; uint32_t pos; - block_scan(temp_storage.scan_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block); + block_scan(scan_storage).ExclusiveSum(is_valid, pos, rle_numvals_in_block); rle_numvals = s->rle_numvals; if (is_valid) { uint32_t v; @@ -1683,8 +1838,6 @@ __global__ void __launch_bounds__(block_size, 8) __syncthreads(); } - uint32_t const valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid); - // save RLE length if necessary if (s->rle_len_pos != nullptr && t < 32) { // size doesn't include the 4 bytes for the length @@ -1694,7 +1847,7 @@ __global__ void __launch_bounds__(block_size, 8) } finish_page_encode( - s, valid_count, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); + s, s->cur, pages, comp_in, comp_out, comp_results, write_v2_headers); } // DELTA_BINARY_PACKED page data encoder @@ -1709,9 +1862,7 @@ __global__ void __launch_bounds__(block_size, 8) // block of shared memory for value storage and bit packing __shared__ uleb128_t delta_shared[delta::buffer_size + delta::block_size]; __shared__ __align__(8) page_enc_state_s<0> state_g; - using block_reduce = cub::BlockReduce; __shared__ union { - typename block_reduce::TempStorage reduce_storage; typename delta_binary_packer::index_scan::TempStorage delta_index_tmp; typename delta_binary_packer::block_reduce::TempStorage delta_reduce_tmp; typename delta_binary_packer::warp_reduce::TempStorage @@ -1758,58 +1909,36 @@ __global__ void __launch_bounds__(block_size, 8) } __syncthreads(); - // need to know the number of valid values for the null values calculation and to size - // the delta binary encoder. - uint32_t valid_count = 0; - if (not s->col.leaf_column->nullable()) { - valid_count = s->page.num_leaf_values; - } else { - uint32_t num_valid = 0; - for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { - uint32_t const nvals = min(s->page.num_leaf_values - cur_val_idx, block_size); - size_type const val_idx_in_block = cur_val_idx + t; - size_type const val_idx_in_leaf_col = s->page_start_val + val_idx_in_block; - - if (val_idx_in_leaf_col < s->col.leaf_column->size() && - val_idx_in_block < s->page.num_leaf_values && - s->col.leaf_column->is_valid(val_idx_in_leaf_col)) { - num_valid++; - } - cur_val_idx += nvals; - } - valid_count = block_reduce(temp_storage.reduce_storage).Sum(num_valid); - } - uint8_t const* delta_ptr = nullptr; // this will be the end of delta block pointer if (physical_type == INT32) { switch (dtype_len_in) { case 8: { // only DURATIONS map to 8 bytes, so safe to just use signed here? 
- delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); break; } case 4: { if (type_id == type_id::UINT32) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } break; } case 2: { if (type_id == type_id::UINT16) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } break; } case 1: { if (type_id == type_id::UINT8) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } break; } @@ -1817,14 +1946,13 @@ __global__ void __launch_bounds__(block_size, 8) } } else { if (type_id == type_id::UINT64) { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } else { - delta_ptr = delta_encode(s, valid_count, delta_shared, &temp_storage); + delta_ptr = delta_encode(s, delta_shared, &temp_storage); } } - finish_page_encode( - s, valid_count, delta_ptr, pages, comp_in, comp_out, comp_results, true); + finish_page_encode(s, delta_ptr, pages, comp_in, comp_out, comp_results, true); } constexpr int decide_compression_warps_in_block = 4; @@ -2576,11 +2704,13 @@ __global__ void __launch_bounds__(1) if (column_stats.empty()) { return; } - EncColumnChunk* ck_g = &chunks[blockIdx.x]; - uint32_t num_pages = ck_g->num_pages; - parquet_column_device_view col_g = *ck_g->col_desc; - size_t first_data_page = ck_g->use_dictionary ? 1 : 0; - uint32_t pageidx = ck_g->first_page; + auto const ck_g = &chunks[blockIdx.x]; + uint32_t const num_pages = ck_g->num_pages; + auto const& col_g = *ck_g->col_desc; + uint32_t const first_data_page = ck_g->use_dictionary ? 
1 : 0; + uint32_t const num_data_pages = num_pages - first_data_page; + uint32_t const pageidx = ck_g->first_page; + size_t var_bytes = 0; header_encoder encoder(ck_g->column_index_blob); @@ -2593,13 +2723,13 @@ __global__ void __launch_bounds__(1) : align8(ck_g->column_index_blob + ck_g->column_index_size - column_index_truncate_length); // null_pages - encoder.field_list_begin(1, num_pages - first_data_page, ST_FLD_TRUE); + encoder.field_list_begin(1, num_data_pages, ST_FLD_TRUE); for (uint32_t page = first_data_page; page < num_pages; page++) { encoder.put_bool(column_stats[pageidx + page].non_nulls == 0); } encoder.field_list_end(1); // min_values - encoder.field_list_begin(2, num_pages - first_data_page, ST_FLD_BINARY); + encoder.field_list_begin(2, num_data_pages, ST_FLD_BINARY); for (uint32_t page = first_data_page; page < num_pages; page++) { auto const [min_ptr, min_size] = get_extremum(&column_stats[pageidx + page].min_value, col_g.stats_dtype, @@ -2610,7 +2740,7 @@ __global__ void __launch_bounds__(1) } encoder.field_list_end(2); // max_values - encoder.field_list_begin(3, num_pages - first_data_page, ST_FLD_BINARY); + encoder.field_list_begin(3, num_data_pages, ST_FLD_BINARY); for (uint32_t page = first_data_page; page < num_pages; page++) { auto const [max_ptr, max_size] = get_extremum(&column_stats[pageidx + page].max_value, col_g.stats_dtype, @@ -2627,15 +2757,54 @@ __global__ void __launch_bounds__(1) col_g.converted_type, num_pages - first_data_page)); // null_counts - encoder.field_list_begin(5, num_pages - first_data_page, ST_FLD_I64); + encoder.field_list_begin(5, num_data_pages, ST_FLD_I64); for (uint32_t page = first_data_page; page < num_pages; page++) { encoder.put_int64(column_stats[pageidx + page].null_count); } encoder.field_list_end(5); + + // find pointers to chunk histograms + auto const cd = ck_g->col_desc; + auto const ck_def_hist = ck_g->def_histogram_data + (num_data_pages) * (cd->max_def_level + 1); + auto const ck_rep_hist = ck_g->rep_histogram_data + (num_data_pages) * (cd->max_rep_level + 1); + + auto const page_start = ck_g->pages + first_data_page; + auto const page_end = ck_g->pages + ck_g->num_pages; + + // optionally encode histograms and sum var_bytes. 
+ if (cd->max_rep_level > REP_LVL_HIST_CUTOFF) { + encoder.field_list_begin(6, num_data_pages * (cd->max_rep_level + 1), ST_FLD_I64); + thrust::for_each(thrust::seq, page_start, page_end, [&] __device__(auto const& page) { + for (int i = 0; i < cd->max_rep_level + 1; i++) { + encoder.put_int64(page.rep_histogram[i]); + ck_rep_hist[i] += page.rep_histogram[i]; + } + }); + encoder.field_list_end(6); + } + + if (cd->max_def_level > DEF_LVL_HIST_CUTOFF) { + encoder.field_list_begin(7, num_data_pages * (cd->max_def_level + 1), ST_FLD_I64); + thrust::for_each(thrust::seq, page_start, page_end, [&] __device__(auto const& page) { + for (int i = 0; i < cd->max_def_level + 1; i++) { + encoder.put_int64(page.def_histogram[i]); + ck_def_hist[i] += page.def_histogram[i]; + } + }); + encoder.field_list_end(7); + } + + if (col_g.physical_type == BYTE_ARRAY) { + thrust::for_each(thrust::seq, page_start, page_end, [&] __device__(auto const& page) { + var_bytes += page.var_bytes_size; + }); + } + encoder.end(&col_idx_end, false); // now reset column_index_size to the actual size of the encoded column index blob ck_g->column_index_size = static_cast(col_idx_end - ck_g->column_index_blob); + ck_g->var_bytes_size = var_bytes; } void InitRowGroupFragments(device_2dspan frag, diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 9ab686b99d5..2b11f47a0a8 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -261,6 +261,67 @@ struct Statistics { thrust::optional> min_value; }; +/** + * @brief Thrift-derived struct containing statistics used to estimate page and column chunk sizes + */ +struct SizeStatistics { + // Number of variable-width bytes stored for the page/chunk. Should not be set for anything + // but the BYTE_ARRAY physical type. + thrust::optional unencoded_byte_array_data_bytes; + /** + * When present, there is expected to be one element corresponding to each + * repetition (i.e. size=max repetition_level+1) where each element + * represents the number of times the repetition level was observed in the + * data. + * + * This value should not be written if max_repetition_level is 0. + */ + thrust::optional> repetition_level_histogram; + + /** + * Same as repetition_level_histogram except for definition levels. + * + * This value should not be written if max_definition_level is 0 or 1. + */ + thrust::optional> definition_level_histogram; +}; + +/** + * @brief Thrift-derived struct describing page location information stored + * in the offsets index. + */ +struct PageLocation { + int64_t offset; // Offset of the page in the file + int32_t compressed_page_size; // Compressed page size in bytes plus the heeader length + int64_t first_row_index; // Index within the column chunk of the first row of the page. reset to + // 0 at the beginning of each column chunk +}; + +/** + * @brief Thrift-derived struct describing the offset index. + */ +struct OffsetIndex { + std::vector page_locations; + // per-page size info. see description of the same field in SizeStatistics. only present for + // columns with a BYTE_ARRAY physical type. + thrust::optional> unencoded_byte_array_data_bytes; +}; + +/** + * @brief Thrift-derived struct describing the column index. 
+ */ +struct ColumnIndex { + std::vector null_pages; // Boolean used to determine if a page contains only null values + std::vector> min_values; // lower bound for values in each page + std::vector> max_values; // upper bound for values in each page + BoundaryOrder boundary_order = + BoundaryOrder::UNORDERED; // Indicates if min and max values are ordered + std::vector null_counts; // Optional count of null values per page + // Repetition/definition level histograms for the column chunk + thrust::optional> repetition_level_histogram; + thrust::optional> definition_level_histogram; +}; + /** * @brief Thrift-derived struct describing a column chunk */ @@ -279,6 +340,7 @@ struct ColumnChunkMetaData { int64_t dictionary_page_offset = 0; // Byte offset from the beginning of file to first (only) dictionary page Statistics statistics; // Encoded chunk-level statistics + thrust::optional size_statistics; // Size statistics for the chunk }; /** @@ -300,6 +362,9 @@ struct ColumnChunk { // Following fields are derived from other fields int schema_idx = -1; // Index in flattened schema (derived from path_in_schema) + // The indexes don't really live here, but it's a convenient place to hang them. + std::optional offset_index; + std::optional column_index; }; /** @@ -390,36 +455,6 @@ struct PageHeader { DataPageHeaderV2 data_page_header_v2; }; -/** - * @brief Thrift-derived struct describing page location information stored - * in the offsets index. - */ -struct PageLocation { - int64_t offset; // Offset of the page in the file - int32_t compressed_page_size; // Compressed page size in bytes plus the heeader length - int64_t first_row_index; // Index within the column chunk of the first row of the page. reset to - // 0 at the beginning of each column chunk -}; - -/** - * @brief Thrift-derived struct describing the offset index. - */ -struct OffsetIndex { - std::vector page_locations; -}; - -/** - * @brief Thrift-derived struct describing the column index. - */ -struct ColumnIndex { - std::vector null_pages; // Boolean used to determine if a page contains only null values - std::vector> min_values; // lower bound for values in each page - std::vector> max_values; // upper bound for values in each page - BoundaryOrder boundary_order = - BoundaryOrder::UNORDERED; // Indicates if min and max values are ordered - std::vector null_counts; // Optional count of null values per page -}; - // bit space we are reserving in column_buffer::user_data constexpr uint32_t PARQUET_COLUMN_BUFFER_SCHEMA_MASK = (0xff'ffffu); constexpr uint32_t PARQUET_COLUMN_BUFFER_FLAG_LIST_TERMINATED = (1 << 24); diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 25323cfaa9e..7f557d092c5 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -59,6 +59,20 @@ constexpr int rolling_index(int index) return index % rolling_size; } +// PARQUET-2261 allows for not writing the level histograms in certain cases. +// The repetition level histogram may be omitted when max_rep_level equals 0. The definition +// level histogram may be omitted when max_def_level equals 0 or 1. In the case of +// max_rep_level == 0, the rep histogram would have a single value equal to num_rows. In the +// case of max_def_level == 0, the def histogram would have a single value equal to num_values, +// and when max_def_level == 1, the histogram would be {num_nulls, num_values - num_nulls}. +// +// These constants control libcudf's behavior. 
Currently, each histogram will be written when +// max level is greater than 0. Even though this leads to some redundancy in the max_def_level == 1 +// case, having the histogram data relieves the reader from having to reconstruct it from the +// OffsetIndex and ColumnMetaData. +constexpr uint8_t REP_LVL_HIST_CUTOFF = 0; +constexpr uint8_t DEF_LVL_HIST_CUTOFF = 0; + // see setupLocalPageInfo() in page_decode.cuh for supported page encodings constexpr bool is_supported_encoding(Encoding enc) { @@ -410,8 +424,8 @@ struct parquet_column_device_view : stats_column_desc { //!< levels constexpr uint8_t num_def_level_bits() const { return level_bits & 0xf; } constexpr uint8_t num_rep_level_bits() const { return level_bits >> 4; } - size_type const* const* - nesting_offsets; //!< If column is a nested type, contains offset array of each nesting level + uint8_t max_def_level; //!< needed for SizeStatistics calculation + uint8_t max_rep_level; size_type const* level_offsets; //!< Offset array for per-row pre-calculated rep/def level values uint8_t const* rep_values; //!< Pre-calculated repetition level values @@ -434,6 +448,7 @@ struct PageFragment { uint32_t start_value_idx; uint32_t num_leaf_values; //!< Number of leaf values in fragment. Does not include nulls at //!< non-leaf level + uint32_t num_valid; //data(), ender->footer_len); CUDF_EXPECTS(cp.read(this), "Cannot parse metadata"); CUDF_EXPECTS(cp.InitSchema(this), "Cannot initialize schema"); + + // loop through the column chunks and read column and offset indexes + for (auto& rg : row_groups) { + for (auto& col : rg.columns) { + if (col.column_index_length > 0 && col.column_index_offset > 0) { + auto const col_idx_buf = + source->host_read(col.column_index_offset, col.column_index_length); + cp.init(col_idx_buf->data(), col_idx_buf->size()); + ColumnIndex ci; + CUDF_EXPECTS(cp.read(&ci), "Cannot parse column index"); + col.column_index = std::move(ci); + } + if (col.offset_index_length > 0 && col.offset_index_offset > 0) { + auto const off_idx_buf = + source->host_read(col.offset_index_offset, col.offset_index_length); + cp.init(off_idx_buf->data(), off_idx_buf->size()); + OffsetIndex oi; + CUDF_EXPECTS(cp.read(&oi), "Cannot parse offset index"); + col.offset_index = std::move(oi); + } + } + } + sanitize_schema(); } diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index c2b10e09b1a..c4a9a75bb5e 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -936,7 +937,9 @@ parquet_column_device_view parquet_column_view::get_device_view(rmm::cuda_stream desc.level_bits = CompactProtocolReader::NumRequiredBits(max_rep_level()) << 4 | CompactProtocolReader::NumRequiredBits(max_def_level()); - desc.nullability = _d_nullability.data(); + desc.nullability = _d_nullability.data(); + desc.max_def_level = _max_def_level; + desc.max_rep_level = _max_rep_level; return desc; } @@ -1370,6 +1373,9 @@ void encode_pages(hostdevice_2dvector& chunks, EncodePageHeaders(batch_pages, comp_res, batch_pages_stats, chunk_stats, stream); GatherPages(d_chunks_in_batch.flat_view(), pages, stream); + // By now, the var_bytes has been calculated in InitPages, and the histograms in EncodePages. + // EncodeColumnIndexes can encode the histograms in the ColumnIndex, and also sum up var_bytes + // and the histograms for inclusion in the chunk's SizeStats. 
if (column_stats != nullptr) { EncodeColumnIndexes(d_chunks_in_batch.flat_view(), {column_stats, pages.size()}, @@ -1395,10 +1401,13 @@ void encode_pages(hostdevice_2dvector& chunks, * column chunk. * * @param ck pointer to column chunk + * @param col `parquet_column_device_view` for the column * @param column_index_truncate_length maximum length of min or max values in column index, in bytes * @return Computed buffer size needed to encode the column index */ -size_t column_index_buffer_size(EncColumnChunk* ck, int32_t column_index_truncate_length) +size_t column_index_buffer_size(EncColumnChunk* ck, + parquet_column_device_view const& col, + int32_t column_index_truncate_length) { // encoding the column index for a given chunk requires: // each list (4 of them) requires 6 bytes of overhead @@ -1421,10 +1430,29 @@ size_t column_index_buffer_size(EncColumnChunk* ck, int32_t column_index_truncat // // add on some extra padding at the end (plus extra 7 bytes of alignment padding) // for scratch space to do stats truncation. - // + + // additional storage needed for SizeStatistics + // don't need stats for dictionary pages + auto const num_pages = ck->num_data_pages(); + + // only need variable length size info for BYTE_ARRAY + // 1 byte for marker, 1 byte vec type, 4 bytes length, 5 bytes per page for values + // (5 bytes is needed because the varint encoder only encodes 7 bits per byte) + auto const var_bytes_size = col.physical_type == BYTE_ARRAY ? 6 + 5 * num_pages : 0; + + // for the histograms, need 1 byte for marker, 1 byte vec type, 4 bytes length, + // (max_level + 1) * 5 bytes per page + auto const has_def = col.max_def_level > DEF_LVL_HIST_CUTOFF; + auto const has_rep = col.max_def_level > REP_LVL_HIST_CUTOFF; + auto const def_hist_size = has_def ? 6 + 5 * num_pages * (col.max_def_level + 1) : 0; + auto const rep_hist_size = has_rep ? 6 + 5 * num_pages * (col.max_rep_level + 1) : 0; + + // total size of SizeStruct is 1 byte marker, 1 byte end-of-struct, plus sizes for components + auto const size_struct_size = 2 + def_hist_size + rep_hist_size + var_bytes_size; + // calculating this per-chunk because the sizes can be wildly different. 
constexpr size_t padding = 7; - return ck->ck_stat_size * ck->num_pages + column_index_truncate_length + padding; + return ck->ck_stat_size * num_pages + column_index_truncate_length + padding + size_struct_size; } /** @@ -1827,14 +1855,16 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // Initialize batches of rowgroups to encode (mainly to limit peak memory usage) std::vector batch_list; - size_type num_pages = 0; - size_t max_uncomp_bfr_size = 0; - size_t max_comp_bfr_size = 0; - size_t max_chunk_bfr_size = 0; - size_type max_pages_in_batch = 0; - size_t bytes_in_batch = 0; - size_t comp_bytes_in_batch = 0; - size_t column_index_bfr_size = 0; + size_type num_pages = 0; + size_t max_uncomp_bfr_size = 0; + size_t max_comp_bfr_size = 0; + size_t max_chunk_bfr_size = 0; + size_type max_pages_in_batch = 0; + size_t bytes_in_batch = 0; + size_t comp_bytes_in_batch = 0; + size_t column_index_bfr_size = 0; + size_t def_histogram_bfr_size = 0; + size_t rep_histogram_bfr_size = 0; for (size_type r = 0, groups_in_batch = 0, pages_in_batch = 0; r <= num_rowgroups; r++) { size_t rowgroup_size = 0; size_t comp_rowgroup_size = 0; @@ -1849,7 +1879,19 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, max_chunk_bfr_size = std::max(max_chunk_bfr_size, (size_t)std::max(ck->bfr_size, ck->compressed_size)); if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { - column_index_bfr_size += column_index_buffer_size(ck, column_index_truncate_length); + auto const& col = col_desc[ck->col_desc_id]; + column_index_bfr_size += column_index_buffer_size(ck, col, column_index_truncate_length); + + // SizeStatistics are on the ColumnIndex, so only need to allocate the histograms data + // if we're doing page-level indexes. add 1 to num_pages for per-chunk histograms. + auto const num_histograms = ck->num_data_pages() + 1; + + if (col.max_def_level > DEF_LVL_HIST_CUTOFF) { + def_histogram_bfr_size += (col.max_def_level + 1) * num_histograms; + } + if (col.max_rep_level > REP_LVL_HIST_CUTOFF) { + rep_histogram_bfr_size += (col.max_rep_level + 1) * num_histograms; + } } } } @@ -1888,10 +1930,19 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, rmm::device_buffer col_idx_bfr(column_index_bfr_size, stream); rmm::device_uvector pages(num_pages, stream); + rmm::device_uvector def_level_histogram(def_histogram_bfr_size, stream); + rmm::device_uvector rep_level_histogram(rep_histogram_bfr_size, stream); + + thrust::uninitialized_fill( + rmm::exec_policy_nosync(stream), def_level_histogram.begin(), def_level_histogram.end(), 0); + thrust::uninitialized_fill( + rmm::exec_policy_nosync(stream), rep_level_histogram.begin(), rep_level_histogram.end(), 0); // This contains stats for both the pages and the rowgroups. TODO: make them separate. 
rmm::device_uvector page_stats(num_stats_bfr, stream); auto bfr_i = static_cast(col_idx_bfr.data()); + auto bfr_r = rep_level_histogram.data(); + auto bfr_d = def_level_histogram.data(); for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { auto bfr = static_cast(uncomp_bfr.data()); auto bfr_c = static_cast(comp_bfr.data()); @@ -1904,8 +1955,19 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, bfr += ck.bfr_size; bfr_c += ck.compressed_size; if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { - ck.column_index_size = column_index_buffer_size(&ck, column_index_truncate_length); + auto const& col = col_desc[ck.col_desc_id]; + ck.column_index_size = column_index_buffer_size(&ck, col, column_index_truncate_length); bfr_i += ck.column_index_size; + + auto const num_histograms = ck.num_data_pages() + 1; + if (col.max_def_level > DEF_LVL_HIST_CUTOFF) { + ck.def_histogram_data = bfr_d; + bfr_d += num_histograms * (col.max_def_level + 1); + } + if (col.max_rep_level > REP_LVL_HIST_CUTOFF) { + ck.rep_histogram_data = bfr_r; + bfr_r += num_histograms * (col.max_rep_level + 1); + } } } } @@ -1935,10 +1997,10 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, if (collect_compression_statistics) { comp_stats = writer_compression_statistics{}; } // Encode row groups in batches - for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { + for (auto b = 0, batch_r_start = 0; b < static_cast(batch_list.size()); b++) { // Count pages in this batch - auto const rnext = r + batch_list[b]; - auto const first_page_in_batch = chunks[r][0].first_page; + auto const rnext = batch_r_start + batch_list[b]; + auto const first_page_in_batch = chunks[batch_r_start][0].first_page; auto const first_page_in_next_batch = (rnext < num_rowgroups) ? chunks[rnext][0].first_page : num_pages; auto const pages_in_batch = first_page_in_next_batch - first_page_in_batch; @@ -1949,7 +2011,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, pages_in_batch, first_page_in_batch, batch_list[b], - r, + batch_r_start, (stats_granularity == statistics_freq::STATISTICS_PAGE) ? page_stats.data() : nullptr, (stats_granularity != statistics_freq::STATISTICS_NONE) ? page_stats.data() + num_pages : nullptr, @@ -1962,7 +2024,23 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, bool need_sync{false}; - for (; r < rnext; r++) { + // need to fetch the histogram data from the device + std::vector h_def_histogram; + std::vector h_rep_histogram; + if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { + if (def_histogram_bfr_size > 0) { + h_def_histogram = + std::move(cudf::detail::make_std_vector_async(def_level_histogram, stream)); + need_sync = true; + } + if (rep_histogram_bfr_size > 0) { + h_rep_histogram = + std::move(cudf::detail::make_std_vector_async(rep_level_histogram, stream)); + need_sync = true; + } + } + + for (int r = batch_r_start; r < rnext; r++) { int p = rg_to_part[r]; int global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; auto& row_group = agg_meta->file(p).row_groups[global_r]; @@ -1996,6 +2074,61 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, // Sync before calling the next `encode_pages` which may alter the stats data. 
if (need_sync) { stream.synchronize(); } + + // now add to the column chunk SizeStatistics if necessary + if (stats_granularity == statistics_freq::STATISTICS_COLUMN) { + auto h_def_ptr = h_def_histogram.data(); + auto h_rep_ptr = h_rep_histogram.data(); + + for (int r = batch_r_start; r < rnext; r++) { + int const p = rg_to_part[r]; + int const global_r = global_rowgroup_base[p] + r - first_rg_in_part[p]; + auto& row_group = agg_meta->file(p).row_groups[global_r]; + + for (auto i = 0; i < num_columns; i++) { + auto const& ck = chunks[r][i]; + auto const& col = col_desc[ck.col_desc_id]; + auto& column_chunk_meta = row_group.columns[i].meta_data; + + // Add SizeStatistics for the chunk. For now we're only going to do the column chunk + // stats if we're also doing them at the page level. There really isn't much value for + // us in per-chunk stats since everything we do processing wise is at the page level. + SizeStatistics chunk_stats; + + // var_byte_size will only be non-zero for byte array columns. + if (ck.var_bytes_size > 0) { + chunk_stats.unencoded_byte_array_data_bytes = ck.var_bytes_size; + } + + auto const num_data_pages = ck.num_data_pages(); + if (col.max_def_level > DEF_LVL_HIST_CUTOFF) { + size_t const hist_size = col.max_def_level + 1; + uint32_t const* const ck_hist = h_def_ptr + hist_size * num_data_pages; + host_span ck_def_hist{ck_hist, hist_size}; + + chunk_stats.definition_level_histogram = {ck_def_hist.begin(), ck_def_hist.end()}; + h_def_ptr += hist_size * (num_data_pages + 1); + } + + if (col.max_rep_level > REP_LVL_HIST_CUTOFF) { + size_t const hist_size = col.max_rep_level + 1; + uint32_t const* const ck_hist = h_rep_ptr + hist_size * num_data_pages; + host_span ck_rep_hist{ck_hist, hist_size}; + + chunk_stats.repetition_level_histogram = {ck_rep_hist.begin(), ck_rep_hist.end()}; + h_rep_ptr += hist_size * (num_data_pages + 1); + } + + if (chunk_stats.unencoded_byte_array_data_bytes.has_value() || + chunk_stats.definition_level_histogram.has_value() || + chunk_stats.repetition_level_histogram.has_value()) { + column_chunk_meta.size_statistics = std::move(chunk_stats); + } + } + } + } + + batch_r_start = rnext; } auto bounce_buffer = @@ -2251,6 +2384,9 @@ void writer::impl::write_parquet_data_to_sink( int64_t curr_pg_offset = column_chunk_meta.data_page_offset; OffsetIndex offset_idx; + std::vector var_bytes; + auto const is_byte_arr = column_chunk_meta.type == BYTE_ARRAY; + for (uint32_t pg = 0; pg < ck.num_pages; pg++) { auto const& enc_page = h_pages[curr_page_idx++]; @@ -2260,10 +2396,13 @@ void writer::impl::write_parquet_data_to_sink( int32_t this_page_size = enc_page.hdr_size + enc_page.max_data_size; // first_row_idx is relative to start of row group PageLocation loc{curr_pg_offset, this_page_size, enc_page.start_row - ck.start_row}; + if (is_byte_arr) { var_bytes.push_back(enc_page.var_bytes_size); } offset_idx.page_locations.push_back(loc); curr_pg_offset += this_page_size; } + if (is_byte_arr) { offset_idx.unencoded_byte_array_data_bytes = std::move(var_bytes); } + _stream.synchronize(); _agg_meta->file(p).offset_indexes.emplace_back(std::move(offset_idx)); _agg_meta->file(p).column_indexes.emplace_back(std::move(column_idx)); diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index fece83f891b..71cd62ede57 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -4614,6 +4614,12 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexStructNulls) read_footer(source, &fmd); + // all struct columns will have 
num_ordered_rows / 5 nulls at level 0. + // col1 will have num_ordered_rows / 2 nulls total + // col2 will have num_ordered_rows / 3 nulls total + // col3 will have num_ordered_rows / 4 nulls total + int const null_mods[] = {0, 2, 3, 4}; + for (size_t r = 0; r < fmd.row_groups.size(); r++) { auto const& rg = fmd.row_groups[r]; for (size_t c = 0; c < rg.columns.size(); c++) { @@ -4624,6 +4630,26 @@ TEST_P(ParquetV2Test, CheckColumnOffsetIndexStructNulls) auto const oi = read_offset_index(source, chunk); auto const ci = read_column_index(source, chunk); + // check definition level histogram (repetition will not be present) + if (c != 0) { + ASSERT_TRUE(chunk.meta_data.size_statistics.has_value()); + ASSERT_TRUE(chunk.meta_data.size_statistics->definition_level_histogram.has_value()); + // there are no lists so there should be no repetition level histogram + EXPECT_FALSE(chunk.meta_data.size_statistics->repetition_level_histogram.has_value()); + auto const& def_hist = chunk.meta_data.size_statistics->definition_level_histogram.value(); + ASSERT_TRUE(def_hist.size() == 3L); + auto const l0_nulls = num_ordered_rows / 5; + auto const l1_l0_nulls = num_ordered_rows / (5 * null_mods[c]); + auto const l1_nulls = num_ordered_rows / null_mods[c] - l1_l0_nulls; + auto const l2_vals = num_ordered_rows - l1_nulls - l0_nulls; + EXPECT_EQ(def_hist[0], l0_nulls); + EXPECT_EQ(def_hist[1], l1_nulls); + EXPECT_EQ(def_hist[2], l2_vals); + } else { + // column 0 has no lists and no nulls and no strings, so there should be no size stats + EXPECT_FALSE(chunk.meta_data.size_statistics.has_value()); + } + int64_t num_vals = 0; for (size_t o = 0; o < oi.page_locations.size(); o++) { auto const& page_loc = oi.page_locations[o]; @@ -4653,6 +4679,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [] // [4, 5] // NULL + // def histogram [1, 1, 2, 3] + // rep histogram [4, 3] lcw col0{{{{1, 2, 3}, nulls_at({0, 2})}, {}, {4, 5}, {}}, null_at(3)}; // 4 nulls @@ -4660,6 +4688,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [[7, 8]] // [] // [[]] + // def histogram [1, 3, 10] + // rep histogram [4, 4, 6] lcw col1{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, {{7, 8}}, lcw{}, lcw{lcw{}}}; // 4 nulls @@ -4667,6 +4697,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [[7, 8]] // [] // [[]] + // def histogram [1, 1, 2, 10] + // rep histogram [4, 4, 6] lcw col2{{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, null_at(3)}, {{7, 8}}, lcw{}, lcw{lcw{}}}; // 6 nulls @@ -4674,6 +4706,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [[7, 8]] // [] // [[]] + // def histogram [1, 1, 2, 2, 8] + // rep histogram [4, 4, 6] using dlcw = cudf::test::lists_column_wrapper; dlcw col3{{{{1., 2., 3.}, {}, {4., 5.}, {}, {{0., 6., 0.}, nulls_at({0, 2})}}, null_at(3)}, {{7., 8.}}, @@ -4685,6 +4719,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [[7, 8]] // [] // NULL + // def histogram [1, 1, 1, 1, 10] + // rep histogram [4, 4, 6] using ui16lcw = cudf::test::lists_column_wrapper; cudf::test::lists_column_wrapper col4{ {{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, null_at(3)}, {{7, 8}}, ui16lcw{}, ui16lcw{ui16lcw{}}}, @@ -4695,6 +4731,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // [[7, 8]] // [] // NULL + // def histogram [1, 1, 1, 1, 2, 8] + // rep histogram [4, 4, 6] lcw col5{{{{{1, 2, 3}, {}, {4, 5}, {}, {{0, 6, 0}, nulls_at({0, 2})}}, null_at(3)}, {{7, 8}}, lcw{}, @@ -4702,6 +4740,8 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) null_at(3)}; // 4 nulls + // def histogram [1, 3, 9] + 
// rep histogram [4, 4, 5] using strlcw = cudf::test::lists_column_wrapper; cudf::test::lists_column_wrapper col6{ {{"Monday", "Monday", "Friday"}, {}, {"Monday", "Friday"}, {}, {"Sunday", "Funday"}}, @@ -4709,12 +4749,35 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) strlcw{}, strlcw{strlcw{}}}; + // 5 nulls + // def histogram [1, 3, 1, 8] + // rep histogram [4, 4, 5] + using strlcw = cudf::test::lists_column_wrapper; + cudf::test::lists_column_wrapper col7{{{"Monday", "Monday", "Friday"}, + {}, + {{"Monday", "Friday"}, null_at(1)}, + {}, + {"Sunday", "Funday"}}, + {{"bee", "sting"}}, + strlcw{}, + strlcw{strlcw{}}}; + // 11 nulls + // D 5 6 5 6 5 6 5 6 6 + // R 0 3 3 3 1 3 3 2 3 // [[[NULL,2,NULL,4]], [[NULL,6,NULL], [8,9]]] + // D 2 6 6 6 6 2 + // R 0 1 2 3 3 1 // [NULL, [[13],[14,15,16]], NULL] + // D 2 3 2 4 + // R 0 1 1 1 // [NULL, [], NULL, [[]]] + // D 0 + // R 0 // NULL - lcw col7{{ + // def histogram [1, 0, 4, 1, 1, 4, 9] + // rep histogram [4, 6, 2, 8] + lcw col8{{ {{{{1, 2, 3, 4}, nulls_at({0, 2})}}, {{{5, 6, 7}, nulls_at({0, 2})}, {8, 9}}}, {{{{10, 11}, {12}}, {{13}, {14, 15, 16}}, {{17, 18}}}, nulls_at({0, 2})}, {{lcw{lcw{}}, lcw{}, lcw{}, lcw{lcw{}}}, nulls_at({0, 2})}, @@ -4724,7 +4787,25 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) table_view expected({col0, col1, col2, col3, col4, col5, col6, col7}); - int64_t const expected_null_counts[] = {4, 4, 4, 6, 4, 6, 4, 11}; + int64_t const expected_null_counts[] = {4, 4, 4, 6, 4, 6, 4, 5, 11}; + std::vector const expected_def_hists[] = {{1, 1, 2, 3}, + {1, 3, 10}, + {1, 1, 2, 10}, + {1, 1, 2, 2, 8}, + {1, 1, 1, 1, 10}, + {1, 1, 1, 1, 2, 8}, + {1, 3, 9}, + {1, 3, 1, 8}, + {1, 0, 4, 1, 1, 4, 9}}; + std::vector const expected_rep_hists[] = {{4, 3}, + {4, 4, 6}, + {4, 4, 6}, + {4, 4, 6}, + {4, 4, 6}, + {4, 4, 6}, + {4, 4, 5}, + {4, 4, 5}, + {4, 6, 2, 8}}; auto const filepath = temp_env->get_temp_filepath("ColumnIndexListWithNulls.parquet"); auto out_opts = cudf::io::parquet_writer_options::builder(cudf::io::sink_info{filepath}, expected) @@ -4744,6 +4825,25 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) for (size_t c = 0; c < rg.columns.size(); c++) { auto const& chunk = rg.columns[c]; + ASSERT_TRUE(chunk.meta_data.size_statistics.has_value()); + ASSERT_TRUE(chunk.meta_data.size_statistics->definition_level_histogram.has_value()); + ASSERT_TRUE(chunk.meta_data.size_statistics->repetition_level_histogram.has_value()); + // there is only one page, so chunk stats should match the page stats + EXPECT_EQ(chunk.meta_data.size_statistics->definition_level_histogram.value(), + expected_def_hists[c]); + EXPECT_EQ(chunk.meta_data.size_statistics->repetition_level_histogram.value(), + expected_rep_hists[c]); + // only column 6 has string data + if (c == 6) { + ASSERT_TRUE(chunk.meta_data.size_statistics->unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(chunk.meta_data.size_statistics->unencoded_byte_array_data_bytes.value(), 50L); + } else if (c == 7) { + ASSERT_TRUE(chunk.meta_data.size_statistics->unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(chunk.meta_data.size_statistics->unencoded_byte_array_data_bytes.value(), 44L); + } else { + EXPECT_FALSE(chunk.meta_data.size_statistics->unencoded_byte_array_data_bytes.has_value()); + } + // loop over offsets, read each page header, make sure it's a data page and that // the first row index is correct auto const oi = read_offset_index(source, chunk); @@ -4764,6 +4864,22 @@ TEST_P(ParquetV2Test, CheckColumnIndexListWithNulls) // should only be one page 
EXPECT_FALSE(ci.null_pages[0]); EXPECT_EQ(ci.null_counts[0], expected_null_counts[c]); + + ASSERT_TRUE(ci.definition_level_histogram.has_value()); + EXPECT_EQ(ci.definition_level_histogram.value(), expected_def_hists[c]); + + ASSERT_TRUE(ci.repetition_level_histogram.has_value()); + EXPECT_EQ(ci.repetition_level_histogram.value(), expected_rep_hists[c]); + + if (c == 6) { + ASSERT_TRUE(oi.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(oi.unencoded_byte_array_data_bytes.value()[0], 50L); + } else if (c == 7) { + ASSERT_TRUE(oi.unencoded_byte_array_data_bytes.has_value()); + EXPECT_EQ(oi.unencoded_byte_array_data_bytes.value()[0], 44L); + } else { + EXPECT_FALSE(oi.unencoded_byte_array_data_bytes.has_value()); + } } } }
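
Note (not part of the patch above): the doc comment for generate_page_histogram() in page_enc.cu observes that the first histogram bucket is never counted directly, because h(0) is always derivable as num_values minus the sum of the remaining buckets; finish_page_encode() relies on the same identity when it back-fills the level-0 entry. A minimal host-side sketch of that derivation, assuming hist[1..max_level] has already been populated (the function name here is illustrative, not from the patch):

    // Illustrative sketch only: fill bucket 0 of a level histogram so the
    // buckets sum to the page's total number of level values, mirroring the
    // "h(0) = num_values - sum(h(1)..h(max_level))" identity described in the
    // kernel comments.
    #include <cstdint>
    #include <numeric>
    #include <vector>

    void fill_histogram_level_zero(std::vector<int64_t>& hist, int64_t num_values)
    {
      // buckets 1..max_level were counted by the kernel; bucket 0 is the remainder
      auto const counted = std::accumulate(hist.begin() + 1, hist.end(), int64_t{0});
      hist[0]            = num_values - counted;
    }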