Skip to content

Commit

Permalink
PARQUET-2261 Size Statistics (rapidsai#14000)
Browse files Browse the repository at this point in the history
Adds Parquet size statistics introduced in apache/parquet-format#197.

Authors:
  - Ed Seidl (https://github.com/etseidl)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)
  - Yunsong Wang (https://github.com/PointKernel)

URL: rapidsai#14000
  • Loading branch information
etseidl authored and karthikeyann committed Dec 12, 2023
1 parent 1ea75c0 commit 302861d
Show file tree
Hide file tree
Showing 10 changed files with 717 additions and 150 deletions.
31 changes: 26 additions & 5 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ class parquet_field_union_struct : public parquet_field {
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
T v;
bool const res = parquet_field_struct<T>(field(), v).operator()(cpr, field_type);
bool const res = parquet_field_struct<T>{field(), v}(cpr, field_type);
if (!res) {
val = v;
enum_val = static_cast<E>(field());
Expand Down Expand Up @@ -424,7 +424,7 @@ class parquet_field_optional : public parquet_field {
// Runs FieldFunctor on a temporary `v` and copies `v` into `val` only when the functor returns
// false; the functor's result is returned to the caller unchanged.
// NOTE(review): a false result appears to mean "field parsed OK" — confirm against the
// parquet_field functors.
// NOTE(review): this span is a rendered diff, so `res` is declared twice below; the first
// declaration appears to be the removed line and the second the added one.
inline bool operator()(CompactProtocolReader* cpr, int field_type)
{
T v;
// Apparently the pre-change form: parenthesized construction plus an explicit .operator() call.
bool const res = FieldFunctor(field(), v).operator()(cpr, field_type);
// Apparently the post-change form: brace construction and ordinary call syntax, same arguments.
bool const res = FieldFunctor{field(), v}(cpr, field_type);
if (!res) { val = v; }
return res;
}
Expand Down Expand Up @@ -631,6 +631,8 @@ bool CompactProtocolReader::read(ColumnChunk* c)

bool CompactProtocolReader::read(ColumnChunkMetaData* c)
{
using optional_size_statistics =
parquet_field_optional<SizeStatistics, parquet_field_struct<SizeStatistics>>;
auto op = std::make_tuple(parquet_field_enum<Type>(1, c->type),
parquet_field_enum_list(2, c->encodings),
parquet_field_string_list(3, c->path_in_schema),
Expand All @@ -641,7 +643,8 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c)
parquet_field_int64(9, c->data_page_offset),
parquet_field_int64(10, c->index_page_offset),
parquet_field_int64(11, c->dictionary_page_offset),
parquet_field_struct(12, c->statistics));
parquet_field_struct(12, c->statistics),
optional_size_statistics(16, c->size_statistics));
return function_builder(this, op);
}

Expand Down Expand Up @@ -700,17 +703,35 @@ bool CompactProtocolReader::read(PageLocation* p)

// Decodes an OffsetIndex struct: field 1 is the struct list `page_locations`, and field 2 is
// `unencoded_byte_array_data_bytes`, read through a parquet_field_optional handler.
// NOTE(review): this span is a rendered diff, so `op` is declared twice below; the first
// declaration appears to be the removed line and the second the added one.
bool CompactProtocolReader::read(OffsetIndex* o)
{
// Apparently the pre-change tuple: page locations only.
auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations));
// Handler type for an optional list of int64 values.
using optional_list_i64 = parquet_field_optional<std::vector<int64_t>, parquet_field_int64_list>;

// Apparently the post-change tuple: adds the optional field 2.
auto op = std::make_tuple(parquet_field_struct_list(1, o->page_locations),
optional_list_i64(2, o->unencoded_byte_array_data_bytes));
// The tuple of field handlers is passed to function_builder together with this reader.
return function_builder(this, op);
}

// Decodes a SizeStatistics struct into `s`.
//
// Every member is read through a parquet_field_optional handler: field 1 is a single int64
// (`unencoded_byte_array_data_bytes`), while fields 2 and 3 are int64 lists (the repetition and
// definition level histograms). Returns whatever function_builder reports for these handlers.
bool CompactProtocolReader::read(SizeStatistics* s)
{
  using i64_list          = std::vector<int64_t>;
  using opt_i64_field     = parquet_field_optional<int64_t, parquet_field_int64>;
  using opt_i64_list_field = parquet_field_optional<i64_list, parquet_field_int64_list>;

  // Handlers are listed in ascending field-id order, one per struct member.
  auto field_handlers =
    std::make_tuple(opt_i64_field(1, s->unencoded_byte_array_data_bytes),
                    opt_i64_list_field(2, s->repetition_level_histogram),
                    opt_i64_list_field(3, s->definition_level_histogram));

  return function_builder(this, field_handlers);
}

// Decodes a ColumnIndex struct: fields 1-5 (null_pages, min_values, max_values, boundary_order,
// null_counts) plus fields 6 and 7, the repetition/definition level histograms, which are read
// through parquet_field_optional handlers.
// NOTE(review): this span is a rendered diff; the line that closes the tuple right after field 5
// appears to be the removed line, and the three lines following it the added replacement.
bool CompactProtocolReader::read(ColumnIndex* c)
{
// Handler type for an optional list of int64 values.
using optional_list_i64 = parquet_field_optional<std::vector<int64_t>, parquet_field_int64_list>;

auto op = std::make_tuple(parquet_field_bool_list(1, c->null_pages),
parquet_field_binary_list(2, c->min_values),
parquet_field_binary_list(3, c->max_values),
parquet_field_enum<BoundaryOrder>(4, c->boundary_order),
parquet_field_int64_list(5, c->null_counts));
parquet_field_int64_list(5, c->null_counts),
optional_list_i64(6, c->repetition_level_histogram),
optional_list_i64(7, c->definition_level_histogram));
// The tuple of field handlers is passed to function_builder together with this reader.
return function_builder(this, op);
}

Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ class CompactProtocolReader {
// Per-struct decode overloads; each takes a pointer to the struct it fills in.
// NOTE(review): whether the bool result signals success or failure is not visible here — confirm.
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
// Overload for SizeStatistics; its definition in compact_protocol_reader.cpp reads three optional fields.
bool read(SizeStatistics* s);
bool read(ColumnIndex* c);
bool read(Statistics* s);
bool read(ColumnOrder* c);
Expand Down
38 changes: 35 additions & 3 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
c.field_struct(12, s.statistics);
if (s.size_statistics.has_value()) { c.field_struct(16, s.size_statistics.value()); }
return c.value();
}

Expand Down Expand Up @@ -210,6 +211,24 @@ size_t CompactProtocolWriter::write(OffsetIndex const& s)
{
// Field-writer helper constructed from this CompactProtocolWriter.
CompactProtocolFieldWriter c(*this);
// Field 1: the page locations, written as a list of structs.
c.field_struct_list(1, s.page_locations);
// Field 2 is optional: it is written only when the optional holds a value.
if (s.unencoded_byte_array_data_bytes.has_value()) {
c.field_int_list(2, s.unencoded_byte_array_data_bytes.value());
}
return c.value();
}

// Serializes a SizeStatistics struct.
//
// Each of the three members is optional and is emitted only when it holds a value: field 1 is
// written with field_int, fields 2 and 3 with field_int_list. Returns the field writer's value().
size_t CompactProtocolWriter::write(SizeStatistics const& s)
{
  CompactProtocolFieldWriter writer(*this);

  // Fields are emitted in ascending field-id order.
  auto const& byte_array_bytes = s.unencoded_byte_array_data_bytes;
  if (byte_array_bytes.has_value()) { writer.field_int(1, byte_array_bytes.value()); }

  auto const& rep_histogram = s.repetition_level_histogram;
  if (rep_histogram.has_value()) { writer.field_int_list(2, rep_histogram.value()); }

  auto const& def_histogram = s.definition_level_histogram;
  if (def_histogram.has_value()) { writer.field_int_list(3, def_histogram.value()); }

  return writer.value();
}

Expand Down Expand Up @@ -286,13 +305,26 @@ inline void CompactProtocolFieldWriter::field_int(int field, int64_t val)
current_field_value = field;
}

