From 8950c1a69e36bd7c90529c4f7ac7fb5b2493dde8 Mon Sep 17 00:00:00 2001 From: Devavret Makkar Date: Wed, 7 Oct 2020 22:28:26 +0530 Subject: [PATCH] List parquet writer support (#6075) --- CHANGELOG.md | 1 + cpp/src/io/parquet/page_dict.cu | 2 +- cpp/src/io/parquet/page_enc.cu | 693 +++++++++++++++++++++---- cpp/src/io/parquet/parquet_gpu.hpp | 86 ++- cpp/src/io/parquet/writer_impl.cu | 314 ++++++++--- cpp/src/io/statistics/column_stats.h | 1 + cpp/tests/io/parquet_test.cpp | 144 ++++- python/cudf/cudf/_lib/parquet.pyx | 23 +- python/cudf/cudf/tests/test_parquet.py | 45 ++ 9 files changed, 1114 insertions(+), 195 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88667eaf55b..48f682ac026 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -272,6 +272,7 @@ - PR #5815 LIST Support for ColumnVector - PR #5931 Support for `add_calendrical_months` API - PR #5992 Add support for `.dt.strftime` +- PR #6075 Parquet writer - add support for nested LIST columns ## Improvements diff --git a/cpp/src/io/parquet/page_dict.cu b/cpp/src/io/parquet/page_dict.cu index 1ae3392262c..196bb21222b 100644 --- a/cpp/src/io/parquet/page_dict.cu +++ b/cpp/src/io/parquet/page_dict.cu @@ -326,7 +326,7 @@ __global__ void __launch_bounds__(block_size, 1) /** * @brief Launches kernel for building chunk dictionaries * - * @param[in] chunks Column chunks + * @param[in,out] chunks Column chunks * @param[in] dev_scratch Device scratch data (kDictScratchSize per dictionary) * @param[in] num_chunks Number of column chunks * @param[in] stream CUDA stream to use, default 0 diff --git a/cpp/src/io/parquet/page_enc.cu b/cpp/src/io/parquet/page_enc.cu index 0f113be1144..c076bd88187 100644 --- a/cpp/src/io/parquet/page_enc.cu +++ b/cpp/src/io/parquet/page_enc.cu @@ -17,6 +17,11 @@ #include #include +#include + +#include +#include + namespace cudf { namespace io { namespace parquet { @@ -34,6 +39,7 @@ struct frag_init_state_s { EncColumnDesc col; PageFragment frag; uint32_t total_dupes; + size_type start_value_idx; volatile uint32_t scratch_red[32]; uint32_t dict[MAX_PAGE_FRAGMENT_SIZE]; union { @@ -55,6 +61,7 @@ struct page_enc_state_s { uint32_t rle_numvals; //!< RLE input value count uint32_t rle_lit_count; uint32_t rle_rpt_count; + uint32_t page_start_val; volatile uint32_t rpt_map[4]; volatile uint32_t scratch_red[32]; EncPage page; @@ -90,6 +97,9 @@ inline __device__ uint32_t uint64_init_hash(uint64_t v) /** * @brief Initializes encoder page fragments * + * Based on the number of rows in each fragment, populates the value count, the size of data in the + * fragment, the number of unique values, and the data size of unique values. 
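 *
 * Illustrative example (not from the original patch): for a LIST<INT32> column with rows
 * {[1, 2, 3], [], [4]} and fragment_size = 2, the first fragment covers rows 0..1 and is
 * populated as
 *   num_rows        = 2  // rows spanned by the fragment
 *   num_leaf_values = 3  // leaf values 1, 2, 3 (the empty list contributes none)
 *   num_values      = 4  // rep/def pairs: one per leaf value plus one for the empty list
 * For non-nested columns, num_values == num_leaf_values == num_rows.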
+ * * @param[in] frag Fragment array [fragment_id][column_id] * @param[in] col_desc Column description array [column_id] * @param[in] num_fragments Number of fragments per column @@ -116,7 +126,7 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment frag_init_state_s *const s = &state_g; uint32_t t = threadIdx.x; - uint32_t start_row, nrows, dtype_len, dtype_len_in, dtype; + uint32_t start_row, dtype_len, dtype_len_in, dtype; if (t < sizeof(EncColumnDesc) / sizeof(uint32_t)) { reinterpret_cast(&s->col)[t] = @@ -128,13 +138,39 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment __syncthreads(); start_row = blockIdx.y * fragment_size; if (!t) { - s->col.num_rows = min(s->col.num_rows, max_num_rows); + s->col.num_rows = min(s->col.num_rows, max_num_rows); + // frag.num_rows = fragment_size except for the last page fragment which can be smaller. + // num_rows is fixed but fragment size could be larger if the data is strings or nested. s->frag.num_rows = min(fragment_size, max_num_rows - min(start_row, max_num_rows)); s->frag.non_nulls = 0; s->frag.num_dict_vals = 0; s->frag.fragment_data_size = 0; s->frag.dict_data_size = 0; s->total_dupes = 0; + + // To use num_vals instead of num_rows, we need to calculate num_vals on the fly. + // For list>, values between i and i+50 can be calculated by + // off_11 = off[i], off_12 = off[i+50] + // off_21 = child.off[off_11], off_22 = child.off[off_12] + // etc... + s->start_value_idx = start_row; + size_type end_value_idx = start_row + s->frag.num_rows; + for (size_type i = 0; i < s->col.nesting_levels; i++) { + s->start_value_idx = s->col.nesting_offsets[i][s->start_value_idx]; + end_value_idx = s->col.nesting_offsets[i][end_value_idx]; + } + s->frag.num_leaf_values = end_value_idx - s->start_value_idx; + + if (s->col.nesting_levels > 0) { + // For nested schemas, the number of values in a fragment is not directly related to the + // number of encoded data elements or the number of rows. It is simply the number of + // repetition/definition values which together encode validity and nesting information. + size_type first_level_val_idx = s->col.level_offsets[start_row]; + size_type last_level_val_idx = s->col.level_offsets[start_row + s->frag.num_rows]; + s->frag.num_values = last_level_val_idx - first_level_val_idx; + } else { + s->frag.num_values = s->frag.num_rows; + } } dtype = s->col.physical_type; dtype_len = (dtype == INT64 || dtype == DOUBLE) ? 8 : (dtype == BOOLEAN) ? 1 : 4; @@ -144,12 +180,15 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment dtype_len_in = (dtype == BYTE_ARRAY) ? sizeof(nvstrdesc_s) : dtype_len; } __syncthreads(); - nrows = s->frag.num_rows; - for (uint32_t i = 0; i < nrows; i += block_size) { + + size_type nvals = s->frag.num_leaf_values; + size_type start_value_idx = s->start_value_idx; + + for (uint32_t i = 0; i < nvals; i += block_size) { const uint32_t *valid = s->col.valid_map_base; - uint32_t row = start_row + i + t; - uint32_t is_valid = (i + t < nrows && row < s->col.num_rows) - ? (valid) ? (valid[row >> 5] >> (row & 0x1f)) & 1 : 1 + uint32_t val_idx = start_value_idx + i + t; + uint32_t is_valid = (i + t < nvals && val_idx < s->col.num_values) + ? (valid) ? 
(valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1 : 0; uint32_t valid_warp = BALLOT(is_valid); uint32_t len, nz_pos, hash; @@ -157,25 +196,29 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment len = dtype_len; if (dtype != BOOLEAN) { if (dtype == BYTE_ARRAY) { - const char *ptr = reinterpret_cast(s->col.column_data_base)[row].ptr; + const char *ptr = + reinterpret_cast(s->col.column_data_base)[val_idx].ptr; uint32_t count = - (uint32_t) reinterpret_cast(s->col.column_data_base)[row].count; + (uint32_t) reinterpret_cast(s->col.column_data_base)[val_idx] + .count; len += count; hash = nvstr_init_hash(reinterpret_cast(ptr), count); } else if (dtype_len_in == 8) { - hash = uint64_init_hash(reinterpret_cast(s->col.column_data_base)[row]); + hash = + uint64_init_hash(reinterpret_cast(s->col.column_data_base)[val_idx]); } else { hash = uint32_init_hash( (dtype_len_in == 4) - ? reinterpret_cast(s->col.column_data_base)[row] + ? reinterpret_cast(s->col.column_data_base)[val_idx] : (dtype_len_in == 2) - ? reinterpret_cast(s->col.column_data_base)[row] - : reinterpret_cast(s->col.column_data_base)[row]); + ? reinterpret_cast(s->col.column_data_base)[val_idx] + : reinterpret_cast(s->col.column_data_base)[val_idx]); } } } else { len = 0; } + nz_pos = s->frag.non_nulls + __popc(valid_warp & (0x7fffffffu >> (0x1fu - ((uint32_t)t & 0x1f)))); len = warp_reduce(temp_storage.full[t / 32]).Sum(len); @@ -200,7 +243,7 @@ __global__ void __launch_bounds__(block_size) gpuInitPageFragments(PageFragment if (t >= 32) { nz_pos += s->scratch_red[(t - 32) >> 5]; } if (dict_index) { atomicAdd(&s->map.u32[hash >> 1], (hash & 1) ? 1 << 16 : 1); - dict_index[start_row + nz_pos] = + dict_index[start_value_idx + nz_pos] = ((i + t) << INIT_HASH_BITS) | hash; // Store the hash along with the index, so we don't have to recompute it } @@ -413,18 +456,20 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, } __syncthreads(); if (t < 32) { - uint32_t fragments_in_chunk = 0; - uint32_t rows_in_page = 0; - uint32_t page_size = 0; - uint32_t num_pages = 0; - uint32_t num_rows = 0; - uint32_t page_start = 0; - uint32_t page_offset = ck_g.ck_stat_size; - uint32_t num_dict_entries = 0; - uint32_t comp_page_offset = ck_g.ck_stat_size; - uint32_t cur_row = ck_g.start_row; - uint32_t ck_max_stats_len = 0; - uint32_t max_stats_len = 0; + uint32_t fragments_in_chunk = 0; + uint32_t rows_in_page = 0; + uint32_t values_in_page = 0; + uint32_t leaf_values_in_page = 0; + uint32_t page_size = 0; + uint32_t num_pages = 0; + uint32_t num_rows = 0; + uint32_t page_start = 0; + uint32_t page_offset = ck_g.ck_stat_size; + uint32_t num_dict_entries = 0; + uint32_t comp_page_offset = ck_g.ck_stat_size; + uint32_t cur_row = ck_g.start_row; + uint32_t ck_max_stats_len = 0; + uint32_t max_stats_len = 0; if (!t) { pagestats_g.col = &col_desc[blockIdx.x]; @@ -444,6 +489,8 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, page_g.max_data_size = ck_g.dictionary_size; page_g.start_row = cur_row; page_g.num_rows = ck_g.total_dict_entries; + page_g.num_leaf_values = ck_g.total_dict_entries; + page_g.num_values = ck_g.total_dict_entries; page_offset += page_g.max_hdr_size + page_g.max_data_size; comp_page_offset += page_g.max_hdr_size + GetMaxCompressedBfrSize(page_g.max_data_size); } @@ -459,6 +506,12 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, num_pages = 1; } SYNCWARP(); + // This loop goes over one page fragment at a time and adds it to 
page. + // When page size crosses a particular limit, then it moves on to the next page and then next + // page fragment gets added to that one. + + // This doesn't actually deal with data. It's agnostic. It only cares about number of rows and + // page size. do { uint32_t fragment_data_size, max_page_size, minmax_len = 0; SYNCWARP(); @@ -478,15 +531,16 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, SYNCWARP(); if (ck_g.has_dictionary && fragments_in_chunk < ck_g.num_dict_fragments) { fragment_data_size = - frag_g.num_rows * 2; // Assume worst-case of 2-bytes per dictionary index + frag_g.num_leaf_values * 2; // Assume worst-case of 2-bytes per dictionary index } else { fragment_data_size = frag_g.fragment_data_size; } - max_page_size = (rows_in_page * 2 >= ck_g.num_rows) + // TODO (dm): this convoluted logic to limit page size needs refactoring + max_page_size = (values_in_page * 2 >= ck_g.num_values) ? 256 * 1024 - : (rows_in_page * 3 >= ck_g.num_rows) ? 384 * 1024 : 512 * 1024; + : (values_in_page * 3 >= ck_g.num_values) ? 384 * 1024 : 512 * 1024; if (num_rows >= ck_g.num_rows || - (rows_in_page > 0 && + (values_in_page > 0 && (page_size + fragment_data_size > max_page_size || (ck_g.has_dictionary && fragments_in_chunk == ck_g.num_dict_fragments)))) { uint32_t dict_bits_plus1; @@ -506,17 +560,12 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, } else { dict_bits = 16; } - page_size = 1 + 5 + ((rows_in_page * dict_bits + 7) >> 3) + (rows_in_page >> 8); + page_size = 1 + 5 + ((values_in_page * dict_bits + 7) >> 3) + (values_in_page >> 8); dict_bits_plus1 = dict_bits + 1; } else { dict_bits_plus1 = 0; } if (!t) { - uint32_t def_level_bits = col_g.level_bits & 0xf; - uint32_t def_level_size = - (def_level_bits) - ? 4 + 5 + ((def_level_bits * rows_in_page + 7) >> 3) + (rows_in_page >> 8) - : 0; page_g.num_fragments = fragments_in_chunk - page_start; page_g.chunk_id = blockIdx.y * num_columns + blockIdx.x; page_g.page_type = PageType::DATA_PAGE; @@ -532,11 +581,26 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, } page_g.max_hdr_size += stats_hdr_len; } - page_g.max_data_size = page_size + def_level_size; page_g.page_data = ck_g.uncompressed_bfr + page_offset; page_g.compressed_data = ck_g.compressed_bfr + comp_page_offset; page_g.start_row = cur_row; page_g.num_rows = rows_in_page; + page_g.num_leaf_values = leaf_values_in_page; + page_g.num_values = values_in_page; + uint32_t def_level_bits = col_g.level_bits & 0xf; + uint32_t rep_level_bits = col_g.level_bits >> 4; + // Run length = 4, max(rle/bitpack header) = 5, add one byte per 256 values for overhead + // TODO (dm): Improve readability of these calculations. + uint32_t def_level_size = + (def_level_bits != 0) + ? 4 + 5 + ((def_level_bits * page_g.num_values + 7) >> 3) + (page_g.num_values >> 8) + : 0; + uint32_t rep_level_size = + (rep_level_bits != 0) + ? 
4 + 5 + ((rep_level_bits * page_g.num_values + 7) >> 3) + (page_g.num_values >> 8) + : 0; + page_g.max_data_size = page_size + def_level_size + rep_level_size; + pagestats_g.start_chunk = ck_g.first_fragment + page_start; pagestats_g.num_chunks = page_g.num_fragments; page_offset += page_g.max_hdr_size + page_g.max_data_size; @@ -554,15 +618,19 @@ __global__ void __launch_bounds__(128) gpuInitPages(EncColumnChunk *chunks, reinterpret_cast(&pagestats_g)[t]; } num_pages++; - page_size = 0; - rows_in_page = 0; - page_start = fragments_in_chunk; - max_stats_len = 0; + page_size = 0; + rows_in_page = 0; + values_in_page = 0; + leaf_values_in_page = 0; + page_start = fragments_in_chunk; + max_stats_len = 0; } max_stats_len = max(max_stats_len, minmax_len); num_dict_entries += frag_g.num_dict_vals; page_size += fragment_data_size; rows_in_page += frag_g.num_rows; + values_in_page += frag_g.num_values; + leaf_values_in_page += frag_g.num_leaf_values; num_rows += frag_g.num_rows; fragments_in_chunk++; } while (frag_g.num_rows != 0); @@ -618,36 +686,75 @@ inline __device__ uint8_t *VlqEncode(uint8_t *p, uint32_t v) inline __device__ void PackLiterals( uint8_t *dst, uint32_t v, uint32_t count, uint32_t w, uint32_t t) { - if (t <= (count | 0x1f)) { - if (w < 8) { - uint32_t mask; - if (w <= 1) { - v |= SHFL_XOR(v, 1) << 1; - v |= SHFL_XOR(v, 2) << 2; - v |= SHFL_XOR(v, 4) << 4; - mask = 0x7; - } else if (w <= 2) { - v |= SHFL_XOR(v, 1) << 2; - v |= SHFL_XOR(v, 2) << 4; - mask = 0x3; - } else { // w=4 - v |= SHFL_XOR(v, 1) << 4; - mask = 0x1; - } - if (t < count && !(t & mask)) { dst[(t * w) >> 3] = v; } - } else if (w < 12) { // w=8 - if (t < count) { dst[t] = v; } - } else if (w < 16) { // w=12 - v |= SHFL_XOR(v, 1) << 12; - if (t < count && !(t & 1)) { - dst[(t >> 1) * 3 + 0] = v; - dst[(t >> 1) * 3 + 1] = v >> 8; - dst[(t >> 1) * 3 + 2] = v >> 16; + if (w == 1 || w == 2 || w == 4 || w == 8 || w == 12 || w == 16) { + if (t <= (count | 0x1f)) { + if (w == 1 || w == 2 || w == 4) { + uint32_t mask = 0; + if (w == 1) { + v |= SHFL_XOR(v, 1) << 1; + v |= SHFL_XOR(v, 2) << 2; + v |= SHFL_XOR(v, 4) << 4; + mask = 0x7; + } else if (w == 2) { + v |= SHFL_XOR(v, 1) << 2; + v |= SHFL_XOR(v, 2) << 4; + mask = 0x3; + } else if (w == 4) { + v |= SHFL_XOR(v, 1) << 4; + mask = 0x1; + } + if (t < count && mask && !(t & mask)) { dst[(t * w) >> 3] = v; } + return; + } else if (w == 8) { + if (t < count) { dst[t] = v; } + return; + } else if (w == 12) { + v |= SHFL_XOR(v, 1) << 12; + if (t < count && !(t & 1)) { + dst[(t >> 1) * 3 + 0] = v; + dst[(t >> 1) * 3 + 1] = v >> 8; + dst[(t >> 1) * 3 + 2] = v >> 16; + } + return; + } else if (w == 16) { + if (t < count) { + dst[t * 2 + 0] = v; + dst[t * 2 + 1] = v >> 8; + } + return; } - } else if (t < count) { // w=16 - dst[t * 2 + 0] = v; - dst[t * 2 + 1] = v >> 8; + } else { + return; + } + } else { + // Scratch space to temporarily write to. Needed because we will use atomics to write 32 bit + // words but the destination mem may not be a multiple of 4 bytes. + // TODO (dm): This assumes blockdim = 128 and max bits per value = 16. Reduce magic numbers. + __shared__ uint32_t scratch[64]; + if (t < 64) { scratch[t] = 0; } + __syncthreads(); + + if (t <= count) { + uint64_t v64 = v; + v64 <<= (t * w) & 0x1f; + + // Copy 64 bit word into two 32 bit words while following C++ strict aliasing rules. 
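      // Worked example (for exposition): with w = 3 and t = 10 the value starts at bit offset
      // t * w = 30, so its three bits straddle the boundary between scratch[0] and scratch[1];
      // this is why v64 is split into two 32-bit words and written with two atomicOr calls below.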
+ uint32_t v32[2]; + memcpy(&v32, &v64, sizeof(uint64_t)); + + // Atomically write result to scratch + if (v32[0]) { atomicOr(scratch + ((t * w) >> 5), v32[0]); } + if (v32[1]) { atomicOr(scratch + ((t * w) >> 5) + 1, v32[1]); } } + __syncthreads(); + + // Copy scratch data to final destination + auto available_bytes = (count * w + 7) / 8; + + auto scratch_bytes = reinterpret_cast(&scratch[0]); + if (t < available_bytes) { dst[t] = scratch_bytes[t]; } + if (t + 128 < available_bytes) { dst[t + 128] = scratch_bytes[t + 128]; } + __syncthreads(); } } @@ -859,8 +966,10 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, __syncthreads(); if (!t) { s->cur = s->page.page_data + s->page.max_hdr_size; } __syncthreads(); - // Encode NULLs - if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.level_bits != 0) { + // Encode Repetition and Definition levels + if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.level_bits != 0 && + s->col.nesting_levels == 0) { + // Calculate definition levels from validity const uint32_t *valid = s->col.valid_map_base; uint32_t def_lvl_bits = s->col.level_bits & 0xf; if (def_lvl_bits != 0) { @@ -875,7 +984,9 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, uint32_t rle_numvals = s->rle_numvals; uint32_t nrows = min(s->page.num_rows - rle_numvals, 128); uint32_t row = s->page.start_row + rle_numvals + t; - uint32_t def_lvl = (rle_numvals + t < s->page.num_rows && row < s->col.num_rows) + // Definition level encodes validity. Checks the valid map and if it is valid, then sets the + // def_lvl accordingly and sets it in s->vals which is then given to RleEncode to encode + uint32_t def_lvl = (rle_numvals + t < s->page.num_rows && row < s->col.num_rows) ? (valid) ? (valid[row >> 5] >> (row & 0x1f)) & 1 : 1 : 0; s->vals[(rle_numvals + t) & (RLE_BFRSZ - 1)] = def_lvl; @@ -895,6 +1006,45 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, if (t == 0) { s->cur = rle_out; } } } + } else if (s->page.page_type != PageType::DICTIONARY_PAGE && s->col.nesting_levels > 0) { + auto encode_levels = [&](uint8_t const *lvl_val_data, uint32_t nbits) { + // For list types, the repetition and definition levels are pre-calculated. We just need to + // encode and write them now. + if (!t) { + s->rle_run = 0; + s->rle_pos = 0; + s->rle_numvals = 0; + s->rle_out = s->cur + 4; + } + __syncthreads(); + size_type page_first_val_idx = s->col.level_offsets[s->page.start_row]; + size_type col_last_val_idx = s->col.level_offsets[s->col.num_rows]; + while (s->rle_numvals < s->page.num_values) { + uint32_t rle_numvals = s->rle_numvals; + uint32_t nvals = min(s->page.num_values - rle_numvals, 128); + uint32_t idx = page_first_val_idx + rle_numvals + t; + uint32_t lvl_val = + (rle_numvals + t < s->page.num_values && idx < col_last_val_idx) ? 
lvl_val_data[idx] : 0; + s->vals[(rle_numvals + t) & (RLE_BFRSZ - 1)] = lvl_val; + __syncthreads(); + rle_numvals += nvals; + RleEncode(s, rle_numvals, nbits, (rle_numvals == s->page.num_values), t); + __syncthreads(); + } + if (t < 32) { + uint8_t *cur = s->cur; + uint8_t *rle_out = s->rle_out; + if (t < 4) { + uint32_t rle_bytes = (uint32_t)(rle_out - cur) - 4; + cur[t] = rle_bytes >> (t * 8); + } + SYNCWARP(); + if (t == 0) { s->cur = rle_out; } + } + }; + encode_levels(s->col.rep_values, s->col.level_bits >> 4); + __syncthreads(); + encode_levels(s->col.def_values, s->col.level_bits & 0xf); } // Encode data values __syncthreads(); @@ -916,24 +1066,28 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, dst[0] = dict_bits; s->rle_out = dst + 1; } + s->page_start_val = s->page.start_row; + for (size_type i = 0; i < s->col.nesting_levels; i++) { + s->page_start_val = s->col.nesting_offsets[i][s->page_start_val]; + } } __syncthreads(); - for (uint32_t cur_row = 0; cur_row < s->page.num_rows;) { - uint32_t nrows = min(s->page.num_rows - cur_row, 128); - uint32_t row = s->page.start_row + cur_row + t; + for (uint32_t cur_val_idx = 0; cur_val_idx < s->page.num_leaf_values;) { + uint32_t nvals = min(s->page.num_leaf_values - cur_val_idx, 128); + uint32_t val_idx = s->page_start_val + cur_val_idx + t; uint32_t is_valid, warp_valids, len, pos; if (s->page.page_type == PageType::DICTIONARY_PAGE) { - is_valid = (cur_row + t < s->page.num_rows); - row = (is_valid) ? s->col.dict_data[row] : row; + is_valid = (cur_val_idx + t < s->page.num_leaf_values); + val_idx = (is_valid) ? s->col.dict_data[val_idx] : val_idx; } else { const uint32_t *valid = s->col.valid_map_base; - is_valid = (row < s->col.num_rows && cur_row + t < s->page.num_rows) - ? (valid) ? (valid[row >> 5] >> (row & 0x1f)) & 1 : 1 + is_valid = (val_idx < s->col.num_values && cur_val_idx + t < s->page.num_leaf_values) + ? (valid) ? (valid[val_idx >> 5] >> (val_idx & 0x1f)) & 1 : 1 : 0; } warp_valids = BALLOT(is_valid); - cur_row += nrows; + cur_val_idx += nvals; if (dict_bits >= 0) { // Dictionary encoding if (dict_bits > 0) { @@ -949,18 +1103,18 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, if (is_valid) { uint32_t v; if (dtype == BOOLEAN) { - v = reinterpret_cast(s->col.column_data_base)[row]; + v = reinterpret_cast(s->col.column_data_base)[val_idx]; } else { - v = s->col.dict_index[row]; + v = s->col.dict_index[val_idx]; } s->vals[(rle_numvals + pos) & (RLE_BFRSZ - 1)] = v; } rle_numvals += s->scratch_red[3]; __syncthreads(); if ((!enable_bool_rle) && (dtype == BOOLEAN)) { - PlainBoolEncode(s, rle_numvals, (cur_row == s->page.num_rows), t); + PlainBoolEncode(s, rle_numvals, (cur_val_idx == s->page.num_leaf_values), t); } else { - RleEncode(s, rle_numvals, dict_bits, (cur_row == s->page.num_rows), t); + RleEncode(s, rle_numvals, dict_bits, (cur_val_idx == s->page.num_leaf_values), t); } __syncthreads(); } @@ -973,8 +1127,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, if (is_valid) { len = dtype_len_out; if (dtype == BYTE_ARRAY) { - len += - (uint32_t) reinterpret_cast(s->col.column_data_base)[row].count; + len += (uint32_t) reinterpret_cast(s->col.column_data_base)[val_idx] + .count; } } else { len = 0; @@ -987,8 +1141,8 @@ __global__ void __launch_bounds__(128, 8) gpuEncodePages(EncPage *pages, if (t == 0) { s->cur = dst + s->scratch_red[3]; } pos = pos + ((t >= 32) ? 
s->scratch_red[(t - 32) >> 5] : 0) - len; if (is_valid) { - const uint8_t *src8 = - reinterpret_cast(s->col.column_data_base) + row * (size_t)dtype_len_in; + const uint8_t *src8 = reinterpret_cast(s->col.column_data_base) + + val_idx * (size_t)dtype_len_in; switch (dtype) { case INT32: case FLOAT: { @@ -1327,8 +1481,8 @@ __global__ void __launch_bounds__(128) gpuEncodePageHeaders(EncPage *pages, header_encoder encoder(hdr_start); PageType page_type = page_g.page_type; // NOTE: For dictionary encoding, parquet v2 recommends using PLAIN in dictionary page and - // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and data - // pages (actual encoding is identical). + // RLE_DICTIONARY in data page, but parquet v1 uses PLAIN_DICTIONARY in both dictionary and + // data pages (actual encoding is identical). Encoding encoding; if (enable_bool_rle) { encoding = (col_g.physical_type != BOOLEAN) @@ -1347,10 +1501,10 @@ __global__ void __launch_bounds__(128) gpuEncodePageHeaders(EncPage *pages, if (page_type == PageType::DATA_PAGE) { // DataPageHeader encoder.field_struct_begin(5); - encoder.field_int32(1, page_g.num_rows); // NOTE: num_values != num_rows for list types - encoder.field_int32(2, encoding); // encoding - encoder.field_int32(3, Encoding::RLE); // definition_level_encoding - encoder.field_int32(4, Encoding::RLE); // repetition_level_encoding + encoder.field_int32(1, page_g.num_values); // NOTE: num_values != num_rows for list types + encoder.field_int32(2, encoding); // encoding + encoder.field_int32(3, Encoding::RLE); // definition_level_encoding + encoder.field_int32(4, Encoding::RLE); // repetition_level_encoding // Optionally encode page-level statistics if (page_stats) { encoder.field_struct_begin(5); @@ -1429,10 +1583,353 @@ __global__ void __launch_bounds__(1024) gpuGatherPages(EncColumnChunk *chunks, c } } +/** + * @brief Get the dremel offsets and repetition and definition levels for a LIST column + * + * The repetition and definition level values are ideally computed using a recursive call over a + * nested structure but in order to better utilize GPU resources, this function calculates them + * with a bottom up merge method. + * + * Given a LIST column of type `List>` like so: + * ``` + * col = { + * [], + * [[], [1, 2, 3], [4, 5]], + * [[]] + * } + * ``` + * We can represent it in cudf format with two level of offsets like this: + * ``` + * Level 0 offsets = {0, 0, 3, 5, 6} + * Level 1 offsets = {0, 0, 3, 5, 5} + * Values = {1, 2, 3, 4, 5} + * ``` + * The desired result of this function is the repetition and definition level values that + * correspond to the data values: + * ``` + * col = {[], [[], [1, 2, 3], [4, 5]], [[]]} + * def = { 0 1, 2, 2, 2, 2, 2, 1 } + * rep = { 0, 0, 0, 2, 2, 1, 2, 0 } + * ``` + * + * Since repetition and definition levels arrays contain a value for each empty list, the size of + * the rep/def level array can be given by + * ``` + * rep_level.size() = size of leaf column + number of empty lists in level 0 + * + number of empty lists in level 1 ... + * ``` + * + * We start with finding the empty lists in the penultimate level and merging it with the indices + * of the leaf level. 
The values for the merge are the definition and repetition levels + * ``` + * empties at level 1 = {0, 5} + * def values at 1 = {1, 1} + * rep values at 1 = {1, 1} + * indices at leaf = {0, 1, 2, 3, 4} + * def values at leaf = {2, 2, 2, 2, 2} + * rep values at leaf = {2, 2, 2, 2, 2} + * ``` + * + * merged def values = {1, 2, 2, 2, 2, 2, 1} + * merged rep values = {1, 2, 2, 2, 2, 2, 1} + * + * The size of the rep/def values is now larger than the leaf values and the offsets need to be + * adjusted in order to point to the correct start indices. We do this with an exclusive scan over + * the indices of offsets of empty lists and adding to existing offsets. + * ``` + * Level 1 new offsets = {0, 1, 4, 6, 7} + * ``` + * Repetition values at the beginning of a list need to be decremented. We use the new offsets to + * scatter the rep value. + * ``` + * merged rep values = {1, 2, 2, 2, 2, 2, 1} + * scatter (1, new offsets) + * new offsets = {0, 1, 4, 6, 7} + * new rep values = {1, 1, 2, 2, 1, 2, 1} + * ``` + * + * Similarly we merge up all the way till level 0 offsets + */ +dremel_data get_dremel_data(column_view h_col, cudaStream_t stream) +{ + CUDF_EXPECTS(h_col.type().id() == type_id::LIST, + "Can only get rep/def levels for LIST type column"); + + auto get_empties = [&](column_view col, size_type start, size_type end) { + auto lcv = lists_column_view(col); + rmm::device_uvector empties_idx(lcv.size(), stream); + rmm::device_uvector empties(lcv.size(), stream); + auto d_off = lcv.offsets().data(); + + auto empties_idx_end = + thrust::copy_if(rmm::exec_policy(stream)->on(stream), + thrust::make_counting_iterator(start), + thrust::make_counting_iterator(end), + empties_idx.begin(), + [d_off] __device__(auto i) { return d_off[i] == d_off[i + 1]; }); + auto empties_end = thrust::gather(rmm::exec_policy(stream)->on(stream), + empties_idx.begin(), + empties_idx_end, + lcv.offsets().begin(), + empties.begin()); + + auto empties_size = empties_end - empties.begin(); + return std::make_tuple(std::move(empties), std::move(empties_idx), empties_size); + }; + + // Reverse the nesting in order to merge the deepest level with the leaf first and merge bottom + // up + auto curr_col = h_col; + size_t max_vals_size = 0; + std::vector nesting_levels; + std::vector def_at_level; + while (curr_col.type().id() == type_id::LIST) { + nesting_levels.push_back(curr_col); + def_at_level.push_back(curr_col.nullable() ? 2 : 1); + auto lcv = lists_column_view(curr_col); + max_vals_size += lcv.offsets().size(); + curr_col = lcv.child(); + } + // One more entry for leaf col + def_at_level.push_back(curr_col.nullable() ? 2 : 1); + max_vals_size += curr_col.size(); + + thrust::exclusive_scan( + thrust::host, def_at_level.begin(), def_at_level.end(), def_at_level.begin()); + + // Sliced list column views only have offsets applied to top level. Get offsets for each level. 
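  // Illustrative host-side sketch (not part of this patch): a row-by-row reference of the
  // rep/def output for the simplest case -- a single-level, non-nullable LIST<INT32> -- matching
  // the example in the get_dremel_data docs in parquet_gpu.hpp. The device code below computes
  // the same result with a bottom-up merge so that all levels are processed in bulk on the GPU.
  //
  //   struct host_dremel {
  //     std::vector<int32_t> offsets;  // per-row offsets into rep/def
  //     std::vector<uint8_t> rep;
  //     std::vector<uint8_t> def;
  //   };
  //
  //   host_dremel reference_dremel(std::vector<int32_t> const &list_offsets)
  //   {
  //     host_dremel out;
  //     out.offsets.push_back(0);
  //     for (std::size_t row = 0; row + 1 < list_offsets.size(); ++row) {
  //       int32_t len = list_offsets[row + 1] - list_offsets[row];
  //       if (len == 0) {
  //         out.rep.push_back(0);  // an empty list still produces one rep/def pair
  //         out.def.push_back(0);  // def below the max level marks "no leaf value present"
  //       } else {
  //         for (int32_t i = 0; i < len; ++i) {
  //           out.rep.push_back(i == 0 ? 0 : 1);  // 0 starts a new row, 1 repeats within it
  //           out.def.push_back(1);               // leaf value present (nothing is nullable)
  //         }
  //       }
  //       out.offsets.push_back(static_cast<int32_t>(out.rep.size()));
  //     }
  //     return out;
  //   }
  //
  //   // list_offsets {0, 3, 3, 5}, i.e. {{1, 2, 3}, {}, {5, 6}}, yields
  //   // offsets {0, 3, 4, 6}, rep {0, 1, 1, 0, 0, 1}, def {1, 1, 1, 0, 1, 1}.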
+ hostdevice_vector column_offsets(nesting_levels.size() + 1, stream); + hostdevice_vector column_ends(nesting_levels.size() + 1, stream); + + auto d_col = column_device_view::create(h_col, stream); + cudf::detail::device_single_thread( + [offset_at_level = column_offsets.device_ptr(), + end_idx_at_level = column_ends.device_ptr(), + col = *d_col] __device__() { + auto curr_col = col; + size_type off = curr_col.offset(); + size_type end = off + curr_col.size(); + size_type level = 0; + offset_at_level[level] = off; + end_idx_at_level[level] = end; + ++level; + // Apply offset recursively until we get to leaf data + while (curr_col.type().id() == type_id::LIST) { + off = curr_col.child(lists_column_view::offsets_column_index).element(off); + end = curr_col.child(lists_column_view::offsets_column_index).element(end); + offset_at_level[level] = off; + end_idx_at_level[level] = end; + ++level; + curr_col = curr_col.child(lists_column_view::child_column_index); + } + }, + stream); + + column_offsets.device_to_host(stream, true); + column_ends.device_to_host(stream, true); + + rmm::device_uvector rep_level(max_vals_size, stream); + rmm::device_uvector def_level(max_vals_size, stream); + + rmm::device_uvector temp_rep_vals(max_vals_size, stream); + rmm::device_uvector temp_def_vals(max_vals_size, stream); + rmm::device_uvector new_offsets(0, stream); + size_type curr_rep_values_size = 0; + { + // At this point, curr_col contains the leaf column. Max nesting level is + // nesting_levels.size(). + size_t level = nesting_levels.size() - 1; + curr_col = nesting_levels[level]; + auto lcv = lists_column_view(curr_col); + auto offset_size_at_level = column_ends[level] - column_offsets[level] + 1; + + // Get empties at this level + rmm::device_uvector empties(0, stream); + rmm::device_uvector empties_idx(0, stream); + size_t empties_size; + std::tie(empties, empties_idx, empties_size) = + get_empties(nesting_levels[level], column_offsets[level], column_ends[level]); + + // Merge empty at deepest parent level with the rep, def level vals at leaf level + + auto input_parent_rep_it = thrust::make_constant_iterator(level); + auto input_parent_def_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + [idx = empties_idx.data(), + mask = lcv.null_mask(), + curr_def_level = def_at_level[level]] __device__(auto i) { + return curr_def_level + ((mask && bit_is_set(mask, idx[i])) ? 1 : 0); + }); + + auto input_child_rep_it = thrust::make_constant_iterator(nesting_levels.size()); + auto input_child_def_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(column_offsets[level + 1]), + [mask = lcv.child().null_mask(), curr_def_level = def_at_level[level + 1]] __device__( + auto i) { return curr_def_level + ((mask && bit_is_set(mask, i)) ? 
1 : 0); }); + + // Zip the input and output value iterators so that merge operation is done only once + auto input_parent_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(input_parent_rep_it, input_parent_def_it)); + + auto input_child_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(input_child_rep_it, input_child_def_it)); + + auto output_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(rep_level.begin(), def_level.begin())); + + auto ends = thrust::merge_by_key(rmm::exec_policy(stream)->on(stream), + empties.begin(), + empties.begin() + empties_size, + thrust::make_counting_iterator(column_offsets[level + 1]), + thrust::make_counting_iterator(column_ends[level + 1]), + input_parent_zip_it, + input_child_zip_it, + thrust::make_discard_iterator(), + output_zip_it); + + curr_rep_values_size = ends.second - output_zip_it; + + // Scan to get distance by which each offset value is shifted due to the insertion of empties + auto scan_it = + thrust::make_transform_iterator(thrust::make_counting_iterator(column_offsets[level]), + [off = lcv.offsets().data()] __device__( + auto i) -> int { return off[i] == off[i + 1]; }); + rmm::device_uvector scan_out(offset_size_at_level, stream); + thrust::exclusive_scan(rmm::exec_policy(stream)->on(stream), + scan_it, + scan_it + offset_size_at_level, + scan_out.begin()); + + // Add scan output to existing offsets to get new offsets into merged rep level values + new_offsets = rmm::device_uvector(offset_size_at_level, stream); + thrust::for_each_n(rmm::exec_policy(stream)->on(stream), + thrust::make_counting_iterator(0), + offset_size_at_level, + [off = lcv.offsets().data() + column_offsets[level], + scan_out = scan_out.data(), + new_off = new_offsets.data()] __device__(auto i) { + new_off[i] = off[i] - off[0] + scan_out[i]; + }); + + // Set rep level values at level starts to appropriate rep level + auto scatter_it = thrust::make_constant_iterator(level); + thrust::scatter(rmm::exec_policy(stream)->on(stream), + scatter_it, + scatter_it + new_offsets.size() - 1, + new_offsets.begin(), + rep_level.begin()); + } + + for (int level = nesting_levels.size() - 2; level >= 0; level--) { + curr_col = nesting_levels[level]; + auto lcv = lists_column_view(curr_col); + auto offset_size_at_level = column_ends[level] - column_offsets[level] + 1; + + // Get empties at this level + rmm::device_uvector empties(0, stream); + rmm::device_uvector empties_idx(0, stream); + size_t empties_size; + std::tie(empties, empties_idx, empties_size) = + get_empties(nesting_levels[level], column_offsets[level], column_ends[level]); + + auto offset_transformer = [new_child_offsets = new_offsets.data(), + child_start = column_offsets[level + 1]] __device__(auto x) { + return new_child_offsets[x - child_start]; // (x - child's offset) + }; + + // We will be reading from old rep_levels and writing again to rep_levels. Swap the current + // rep values into temp_rep_vals so it can become the input and rep_levels can again be output. 
+ std::swap(temp_rep_vals, rep_level); + std::swap(temp_def_vals, def_level); + + // Merge empty at parent level with the rep, def level vals at current level + auto transformed_empties = thrust::make_transform_iterator(empties.begin(), offset_transformer); + + auto input_parent_rep_it = thrust::make_constant_iterator(level); + auto input_parent_def_it = thrust::make_transform_iterator( + thrust::make_counting_iterator(0), + [idx = empties_idx.data(), + mask = lcv.null_mask(), + curr_def_level = def_at_level[level]] __device__(auto i) { + return curr_def_level + ((mask && bit_is_set(mask, idx[i])) ? 1 : 0); + }); + + // Zip the input and output value iterators so that merge operation is done only once + auto input_parent_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(input_parent_rep_it, input_parent_def_it)); + + auto input_child_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(temp_rep_vals.begin(), temp_def_vals.begin())); + + auto output_zip_it = + thrust::make_zip_iterator(thrust::make_tuple(rep_level.begin(), def_level.begin())); + + auto ends = thrust::merge_by_key(rmm::exec_policy(stream)->on(stream), + transformed_empties, + transformed_empties + empties_size, + thrust::make_counting_iterator(0), + thrust::make_counting_iterator(curr_rep_values_size), + input_parent_zip_it, + input_child_zip_it, + thrust::make_discard_iterator(), + output_zip_it); + + curr_rep_values_size = ends.second - output_zip_it; + + // Scan to get distance by which each offset value is shifted due to the insertion of dremel + // level value fof an empty list + auto scan_it = + thrust::make_transform_iterator(thrust::make_counting_iterator(column_offsets[level]), + [off = lcv.offsets().data()] __device__( + auto i) -> int { return off[i] == off[i + 1]; }); + rmm::device_uvector scan_out(offset_size_at_level, stream); + thrust::exclusive_scan(rmm::exec_policy(stream)->on(stream), + scan_it, + scan_it + offset_size_at_level, + scan_out.begin()); + + // Add scan output to existing offsets to get new offsets into merged rep level values + rmm::device_uvector temp_new_offsets(offset_size_at_level, stream); + thrust::for_each_n(rmm::exec_policy(stream)->on(stream), + thrust::make_counting_iterator(0), + offset_size_at_level, + [off = lcv.offsets().data() + column_offsets[level], + scan_out = scan_out.data(), + new_off = temp_new_offsets.data(), + offset_transformer] __device__(auto i) { + new_off[i] = offset_transformer(off[i]) + scan_out[i]; + }); + new_offsets = std::move(temp_new_offsets); + + // Set rep level values at level starts to appropriate rep level + auto scatter_it = thrust::make_constant_iterator(level); + thrust::scatter(rmm::exec_policy(stream)->on(stream), + scatter_it, + scatter_it + new_offsets.size() - 1, + new_offsets.begin(), + rep_level.begin()); + } + + size_t level_vals_size = new_offsets.back_element(stream); + rep_level.resize(level_vals_size, stream); + def_level.resize(level_vals_size, stream); + + CUDA_TRY(cudaStreamSynchronize(stream)); + + size_type leaf_col_offset = column_offsets[column_offsets.size() - 1]; + size_type leaf_data_size = column_ends[column_ends.size() - 1] - leaf_col_offset; + + return dremel_data{std::move(new_offsets), + std::move(rep_level), + std::move(def_level), + leaf_col_offset, + leaf_data_size}; +} + /** * @brief Launches kernel for initializing encoder page fragments * - * @param[in] frag Fragment array [column_id][fragment_id] + * @param[in,out] frag Fragment array [column_id][fragment_id] * @param[in] col_desc Column description array 
[column_id] * @param[in] num_fragments Number of fragments per column * @param[in] num_columns Number of columns @@ -1489,8 +1986,8 @@ cudaError_t InitFragmentStatistics(statistics_group *groups, * @param[in] col_desc Column description array [column_id] * @param[in] num_rowgroups Number of fragments per column * @param[in] num_columns Number of columns - * @param[in] page_grstats Setup for page-level stats - * @param[in] chunk_grstats Setup for chunk-level stats + * @param[out] page_grstats Setup for page-level stats + * @param[out] chunk_grstats Setup for chunk-level stats * @param[in] stream CUDA stream to use, default 0 * * @return cudaSuccess if successful, a CUDA error code otherwise @@ -1531,6 +2028,8 @@ cudaError_t EncodePages(EncPage *pages, gpu_inflate_status_s *comp_out, cudaStream_t stream) { + // A page is part of one column. This is launching 1 block per page. 1 block will exclusively + // deal with one datatype. gpuEncodePages<<>>(pages, chunks, comp_in, comp_out, start_page); return cudaSuccess; } diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 2247926b316..044d4366481 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -23,8 +23,16 @@ #include #include #include + +#include +#include + +#include #include +#include +#include + namespace cudf { namespace io { namespace parquet { @@ -210,9 +218,18 @@ struct EncColumnDesc : stats_column_desc { uint32_t *dict_data; //!< Dictionary data (unique row indices) uint8_t physical_type; //!< physical data type uint8_t converted_type; //!< logical data type + // TODO (dm): Evaluate if this is sufficient. At 4 bits, this allows a maximum 16 level nesting uint8_t level_bits; //!< bits to encode max definition (lower nibble) & repetition (upper nibble) //!< levels - uint8_t pad; + size_type const *const + *nesting_offsets; //!< If column is a nested type, contains offset array of each nesting level + size_type nesting_levels; //!< Number of nesting levels in column. 0 means no nesting. + size_type num_values; //!< Number of data values in column. Different from num_rows in case of + //!< nested columns + + size_type const *level_offsets; //!< Offset array for per-row pre-calculated rep/def level values + uint8_t const *rep_values; //!< Pre-calculated repetition level values + uint8_t const *def_values; //!< Pre-calculated definition level values }; #define MAX_PAGE_FRAGMENT_SIZE 5000 //!< Max number of rows in a page fragment @@ -223,10 +240,12 @@ struct EncColumnDesc : stats_column_desc { struct PageFragment { uint32_t fragment_data_size; //!< Size of fragment data in bytes uint32_t dict_data_size; //!< Size of dictionary for this fragment - uint16_t num_rows; //!< Number of rows in fragment - uint16_t non_nulls; //!< Number of non-null values - uint16_t num_dict_vals; //!< Number of unique dictionary entries - uint16_t pad; + uint32_t num_values; //!< Number of values in fragment. Different from num_rows for nested type + uint32_t num_leaf_values; //!< Number of leaf values in fragment. Does not include nulls at + //!< non-leaf level + uint32_t non_nulls; //!< Number of non-null values + uint16_t num_rows; //!< Number of rows in fragment + uint16_t num_dict_vals; //!< Number of unique dictionary entries }; /** @@ -244,6 +263,9 @@ struct EncPage { uint32_t max_data_size; //!< Maximum size of coded page data (excluding header) uint32_t start_row; //!< First row of page uint32_t num_rows; //!< Rows in page + uint32_t num_leaf_values; //!< Values in page. 
Different from num_rows in case of nested types + uint32_t num_values; //!< Number of def/rep level values in page. Includes null/empty elements in + //!< non-leaf levels }; /// Size of hash used for building dictionaries @@ -286,16 +308,17 @@ struct EncColumnChunk { uint32_t compressed_size; //!< Compressed buffer size uint32_t start_row; //!< First row of chunk uint32_t num_rows; //!< Number of rows in chunk - uint32_t first_fragment; //!< First fragment of chunk - uint32_t first_page; //!< First page of chunk - uint32_t num_pages; //!< Number of pages in chunk - uint32_t dictionary_id; //!< Dictionary id for this chunk - uint8_t is_compressed; //!< Nonzero if the chunk uses compression - uint8_t has_dictionary; //!< Nonzero if the chunk uses dictionary encoding - uint16_t num_dict_fragments; //!< Number of fragments using dictionary - uint32_t dictionary_size; //!< Size of dictionary - uint32_t total_dict_entries; //!< Total number of entries in dictionary - uint32_t ck_stat_size; //!< Size of chunk-level statistics (included in 1st page header) + uint32_t num_values; //!< Number of values in chunk. Different from num_rows for nested types + uint32_t first_fragment; //!< First fragment of chunk + uint32_t first_page; //!< First page of chunk + uint32_t num_pages; //!< Number of pages in chunk + uint32_t dictionary_id; //!< Dictionary id for this chunk + uint8_t is_compressed; //!< Nonzero if the chunk uses compression + uint8_t has_dictionary; //!< Nonzero if the chunk uses dictionary encoding + uint16_t num_dict_fragments; //!< Number of fragments using dictionary + uint32_t dictionary_size; //!< Size of dictionary + uint32_t total_dict_entries; //!< Total number of entries in dictionary + uint32_t ck_stat_size; //!< Size of chunk-level statistics (included in 1st page header) }; /** @@ -375,6 +398,39 @@ cudaError_t DecodePageData(hostdevice_vector &pages, size_t min_row, cudaStream_t stream = (cudaStream_t)0); +/** + * @brief Dremel data that describes one nested type column + * + * @see get_dremel_data() + */ +struct dremel_data { + rmm::device_uvector dremel_offsets; + rmm::device_uvector rep_level; + rmm::device_uvector def_level; + + size_type leaf_col_offset; + size_type leaf_data_size; +}; + +/** + * @brief Get the dremel offsets and repetition and definition levels for a LIST column + * + * Dremel offsets are the per row offsets into the repetition and definition level arrays for a + * column. + * Example: + * ``` + * col = {{1, 2, 3}, { }, {5, 6}} + * dremel_offsets = { 0, 3, 4, 6} + * rep_level = { 0, 1, 1, 0, 0, 1} + * def_level = { 1, 1, 1, 0, 1, 1} + * ``` + * @param col Column of LIST type + * @param stream CUDA stream used for device memory operations and kernel launches. + * + * @return A struct containing dremel data + */ +dremel_data get_dremel_data(column_view h_col, cudaStream_t stream = (cudaStream_t)0); + /** * @brief Launches kernel for initializing encoder page fragments * diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index f445e1c1e80..3ce377a8a3d 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -21,15 +21,20 @@ #include "writer_impl.hpp" +#include +#include #include #include #include #include +#include #include #include #include +#include +#include namespace cudf { namespace io { @@ -60,6 +65,18 @@ parquet::Compression to_parquet_compression(compression_type compression) } } +/** + * @brief Get the leaf column + * + * Returns the dtype of the leaf column when `col` is a list column. 
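 *
 * For example, for a column of type LIST<LIST<INT32>>, the INT32 leaf child is returned.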
+ */ +column_view get_leaf_col(column_view col) +{ + column_view curr_col = col; + while (curr_col.type().id() == type_id::LIST) { curr_col = lists_column_view{curr_col}.child(); } + return curr_col; +} + } // namespace /** @@ -105,17 +122,23 @@ class parquet_column_view { column_view const &col, const table_metadata *metadata, cudaStream_t stream) - : _id(id), - _string_type(col.type().id() == type_id::STRING), - _type_width(_string_type ? 0 : cudf::size_of(col.type())), + : _col(col), + _leaf_col(get_leaf_col(col)), + _id(id), + _string_type(_leaf_col.type().id() == type_id::STRING), + _list_type(col.type().id() == type_id::LIST), + _type_width((_string_type || _list_type) ? 0 : cudf::size_of(col.type())), + _row_count(col.size()), + _null_count(_leaf_col.null_count()), + _data(col.head() + col.offset() * _type_width), + _nulls(_leaf_col.nullable() ? _leaf_col.null_mask() : nullptr), _converted_type(ConvertedType::UNKNOWN), _ts_scale(0), - _data_count(col.size()), - _null_count(col.null_count()), - _data(col.head() + col.offset() * _type_width), - _nulls(col.nullable() ? col.null_mask() : nullptr) + _dremel_offsets(0, stream), + _rep_level(0, stream), + _def_level(0, stream) { - switch (col.type().id()) { + switch (_leaf_col.type().id()) { case cudf::type_id::INT8: _physical_type = Type::INT32; _converted_type = ConvertedType::INT_8; @@ -232,39 +255,104 @@ class parquet_column_view { _stats_dtype = dtype_none; break; } + size_type leaf_col_offset = col.offset(); + _data_count = col.size(); + if (_list_type) { + // Top level column's offsets are not applied to all children. Get the effective offset and + // size of the leaf column + // Calculate row offset into dremel data (repetition/definition values) and the respective + // definition and repetition levels + gpu::dremel_data dremel = gpu::get_dremel_data(col, stream); + _dremel_offsets = std::move(dremel.dremel_offsets); + _rep_level = std::move(dremel.rep_level); + _def_level = std::move(dremel.def_level); + leaf_col_offset = dremel.leaf_col_offset; + _data_count = dremel.leaf_data_size; + + _type_width = (is_fixed_width(_leaf_col.type())) ? cudf::size_of(_leaf_col.type()) : 0; + _data = (is_fixed_width(_leaf_col.type())) + ? 
_leaf_col.head() + leaf_col_offset * _type_width + : nullptr; + + // Bring offset array to device + column_view curr_col = col; + std::vector offsets_array; + while (curr_col.type().id() == type_id::LIST) { + lists_column_view list_col(curr_col); + offsets_array.push_back(list_col.offsets().data()); + curr_col = list_col.child(); + } + _offsets_array = offsets_array; + + CUDA_TRY(cudaStreamSynchronize(stream)); + } if (_string_type && _data_count > 0) { - strings_column_view view{col}; + strings_column_view view{_leaf_col}; _indexes = rmm::device_buffer(_data_count * sizeof(gpu::nvstrdesc_s), stream); stringdata_to_nvstrdesc<<<((_data_count - 1) >> 8) + 1, 256, 0, stream>>>( reinterpret_cast(_indexes.data()), - view.offsets().data() + view.offset(), + view.offsets().data() + leaf_col_offset, view.chars().data(), _nulls, _data_count); _data = _indexes.data(); CUDA_TRY(cudaStreamSynchronize(stream)); } + // Generating default name if name isn't present in metadata if (metadata && _id < metadata->column_names.size()) { _name = metadata->column_names[_id]; } else { _name = "_col" + std::to_string(_id); } + _path_in_schema.push_back(_name); } auto is_string() const noexcept { return _string_type; } + auto is_list() const noexcept { return _list_type; } size_t type_width() const noexcept { return _type_width; } + size_t row_count() const noexcept { return _row_count; } size_t data_count() const noexcept { return _data_count; } size_t null_count() const noexcept { return _null_count; } bool nullable() const noexcept { return (_nulls != nullptr); } void const *data() const noexcept { return _data; } uint32_t const *nulls() const noexcept { return _nulls; } + // List related data + column_view cudf_col() const noexcept { return _col; } + column_view leaf_col() const noexcept { return _leaf_col; } + size_type const *const *nesting_offsets() const noexcept { return _offsets_array.data().get(); } + size_type nesting_levels() const noexcept { return _offsets_array.size(); } + size_type const *level_offsets() const noexcept { return _dremel_offsets.data(); } + uint8_t const *repetition_levels() const noexcept { return _rep_level.data(); } + uint8_t const *definition_levels() const noexcept { return _def_level.data(); } + uint16_t max_def_level() + { + if (_max_def_level > -1) return _max_def_level; + + uint16_t num_def_level = 0; + auto curr_col = cudf_col(); + while (curr_col.type().id() == type_id::LIST) { + // There is one definition level for each level of nesting and one for each level's + // nullability. If the level is nullable, then it needs 2 definition levels to describe it. + num_def_level += curr_col.nullable() ? 2 : 1; + lists_column_view lcw(curr_col); + curr_col = lcw.child(); + } + // This is the leaf level. There is no further nesting. + if (curr_col.nullable()) ++num_def_level; + _max_def_level = num_def_level; + return _max_def_level; + } + void set_def_level(uint16_t def_level) { _max_def_level = def_level; } + auto name() const noexcept { return _name; } auto physical_type() const noexcept { return _physical_type; } auto converted_type() const noexcept { return _converted_type; } auto stats_type() const noexcept { return _stats_dtype; } int32_t ts_scale() const noexcept { return _ts_scale; } + void set_path_in_schema(std::vector path) { _path_in_schema = std::move(path); } + auto get_path_in_schema() const noexcept { return _path_in_schema; } // Dictionary management uint32_t *get_dict_data() { return (_dict_data.size()) ? 
_dict_data.data().get() : nullptr; } @@ -287,11 +375,17 @@ class parquet_column_view { } private: + // cudf data column + column_view _col; + column_view _leaf_col; + // Identifier within set of columns size_t _id = 0; bool _string_type = false; + bool _list_type = false; size_t _type_width = 0; + size_t _row_count = 0; size_t _data_count = 0; size_t _null_count = 0; void const *_data = nullptr; @@ -303,12 +397,24 @@ class parquet_column_view { ConvertedType _converted_type; statistics_dtype _stats_dtype; int32_t _ts_scale; + std::vector _path_in_schema; // Dictionary-related members bool _dictionary_used = false; rmm::device_vector _dict_data; rmm::device_vector _dict_index; + // List-related members + // TODO (dm): convert to uvector + rmm::device_vector _offsets_array; ///< Array of pointers to offset columns at + ///< each level of nesting O(nesting depth) + rmm::device_uvector + _dremel_offsets; ///< For each row, the absolute offset into the repetition and definition + ///< level vectors. O(num rows) + rmm::device_uvector _rep_level; + rmm::device_uvector _def_level; + size_type _max_def_level = -1; + // String-related members rmm::device_buffer _indexes; }; @@ -536,14 +642,23 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) // first call. setup metadata. num_rows will get incremented as write_chunk is // called multiple times. + // Calculate the sum of depths of all list columns + size_type const list_col_depths = std::accumulate( + parquet_columns.cbegin(), parquet_columns.cend(), 0, [](size_type sum, auto const &col) { + return sum + col.nesting_levels(); + }); + if (state.md.version == 0) { state.md.version = 1; state.md.num_rows = num_rows; - state.md.schema.resize(1 + num_columns); - state.md.schema[0].type = UNDEFINED_TYPE; - state.md.schema[0].repetition_type = NO_REPETITION_TYPE; - state.md.schema[0].name = "schema"; - state.md.schema[0].num_children = num_columns; + // Each level of nesting requires two levels of Schema. The leaf level needs one schema element + state.md.schema.reserve(1 + num_columns + list_col_depths * 2); + SchemaElement root{}; + root.type = UNDEFINED_TYPE; + root.repetition_type = NO_REPETITION_TYPE; + root.name = "schema"; + root.num_children = num_columns; + state.md.schema.push_back(std::move(root)); state.md.column_order_listsize = (stats_granularity_ != statistics_freq::STATISTICS_NONE) ? num_columns : 0; if (state.user_metadata != nullptr) { @@ -555,35 +670,86 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) } for (auto i = 0; i < num_columns; i++) { auto &col = parquet_columns[i]; - // Column metadata - state.md.schema[1 + i].type = col.physical_type(); - state.md.schema[1 + i].converted_type = col.converted_type(); - // because the repetition type is global (in the sense of, not per-rowgroup or per - // write_chunk() call) we cannot know up front if the user is going to end up passing tables - // with nulls/no nulls in the multiple write_chunk() case. so we'll do some special - // handling. - // - // if the user is explicitly saying "I am only calling this once", fall back to the original - // behavior and assume the columns in this one table tell us everything we need to know. - if (state.single_write_mode) { - state.md.schema[1 + i].repetition_type = - (col.nullable() || col.data_count() < (size_t)num_rows) ? 
OPTIONAL : REQUIRED; - } - // otherwise, if the user is explicitly telling us global information about all the tables - // that will ever get passed in - else if (state.user_metadata_with_nullability.column_nullable.size() > 0) { - state.md.schema[1 + i].repetition_type = - state.user_metadata_with_nullability.column_nullable[i] ? OPTIONAL : REQUIRED; - } - // otherwise assume the worst case. - else { - state.md.schema[1 + i].repetition_type = OPTIONAL; + if (col.is_list()) { + CUDF_EXPECTS(state.single_write_mode, "Chunked write for lists is not supported"); + size_type nesting_depth = col.nesting_levels(); + // Each level of nesting requires two levels of Schema. The leaf level needs one schema + // element + std::vector list_schema(nesting_depth * 2 + 1); + column_view cudf_col = col.cudf_col(); + for (size_type j = 0; j < nesting_depth; j++) { + // List schema is denoted by two levels for each nesting level and one final level for + // leaf. The top level is the same name as the column name. + // So e.g. List> is denoted in the schema by + // "col_name" : { "list" : { "element" : { "list" : { "element" } } } } + auto const group_idx = 2 * j; + auto const list_idx = 2 * j + 1; + + list_schema[group_idx].name = (j == 0) ? col.name() : "element"; + list_schema[group_idx].repetition_type = (cudf_col.nullable()) ? OPTIONAL : REQUIRED; + list_schema[group_idx].converted_type = ConvertedType::LIST; + list_schema[group_idx].num_children = 1; + + list_schema[list_idx].name = "list"; + list_schema[list_idx].repetition_type = REPEATED; + list_schema[list_idx].num_children = 1; + + // Move on to next child + lists_column_view lcw(cudf_col); + cudf_col = lcw.child(); + } + list_schema[nesting_depth * 2].name = "element"; + list_schema[nesting_depth * 2].repetition_type = + col.leaf_col().nullable() ? OPTIONAL : REQUIRED; + list_schema[nesting_depth * 2].type = col.physical_type(); + list_schema[nesting_depth * 2].converted_type = col.converted_type(); + list_schema[nesting_depth * 2].num_children = 0; + + std::vector path_in_schema; + std::transform( + list_schema.cbegin(), list_schema.cend(), std::back_inserter(path_in_schema), [](auto s) { + return s.name; + }); + col.set_path_in_schema(path_in_schema); + state.md.schema.insert(state.md.schema.end(), list_schema.begin(), list_schema.end()); + } else { + SchemaElement col_schema{}; + // Column metadata + col_schema.type = col.physical_type(); + col_schema.converted_type = col.converted_type(); + // because the repetition type is global (in the sense of, not per-rowgroup or per + // write_chunk() call) we cannot know up front if the user is going to end up passing tables + // with nulls/no nulls in the multiple write_chunk() case. so we'll do some special + // handling. + // + // if the user is explicitly saying "I am only calling this once", fall back to the original + // behavior and assume the columns in this one table tell us everything we need to know. + if (state.single_write_mode) { + col_schema.repetition_type = + (col.nullable() || col.data_count() < (size_t)num_rows) ? OPTIONAL : REQUIRED; + } + // otherwise, if the user is explicitly telling us global information about all the tables + // that will ever get passed in + else if (state.user_metadata_with_nullability.column_nullable.size() > 0) { + col_schema.repetition_type = + state.user_metadata_with_nullability.column_nullable[i] ? OPTIONAL : REQUIRED; + col.set_def_level((col_schema.repetition_type == OPTIONAL) ? 1 : 0); + } + // otherwise assume the worst case. 
+ else { + col_schema.repetition_type = OPTIONAL; + col.set_def_level(1); // def level for OPTIONAL is 1, for REQUIRED is 0 + } + col_schema.name = col.name(); + col_schema.num_children = 0; // Leaf node + + state.md.schema.push_back(std::move(col_schema)); } - state.md.schema[1 + i].name = col.name(); - state.md.schema[1 + i].num_children = 0; // Leaf node } } else { // verify the user isn't passing mismatched tables + // TODO (dm): Now needs to compare children of columns in case of list when we support chunked + // write for it CUDF_EXPECTS(state.md.schema[0].num_children == num_columns, "Mismatch in table structure between multiple calls to write_chunk"); for (auto i = 0; i < num_columns; i++) { @@ -605,22 +771,39 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) auto &col = parquet_columns[i]; // GPU column description auto *desc = &col_desc[i]; + *desc = gpu::EncColumnDesc{}; // Zero out all fields desc->column_data_base = col.data(); desc->valid_map_base = col.nulls(); desc->stats_dtype = col.stats_type(); desc->ts_scale = col.ts_scale(); - if (state.md.schema[1 + i].type != BOOLEAN && state.md.schema[1 + i].type != UNDEFINED_TYPE) { - col.alloc_dictionary(num_rows); + // TODO (dm): Enable dictionary for list after refactor + if (col.physical_type() != BOOLEAN && col.physical_type() != UNDEFINED_TYPE && !col.is_list()) { + col.alloc_dictionary(col.data_count()); desc->dict_index = col.get_dict_index(); desc->dict_data = col.get_dict_data(); + } + if (col.is_list()) { + desc->nesting_offsets = col.nesting_offsets(); + desc->nesting_levels = col.nesting_levels(); + desc->level_offsets = col.level_offsets(); + desc->rep_values = col.repetition_levels(); + desc->def_values = col.definition_levels(); + auto count_bits = [](uint16_t number) { + int16_t nbits = 0; + while (number > 0) { + nbits++; + number = number >> 1; + } + return nbits; + }; + desc->level_bits = count_bits(col.nesting_levels()) << 4 | count_bits(col.max_def_level()); } else { - desc->dict_data = nullptr; - desc->dict_index = nullptr; + desc->level_bits = (state.md.schema[1 + i].repetition_type == OPTIONAL) ? 1 : 0; } - desc->num_rows = col.data_count(); - desc->physical_type = static_cast(state.md.schema[1 + i].type); - desc->converted_type = static_cast(state.md.schema[1 + i].converted_type); - desc->level_bits = (state.md.schema[1 + i].repetition_type == OPTIONAL) ? 1 : 0; + desc->num_values = col.data_count(); + desc->num_rows = col.row_count(); + desc->physical_type = static_cast(col.physical_type()); + desc->converted_type = static_cast(col.converted_type()); } // Init page fragments @@ -629,7 +812,10 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) // iteratively reduce this value if the largest fragment exceeds the max page size limit (we // ideally want the page size to be below 1MB so as to have enough pages to get good // compression/decompression performance). 
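For reference, the rep_values/def_values buffers and the level_bits byte set up in the column descriptors above follow the standard Parquet (Dremel) level encoding: the high nibble of level_bits holds the bit width needed for repetition levels (taking the nesting depth as the maximum repetition level) and the low nibble holds the width needed for definition levels. The standalone sketch below works through that encoding for a small, made-up single-level list column and reproduces the count_bits packing; the example data, the maximum definition level of 3, and every name in it are illustrative assumptions, not values produced by this patch.

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
  // Hypothetical column: optional list<int32> (one nesting level), rows:
  //   [1, NULL, 3]   []   NULL   [4]
  // Standard Parquet/Dremel encoding, one entry per leaf value plus one entry
  // for each empty or NULL list:
  //   repetition level: 0 starts a new row, 1 continues the current list
  //   definition level: 3 = element present, 2 = element NULL,
  //                     1 = empty list,      0 = NULL list
  int16_t const rep_levels[] = {0, 1, 1, 0, 0, 0};
  int16_t const def_levels[] = {3, 2, 3, 1, 0, 3};

  // Same packing as the count_bits lambda in the descriptor setup above:
  // high nibble = bits for the max repetition level (1 here),
  // low nibble  = bits for the max definition level (3 here).
  auto count_bits = [](uint16_t number) {
    int16_t nbits = 0;
    while (number > 0) {
      nbits++;
      number = number >> 1;
    }
    return nbits;
  };
  uint8_t const level_bits = count_bits(1) << 4 | count_bits(3);  // 0x12

  for (size_t i = 0; i < sizeof(rep_levels) / sizeof(rep_levels[0]); ++i) {
    std::printf("entry %zu: rep=%d def=%d\n", i, rep_levels[i], def_levels[i]);
  }
  std::printf("level_bits = 0x%02x\n", static_cast<unsigned>(level_bits));
  return 0;
}

count_bits(1) is 1 and count_bits(3) is 2, so the packed byte is 0x12; deeper nesting simply widens both nibbles.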
- uint32_t fragment_size = 5000; + constexpr uint32_t fragment_size = 5000; + static_assert(fragment_size <= MAX_PAGE_FRAGMENT_SIZE, + "fragment size cannot be greater than MAX_PAGE_FRAGMENT_SIZE"); + uint32_t num_fragments = (uint32_t)((num_rows + fragment_size - 1) / fragment_size); hostdevice_vector fragments(num_columns * num_fragments); if (fragments.size() != 0) { @@ -645,6 +831,7 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) for (uint32_t f = 0, global_r = global_rowgroup_base, rowgroup_start = 0; f < num_fragments; f++) { size_t fragment_data_size = 0; + // Replace with STL algorithm to transform and sum for (auto i = 0; i < num_columns; i++) { fragment_data_size += fragments[i * num_fragments + f].fragment_data_size; } @@ -680,7 +867,6 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) state.stream); } } - // Initialize row groups and column chunks uint32_t num_chunks = num_rowgroups * num_columns; hostdevice_vector chunks(num_chunks); @@ -706,11 +892,16 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) ck->start_row = start_row; ck->num_rows = (uint32_t)state.md.row_groups[global_r].num_rows; ck->first_fragment = i * num_fragments + f; - ck->first_page = 0; - ck->num_pages = 0; - ck->is_compressed = 0; - ck->dictionary_id = num_dictionaries; - ck->ck_stat_size = 0; + ck->num_values = + std::accumulate(fragments.host_ptr(i * num_fragments + f), + fragments.host_ptr(i * num_fragments + f) + fragments_in_chunk, + 0, + [](uint32_t l, auto r) { return l + r.num_values; }); + ck->first_page = 0; + ck->num_pages = 0; + ck->is_compressed = 0; + ck->dictionary_id = num_dictionaries; + ck->ck_stat_size = 0; if (col_desc[i].dict_data) { const gpu::PageFragment *ck_frag = &fragments[i * num_fragments + f]; size_t plain_size = 0; @@ -728,17 +919,16 @@ void writer::impl::write_chunk(table_view const &table, pq_chunked_state &state) num_dictionaries++; } } - ck->has_dictionary = dict_enable; - state.md.row_groups[global_r].columns[i].meta_data.type = state.md.schema[1 + i].type; + ck->has_dictionary = dict_enable; + state.md.row_groups[global_r].columns[i].meta_data.type = parquet_columns[i].physical_type(); state.md.row_groups[global_r].columns[i].meta_data.encodings = {PLAIN, RLE}; if (dict_enable) { state.md.row_groups[global_r].columns[i].meta_data.encodings.push_back(PLAIN_DICTIONARY); } - state.md.row_groups[global_r].columns[i].meta_data.path_in_schema = { - state.md.schema[1 + i].name}; - state.md.row_groups[global_r].columns[i].meta_data.codec = UNCOMPRESSED; - state.md.row_groups[global_r].columns[i].meta_data.num_values = - state.md.row_groups[global_r].num_rows; + state.md.row_groups[global_r].columns[i].meta_data.path_in_schema = + parquet_columns[i].get_path_in_schema(); + state.md.row_groups[global_r].columns[i].meta_data.codec = UNCOMPRESSED; + state.md.row_groups[global_r].columns[i].meta_data.num_values = ck->num_values; } f += fragments_in_chunk; start_row += (uint32_t)state.md.row_groups[global_r].num_rows; diff --git a/cpp/src/io/statistics/column_stats.h b/cpp/src/io/statistics/column_stats.h index e9f056c2fa2..fc295acac35 100644 --- a/cpp/src/io/statistics/column_stats.h +++ b/cpp/src/io/statistics/column_stats.h @@ -18,6 +18,7 @@ namespace cudf { namespace io { + enum statistics_dtype { dtype_none, dtype_bool, diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index 31783f9fc5b..39681b25fcd 100644 --- a/cpp/tests/io/parquet_test.cpp +++ 
b/cpp/tests/io/parquet_test.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -163,15 +164,6 @@ inline auto random_values(size_t size) return values; } -// Helper function to compare two tables -void CUDF_TEST_EXPECT_TABLES_EQUAL(cudf::table_view const& lhs, cudf::table_view const& rhs) -{ - EXPECT_EQ(lhs.num_columns(), rhs.num_columns()); - auto expected = lhs.begin(); - auto result = rhs.begin(); - while (result != rhs.end()) { CUDF_TEST_EXPECT_COLUMNS_EQUAL(*expected++, *result++); } -} - } // namespace TYPED_TEST(ParquetWriterNumericTypeTest, SingleColumn) @@ -451,19 +443,41 @@ TEST_F(ParquetWriterTest, SlicedTable) column_wrapper col1{strings.begin(), strings.end()}; column_wrapper col2{seq_col2.begin(), seq_col2.end(), validity}; + using lcw = cudf::test::lists_column_wrapper; + lcw col3{{9, 8}, {7, 6, 5}, {}, {4}, {3, 2, 1, 0}, {20, 21, 22, 23, 24}, {}, {66, 666}}; + + // [[[NULL,2,NULL,4]], [[NULL,6,NULL], [8,9]]] + // [NULL, [[13],[14,15,16]], NULL] + // [NULL, [], NULL, [[]]] + // NULL + // [[[NULL,2,NULL,4]], [[NULL,6,NULL], [8,9]]] + // [NULL, [[13],[14,15,16]], NULL] + // [[[]]] + // [NULL, [], NULL, [[]]] + auto valids = cudf::test::make_counting_transform_iterator(0, [](auto i) { return i % 2; }); + auto valids2 = cudf::test::make_counting_transform_iterator(0, [](auto i) { return i != 3; }); + lcw col4{{ + {{{{1, 2, 3, 4}, valids}}, {{{5, 6, 7}, valids}, {8, 9}}}, + {{{{10, 11}, {12}}, {{13}, {14, 15, 16}}, {{17, 18}}}, valids}, + {{lcw{lcw{}}, lcw{}, lcw{}, lcw{lcw{}}}, valids}, + lcw{lcw{lcw{}}}, + {{{{1, 2, 3, 4}, valids}}, {{{5, 6, 7}, valids}, {8, 9}}}, + {{{{10, 11}, {12}}, {{13}, {14, 15, 16}}, {{17, 18}}}, valids}, + lcw{lcw{lcw{}}}, + {{lcw{lcw{}}, lcw{}, lcw{}, lcw{lcw{}}}, valids}, + }, + valids2}; + cudf_io::table_metadata expected_metadata; expected_metadata.column_names.emplace_back("col_other"); expected_metadata.column_names.emplace_back("col_string"); expected_metadata.column_names.emplace_back("col_another"); + expected_metadata.column_names.emplace_back("col_list"); + expected_metadata.column_names.emplace_back("col_multi_level_list"); - std::vector> cols; - cols.push_back(col0.release()); - cols.push_back(col1.release()); - cols.push_back(col2.release()); - auto expected = std::make_unique(std::move(cols)); - EXPECT_EQ(3, expected->num_columns()); + auto expected = table_view({col0, col1, col2, col3, col4}); - auto expected_slice = cudf::slice(expected->view(), {2, static_cast(num_rows)}); + auto expected_slice = cudf::slice(expected, {2, static_cast(num_rows) - 1}); auto filepath = temp_env->get_temp_filepath("SlicedTable.parquet"); cudf_io::parquet_writer_options out_opts = @@ -479,6 +493,104 @@ TEST_F(ParquetWriterTest, SlicedTable) EXPECT_EQ(expected_metadata.column_names, result.metadata.column_names); } +TEST_F(ParquetWriterTest, ListColumn) +{ + auto valids = cudf::test::make_counting_transform_iterator(0, [](auto i) { return i % 2; }); + auto valids2 = cudf::test::make_counting_transform_iterator(0, [](auto i) { return i != 3; }); + + using lcw = cudf::test::lists_column_wrapper; + + // [NULL, 2, NULL] + // [] + // [4, 5] + // NULL + lcw col0{{{{1, 2, 3}, valids}, {}, {4, 5}, {}}, valids2}; + + // [[1, 2, 3], [], [4, 5], [], [0, 6, 0]] + // [[7, 8]] + // [] + // [[]] + lcw col1{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, {{7, 8}}, lcw{}, lcw{lcw{}}}; + + // [[1, 2, 3], [], [4, 5], NULL, [0, 6, 0]] + // [[7, 8]] + // [] + // [[]] + lcw col2{{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, valids2}, {{7, 8}}, lcw{}, lcw{lcw{}}}; + 
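A note on the wrapper syntax used throughout these list tests: in cudf::test::lists_column_wrapper an inner pair {values, validity_iterator} masks individual elements of a single list, while the outer pair {rows, validity_iterator} masks whole rows. The minimal sketch below rebuilds the same four rows as col0 above and round-trips them the way the end of this test does; it assumes this file's includes and test fixture, reuses the valids/valids2 iterators defined at the top of the test, and the file and variable names are illustrative only.

// Rows: [NULL, 2, NULL], [], [4, 5], NULL -- same shape as col0 above
cudf::test::lists_column_wrapper<int> sample{
  {{{1, 2, 3}, valids},  // element validity: values 1 and 3 are masked out
   {},                   // empty list
   {4, 5},               // fully valid list
   {}},                  // empty placeholder; the whole row is nulled below
  valids2};              // row validity: the fourth row becomes NULL

auto sketch_path = temp_env->get_temp_filepath("ListWrapperSketch.parquet");
cudf_io::parquet_writer_options sketch_out =
  cudf_io::parquet_writer_options::builder(cudf_io::sink_info{sketch_path}, table_view({sample}));
cudf_io::write_parquet(sketch_out);

auto sketch_in     = cudf_io::parquet_reader_options::builder(cudf_io::source_info{sketch_path});
auto sketch_result = cudf_io::read_parquet(sketch_in);
CUDF_TEST_EXPECT_TABLES_EQUAL(table_view({sample}), sketch_result.tbl->view());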
+ // [[1, 2, 3], [], [4, 5], NULL, [NULL, 6, NULL]] + // [[7, 8]] + // [] + // [[]] + using dlcw = cudf::test::lists_column_wrapper; + dlcw col3{{{{1., 2., 3.}, {}, {4., 5.}, {}, {{0., 6., 0.}, valids}}, valids2}, + {{7., 8.}}, + dlcw{}, + dlcw{dlcw{}}}; + + // TODO: uint16_t lists are not read properly in parquet reader + // [[1, 2, 3], [], [4, 5], NULL, [0, 6, 0]] + // [[7, 8]] + // [] + // NULL + // using ui16lcw = cudf::test::lists_column_wrapper; + // cudf::test::lists_column_wrapper col4{ + // {{{{1, 2, 3}, {}, {4, 5}, {}, {0, 6, 0}}, valids2}, {{7, 8}}, ui16lcw{}, ui16lcw{ui16lcw{}}}, + // valids2}; + + // [[1, 2, 3], [], [4, 5], NULL, [NULL, 6, NULL]] + // [[7, 8]] + // [] + // NULL + lcw col5{ + {{{{1, 2, 3}, {}, {4, 5}, {}, {{0, 6, 0}, valids}}, valids2}, {{7, 8}}, lcw{}, lcw{lcw{}}}, + valids2}; + + using strlcw = cudf::test::lists_column_wrapper; + cudf::test::lists_column_wrapper col6{ + {{"Monday", "Monday", "Friday"}, {}, {"Monday", "Friday"}, {}, {"Sunday", "Funday"}}, + {{"bee", "sting"}}, + strlcw{}, + strlcw{strlcw{}}}; + + // [[[NULL,2,NULL,4]], [[NULL,6,NULL], [8,9]]] + // [NULL, [[13],[14,15,16]], NULL] + // [NULL, [], NULL, [[]]] + // NULL + lcw col7{{ + {{{{1, 2, 3, 4}, valids}}, {{{5, 6, 7}, valids}, {8, 9}}}, + {{{{10, 11}, {12}}, {{13}, {14, 15, 16}}, {{17, 18}}}, valids}, + {{lcw{lcw{}}, lcw{}, lcw{}, lcw{lcw{}}}, valids}, + lcw{lcw{lcw{}}}, + }, + valids2}; + + cudf_io::table_metadata expected_metadata; + expected_metadata.column_names.emplace_back("col_list_int_0"); + expected_metadata.column_names.emplace_back("col_list_list_int_1"); + expected_metadata.column_names.emplace_back("col_list_list_int_nullable_2"); + expected_metadata.column_names.emplace_back("col_list_list_nullable_double_nullable_3"); + // expected_metadata.column_names.emplace_back("col_list_list_uint16_4"); + expected_metadata.column_names.emplace_back("col_list_nullable_list_nullable_int_nullable_5"); + expected_metadata.column_names.emplace_back("col_list_list_string_6"); + expected_metadata.column_names.emplace_back("col_list_list_list_7"); + + table_view expected({col0, col1, col2, col3, /* col4, */ col5, col6, col7}); + + auto filepath = temp_env->get_temp_filepath("ListColumn.parquet"); + auto out_opts = cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected) + .metadata(&expected_metadata) + .compression(cudf_io::compression_type::NONE); + + cudf_io::write_parquet(out_opts); + + auto in_opts = cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + auto result = cudf_io::read_parquet(in_opts); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view()); + EXPECT_EQ(expected_metadata.column_names, result.metadata.column_names); +} + TEST_F(ParquetWriterTest, MultiIndex) { constexpr auto num_rows = 100; diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 4edc552a7fb..27480b95f9d 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -6,7 +6,11 @@ import cudf import errno import os import pyarrow as pa -import json + +try: + import ujson as json +except ImportError: + import json from cython.operator import dereference import numpy as np @@ -14,6 +18,7 @@ import numpy as np from cudf.utils.dtypes import ( np_to_pa_dtype, is_categorical_dtype, + is_list_dtype, is_struct_dtype ) from libc.stdlib cimport free @@ -107,6 +112,8 @@ cpdef generate_pandas_metadata(Table table, index): "'category' column dtypes are currently not " + "supported by the gpu accelerated parquet 
writer" ) + elif is_list_dtype(col): + types.append(col.dtype.to_arrow()) else: types.append(np_to_pa_dtype(col.dtype)) @@ -135,6 +142,8 @@ cpdef generate_pandas_metadata(Table table, index): "'category' column dtypes are currently not " + "supported by the gpu accelerated parquet writer" ) + elif is_list_dtype(col): + types.append(col.dtype.to_arrow()) else: types.append(np_to_pa_dtype(idx.dtype)) index_levels.append(idx) @@ -151,9 +160,15 @@ cpdef generate_pandas_metadata(Table table, index): types, ) - md = metadata[b'pandas'] - json_str = md.decode("utf-8") - return json_str + md_dict = json.loads(metadata[b"pandas"]) + + # correct metadata for list and struct types + for col_meta in md_dict["columns"]: + if col_meta["numpy_type"] in ("list", "struct"): + col_meta["numpy_type"] = "object" + + return json.dumps(md_dict) + cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, skiprows=None, num_rows=None, strings_to_categorical=False, diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 9851ef81825..6ddf90849d1 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -1465,6 +1465,51 @@ def test_parquet_writer_sliced(tmpdir): assert_eq(cudf.read_parquet(cudf_path), df_select.reset_index(drop=True)) +def test_parquet_writer_list_basic(tmpdir): + expect = pd.DataFrame({"a": [[[1, 2], [3, 4]], None, [[5, 6], None]]}) + fname = tmpdir.join("test_parquet_writer_list_basic.parquet") + + gdf = cudf.from_pandas(expect) + + gdf.to_parquet(fname) + assert os.path.exists(fname) + + got = pd.read_parquet(fname) + assert_eq(expect, got) + + +def test_parquet_writer_list_large(tmpdir): + expect = pd.DataFrame({"a": list_gen(int_gen, 0, 256, 80, 50)}) + fname = tmpdir.join("test_parquet_writer_list_large.parquet") + + gdf = cudf.from_pandas(expect) + + gdf.to_parquet(fname) + assert os.path.exists(fname) + + got = pd.read_parquet(fname) + assert_eq(expect, got) + + +def test_parquet_writer_list_large_mixed(tmpdir): + expect = pd.DataFrame( + { + "a": list_gen(string_gen, 0, 128, 80, 50), + "b": list_gen(int_gen, 0, 128, 80, 50), + "c": list_gen(int_gen, 0, 128, 80, 50, include_validity=True), + "d": list_gen(string_gen, 0, 128, 80, 50, include_validity=True), + } + ) + fname = tmpdir.join("test_parquet_writer_list_large_mixed.parquet") + gdf = cudf.from_pandas(expect) + + gdf.to_parquet(fname) + assert os.path.exists(fname) + + got = pd.read_parquet(fname) + assert_eq(expect, got) + + @pytest.mark.parametrize("engine", ["cudf", "pyarrow"]) def test_parquet_nullable_boolean(tmpdir, engine): pandas_path = tmpdir.join("pandas_bools.parquet")