Skip to content

Commit

Permalink
Read FIXED_LEN_BYTE_ARRAY as binary in parquet reader (#13437)
Browse files Browse the repository at this point in the history
Closes #12590 

This PR adds support of reading `FIXED_LEN_BYTE_ARRAY` as lists of `INT8` in the parquet reader.

Authors:
  - Yunsong Wang (https://github.com/PointKernel)
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Mike Wilson (https://github.com/hyperbolic2346)
  - Robert (Bobby) Evans (https://github.com/revans2)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #13437
  • Loading branch information
PointKernel authored Aug 24, 2023
1 parent ff99f98 commit 6095a92
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 58 deletions.
16 changes: 12 additions & 4 deletions cpp/src/io/parquet/page_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ inline __device__ void gpuOutputString(volatile page_state_s* s,
void* dstv)
{
auto [ptr, len] = gpuGetStringData(s, sb, src_pos);
if (s->dtype_len == 4) {
// make sure to only hash `BYTE_ARRAY` when specified with the output type size
if (s->dtype_len == 4 and (s->col.data_type & 7) == BYTE_ARRAY) {
// Output hash. This hash value is used if the option to convert strings to
// categoricals is enabled. The seed value is chosen arbitrarily.
uint32_t constexpr hash_seed = 33;
Expand Down Expand Up @@ -456,8 +457,12 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
if (s->dict_base) {
out_thread0 = (s->dict_bits > 0) ? 64 : 32;
} else {
out_thread0 =
((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
switch (s->col.data_type & 7) {
case BOOLEAN: [[fallthrough]];
case BYTE_ARRAY: [[fallthrough]];
case FIXED_LEN_BYTE_ARRAY: out_thread0 = 64; break;
default: out_thread0 = 32;
}
}

PageNestingDecodeInfo* nesting_info_base = s->nesting_info;
Expand Down Expand Up @@ -494,7 +499,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
src_target_pos = gpuDecodeDictionaryIndices<false>(s, sb, src_target_pos, t & 0x1f).first;
} else if ((s->col.data_type & 7) == BOOLEAN) {
src_target_pos = gpuDecodeRleBooleans(s, sb, src_target_pos, t & 0x1f);
} else if ((s->col.data_type & 7) == BYTE_ARRAY) {
} else if ((s->col.data_type & 7) == BYTE_ARRAY or
(s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
gpuInitStringDescriptors<false>(s, sb, src_target_pos, t & 0x1f);
}
if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; }
Expand Down Expand Up @@ -564,6 +570,8 @@ __global__ void __launch_bounds__(decode_block_size) gpuDecodePageData(
}
break;
}
} else if (dtype == FIXED_LEN_BYTE_ARRAY) {
gpuOutputString(s, sb, val_src_pos, dst);
} else if (dtype == INT96) {
gpuOutputInt96Timestamp(s, sb, val_src_pos, static_cast<int64_t*>(dst));
} else if (dtype_len == 8) {
Expand Down
40 changes: 23 additions & 17 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -421,13 +421,15 @@ __device__ size_type gpuInitStringDescriptors(page_state_s volatile* s,
int k = s->dict_val;

while (pos < target_pos) {
int len;
if (k + 4 <= dict_size) {
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
k += 4;
if (k + len > dict_size) { len = 0; }
int len = 0;
if ((s->col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
if (k < dict_size) { len = s->dtype_len_in; }
} else {
len = 0;
if (k + 4 <= dict_size) {
len = (cur[k]) | (cur[k + 1] << 8) | (cur[k + 2] << 16) | (cur[k + 3] << 24);
k += 4;
if (k + len > dict_size) { len = 0; }
}
}
if constexpr (!sizes_only) {
sb->dict_idx[rolling_index<state_buf::dict_buf_size>(pos)] = k;
Expand Down Expand Up @@ -1154,16 +1156,20 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
}
// Special check for downconversions
s->dtype_len_in = s->dtype_len;
if (s->col.converted_type == DECIMAL && data_type == FIXED_LEN_BYTE_ARRAY) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
} else if (dtype_len <= sizeof(int64_t)) {
return sizeof(int64_t);
} else {
return sizeof(__int128_t);
}
}();
if (data_type == FIXED_LEN_BYTE_ARRAY) {
if (s->col.converted_type == DECIMAL) {
s->dtype_len = [dtype_len = s->dtype_len]() {
if (dtype_len <= sizeof(int32_t)) {
return sizeof(int32_t);
} else if (dtype_len <= sizeof(int64_t)) {
return sizeof(int64_t);
} else {
return sizeof(__int128_t);
}
}();
} else {
s->dtype_len = sizeof(string_index_pair);
}
} else if (data_type == INT32) {
if (dtype_len_out == 1) {
// INT8 output
Expand Down Expand Up @@ -1219,7 +1225,7 @@ inline __device__ bool setupLocalPageInfo(page_state_s* const s,
uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
// if this is a string column, then dtype_len is a lie. data will be offsets rather
// than (ptr,len) tuples.
if (data_type == BYTE_ARRAY && s->dtype_len != 4) { len = sizeof(cudf::size_type); }
if (is_string_col(s->col)) { len = sizeof(cudf::size_type); }
nesting_info->data_out += (output_offset * len);
}
if (nesting_info->string_out != nullptr) {
Expand Down
69 changes: 37 additions & 32 deletions cpp/src/io/parquet/page_string_decode.cu
Original file line number Diff line number Diff line change
Expand Up @@ -521,39 +521,44 @@ __global__ void __launch_bounds__(preprocess_block_size) gpuComputePageStringSiz
pp->num_valids = s->page.num_valids;
}

// now process string info in the range [start_value, end_value)
// set up for decoding strings...can be either plain or dictionary
auto const& col = s->col;
uint8_t const* data = s->data_start;
uint8_t const* const end = s->data_end;
uint8_t const* dict_base = nullptr;
int dict_size = 0;
size_t str_bytes = 0;

switch (pp->encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
}
auto const& col = s->col;
size_t str_bytes = 0;
// short circuit for FIXED_LEN_BYTE_ARRAY
if ((col.data_type & 7) == FIXED_LEN_BYTE_ARRAY) {
str_bytes = pp->num_valids * s->dtype_len_in;
} else {
// now process string info in the range [start_value, end_value)
// set up for decoding strings...can be either plain or dictionary
uint8_t const* data = s->data_start;
uint8_t const* const end = s->data_end;
uint8_t const* dict_base = nullptr;
int dict_size = 0;

switch (pp->encoding) {
case Encoding::PLAIN_DICTIONARY:
case Encoding::RLE_DICTIONARY:
// RLE-packed dictionary indices, first byte indicates index length in bits
if (col.str_dict_index) {
// String dictionary: use index
dict_base = reinterpret_cast<const uint8_t*>(col.str_dict_index);
dict_size = col.page_info[0].num_input_values * sizeof(string_index_pair);
} else {
dict_base = col.page_info[0].page_data; // dictionary is always stored in the first page
dict_size = col.page_info[0].uncompressed_page_size;
}

// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
break;
case Encoding::PLAIN:
dict_size = static_cast<int32_t>(end - data);
str_bytes = is_bounds_pg ? totalPlainEntriesSize(data, dict_size, start_value, end_value)
: dict_size - sizeof(int) * (pp->num_input_values - pp->num_nulls);
break;
// FIXME: need to return an error condition...this won't actually do anything
if (s->dict_bits > 32 || !dict_base) { CUDF_UNREACHABLE("invalid dictionary bit size"); }

str_bytes = totalDictEntriesSize(
data, dict_base, s->dict_bits, dict_size, (end - data), start_value, end_value);
break;
case Encoding::PLAIN:
dict_size = static_cast<int32_t>(end - data);
str_bytes = is_bounds_pg ? totalPlainEntriesSize(data, dict_size, start_value, end_value)
: dict_size - sizeof(int) * pp->num_valids;
break;
}
}

if (t == 0) {
Expand Down
7 changes: 5 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,8 +452,11 @@ struct EncPage {
*/
constexpr bool is_string_col(ColumnChunkDesc const& chunk)
{
return (chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4 and
chunk.converted_type != DECIMAL;
auto const not_converted_to_decimal = chunk.converted_type != DECIMAL;
auto const non_hashed_byte_array =
(chunk.data_type & 7) == BYTE_ARRAY and (chunk.data_type >> 3) != 4;
auto const fixed_len_byte_array = (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY;
return not_converted_to_decimal and (non_hashed_byte_array or fixed_len_byte_array);
}

/**
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -401,9 +401,15 @@ table_with_metadata reader::impl::read_chunk_internal(

// Create the final output cudf columns.
for (size_t i = 0; i < _output_buffers.size(); ++i) {
auto const metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto metadata = _reader_column_schema.has_value()
? std::make_optional<reader_column_schema>((*_reader_column_schema)[i])
: std::nullopt;
auto const& schema = _metadata->get_schema(_output_column_schemas[i]);
// FIXED_LEN_BYTE_ARRAY never read as string
if (schema.type == FIXED_LEN_BYTE_ARRAY and schema.converted_type != DECIMAL) {
metadata = std::make_optional<reader_column_schema>();
metadata->set_convert_binary_to_strings(false);
}
// Only construct `out_metadata` if `_output_metadata` has not been cached.
if (!_output_metadata) {
column_name_info& col_name = out_metadata.schema_info[i];
Expand Down
Binary file not shown.
9 changes: 9 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2568,6 +2568,15 @@ def test_parquet_reader_binary_decimal(datadir):
assert_eq(expect, got)


def test_parquet_reader_fixed_bin(datadir):
fname = datadir / "fixed_len_byte_array.parquet"

expect = pd.read_parquet(fname)
got = cudf.read_parquet(fname)

assert_eq(expect, got)


def test_parquet_reader_rle_boolean(datadir):
fname = datadir / "rle_boolean_encoding.parquet"

Expand Down

0 comments on commit 6095a92

Please sign in to comment.