Skip to content

Commit

Permalink
start merging in changes from rapidsai#13622
Browse files Browse the repository at this point in the history
  • Loading branch information
etseidl committed Jun 27, 2023
1 parent 5e9cf26 commit 9326321
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 7 deletions.
22 changes: 22 additions & 0 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

#include <cudf/detail/utilities/hash_functions.cuh>

#include <rmm/exec_policy.hpp>
#include <thrust/reduce.h>

namespace cudf {
namespace io {
namespace parquet {
Expand Down Expand Up @@ -920,6 +923,25 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(

} // anonymous namespace

/**
 * @brief Combine the `kernel_mask` of every page into a single bitmask.
 *
 * Each page's `kernel_mask` is folded into the result with bitwise OR, starting from 0, so an
 * empty `pages` yields 0.
 *
 * @param pages  Pages whose `kernel_mask` fields are combined
 * @param stream Stream handed to the device-side reduction; that path is currently compiled
 *               out, and the enabled path does not use it
 * @return Bitwise OR of all page kernel masks
 */
uint32_t GetKernelMasks(cudf::detail::hostdevice_vector<PageInfo>& pages,
                        rmm::cuda_stream_view stream)
{
  // determine which kernels to invoke
  // FIXME: when running on device I get an 'invalid device function' error, so the
  // device-side reduction below stays disabled until that is understood.
#if 0
  auto const first = thrust::make_transform_iterator(
    pages.d_begin(), [] __device__(auto const& p) { return p.kernel_mask; });
  return thrust::reduce(
    rmm::exec_policy(stream), first, first + pages.size(), 0U, thrust::bit_or<uint32_t>{});
#else
  // Reduce over pages.begin() with no execution policy.
  // NOTE(review): this presumably reads the host copy of `pages`, so the caller must have the
  // host side up to date before calling -- confirm against the call site.
  auto const page_mask = [](auto const& page) { return page.kernel_mask; };
  auto const first     = thrust::make_transform_iterator(pages.begin(), page_mask);
  auto const last      = first + pages.size();
  return thrust::reduce(first, last, 0U, thrust::bit_or<uint32_t>{});
#endif
}

/**
* @copydoc cudf::io::parquet::gpu::ComputePageSizes
*/
Expand Down
16 changes: 16 additions & 0 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,20 @@ __device__ void skip_struct_field(byte_stream_s* bs, int field_type)
} while (rep_cnt || struct_depth);
}

/**
 * @brief Select the kernel mask bit for a page.
 *
 * The checks are applied in order and the first match wins:
 *  - pages flagged `PAGEINFO_FLAGS_DICTIONARY` get an empty mask (0)
 *  - pages encoded as `DELTA_BINARY_PACKED` get `KERNEL_MASK_DELTA_BINARY`
 *  - pages whose chunk satisfies `is_string_col` get `KERNEL_MASK_STRING`
 *  - everything else gets `KERNEL_MASK_GENERAL`
 *
 * @param page  Page whose flags and encoding are inspected
 * @param chunk Column chunk passed to `is_string_col`
 * @return The selected mask bit, or 0 for dictionary pages
 */
__device__ uint32_t get_kernel_mask(gpu::PageInfo const& page, gpu::ColumnChunkDesc const& chunk)
{
  // dictionary pages contribute no bits to the mask
  if (page.flags & PAGEINFO_FLAGS_DICTIONARY) { return 0; }

  // the encoding test comes first, so a delta-binary page never reaches the string check
  if (page.encoding == Encoding::DELTA_BINARY_PACKED) { return KERNEL_MASK_DELTA_BINARY; }

  if (is_string_col(chunk)) { return KERNEL_MASK_STRING; }

  // fallback for every remaining page
  return KERNEL_MASK_GENERAL;
}

/**
* @brief Functor to set value to 32 bit integer read from byte stream
*
Expand Down Expand Up @@ -370,6 +384,7 @@ __global__ void __launch_bounds__(128)
bs->page.skipped_values = -1;
bs->page.skipped_leaf_values = 0;
bs->page.str_bytes = 0;
bs->page.kernel_mask = 0;
}
num_values = bs->ck.num_values;
page_info = bs->ck.page_info;
Expand Down Expand Up @@ -420,6 +435,7 @@ __global__ void __launch_bounds__(128)
}
bs->page.page_data = const_cast<uint8_t*>(bs->cur);
bs->cur += bs->page.compressed_page_size;
bs->page.kernel_mask = get_kernel_mask(bs->page, bs->ck);
} else {
bs->cur = bs->end;
}
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,16 @@ enum level_type {
NUM_LEVEL_TYPES
};

/**
 * @brief Bit flags used to build a page's `kernel_mask`.
 *
 * Every enumerator occupies its own bit, so values from several pages can be combined with
 * bitwise OR and tested individually with bitwise AND.
 */
enum kernel_mask_bits {
  KERNEL_MASK_GENERAL      = 1 << 0,  ///< bit 0
  KERNEL_MASK_STRING       = 1 << 1,  ///< bit 1
  KERNEL_MASK_DELTA_BINARY = 1 << 2,  ///< bit 2
  // Placeholder names noted for possible future bits (not defined yet):
  // KERNEL_MASK_FIXED_WIDTH_DICT,
  // KERNEL_MASK_STRINGS,
  // KERNEL_NESTED_
  // etc
};

/**
* @brief Nesting information specifically needed by the decode and preprocessing
* kernels.
Expand Down Expand Up @@ -203,6 +213,8 @@ struct PageInfo {

// level decode buffers
uint8_t* lvl_decode_buf[level_type::NUM_LEVEL_TYPES];

uint32_t kernel_mask;
};

/**
Expand Down Expand Up @@ -454,6 +466,12 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
int32_t num_chunks,
rmm::cuda_stream_view stream);

/**
 * @brief Compute the bitwise OR of the `kernel_mask` of every page.
 *
 * A bit is set in the result when at least one page has that bit set in its `kernel_mask`,
 * so the result can be tested against the `kernel_mask_bits` values.
 *
 * @param pages List of pages whose kernel masks are combined
 * @param stream CUDA stream view supplied by the caller
 * @return Bitwise OR of all page kernel masks
 */
uint32_t GetKernelMasks(cudf::detail::hostdevice_vector<PageInfo>& pages,
rmm::cuda_stream_view stream);

/**
* @brief Compute page output size information.
*
Expand Down
12 changes: 5 additions & 7 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,16 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
return cursum + _metadata->get_output_nesting_depth(chunk.src_col_schema);
});

// figure out which kernels to run
auto const kernel_mask = GetKernelMasks(pages, _stream);

// Check to see if there are any string columns present. If so, then we need to get size info
// for each string page. This size info will be used to pre-allocate memory for the column,
// allowing the page decoder to write string data directly to the column buffer, rather than
// doing a gather operation later on.
// TODO: This step is somewhat redundant if size info has already been calculated (nested schema,
// chunked reader).
auto const has_strings = std::any_of(chunks.begin(), chunks.end(), gpu::is_string_col);

auto const has_strings = (kernel_mask & gpu::KERNEL_MASK_STRING) != 0;
std::vector<size_t> col_sizes(_input_columns.size(), 0L);
if (has_strings) {
gpu::ComputePageStringSizes(
Expand Down Expand Up @@ -176,10 +178,6 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_data.host_to_device_async(_stream);
_stream.synchronize();

bool const has_delta_binary = std::any_of(pages.begin(), pages.end(), [](auto& page) {
return page.encoding == Encoding::DELTA_BINARY_PACKED;
});

auto const level_type_size = _file_itm_data.level_type_size;

// launch the catch-all page decoder
Expand All @@ -193,7 +191,7 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
chunk_nested_str_data.host_to_device_async(streams.back());
gpu::DecodeStringPageData(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
}
if (has_delta_binary) {
if ((kernel_mask & gpu::KERNEL_MASK_DELTA_BINARY) != 0) {
streams.push_back(get_stream_pool().get_stream());
gpu::DecodeDeltaBinary(pages, chunks, num_rows, skip_rows, level_type_size, streams.back());
}
Expand Down

0 comments on commit 9326321

Please sign in to comment.