Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] POC add data total size information to Parquet file metadata #12974

Closed
wants to merge 58 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
ffcf963
add raw data size info to column chunk metadata
etseidl Mar 10, 2023
54249e5
remove prints, calculate page sizes too
etseidl Mar 10, 2023
3212720
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 10, 2023
66e420e
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 12, 2023
066a0da
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 14, 2023
ccee4f6
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 15, 2023
7024534
add serialized structure to footer
etseidl Mar 15, 2023
d75d53c
read back page indexes and size info and save in metadata
etseidl Mar 16, 2023
a424425
don't read indexes if not doing chunking...update copyright
etseidl Mar 16, 2023
372529b
calculate splits with metadata
etseidl Mar 16, 2023
c59013b
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 16, 2023
be492fa
check for empty offset index
etseidl Mar 16, 2023
ab61586
do not clear chunk_read_limit if size stats are not present
etseidl Mar 16, 2023
501a209
fudge sizes for now
etseidl Mar 16, 2023
b3741c5
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 17, 2023
5e3608c
get rid of sizes with overhead
etseidl Mar 17, 2023
2b77f22
add writer for vector<int64_t>
etseidl Mar 17, 2023
5c7480e
only do size metadata if also doing column indexes
etseidl Mar 17, 2023
cdf8abe
get rid of host_pages vector, add some TODOs
etseidl Mar 17, 2023
3ea748b
use constexpr for KV keys
etseidl Mar 17, 2023
9f0e9ec
checkpoint before refactor
etseidl Mar 17, 2023
5fc7ace
refactor to reuse compute_splits
etseidl Mar 17, 2023
0096169
cleanup
etseidl Mar 17, 2023
81981ca
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 17, 2023
a9bf41b
post refactor cleanup
etseidl Mar 17, 2023
8eca6f4
fix writer::impl::close
etseidl Mar 17, 2023
6de2552
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 20, 2023
79b4319
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 21, 2023
3d47934
clean up copy paste cruft
etseidl Mar 21, 2023
e9a4312
need to pass memory resource to compute_splits
etseidl Mar 21, 2023
3e17232
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 23, 2023
2a034e7
Merge branch 'rapidsai:branch-23.04' into feature/chunk_stats
etseidl Mar 24, 2023
2e2aa07
Merge remote-tracking branch 'origin/branch-23.06' into feature/chunk…
etseidl Apr 14, 2023
7ff4033
post merge cleanup
etseidl Apr 14, 2023
39f5e8c
Merge remote-tracking branch 'origin/branch-23.08' into feature/chunk…
etseidl Jun 12, 2023
f3e96f2
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 16, 2023
5f41d24
restore more stuff lost in merge
etseidl Jun 16, 2023
4391c93
Merge branch 'feature/chunk_stats' of github.com:etseidl/cudf into fe…
etseidl Jun 16, 2023
5d8b54e
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 20, 2023
a057a0b
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 23, 2023
21c7414
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 23, 2023
307e8a6
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 23, 2023
62f55af
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 26, 2023
a87432b
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 30, 2023
e6432c5
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jun 30, 2023
e35c002
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jul 7, 2023
54e7a08
Merge branch 'rapidsai:branch-23.08' into feature/chunk_stats
etseidl Jul 14, 2023
bc1d084
Merge branch 'branch-23.10' into feature/chunk_stats
etseidl Jul 26, 2023
8d4afec
Merge remote-tracking branch 'origin/branch-23.10' into feature/chunk…
etseidl Jul 28, 2023
0fa655b
finish merge
etseidl Jul 28, 2023
88035b8
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Jul 31, 2023
5389423
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 1, 2023
13e5b08
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 2, 2023
62ebeb6
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 3, 2023
d0c3d94
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 4, 2023
41800e4
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 9, 2023
e2bdd57
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 14, 2023
c22ce08
Merge branch 'rapidsai:branch-23.10' into feature/chunk_stats
etseidl Aug 17, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cpp/src/io/parquet/compact_protocol_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c)
ParquetFieldInt64(5, c->num_values),
ParquetFieldInt64(6, c->total_uncompressed_size),
ParquetFieldInt64(7, c->total_compressed_size),
ParquetFieldStructList(8, c->key_value_metadata),
ParquetFieldInt64(9, c->data_page_offset),
ParquetFieldInt64(10, c->index_page_offset),
ParquetFieldInt64(11, c->dictionary_page_offset),
Expand Down Expand Up @@ -308,6 +309,13 @@ bool CompactProtocolReader::read(OffsetIndex* o)
return function_builder(this, op);
}

