diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 7f9ad1aa73d..3f63c8240ae 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -212,51 +212,59 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, TypeKind kind, ColStatsBlob const* stats) { - size_t sz = 0, lpos; - put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:RowIndex.entry - lpos = m_buf->size(); - put_byte(0xcd); // sz+2 - put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] - put_byte(0xcd); // sz - if (present_blk >= 0) sz += put_uint(present_blk); + std::vector positions_data; + ProtobufWriter position_writer(&positions_data); + auto const positions_size_offset = position_writer.put_uint( + encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] + position_writer.put_byte(0xcd); // positions size placeholder + uint32_t positions_size = 0; + if (present_blk >= 0) positions_size += position_writer.put_uint(present_blk); if (present_ofs >= 0) { - sz += put_uint(present_ofs); - sz += put_byte(0); // run pos = 0 - sz += put_byte(0); // bit pos = 0 + positions_size += position_writer.put_uint(present_ofs); + positions_size += position_writer.put_byte(0); // run pos = 0 + positions_size += position_writer.put_byte(0); // bit pos = 0 } - if (data_blk >= 0) { sz += put_uint(data_blk); } + if (data_blk >= 0) { positions_size += position_writer.put_uint(data_blk); } if (data_ofs >= 0) { - sz += put_uint(data_ofs); + positions_size += position_writer.put_uint(data_ofs); if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) { // RLE run pos always zero (assumes RLE aligned with row index boundaries) - sz += put_byte(0); + positions_size += position_writer.put_byte(0); if (kind == BOOLEAN) { // bit position in byte, always zero - sz += put_byte(0); + positions_size += position_writer.put_byte(0); } } } // INT kind can be passed in to bypass 2nd stream index (dictionary length streams) if (kind != INT) { - if (data2_blk >= 0) { sz += put_uint(data2_blk); } + if (data2_blk >= 0) { positions_size += position_writer.put_uint(data2_blk); } if (data2_ofs >= 0) { - sz += put_uint(data2_ofs); + positions_size += position_writer.put_uint(data2_ofs); // RLE run pos always zero (assumes RLE aligned with row index boundaries) - sz += put_byte(0); + positions_size += position_writer.put_byte(0); } } // size of the field 1 - m_buf->data()[lpos + 2] = (uint8_t)(sz); + positions_data[positions_size_offset] = static_cast(positions_size); + + auto const stats_size = (stats == nullptr) + ? 0 + : varint_size(encode_field_number(2)) + + varint_size(stats->size()) + stats->size(); + auto const entry_size = positions_data.size() + stats_size; + + // 1:RowIndex.entry + put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); + put_uint(entry_size); + put_bytes(positions_data); if (stats != nullptr) { - sz += put_uint(encode_field_number(2)); // 2: statistics + put_uint(encode_field_number(2)); // 2: statistics // Statistics field contains its length as varint and dtype specific data (encoded on the GPU) - sz += put_uint(stats->size()); - sz += put_bytes(*stats); + put_uint(stats->size()); + put_bytes(*stats); } - - // size of the whole row index entry - m_buf->data()[lpos] = (uint8_t)(sz + 2); } size_t ProtobufWriter::write(const PostScript& s) diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index b3f6a1647d7..858f7682b11 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -495,6 +495,17 @@ class ProtobufWriter { put_byte(static_cast(v)); return l; } + + uint32_t varint_size(uint64_t val) + { + auto len = 1u; + while (val > 0x7f) { + val >>= 7; + ++len; + } + return len; + } + uint32_t put_int(int64_t v) { int64_t s = (v < 0); diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 71c16566e53..080363fb3dd 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -239,8 +239,8 @@ enum row_entry_state_e { * @return bytes consumed */ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, - const uint8_t* start, - const uint8_t* end) + uint8_t const* const start, + uint8_t const* const end) { constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8; @@ -471,7 +471,7 @@ __global__ void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_gr : row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].num_rows; auto const start_row = (use_base_stride) - ? rowidx_stride + ? i * rowidx_stride : row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x].start_row; for (int j = t4; j < rowgroup_size4; j += 4) { ((uint32_t*)&row_groups[(s->rowgroup_start + i) * num_columns + blockIdx.x])[j] = diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index c5b6395394b..b19a1c4dc05 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1729,3 +1729,17 @@ def test_orc_reader_zstd_compression(list_struct_buff): assert_eq(expected, got) except RuntimeError: pytest.mark.xfail(reason="zstd support is not enabled") + + +def test_writer_protobuf_large_rowindexentry(): + s = [ + "Length of the two strings needs to add up to at least ~120", + "So that the encoded statistics are larger than 128 bytes", + ] * 5001 # generate more than 10K rows to have two row groups + df = cudf.DataFrame({"s1": s}) + + buff = BytesIO() + df.to_orc(buff) + + got = cudf.read_orc(buff) + assert_frame_equal(df, got)