Skip to content

Commit

Permalink
Fix alignment of compressed blocks in ORC writer (#12077)
Browse files Browse the repository at this point in the history
Closes #11812
Fixed alignment of compressed blocks in ORC writer - impacted ZLIB compression.
Re-enabled nvCOMP DEFLATE compression in ORC - nvCOMP 2.5+ only.

Refactored the nvCOMP feature status(enabled/disabled in cuDF) checks to include reason why features are not enabled (if not enabled).
Refactored call sites to return the detailed error message if an operation fails because of nvCOMP integration config.
Refactored nvCOMP adapter macros to allow mocking of the parameters that determine if an nvCOMP feature is enabled (env var, GPU compute capability, nvCOMP version).
Added tests to verify the logic of the newly refactored feature status checks (allowed by the mocking above).
Fix a Parquet test that was calling ORC reader/writer 😬

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Jim Brennan (https://github.com/jbrennan333)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Bradley Dice (https://github.com/bdice)

URL: #12077
  • Loading branch information
vuule authored Nov 11, 2022
1 parent 3894427 commit d335aa3
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 134 deletions.
227 changes: 144 additions & 83 deletions cpp/src/io/comp/nvcomp_adapter.cpp

Large diffs are not rendered by default.

48 changes: 44 additions & 4 deletions cpp/src/io/comp/nvcomp_adapter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

#include "gpuinflate.hpp"

#include <io/utilities/config_utils.hpp>

#include <cudf/utilities/error.hpp>
#include <cudf/utilities/span.hpp>

Expand All @@ -30,14 +32,52 @@ namespace cudf::io::nvcomp {
enum class compression_type { SNAPPY, ZSTD, DEFLATE };

/**
* @brief Whether the given compression type is enabled through nvCOMP.
* @brief Set of parameters that impact whether the use nvCOMP features is enabled.
*/
struct feature_status_parameters {
int lib_major_version;
int lib_minor_version;
int lib_patch_version;
bool are_all_integrations_enabled;
bool are_stable_integrations_enabled;
int compute_capability_major;

feature_status_parameters();
feature_status_parameters(
int major, int minor, int patch, bool all_enabled, bool stable_enabled, int cc_major)
: lib_major_version{major},
lib_minor_version{minor},
lib_patch_version{patch},
are_all_integrations_enabled{all_enabled},
are_stable_integrations_enabled{stable_enabled},
compute_capability_major{cc_major}
{
}
};

/**
* @brief If a compression type is disabled through nvCOMP, returns the reason as a string.
*
* Result cab depend on nvCOMP version and environment variables.
*
* @param compression Compression type
* @param params Optional parameters to query status with different configurations
* @returns Reason for the feature disablement, `std::nullopt` if the feature is enabled
*/
[[nodiscard]] std::optional<std::string> is_compression_disabled(
compression_type compression, feature_status_parameters params = feature_status_parameters());

/**
* @brief If a decompression type is disabled through nvCOMP, returns the reason as a string.
*
* Result depends on nvCOMP version and environment variables.
* Result can depend on nvCOMP version and environment variables.
*
* @param compression Compression type
* @returns true if nvCOMP use is enabled; false otherwise
* @param params Optional parameters to query status with different configurations
* @returns Reason for the feature disablement, `std::nullopt` if the feature is enabled
*/
[[nodiscard]] bool is_compression_enabled(compression_type compression);
[[nodiscard]] std::optional<std::string> is_decompression_disabled(
compression_type compression, feature_status_parameters params = feature_status_parameters());

/**
* @brief Device batch decompression of given type.
Expand Down
23 changes: 13 additions & 10 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -379,34 +379,37 @@ rmm::device_buffer reader::impl::decompress_stripe_data(
device_span<device_span<uint8_t>> inflate_out_view{inflate_out.data(), num_compressed_blocks};
switch (decompressor.compression()) {
case compression_type::ZLIB:
// See https://github.com/rapidsai/cudf/issues/11812
if (false) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) {
gpuinflate(
inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream);
} else {
nvcomp::batched_decompress(nvcomp::compression_type::DEFLATE,
inflate_in_view,
inflate_out_view,
inflate_res,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpuinflate(
inflate_in_view, inflate_out_view, inflate_res, gzip_header_included::NO, stream);
}
break;
case compression_type::SNAPPY:
if (nvcomp_integration::is_stable_enabled()) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream);
} else {
nvcomp::batched_decompress(nvcomp::compression_type::SNAPPY,
inflate_in_view,
inflate_out_view,
inflate_res,
max_uncomp_block_size,
total_decomp_size,
stream);
} else {
gpu_unsnap(inflate_in_view, inflate_out_view, inflate_res, stream);
}
break;
case compression_type::ZSTD:
if (auto const reason = nvcomp::is_decompression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Decompression error: " + reason.value());
}
nvcomp::batched_decompress(nvcomp::compression_type::ZSTD,
inflate_in_view,
inflate_out_view,
Expand Down Expand Up @@ -522,8 +525,8 @@ void update_null_mask(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks
parent_mask_len, mask_state::ALL_NULL, rmm::cuda_stream_view(stream), mr);
auto merged_mask = static_cast<bitmask_type*>(merged_null_mask.data());
uint32_t* dst_idx_ptr = dst_idx.data();
// Copy child valid bits from child column to valid indexes, this will merge both child and
// parent null masks
// Copy child valid bits from child column to valid indexes, this will merge both child
// and parent null masks
thrust::for_each(rmm::exec_policy(stream),
thrust::make_counting_iterator(0),
thrust::make_counting_iterator(0) + dst_idx.size(),
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/io/orc/stripe_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1332,11 +1332,11 @@ void CompressOrcDataStreams(uint8_t* compressed_data,

if (compression == SNAPPY) {
try {
if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) {
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_snap(comp_in, comp_out, comp_res, stream);
} else {
nvcomp::batched_compress(
nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream);
} else {
gpu_snap(comp_in, comp_out, comp_res, stream);
}
} catch (...) {
// There was an error in compressing so set an error status for each block
Expand All @@ -1348,12 +1348,18 @@ void CompressOrcDataStreams(uint8_t* compressed_data,
// Since SNAPPY is the default compression (may not be explicitly requested), fall back to
// writing without compression
}
} else if (compression == ZLIB and
nvcomp::is_compression_enabled(nvcomp::compression_type::DEFLATE)) {
} else if (compression == ZLIB) {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::DEFLATE);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(
nvcomp::compression_type::DEFLATE, comp_in, comp_out, comp_res, stream);
} else if (compression == ZSTD and
nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) {
} else if (compression == ZSTD) {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
} else if (compression != NONE) {
CUDF_FAIL("Unsupported compression type");
Expand Down
21 changes: 11 additions & 10 deletions cpp/src/io/orc/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ constexpr size_t compression_block_size(orc::CompressionKind compression)
if (compression == orc::CompressionKind::NONE) { return 0; }

auto const ncomp_type = to_nvcomp_compression_type(compression);
auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type)
? nvcomp::compress_max_allowed_chunk_size(ncomp_type)
: std::nullopt;
auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
? std::nullopt
: nvcomp::compress_max_allowed_chunk_size(ncomp_type);

constexpr size_t max_block_size = 256 * 1024;
return std::min(nvcomp_limit.value_or(max_block_size), max_block_size);
Expand Down Expand Up @@ -537,7 +537,7 @@ constexpr size_t RLE_stream_size(TypeKind kind, size_t count)
auto uncomp_block_alignment(CompressionKind compression_kind)
{
if (compression_kind == NONE or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) {
return 1u;
}

Expand All @@ -547,7 +547,7 @@ auto uncomp_block_alignment(CompressionKind compression_kind)
auto comp_block_alignment(CompressionKind compression_kind)
{
if (compression_kind == NONE or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(compression_kind))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(compression_kind))) {
return 1u;
}

Expand Down Expand Up @@ -2161,15 +2161,16 @@ void writer::impl::write(table_view const& table)

auto dec_chunk_sizes = decimal_chunk_sizes(orc_table, segmentation, stream);

auto const uncomp_block_align = uncomp_block_alignment(compression_kind_);
auto const uncompressed_block_align = uncomp_block_alignment(compression_kind_);
auto const compressed_block_align = comp_block_alignment(compression_kind_);
auto streams =
create_streams(orc_table.columns, segmentation, decimal_column_sizes(dec_chunk_sizes.rg_sizes));
auto enc_data = encode_columns(orc_table,
std::move(dictionaries),
std::move(dec_chunk_sizes),
segmentation,
streams,
uncomp_block_align,
uncompressed_block_align,
stream);

// Assemble individual disparate column chunks into contiguous data streams
Expand All @@ -2187,9 +2188,9 @@ void writer::impl::write(table_view const& table)
auto const max_compressed_block_size =
max_compression_output_size(compression_kind_, compression_blocksize_);
auto const padded_max_compressed_block_size =
util::round_up_unsafe<size_t>(max_compressed_block_size, uncomp_block_align);
util::round_up_unsafe<size_t>(max_compressed_block_size, compressed_block_align);
auto const padded_block_header_size =
util::round_up_unsafe<size_t>(block_header_size, uncomp_block_align);
util::round_up_unsafe<size_t>(block_header_size, compressed_block_align);

auto stream_output = [&]() {
size_t max_stream_size = 0;
Expand Down Expand Up @@ -2238,7 +2239,7 @@ void writer::impl::write(table_view const& table)
compression_kind_,
compression_blocksize_,
max_compressed_block_size,
comp_block_alignment(compression_kind_),
compressed_block_align,
strm_descs,
enc_data.streams,
comp_results,
Expand Down
25 changes: 14 additions & 11 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ auto to_nvcomp_compression_type(Compression codec)
auto page_alignment(Compression codec)
{
if (codec == Compression::UNCOMPRESSED or
not nvcomp::is_compression_enabled(to_nvcomp_compression_type(codec))) {
nvcomp::is_compression_disabled(to_nvcomp_compression_type(codec))) {
return 1u;
}

Expand Down Expand Up @@ -1172,19 +1172,22 @@ void writer::impl::encode_pages(hostdevice_2dvector<gpu::EncColumnChunk>& chunks
gpu::EncodePages(batch_pages, comp_in, comp_out, comp_res, stream);
switch (compression_) {
case parquet::Compression::SNAPPY:
if (nvcomp::is_compression_enabled(nvcomp::compression_type::SNAPPY)) {
if (nvcomp::is_compression_disabled(nvcomp::compression_type::SNAPPY)) {
gpu_snap(comp_in, comp_out, comp_res, stream);
} else {
nvcomp::batched_compress(
nvcomp::compression_type::SNAPPY, comp_in, comp_out, comp_res, stream);
} else {
gpu_snap(comp_in, comp_out, comp_res, stream);
}
break;
case parquet::Compression::ZSTD:
if (nvcomp::is_compression_enabled(nvcomp::compression_type::ZSTD)) {
nvcomp::batched_compress(
nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);
case parquet::Compression::ZSTD: {
if (auto const reason = nvcomp::is_compression_disabled(nvcomp::compression_type::ZSTD);
reason) {
CUDF_FAIL("Compression error: " + reason.value());
}
nvcomp::batched_compress(nvcomp::compression_type::ZSTD, comp_in, comp_out, comp_res, stream);

break;
}
case parquet::Compression::UNCOMPRESSED: break;
default: CUDF_FAIL("invalid compression type");
}
Expand Down Expand Up @@ -1246,9 +1249,9 @@ size_t max_page_bytes(Compression compression, size_t max_page_size_bytes)
if (compression == parquet::Compression::UNCOMPRESSED) { return max_page_size_bytes; }

auto const ncomp_type = to_nvcomp_compression_type(compression);
auto const nvcomp_limit = nvcomp::is_compression_enabled(ncomp_type)
? nvcomp::compress_max_allowed_chunk_size(ncomp_type)
: std::nullopt;
auto const nvcomp_limit = nvcomp::is_compression_disabled(ncomp_type)
? std::nullopt
: nvcomp::compress_max_allowed_chunk_size(ncomp_type);

return std::min(nvcomp_limit.value_or(max_page_size_bytes), max_page_size_bytes);
}
Expand Down
14 changes: 7 additions & 7 deletions cpp/src/io/text/bgzip_data_chunk_source.cu
Original file line number Diff line number Diff line change
Expand Up @@ -144,20 +144,20 @@ class bgzip_data_chunk_reader : public data_chunk_reader {
bgzip_nvcomp_transform_functor{reinterpret_cast<uint8_t const*>(d_compressed_blocks.data()),
reinterpret_cast<uint8_t*>(d_decompressed_blocks.begin())});
if (decompressed_size() > 0) {
if (cudf::io::detail::nvcomp_integration::is_all_enabled()) {
if (nvcomp::is_decompression_disabled(nvcomp::compression_type::DEFLATE)) {
gpuinflate(d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
gzip_header_included::NO,
stream);
} else {
cudf::io::nvcomp::batched_decompress(cudf::io::nvcomp::compression_type::DEFLATE,
d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
max_decompressed_size,
decompressed_size(),
stream);
} else {
gpuinflate(d_compressed_spans,
d_decompressed_spans,
d_decompression_results,
gzip_header_included::NO,
stream);
}
}
is_decompressed = true;
Expand Down
58 changes: 58 additions & 0 deletions cpp/tests/io/comp/decomp_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <io/comp/gpuinflate.hpp>
#include <io/utilities/hostdevice_vector.hpp>
#include <src/io/comp/nvcomp_adapter.hpp>

#include <cudf/utilities/default_stream.hpp>

Expand Down Expand Up @@ -118,6 +119,9 @@ struct BrotliDecompressTest : public DecompressTest<BrotliDecompressTest> {
}
};

struct NvcompConfigTest : public cudf::test::BaseFixture {
};

TEST_F(GzipDecompressTest, HelloWorld)
{
constexpr char uncompressed[] = "hello world";
Expand Down Expand Up @@ -166,4 +170,58 @@ TEST_F(BrotliDecompressTest, HelloWorld)
EXPECT_EQ(output, input);
}

TEST_F(NvcompConfigTest, Compression)
{
using cudf::io::nvcomp::compression_type;
auto const& comp_disabled = cudf::io::nvcomp::is_compression_disabled;

EXPECT_FALSE(comp_disabled(compression_type::DEFLATE, {2, 5, 0, true, true, 0}));
// version 2.5 required
EXPECT_TRUE(comp_disabled(compression_type::DEFLATE, {2, 4, 0, true, true, 0}));
// all integrations enabled required
EXPECT_TRUE(comp_disabled(compression_type::DEFLATE, {2, 5, 0, false, true, 0}));

EXPECT_FALSE(comp_disabled(compression_type::ZSTD, {2, 4, 0, true, true, 0}));
EXPECT_FALSE(comp_disabled(compression_type::ZSTD, {2, 4, 0, false, true, 0}));
// 2.4 version required
EXPECT_TRUE(comp_disabled(compression_type::ZSTD, {2, 3, 1, false, true, 0}));
// stable integrations enabled required
EXPECT_TRUE(comp_disabled(compression_type::ZSTD, {2, 4, 0, false, false, 0}));

EXPECT_FALSE(comp_disabled(compression_type::SNAPPY, {2, 5, 0, true, true, 0}));
EXPECT_FALSE(comp_disabled(compression_type::SNAPPY, {2, 4, 0, false, true, 0}));
// stable integrations enabled required
EXPECT_TRUE(comp_disabled(compression_type::SNAPPY, {2, 3, 0, false, false, 0}));
}

TEST_F(NvcompConfigTest, Decompression)
{
using cudf::io::nvcomp::compression_type;
auto const& decomp_disabled = cudf::io::nvcomp::is_decompression_disabled;

EXPECT_FALSE(decomp_disabled(compression_type::DEFLATE, {2, 5, 0, true, true, 7}));
// version 2.5 required
EXPECT_TRUE(decomp_disabled(compression_type::DEFLATE, {2, 4, 0, true, true, 7}));
// all integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::DEFLATE, {2, 5, 0, false, true, 7}));

EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, true, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 3, 2, false, true, 6}));
EXPECT_FALSE(decomp_disabled(compression_type::ZSTD, {2, 3, 0, true, true, 6}));
// 2.3.1 and earlier requires all integrations to be enabled
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 3, 1, false, true, 7}));
// 2.3 version required
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 2, 0, true, true, 7}));
// stable integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, false, false, 7}));
// 2.4.0 disabled on Pascal
EXPECT_TRUE(decomp_disabled(compression_type::ZSTD, {2, 4, 0, true, true, 6}));

EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 4, 0, true, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 3, 0, false, true, 7}));
EXPECT_FALSE(decomp_disabled(compression_type::SNAPPY, {2, 2, 0, false, true, 7}));
// stable integrations enabled required
EXPECT_TRUE(decomp_disabled(compression_type::SNAPPY, {2, 2, 0, false, false, 7}));
}

CUDF_TEST_PROGRAM_MAIN()
Loading

0 comments on commit d335aa3

Please sign in to comment.