bool CompactProtocolReader::read(ColumnChunkSize* c)
{
auto op =
std::make_tuple(ParquetFieldInt64(1, c->chunk_size), ParquetFieldInt64List(2, c->page_sizes));
return function_builder(this, op);
}

bool CompactProtocolReader::read(ColumnIndex* c)
{
auto op = std::make_tuple(ParquetFieldBoolList(1, c->null_pages),
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/compact_protocol_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ class CompactProtocolReader {
bool read(KeyValue* k);
bool read(PageLocation* p);
bool read(OffsetIndex* o);
bool read(ColumnChunkSize* o);
bool read(ColumnIndex* c);
bool read(Statistics* s);

Expand Down
20 changes: 20 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ size_t CompactProtocolWriter::write(ColumnChunkMetaData const& s)
c.field_int(5, s.num_values);
c.field_int(6, s.total_uncompressed_size);
c.field_int(7, s.total_compressed_size);
if (s.key_value_metadata.size() != 0) { c.field_struct_list(8, s.key_value_metadata); }
c.field_int(9, s.data_page_offset);
if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); }
if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); }
Expand Down Expand Up @@ -233,6 +234,14 @@ size_t CompactProtocolWriter::write(OffsetIndex const& s)
return c.value();
}

size_t CompactProtocolWriter::write(const ColumnChunkSize& s)
{
CompactProtocolFieldWriter c(*this);
c.field_int(1, s.chunk_size);
c.field_int_list(2, s.page_sizes);
return c.value();
}

void CompactProtocolFieldWriter::put_byte(uint8_t v) { writer.m_buf.push_back(v); }

void CompactProtocolFieldWriter::put_byte(uint8_t const* raw, uint32_t len)
Expand Down Expand Up @@ -308,6 +317,17 @@ inline void CompactProtocolFieldWriter::field_int_list(int field, std::vector<En
current_field_value = field;
}

inline void CompactProtocolFieldWriter::field_int_list(int field, const std::vector<int64_t>& val)
{
put_field_header(field, current_field_value, ST_FLD_LIST);
put_byte((uint8_t)((std::min(val.size(), (size_t)0xfu) << 4) | ST_FLD_I64));
if (val.size() >= 0xf) put_uint(val.size());
for (auto& v : val) {
put_int(static_cast<int32_t>(v));
}
current_field_value = field;
}

