Skip to content

Commit

Permalink
Support additional dictionary bit widths in Parquet writer (NVIDIA#11547)

Browse files Browse the repository at this point in the history

Continuation of NVIDIA#11216, this adds ability to use 3, 5, 6, 10, and 20 bit dictionary keys in the Parquet encoder.

Also adds unit tests for each of the supported bit widths.

Authors:
  - Ed Seidl (https://github.com/etseidl)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Yunsong Wang (https://github.com/PointKernel)

URL: rapidsai/cudf#11547
  • Loading branch information
etseidl authored Aug 18, 2022
1 parent 127d574 commit 288c81f
Show file tree
Hide file tree
Showing 3 changed files with 263 additions and 75 deletions.
243 changes: 170 additions & 73 deletions cpp/src/io/parquet/page_enc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -460,89 +460,186 @@ inline __device__ uint8_t* VlqEncode(uint8_t* p, uint32_t v)
}

/**
* @brief Pack literal values in output bitstream (1,2,4,8,12 or 16 bits per value)
* @brief Pack literal values in output bitstream (1,2,3,4,5,6,8,10,12,16,20 or 24 bits per value)
*/
inline __device__ void PackLiterals(
inline __device__ void PackLiteralsShuffle(
uint8_t* dst, uint32_t v, uint32_t count, uint32_t w, uint32_t t)
{
if (w == 1 || w == 2 || w == 4 || w == 8 || w == 12 || w == 16 || w == 24) {
if (t <= (count | 0x1f)) {
if (w == 1 || w == 2 || w == 4) {
uint32_t mask = 0;
if (w == 1) {
v |= shuffle_xor(v, 1) << 1;
v |= shuffle_xor(v, 2) << 2;
v |= shuffle_xor(v, 4) << 4;
mask = 0x7;
} else if (w == 2) {
v |= shuffle_xor(v, 1) << 2;
v |= shuffle_xor(v, 2) << 4;
mask = 0x3;
} else if (w == 4) {
v |= shuffle_xor(v, 1) << 4;
mask = 0x1;
}
if (t < count && mask && !(t & mask)) { dst[(t * w) >> 3] = v; }
return;
} else if (w == 8) {
if (t < count) { dst[t] = v; }
return;
} else if (w == 12) {
v |= shuffle_xor(v, 1) << 12;
if (t < count && !(t & 1)) {
dst[(t >> 1) * 3 + 0] = v;
dst[(t >> 1) * 3 + 1] = v >> 8;
dst[(t >> 1) * 3 + 2] = v >> 16;
}
return;
} else if (w == 16) {
if (t < count) {
dst[t * 2 + 0] = v;
dst[t * 2 + 1] = v >> 8;
}
return;
} else if (w == 24) {
if (t < count) {
dst[t * 3 + 0] = v;
dst[t * 3 + 1] = v >> 8;
dst[t * 3 + 2] = v >> 16;
}
return;
constexpr uint32_t MASK2T = 1; // mask for 2 thread leader
constexpr uint32_t MASK4T = 3; // mask for 4 thread leader
constexpr uint32_t MASK8T = 7; // mask for 8 thread leader
uint64_t vt;

if (t > (count | 0x1f)) { return; }

switch (w) {
case 1:
v |= shuffle_xor(v, 1) << 1;
v |= shuffle_xor(v, 2) << 2;
v |= shuffle_xor(v, 4) << 4;
if (t < count && !(t & MASK8T)) { dst[(t * w) >> 3] = v; }
return;
case 2:
v |= shuffle_xor(v, 1) << 2;
v |= shuffle_xor(v, 2) << 4;
if (t < count && !(t & MASK4T)) { dst[(t * w) >> 3] = v; }
return;
case 3:
v |= shuffle_xor(v, 1) << 3;
v |= shuffle_xor(v, 2) << 6;
v |= shuffle_xor(v, 4) << 12;
if (t < count && !(t & MASK8T)) {
dst[(t >> 3) * 3 + 0] = v;
dst[(t >> 3) * 3 + 1] = v >> 8;
dst[(t >> 3) * 3 + 2] = v >> 16;
}
return;
case 4:
v |= shuffle_xor(v, 1) << 4;
if (t < count && !(t & MASK2T)) { dst[(t * w) >> 3] = v; }
return;
case 5:
v |= shuffle_xor(v, 1) << 5;
v |= shuffle_xor(v, 2) << 10;
vt = shuffle_xor(v, 4);
vt = vt << 20 | v;
if (t < count && !(t & MASK8T)) {
dst[(t >> 3) * 5 + 0] = vt;
dst[(t >> 3) * 5 + 1] = vt >> 8;
dst[(t >> 3) * 5 + 2] = vt >> 16;
dst[(t >> 3) * 5 + 3] = vt >> 24;
dst[(t >> 3) * 5 + 4] = vt >> 32;
}
return;
case 6:
v |= shuffle_xor(v, 1) << 6;
v |= shuffle_xor(v, 2) << 12;
if (t < count && !(t & MASK4T)) {
dst[(t >> 2) * 3 + 0] = v;
dst[(t >> 2) * 3 + 1] = v >> 8;
dst[(t >> 2) * 3 + 2] = v >> 16;
}
return;
case 8:
if (t < count) { dst[t] = v; }
return;
case 10:
v |= shuffle_xor(v, 1) << 10;
vt = shuffle_xor(v, 2);
vt = vt << 20 | v;
if (t < count && !(t & MASK4T)) {
dst[(t >> 2) * 5 + 0] = vt;
dst[(t >> 2) * 5 + 1] = vt >> 8;
dst[(t >> 2) * 5 + 2] = vt >> 16;
dst[(t >> 2) * 5 + 3] = vt >> 24;
dst[(t >> 2) * 5 + 4] = vt >> 32;
}
return;
case 12:
v |= shuffle_xor(v, 1) << 12;
if (t < count && !(t & MASK2T)) {
dst[(t >> 1) * 3 + 0] = v;
dst[(t >> 1) * 3 + 1] = v >> 8;
dst[(t >> 1) * 3 + 2] = v >> 16;
}
return;
case 16:
if (t < count) {
dst[t * 2 + 0] = v;
dst[t * 2 + 1] = v >> 8;
}
return;
case 20:
vt = shuffle_xor(v, 1);
vt = vt << 20 | v;
if (t < count && !(t & MASK2T)) {
dst[(t >> 1) * 5 + 0] = vt;
dst[(t >> 1) * 5 + 1] = vt >> 8;
dst[(t >> 1) * 5 + 2] = vt >> 16;
dst[(t >> 1) * 5 + 3] = vt >> 24;
dst[(t >> 1) * 5 + 4] = vt >> 32;
}
return;
case 24:
if (t < count) {
dst[t * 3 + 0] = v;
dst[t * 3 + 1] = v >> 8;
dst[t * 3 + 2] = v >> 16;
}
} else {
return;
}
} else if (w <= 16) {
// Scratch space to temporarily write to. Needed because we will use atomics to write 32 bit
// words but the destination mem may not be a multiple of 4 bytes.
// TODO (dm): This assumes blockdim = 128 and max bits per value = 16. Reduce magic numbers.
__shared__ uint32_t scratch[64];
if (t < 64) { scratch[t] = 0; }
__syncthreads();

if (t <= count) {
uint64_t v64 = v;
v64 <<= (t * w) & 0x1f;
default: CUDF_UNREACHABLE("Unsupported bit width");
}
}

// Copy 64 bit word into two 32 bit words while following C++ strict aliasing rules.
uint32_t v32[2];
memcpy(&v32, &v64, sizeof(uint64_t));
/**
* @brief Pack literals of arbitrary bit-length in output bitstream.
*/
inline __device__ void PackLiteralsRoundRobin(
uint8_t* dst, uint32_t v, uint32_t count, uint32_t w, uint32_t t)
{
// Scratch space to temporarily write to. Needed because we will use atomics to write 32 bit
// words but the destination mem may not be a multiple of 4 bytes.
// TODO (dm): This assumes blockdim = 128 and max bits per value = 16. Reduce magic numbers.
// To allow up to 24 bit this needs to be sized at 96 words.
__shared__ uint32_t scratch[64];
if (t < 64) { scratch[t] = 0; }
__syncthreads();

// Atomically write result to scratch
if (v32[0]) { atomicOr(scratch + ((t * w) >> 5), v32[0]); }
if (v32[1]) { atomicOr(scratch + ((t * w) >> 5) + 1, v32[1]); }
}
__syncthreads();
if (t <= count) {
// shift symbol left by up to 31 bits
uint64_t v64 = v;
v64 <<= (t * w) & 0x1f;

// Copy scratch data to final destination
auto available_bytes = (count * w + 7) / 8;
// Copy 64 bit word into two 32 bit words while following C++ strict aliasing rules.
uint32_t v32[2];
memcpy(&v32, &v64, sizeof(uint64_t));

auto scratch_bytes = reinterpret_cast<char*>(&scratch[0]);
if (t < available_bytes) { dst[t] = scratch_bytes[t]; }
if (t + 128 < available_bytes) { dst[t + 128] = scratch_bytes[t + 128]; }
__syncthreads();
} else {
CUDF_UNREACHABLE("Unsupported bit width");
// Atomically write result to scratch
if (v32[0]) { atomicOr(scratch + ((t * w) >> 5), v32[0]); }
if (v32[1]) { atomicOr(scratch + ((t * w) >> 5) + 1, v32[1]); }
}
__syncthreads();

// Copy scratch data to final destination
auto available_bytes = (count * w + 7) / 8;

auto scratch_bytes = reinterpret_cast<char*>(&scratch[0]);
if (t < available_bytes) { dst[t] = scratch_bytes[t]; }
if (t + 128 < available_bytes) { dst[t + 128] = scratch_bytes[t + 128]; }
// would need the following for up to 24 bits
// if (t + 256 < available_bytes) { dst[t + 256] = scratch_bytes[t + 256]; }
__syncthreads();
}

/**
* @brief Pack literal values in output bitstream
*/
inline __device__ void PackLiterals(
  uint8_t* dst, uint32_t v, uint32_t count, uint32_t w, uint32_t t)
{
  // bit widths that lie on easy boundaries can be handled either directly
  // (8, 16, 24) or through fast shuffle operations.
  bool const use_shuffle = (w == 1) || (w == 2) || (w == 3) || (w == 4) || (w == 5) ||
                           (w == 6) || (w == 8) || (w == 10) || (w == 12) || (w == 16) ||
                           (w == 20) || (w == 24);
  if (use_shuffle) {
    PackLiteralsShuffle(dst, v, count, w, t);
    return;
  }

  // no packer exists for the remaining widths above 16 bits
  if (w > 16) { CUDF_UNREACHABLE("Unsupported bit width"); }

  // less efficient bit packing that uses atomics, but can handle arbitrary
  // bit widths up to 16. used for repetition and definition level encoding
  PackLiteralsRoundRobin(dst, v, count, w, t);
}

Expand Down
5 changes: 3 additions & 2 deletions cpp/src/io/parquet/writer_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1067,9 +1067,10 @@ auto build_chunk_dictionaries(hostdevice_2dvector<gpu::EncColumnChunk>& chunks,
if (nbits > 24) { return std::pair(false, 0); }

// Only these bit sizes are allowed for RLE encoding because it's compute optimized
constexpr auto allowed_bitsizes = std::array<size_type, 7>{1, 2, 4, 8, 12, 16, 24};
constexpr auto allowed_bitsizes =
std::array<size_type, 12>{1, 2, 3, 4, 5, 6, 8, 10, 12, 16, 20, 24};

// ceil to (1/2/4/8/12/16/24)
// ceil to (1/2/3/4/5/6/8/10/12/16/20/24)
auto rle_bits = *std::lower_bound(allowed_bitsizes.begin(), allowed_bitsizes.end(), nbits);
auto rle_byte_size = util::div_rounding_up_safe(ck.num_values * rle_bits, 8);

Expand Down
90 changes: 90 additions & 0 deletions cpp/tests/io/parquet_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,26 @@ void read_footer(const std::unique_ptr<cudf::io::datasource>& source,
CUDF_EXPECTS(res, "Cannot parse file metadata");
}

// returns the number of bits used for dictionary encoding data at the given page location.
// this assumes the data is uncompressed.
// throws cudf::logic_error if the page_loc data is invalid.
int read_dict_bits(const std::unique_ptr<cudf::io::datasource>& source,
                   const cudf::io::parquet::PageLocation& page_loc)
{
  CUDF_EXPECTS(page_loc.offset > 0, "Cannot find page header");
  CUDF_EXPECTS(page_loc.compressed_page_size > 0, "Invalid page header length");

  auto const page_data = source->host_read(page_loc.offset, page_loc.compressed_page_size);
  cudf::io::parquet::CompactProtocolReader reader(page_data->data(), page_data->size());

  cudf::io::parquet::PageHeader header;
  CUDF_EXPECTS(reader.read(&header), "Cannot parse page header");

  // after parsing the header the reader is positioned at the start of the page
  // data; the first byte there is the dictionary-encoding bit size
  return reader.getb();
}

// read column index from datasource at location indicated by chunk,
// parse and return as a ColumnIndex struct.
// throws cudf::logic_error if the chunk data is invalid.
Expand Down Expand Up @@ -362,6 +382,18 @@ struct ParquetChunkedWriterNumericTypeTest : public ParquetChunkedWriterTest {
// Declare typed test cases
TYPED_TEST_SUITE(ParquetChunkedWriterNumericTypeTest, SupportedTypes);

// Base test fixture for size-parameterized tests
class ParquetSizedTest : public ::testing::TestWithParam<int> {
};

// test the allowed bit widths for dictionary encoding
// values chosen to trigger 1, 2, 3, 4, 5, 6, 8, 10, 12, 16, 20, and 24 bit dictionaries
// (each parameter is the number of distinct dictionary entries; the key width
// is the number of bits needed to represent param - 1 — see DictionaryTest)
INSTANTIATE_TEST_SUITE_P(
  ParquetDictionaryTest,
  ParquetSizedTest,
  testing::Values(2, 4, 8, 16, 32, 64, 256, 1024, 4096, 65536, 128 * 1024, 2 * 1024 * 1024),
  testing::PrintToStringParamName());

namespace {
// Generates a vector of uniform random values of type T
template <typename T>
Expand Down Expand Up @@ -4204,4 +4236,62 @@ TEST_F(ParquetReaderTest, StructByteArray)
CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());
}

// Round-trips a single string column whose distinct-value count is the test
// parameter, then verifies that the writer chose dictionary encoding and used
// the expected dictionary key bit width for the first data page.
TEST_P(ParquetSizedTest, DictionaryTest)
{
  constexpr int nrows = 3'000'000;

  // GetParam() is the cardinality: row i gets suffix i % GetParam()
  auto elements = cudf::detail::make_counting_transform_iterator(0, [](auto i) {
    return "a unique string value suffixed with " + std::to_string(i % GetParam());
  });
  auto const col0 = cudf::test::strings_column_wrapper(elements, elements + nrows);
  auto const expected = table_view{{col0}};

  auto const filepath = temp_env->get_temp_filepath("DictionaryTest.parquet");
  // set row group size so that there will be only one row group
  // no compression so we can easily read page data
  // NOTE(review): mixes cudf::io and the cudf_io alias; presumably
  // `namespace cudf_io = cudf::io` is declared elsewhere in this file — confirm
  cudf::io::parquet_writer_options out_opts =
    cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, expected)
      .compression(cudf::io::compression_type::NONE)
      .stats_level(cudf::io::statistics_freq::STATISTICS_COLUMN)
      .row_group_size_rows(nrows)
      .row_group_size_bytes(256 * 1024 * 1024);
  cudf::io::write_parquet(out_opts);

  cudf::io::parquet_reader_options default_in_opts =
    cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath});
  auto const result = cudf_io::read_parquet(default_in_opts);

  CUDF_TEST_EXPECT_TABLES_EQUAL(expected, result.tbl->view());

  // make sure dictionary was used
  auto const source = cudf::io::datasource::create(filepath);
  cudf::io::parquet::FileMetaData fmd;

  read_footer(source, &fmd);
  // true if any encoding listed for the first column chunk is a dictionary encoding
  auto used_dict = [&fmd]() {
    for (auto enc : fmd.row_groups[0].columns[0].meta_data.encodings) {
      if (enc == cudf::io::parquet::Encoding::PLAIN_DICTIONARY or
          enc == cudf::io::parquet::Encoding::RLE_DICTIONARY) {
        return true;
      }
    }
    return false;
  };
  EXPECT_TRUE(used_dict());

  // and check that the correct number of bits was used
  auto const oi = read_offset_index(source, fmd.row_groups[0].columns[0]);
  auto const nbits = read_dict_bits(source, oi.page_locations[0]);
  // minimum bits needed to represent the largest dictionary index (param - 1)
  auto const expected_bits =
    cudf::io::parquet::CompactProtocolReader::NumRequiredBits(GetParam() - 1);

  // copied from writer_impl.cu
  // round expected_bits up to the next bit width the encoder supports
  constexpr auto allowed_bitsizes =
    std::array<cudf::size_type, 12>{1, 2, 3, 4, 5, 6, 8, 10, 12, 16, 20, 24};
  auto const rle_bits =
    *std::lower_bound(allowed_bitsizes.begin(), allowed_bitsizes.end(), expected_bits);

  EXPECT_EQ(nbits, rle_bits);
}

CUDF_TEST_PROGRAM_MAIN()

0 comments on commit 288c81f

Please sign in to comment.