Fix compression in ORC writer #12194

Merged
merged 4 commits into from
Nov 21, 2022
Merged
Changes from 2 commits
Commits
7 changes: 4 additions & 3 deletions cpp/src/io/orc/stripe_enc.cu
@@ -1179,8 +1179,9 @@ __global__ void __launch_bounds__(256)
   num_blocks = (ss.stream_size > 0) ? (ss.stream_size - 1) / comp_blk_size + 1 : 1;
   for (uint32_t b = t; b < num_blocks; b += 256) {
     uint32_t blk_size = min(comp_blk_size, ss.stream_size - min(b * comp_blk_size, ss.stream_size));
-    inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size};
-    auto const dst_offset      = b * (padded_block_header_size + padded_comp_block_size);
+    inputs[ss.first_block + b] = {src + b * comp_blk_size, blk_size};
+    auto const dst_offset =
+      padded_block_header_size + b * (padded_block_header_size + padded_comp_block_size);
Contributor:
Was this also wrongly computed before?

Contributor Author:

Sort of. We insert a 3-byte header before each block when we compact the blocks in the output buffer, and the output buffer is actually the same buffer as the compressed data. So, without the additional padded_block_header_size offset, we would overwrite the first three bytes of the first compressed block and then copy that block three bytes ahead, potentially doing even more damage to the blocks after it. This is why tests failed when @nvdbaranec fixed the SUCCESS condition locally. Luckily, this one change to the location where we write the compressed blocks fixes all issues in compaction.
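To illustrate the offset arithmetic (a minimal standalone sketch using the names from the diff; the helper function itself is hypothetical):

  #include <cstdint>

  // Hypothetical standalone version of the offset math from the diff.
  // The shared output buffer holds one slot per block, laid out as
  //   [3-byte header, padded][compressed payload, padded]
  uint64_t payload_offset(uint64_t b,
                          uint64_t padded_block_header_size,
                          uint64_t padded_comp_block_size)
  {
    // Skip block b's own header slot; without the leading
    // padded_block_header_size term, the payload would start where the header
    // belongs, and compaction would later clobber the payload's first 3 bytes.
    return padded_block_header_size +
           b * (padded_block_header_size + padded_comp_block_size);
  }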

Contributor Author:

The code that copies the blocks:

if (src != dst) {
  // Stage a 1024-byte chunk in registers, then barrier before writing, so each
  // chunk is fully read before any of it is overwritten; this keeps the copy
  // safe when dst precedes src, as it does during compaction.
  for (uint32_t i = 0; i < blk_size; i += 1024) {
    uint8_t v = (i + t < blk_size) ? src[i + t] : 0;
    __syncthreads();
    if (i + t < blk_size) { dst[i + t] = v; }
  }
}

outputs[ss.first_block + b] = {dst + dst_offset, max_comp_blk_size};
results[ss.first_block + b] = {0, compression_status::FAILURE};
}
@@ -1234,7 +1235,7 @@ __global__ void __launch_bounds__(1024)
? results[ss.first_block + b].bytes_written
: src_len;
uint32_t blk_size24{};
-    if (results[ss.first_block + b].status == compression_status::SUCCESS) {
+    if (src_len < dst_len) {
Contributor:
What are the semantics here? "If we failed to produce a compressed block smaller than the uncompressed data (either by failure or because of unlucky compression), just use the uncompressed data"?

Contributor Author:

That's exactly right.
Planning to add a comment.
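For context, the ORC format precedes every compression chunk with a 3-byte little-endian header whose low bit marks an "original" (uncompressed) chunk. A minimal sketch of that encoding and of the fallback described above; both helper names are hypothetical:

  #include <cstdint>

  // Hypothetical helper: ORC chunk header value, per the format spec:
  // header = (chunk_length << 1) | is_original, stored as 3 little-endian bytes.
  uint32_t chunk_header24(uint32_t chunk_length, bool is_original)
  {
    return (chunk_length << 1) | static_cast<uint32_t>(is_original);
  }

  // Hypothetical helper: keep the compressed block only when it is strictly
  // smaller than the input; otherwise (failure or incompressible data) store
  // the original bytes and set the 'original' bit.
  uint32_t select_header(uint32_t src_len, uint32_t dst_len)
  {
    return (dst_len < src_len) ? chunk_header24(dst_len, /*is_original=*/false)
                               : chunk_header24(src_len, /*is_original=*/true);
  }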

Contributor Author:

We can also just invert the original condition, but this gives us additional benefits with incompressible data.

Contributor:

Right. This seems like a good way to do it.

Contributor:

Wait, so there is no longer a check for SUCCESS compression status?

Contributor:

There is, indirectly, in the code slightly above it:

  auto dst_len = (results[ss.first_block + b].status == compression_status::SUCCESS)
                   ? results[ss.first_block + b].bytes_written
                   : src_len;

Contributor:

We don't throw any exception if the compression process failed?

Contributor Author:

Added a comment. The logic should be refactored, but I don't want to make any unnecessary changes at this point.

// Copy from uncompressed source
src = inputs[ss.first_block + b].data();
results[ss.first_block + b].bytes_written = src_len;
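Putting the thread together, the effective per-block decision is roughly the following (a self-contained sketch under the semantics confirmed above, not the verbatim kernel code; choose_block and block_choice are hypothetical names):

  #include <cstdint>

  struct block_choice {
    bool     use_compressed;
    uint32_t bytes_written;
    uint32_t blk_size24;  // 3-byte ORC chunk header value
  };

  // Hypothetical distillation of the per-block flow discussed in this thread.
  block_choice choose_block(bool comp_success, uint32_t bytes_written, uint32_t src_len)
  {
    // On failure, treat the "compressed" size as the input size so the
    // comparison below falls through to the uncompressed path.
    uint32_t const dst_len = comp_success ? bytes_written : src_len;

    if (dst_len < src_len) {
      // Compression succeeded and actually shrank the block: keep it.
      return {true, dst_len, dst_len * 2 + 0};  // low bit 0: compressed chunk
    }
    // Failed or incompressible: fall back to the uncompressed source bytes.
    return {false, src_len, src_len * 2 + 1};   // low bit 1: original chunk
  }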