diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index 44cea6169e4..f51fd28676e 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,10 +38,10 @@ uint32_t ProtobufReader::read_field_size(const uint8_t* end) void ProtobufReader::skip_struct_field(int t) { switch (t) { - case PB_TYPE_VARINT: get(); break; - case PB_TYPE_FIXED64: skip_bytes(8); break; - case PB_TYPE_FIXEDLEN: skip_bytes(get()); break; - case PB_TYPE_FIXED32: skip_bytes(4); break; + case ProtofType::VARINT: get(); break; + case ProtofType::FIXED64: skip_bytes(8); break; + case ProtofType::FIXEDLEN: skip_bytes(get()); break; + case ProtofType::FIXED32: skip_bytes(4); break; default: break; } } @@ -209,43 +209,54 @@ void ProtobufWriter::put_row_index_entry(int32_t present_blk, int32_t data_ofs, int32_t data2_blk, int32_t data2_ofs, - TypeKind kind) + TypeKind kind, + ColStatsBlob const* stats) { size_t sz = 0, lpos; - putb(1 * 8 + PB_TYPE_FIXEDLEN); // 1:RowIndex.entry + put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:RowIndex.entry lpos = m_buf->size(); - putb(0xcd); // sz+2 - putb(1 * 8 + PB_TYPE_FIXEDLEN); // 1:positions[packed=true] - putb(0xcd); // sz + put_byte(0xcd); // sz+2 + put_uint(encode_field_number(1, ProtofType::FIXEDLEN)); // 1:positions[packed=true] + put_byte(0xcd); // sz if (present_blk >= 0) sz += put_uint(present_blk); if (present_ofs >= 0) { - sz += put_uint(present_ofs) + 2; - putb(0); // run pos = 0 - putb(0); // bit pos = 0 + sz += put_uint(present_ofs); + sz += put_byte(0); // run pos = 0 + sz += put_byte(0); // bit pos = 0 } if (data_blk >= 0) { sz += put_uint(data_blk); } if (data_ofs >= 0) { sz += put_uint(data_ofs); if (kind != STRING && kind != FLOAT && kind != DOUBLE && kind != DECIMAL) { - putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) - sz++; + // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += put_byte(0); if (kind == BOOLEAN) { - putb(0); // bit position in byte, always zero - sz++; + // bit position in byte, always zero + sz += put_byte(0); } } } - if (kind != - INT) // INT kind can be passed in to bypass 2nd stream index (dictionary length streams) - { + // INT kind can be passed in to bypass 2nd stream index (dictionary length streams) + if (kind != INT) { if (data2_blk >= 0) { sz += put_uint(data2_blk); } if (data2_ofs >= 0) { - sz += put_uint(data2_ofs) + 1; - putb(0); // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += put_uint(data2_ofs); + // RLE run pos always zero (assumes RLE aligned with row index boundaries) + sz += put_byte(0); } } - m_buf->data()[lpos] = (uint8_t)(sz + 2); + // size of the field 1 m_buf->data()[lpos + 2] = (uint8_t)(sz); + + if (stats != nullptr) { + sz += put_uint(encode_field_number(2)); // 2: statistics + // Statistics field contains its length as varint and dtype specific data (encoded on the GPU) + sz += put_uint(stats->size()); + sz += put_bytes(*stats); + } + + // size of the whole row index entry + m_buf->data()[lpos] = (uint8_t)(sz + 2); } size_t ProtobufWriter::write(const PostScript& s) @@ -256,7 +267,7 @@ size_t ProtobufWriter::write(const PostScript& s) if (s.compression != NONE) { w.field_uint(3, s.compressionBlockSize); } w.field_packed_uint(4, s.version); w.field_uint(5, s.metadataLength); - w.field_string(8000, s.magic); + w.field_blob(8000, s.magic); return w.value(); } @@ -300,8 +311,8 @@ size_t ProtobufWriter::write(const SchemaType& s) size_t ProtobufWriter::write(const UserMetadataItem& s) { ProtobufFieldWriter w(this); - w.field_string(1, s.name); - w.field_string(2, s.value); + w.field_blob(1, s.name); + w.field_blob(2, s.value); return w.value(); } @@ -310,7 +321,7 @@ size_t ProtobufWriter::write(const StripeFooter& s) ProtobufFieldWriter w(this); w.field_repeated_struct(1, s.streams); w.field_repeated_struct(2, s.columns); - if (s.writerTimezone != "") { w.field_string(3, s.writerTimezone); } + if (s.writerTimezone != "") { w.field_blob(3, s.writerTimezone); } return w.value(); } diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 277c5d99f8f..4fa3480c90a 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -131,6 +131,67 @@ struct Metadata { std::vector stripeStats; }; +int inline constexpr encode_field_number(int field_number, ProtofType field_type) noexcept +{ + return (field_number * 8) + static_cast(field_type); +} + +namespace { +template ::value and + !std::is_enum::value>* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXEDLEN); +} + +template ::value or + std::is_enum::value>* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::VARINT); +} + +template >* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXED32); +} + +template >* = nullptr> +int static constexpr encode_field_number_base(int field_number) noexcept +{ + return encode_field_number(field_number, ProtofType::FIXED64); +} +}; // namespace + +template < + typename T, + typename std::enable_if_t::value or std::is_same_v>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + +// containters change the field number encoding +template < + typename T, + typename std::enable_if_t>::value>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + +// optional fields don't change the field number encoding +template >::value>* = nullptr> +int constexpr encode_field_number(int field_number) noexcept +{ + return encode_field_number_base(field_number); +} + /** * @brief Class for parsing Orc's Protocol Buffers encoded metadata */ @@ -181,60 +242,6 @@ class ProtobufReader { template void function_builder(T& s, size_t maxlen, std::tuple& op); - template ::value and - !std::is_enum::value>* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXEDLEN; - } - - template ::value or - std::is_enum::value>* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_VARINT; - } - - template >* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXED32; - } - - template >* = nullptr> - int static constexpr encode_field_number_base(int field_number) noexcept - { - return (field_number * 8) + PB_TYPE_FIXED64; - } - - template ::value or std::is_same_v>* = - nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - - // containters change the field number encoding - template >::value>* = nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - - // optional fields don't change the field number encoding - template >::value>* = nullptr> - int static constexpr encode_field_number(int field_number) noexcept - { - return encode_field_number_base(field_number); - } - uint32_t read_field_size(const uint8_t* end); template ::value>* = nullptr> @@ -470,16 +477,28 @@ class ProtobufWriter { public: ProtobufWriter() { m_buf = nullptr; } ProtobufWriter(std::vector* output) { m_buf = output; } - void putb(uint8_t v) { m_buf->push_back(v); } + uint32_t put_byte(uint8_t v) + { + m_buf->push_back(v); + return 1; + } + template + uint32_t put_bytes(host_span values) + { + static_assert(sizeof(T) == 1); + m_buf->reserve(m_buf->size() + values.size()); + m_buf->insert(m_buf->end(), values.begin(), values.end()); + return values.size(); + } uint32_t put_uint(uint64_t v) { int l = 1; while (v > 0x7f) { - putb(static_cast(v | 0x80)); + put_byte(static_cast(v | 0x80)); v >>= 7; l++; } - putb(static_cast(v)); + put_byte(static_cast(v)); return l; } uint32_t put_int(int64_t v) @@ -493,7 +512,8 @@ class ProtobufWriter { int32_t data_ofs, int32_t data2_blk, int32_t data2_ofs, - TypeKind kind); + TypeKind kind, + ColStatsBlob const* stats); public: size_t write(const PostScript&); diff --git a/cpp/src/io/orc/orc_common.h b/cpp/src/io/orc/orc_common.h index f88a84b0bfc..6bee5be81ed 100644 --- a/cpp/src/io/orc/orc_common.h +++ b/cpp/src/io/orc/orc_common.h @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,15 +76,15 @@ enum ColumnEncodingKind : int8_t { DICTIONARY_V2 = 3, // the encoding is dictionary-based using RLE v2 }; -enum : uint8_t { // Protobuf field types - PB_TYPE_VARINT = 0, - PB_TYPE_FIXED64 = 1, - PB_TYPE_FIXEDLEN = 2, - PB_TYPE_START_GROUP = 3, // deprecated - PB_TYPE_END_GROUP = 4, // deprecated - PB_TYPE_FIXED32 = 5, - PB_TYPE_INVALID_6 = 6, - PB_TYPE_INVALID_7 = 7, +enum ProtofType : uint8_t { + VARINT = 0, + FIXED64 = 1, + FIXEDLEN = 2, + START_GROUP = 3, // deprecated + END_GROUP = 4, // deprecated + FIXED32 = 5, + INVALID_6 = 6, + INVALID_7 = 7, }; } // namespace orc diff --git a/cpp/src/io/orc/orc_field_writer.hpp b/cpp/src/io/orc/orc_field_writer.hpp index afcd99a2cd6..9714277b54d 100644 --- a/cpp/src/io/orc/orc_field_writer.hpp +++ b/cpp/src/io/orc/orc_field_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -41,7 +41,7 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_uint(int field, const T& value) { - struct_size += p->put_uint(field * 8 + PB_TYPE_VARINT); + struct_size += p->put_uint(encode_field_number(field)); struct_size += p->put_uint(static_cast(value)); } @@ -52,9 +52,9 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_packed_uint(int field, const std::vector& value) { - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); + struct_size += p->put_uint(encode_field_number>(field)); auto lpos = p->m_buf->size(); - p->putb(0); + p->put_byte(0); auto sz = std::accumulate(value.begin(), value.end(), 0, [p = this->p](size_t sum, auto val) { return sum + p->put_uint(val); }); @@ -65,29 +65,15 @@ struct ProtobufWriter::ProtobufFieldWriter { (*(p->m_buf))[lpos] = static_cast(sz); } - /** - * @brief Function to write a string to the internal buffer - */ - void field_string(int field, const std::string& value) - { - size_t len = value.length(); - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); - struct_size += p->put_uint(len) + len; - for (size_t i = 0; i < len; i++) - p->putb(value[i]); - } - /** * @brief Function to write a blob to the internal buffer */ template - void field_blob(int field, const std::vector& value) + void field_blob(int field, T const& values) { - size_t len = value.size(); - struct_size += p->put_uint(field * 8 + PB_TYPE_FIXEDLEN); - struct_size += p->put_uint(len) + len; - for (size_t i = 0; i < len; i++) - p->putb(value[i]); + struct_size += p->put_uint(encode_field_number(field)); + struct_size += p->put_uint(values.size()); + struct_size += p->put_bytes(values); } /** @@ -96,9 +82,9 @@ struct ProtobufWriter::ProtobufFieldWriter { template void field_struct(int field, const T& value) { - struct_size += p->put_uint((field)*8 + PB_TYPE_FIXEDLEN); + struct_size += p->put_uint(encode_field_number(field, ProtofType::FIXEDLEN)); auto lpos = p->m_buf->size(); - p->putb(0); + p->put_byte(0); auto sz = p->write(value); struct_size += sz + 1; for (; sz > 0x7f; sz >>= 7, struct_size++) @@ -112,7 +98,7 @@ struct ProtobufWriter::ProtobufFieldWriter { void field_repeated_string(int field, const std::vector& value) { for (const auto& elem : value) - field_string(field, elem); + field_blob(field, elem); } /** diff --git a/cpp/src/io/orc/stats_enc.cu b/cpp/src/io/orc/stats_enc.cu index 7441819d7cd..b377a2e7076 100644 --- a/cpp/src/io/orc/stats_enc.cu +++ b/cpp/src/io/orc/stats_enc.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -150,7 +150,7 @@ __device__ inline uint8_t* pb_encode_uint(uint8_t* p, uint64_t v) // Protobuf field encoding for unsigned int __device__ inline uint8_t* pb_put_uint(uint8_t* p, uint32_t id, uint64_t v) { - p[0] = id * 8 + PB_TYPE_VARINT; // NOTE: Assumes id < 16 + p[0] = id * 8 + static_cast(ProtofType::VARINT); // NOTE: Assumes id < 16 return pb_encode_uint(p + 1, v); } @@ -165,7 +165,7 @@ __device__ inline uint8_t* pb_put_int(uint8_t* p, uint32_t id, int64_t v) __device__ inline uint8_t* pb_put_packed_uint(uint8_t* p, uint32_t id, uint64_t v) { uint8_t* p2 = pb_encode_uint(p + 2, v); - p[0] = id * 8 + PB_TYPE_FIXEDLEN; + p[0] = id * 8 + ProtofType::FIXEDLEN; p[1] = static_cast(p2 - (p + 2)); return p2; } @@ -173,7 +173,7 @@ __device__ inline uint8_t* pb_put_packed_uint(uint8_t* p, uint32_t id, uint64_t // Protobuf field encoding for binary/string __device__ inline uint8_t* pb_put_binary(uint8_t* p, uint32_t id, const void* bytes, uint32_t len) { - p[0] = id * 8 + PB_TYPE_FIXEDLEN; + p[0] = id * 8 + ProtofType::FIXEDLEN; p = pb_encode_uint(p + 1, len); memcpy(p, bytes, len); return p + len; @@ -182,7 +182,7 @@ __device__ inline uint8_t* pb_put_binary(uint8_t* p, uint32_t id, const void* by // Protobuf field encoding for 64-bit raw encoding (double) __device__ inline uint8_t* pb_put_fixed64(uint8_t* p, uint32_t id, const void* raw64) { - p[0] = id * 8 + PB_TYPE_FIXED64; + p[0] = id * 8 + ProtofType::FIXED64; memcpy(p + 1, raw64, 8); return p + 9; } @@ -248,7 +248,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint64 sum = 3; // } if (s->chunk.has_minmax || s->chunk.has_sum) { - *cur = 2 * 8 + PB_TYPE_FIXEDLEN; + *cur = 2 * 8 + ProtofType::FIXEDLEN; cur += 2; if (s->chunk.has_minmax) { cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); @@ -267,7 +267,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional double sum = 3; // } if (s->chunk.has_minmax) { - *cur = 3 * 8 + PB_TYPE_FIXEDLEN; + *cur = 3 * 8 + ProtofType::FIXEDLEN; cur += 2; cur = pb_put_fixed64(cur, 1, &s->chunk.min_value.fp_val); cur = pb_put_fixed64(cur, 2, &s->chunk.max_value.fp_val); @@ -286,7 +286,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) (pb_put_uint(cur, 1, s->chunk.min_value.str_val.length) - cur) + (pb_put_uint(cur, 2, s->chunk.max_value.str_val.length) - cur) + s->chunk.min_value.str_val.length + s->chunk.max_value.str_val.length; - cur[0] = 4 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 4 * 8 + ProtofType::FIXEDLEN; cur = pb_encode_uint(cur + 1, sz); cur = pb_put_binary( cur, 1, s->chunk.min_value.str_val.ptr, s->chunk.min_value.str_val.length); @@ -301,7 +301,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // repeated uint64 count = 1 [packed=true]; // } if (s->chunk.has_sum) { // Sum is equal to the number of 'true' values - cur[0] = 5 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 5 * 8 + ProtofType::FIXEDLEN; cur = pb_put_packed_uint(cur + 2, 1, s->chunk.sum.u_val); fld_start[1] = cur - (fld_start + 2); } @@ -325,7 +325,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint32 maximum = 2; // } if (s->chunk.has_minmax) { - cur[0] = 7 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 7 * 8 + ProtofType::FIXEDLEN; cur += 2; cur = pb_put_int(cur, 1, s->chunk.min_value.i_val); cur = pb_put_int(cur, 2, s->chunk.max_value.i_val); @@ -341,7 +341,7 @@ __global__ void __launch_bounds__(encode_threads_per_block) // optional sint64 maximumUtc = 4; // } if (s->chunk.has_minmax) { - cur[0] = 9 * 8 + PB_TYPE_FIXEDLEN; + cur[0] = 9 * 8 + ProtofType::FIXEDLEN; cur += 2; cur = pb_put_int(cur, 3, s->chunk.min_value.i_val); // minimumUtc cur = pb_put_int(cur, 4, s->chunk.max_value.i_val); // maximumUtc diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index be561530459..b197751d925 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -227,7 +227,7 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, const uint8_t* start, const uint8_t* end) { - constexpr uint32_t pb_rowindexentry_id = static_cast(PB_TYPE_FIXEDLEN) + 8; + constexpr uint32_t pb_rowindexentry_id = ProtofType::FIXEDLEN + 8; const uint8_t* cur = start; row_entry_state_e state = NOT_FOUND; @@ -246,13 +246,13 @@ static uint32_t __device__ ProtobufParseRowIndexEntry(rowindex_state_s* s, state = GET_LENGTH; } else { v &= 7; - if (v == PB_TYPE_FIXED64) + if (v == ProtofType::FIXED64) cur += 8; - else if (v == PB_TYPE_FIXED32) + else if (v == ProtofType::FIXED32) cur += 4; - else if (v == PB_TYPE_VARINT) + else if (v == ProtofType::VARINT) state = SKIP_VARINT; - else if (v == PB_TYPE_FIXEDLEN) + else if (v == ProtofType::FIXEDLEN) state = SKIP_FIXEDLEN; } break; diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index b0e674c206f..b7264cb81ac 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2021, NVIDIA CORPORATION. + * Copyright (c) 2019-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -1062,13 +1062,23 @@ void set_stat_desc_leaf_cols(device_span columns, [=] __device__(auto idx) { stat_desc[idx].leaf_column = &columns[idx]; }); } -std::vector> writer::impl::gather_statistic_blobs( - orc_table_view const& orc_table, file_segmentation const& segmentation) +writer::impl::encoded_statistics writer::impl::gather_statistic_blobs( + bool are_statistics_enabled, + orc_table_view const& orc_table, + file_segmentation const& segmentation) { - auto const num_stat_blobs = (1 + segmentation.num_stripes()) * orc_table.num_columns(); + auto const num_rowgroup_blobs = segmentation.rowgroups.count(); + auto const num_stripe_blobs = segmentation.num_stripes() * orc_table.num_columns(); + auto const num_file_blobs = orc_table.num_columns(); + auto const num_stat_blobs = num_rowgroup_blobs + num_stripe_blobs + num_file_blobs; + + if (not are_statistics_enabled or num_stat_blobs == 0) { return {}; } hostdevice_vector stat_desc(orc_table.num_columns(), stream); hostdevice_vector stat_merge(num_stat_blobs, stream); + auto rowgroup_stat_merge = stat_merge.host_ptr(); + auto stripe_stat_merge = rowgroup_stat_merge + num_rowgroup_blobs; + auto file_stat_merge = stripe_stat_merge + num_stripe_blobs; for (auto const& column : orc_table.columns) { stats_column_desc* desc = &stat_desc[column.index()]; @@ -1101,14 +1111,20 @@ std::vector> writer::impl::gather_statistic_blobs( desc->ts_scale = 0; } for (auto const& stripe : segmentation.stripes) { - auto grp = &stat_merge[column.index() * segmentation.num_stripes() + stripe.id]; - grp->col = stat_desc.device_ptr(column.index()); - grp->start_chunk = + auto& grp = stripe_stat_merge[column.index() * segmentation.num_stripes() + stripe.id]; + grp.col = stat_desc.device_ptr(column.index()); + grp.start_chunk = static_cast(column.index() * segmentation.num_rowgroups() + stripe.first); - grp->num_chunks = stripe.size; + grp.num_chunks = stripe.size; + for (auto rg_idx_it = stripe.cbegin(); rg_idx_it < stripe.cend(); ++rg_idx_it) { + auto& rg_grp = + rowgroup_stat_merge[column.index() * segmentation.num_rowgroups() + *rg_idx_it]; + rg_grp.col = stat_desc.device_ptr(column.index()); + rg_grp.start_chunk = *rg_idx_it; + rg_grp.num_chunks = 1; + } } - statistics_merge_group* col_stats = - &stat_merge[segmentation.num_stripes() * orc_table.num_columns() + column.index()]; + auto col_stats = &file_stat_merge[column.index()]; col_stats->col = stat_desc.device_ptr(column.index()); col_stats->start_chunk = static_cast(column.index() * segmentation.num_stripes()); col_stats->num_chunks = static_cast(segmentation.num_stripes()); @@ -1117,58 +1133,73 @@ std::vector> writer::impl::gather_statistic_blobs( stat_merge.host_to_device(stream); set_stat_desc_leaf_cols(orc_table.d_columns, stat_desc, stream); - auto const num_chunks = segmentation.rowgroups.count(); - rmm::device_uvector stat_chunks(num_chunks + num_stat_blobs, stream); - rmm::device_uvector stat_groups(num_chunks, stream); + rmm::device_uvector stat_chunks(num_stat_blobs, stream); + auto rowgroup_stat_chunks = stat_chunks.data(); + auto stripe_stat_chunks = rowgroup_stat_chunks + num_rowgroup_blobs; + auto file_stat_chunks = stripe_stat_chunks + num_stripe_blobs; + + rmm::device_uvector stat_groups(num_rowgroup_blobs, stream); gpu::orc_init_statistics_groups( stat_groups.data(), stat_desc.device_ptr(), segmentation.rowgroups, stream); detail::calculate_group_statistics( - stat_chunks.data(), stat_groups.data(), num_chunks, stream); + stat_chunks.data(), stat_groups.data(), num_rowgroup_blobs, stream); + detail::merge_group_statistics( - stat_chunks.data() + num_chunks, - stat_chunks.data(), - stat_merge.device_ptr(), - segmentation.num_stripes() * orc_table.num_columns(), + stripe_stat_chunks, + rowgroup_stat_chunks, + stat_merge.device_ptr(num_rowgroup_blobs), + num_stripe_blobs, stream); detail::merge_group_statistics( - stat_chunks.data() + num_chunks + segmentation.num_stripes() * orc_table.num_columns(), - stat_chunks.data() + num_chunks, - stat_merge.device_ptr(segmentation.num_stripes() * orc_table.num_columns()), - orc_table.num_columns(), + file_stat_chunks, + stripe_stat_chunks, + stat_merge.device_ptr(num_rowgroup_blobs + num_stripe_blobs), + num_file_blobs, stream); gpu::orc_init_statistics_buffersize( - stat_merge.device_ptr(), stat_chunks.data() + num_chunks, num_stat_blobs, stream); + stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); stat_merge.device_to_host(stream, true); hostdevice_vector blobs( stat_merge[num_stat_blobs - 1].start_chunk + stat_merge[num_stat_blobs - 1].num_chunks, stream); - gpu::orc_encode_statistics(blobs.device_ptr(), - stat_merge.device_ptr(), - stat_chunks.data() + num_chunks, - num_stat_blobs, - stream); + gpu::orc_encode_statistics( + blobs.device_ptr(), stat_merge.device_ptr(), stat_chunks.data(), num_stat_blobs, stream); stat_merge.device_to_host(stream); blobs.device_to_host(stream, true); - std::vector> stat_blobs(num_stat_blobs); - for (size_t i = 0; i < num_stat_blobs; i++) { - const uint8_t* stat_begin = blobs.host_ptr(stat_merge[i].start_chunk); - const uint8_t* stat_end = stat_begin + stat_merge[i].num_chunks; - stat_blobs[i].assign(stat_begin, stat_end); + std::vector rowgroup_blobs(num_rowgroup_blobs); + for (size_t i = 0; i < num_rowgroup_blobs; i++) { + auto const stat_begin = blobs.host_ptr(rowgroup_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + rowgroup_stat_merge[i].num_chunks; + rowgroup_blobs[i].assign(stat_begin, stat_end); + } + + std::vector stripe_blobs(num_stripe_blobs); + for (size_t i = 0; i < num_stripe_blobs; i++) { + auto const stat_begin = blobs.host_ptr(stripe_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + stripe_stat_merge[i].num_chunks; + stripe_blobs[i].assign(stat_begin, stat_end); } - return stat_blobs; + std::vector file_blobs(num_file_blobs); + for (size_t i = 0; i < num_file_blobs; i++) { + auto const stat_begin = blobs.host_ptr(file_stat_merge[i].start_chunk); + auto const stat_end = stat_begin + file_stat_merge[i].num_chunks; + file_blobs[i].assign(stat_begin, stat_end); + } + return {std::move(rowgroup_blobs), std::move(stripe_blobs), std::move(file_blobs)}; } void writer::impl::write_index_stream(int32_t stripe_id, int32_t stream_id, host_span columns, - stripe_rowgroups const& rowgroups_range, + file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, host_span comp_out, + std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, ProtobufWriter* pbw) @@ -1226,9 +1257,18 @@ void writer::impl::write_index_stream(int32_t stripe_id, buffer_.resize((compression_kind_ != NONE) ? 3 : 0); // Add row index entries + auto const& rowgroups_range = segmentation.stripes[stripe_id]; std::for_each(rowgroups_range.cbegin(), rowgroups_range.cend(), [&](auto rowgroup) { - pbw->put_row_index_entry( - present.comp_pos, present.pos, data.comp_pos, data.pos, data2.comp_pos, data2.pos, kind); + pbw->put_row_index_entry(present.comp_pos, + present.pos, + data.comp_pos, + data.pos, + data2.comp_pos, + data2.pos, + kind, + (rg_stats.empty() or stream_id == 0) + ? nullptr + : (&rg_stats[column_id * segmentation.num_rowgroups() + rowgroup])); if (stream_id != 0) { const auto& strm = enc_streams[column_id][rowgroup]; @@ -1852,11 +1892,6 @@ void writer::impl::write(table_view const& table) auto stripes = gather_stripes(num_index_streams, segmentation, &enc_data.streams, &strm_descs); if (num_rows > 0) { - // Gather column statistics - auto const column_stats = enable_statistics_ && table.num_columns() > 0 - ? gather_statistic_blobs(orc_table, segmentation) - : std::vector{}; - // Allocate intermediate output stream buffer size_t compressed_bfr_size = 0; size_t num_compressed_blocks = 0; @@ -1919,11 +1954,12 @@ void writer::impl::write(table_view const& table) ProtobufWriter pbw_(&buffer_); + auto const statistics = gather_statistic_blobs(enable_statistics_, orc_table, segmentation); + // Write stripes std::vector> write_tasks; for (size_t stripe_id = 0; stripe_id < stripes.size(); ++stripe_id) { - auto const& rowgroups_range = segmentation.stripes[stripe_id]; - auto& stripe = stripes[stripe_id]; + auto& stripe = stripes[stripe_id]; stripe.offset = out_sink_->bytes_written(); @@ -1932,10 +1968,11 @@ void writer::impl::write(table_view const& table) write_index_stream(stripe_id, stream_id, orc_table.columns, - rowgroups_range, + segmentation, enc_data.streams, strm_descs, comp_out, + statistics.rowgroup_level, &stripe, &streams, &pbw_); @@ -1943,13 +1980,13 @@ void writer::impl::write(table_view const& table) // Column data consisting one or more separate streams for (auto const& strm_desc : strm_descs[stripe_id]) { - write_tasks.push_back( - write_data_stream(strm_desc, - enc_data.streams[strm_desc.column_id][rowgroups_range.first], - static_cast(compressed_data.data()), - stream_output.get(), - &stripe, - &streams)); + write_tasks.push_back(write_data_stream( + strm_desc, + enc_data.streams[strm_desc.column_id][segmentation.stripes[stripe_id].first], + static_cast(compressed_data.data()), + stream_output.get(), + &stripe, + &streams)); } // Write stripefooter consisting of stream information @@ -1980,37 +2017,34 @@ void writer::impl::write(table_view const& table) task.wait(); } - if (not column_stats.empty()) { - // File-level statistics - // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls - if (single_write_mode) { - // First entry contains total number of rows - buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); - pbw_.put_uint(num_rows); - ff.statistics.reserve(1 + orc_table.num_columns()); - ff.statistics.emplace_back(std::move(buffer_)); - // Add file stats, stored after stripe stats in `column_stats` - ff.statistics.insert( - ff.statistics.end(), - std::make_move_iterator(column_stats.begin()) + stripes.size() * orc_table.num_columns(), - std::make_move_iterator(column_stats.end())); - } - // Stripe-level statistics + // File-level statistics + // NOTE: Excluded from chunked write mode to avoid the need for merging stats across calls + if (single_write_mode and not statistics.file_level.empty()) { + // First entry contains total number of rows + buffer_.resize(0); + pbw_.put_uint(encode_field_number(1)); + pbw_.put_uint(num_rows); + ff.statistics.reserve(1 + orc_table.num_columns()); + ff.statistics.emplace_back(std::move(buffer_)); + // Add file stats, stored after stripe stats in `column_stats` + ff.statistics.insert(ff.statistics.end(), + std::make_move_iterator(statistics.file_level.begin()), + std::make_move_iterator(statistics.file_level.end())); + } + // Stripe-level statistics + if (not statistics.stripe_level.empty()) { size_t first_stripe = md.stripeStats.size(); md.stripeStats.resize(first_stripe + stripes.size()); for (size_t stripe_id = 0; stripe_id < stripes.size(); stripe_id++) { md.stripeStats[first_stripe + stripe_id].colStats.resize(1 + orc_table.num_columns()); buffer_.resize(0); - pbw_.putb(1 * 8 + PB_TYPE_VARINT); + pbw_.put_uint(encode_field_number(1)); pbw_.put_uint(stripes[stripe_id].numberOfRows); md.stripeStats[first_stripe + stripe_id].colStats[0] = std::move(buffer_); for (size_t col_idx = 0; col_idx < orc_table.num_columns(); col_idx++) { size_t idx = stripes.size() * col_idx + stripe_id; - if (idx < column_stats.size()) { - md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = - std::move(column_stats[idx]); - } + md.stripeStats[first_stripe + stripe_id].colStats[1 + col_idx] = + std::move(statistics.stripe_level[idx]); } } } diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index d989721334e..2738a77e50a 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -284,17 +284,24 @@ class writer::impl { hostdevice_2dvector* enc_streams, hostdevice_2dvector* strm_desc); + struct encoded_statistics { + std::vector rowgroup_level; + std::vector stripe_level; + std::vector file_level; + }; + /** - * @brief Returns per-stripe and per-file column statistics encoded - * in ORC protobuf format. + * @brief Returns column statistics encoded in ORC protobuf format. * + * @param are_statistics_enabled True if statistics are to be included in the output file * @param orc_table Table information to be written * @param columns List of columns * @param segmentation stripe and rowgroup ranges * @return The statistic blobs */ - std::vector> gather_statistic_blobs(orc_table_view const& orc_table, - file_segmentation const& segmentation); + encoded_statistics gather_statistic_blobs(bool are_statistics_enabled, + orc_table_view const& orc_table, + file_segmentation const& segmentation); /** * @brief Writes the specified column's row index stream. @@ -302,10 +309,11 @@ class writer::impl { * @param[in] stripe_id Stripe's identifier * @param[in] stream_id Stream identifier (column id + 1) * @param[in] columns List of columns - * @param[in] rowgroups_range Indexes of rowgroups in the stripe + * @param[in] segmentation stripe and rowgroup ranges * @param[in] enc_streams List of encoder chunk streams [column][rowgroup] * @param[in] strm_desc List of stream descriptors * @param[in] comp_out Output status for compressed streams + * @param[in] rg_stats row group level statistics * @param[in,out] stripe Stream's parent stripe * @param[in,out] streams List of all streams * @param[in,out] pbw Protobuf writer @@ -313,10 +321,11 @@ class writer::impl { void write_index_stream(int32_t stripe_id, int32_t stream_id, host_span columns, - stripe_rowgroups const& rowgroups_range, + file_segmentation const& segmentation, host_2dspan enc_streams, host_2dspan strm_desc, host_span comp_out, + std::vector const& rg_stats, StripeInformation* stripe, orc_streams* streams, ProtobufWriter* pbw);