Skip to content

Commit

Permalink
Use nvcomp's snappy compressor in ORC writer (#9242)
Browse files Browse the repository at this point in the history
Issue #9205

depends on #9235

Authors:
  - Devavret Makkar (https://github.com/devavret)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Elias Stehle (https://github.com/elstehle)
  - https://github.com/nvdbaranec
  - Mike Wilson (https://github.com/hyperbolic2346)

URL: #9242
  • Loading branch information
devavret authored Sep 22, 2021
1 parent ef5ba4c commit 08cbbcd
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 46 deletions.
3 changes: 2 additions & 1 deletion cpp/src/io/orc/orc_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,6 +24,7 @@ namespace orc {

// ORC rows are divided into groups and assigned indexes for faster seeking
static constexpr uint32_t default_row_index_stride = 10000;
static constexpr uint32_t BLOCK_HEADER_SIZE = 3;

enum CompressionKind : uint8_t {
NONE = 0,
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ void CompactOrcDataStreams(device_2dspan<StripeStream> strm_desc,
* @param[in] num_compressed_blocks Total number of compressed blocks
* @param[in] compression Type of compression
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
* @param[in,out] strm_desc StripeStream device array [stripe][stream]
* @param[in,out] enc_streams chunk streams device array [column][rowgroup]
* @param[out] comp_in Per-block compression input parameters
Expand All @@ -365,10 +366,11 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
uint32_t num_compressed_blocks,
CompressionKind compression,
uint32_t comp_blk_size,
uint32_t max_comp_blk_size,
device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
rmm::cuda_stream_view stream);

/**
Expand Down
96 changes: 83 additions & 13 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@

#include <cub/cub.cuh>
#include <rmm/cuda_stream_view.hpp>
#include <rmm/exec_policy.hpp>

#include <nvcomp/snappy.h>

namespace cudf {
namespace io {
Expand Down Expand Up @@ -1102,15 +1105,17 @@ __global__ void __launch_bounds__(1024)
* @param[out] comp_out Per-block compression status
* @param[in] compressed_bfr Compression output buffer
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
*/
// blockDim {256,1,1}
__global__ void __launch_bounds__(256)
gpuInitCompressionBlocks(device_2dspan<StripeStream const> strm_desc,
device_2dspan<encoder_chunk_streams> streams, // const?
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
uint8_t* compressed_bfr,
uint32_t comp_blk_size)
uint32_t comp_blk_size,
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ uint8_t* volatile uncomp_base_g;
Expand All @@ -1135,8 +1140,8 @@ __global__ void __launch_bounds__(256)
uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size));
blk_in->srcDevice = src + b * comp_blk_size;
blk_in->srcSize = blk_size;
blk_in->dstDevice = dst + b * (3 + comp_blk_size) + 3; // reserve 3 bytes for block header
blk_in->dstSize = blk_size;
blk_in->dstDevice = dst + b * (BLOCK_HEADER_SIZE + max_comp_blk_size) + BLOCK_HEADER_SIZE;
blk_in->dstSize = max_comp_blk_size;
blk_out->bytes_written = blk_size;
blk_out->status = 1;
blk_out->reserved = 0;
Expand All @@ -1153,14 +1158,16 @@ __global__ void __launch_bounds__(256)
* @param[in] comp_out Per-block compression status
* @param[in] compressed_bfr Compression output buffer
* @param[in] comp_blk_size Compression block size
* @param[in] max_comp_blk_size Max size of any block after compression
*/
// blockDim {1024,1,1}
__global__ void __launch_bounds__(1024)
gpuCompactCompressedBlocks(device_2dspan<StripeStream> strm_desc,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
uint8_t* compressed_bfr,
uint32_t comp_blk_size)
uint32_t comp_blk_size,
uint32_t max_comp_blk_size)
{
__shared__ __align__(16) StripeStream ss;
__shared__ const uint8_t* volatile comp_src_g;
Expand Down Expand Up @@ -1271,20 +1278,83 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
uint32_t num_compressed_blocks,
CompressionKind compression,
uint32_t comp_blk_size,
uint32_t max_comp_blk_size,
device_2dspan<StripeStream> strm_desc,
device_2dspan<encoder_chunk_streams> enc_streams,
gpu_inflate_input_s* comp_in,
gpu_inflate_status_s* comp_out,
device_span<gpu_inflate_input_s> comp_in,
device_span<gpu_inflate_status_s> comp_out,
rmm::cuda_stream_view stream)
{
dim3 dim_block_init(256, 1);
dim3 dim_grid(strm_desc.size().first, strm_desc.size().second);
gpuInitCompressionBlocks<<<dim_grid, dim_block_init, 0, stream.value()>>>(
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size);
if (compression == SNAPPY) { gpu_snap(comp_in, comp_out, num_compressed_blocks, stream); }
strm_desc, enc_streams, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size);
if (compression == SNAPPY) {
auto env_use_nvcomp = std::getenv("LIBCUDF_USE_NVCOMP");
bool use_nvcomp = env_use_nvcomp != nullptr ? std::atoi(env_use_nvcomp) : 0;
if (use_nvcomp) {
try {
size_t temp_size;
nvcompStatus_t nvcomp_status = nvcompBatchedSnappyCompressGetTempSize(
num_compressed_blocks, comp_blk_size, nvcompBatchedSnappyDefaultOpts, &temp_size);

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess,
"Error in getting snappy compression scratch size");

rmm::device_buffer scratch(temp_size, stream);
rmm::device_uvector<void const*> uncompressed_data_ptrs(num_compressed_blocks, stream);
rmm::device_uvector<size_t> uncompressed_data_sizes(num_compressed_blocks, stream);
rmm::device_uvector<void*> compressed_data_ptrs(num_compressed_blocks, stream);
rmm::device_uvector<size_t> compressed_bytes_written(num_compressed_blocks, stream);

auto comp_it = thrust::make_zip_iterator(uncompressed_data_ptrs.begin(),
uncompressed_data_sizes.begin(),
compressed_data_ptrs.begin());
thrust::transform(rmm::exec_policy(stream),
comp_in.begin(),
comp_in.end(),
comp_it,
[] __device__(gpu_inflate_input_s in) {
return thrust::make_tuple(in.srcDevice, in.srcSize, in.dstDevice);
});
nvcomp_status = nvcompBatchedSnappyCompressAsync(uncompressed_data_ptrs.data(),
uncompressed_data_sizes.data(),
max_comp_blk_size,
num_compressed_blocks,
scratch.data(),
scratch.size(),
compressed_data_ptrs.data(),
compressed_bytes_written.data(),
nvcompBatchedSnappyDefaultOpts,
stream.value());

CUDF_EXPECTS(nvcomp_status == nvcompStatus_t::nvcompSuccess, "Error in snappy compression");

thrust::transform(rmm::exec_policy(stream),
compressed_bytes_written.begin(),
compressed_bytes_written.end(),
comp_out.begin(),
[] __device__(size_t size) {
gpu_inflate_status_s status{};
status.bytes_written = size;
return status;
});
} catch (...) {
// If we reach this then there was an error in compressing so set an error status for each
// block
thrust::for_each(rmm::exec_policy(stream),
comp_out.begin(),
comp_out.end(),
[] __device__(gpu_inflate_status_s & stat) { stat.status = 1; });
};

} else {
gpu_snap(comp_in.data(), comp_out.data(), num_compressed_blocks, stream);
}
}
dim3 dim_block_compact(1024, 1);
gpuCompactCompressedBlocks<<<dim_grid, dim_block_compact, 0, stream.value()>>>(
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size);
strm_desc, comp_in, comp_out, compressed_data, comp_blk_size, max_comp_blk_size);
}

} // namespace gpu
Expand Down
16 changes: 9 additions & 7 deletions cpp/src/io/orc/stripe_init.cu
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -52,13 +52,13 @@ extern "C" __global__ void __launch_bounds__(128, 8) gpuParseCompressedStripeDat
uint32_t max_uncompressed_block_size = 0;
uint32_t num_compressed_blocks = 0;
uint32_t num_uncompressed_blocks = 0;
while (cur + 3 < end) {
while (cur + BLOCK_HEADER_SIZE < end) {
uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
uint32_t is_uncompressed = block_len & 1;
uint32_t uncompressed_size;
gpu_inflate_input_s* init_ctl = nullptr;
block_len >>= 1;
cur += 3;
cur += BLOCK_HEADER_SIZE;
if (block_len > block_size || cur + block_len > end) {
// Fatal
num_compressed_blocks = 0;
Expand Down Expand Up @@ -145,12 +145,12 @@ extern "C" __global__ void __launch_bounds__(128, 8)
uint32_t num_compressed_blocks = 0;
uint32_t max_compressed_blocks = s->info.num_compressed_blocks;

while (cur + 3 < end) {
while (cur + BLOCK_HEADER_SIZE < end) {
uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0);
uint32_t is_uncompressed = block_len & 1;
uint32_t uncompressed_size_est, uncompressed_size_actual;
block_len >>= 1;
cur += 3;
cur += BLOCK_HEADER_SIZE;
if (cur + block_len > end) { break; }
if (is_uncompressed) {
uncompressed_size_est = block_len;
Expand Down Expand Up @@ -367,9 +367,11 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s,
for (;;) {
uint32_t block_len, is_uncompressed;

if (cur + 3 > end || cur + 3 >= start + compressed_offset) { break; }
if (cur + BLOCK_HEADER_SIZE > end || cur + BLOCK_HEADER_SIZE >= start + compressed_offset) {
break;
}
block_len = cur[0] | (cur[1] << 8) | (cur[2] << 16);
cur += 3;
cur += BLOCK_HEADER_SIZE;
is_uncompressed = block_len & 1;
block_len >>= 1;
cur += block_len;
Expand Down
51 changes: 28 additions & 23 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include <rmm/device_buffer.hpp>
#include <rmm/device_uvector.hpp>

#include <nvcomp/snappy.h>

#include <algorithm>
#include <cstring>
#include <numeric>
Expand Down Expand Up @@ -999,10 +1001,10 @@ void writer::impl::write_index_stream(int32_t stripe_id,
record.pos += stream.lengths[type];
while ((record.pos >= 0) && (record.blk_pos >= 0) &&
(static_cast<size_t>(record.pos) >= compression_blocksize_) &&
(record.comp_pos + 3 + comp_out[record.blk_pos].bytes_written <
(record.comp_pos + BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written <
static_cast<size_t>(record.comp_size))) {
record.pos -= compression_blocksize_;
record.comp_pos += 3 + comp_out[record.blk_pos].bytes_written;
record.comp_pos += BLOCK_HEADER_SIZE + comp_out[record.blk_pos].bytes_written;
record.blk_pos += 1;
}
}
Expand Down Expand Up @@ -1472,29 +1474,31 @@ void writer::impl::write(table_view const& table)
}

// Allocate intermediate output stream buffer
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
auto stream_output = [&]() {
size_t compressed_bfr_size = 0;
size_t num_compressed_blocks = 0;
size_t max_compressed_block_size = 0;
if (compression_kind_ != NONE) {
nvcompBatchedSnappyCompressGetMaxOutputChunkSize(
compression_blocksize_, nvcompBatchedSnappyDefaultOpts, &max_compressed_block_size);
}
auto stream_output = [&]() {
size_t max_stream_size = 0;
bool all_device_write = true;

for (size_t stripe_id = 0; stripe_id < segmentation.num_stripes(); stripe_id++) {
for (size_t i = 0; i < num_data_streams; i++) { // TODO range for (at least)
gpu::StripeStream* ss = &strm_descs[stripe_id][i];
if (!out_sink_->is_device_write_preferred(ss->stream_size)) { all_device_write = false; }
size_t stream_size = ss->stream_size;
if (compression_kind_ != NONE) {
ss->first_block = num_compressed_blocks;
ss->bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * 3;
num_compressed_blocks += num_blocks;
compressed_bfr_size += stream_size;
}
max_stream_size = std::max(max_stream_size, stream_size);
for (auto& ss : strm_descs.host_view().flat_view()) {
if (!out_sink_->is_device_write_preferred(ss.stream_size)) { all_device_write = false; }
size_t stream_size = ss.stream_size;
if (compression_kind_ != NONE) {
ss.first_block = num_compressed_blocks;
ss.bfr_offset = compressed_bfr_size;

auto num_blocks = std::max<uint32_t>(
(stream_size + compression_blocksize_ - 1) / compression_blocksize_, 1);
stream_size += num_blocks * BLOCK_HEADER_SIZE;
num_compressed_blocks += num_blocks;
compressed_bfr_size += (max_compressed_block_size + BLOCK_HEADER_SIZE) * num_blocks;
}
max_stream_size = std::max(max_stream_size, stream_size);
}

if (all_device_write) {
Expand All @@ -1519,10 +1523,11 @@ void writer::impl::write(table_view const& table)
num_compressed_blocks,
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
strm_descs,
enc_data.streams,
comp_in.device_ptr(),
comp_out.device_ptr(),
comp_in,
comp_out,
stream);
strm_descs.device_to_host(stream);
comp_out.device_to_host(stream, true);
Expand Down

0 comments on commit 08cbbcd

Please sign in to comment.