Skip to content

Commit

Permalink
Use nvcomp's snappy decompression in ORC reader (#9235)
Browse files Browse the repository at this point in the history
Issue #9205

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Vukasin Milovanovic (https://github.com/vuule)

URL: #9235
  • Loading branch information
devavret authored Sep 18, 2021
1 parent 11781e8 commit f08d6f1
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 19 deletions.
10 changes: 6 additions & 4 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ struct CompressedStreamInfo {
copyctl(nullptr),
num_compressed_blocks(0),
num_uncompressed_blocks(0),
max_uncompressed_size(0)
max_uncompressed_size(0),
max_uncompressed_block_size(0)
{
}
const uint8_t* compressed_data; // [in] base ptr to compressed stream data
Expand All @@ -60,9 +61,10 @@ struct CompressedStreamInfo {
copyctl; // [in] base ptr to copy structure to be filled for uncompressed blocks
uint32_t num_compressed_blocks; // [in,out] number of entries in decctl(in), number of compressed
// blocks(out)
uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of
// uncompressed blocks(out)
uint64_t max_uncompressed_size; // [out] maximum uncompressed data size
uint32_t num_uncompressed_blocks; // [in,out] number of entries in copyctl(in), number of
// uncompressed blocks(out)
uint64_t max_uncompressed_size; // [out] maximum uncompressed data size of stream
uint32_t max_uncompressed_block_size; // [out] maximum uncompressed size of any block in stream
};

enum StreamIndexType {
Expand Down
86 changes: 82 additions & 4 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
#include <rmm/device_uvector.hpp>
#include <rmm/exec_policy.hpp>

#include <nvcomp/snappy.h>

#include <algorithm>
#include <array>

Expand Down Expand Up @@ -549,6 +551,68 @@ class aggregate_orc_metadata {
}
};

void snappy_decompress(device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_stat,
size_t max_uncomp_page_size,
rmm::cuda_stream_view stream)
{
size_t num_blocks = comp_in.size();
size_t temp_size;

auto status =
nvcompBatchedSnappyDecompressGetTempSize(num_blocks, max_uncomp_page_size, &temp_size);
CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status,
"Unable to get scratch size for snappy decompression");

rmm::device_buffer scratch(temp_size, stream);
rmm::device_uvector<void const*> compressed_data_ptrs(num_blocks, stream);
rmm::device_uvector<size_t> compressed_data_sizes(num_blocks, stream);
rmm::device_uvector<void*> uncompressed_data_ptrs(num_blocks, stream);
rmm::device_uvector<size_t> uncompressed_data_sizes(num_blocks, stream);

rmm::device_uvector<size_t> actual_uncompressed_data_sizes(num_blocks, stream);
rmm::device_uvector<nvcompStatus_t> statuses(num_blocks, stream);

// Prepare the vectors
auto comp_it = thrust::make_zip_iterator(compressed_data_ptrs.begin(),
compressed_data_sizes.begin(),
uncompressed_data_ptrs.begin(),
uncompressed_data_sizes.data());
thrust::transform(rmm::exec_policy(stream),
comp_in.begin(),
comp_in.end(),
comp_it,
[] __device__(gpu_inflate_input_s in) {
return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice, in.dstSize);
});

status = nvcompBatchedSnappyDecompressAsync(compressed_data_ptrs.data(),
compressed_data_sizes.data(),
uncompressed_data_sizes.data(),
actual_uncompressed_data_sizes.data(),
num_blocks,
scratch.data(),
scratch.size(),
uncompressed_data_ptrs.data(),
statuses.data(),
stream.value());
CUDF_EXPECTS(nvcompStatus_t::nvcompSuccess == status, "unable to perform snappy decompression");

CUDF_EXPECTS(thrust::equal(rmm::exec_policy(stream),
statuses.begin(),
statuses.end(),
thrust::make_constant_iterator(nvcompStatus_t::nvcompSuccess)),
"Error during snappy decompression");
thrust::for_each_n(
rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
num_blocks,
[=, actual_uncomp_sizes = actual_uncompressed_data_sizes.data()] __device__(auto i) {
comp_stat[i].bytes_written = actual_uncomp_sizes[i];
comp_stat[i].status = 0;
});
}