// Writes `val` as a list field whose elements are tagged ST_FLD_I64.
//
// The list header byte carries the element count in its high nibble and the element type in its
// low nibble. Counts of 0xf or more do not fit in the nibble: the nibble is then set to 0xf and
// the full count is written separately with put_uint.
//
// @param field Field id; also recorded as the new current_field_value once the list is written
// @param val   Values to write, each passed to put_int unchanged
template <>
inline void CompactProtocolFieldWriter::field_int_list<int64_t>(int field,
                                                                std::vector<int64_t> const& val)
{
  // Largest value the 4-bit count can hold; it doubles as the "count follows" marker.
  constexpr size_t short_count_limit = 0xf;
  auto const num_values              = val.size();

  put_field_header(field, current_field_value, ST_FLD_LIST);
  // The explicit <size_t> keeps std::min well-formed on platforms where size_t is not
  // `unsigned long` (a bare 0xfUL literal would make the two argument types differ there).
  put_byte(
    static_cast<uint8_t>((std::min<size_t>(num_values, short_count_limit) << 4) | ST_FLD_I64));
  if (num_values >= short_count_limit) { put_uint(num_values); }
  for (auto const v : val) {
    put_int(v);
  }
  current_field_value = field;
}

template <typename Enum>
inline void CompactProtocolFieldWriter::field_int_list(int field, std::vector<Enum> const& val)
{
put_field_header(field, current_field_value, ST_FLD_LIST);
put_byte((uint8_t)((std::min(val.size(), (size_t)0xfu) << 4) | ST_FLD_I32));
if (val.size() >= 0xf) put_uint(val.size());
for (auto& v : val) {
put_byte(static_cast<uint8_t>((std::min(val.size(), 0xfUL) << 4) | ST_FLD_I32));
if (val.size() >= 0xfUL) { put_uint(val.size()); }
for (auto const& v : val) {
put_int(static_cast<int32_t>(v));
}
current_field_value = field;
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class CompactProtocolWriter {
// Per-struct serialization overloads; each takes the struct to write by const reference.
// NOTE(review): the size_t result comes from the field writer's value() in the visible
// definitions; what it measures is not visible here — confirm.
size_t write(Statistics const&);
size_t write(PageLocation const&);
size_t write(OffsetIndex const&);
// Overload for SizeStatistics; its definition in compact_protocol_writer.cpp writes only the members that hold a value.
size_t write(SizeStatistics const&);
size_t write(ColumnOrder const&);

protected:
Expand Down Expand Up @@ -113,4 +114,8 @@ class CompactProtocolFieldWriter {
inline void set_current_field(int const& field);
};

// Declaration of the explicit specialization of field_int_list for int64_t elements. Its
// definition in compact_protocol_writer.cpp tags the list elements as ST_FLD_I64, whereas the
// generic template casts each element to int32_t and tags the list ST_FLD_I32.
template <>
inline void CompactProtocolFieldWriter::field_int_list<int64_t>(int field,
std::vector<int64_t> const& val);

} // namespace cudf::io::parquet::detail
Loading

0 comments on commit 302861d

Please sign in to comment.