From 5e9cf26a6a2ea9053a57befa11f9ea866ebb9e8f Mon Sep 17 00:00:00 2001
From: seidl
Date: Mon, 26 Jun 2023 15:54:54 -0700
Subject: [PATCH] add DELTA_BINARY_PACKED decoder

---
 cpp/CMakeLists.txt                           |   1 +
 cpp/src/io/parquet/delta_binary.cuh          | 272 +++++++++++++++++++
 cpp/src/io/parquet/page_data.cu              |  43 +--
 cpp/src/io/parquet/page_decode.cuh           |   4 +
 cpp/src/io/parquet/page_delta_decode.cu      | 165 +++++++++++
 cpp/src/io/parquet/page_string_decode.cu     |  88 +-----
 cpp/src/io/parquet/page_string_utils.cuh     | 110 ++++++++
 cpp/src/io/parquet/parquet_gpu.hpp           |  19 ++
 cpp/src/io/parquet/reader_impl.cpp           |  32 ++-
 cpp/src/io/parquet/reader_impl_preprocess.cu |   7 +-
 10 files changed, 626 insertions(+), 115 deletions(-)
 create mode 100644 cpp/src/io/parquet/delta_binary.cuh
 create mode 100644 cpp/src/io/parquet/page_delta_decode.cu
 create mode 100644 cpp/src/io/parquet/page_string_utils.cuh

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 0742d039092..f8e500ac906 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -392,6 +392,7 @@ add_library(
   src/io/parquet/chunk_dict.cu
   src/io/parquet/page_enc.cu
   src/io/parquet/page_hdr.cu
+  src/io/parquet/page_delta_decode.cu
   src/io/parquet/page_string_decode.cu
   src/io/parquet/reader.cpp
   src/io/parquet/reader_impl.cpp
diff --git a/cpp/src/io/parquet/delta_binary.cuh b/cpp/src/io/parquet/delta_binary.cuh
new file mode 100644
index 00000000000..b02164a377f
--- /dev/null
+++ b/cpp/src/io/parquet/delta_binary.cuh
@@ -0,0 +1,272 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include "page_decode.cuh"
+
+namespace cudf::io::parquet::gpu {
+
+// DELTA_XXX encoding support
+//
+// DELTA_BINARY_PACKED is used for INT32 and INT64 data types. Encoding begins with a header
+// containing a block size, number of mini-blocks in each block, total value count, and first
+// value. The first three are ULEB128 variable length ints, and the last is a zigzag ULEB128
+// varint.
+// -- the block size is a multiple of 128
+// -- the mini-block count is chosen so that each mini-block will contain a multiple of 32 values
+// -- the value count includes the first value stored in the header
+//
+// It seems most Parquet encoders will stick with a block size of 128, and 4 mini-blocks of 32
+// elements each. arrow-rs will use a block size of 256 for 64-bit ints.
+//
+// Following the header are the data blocks. Each block is further divided into mini-blocks, with
+// each mini-block having its own encoding bitwidth. Each block begins with a header containing a
+// zigzag ULEB128 encoded minimum delta value, followed by an array of uint8 bitwidths, one entry
+// per mini-block. While encoding, the lowest delta value is subtracted from all the deltas in the
+// block to ensure that all encoded values are positive. The deltas for each mini-block are bit
+// packed using the same encoding as the RLE/Bit-Packing Hybrid encoder.
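+//
+// As a concrete worked example of the header layout (illustrative bytes, not taken from a real
+// file): the sequence {0x80, 0x01, 0x04, 0x05, 0x02} decodes to block_size=128 (ULEB128
+// 0x80 0x01), mini_block_count=4, value_count=5, and first_value=1 (zigzag: 0x02 -> 1).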
+//
+// DELTA_BYTE_ARRAY encoding (incremental encoding or front compression), is used for BYTE_ARRAY
+// columns. For each element in a sequence of strings, a prefix length from the preceding string
+// and a suffix is stored. The prefix lengths are DELTA_BINARY_PACKED encoded. The suffixes are
+// encoded with DELTA_LENGTH_BYTE_ARRAY encoding, which is a DELTA_BINARY_PACKED list of suffix
+// lengths, followed by the concatenated suffix data.
+
+// TODO: The delta encodings use ULEB128 integers, but for now we're only
+// using max 64 bits. Need to see what the performance impact is of using
+// __int128_t rather than int64_t.
+using uleb128_t   = uint64_t;
+using zigzag128_t = int64_t;
+
+/**
+ * @brief Read a ULEB128 varint integer
+ *
+ * @param[in,out] cur The current data position, updated after the read
+ * @param[in] end The end data position
+ *
+ * @return The value read
+ */
+inline __device__ uleb128_t get_uleb128(uint8_t const*& cur, uint8_t const* end)
+{
+  uleb128_t v = 0, l = 0, c;
+  while (cur < end) {
+    c = *cur++;
+    v |= (c & 0x7f) << l;
+    l += 7;
+    if ((c & 0x80) == 0) { return v; }
+  }
+  return v;
+}
+
+/**
+ * @brief Read a ULEB128 zig-zag encoded varint integer
+ *
+ * @param[in,out] cur The current data position, updated after the read
+ * @param[in] end The end data position
+ *
+ * @return The value read
+ */
+inline __device__ zigzag128_t get_zz128(uint8_t const*& cur, uint8_t const* end)
+{
+  uleb128_t u = get_uleb128(cur, end);
+  return static_cast<zigzag128_t>((u >> 1u) ^ -static_cast<zigzag128_t>(u & 1));
+}
+
+struct delta_binary_decoder {
+  uint8_t const* block_start;  // start of data, but updated as data is read
+  uint8_t const* block_end;    // end of data
+  uleb128_t block_size;        // usually 128, must be multiple of 128
+  uleb128_t mini_block_count;  // usually 4, chosen such that block_size/mini_block_count is a
+                               // multiple of 32
+  uleb128_t value_count;       // total values encoded in the block
+  zigzag128_t last_value;      // last value decoded, initialized to first_value from header
+
+  uint32_t values_per_mb;      // block_size / mini_block_count, must be multiple of 32
+  uint32_t current_value_idx;  // current value index, initialized to 0 at start of block
+
+  zigzag128_t cur_min_delta;     // min delta for the block
+  uint32_t cur_mb;               // index of the current mini-block within the block
+  uint8_t const* cur_mb_start;   // pointer to the start of the current mini-block data
+  uint8_t const* cur_bitwidths;  // pointer to the bitwidth array in the block
+
+  uleb128_t value[non_zero_buffer_size];  // circular buffer of delta values
+
+  // returns the number of values encoded in the block data. when is_decode is true,
+  // account for the first value in the header. otherwise just count the values encoded
+  // in the mini-block data.
+  constexpr uint32_t num_encoded_values(bool is_decode)
+  {
+    return value_count == 0 ? 0 : is_decode ? value_count : value_count - 1;
+  }
+
+  // read mini-block header into state object. should only be called from init_binary_block or
+  // setup_next_mini_block. header format is:
+  //
+  // | min delta (int) | bit-width array (1 byte * mini_block_count) |
+  //
+  // on exit db->cur_mb is 0 and db->cur_mb_start points to the first mini-block of data, or
+  // nullptr if out of data.
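+  // Example (illustrative bytes, not from a real file): with mini_block_count=4, the block
+  // header bytes {0x00, 0x03, 0x04, 0x00, 0x05} decode to min_delta=0 (zigzag) and
+  // per-mini-block bitwidths {3, 4, 0, 5}.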
+  inline __device__ void init_mini_block(bool is_decode)
+  {
+    cur_mb       = 0;
+    cur_mb_start = nullptr;
+
+    if (current_value_idx < num_encoded_values(is_decode)) {
+      auto d_start  = block_start;
+      cur_min_delta = get_zz128(d_start, block_end);
+      cur_bitwidths = d_start;
+
+      d_start += mini_block_count;
+      cur_mb_start = d_start;
+    }
+  }
+
+  // read delta binary header into state object. should be called on thread 0. header format is:
+  //
+  // | block size (uint) | mini-block count (uint) | value count (uint) | first value (int) |
+  //
+  // also initializes the first mini-block before exit
+  inline __device__ void init_binary_block(uint8_t const* d_start, uint8_t const* d_end)
+  {
+    block_end        = d_end;
+    block_size       = get_uleb128(d_start, d_end);
+    mini_block_count = get_uleb128(d_start, d_end);
+    value_count      = get_uleb128(d_start, d_end);
+    last_value       = get_zz128(d_start, d_end);
+
+    current_value_idx = 0;
+    values_per_mb     = block_size / mini_block_count;
+
+    // init the first mini-block
+    block_start = d_start;
+    init_mini_block(false);
+  }
+
+  // skip to the start of the next mini-block. should only be called on thread 0.
+  // calls init_binary_block if currently on the last mini-block in a block.
+  inline __device__ void setup_next_mini_block(bool is_decode)
+  {
+    if (current_value_idx >= num_encoded_values(is_decode)) { return; }
+
+    current_value_idx += values_per_mb;
+
+    // just set pointer to start of next mini_block
+    if (cur_mb < mini_block_count - 1) {
+      cur_mb_start += cur_bitwidths[cur_mb] * values_per_mb / 8;
+      cur_mb++;
+    }
+    // out of mini-blocks, start a new block
+    else {
+      block_start = cur_mb_start + cur_bitwidths[cur_mb] * values_per_mb / 8;
+      init_mini_block(is_decode);
+    }
+  }
+
+  // decode the current mini-batch of deltas, and convert to values.
+  // called by all threads in a warp, currently only one warp supported.
+  inline __device__ void calc_mini_block_values(int lane_id)
+  {
+    using cudf::detail::warp_size;
+    if (current_value_idx >= value_count) { return; }
+
+    // need to save first value from header on first pass
+    if (current_value_idx == 0) {
+      if (lane_id == 0) {
+        current_value_idx++;
+        value[0] = last_value;
+      }
+      __syncwarp();
+    }
+
+    uint32_t const mb_bits = cur_bitwidths[cur_mb];
+
+    // need to do in multiple passes if values_per_mb != 32
+    uint32_t const num_pass = values_per_mb / warp_size;
+
+    auto d_start = cur_mb_start;
+
+    for (int i = 0; i < num_pass; i++) {
+      // position at end of the current mini-block since the following calculates
+      // negative indexes
+      d_start += (warp_size * mb_bits) / 8;
+
+      // unpack deltas. modified from version in gpuDecodeDictionaryIndices(), but
+      // that one only unpacks up to bitwidths of 24. simplified some since this
+      // will always do batches of 32. also replaced branching with a loop.
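+      // as an illustration of the indexing below: each lane works backwards from
+      // d_start (which now points one warp-batch ahead), with lane L assembling the
+      // mb_bits bits that begin at bit offset (L - 32) * mb_bits, one byte at a time,
+      // then masking the result down to mb_bits.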
+      int64_t delta = 0;
+      if (lane_id + current_value_idx < value_count) {
+        int32_t ofs      = (lane_id - warp_size) * mb_bits;
+        uint8_t const* p = d_start + (ofs >> 3);
+        ofs &= 7;
+        if (p < block_end) {
+          uint32_t c = 8 - ofs;  // 0 - 7 bits
+          delta      = (*p++) >> ofs;
+
+          while (c < mb_bits && p < block_end) {
+            delta |= static_cast<uleb128_t>(*p++) << c;
+            c += 8;
+          }
+          delta &= (static_cast<uleb128_t>(1) << mb_bits) - 1;
+        }
+      }
+
+      // add min delta to get true delta
+      delta += cur_min_delta;
+
+      // do inclusive scan to get value - first_value at each position
+      __shared__ cub::WarpScan<zigzag128_t>::TempStorage temp_storage;
+      cub::WarpScan<zigzag128_t>(temp_storage).InclusiveSum(delta, delta);
+
+      // now add first value from header or last value from previous block to get true value
+      delta += last_value;
+      value[rolling_index(current_value_idx + warp_size * i + lane_id)] = delta;
+
+      // save value from last lane in warp. this will become the 'first value' added to the
+      // deltas calculated in the next iteration (or invocation).
+      if (lane_id == 31) { last_value = delta; }
+      __syncwarp();
+    }
+  }
+
+  inline __device__ void skip_values(int skip)
+  {
+    int const t       = threadIdx.x;
+    int const lane_id = t & 0x1f;
+
+    while (current_value_idx < skip && current_value_idx < num_encoded_values(true)) {
+      if (t < 32) {
+        calc_mini_block_values(lane_id);
+        if (lane_id == 0) { setup_next_mini_block(true); }
+      }
+      __syncthreads();
+    }
+  }
+
+  inline __device__ void decode_batch()
+  {
+    int const t       = threadIdx.x;
+    int const lane_id = t & 0x1f;
+
+    // unpack deltas and save in db->value
+    calc_mini_block_values(lane_id);
+
+    // set up for next mini-block
+    if (lane_id == 0) { setup_next_mini_block(true); }
+  }
+};
+
+}  // namespace cudf::io::parquet::gpu
diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index e49378485fc..b93cbb6c2c5 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -35,8 +35,8 @@ namespace {
  * @param[in] src_pos Source position
  * @param[in] dstv Pointer to row output data (string descriptor or 32-bit hash)
  */
-inline __device__ void gpuOutputString(volatile page_state_s* s,
-                                       volatile page_state_buffers_s* sb,
+inline __device__ void gpuOutputString(page_state_s volatile* s,
+                                       page_state_buffers_s volatile* sb,
                                        int src_pos,
                                        void* dstv)
 {
@@ -62,7 +62,7 @@ inline __device__ void gpuOutputString(volatile page_state_s* s,
  * @param[in] src_pos Source position
  * @param[in] dst Pointer to row output data
  */
-inline __device__ void gpuOutputBoolean(volatile page_state_buffers_s* sb,
+inline __device__ void gpuOutputBoolean(page_state_buffers_s volatile* sb,
                                         int src_pos,
                                         uint8_t* dst)
 {
@@ -137,8 +137,8 @@ inline __device__ void gpuStoreOutput(uint2* dst,
  * @param[in] src_pos Source position
  * @param[out] dst Pointer to row output data
  */
-inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s,
-                                               volatile page_state_buffers_s* sb,
+inline __device__ void gpuOutputInt96Timestamp(page_state_s volatile* s,
+                                               page_state_buffers_s volatile* sb,
                                                int src_pos,
                                                int64_t* dst)
 {
@@ -210,8 +210,8 @@ inline __device__ void gpuOutputInt96Timestamp(volatile page_state_s* s,
  * @param[in] src_pos Source position
  * @param[in] dst Pointer to row output data
  */
-inline __device__ void gpuOutputInt64Timestamp(volatile page_state_s* s,
-                                               volatile page_state_buffers_s* sb,
+inline __device__ void gpuOutputInt64Timestamp(page_state_s volatile* s,
+                                               page_state_buffers_s volatile* sb,
                                                int src_pos,
                                                int64_t* dst)
 {
@@ -292,8 +292,8 @@ __device__ void gpuOutputByteArrayAsInt(char const* ptr, int32_t len, T* dst)
  *
  * @param[in] dst Pointer to row output data
  */
 template <typename T>
-__device__ void gpuOutputFixedLenByteArrayAsInt(volatile page_state_s* s,
-                                                volatile page_state_buffers_s* sb,
+__device__ void gpuOutputFixedLenByteArrayAsInt(page_state_s volatile* s,
+                                                page_state_buffers_s volatile* sb,
                                                 int src_pos,
                                                 T* dst)
 {
@@ -327,8 +327,8 @@ __device__ void gpuOutputFixedLenByteArrayAsInt(volatile page_state_s* s,
  * @param[in] dst Pointer to row output data
  */
 template <typename T>
-inline __device__ void gpuOutputFast(volatile page_state_s* s,
-                                     volatile page_state_buffers_s* sb,
+inline __device__ void gpuOutputFast(page_state_s volatile* s,
+                                     page_state_buffers_s volatile* sb,
                                      int src_pos,
                                      T* dst)
 {
@@ -358,7 +358,7 @@ inline __device__ void gpuOutputFast(volatile page_state_s* s,
  * @param[in] len Length of element
  */
 static __device__ void gpuOutputGeneric(
-  volatile page_state_s* s, volatile page_state_buffers_s* sb, int src_pos, uint8_t* dst8, int len)
+  page_state_s volatile* s, page_state_buffers_s volatile* sb, int src_pos, uint8_t* dst8, int len)
 {
   uint8_t const* dict;
   uint32_t dict_pos, dict_size = s->dict_size;
@@ -422,7 +422,7 @@ __device__ size_type gpuDecodeTotalPageStringSize(page_state_s* s, int t)
   } else if ((s->col.data_type & 7) == BYTE_ARRAY) {
     str_len = gpuInitStringDescriptors(s, nullptr, target_pos, t);
   }
-  if (!t) { *(volatile int32_t*)&s->dict_pos = target_pos; }
+  if (!t) { *(int32_t volatile*)&s->dict_pos = target_pos; }
   return str_len;
 }
 
@@ -736,6 +736,17 @@ __global__ void __launch_bounds__(preprocess_block_size)
   }
 }
 
+// skips strings and delta encodings
+struct catch_all_filter {
+  device_span<ColumnChunkDesc const> chunks;
+
+  __device__ inline bool operator()(PageInfo const& page)
+  {
+    return !(is_string_col(page, chunks) || page.encoding == Encoding::DELTA_BINARY_PACKED ||
+             page.encoding == Encoding::DELTA_BYTE_ARRAY);
+  }
+};
+
 /**
  * @brief Kernel for computing the column data stored in the pages
  *
@@ -764,7 +775,7 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
   [[maybe_unused]] null_count_back_copier _{s, t};
 
   if (!setupLocalPageInfo(
-        s, &pages[page_idx], chunks, min_row, num_rows, non_string_filter{chunks}, true)) {
+        s, &pages[page_idx], chunks, min_row, num_rows, catch_all_filter{chunks}, true)) {
     return;
   }
 
@@ -814,7 +825,7 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
       } else if ((s->col.data_type & 7) == BYTE_ARRAY) {
         gpuInitStringDescriptors(s, sb, src_target_pos, t & 0x1f);
       }
-      if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; }
+      if (t == 32) { *(int32_t volatile*)&s->dict_pos = src_target_pos; }
     } else {
       // WARP1..WARP3: Decode values
       int const dtype = s->col.data_type & 7;
@@ -901,7 +912,7 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
       }
     }
 
-    if (t == out_thread0) { *(volatile int32_t*)&s->src_pos = target_pos; }
+    if (t == out_thread0) { *(int32_t volatile*)&s->src_pos = target_pos; }
   }
   __syncthreads();
 }
diff --git a/cpp/src/io/parquet/page_decode.cuh b/cpp/src/io/parquet/page_decode.cuh
index 4469ec59b7a..eba3698730c 100644
--- a/cpp/src/io/parquet/page_decode.cuh
+++ b/cpp/src/io/parquet/page_decode.cuh
@@ -1248,6 +1248,10 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
         if ((s->col.data_type & 7) == BOOLEAN) { s->dict_run = s->dict_size * 2 + 1; }
         break;
       case Encoding::RLE: s->dict_run = 0; break;
+      case Encoding::DELTA_BINARY_PACKED:
+        // nothing to do, just don't error
+        break;
+
       default:
         s->error = 1;  // Unsupported encoding
         break;
diff --git a/cpp/src/io/parquet/page_delta_decode.cu b/cpp/src/io/parquet/page_delta_decode.cu
new file mode 100644
index 00000000000..3017783cd44
--- /dev/null
+++ b/cpp/src/io/parquet/page_delta_decode.cu
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "delta_binary.cuh"
+#include "page_string_utils.cuh"
+#include "parquet_gpu.hpp"
+
+#include
+
+#include
+#include
+
+namespace cudf::io::parquet::gpu {
+
+namespace {
+
+// functor for setupLocalPageInfo
+struct delta_binary_filter {
+  __device__ inline bool operator()(PageInfo const& page)
+  {
+    return page.encoding == Encoding::DELTA_BINARY_PACKED;
+  }
+};
+
+// Decode page data that is DELTA_BINARY_PACKED encoded. This encoding is
+// only used for int32 and int64 physical types (and appears to only be used
+// with V2 page headers; see https://www.mail-archive.com/dev@parquet.apache.org/msg11826.html).
+// this kernel only needs 96 threads (3 warps)(for now).
+template <typename level_t>
+__global__ void __launch_bounds__(96) gpuDecodeDeltaBinary(
+  PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
+{
+  __shared__ __align__(16) delta_binary_decoder db_state;
+  __shared__ __align__(16) page_state_buffers_s state_buffers;
+  __shared__ __align__(16) page_state_s state_g;
+
+  page_state_s* const s          = &state_g;
+  page_state_buffers_s* const sb = &state_buffers;
+  int const page_idx             = blockIdx.x;
+  int const t                    = threadIdx.x;
+  int const lane_id              = t & 0x1f;
+  auto* const db                 = &db_state;
+  [[maybe_unused]] null_count_back_copier _{s, t};
+
+  if (!setupLocalPageInfo(
+        s, &pages[page_idx], chunks, min_row, num_rows, delta_binary_filter{}, true)) {
+    return;
+  }
+
+  bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
+
+  // copying logic from gpuDecodePageData.
+  PageNestingDecodeInfo const* nesting_info_base = s->nesting_info;
+
+  __shared__ level_t rep[non_zero_buffer_size];  // circular buffer of repetition level values
+  __shared__ level_t def[non_zero_buffer_size];  // circular buffer of definition level values
+
+  // skipped_leaf_values will always be 0 for flat hierarchies.
+  uint32_t const skipped_leaf_values = s->page.skipped_leaf_values;
+
+  // initialize delta state
+  if (t == 0) { db->init_binary_block(s->data_start, s->data_end); }
+  __syncthreads();
+
+  auto const batch_size = db->values_per_mb;
+
+  // if skipped_leaf_values is non-zero, then we need to decode up to the first mini-block
+  // that has a value we need.
+  if (skipped_leaf_values > 0) { db->skip_values(skipped_leaf_values); }
+
+  while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
+    uint32_t target_pos;
+    uint32_t const src_pos = s->src_pos;
+
+    if (t < 64) {  // warp0..1
+      target_pos = min(src_pos + 2 * batch_size, s->nz_count + batch_size);
+    } else {  // warp2...
+      target_pos = min(s->nz_count, src_pos + batch_size);
+    }
+    __syncthreads();
+
+    // warp0 will decode the rep/def levels, warp1 will unpack a mini-batch of deltas.
+    // warp2 waits one cycle for warps 0/1 to produce a batch, and then stuffs values
+    // into the proper location in the output.
+    if (t < 32) {
+      // warp 0
+      // decode repetition and definition levels.
+      // - update validity vectors
+      // - updates offsets (for nested columns)
+      // - produces non-NULL value indices in s->nz_idx for subsequent decoding
+      gpuDecodeLevels<level_t>(s, sb, target_pos, rep, def, t);
+    } else if (t < 64) {
+      // warp 1
+      db->decode_batch();
+
+    } else if (t < 96 && src_pos < target_pos) {
+      // warp 2
+      // nesting level that is storing actual leaf values
+      int const leaf_level_index = s->col.max_nesting_depth - 1;
+
+      // process the mini-block in batches of 32
+      for (uint32_t sp = src_pos + lane_id; sp < src_pos + batch_size; sp += 32) {
+        // the position in the output column/buffer
+        int32_t dst_pos = sb->nz_idx[rolling_index(sp)];
+
+        // handle skip_rows here. flat hierarchies can just skip up to first_row.
+        if (!has_repetition) { dst_pos -= s->first_row; }
+
+        // place value for this thread
+        if (dst_pos >= 0 && sp < target_pos) {
+          void* const dst = nesting_info_base[leaf_level_index].data_out + dst_pos * s->dtype_len;
+          if (s->dtype_len == 8) {
+            *static_cast<int64_t*>(dst) = db->value[rolling_index(sp + skipped_leaf_values)];
+          } else if (s->dtype_len == 4) {
+            *static_cast<int32_t*>(dst) = db->value[rolling_index(sp + skipped_leaf_values)];
+          }
+        }
+      }
+
+      if (lane_id == 0) { s->src_pos = src_pos + batch_size; }
+    }
+    __syncthreads();
+  }
+}
+
+}  // anonymous namespace
+
+/**
+ * @copydoc cudf::io::parquet::gpu::DecodeDeltaBinary
+ */
+void __host__ DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                                cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                                size_t num_rows,
+                                size_t min_row,
+                                int level_type_size,
+                                rmm::cuda_stream_view stream)
+{
+  CUDF_EXPECTS(pages.size() > 0, "There is no page to decode");
+
+  dim3 dim_block(96, 1);
+  dim3 dim_grid(pages.size(), 1);  // 1 threadblock per page
+
+  if (level_type_size == 1) {
+    gpuDecodeDeltaBinary<uint8_t>
+      <<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
+  } else {
+    gpuDecodeDeltaBinary<uint16_t>
+      <<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks, min_row, num_rows);
+  }
+}
+
+}  // namespace cudf::io::parquet::gpu
diff --git a/cpp/src/io/parquet/page_string_decode.cu b/cpp/src/io/parquet/page_string_decode.cu
index 9173d408192..20686ba7c84 100644
--- a/cpp/src/io/parquet/page_string_decode.cu
+++ b/cpp/src/io/parquet/page_string_decode.cu
@@ -15,6 +15,7 @@
  */
 
 #include "page_decode.cuh"
+#include "page_string_utils.cuh"
 
 #include
 #include
@@ -26,93 +27,6 @@ namespace gpu {
 
 namespace {
 
-// stole this from cudf/strings/detail/gather.cuh. modified to run on a single string on one warp.
-// copies from src to dst in 16B chunks per thread.
-__device__ void wideStrcpy(uint8_t* dst, uint8_t const* src, size_t len, uint32_t lane_id)
-{
-  using cudf::detail::warp_size;
-  using cudf::strings::detail::load_uint4;
-
-  constexpr size_t out_datatype_size = sizeof(uint4);
-  constexpr size_t in_datatype_size  = sizeof(uint);
-
-  auto const alignment_offset = reinterpret_cast<std::uintptr_t>(dst) % out_datatype_size;
-  uint4* out_chars_aligned    = reinterpret_cast<uint4*>(dst - alignment_offset);
-  auto const in_start         = src;
-
-  // Both `out_start_aligned` and `out_end_aligned` are indices into `dst`.
-  // `out_start_aligned` is the first 16B aligned memory location after `dst + 4`.
-  // `out_end_aligned` is the last 16B aligned memory location before `len - 4`. Characters
-  // between `[out_start_aligned, out_end_aligned)` will be copied using uint4.
-  // `dst + 4` and `len - 4` are used instead of `dst` and `len` to avoid
-  // `load_uint4` reading beyond string boundaries.
-  // use signed int since out_end_aligned can be negative.
-  int64_t out_start_aligned = (in_datatype_size + alignment_offset + out_datatype_size - 1) /
-                                out_datatype_size * out_datatype_size -
-                              alignment_offset;
-  int64_t out_end_aligned =
-    (len - in_datatype_size + alignment_offset) / out_datatype_size * out_datatype_size -
-    alignment_offset;
-
-  for (int64_t ichar = out_start_aligned + lane_id * out_datatype_size; ichar < out_end_aligned;
-       ichar += warp_size * out_datatype_size) {
-    *(out_chars_aligned + (ichar + alignment_offset) / out_datatype_size) =
-      load_uint4((const char*)in_start + ichar);
-  }
-
-  // Tail logic: copy characters of the current string outside
-  // `[out_start_aligned, out_end_aligned)`.
-  if (out_end_aligned <= out_start_aligned) {
-    // In this case, `[out_start_aligned, out_end_aligned)` is an empty set, and we copy the
-    // entire string.
-    for (int64_t ichar = lane_id; ichar < len; ichar += warp_size) {
-      dst[ichar] = in_start[ichar];
-    }
-  } else {
-    // Copy characters in range `[0, out_start_aligned)`.
-    if (lane_id < out_start_aligned) { dst[lane_id] = in_start[lane_id]; }
-    // Copy characters in range `[out_end_aligned, len)`.
-    int64_t ichar = out_end_aligned + lane_id;
-    if (ichar < len) { dst[ichar] = in_start[ichar]; }
-  }
-}
-
-/**
- * @brief char-parallel string copy.
- */
-__device__ void ll_strcpy(uint8_t* dst, uint8_t const* src, size_t len, uint32_t lane_id)
-{
-  using cudf::detail::warp_size;
-  if (len > 64) {
-    wideStrcpy(dst, src, len, lane_id);
-  } else {
-    for (int i = lane_id; i < len; i += warp_size) {
-      dst[i] = src[i];
-    }
-  }
-}
-
-/**
- * @brief Perform exclusive scan on an array of any length using a single block of threads.
- */
-template <int block_size>
-__device__ void block_excl_sum(size_type* arr, size_type length, size_type initial_value)
-{
-  using block_scan = cub::BlockScan<size_type, block_size>;
-  __shared__ typename block_scan::TempStorage scan_storage;
-  int const t = threadIdx.x;
-
-  // do a series of block sums, storing results in arr as we go
-  for (int pos = 0; pos < length; pos += block_size) {
-    int const tidx = pos + t;
-    size_type tval = tidx < length ? arr[tidx] : 0;
-    size_type block_sum;
-    block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum);
-    if (tidx < length) { arr[tidx] = tval; }
-    initial_value += block_sum;
-  }
-}
-
 /**
  * @brief Compute the start and end page value bounds for this page
  *
diff --git a/cpp/src/io/parquet/page_string_utils.cuh b/cpp/src/io/parquet/page_string_utils.cuh
new file mode 100644
index 00000000000..fb36c09052c
--- /dev/null
+++ b/cpp/src/io/parquet/page_string_utils.cuh
@@ -0,0 +1,110 @@
+/*
+ * Copyright (c) 2018-2023, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+
+namespace cudf::io::parquet::gpu {
+
+// stole this from cudf/strings/detail/gather.cuh. modified to run on a single string on one warp.
+// copies from src to dst in 16B chunks per thread.
+inline __device__ void wideStrcpy(uint8_t* dst, uint8_t const* src, size_t len, uint32_t lane_id)
+{
+  using cudf::detail::warp_size;
+  using cudf::strings::detail::load_uint4;
+
+  constexpr size_t out_datatype_size = sizeof(uint4);
+  constexpr size_t in_datatype_size  = sizeof(uint);
+
+  auto const alignment_offset = reinterpret_cast<std::uintptr_t>(dst) % out_datatype_size;
+  uint4* out_chars_aligned    = reinterpret_cast<uint4*>(dst - alignment_offset);
+  auto const in_start         = src;
+
+  // Both `out_start_aligned` and `out_end_aligned` are indices into `dst`.
+  // `out_start_aligned` is the first 16B aligned memory location after `dst + 4`.
+  // `out_end_aligned` is the last 16B aligned memory location before `len - 4`. Characters
+  // between `[out_start_aligned, out_end_aligned)` will be copied using uint4.
+  // `dst + 4` and `len - 4` are used instead of `dst` and `len` to avoid
+  // `load_uint4` reading beyond string boundaries.
+  // use signed int since out_end_aligned can be negative.
+  int64_t out_start_aligned = (in_datatype_size + alignment_offset + out_datatype_size - 1) /
+                                out_datatype_size * out_datatype_size -
+                              alignment_offset;
+  int64_t out_end_aligned =
+    (len - in_datatype_size + alignment_offset) / out_datatype_size * out_datatype_size -
+    alignment_offset;
+
+  for (int64_t ichar = out_start_aligned + lane_id * out_datatype_size; ichar < out_end_aligned;
+       ichar += warp_size * out_datatype_size) {
+    *(out_chars_aligned + (ichar + alignment_offset) / out_datatype_size) =
+      load_uint4((const char*)in_start + ichar);
+  }
+
+  // Tail logic: copy characters of the current string outside
+  // `[out_start_aligned, out_end_aligned)`.
+  if (out_end_aligned <= out_start_aligned) {
+    // In this case, `[out_start_aligned, out_end_aligned)` is an empty set, and we copy the
+    // entire string.
+    for (int64_t ichar = lane_id; ichar < len; ichar += warp_size) {
+      dst[ichar] = in_start[ichar];
+    }
+  } else {
+    // Copy characters in range `[0, out_start_aligned)`.
+    if (lane_id < out_start_aligned) { dst[lane_id] = in_start[lane_id]; }
+    // Copy characters in range `[out_end_aligned, len)`.
+    int64_t ichar = out_end_aligned + lane_id;
+    if (ichar < len) { dst[ichar] = in_start[ichar]; }
+  }
+}
+
+/**
+ * @brief char-parallel string copy.
+ */
+inline __device__ void ll_strcpy(uint8_t* dst, uint8_t const* src, size_t len, uint32_t lane_id)
+{
+  using cudf::detail::warp_size;
+  if (len > 64) {
+    wideStrcpy(dst, src, len, lane_id);
+  } else {
+    for (int i = lane_id; i < len; i += warp_size) {
+      dst[i] = src[i];
+    }
+  }
+}
+
+/**
+ * @brief Perform exclusive scan for offsets array. Called for each page.
+ */
+template <int block_size>
+__device__ void block_excl_sum(size_type* arr, size_type length, size_type initial_value)
+{
+  using block_scan = cub::BlockScan<size_type, block_size>;
+  __shared__ typename block_scan::TempStorage scan_storage;
+  int const t = threadIdx.x;
+
+  // do a series of block sums, storing results in arr as we go
+  for (int pos = 0; pos < length; pos += block_size) {
+    int const tidx = pos + t;
+    size_type tval = tidx < length ? arr[tidx] : 0;
+    size_type block_sum;
+    block_scan(scan_storage).ExclusiveScan(tval, tval, initial_value, cub::Sum(), block_sum);
+    if (tidx < length) { arr[tidx] = tval; }
+    initial_value += block_sum;
+  }
+}
+
+}  // namespace cudf::io::parquet::gpu
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 25d2885b7da..7690dcae611 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -549,6 +549,25 @@ void DecodeStringPageData(cudf::detail::hostdevice_vector<PageInfo>& pages,
                           int level_type_size,
                           rmm::cuda_stream_view stream);
 
+/**
+ * @brief Launches kernel for reading the DELTA_BINARY_PACKED column data stored in the pages
+ *
+ * The page data will be written to the output pointed to in the page's
+ * associated column chunk.
+ *
+ * @param[in,out] pages All pages to be decoded
+ * @param[in] chunks All chunks to be decoded
+ * @param[in] num_rows Total number of rows to read
+ * @param[in] min_row Minimum number of rows to read
+ * @param[in] level_type_size Size in bytes of the type used to process repetition and definition levels
+ * @param[in] stream CUDA stream to use, default 0
+ */
+void DecodeDeltaBinary(cudf::detail::hostdevice_vector<PageInfo>& pages,
+                       cudf::detail::hostdevice_vector<ColumnChunkDesc> const& chunks,
+                       size_t num_rows,
+                       size_t min_row,
+                       int level_type_size,
+                       rmm::cuda_stream_view stream);
+
 /**
  * @brief Launches kernel for initializing encoder row group fragments
  *
diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp
index 0237bf820b0..44d5e0d4fba 100644
--- a/cpp/src/io/parquet/reader_impl.cpp
+++ b/cpp/src/io/parquet/reader_impl.cpp
@@ -25,7 +25,7 @@ namespace cudf::io::detail::parquet {
 
 namespace {
 
-int constexpr NUM_DECODERS       = 2;  // how many decode kernels are there to run
+int constexpr NUM_DECODERS       = 3;  // how many decode kernels are there to run
 int constexpr APPROX_NUM_THREADS = 4;  // guestimate from DaveB
 int constexpr STREAM_POOL_SIZE   = NUM_DECODERS * APPROX_NUM_THREADS;
 
@@ -176,16 +176,30 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   chunk_nested_data.host_to_device_async(_stream);
   _stream.synchronize();
 
-  auto stream1 = get_stream_pool().get_stream();
-  gpu::DecodePageData(pages, chunks, num_rows, skip_rows, _file_itm_data.level_type_size, stream1);
+  bool const has_delta_binary = std::any_of(pages.begin(), pages.end(), [](auto& page) {
+    return page.encoding == Encoding::DELTA_BINARY_PACKED;
+  });
+
+  auto const level_type_size = _file_itm_data.level_type_size;
+
+  // launch the catch-all page decoder
+  std::vector<rmm::cuda_stream_view> streams;
+  streams.push_back(get_stream_pool().get_stream());
+  gpu::DecodePageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
+
+  // and then the specializations
   if (has_strings) {
-    auto stream2 = get_stream_pool().get_stream();
-    chunk_nested_str_data.host_to_device_async(stream2);
-    gpu::DecodeStringPageData(
-      pages, chunks, num_rows, skip_rows, _file_itm_data.level_type_size, stream2);
-    stream2.synchronize();
+    streams.push_back(get_stream_pool().get_stream());
+    chunk_nested_str_data.host_to_device_async(streams.back());
+    gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
   }
-  stream1.synchronize();
+
+  if (has_delta_binary) {
+    streams.push_back(get_stream_pool().get_stream());
+    gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
+  }
+
+  // synchronize the streams
+  std::for_each(streams.begin(), streams.end(), [](auto& stream) { stream.synchronize(); });
 
   pages.device_to_host_async(_stream);
   page_nesting.device_to_host_async(_stream);
diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu
index 8c3bdabe6b4..d3ff9651f8a 100644
--- a/cpp/src/io/parquet/reader_impl_preprocess.cu
+++ b/cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -323,7 +323,8 @@ constexpr bool is_supported_encoding(Encoding enc)
     case Encoding::PLAIN:
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE:
-    case Encoding::RLE_DICTIONARY: return true;
+    case Encoding::RLE_DICTIONARY:
+    case Encoding::DELTA_BINARY_PACKED: return true;
     default: return false;
   }
 }
@@ -729,8 +730,8 @@ std::pair<bool, std::vector<std::future<void>>> reader::impl::create_and_read_co
   auto& chunks = _file_itm_data.chunks;
 
   // Descriptors for all the chunks that make up the selected columns
-  const auto num_input_columns = _input_columns.size();
-  const auto num_chunks        = row_groups_info.size() * num_input_columns;
+  auto const num_input_columns = _input_columns.size();
+  auto const num_chunks        = row_groups_info.size() * num_input_columns;
   chunks = cudf::detail::hostdevice_vector<gpu::ColumnChunkDesc>(0, num_chunks, _stream);
 
   // Association between each column chunk and its source
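
As a usage sketch: the new decoder needs no API changes, since reading any Parquet file whose INT32/INT64 columns are DELTA_BINARY_PACKED encoded (typically written with V2 page headers) routes those pages through gpuDecodeDeltaBinary. The file name delta.parquet below is a placeholder for such a file.

#include <cudf/io/parquet.hpp>

int main()
{
  // build reader options for a file assumed to contain DELTA_BINARY_PACKED columns
  auto const opts =
    cudf::io::parquet_reader_options::builder(cudf::io::source_info{"delta.parquet"}).build();
  // pages with Encoding::DELTA_BINARY_PACKED are decoded by the kernel added above
  auto const result = cudf::io::read_parquet(opts);
  return result.tbl->num_columns() > 0 ? 0 : 1;
}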