rmm::device_buffer reader::impl::decompress_stripe_data(
cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks,
const std::vector<rmm::device_buffer>& stripe_data,
Expand Down Expand Up @@ -592,9 +656,10 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
rmm::device_uvector<gpu_inflate_status_s> inflate_out(num_compressed_blocks, stream);

// Parse again to populate the decompression input/output buffers
size_t decomp_offset = 0;
uint32_t start_pos = 0;
uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks;
size_t decomp_offset = 0;
uint32_t max_uncomp_block_size = 0;
uint32_t start_pos = 0;
uint32_t start_pos_uncomp = (uint32_t)num_compressed_blocks;
for (size_t i = 0; i < compinfo.size(); ++i) {
auto dst_base = static_cast<uint8_t*>(decomp_data.data());
compinfo[i].uncompressed_data = dst_base + decomp_offset;
Expand All @@ -606,6 +671,8 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
decomp_offset += compinfo[i].max_uncompressed_size;
start_pos += compinfo[i].num_compressed_blocks;
start_pos_uncomp += compinfo[i].num_uncompressed_blocks;
max_uncomp_block_size =
std::max(max_uncomp_block_size, compinfo[i].max_uncompressed_block_size);
}
compinfo.host_to_device(stream);
gpu::ParseCompressedStripeData(compinfo.device_ptr(),
Expand All @@ -616,13 +683,24 @@ rmm::device_buffer reader::impl::decompress_stripe_data(

// Dispatch batches of blocks to decompress
if (num_compressed_blocks > 0) {
auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP");
bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0;
switch (decompressor->GetKind()) {
case orc::ZLIB:
CUDA_TRY(
gpuinflate(inflate_in.data(), inflate_out.data(), num_compressed_blocks, 0, stream));
break;
case orc::SNAPPY:
CUDA_TRY(gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
if (use_nvcomp) {
device_span<gpu_inflate_input_s> inflate_in_view{inflate_in.data(),
num_compressed_blocks};
device_span<gpu_inflate_status_s> inflate_out_view{inflate_out.data(),
num_compressed_blocks};
snappy_decompress(inflate_in_view, inflate_out_view, max_uncomp_block_size, stream);
} else {
CUDA_TRY(
gpu_unsnap(inflate_in.data(), inflate_out.data(), num_compressed_blocks, stream));
}
break;
default: CUDF_EXPECTS(false, "Unexpected decompression dispatch"); break;
}
Expand Down
26 changes: 15 additions & 11 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
__syncthreads();
if (strm_id < num_streams) {
// Walk through the compressed blocks
const uint8_t* cur = s->info.compressed_data;
const uint8_t* end = cur + s->info.compressed_data_size;
uint8_t* uncompressed = s->info.uncompressed_data;
size_t max_uncompressed_size = 0;
uint32_t num_compressed_blocks = 0;
uint32_t num_uncompressed_blocks = 0;
const uint8_t* cur = s->info.compressed_data;
const uint8_t* end = cur + s->info.compressed_data_size;
uint8_t* uncompressed = s->info.uncompressed_data;
size_t max_uncompressed_size = 0;
uint32_t max_uncompressed_block_size = 0;
uint32_t num_compressed_blocks = 0;
uint32_t num_uncompressed_blocks = 0;
while (cur + 3 < end) {
uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
uint32_t is_uncompressed = block_len & 1;
Expand All @@ -60,8 +61,9 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
cur += 3;
if (block_len > block_size || cur + block_len > end) {
// Fatal
num_compressed_blocks = 0;
max_uncompressed_size = 0;
num_compressed_blocks = 0;
max_uncompressed_size = 0;
max_uncompressed_block_size = 0;
break;
}
// TBD: For some codecs like snappy, it wouldn't be too difficult to get the actual
Expand Down Expand Up @@ -102,12 +104,14 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
if (init_ctl && lane_id == 0) *init_ctl = s->ctl;
cur += block_len;
max_uncompressed_size += uncompressed_size;
max_uncompressed_block_size = max(max_uncompressed_block_size, uncompressed_size);
}
__syncwarp();
if (!lane_id) {
s->info.num_compressed_blocks = num_compressed_blocks;
s->info.num_uncompressed_blocks = num_uncompressed_blocks;
s->info.max_uncompressed_size = max_uncompressed_size;
s->info.num_compressed_blocks = num_compressed_blocks;
s->info.num_uncompressed_blocks = num_uncompressed_blocks;
s->info.max_uncompressed_size = max_uncompressed_size;
s->info.max_uncompressed_block_size = max_uncompressed_block_size;
}
}

Expand Down

0 comments on commit f08d6f1

Please sign in to comment.