template <typename T>
inline void CompactProtocolFieldWriter::field_struct(int field, T const& val)
{
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/io/parquet/compact_protocol_writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class CompactProtocolWriter {
size_t write(Statistics const&);
size_t write(PageLocation const&);
size_t write(OffsetIndex const&);
size_t write(ColumnChunkSize const&);

protected:
std::vector<uint8_t>& m_buf;
Expand Down Expand Up @@ -91,6 +92,8 @@ class CompactProtocolFieldWriter {
template <typename Enum>
inline void field_int_list(int field, std::vector<Enum> const& val);

inline void field_int_list(int field, const std::vector<int64_t>& val);

template <typename T>
inline void field_struct(int field, T const& val);

Expand Down
30 changes: 22 additions & 8 deletions cpp/src/io/parquet/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ namespace io {
namespace parquet {
constexpr uint32_t parquet_magic = (('P' << 0) | ('A' << 8) | ('R' << 16) | ('1' << 24));

constexpr std::string_view COL_META_SIZES_OFFSET = "sizes_offset";
constexpr std::string_view COL_META_SIZES_SIZE = "sizes_size";

/**
* @brief Struct that describes the Parquet file data header
*/
Expand Down Expand Up @@ -210,6 +213,14 @@ struct Statistics {
std::vector<uint8_t> min_value; // min value for column determined by ColumnOrder
};

/**
* @brief Thrift-derived struct describing a key-value pair, for user metadata
*/
struct KeyValue {
std::string key;
std::string value;
};

/**
* @brief Thrift-derived struct describing a column chunk
*/
Expand All @@ -223,6 +234,7 @@ struct ColumnChunkMetaData {
0; // total byte size of all uncompressed pages in this column chunk (including the headers)
int64_t total_compressed_size =
0; // total byte size of all compressed pages in this column chunk (including the headers)
std::vector<KeyValue> key_value_metadata; // per chunk metadata
int64_t data_page_offset = 0; // Byte offset from beginning of file to first data page
int64_t index_page_offset = 0; // Byte offset from beginning of file to root index page
int64_t dictionary_page_offset =
Expand Down Expand Up @@ -263,14 +275,6 @@ struct RowGroup {
int64_t num_rows = 0;
};

/**
* @brief Thrift-derived struct describing a key-value pair, for user metadata
*/
struct KeyValue {
std::string key;
std::string value;
};

/**
* @brief Thrift-derived struct describing file-level metadata
*
Expand Down Expand Up @@ -357,6 +361,16 @@ struct OffsetIndex {
std::vector<PageLocation> page_locations;
};

/**
* @brief Thrift struct for column chunk size info.
*
* Want to add this to OffsetIndex in the future.
*/
struct ColumnChunkSize {
int64_t chunk_size; // sum of page_sizes...no overhead
std::vector<int64_t> page_sizes; // size of page data in bytes not accounting for overhead
};

/**
* @brief Thrift-derived struct describing the column index.
*/
Expand Down
54 changes: 39 additions & 15 deletions cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,9 +300,20 @@ reader::impl::impl(std::size_t chunk_read_limit,
_strings_to_categorical,
_timestamp_type.id());

// only need this info for chunk estimation, or if using indexes for pushdown
// filtering (or num_rows filtering). since we don't do the latter yet, just
// test for the former.
if (_chunk_read_limit > 0) {
_metadata->populate_column_metadata(_input_columns, _sources);
_meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit, _stream, _mr);
if (not _meta_chunk_read_info.empty()) {
_chunk_read_limit = 0; // don't need this since we already have splits
}
}

// Save the states of the output buffers for reuse in `chunk_read()`.
// Don't need to do it if we read the file all at once.
if (_chunk_read_limit > 0) {
if (_chunk_read_limit > 0 or not _meta_chunk_read_info.empty()) {
for (auto const& buff : _output_buffers) {
_output_buffers_template.emplace_back(inline_column_buffer::empty_like(buff));
}
Expand Down Expand Up @@ -372,7 +383,9 @@ table_with_metadata reader::impl::read_chunk_internal(
return finalize_output(out_metadata, out_columns, filter);
}

auto const& read_info = _chunk_read_info[_current_read_chunk++];
auto const& read_info = _meta_chunk_read_info.empty()
? _chunk_read_info[_current_read_chunk++]
: _meta_chunk_read_info[_current_read_chunk++];

// Allocate memory buffers for the output columns.
allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
Expand Down Expand Up @@ -452,29 +465,40 @@ table_with_metadata reader::impl::read_chunk()
{
// Reset the output buffers to their original states (right after reader construction).
// Don't need to do it if we read the file all at once.
if (_chunk_read_limit > 0) {
if (_chunk_read_limit > 0 || not _meta_chunk_read_info.empty()) {
_output_buffers.resize(0);
for (auto const& buff : _output_buffers_template) {
_output_buffers.emplace_back(inline_column_buffer::empty_like(buff));
}
}

prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);
return read_chunk_internal(true, std::nullopt);
if (_meta_chunk_read_info.empty()) {
prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);
return read_chunk_internal(true, std::nullopt);
} else {
auto const& chunk_info = _meta_chunk_read_info[_current_read_chunk];
_file_preprocessed = false;
prepare_data(chunk_info.skip_rows, chunk_info.num_rows, true, {}, std::nullopt);
return read_chunk_internal(true, std::nullopt);
}
}

bool reader::impl::has_next()
{
prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);
return _current_read_chunk < _chunk_read_info.size();
if (_meta_chunk_read_info.empty()) {
prepare_data(0 /*skip_rows*/,
std::nullopt /*num_rows, `nullopt` means unlimited*/,
true /*uses_custom_row_bounds*/,
{} /*row_group_indices, empty means read all row groups*/,
std::nullopt /*filter*/);
return _current_read_chunk < _chunk_read_info.size();
} else {
return _current_read_chunk < _meta_chunk_read_info.size();
}
}

namespace {
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ class reader::impl {
cudf::io::parquet::gpu::file_intermediate_data _file_itm_data;
cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data;
std::vector<cudf::io::parquet::gpu::chunk_read_info> _chunk_read_info;
std::vector<cudf::io::parquet::gpu::chunk_read_info> _meta_chunk_read_info;
std::size_t _chunk_read_limit{0};
std::size_t _current_read_chunk{0};
bool _file_preprocessed{false};
Expand Down
Loading