From ffcf9633e2e4c7c04648ad9bc62ac7186c77bc1e Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 9 Mar 2023 17:53:30 -0800 Subject: [PATCH 01/24] add raw data size info to column chunk metadata --- .../io/parquet/compact_protocol_reader.cpp | 3 ++- .../io/parquet/compact_protocol_writer.cpp | 3 ++- cpp/src/io/parquet/parquet.hpp | 19 ++++++++++--------- cpp/src/io/parquet/writer_impl.cu | 14 ++++++++++++++ 4 files changed, 28 insertions(+), 11 deletions(-) diff --git a/cpp/src/io/parquet/compact_protocol_reader.cpp b/cpp/src/io/parquet/compact_protocol_reader.cpp index cb94e621262..ec6ced45cbc 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.cpp +++ b/cpp/src/io/parquet/compact_protocol_reader.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -242,6 +242,7 @@ bool CompactProtocolReader::read(ColumnChunkMetaData* c) ParquetFieldInt64(5, c->num_values), ParquetFieldInt64(6, c->total_uncompressed_size), ParquetFieldInt64(7, c->total_compressed_size), + ParquetFieldStructList(8, c->key_value_metadata), ParquetFieldInt64(9, c->data_page_offset), ParquetFieldInt64(10, c->index_page_offset), ParquetFieldInt64(11, c->dictionary_page_offset), diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index f5ae262fa3f..86e0109334c 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -198,6 +198,7 @@ size_t CompactProtocolWriter::write(const ColumnChunkMetaData& s) c.field_int(5, s.num_values); c.field_int(6, s.total_uncompressed_size); c.field_int(7, s.total_compressed_size); + if (s.key_value_metadata.size() != 0) { c.field_struct_list(8, s.key_value_metadata); } c.field_int(9, s.data_page_offset); if (s.index_page_offset != 0) { c.field_int(10, s.index_page_offset); } if (s.dictionary_page_offset != 0) { c.field_int(11, s.dictionary_page_offset); } diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 046ae38020c..33b2914ba2e 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -218,6 +218,14 @@ struct Statistics { std::vector min_value; // min value for column determined by ColumnOrder }; +/** + * @brief Thrift-derived struct describing a key-value pair, for user metadata + */ +struct KeyValue { + std::string key; + std::string value; +}; + /** * @brief Thrift-derived struct describing a column chunk */ @@ -231,6 +239,7 @@ struct ColumnChunkMetaData { 0; // total byte size of all uncompressed pages in this column chunk (including the headers) int64_t total_compressed_size = 0; // total byte size of all compressed pages in this column chunk (including the headers) + std::vector key_value_metadata; // per chunk metadata int64_t data_page_offset = 0; // Byte offset from beginning of file to first data page int64_t index_page_offset = 0; // Byte offset from beginning of file to root index page int64_t dictionary_page_offset = @@ -271,14 +280,6 @@ struct RowGroup { int64_t num_rows = 0; }; -/** - * @brief Thrift-derived struct describing a key-value pair, for user metadata - */ -struct KeyValue { - std::string key; - std::string value; -}; - /** * @brief Thrift-derived struct describing file-level metadata * diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 2c9bff33a14..7f1e0733491 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -31,6 +31,7 @@ #include #include +#include #include #include #include @@ -1459,6 +1460,8 @@ void writer::impl::write(table_view const& table, std::vector co single_streams_table.end(), std::back_inserter(column_sizes), [this](auto const& column) { return column_size(column, stream); }); + for (size_t i = 0; i < column_sizes.size(); i++) + printf("col %ld size %ld\n", i, column_sizes[i]); // adjust global fragment size if a single fragment will overrun a rowgroup auto const table_size = std::reduce(column_sizes.begin(), column_sizes.end()); @@ -1830,6 +1833,7 @@ void writer::impl::write(table_view const& table, std::vector co pinned_buffer host_bfr{nullptr, cudaFreeHost}; // Encode row groups in batches + size_type row_offset = 0; for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { // Count pages in this batch auto const rnext = r + batch_list[b]; @@ -1866,6 +1870,16 @@ void writer::impl::write(table_view const& table, std::vector co dev_bfr = ck.uncompressed_bfr; } + // should slice once up front with all ranges + // get size of column data for this chunk + auto col_i = single_streams_table.column(i); + auto slice_i = + cudf::slice(col_i, {row_offset, row_offset + (size_type)row_group.num_rows})[0]; + auto foo = column_size(slice_i, stream); + printf("rg %d col %d size %ld\n", r, i, foo); + column_chunk_meta.key_value_metadata.push_back( + std::move(KeyValue{"data_size", std::to_string(foo)})); + if (out_sink_[p]->is_device_write_preferred(ck.compressed_size)) { // let the writer do what it wants to retrieve the data from the gpu. 
write_tasks.push_back(out_sink_[p]->device_write_async( From 54249e54824677406b47c7f8d4a41ad260ebd730 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 10 Mar 2023 12:39:46 -0800 Subject: [PATCH 02/24] remove prints, calculate page sizes too --- cpp/src/io/parquet/writer_impl.cu | 55 ++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 7f1e0733491..cc5874a4884 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1460,8 +1460,6 @@ void writer::impl::write(table_view const& table, std::vector co single_streams_table.end(), std::back_inserter(column_sizes), [this](auto const& column) { return column_size(column, stream); }); - for (size_t i = 0; i < column_sizes.size(); i++) - printf("col %ld size %ld\n", i, column_sizes[i]); // adjust global fragment size if a single fragment will overrun a rowgroup auto const table_size = std::reduce(column_sizes.begin(), column_sizes.end()); @@ -1818,6 +1816,9 @@ void writer::impl::write(table_view const& table, std::vector co } } + // need pages on host to create offset_indexes and chunk size metadata + thrust::host_vector host_pages; + if (num_pages != 0) { init_encoder_pages(chunks, col_desc, @@ -1828,12 +1829,12 @@ void writer::impl::write(table_view const& table, std::vector co num_columns, num_pages, num_stats_bfr); + host_pages = cudf::detail::make_host_vector_sync(pages, stream); } pinned_buffer host_bfr{nullptr, cudaFreeHost}; // Encode row groups in batches - size_type row_offset = 0; for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { // Count pages in this batch auto const rnext = r + batch_list[b]; @@ -1854,6 +1855,7 @@ void writer::impl::write(table_view const& table, std::vector co : nullptr, (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) ? 
page_stats.data() : nullptr); + auto first_page_in_chunk = first_page_in_batch; std::vector> write_tasks; for (; r < rnext; r++) { int p = rg_to_part[r]; @@ -1870,15 +1872,44 @@ void writer::impl::write(table_view const& table, std::vector co dev_bfr = ck.uncompressed_bfr; } - // should slice once up front with all ranges - // get size of column data for this chunk - auto col_i = single_streams_table.column(i); - auto slice_i = - cudf::slice(col_i, {row_offset, row_offset + (size_type)row_group.num_rows})[0]; - auto foo = column_size(slice_i, stream); - printf("rg %d col %d size %ld\n", r, i, foo); - column_chunk_meta.key_value_metadata.push_back( - std::move(KeyValue{"data_size", std::to_string(foo)})); + auto add_size_metadata = [](column_view const& col, + ColumnChunkMetaData& col_meta, + host_span col_pages, + rmm::cuda_stream_view stream) { + std::vector slice_offsets; + + for (auto& page : col_pages) { + if (page.page_type == PageType::DATA_PAGE) { + slice_offsets.push_back(page.start_row); + slice_offsets.push_back(page.start_row + page.num_rows); + } + } + + auto slices = cudf::slice(col, slice_offsets); + + std::vector page_sizes; + std::transform( + slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { + return column_size(slice, stream); + }); + size_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); + std::string page_sizes_str; + + std::for_each(page_sizes.begin(), page_sizes.end(), [&page_sizes_str](auto size) { + page_sizes_str.append(std::to_string(size)); + page_sizes_str.append(1, ','); + }); + + col_meta.key_value_metadata.push_back( + std::move(KeyValue{"data_size", std::to_string(chunk_size)})); + col_meta.key_value_metadata.push_back(std::move(KeyValue{"page_sizes", page_sizes_str})); + }; + + add_size_metadata(single_streams_table.column(i), + column_chunk_meta, + {host_pages.data() + first_page_in_chunk, ck.num_pages}, + stream); + first_page_in_chunk += ck.num_pages; if (out_sink_[p]->is_device_write_preferred(ck.compressed_size)) { // let the writer do what it wants to retrieve the data from the gpu. 
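A note on the slicing pattern used in the patch above: the per-page size pass builds the argument to cudf::slice() as one flat list of begin/end row offsets, one pair per data page, skipping dictionary pages (which carry no row data). Below is a minimal host-only sketch of that offset construction; page_info is a hypothetical stand-in for gpu::EncPage and make_slice_offsets is not a libcudf function.

#include <cstdint>
#include <vector>

// Hypothetical stand-in for the encoder's per-page bookkeeping (gpu::EncPage).
struct page_info {
  int32_t start_row;    // first row covered by this page
  int32_t num_rows;     // rows encoded in this page
  bool is_data_page;    // dictionary pages are skipped below
};

// Build the interleaved {begin0, end0, begin1, end1, ...} list that
// cudf::slice() expects, one begin/end pair per data page.
std::vector<int32_t> make_slice_offsets(std::vector<page_info> const& pages)
{
  std::vector<int32_t> offsets;
  for (auto const& p : pages) {
    if (p.is_data_page) {
      offsets.push_back(p.start_row);
      offsets.push_back(p.start_row + p.num_rows);
    }
  }
  return offsets;  // e.g. pages of 100 rows each -> {0,100, 100,200, 200,300, ...}
}

Each resulting slice is then measured with the writer's internal column_size() helper; the per-page results become the "page_sizes" entry and their sum becomes the "data_size" entry in the chunk's key-value metadata.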
From 70245347fe1b54af4603c7b2a0533f79e5678a6f Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 15 Mar 2023 12:10:07 -0700 Subject: [PATCH 03/24] add serialized structure to footer --- .../io/parquet/compact_protocol_reader.cpp | 14 ++++++ .../io/parquet/compact_protocol_reader.hpp | 4 +- .../io/parquet/compact_protocol_writer.cpp | 17 +++++++ .../io/parquet/compact_protocol_writer.hpp | 4 +- cpp/src/io/parquet/parquet.hpp | 22 +++++++++ cpp/src/io/parquet/writer_impl.cu | 45 +++++++++++++------ 6 files changed, 90 insertions(+), 16 deletions(-) diff --git a/cpp/src/io/parquet/compact_protocol_reader.cpp b/cpp/src/io/parquet/compact_protocol_reader.cpp index ec6ced45cbc..04a0a16331a 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.cpp +++ b/cpp/src/io/parquet/compact_protocol_reader.cpp @@ -296,6 +296,20 @@ bool CompactProtocolReader::read(OffsetIndex* o) return function_builder(this, op); } +bool CompactProtocolReader::read(PageSize* p) +{ + auto op = std::make_tuple(ParquetFieldInt64(1, p->data_size), ParquetFieldInt64(2, p->page_size)); + return function_builder(this, op); +} + +bool CompactProtocolReader::read(ColumnChunkSize* c) +{ + auto op = std::make_tuple(ParquetFieldInt64(1, c->chunk_size), + ParquetFieldInt64(2, c->full_chunk_size), + ParquetFieldStructList(3, c->page_sizes)); + return function_builder(this, op); +} + bool CompactProtocolReader::read(ColumnIndex* c) { auto op = std::make_tuple(ParquetFieldBoolList(1, c->null_pages), diff --git a/cpp/src/io/parquet/compact_protocol_reader.hpp b/cpp/src/io/parquet/compact_protocol_reader.hpp index 74565b2f244..2976201f2d6 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.hpp +++ b/cpp/src/io/parquet/compact_protocol_reader.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -117,6 +117,8 @@ class CompactProtocolReader { bool read(KeyValue* k); bool read(PageLocation* p); bool read(OffsetIndex* o); + bool read(PageSize* p); + bool read(ColumnChunkSize* o); bool read(ColumnIndex* c); bool read(Statistics* s); diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index 86e0109334c..343b27d7c99 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -222,6 +222,23 @@ size_t CompactProtocolWriter::write(const OffsetIndex& s) return c.value(); } +size_t CompactProtocolWriter::write(const PageSize& s) +{ + CompactProtocolFieldWriter c(*this); + c.field_int(1, s.data_size); + c.field_int(2, s.page_size); + return c.value(); +} + +size_t CompactProtocolWriter::write(const ColumnChunkSize& s) +{ + CompactProtocolFieldWriter c(*this); + c.field_int(1, s.chunk_size); + c.field_int(2, s.full_chunk_size); + c.field_struct_list(3, s.page_sizes); + return c.value(); +} + void CompactProtocolFieldWriter::put_byte(uint8_t v) { writer.m_buf.push_back(v); } void CompactProtocolFieldWriter::put_byte(const uint8_t* raw, uint32_t len) diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index 739e4615099..aee2272b00a 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2018-2022, NVIDIA CORPORATION. + * Copyright (c) 2018-2023, NVIDIA CORPORATION. 
* * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -52,6 +52,8 @@ class CompactProtocolWriter { size_t write(const ColumnChunkMetaData&); size_t write(const PageLocation&); size_t write(const OffsetIndex&); + size_t write(const PageSize&); + size_t write(const ColumnChunkSize&); protected: std::vector& m_buf; diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 33b2914ba2e..6efa7865671 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -351,6 +351,28 @@ struct OffsetIndex { std::vector page_locations; }; +/** + * @brief Thrift struct for page size info. + * + * This is meant to be used by readers that need to know the full memory footprint of + * the fully decompressed and decoded page. Want to add this to PageLocation in the future. + */ +struct PageSize { + int64_t data_size; // size of data without overhead, meant to be writer agnostic + int64_t page_size; // optional writer-specific size with overhead +}; + +/** + * @brief Thrift struct for column chunk size info. + * + * Like PageSize, but for column chunks. Want to add this to OffsetIndex in the future. + */ +struct ColumnChunkSize { + int64_t chunk_size; // sum of page data_sizes...no overhead + int64_t full_chunk_size; // sum of page page_sizes...includes overhead + std::vector page_sizes; +}; + /** * @brief Thrift-derived struct describing the column index. */ diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index cc5874a4884..600a5465673 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -231,6 +231,7 @@ struct aggregate_writer_metadata { std::vector key_value_metadata; std::vector offset_indexes; std::vector> column_indexes; + std::vector column_sizes; }; std::vector files; std::string created_by = ""; @@ -1872,6 +1873,7 @@ void writer::impl::write(table_view const& table, std::vector co dev_bfr = ck.uncompressed_bfr; } + // experimental: add data sizes to parquet metadata. 
auto add_size_metadata = [](column_view const& col, ColumnChunkMetaData& col_meta, host_span col_pages, @@ -1892,23 +1894,22 @@ void writer::impl::write(table_view const& table, std::vector co slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { return column_size(slice, stream); }); - size_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); - std::string page_sizes_str; + int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); - std::for_each(page_sizes.begin(), page_sizes.end(), [&page_sizes_str](auto size) { - page_sizes_str.append(std::to_string(size)); - page_sizes_str.append(1, ','); - }); - - col_meta.key_value_metadata.push_back( - std::move(KeyValue{"data_size", std::to_string(chunk_size)})); - col_meta.key_value_metadata.push_back(std::move(KeyValue{"page_sizes", page_sizes_str})); + ColumnChunkSize cs{chunk_size, 0}; + std::transform( + page_sizes.begin(), page_sizes.end(), std::back_inserter(cs.page_sizes), [](auto sz) { + return PageSize{static_cast(sz), 0}; + }); + return cs; }; - add_size_metadata(single_streams_table.column(i), - column_chunk_meta, - {host_pages.data() + first_page_in_chunk, ck.num_pages}, - stream); + auto cs = add_size_metadata(single_streams_table.column(i), + column_chunk_meta, + {host_pages.data() + first_page_in_chunk, ck.num_pages}, + stream); + md->file(p).column_sizes.push_back(cs); + first_page_in_chunk += ck.num_pages; if (out_sink_[p]->is_device_write_preferred(ck.compressed_size)) { @@ -2027,6 +2028,22 @@ std::unique_ptr> writer::impl::close( CompactProtocolWriter cpw(&buffer); file_ender_s fendr; + // experimental: write page size info ahead of other footer metadata + { + auto& fmd = md->file(p); + int chunkidx = 0; + for (auto& r : fmd.row_groups) { + for (auto& c : r.columns) { + auto const& sizes = fmd.column_sizes[chunkidx++]; + buffer.resize(0); + int32_t len = cpw.write(sizes); + c.meta_data.key_value_metadata.push_back( + KeyValue{"sizes_offset", std::to_string(out_sink_[p]->bytes_written())}); + c.meta_data.key_value_metadata.push_back(KeyValue{"sizes_size", std::to_string(len)}); + out_sink_[p]->host_write(buffer.data(), buffer.size()); + } + } + } if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { auto& fmd = md->file(p); From d75d53cad1e783ac599c5092f50c9db270d645dd Mon Sep 17 00:00:00 2001 From: seidl Date: Wed, 15 Mar 2023 17:07:02 -0700 Subject: [PATCH 04/24] read back page indexes and size info and save in metadata --- cpp/src/io/parquet/reader_impl.cpp | 3 + cpp/src/io/parquet/reader_impl_helpers.cpp | 74 ++++++++++++++++++++++ cpp/src/io/parquet/reader_impl_helpers.hpp | 9 +++ 3 files changed, 86 insertions(+) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index b1c4dd22c0d..3c8d98fcce7 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -227,6 +227,9 @@ reader::impl::impl(std::size_t chunk_read_limit, _strings_to_categorical, _timestamp_type.id()); + // get column metadata (column/offset index, column sizes) for selected columns + _metadata->populate_column_metadata(_input_columns, _sources); + // Save the states of the output buffers for reuse in `chunk_read()`. // Don't need to do it if we read the file all at once. 
if (_chunk_read_limit > 0) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 4c2b5a324c0..2535c73dc14 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -18,6 +18,7 @@ #include #include +#include namespace cudf::io::detail::parquet { @@ -637,4 +638,77 @@ aggregate_reader_metadata::select_columns(std::optional std::move(input_columns), std::move(output_columns), std::move(output_column_schemas)); } +void aggregate_reader_metadata::populate_column_metadata( + std::vector const& input_columns, + std::vector> const& sources) +{ + std::set schema_cols; + std::transform(input_columns.begin(), + input_columns.end(), + std::inserter(schema_cols, schema_cols.end()), + [](auto& col) { return col.schema_idx; }); + + for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { + auto const& source = sources[src_idx]; + auto& metadata = per_file_metadata[src_idx]; + for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { + auto& rg = per_file_metadata[src_idx].row_groups[rg_idx]; + for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++) { + auto& col = rg.columns[col_idx]; + if (schema_cols.find(col.schema_idx) != schema_cols.end()) { + auto& chunk_meta = col.meta_data; + if (chunk_meta.key_value_metadata.size() > 0) { + size_t len = 0; + size_t offset = 0; + for (size_t i = 0; i < chunk_meta.key_value_metadata.size(); i++) { + auto& kv = chunk_meta.key_value_metadata[i]; + if (kv.key == "sizes_size") { + len = std::stol(kv.value); + } else if (kv.key == "sizes_offset") { + offset = std::stol(kv.value); + } + } + + if (len > 0) { + ColumnChunkSize colsize; + const auto ci_buf = source->host_read(offset, len); + cudf::io::parquet::CompactProtocolReader cp(ci_buf->data(), ci_buf->size()); + bool res = cp.read(&colsize); + if (res) { + metadata.column_sizes.push_back(colsize); + } else { + metadata.column_sizes.push_back(ColumnChunkSize{0, 0}); + } + } + } + + if (col.column_index_length > 0) { + cudf::io::parquet::ColumnIndex colidx; + const auto ci_buf = source->host_read(col.column_index_offset, col.column_index_length); + cudf::io::parquet::CompactProtocolReader cp(ci_buf->data(), ci_buf->size()); + bool res = cp.read(&colidx); + if (res) { + metadata.column_indexes.push_back(colidx); + } else { + metadata.column_indexes.push_back(ColumnIndex{}); + } + } + + if (col.offset_index_length > 0) { + cudf::io::parquet::OffsetIndex offidx; + const auto ci_buf = source->host_read(col.offset_index_offset, col.offset_index_length); + cudf::io::parquet::CompactProtocolReader cp(ci_buf->data(), ci_buf->size()); + bool res = cp.read(&offidx); + if (res) { + metadata.offset_indexes.push_back(offidx); + } else { + metadata.offset_indexes.push_back(OffsetIndex{}); + } + } + } + } + } + } +} + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 6fa86a77e46..ca3a807adc7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -65,6 +65,10 @@ struct row_group_info { */ struct metadata : public FileMetaData { explicit metadata(datasource* source); + + std::vector column_indexes; + std::vector offset_indexes; + std::vector column_sizes; }; class aggregate_reader_metadata { @@ -192,6 +196,11 @@ class aggregate_reader_metadata { bool include_index, bool strings_to_categorical, type_id timestamp_type_id) const; 
+ + // read per-file page index and column/page size thrift structures for the selected input + // columns + void populate_column_metadata(std::vector const&, + std::vector> const& sources); }; } // namespace cudf::io::detail::parquet From a42442520fd7ccd6d2f6888280e8c09774eaa3b0 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 16 Mar 2023 09:23:13 -0700 Subject: [PATCH 05/24] don't read indexes if not doing chunking...update copyright --- cpp/src/io/parquet/reader_impl.cpp | 5 ++++- cpp/src/io/parquet/reader_impl_helpers.cpp | 2 +- cpp/src/io/parquet/reader_impl_helpers.hpp | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 3c8d98fcce7..2d631ab357e 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -227,8 +227,11 @@ reader::impl::impl(std::size_t chunk_read_limit, _strings_to_categorical, _timestamp_type.id()); + // only need this info for chunk estimation, or if using indexes for pushdown + // filtering (or num_rows filtering). since we don't do the latter yet, just + // test for the former. That's why this isn't in the if{} immediately following this one. // get column metadata (column/offset index, column sizes) for selected columns - _metadata->populate_column_metadata(_input_columns, _sources); + if (_chunk_read_limit > 0) { _metadata->populate_column_metadata(_input_columns, _sources); } // Save the states of the output buffers for reuse in `chunk_read()`. // Don't need to do it if we read the file all at once. diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 2535c73dc14..bd6c7f7c06f 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index ca3a807adc7..87aaa81520a 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, NVIDIA CORPORATION. + * Copyright (c) 2022-2023, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. From 372529b9eb555f5feccd82bc8038eb247565282f Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 16 Mar 2023 15:42:40 -0700 Subject: [PATCH 06/24] calculate splits with metadata --- cpp/src/io/parquet/reader_impl.cpp | 46 +++-- cpp/src/io/parquet/reader_impl.hpp | 1 + cpp/src/io/parquet/reader_impl_helpers.cpp | 173 ++++++++++++++++++- cpp/src/io/parquet/reader_impl_helpers.hpp | 3 + cpp/src/io/parquet/reader_impl_preprocess.cu | 38 ++-- 5 files changed, 235 insertions(+), 26 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 2d631ab357e..5f42faf731f 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -231,11 +231,16 @@ reader::impl::impl(std::size_t chunk_read_limit, // filtering (or num_rows filtering). since we don't do the latter yet, just // test for the former. That's why this isn't in the if{} immediately following this one. 
// get column metadata (column/offset index, column sizes) for selected columns - if (_chunk_read_limit > 0) { _metadata->populate_column_metadata(_input_columns, _sources); } + if (_chunk_read_limit > 0) { + _metadata->populate_column_metadata(_input_columns, _sources); + auto splits = _metadata->compute_splits(_chunk_read_limit); + _meta_chunk_read_info = std::move(splits); + _chunk_read_limit = 0; // don't need this since we already have splits + } // Save the states of the output buffers for reuse in `chunk_read()`. // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { + if (_chunk_read_limit > 0 or not _meta_chunk_read_info.empty()) { for (auto const& buff : _output_buffers) { _output_buffers_template.emplace_back(column_buffer::empty_like(buff)); } @@ -279,7 +284,9 @@ table_with_metadata reader::impl::read_chunk_internal(bool uses_custom_row_bound return finalize_output(out_metadata, out_columns); } - auto const& read_info = _chunk_read_info[_current_read_chunk++]; + auto const& read_info = _meta_chunk_read_info.empty() + ? _chunk_read_info[_current_read_chunk++] + : _meta_chunk_read_info[_current_read_chunk++]; // Allocate memory buffers for the output columns. allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds); @@ -352,27 +359,38 @@ table_with_metadata reader::impl::read_chunk() { // Reset the output buffers to their original states (right after reader construction). // Don't need to do it if we read the file all at once. - if (_chunk_read_limit > 0) { + if (_chunk_read_limit > 0 || not _meta_chunk_read_info.empty()) { _output_buffers.resize(0); for (auto const& buff : _output_buffers_template) { _output_buffers.emplace_back(column_buffer::empty_like(buff)); } } - prepare_data(0 /*skip_rows*/, - -1 /*num_rows, `-1` means unlimited*/, - true /*uses_custom_row_bounds*/, - {} /*row_group_indices, empty means read all row groups*/); - return read_chunk_internal(true); + if (_meta_chunk_read_info.empty()) { + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return read_chunk_internal(true); + } else { + auto const& chunk_info = _meta_chunk_read_info[_current_read_chunk]; + _file_preprocessed = false; + prepare_data(chunk_info.skip_rows, chunk_info.num_rows, true, {}); + return read_chunk_internal(true); + } } bool reader::impl::has_next() { - prepare_data(0 /*skip_rows*/, - -1 /*num_rows, `-1` means unlimited*/, - true /*uses_custom_row_bounds*/, - {} /*row_group_indices, empty means read all row groups*/); - return _current_read_chunk < _chunk_read_info.size(); + if (_meta_chunk_read_info.empty()) { + prepare_data(0 /*skip_rows*/, + -1 /*num_rows, `-1` means unlimited*/, + true /*uses_custom_row_bounds*/, + {} /*row_group_indices, empty means read all row groups*/); + return _current_read_chunk < _chunk_read_info.size(); + } else { + return _current_read_chunk < _meta_chunk_read_info.size(); + } } } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index 8b86412ae63..bd0d3977623 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -251,6 +251,7 @@ class reader::impl { cudf::io::parquet::gpu::file_intermediate_data _file_itm_data; cudf::io::parquet::gpu::chunk_intermediate_data _chunk_itm_data; std::vector _chunk_read_info; + std::vector _meta_chunk_read_info; std::size_t _chunk_read_limit{0}; 
std::size_t _current_read_chunk{0}; bool _file_preprocessed{false}; diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index bd6c7f7c06f..84a6cef61f3 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -20,6 +20,13 @@ #include #include +#include +#include +#include +#include +#include +#include + namespace cudf::io::detail::parquet { namespace { @@ -651,8 +658,8 @@ void aggregate_reader_metadata::populate_column_metadata( for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { auto const& source = sources[src_idx]; auto& metadata = per_file_metadata[src_idx]; - for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { - auto& rg = per_file_metadata[src_idx].row_groups[rg_idx]; + for (size_t rg_idx = 0; rg_idx < metadata.row_groups.size(); ++rg_idx) { + auto& rg = metadata.row_groups[rg_idx]; for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++) { auto& col = rg.columns[col_idx]; if (schema_cols.find(col.schema_idx) != schema_cols.end()) { @@ -711,4 +718,166 @@ void aggregate_reader_metadata::populate_column_metadata( } } +template +inline auto make_counting_transform_iterator(cudf::size_type start, UnaryFunction f) +{ + return thrust::make_transform_iterator(thrust::make_counting_iterator(start), f); +} + +std::vector aggregate_reader_metadata::compute_splits(size_t chunk_read_limit) +{ + std::vector splits; + if (per_file_metadata[0].column_sizes.size() == 0) return splits; + + struct cumulative_row_info { + size_t row_count; // cumulative row count + size_t size_bytes; // cumulative size in bytes + int key; // schema index + }; + + std::vector page_sizes; + + // create page_keys and page_index + std::vector page_keys; + std::vector page_index; + + int global_page_idx = 0; + for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { + auto const& metadata = per_file_metadata[src_idx]; + size_t chunk_idx = 0; + for (size_t rg_idx = 0; rg_idx < metadata.row_groups.size(); ++rg_idx) { + auto const& rg = metadata.row_groups[rg_idx]; + for (size_t col_idx = 0; col_idx < rg.columns.size(); col_idx++, chunk_idx++) { + auto const& col = rg.columns[col_idx]; + auto const& chunk_size = metadata.column_sizes[chunk_idx]; + auto const& offsets = metadata.offset_indexes[chunk_idx]; + + for (size_t page_idx = 0; page_idx < chunk_size.page_sizes.size(); page_idx++) { + page_keys.push_back(col.schema_idx); + page_index.push_back(global_page_idx++); + + size_t num_rows = (page_idx == chunk_size.page_sizes.size() - 1 + ? 
rg.num_rows + : offsets.page_locations[page_idx + 1].first_row_index) - + offsets.page_locations[page_idx].first_row_index; + size_t page_size = chunk_size.page_sizes[page_idx].data_size; + page_sizes.push_back({num_rows, page_size, col.schema_idx}); + } + } + } + } + + thrust::stable_sort_by_key( + page_keys.begin(), page_keys.end(), page_index.begin(), thrust::less()); + + struct cumulative_row_sum { + cumulative_row_info operator()(cumulative_row_info const& a, cumulative_row_info const& b) const + { + return cumulative_row_info{a.row_count + b.row_count, a.size_bytes + b.size_bytes, a.key}; + } + }; + + struct get_cumulative_row_info { + cumulative_row_info const* ci; + + cumulative_row_info operator()(size_type index) { return ci[index]; } + }; + + std::vector c_info(page_keys.size()); + auto page_input = + thrust::make_transform_iterator(page_index.begin(), get_cumulative_row_info{page_sizes.data()}); + thrust::inclusive_scan_by_key(page_keys.begin(), + page_keys.end(), + page_input, + c_info.begin(), + thrust::equal_to{}, + cumulative_row_sum{}); + + // sort by row count + std::vector c_info_sorted{c_info}; + thrust::sort(c_info_sorted.begin(), + c_info_sorted.end(), + [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { + return a.row_count < b.row_count; + }); + + // generate key offsets (offsets to the start of each partition of keys). worst case is 1 page per + // key + std::vector key_offsets(page_keys.size() + 1); + auto const key_offsets_end = thrust::reduce_by_key(page_keys.begin(), + page_keys.end(), + thrust::make_constant_iterator(1), + thrust::make_discard_iterator(), + key_offsets.begin()) + .second; + size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); + thrust::exclusive_scan(key_offsets.begin(), key_offsets.end(), key_offsets.begin()); + + struct row_total_size { + cumulative_row_info const* c_info; + size_type const* key_offsets; + size_t num_keys; + + cumulative_row_info operator()(cumulative_row_info const& i) + { + // sum sizes for each input column at this row + size_t sum = 0; + for (size_t idx = 0; idx < num_keys; idx++) { + auto const start = key_offsets[idx]; + auto const end = key_offsets[idx + 1]; + auto iter = + make_counting_transform_iterator(0, [&](size_type i) { return c_info[i].row_count; }); + auto const page_index = + thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_count) - iter; + sum += c_info[page_index].size_bytes; + } + return {i.row_count, sum, i.key}; + } + }; + + std::vector aggregated_info(c_info.size()); + thrust::transform(c_info_sorted.begin(), + c_info_sorted.end(), + aggregated_info.begin(), + row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); + + { + size_t cur_pos = 0; + size_t cur_cumulative_size = 0; + size_t cur_row_count = 0; + auto start = thrust::make_transform_iterator( + aggregated_info.begin(), + [&](cumulative_row_info const& i) { return i.size_bytes - cur_cumulative_size; }); + auto end = start + aggregated_info.size(); + while (cur_row_count < static_cast(num_rows)) { + int64_t split_pos = thrust::lower_bound(start + cur_pos, end, chunk_read_limit) - start; + + // if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back + // one. + if (static_cast(split_pos) >= aggregated_info.size() || + (aggregated_info[split_pos].size_bytes - cur_cumulative_size > chunk_read_limit)) { + split_pos--; + } + + // best-try. if we can't find something that'll fit, we have to go bigger. 
we're doing this in + // a loop because all of the cumulative sizes for all the pages are sorted into one big list. + // so if we had two columns, both of which had an entry {1000, 10000}, that entry would be in + // the list twice. so we have to iterate until we skip past all of them. The idea is that we + // either do this, or we have to call unique() on the input first. + while (split_pos < (static_cast(aggregated_info.size()) - 1) && + (split_pos < 0 || aggregated_info[split_pos].row_count == cur_row_count)) { + split_pos++; + } + + auto const start_row = cur_row_count; + cur_row_count = aggregated_info[split_pos].row_count; + splits.push_back(gpu::chunk_read_info{start_row, cur_row_count - start_row}); + cur_pos = split_pos; + cur_cumulative_size = aggregated_info[split_pos].size_bytes; + } + } + + return splits; +} + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 87aaa81520a..73e2ce71ea5 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -201,6 +201,9 @@ class aggregate_reader_metadata { // columns void populate_column_metadata(std::vector const&, std::vector> const& sources); + + // use metadata to create {skip_rows,num_rows} pairs for the chunked reader + std::vector compute_splits(size_t chunk_read_limit); }; } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 6b5d4ba3640..18ef4de468b 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -825,6 +825,7 @@ struct cumulative_row_info { int key; // schema index }; +//#define PREPROCESS_DEBUG 1 #if defined(PREPROCESS_DEBUG) void print_pages(hostdevice_vector& pages, rmm::cuda_stream_view _stream) { @@ -1101,7 +1102,9 @@ std::vector find_splits(std::vector c cur_cumulative_size = sizes[split_pos].size_bytes; } } - // print_cumulative_row_info(sizes, "adjusted", splits); +#if defined(PREPROCESS_DEBUG) + print_cumulative_row_info(sizes, "adjusted", splits); +#endif return splits; } @@ -1138,7 +1141,15 @@ std::vector compute_splits(hostdevice_vector h_c_info(c_info.size()); + CUDF_CUDA_TRY(cudaMemcpy(h_c_info.data(), + c_info.data(), + sizeof(cumulative_row_info) * c_info.size(), + cudaMemcpyDefault)); + print_cumulative_page_info(pages, page_index, c_info, stream); + print_cumulative_row_info(h_c_info, "og"); +#endif // sort by row count rmm::device_uvector c_info_sorted{c_info, stream}; @@ -1148,13 +1159,14 @@ std::vector compute_splits(hostdevice_vector h_c_info_sorted(c_info_sorted.size()); - // CUDF_CUDA_TRY(cudaMemcpy(h_c_info_sorted.data(), - // c_info_sorted.data(), - // sizeof(cumulative_row_info) * c_info_sorted.size(), - // cudaMemcpyDefault)); - // print_cumulative_row_info(h_c_info_sorted, "raw"); +#if defined(PREPROCESS_DEBUG) + std::vector h_c_info_sorted(c_info_sorted.size()); + CUDF_CUDA_TRY(cudaMemcpy(h_c_info_sorted.data(), + c_info_sorted.data(), + sizeof(cumulative_row_info) * c_info_sorted.size(), + cudaMemcpyDefault)); + print_cumulative_row_info(h_c_info_sorted, "raw"); +#endif // generate key offsets (offsets to the start of each partition of keys). 
worst case is 1 page per // key @@ -1426,13 +1438,17 @@ void reader::impl::preprocess_pages(size_t skip_rows, pages.device_ptr() + pages.size(), page_keys.begin(), get_page_schema{}); + // page_keys: 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3 thrust::sequence(rmm::exec_policy(_stream), page_index.begin(), page_index.end()); + // page_index: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 thrust::stable_sort_by_key(rmm::exec_policy(_stream), page_keys.begin(), page_keys.end(), page_index.begin(), thrust::less()); + // page_keys: 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 + // page_index: 0, 1, 6, 7, 2, 3, 8, 9, 4, 5, 10, 11 } // detect malformed columns. @@ -1550,7 +1566,9 @@ void reader::impl::preprocess_pages(size_t skip_rows, // retrieve pages back pages.device_to_host(_stream, true); - // print_pages(pages, _stream); +#if defined(PREPROCESS_DEBUG) + print_pages(pages, _stream); +#endif } // compute splits if necessary. otherwise return a single split representing From be492fad92ae61b4a0ec87f67f35c612f1783e91 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 16 Mar 2023 15:54:57 -0700 Subject: [PATCH 07/24] check for empty offset index --- cpp/src/io/parquet/reader_impl_helpers.cpp | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 84a6cef61f3..5862d2cf5c9 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -718,6 +718,9 @@ void aggregate_reader_metadata::populate_column_metadata( } } +// the following code intends to replicate compute_splits/find_splits using +// file metadata rather than having to read the pages. the should be refactored +// so both implementations can share code (if possible). template inline auto make_counting_transform_iterator(cudf::size_type start, UnaryFunction f) { @@ -727,7 +730,9 @@ inline auto make_counting_transform_iterator(cudf::size_type start, UnaryFunctio std::vector aggregate_reader_metadata::compute_splits(size_t chunk_read_limit) { std::vector splits; - if (per_file_metadata[0].column_sizes.size() == 0) return splits; + if (per_file_metadata[0].column_sizes.empty() or per_file_metadata[0].offset_indexes.empty()) { + return splits; + } struct cumulative_row_info { size_t row_count; // cumulative row count From ab615867dd4c883132663413d89023cf33214dee Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 16 Mar 2023 16:09:03 -0700 Subject: [PATCH 08/24] do not clear chunk_read_limit if size stats are not present --- cpp/src/io/parquet/reader_impl.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 5f42faf731f..727099d6faa 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -229,13 +229,13 @@ reader::impl::impl(std::size_t chunk_read_limit, // only need this info for chunk estimation, or if using indexes for pushdown // filtering (or num_rows filtering). since we don't do the latter yet, just - // test for the former. That's why this isn't in the if{} immediately following this one. - // get column metadata (column/offset index, column sizes) for selected columns + // test for the former. 
if (_chunk_read_limit > 0) { _metadata->populate_column_metadata(_input_columns, _sources); - auto splits = _metadata->compute_splits(_chunk_read_limit); - _meta_chunk_read_info = std::move(splits); - _chunk_read_limit = 0; // don't need this since we already have splits + _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit); + if (not _meta_chunk_read_info.empty()) { + _chunk_read_limit = 0; // don't need this since we already have splits + } } // Save the states of the output buffers for reuse in `chunk_read()`. From 501a2090eaf0c95d5f9f4eb262a65cc9f0e0afa9 Mon Sep 17 00:00:00 2001 From: seidl Date: Thu, 16 Mar 2023 16:50:04 -0700 Subject: [PATCH 09/24] fudge sizes for now --- cpp/src/io/parquet/writer_impl.cu | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 9d243211728..bcfaf44f03b 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1887,7 +1887,9 @@ void writer::impl::write(table_view const& table, std::vector co std::vector page_sizes; std::transform( slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { - return column_size(slice, stream); + // FIXME: fudge factor to try to match size including overhead. remove when we + // calculate both. + return (column_size(slice, stream) * 13) / 10; }); int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); From 5e3608c2ad3812ec9cfe32c460c4be46dd9fe5e0 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 10:03:03 -0700 Subject: [PATCH 10/24] get rid of sizes with overhead --- cpp/src/io/parquet/compact_protocol_reader.cpp | 11 ++--------- cpp/src/io/parquet/compact_protocol_reader.hpp | 1 - cpp/src/io/parquet/compact_protocol_writer.cpp | 11 +---------- cpp/src/io/parquet/compact_protocol_writer.hpp | 1 - cpp/src/io/parquet/parquet.hpp | 16 ++-------------- cpp/src/io/parquet/reader_impl_helpers.cpp | 4 ++-- cpp/src/io/parquet/reader_impl_helpers.hpp | 2 +- cpp/src/io/parquet/writer_impl.cu | 10 ++++------ 8 files changed, 12 insertions(+), 44 deletions(-) diff --git a/cpp/src/io/parquet/compact_protocol_reader.cpp b/cpp/src/io/parquet/compact_protocol_reader.cpp index 04a0a16331a..ab477dbec17 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.cpp +++ b/cpp/src/io/parquet/compact_protocol_reader.cpp @@ -296,17 +296,10 @@ bool CompactProtocolReader::read(OffsetIndex* o) return function_builder(this, op); } -bool CompactProtocolReader::read(PageSize* p) -{ - auto op = std::make_tuple(ParquetFieldInt64(1, p->data_size), ParquetFieldInt64(2, p->page_size)); - return function_builder(this, op); -} - bool CompactProtocolReader::read(ColumnChunkSize* c) { - auto op = std::make_tuple(ParquetFieldInt64(1, c->chunk_size), - ParquetFieldInt64(2, c->full_chunk_size), - ParquetFieldStructList(3, c->page_sizes)); + auto op = + std::make_tuple(ParquetFieldInt64(1, c->chunk_size), ParquetFieldInt64List(2, c->page_sizes)); return function_builder(this, op); } diff --git a/cpp/src/io/parquet/compact_protocol_reader.hpp b/cpp/src/io/parquet/compact_protocol_reader.hpp index 2976201f2d6..3bfecd3136d 100644 --- a/cpp/src/io/parquet/compact_protocol_reader.hpp +++ b/cpp/src/io/parquet/compact_protocol_reader.hpp @@ -117,7 +117,6 @@ class CompactProtocolReader { bool read(KeyValue* k); bool read(PageLocation* p); bool read(OffsetIndex* o); - bool read(PageSize* p); bool read(ColumnChunkSize* o); bool read(ColumnIndex* c); bool read(Statistics* s); diff 
--git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index 343b27d7c99..55d03754e08 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -222,20 +222,11 @@ size_t CompactProtocolWriter::write(const OffsetIndex& s) return c.value(); } -size_t CompactProtocolWriter::write(const PageSize& s) -{ - CompactProtocolFieldWriter c(*this); - c.field_int(1, s.data_size); - c.field_int(2, s.page_size); - return c.value(); -} - size_t CompactProtocolWriter::write(const ColumnChunkSize& s) { CompactProtocolFieldWriter c(*this); c.field_int(1, s.chunk_size); - c.field_int(2, s.full_chunk_size); - c.field_struct_list(3, s.page_sizes); + c.field_int_list(2, s.page_sizes); return c.value(); } diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index aee2272b00a..f9e15842d57 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -52,7 +52,6 @@ class CompactProtocolWriter { size_t write(const ColumnChunkMetaData&); size_t write(const PageLocation&); size_t write(const OffsetIndex&); - size_t write(const PageSize&); size_t write(const ColumnChunkSize&); protected: diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 6efa7865671..5973dd0c24a 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -351,26 +351,14 @@ struct OffsetIndex { std::vector page_locations; }; -/** - * @brief Thrift struct for page size info. - * - * This is meant to be used by readers that need to know the full memory footprint of - * the fully decompressed and decoded page. Want to add this to PageLocation in the future. - */ -struct PageSize { - int64_t data_size; // size of data without overhead, meant to be writer agnostic - int64_t page_size; // optional writer-specific size with overhead -}; - /** * @brief Thrift struct for column chunk size info. * * Like PageSize, but for column chunks. Want to add this to OffsetIndex in the future. */ struct ColumnChunkSize { - int64_t chunk_size; // sum of page data_sizes...no overhead - int64_t full_chunk_size; // sum of page page_sizes...includes overhead - std::vector page_sizes; + int64_t chunk_size; // sum of page_sizes...no overhead + std::vector page_sizes; // size of page data in bytes not accounting for overhead }; /** diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 5862d2cf5c9..ac12afa1b25 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -684,7 +684,7 @@ void aggregate_reader_metadata::populate_column_metadata( if (res) { metadata.column_sizes.push_back(colsize); } else { - metadata.column_sizes.push_back(ColumnChunkSize{0, 0}); + metadata.column_sizes.push_back(ColumnChunkSize{0}); } } } @@ -765,7 +765,7 @@ std::vector aggregate_reader_metadata::compute_splits(size ? 
rg.num_rows : offsets.page_locations[page_idx + 1].first_row_index) - offsets.page_locations[page_idx].first_row_index; - size_t page_size = chunk_size.page_sizes[page_idx].data_size; + size_t page_size = chunk_size.page_sizes[page_idx]; page_sizes.push_back({num_rows, page_size, col.schema_idx}); } } diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 73e2ce71ea5..4f95a34c7b7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -203,7 +203,7 @@ class aggregate_reader_metadata { std::vector> const& sources); // use metadata to create {skip_rows,num_rows} pairs for the chunked reader - std::vector compute_splits(size_t chunk_read_limit); + [[nodiscard]] std::vector compute_splits(size_t chunk_read_limit); }; } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index bcfaf44f03b..764c61cd119 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1875,7 +1875,7 @@ void writer::impl::write(table_view const& table, std::vector co rmm::cuda_stream_view stream) { std::vector slice_offsets; - for (auto& page : col_pages) { + for (auto const& page : col_pages) { if (page.page_type == PageType::DATA_PAGE) { slice_offsets.push_back(page.start_row); slice_offsets.push_back(page.start_row + page.num_rows); @@ -1887,16 +1887,14 @@ void writer::impl::write(table_view const& table, std::vector co std::vector page_sizes; std::transform( slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { - // FIXME: fudge factor to try to match size including overhead. remove when we - // calculate both. - return (column_size(slice, stream) * 13) / 10; + return column_size(slice, stream); }); int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); - ColumnChunkSize cs{chunk_size, 0}; + ColumnChunkSize cs{chunk_size}; std::transform( page_sizes.begin(), page_sizes.end(), std::back_inserter(cs.page_sizes), [](auto sz) { - return PageSize{static_cast(sz), 0}; + return static_cast(sz); }); return cs; }; From 2b77f22cb0879e48d09a6795a17e4372899cccdb Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 11:17:16 -0700 Subject: [PATCH 11/24] add writer for vector --- cpp/src/io/parquet/compact_protocol_writer.cpp | 11 +++++++++++ cpp/src/io/parquet/compact_protocol_writer.hpp | 2 ++ 2 files changed, 13 insertions(+) diff --git a/cpp/src/io/parquet/compact_protocol_writer.cpp b/cpp/src/io/parquet/compact_protocol_writer.cpp index 55d03754e08..23fcfbdb6d6 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.cpp +++ b/cpp/src/io/parquet/compact_protocol_writer.cpp @@ -305,6 +305,17 @@ inline void CompactProtocolFieldWriter::field_int_list(int field, const std::vec current_field_value = field; } +inline void CompactProtocolFieldWriter::field_int_list(int field, const std::vector& val) +{ + put_field_header(field, current_field_value, ST_FLD_LIST); + put_byte((uint8_t)((std::min(val.size(), (size_t)0xfu) << 4) | ST_FLD_I64)); + if (val.size() >= 0xf) put_uint(val.size()); + for (auto& v : val) { + put_int(static_cast(v)); + } + current_field_value = field; +} + template inline void CompactProtocolFieldWriter::field_struct(int field, const T& val) { diff --git a/cpp/src/io/parquet/compact_protocol_writer.hpp b/cpp/src/io/parquet/compact_protocol_writer.hpp index f9e15842d57..b1627088235 100644 --- a/cpp/src/io/parquet/compact_protocol_writer.hpp +++ 
b/cpp/src/io/parquet/compact_protocol_writer.hpp @@ -91,6 +91,8 @@ class CompactProtocolFieldWriter { template inline void field_int_list(int field, const std::vector& val); + inline void field_int_list(int field, const std::vector& val); + template inline void field_struct(int field, const T& val); From 5c7480e667bf9543119f99a09e8e5489ccfe4b2c Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 12:15:58 -0700 Subject: [PATCH 12/24] only do size metadata if also doing column indexes --- cpp/src/io/parquet/writer_impl.cu | 76 +++++++++++++++---------------- 1 file changed, 36 insertions(+), 40 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 764c61cd119..a8e682b0d15 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -126,6 +126,37 @@ bool is_col_fixed_width(column_view const& column) return is_fixed_width(column.type()); } +// given a column_view and the encoded page info for the pages in that chunk, +// return a ColumnChunkSize object with the data sizes for each page. +ColumnChunkSize sizes_for_chunk(column_view const& col, + host_span col_pages, + rmm::cuda_stream_view stream) +{ + std::vector slice_offsets; + std::for_each(col_pages.begin(), col_pages.end(), [&](auto const& page) { + if (page.page_type == PageType::DATA_PAGE) { + slice_offsets.push_back(page.start_row); + slice_offsets.push_back(page.start_row + page.num_rows); + } + }); + + auto slices = cudf::slice(col, slice_offsets); + + std::vector page_sizes; + std::transform( + slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { + return column_size(slice, stream); + }); + int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); + + ColumnChunkSize cs{chunk_size}; + std::transform( + page_sizes.begin(), page_sizes.end(), std::back_inserter(cs.page_sizes), [](auto sz) { + return static_cast(sz); + }); + return cs; +} + } // namespace struct aggregate_writer_metadata { @@ -1851,7 +1882,6 @@ void writer::impl::write(table_view const& table, std::vector co : nullptr, (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) ? page_stats.data() : nullptr); - auto first_page_in_chunk = first_page_in_batch; std::vector> write_tasks; for (; r < rnext; r++) { int p = rg_to_part[r]; @@ -1868,45 +1898,6 @@ void writer::impl::write(table_view const& table, std::vector co dev_bfr = ck.uncompressed_bfr; } - // experimental: add data sizes to parquet metadata. 
- auto add_size_metadata = [](column_view const& col, - ColumnChunkMetaData& col_meta, - host_span col_pages, - rmm::cuda_stream_view stream) { - std::vector slice_offsets; - - for (auto const& page : col_pages) { - if (page.page_type == PageType::DATA_PAGE) { - slice_offsets.push_back(page.start_row); - slice_offsets.push_back(page.start_row + page.num_rows); - } - } - - auto slices = cudf::slice(col, slice_offsets); - - std::vector page_sizes; - std::transform( - slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { - return column_size(slice, stream); - }); - int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); - - ColumnChunkSize cs{chunk_size}; - std::transform( - page_sizes.begin(), page_sizes.end(), std::back_inserter(cs.page_sizes), [](auto sz) { - return static_cast(sz); - }); - return cs; - }; - - auto cs = add_size_metadata(single_streams_table.column(i), - column_chunk_meta, - {host_pages.data() + first_page_in_chunk, ck.num_pages}, - stream); - md->file(p).column_sizes.push_back(cs); - - first_page_in_chunk += ck.num_pages; - if (out_sink_[p]->is_device_write_preferred(ck.compressed_size)) { // let the writer do what it wants to retrieve the data from the gpu. write_tasks.push_back(out_sink_[p]->device_write_async( @@ -1984,6 +1975,11 @@ void writer::impl::write(table_view const& table, std::vector co cudaMemcpyDefault, stream.value())); + // experimental: add data sizes to parquet metadata. + auto cs = sizes_for_chunk( + single_streams_table.column(i), {h_pages.data() + curr_page_idx, ck.num_pages}, stream); + md->file(p).column_sizes.push_back(cs); + // calculate offsets while the column index is transferring int64_t curr_pg_offset = column_chunk_meta.data_page_offset; From cdf8abe6908fa0b2c2bbff2fa81f129d404e8201 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 12:25:38 -0700 Subject: [PATCH 13/24] get rid of host_pages vector, add some TODOs --- cpp/src/io/parquet/writer_impl.cu | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index a8e682b0d15..29e2a21d340 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1843,9 +1843,6 @@ void writer::impl::write(table_view const& table, std::vector co } } - // need pages on host to create offset_indexes and chunk size metadata - thrust::host_vector host_pages; - if (num_pages != 0) { init_encoder_pages(chunks, col_desc, @@ -1856,7 +1853,6 @@ void writer::impl::write(table_view const& table, std::vector co num_columns, num_pages, num_stats_bfr); - host_pages = cudf::detail::make_host_vector_sync(pages, stream); } pinned_buffer host_bfr{nullptr, cudaFreeHost}; @@ -1951,8 +1947,7 @@ void writer::impl::write(table_view const& table, std::vector co if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { // need pages on host to create offset_indexes - thrust::host_vector h_pages = cudf::detail::make_host_vector_async(pages, stream); - stream.synchronize(); + thrust::host_vector h_pages = cudf::detail::make_host_vector_sync(pages, stream); // add column and offset indexes to metadata for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { @@ -2028,6 +2023,7 @@ std::unique_ptr> writer::impl::close( auto const& sizes = fmd.column_sizes[chunkidx++]; buffer.resize(0); int32_t len = cpw.write(sizes); + // TODO these should be constants c.meta_data.key_value_metadata.push_back( KeyValue{"sizes_offset", 
std::to_string(out_sink_[p]->bytes_written())}); c.meta_data.key_value_metadata.push_back(KeyValue{"sizes_size", std::to_string(len)}); From 3ea748b406003072e0119229bbc60f256ce78d41 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 13:06:02 -0700 Subject: [PATCH 14/24] use constexpr for KV keys --- cpp/src/io/parquet/parquet.hpp | 3 +++ cpp/src/io/parquet/reader_impl_helpers.cpp | 4 ++-- cpp/src/io/parquet/writer_impl.cu | 6 +++--- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index 5973dd0c24a..c7ca6da544a 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -28,6 +28,9 @@ namespace io { namespace parquet { constexpr uint32_t parquet_magic = (('P' << 0) | ('A' << 8) | ('R' << 16) | ('1' << 24)); +constexpr std::string_view COL_META_SIZES_OFFSET = "sizes_offset"; +constexpr std::string_view COL_META_SIZES_SIZE = "sizes_size"; + /** * @brief Struct that describes the Parquet file data header */ diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index ac12afa1b25..aed35a06c0e 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -669,9 +669,9 @@ void aggregate_reader_metadata::populate_column_metadata( size_t offset = 0; for (size_t i = 0; i < chunk_meta.key_value_metadata.size(); i++) { auto& kv = chunk_meta.key_value_metadata[i]; - if (kv.key == "sizes_size") { + if (kv.key == COL_META_SIZES_SIZE) { len = std::stol(kv.value); - } else if (kv.key == "sizes_offset") { + } else if (kv.key == COL_META_SIZES_OFFSET) { offset = std::stol(kv.value); } } diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 29e2a21d340..a7b71ccf50a 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -2023,10 +2023,10 @@ std::unique_ptr> writer::impl::close( auto const& sizes = fmd.column_sizes[chunkidx++]; buffer.resize(0); int32_t len = cpw.write(sizes); - // TODO these should be constants + c.meta_data.key_value_metadata.push_back(KeyValue{ + std::string(COL_META_SIZES_OFFSET), std::to_string(out_sink_[p]->bytes_written())}); c.meta_data.key_value_metadata.push_back( - KeyValue{"sizes_offset", std::to_string(out_sink_[p]->bytes_written())}); - c.meta_data.key_value_metadata.push_back(KeyValue{"sizes_size", std::to_string(len)}); + KeyValue{std::string(COL_META_SIZES_SIZE), std::to_string(len)}); out_sink_[p]->host_write(buffer.data(), buffer.size()); } } From 9f0e9ec9f93ca6b2acf44ac0270865c8d41db872 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 14:01:27 -0700 Subject: [PATCH 15/24] checkpoint before refactor --- cpp/src/io/parquet/reader_impl.cpp | 4 ++++ cpp/src/io/parquet/reader_impl_helpers.cpp | 16 +++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 727099d6faa..3a8a75b0f26 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -231,8 +231,12 @@ reader::impl::impl(std::size_t chunk_read_limit, // filtering (or num_rows filtering). since we don't do the latter yet, just // test for the former. 
if (_chunk_read_limit > 0) { + auto tstart = std::chrono::system_clock::now(); _metadata->populate_column_metadata(_input_columns, _sources); _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit); + auto tend = std::chrono::system_clock::now(); + auto t = std::chrono::duration_cast(tend - tstart).count(); + printf("meta data time %ldms\n", t); if (not _meta_chunk_read_info.empty()) { _chunk_read_limit = 0; // don't need this since we already have splits } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index aed35a06c0e..8cb24e3d0df 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -766,6 +766,7 @@ std::vector aggregate_reader_metadata::compute_splits(size : offsets.page_locations[page_idx + 1].first_row_index) - offsets.page_locations[page_idx].first_row_index; size_t page_size = chunk_size.page_sizes[page_idx]; + // FIXME: need to add overhead cost to page_size. figure out how to do that. page_sizes.push_back({num_rows, page_size, col.schema_idx}); } } @@ -782,15 +783,9 @@ std::vector aggregate_reader_metadata::compute_splits(size } }; - struct get_cumulative_row_info { - cumulative_row_info const* ci; - - cumulative_row_info operator()(size_type index) { return ci[index]; } - }; - std::vector c_info(page_keys.size()); auto page_input = - thrust::make_transform_iterator(page_index.begin(), get_cumulative_row_info{page_sizes.data()}); + thrust::make_transform_iterator(page_index.begin(), [&](auto idx) { return page_sizes[idx]; }); thrust::inclusive_scan_by_key(page_keys.begin(), page_keys.end(), page_input, @@ -798,6 +793,10 @@ std::vector aggregate_reader_metadata::compute_splits(size thrust::equal_to{}, cumulative_row_sum{}); + // TODO: move page_keys, page_index, and c_info to device, and then use the code that already + // exists in the reader. 
+ auto tstart = std::chrono::system_clock::now(); + // sort by row count std::vector c_info_sorted{c_info}; thrust::sort(c_info_sorted.begin(), @@ -881,6 +880,9 @@ std::vector aggregate_reader_metadata::compute_splits(size cur_cumulative_size = aggregated_info[split_pos].size_bytes; } } + auto tend = std::chrono::system_clock::now(); + auto t = std::chrono::duration_cast(tend - tstart).count(); + printf("splits time %ldms\n", t); return splits; } From 5fc7aceaf01510bef2e61280f5a0f378c78df438 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 14:42:58 -0700 Subject: [PATCH 16/24] refactor to reuse compute_splits --- cpp/src/io/parquet/reader_impl.cpp | 2 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 140 +++-------------- cpp/src/io/parquet/reader_impl_helpers.hpp | 20 ++- cpp/src/io/parquet/reader_impl_preprocess.cu | 151 +++++++++---------- 4 files changed, 114 insertions(+), 199 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 3a8a75b0f26..626180d4534 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -233,7 +233,7 @@ reader::impl::impl(std::size_t chunk_read_limit, if (_chunk_read_limit > 0) { auto tstart = std::chrono::system_clock::now(); _metadata->populate_column_metadata(_input_columns, _sources); - _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit); + _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit, _stream); auto tend = std::chrono::system_clock::now(); auto t = std::chrono::duration_cast(tend - tstart).count(); printf("meta data time %ldms\n", t); diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 8cb24e3d0df..28d75b7d120 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -20,6 +20,8 @@ #include #include +#include + #include #include #include @@ -727,24 +729,19 @@ inline auto make_counting_transform_iterator(cudf::size_type start, UnaryFunctio return thrust::make_transform_iterator(thrust::make_counting_iterator(start), f); } -std::vector aggregate_reader_metadata::compute_splits(size_t chunk_read_limit) +std::vector aggregate_reader_metadata::compute_splits( + size_t chunk_read_limit, rmm::cuda_stream_view stream) { - std::vector splits; if (per_file_metadata[0].column_sizes.empty() or per_file_metadata[0].offset_indexes.empty()) { - return splits; + return {}; } - struct cumulative_row_info { - size_t row_count; // cumulative row count - size_t size_bytes; // cumulative size in bytes - int key; // schema index - }; - - std::vector page_sizes; - - // create page_keys and page_index + // need to replicate what's done in compute_splits() in reader_impl_preprocess.cu, but without + // the PageInfo data. 
+ // create page_keys and page_index and populate page_sizes std::vector page_keys; std::vector page_index; + std::vector page_sizes; int global_page_idx = 0; for (size_t src_idx = 0; src_idx < per_file_metadata.size(); ++src_idx) { @@ -776,115 +773,26 @@ std::vector aggregate_reader_metadata::compute_splits(size thrust::stable_sort_by_key( page_keys.begin(), page_keys.end(), page_index.begin(), thrust::less()); - struct cumulative_row_sum { - cumulative_row_info operator()(cumulative_row_info const& a, cumulative_row_info const& b) const - { - return cumulative_row_info{a.row_count + b.row_count, a.size_bytes + b.size_bytes, a.key}; - } - }; - std::vector c_info(page_keys.size()); auto page_input = thrust::make_transform_iterator(page_index.begin(), [&](auto idx) { return page_sizes[idx]; }); - thrust::inclusive_scan_by_key(page_keys.begin(), - page_keys.end(), - page_input, - c_info.begin(), - thrust::equal_to{}, - cumulative_row_sum{}); - - // TODO: move page_keys, page_index, and c_info to device, and then use the code that already - // exists in the reader. - auto tstart = std::chrono::system_clock::now(); - - // sort by row count - std::vector c_info_sorted{c_info}; - thrust::sort(c_info_sorted.begin(), - c_info_sorted.end(), - [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { - return a.row_count < b.row_count; - }); - - // generate key offsets (offsets to the start of each partition of keys). worst case is 1 page per - // key - std::vector key_offsets(page_keys.size() + 1); - auto const key_offsets_end = thrust::reduce_by_key(page_keys.begin(), - page_keys.end(), - thrust::make_constant_iterator(1), - thrust::make_discard_iterator(), - key_offsets.begin()) - .second; - size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); - thrust::exclusive_scan(key_offsets.begin(), key_offsets.end(), key_offsets.begin()); - - struct row_total_size { - cumulative_row_info const* c_info; - size_type const* key_offsets; - size_t num_keys; - - cumulative_row_info operator()(cumulative_row_info const& i) - { - // sum sizes for each input column at this row - size_t sum = 0; - for (size_t idx = 0; idx < num_keys; idx++) { - auto const start = key_offsets[idx]; - auto const end = key_offsets[idx + 1]; - auto iter = - make_counting_transform_iterator(0, [&](size_type i) { return c_info[i].row_count; }); - auto const page_index = - thrust::lower_bound(thrust::seq, iter + start, iter + end, i.row_count) - iter; - sum += c_info[page_index].size_bytes; - } - return {i.row_count, sum, i.key}; - } - }; - - std::vector aggregated_info(c_info.size()); - thrust::transform(c_info_sorted.begin(), - c_info_sorted.end(), - aggregated_info.begin(), - row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); - - { - size_t cur_pos = 0; - size_t cur_cumulative_size = 0; - size_t cur_row_count = 0; - auto start = thrust::make_transform_iterator( - aggregated_info.begin(), - [&](cumulative_row_info const& i) { return i.size_bytes - cur_cumulative_size; }); - auto end = start + aggregated_info.size(); - while (cur_row_count < static_cast(num_rows)) { - int64_t split_pos = thrust::lower_bound(start + cur_pos, end, chunk_read_limit) - start; - - // if we're past the end, or if the returned bucket is > than the chunk_read_limit, move back - // one. - if (static_cast(split_pos) >= aggregated_info.size() || - (aggregated_info[split_pos].size_bytes - cur_cumulative_size > chunk_read_limit)) { - split_pos--; - } - - // best-try. 
if we can't find something that'll fit, we have to go bigger. we're doing this in - // a loop because all of the cumulative sizes for all the pages are sorted into one big list. - // so if we had two columns, both of which had an entry {1000, 10000}, that entry would be in - // the list twice. so we have to iterate until we skip past all of them. The idea is that we - // either do this, or we have to call unique() on the input first. - while (split_pos < (static_cast(aggregated_info.size()) - 1) && - (split_pos < 0 || aggregated_info[split_pos].row_count == cur_row_count)) { - split_pos++; - } + thrust::inclusive_scan_by_key( + page_keys.begin(), + page_keys.end(), + page_input, + c_info.begin(), + thrust::equal_to{}, + [](auto const& a, auto const& b) { + return cumulative_row_info{a.row_count + b.row_count, a.size_bytes + b.size_bytes, a.key}; + }); - auto const start_row = cur_row_count; - cur_row_count = aggregated_info[split_pos].row_count; - splits.push_back(gpu::chunk_read_info{start_row, cur_row_count - start_row}); - cur_pos = split_pos; - cur_cumulative_size = aggregated_info[split_pos].size_bytes; - } - } - auto tend = std::chrono::system_clock::now(); - auto t = std::chrono::duration_cast(tend - tstart).count(); - printf("splits time %ldms\n", t); + // now move to device and call parquet::compute_splits(). + auto const d_page_keys = cudf::detail::make_device_uvector_async(page_keys, stream); + auto const d_page_index = cudf::detail::make_device_uvector_async(page_index, stream); + auto const d_c_info = cudf::detail::make_device_uvector_async(c_info, stream); - return splits; + return parquet::compute_splits( + d_page_keys, d_page_index, d_c_info, num_rows, chunk_read_limit, stream); } } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 4f95a34c7b7..886aaee0bcf 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -47,6 +47,15 @@ using namespace cudf::io::parquet; : data_type{t_id}; } +/** + * @brief Struct used to compute splits for the chunked reader + */ +struct cumulative_row_info { + size_t row_count; // cumulative row count + size_t size_bytes; // cumulative size in bytes + int key; // schema index +}; + /** * @brief The row_group_info class */ @@ -203,7 +212,16 @@ class aggregate_reader_metadata { std::vector> const& sources); // use metadata to create {skip_rows,num_rows} pairs for the chunked reader - [[nodiscard]] std::vector compute_splits(size_t chunk_read_limit); + [[nodiscard]] std::vector compute_splits(size_t chunk_read_limit, + rmm::cuda_stream_view stream); }; +std::vector compute_splits( + rmm::device_uvector const& page_keys, + rmm::device_uvector const& page_index, + rmm::device_uvector const& c_info, + size_t num_rows, + size_t chunk_read_limit, + rmm::cuda_stream_view stream); + } // namespace cudf::io::detail::parquet diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index 18ef4de468b..c57e4fe9864 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -819,13 +819,6 @@ void reader::impl::load_and_decompress_data( namespace { -struct cumulative_row_info { - size_t row_count; // cumulative row count - size_t size_bytes; // cumulative size in bytes - int key; // schema index -}; - -//#define PREPROCESS_DEBUG 1 #if defined(PREPROCESS_DEBUG) void print_pages(hostdevice_vector& pages, 
rmm::cuda_stream_view _stream) { @@ -1102,9 +1095,7 @@ std::vector find_splits(std::vector c cur_cumulative_size = sizes[split_pos].size_bytes; } } -#if defined(PREPROCESS_DEBUG) - print_cumulative_row_info(sizes, "adjusted", splits); -#endif + // print_cumulative_row_info(sizes, "adjusted", splits); return splits; } @@ -1141,74 +1132,9 @@ std::vector compute_splits(hostdevice_vector h_c_info(c_info.size()); - CUDF_CUDA_TRY(cudaMemcpy(h_c_info.data(), - c_info.data(), - sizeof(cumulative_row_info) * c_info.size(), - cudaMemcpyDefault)); - print_cumulative_page_info(pages, page_index, c_info, stream); - print_cumulative_row_info(h_c_info, "og"); -#endif - - // sort by row count - rmm::device_uvector c_info_sorted{c_info, stream}; - thrust::sort(rmm::exec_policy(stream), - c_info_sorted.begin(), - c_info_sorted.end(), - [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { - return a.row_count < b.row_count; - }); -#if defined(PREPROCESS_DEBUG) - std::vector h_c_info_sorted(c_info_sorted.size()); - CUDF_CUDA_TRY(cudaMemcpy(h_c_info_sorted.data(), - c_info_sorted.data(), - sizeof(cumulative_row_info) * c_info_sorted.size(), - cudaMemcpyDefault)); - print_cumulative_row_info(h_c_info_sorted, "raw"); -#endif - - // generate key offsets (offsets to the start of each partition of keys). worst case is 1 page per - // key - rmm::device_uvector key_offsets(page_keys.size() + 1, stream); - auto const key_offsets_end = thrust::reduce_by_key(rmm::exec_policy(stream), - page_keys.begin(), - page_keys.end(), - thrust::make_constant_iterator(1), - thrust::make_discard_iterator(), - key_offsets.begin()) - .second; - size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); - thrust::exclusive_scan( - rmm::exec_policy(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin()); - - // adjust the cumulative info such that for each row count, the size includes any pages that span - // that row count. this is so that if we have this case: - // page row counts - // Column A: 0 <----> 100 <----> 200 - // Column B: 0 <---------------> 200 <--------> 400 - // | - // if we decide to split at row 100, we don't really know the actual amount of bytes in column B - // at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that - // page. 
- // - rmm::device_uvector aggregated_info(c_info.size(), stream); - thrust::transform(rmm::exec_policy(stream), - c_info_sorted.begin(), - c_info_sorted.end(), - aggregated_info.begin(), - row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); - - // bring back to the cpu - std::vector h_aggregated_info(aggregated_info.size()); - CUDF_CUDA_TRY(cudaMemcpyAsync(h_aggregated_info.data(), - aggregated_info.data(), - sizeof(cumulative_row_info) * c_info.size(), - cudaMemcpyDefault, - stream.value())); - stream.synchronize(); + // print_cumulative_page_info(pages, page_index, c_info, stream); - return find_splits(h_aggregated_info, num_rows, chunk_read_limit); + return compute_splits(page_keys, page_index, c_info, num_rows, chunk_read_limit, stream); } struct get_page_chunk_idx { @@ -1403,6 +1329,73 @@ void detect_malformed_pages(hostdevice_vector& pages, } // anonymous namespace +std::vector compute_splits( + rmm::device_uvector const& page_keys, + rmm::device_uvector const& page_index, + rmm::device_uvector const& c_info, + size_t num_rows, + size_t chunk_read_limit, + rmm::cuda_stream_view stream) +{ + // sort by row count + rmm::device_uvector c_info_sorted{c_info, stream}; + thrust::sort(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + [] __device__(cumulative_row_info const& a, cumulative_row_info const& b) { + return a.row_count < b.row_count; + }); + + // std::vector h_c_info_sorted(c_info_sorted.size()); + // CUDF_CUDA_TRY(cudaMemcpy(h_c_info_sorted.data(), + // c_info_sorted.data(), + // sizeof(cumulative_row_info) * c_info_sorted.size(), + // cudaMemcpyDefault)); + // print_cumulative_row_info(h_c_info_sorted, "raw"); + + // generate key offsets (offsets to the start of each partition of keys). worst case is 1 page per + // key + rmm::device_uvector key_offsets(page_keys.size() + 1, stream); + auto const key_offsets_end = thrust::reduce_by_key(rmm::exec_policy(stream), + page_keys.begin(), + page_keys.end(), + thrust::make_constant_iterator(1), + thrust::make_discard_iterator(), + key_offsets.begin()) + .second; + size_t const num_unique_keys = key_offsets_end - key_offsets.begin(); + thrust::exclusive_scan( + rmm::exec_policy(stream), key_offsets.begin(), key_offsets.end(), key_offsets.begin()); + + // adjust the cumulative info such that for each row count, the size includes any pages that span + // that row count. this is so that if we have this case: + // page row counts + // Column A: 0 <----> 100 <----> 200 + // Column B: 0 <---------------> 200 <--------> 400 + // | + // if we decide to split at row 100, we don't really know the actual amount of bytes in column B + // at that point. So we have to proceed as if we are taking the bytes from all 200 rows of that + // page. 
+ // + rmm::device_uvector aggregated_info(c_info.size(), stream); + thrust::transform(rmm::exec_policy(stream), + c_info_sorted.begin(), + c_info_sorted.end(), + aggregated_info.begin(), + row_total_size{c_info.data(), key_offsets.data(), num_unique_keys}); + + // bring back to the cpu + std::vector h_aggregated_info(aggregated_info.size()); + CUDF_CUDA_TRY(cudaMemcpyAsync(h_aggregated_info.data(), + aggregated_info.data(), + sizeof(cumulative_row_info) * c_info.size(), + cudaMemcpyDefault, + stream.value())); + stream.synchronize(); + + return find_splits(h_aggregated_info, num_rows, chunk_read_limit); +} + void reader::impl::preprocess_pages(size_t skip_rows, size_t num_rows, bool uses_custom_row_bounds, @@ -1438,17 +1431,13 @@ void reader::impl::preprocess_pages(size_t skip_rows, pages.device_ptr() + pages.size(), page_keys.begin(), get_page_schema{}); - // page_keys: 1, 1, 2, 2, 3, 3, 1, 1, 2, 2, 3, 3 thrust::sequence(rmm::exec_policy(_stream), page_index.begin(), page_index.end()); - // page_index: 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 thrust::stable_sort_by_key(rmm::exec_policy(_stream), page_keys.begin(), page_keys.end(), page_index.begin(), thrust::less()); - // page_keys: 1, 1, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3 - // page_index: 0, 1, 6, 7, 2, 3, 8, 9, 4, 5, 10, 11 } // detect malformed columns. From 009616965cc6651097bf9f64db9b5bcb62be7cd8 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 14:45:27 -0700 Subject: [PATCH 17/24] cleanup --- cpp/src/io/parquet/reader_impl_preprocess.cu | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_preprocess.cu b/cpp/src/io/parquet/reader_impl_preprocess.cu index c57e4fe9864..4ee754e681a 100644 --- a/cpp/src/io/parquet/reader_impl_preprocess.cu +++ b/cpp/src/io/parquet/reader_impl_preprocess.cu @@ -1555,9 +1555,7 @@ void reader::impl::preprocess_pages(size_t skip_rows, // retrieve pages back pages.device_to_host(_stream, true); -#if defined(PREPROCESS_DEBUG) - print_pages(pages, _stream); -#endif + // print_pages(pages, _stream); } // compute splits if necessary. otherwise return a single split representing From a9bf41b7fde692299cf41b3507abe6f42ac469a5 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 15:31:23 -0700 Subject: [PATCH 18/24] post refactor cleanup --- cpp/src/io/parquet/parquet.hpp | 2 +- cpp/src/io/parquet/reader_impl.cpp | 4 --- cpp/src/io/parquet/reader_impl_helpers.cpp | 14 --------- cpp/src/io/parquet/reader_impl_helpers.hpp | 34 +++++++++++++++++++--- 4 files changed, 31 insertions(+), 23 deletions(-) diff --git a/cpp/src/io/parquet/parquet.hpp b/cpp/src/io/parquet/parquet.hpp index c7ca6da544a..9a20b48fc91 100644 --- a/cpp/src/io/parquet/parquet.hpp +++ b/cpp/src/io/parquet/parquet.hpp @@ -357,7 +357,7 @@ struct OffsetIndex { /** * @brief Thrift struct for column chunk size info. * - * Like PageSize, but for column chunks. Want to add this to OffsetIndex in the future. + * Want to add this to OffsetIndex in the future. */ struct ColumnChunkSize { int64_t chunk_size; // sum of page_sizes...no overhead diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 626180d4534..4fdacb9f193 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -231,12 +231,8 @@ reader::impl::impl(std::size_t chunk_read_limit, // filtering (or num_rows filtering). since we don't do the latter yet, just // test for the former. 
if (_chunk_read_limit > 0) { - auto tstart = std::chrono::system_clock::now(); _metadata->populate_column_metadata(_input_columns, _sources); _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit, _stream); - auto tend = std::chrono::system_clock::now(); - auto t = std::chrono::duration_cast(tend - tstart).count(); - printf("meta data time %ldms\n", t); if (not _meta_chunk_read_info.empty()) { _chunk_read_limit = 0; // don't need this since we already have splits } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 28d75b7d120..6478614bb96 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -22,11 +22,6 @@ #include -#include -#include -#include -#include -#include #include namespace cudf::io::detail::parquet { @@ -720,15 +715,6 @@ void aggregate_reader_metadata::populate_column_metadata( } } -// the following code intends to replicate compute_splits/find_splits using -// file metadata rather than having to read the pages. the should be refactored -// so both implementations can share code (if possible). -template -inline auto make_counting_transform_iterator(cudf::size_type start, UnaryFunction f) -{ - return thrust::make_transform_iterator(thrust::make_counting_iterator(start), f); -} - std::vector aggregate_reader_metadata::compute_splits( size_t chunk_read_limit, rmm::cuda_stream_view stream) { diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 886aaee0bcf..3dbd13da392 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -206,16 +206,42 @@ class aggregate_reader_metadata { bool strings_to_categorical, type_id timestamp_type_id) const; - // read per-file page index and column/page size thrift structures for the selected input - // columns - void populate_column_metadata(std::vector const&, + /** + * @brief Read per-file page index and size thrift structures for the selected input columns + * + * @param input_columns Columns selected by select_columns() + * @param sources List of datasources + */ + void populate_column_metadata(std::vector const& input_columns, std::vector> const& sources); - // use metadata to create {skip_rows,num_rows} pairs for the chunked reader + /** + * @brief Use file metadata to create {skip_rows, num_rows} pairs for the chunked reader. + * + * Each pair should generate output columns of total size <= `chunk_read_limit` bytes. + * + * @param chunk_read_limit Size to limit reads to. + * @param stream CUDA stream to use + * @return List of {skip_rows, num_rows} pairs + */ [[nodiscard]] std::vector compute_splits(size_t chunk_read_limit, rmm::cuda_stream_view stream); }; +/** + * @brief Generate {skip_rows, num_rows} pairs where each range will fit within a predefined limit + * + * Given a list of cumulative page sizes that have been computed by nesting level and + * a limit on total read size, generate a set of {skip_rows, num_rows} pairs representing + * a set of reads that will generate output columns of total size <= `chunk_read_limit` bytes. 
+ * + * @param page_keys Schema index for each page, sorted + * @param page_index Global page index, sorted in same order as page_keys + * @param c_info Cumulative page size info + * @param num_rows Total number of rows to read + * @param chunk_read_limit Limit on total number of bytes to be returned per read, for all columns + * @param stream CUDA stream to use + */ std::vector compute_splits( rmm::device_uvector const& page_keys, rmm::device_uvector const& page_index, From 8eca6f4be155a7936f87face1550256b84b9a25b Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 17 Mar 2023 15:58:53 -0700 Subject: [PATCH 19/24] fix writer::impl::close --- cpp/src/io/parquet/writer_impl.cu | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index a7b71ccf50a..972abac91ba 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -2014,9 +2014,10 @@ std::unique_ptr> writer::impl::close( CompactProtocolWriter cpw(&buffer); file_ender_s fendr; - // experimental: write page size info ahead of other footer metadata - { - auto& fmd = md->file(p); + if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { + auto& fmd = md->file(p); + + // experimental: write page size info ahead of other footer metadata int chunkidx = 0; for (auto& r : fmd.row_groups) { for (auto& c : r.columns) { @@ -2030,12 +2031,9 @@ std::unique_ptr> writer::impl::close( out_sink_[p]->host_write(buffer.data(), buffer.size()); } } - } - if (stats_granularity_ == statistics_freq::STATISTICS_COLUMN) { - auto& fmd = md->file(p); // write column indices, updating column metadata along the way - int chunkidx = 0; + chunkidx = 0; for (auto& r : fmd.row_groups) { for (auto& c : r.columns) { auto const& index = fmd.column_indexes[chunkidx++]; From 3d4793460837803d13c74005f02450db4e28b0e4 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 21 Mar 2023 09:40:28 -0700 Subject: [PATCH 20/24] clean up copy paste cruft --- cpp/src/io/parquet/reader_impl_helpers.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 6478614bb96..1cdcd849935 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -675,13 +675,13 @@ void aggregate_reader_metadata::populate_column_metadata( if (len > 0) { ColumnChunkSize colsize; - const auto ci_buf = source->host_read(offset, len); - cudf::io::parquet::CompactProtocolReader cp(ci_buf->data(), ci_buf->size()); + const auto sz_buf = source->host_read(offset, len); + cudf::io::parquet::CompactProtocolReader cp(sz_buf->data(), sz_buf->size()); bool res = cp.read(&colsize); if (res) { metadata.column_sizes.push_back(colsize); } else { - metadata.column_sizes.push_back(ColumnChunkSize{0}); + metadata.column_sizes.push_back({0}); } } } @@ -694,7 +694,7 @@ void aggregate_reader_metadata::populate_column_metadata( if (res) { metadata.column_indexes.push_back(colidx); } else { - metadata.column_indexes.push_back(ColumnIndex{}); + metadata.column_indexes.push_back({}); } } @@ -706,7 +706,7 @@ void aggregate_reader_metadata::populate_column_metadata( if (res) { metadata.offset_indexes.push_back(offidx); } else { - metadata.offset_indexes.push_back(OffsetIndex{}); + metadata.offset_indexes.push_back({}); } } } From e9a4312e502f3e2b643babbdebdd02f19cb57436 Mon Sep 17 00:00:00 2001 From: seidl Date: Tue, 21 Mar 2023 09:49:57 -0700 Subject: [PATCH 
21/24] need to pass memory resource to compute_splits --- cpp/src/io/parquet/reader_impl.cpp | 2 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 8 ++++---- cpp/src/io/parquet/reader_impl_helpers.hpp | 5 +++-- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 4fdacb9f193..2aac3b8402c 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -232,7 +232,7 @@ reader::impl::impl(std::size_t chunk_read_limit, // test for the former. if (_chunk_read_limit > 0) { _metadata->populate_column_metadata(_input_columns, _sources); - _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit, _stream); + _meta_chunk_read_info = _metadata->compute_splits(_chunk_read_limit, _stream, _mr); if (not _meta_chunk_read_info.empty()) { _chunk_read_limit = 0; // don't need this since we already have splits } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 1cdcd849935..04265b69baa 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -716,7 +716,7 @@ void aggregate_reader_metadata::populate_column_metadata( } std::vector aggregate_reader_metadata::compute_splits( - size_t chunk_read_limit, rmm::cuda_stream_view stream) + size_t chunk_read_limit, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { if (per_file_metadata[0].column_sizes.empty() or per_file_metadata[0].offset_indexes.empty()) { return {}; @@ -773,9 +773,9 @@ std::vector aggregate_reader_metadata::compute_splits( }); // now move to device and call parquet::compute_splits(). - auto const d_page_keys = cudf::detail::make_device_uvector_async(page_keys, stream); - auto const d_page_index = cudf::detail::make_device_uvector_async(page_index, stream); - auto const d_c_info = cudf::detail::make_device_uvector_async(c_info, stream); + auto const d_page_keys = cudf::detail::make_device_uvector_async(page_keys, stream, mr); + auto const d_page_index = cudf::detail::make_device_uvector_async(page_index, stream, mr); + auto const d_c_info = cudf::detail::make_device_uvector_async(c_info, stream, mr); return parquet::compute_splits( d_page_keys, d_page_index, d_c_info, num_rows, chunk_read_limit, stream); diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 3dbd13da392..e8890e0f457 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -222,10 +222,11 @@ class aggregate_reader_metadata { * * @param chunk_read_limit Size to limit reads to. 
* @param stream CUDA stream to use + * @param mr CUDA memory resource to use * @return List of {skip_rows, num_rows} pairs */ - [[nodiscard]] std::vector compute_splits(size_t chunk_read_limit, - rmm::cuda_stream_view stream); + [[nodiscard]] std::vector compute_splits( + size_t chunk_read_limit, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); }; /** From 7ff4033a6ab002089a2a55214178fc541f6fea7e Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 14 Apr 2023 10:15:24 -0700 Subject: [PATCH 22/24] post merge cleanup --- cpp/src/io/parquet/writer_impl.cu | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index ebe394efd32..8158154e3de 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1960,7 +1960,7 @@ void writer::impl::write(table_view const& table, std::vector co if (_stats_granularity == statistics_freq::STATISTICS_COLUMN) { // need pages on host to create offset_indexes - thrust::host_vector h_pages = cudf::detail::make_host_vector_sync(pages, stream); + thrust::host_vector h_pages = cudf::detail::make_host_vector_sync(pages, _stream); // add column and offset indexes to metadata for (auto b = 0, r = 0; b < static_cast(batch_list.size()); b++) { @@ -1984,9 +1984,10 @@ void writer::impl::write(table_view const& table, std::vector co _stream.value())); // experimental: add data sizes to parquet metadata. - auto cs = sizes_for_chunk( - single_streams_table.column(i), {h_pages.data() + curr_page_idx, ck.num_pages}, stream); - md->file(p).column_sizes.push_back(cs); + auto cs = sizes_for_chunk(single_streams_table.column(i), + {h_pages.data() + curr_page_idx, ck.num_pages}, + _stream); + _agg_meta->file(p).column_sizes.push_back(cs); // calculate offsets while the column index is transferring int64_t curr_pg_offset = column_chunk_meta.data_page_offset; @@ -2038,10 +2039,10 @@ std::unique_ptr> writer::impl::close( buffer.resize(0); int32_t len = cpw.write(sizes); c.meta_data.key_value_metadata.push_back(KeyValue{ - std::string(COL_META_SIZES_OFFSET), std::to_string(out_sink_[p]->bytes_written())}); + std::string(COL_META_SIZES_OFFSET), std::to_string(_out_sink[p]->bytes_written())}); c.meta_data.key_value_metadata.push_back( KeyValue{std::string(COL_META_SIZES_SIZE), std::to_string(len)}); - out_sink_[p]->host_write(buffer.data(), buffer.size()); + _out_sink[p]->host_write(buffer.data(), buffer.size()); } } From 5f41d2454279bd3d13c7edbdbccfc83e4c265258 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 16 Jun 2023 15:28:02 -0700 Subject: [PATCH 23/24] restore more stuff lost in merge --- cpp/src/io/parquet/writer_impl.cu | 35 ++++++++++++++++++++++++++++++ cpp/src/io/parquet/writer_impl.hpp | 2 ++ 2 files changed, 37 insertions(+) diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 949a2e906cc..40be24486c4 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -239,6 +239,37 @@ bool is_col_fixed_width(column_view const& column) return is_fixed_width(column.type()); } +// given a column_view and the encoded page info for the pages in that chunk, +// return a ColumnChunkSize object with the data sizes for each page. 
+ColumnChunkSize sizes_for_chunk(column_view const& col, + host_span col_pages, + rmm::cuda_stream_view stream) +{ + std::vector slice_offsets; + std::for_each(col_pages.begin(), col_pages.end(), [&](auto const& page) { + if (page.page_type == PageType::DATA_PAGE) { + slice_offsets.push_back(page.start_row); + slice_offsets.push_back(page.start_row + page.num_rows); + } + }); + + auto slices = cudf::slice(col, slice_offsets); + + std::vector page_sizes; + std::transform( + slices.begin(), slices.end(), std::back_inserter(page_sizes), [&stream](auto& slice) { + return column_size(slice, stream); + }); + int64_t chunk_size = std::reduce(page_sizes.begin(), page_sizes.end(), 0L); + + ColumnChunkSize cs{chunk_size}; + std::transform( + page_sizes.begin(), page_sizes.end(), std::back_inserter(cs.page_sizes), [](auto sz) { + return static_cast(sz); + }); + return cs; +} + /** * @brief Extends SchemaElement to add members required in constructing parquet_column_view * @@ -1957,6 +1988,7 @@ auto convert_table_to_parquet_data(table_input_metadata& table_meta, std::move(uncomp_bfr), std::move(comp_bfr), std::move(col_idx_bfr), + std::move(single_streams_table), std::move(bounce_buffer)}; } @@ -2060,6 +2092,7 @@ void writer::impl::write(table_view const& input, std::vector co uncomp_bfr, // unused, but contains data for later write to sink comp_bfr, // unused, but contains data for later write to sink col_idx_bfr, // unused, but contains data for later write to sink + single_streams_table, bounce_buffer] = [&] { try { return convert_table_to_parquet_data(*_table_meta, @@ -2098,6 +2131,7 @@ void writer::impl::write(table_view const& input, std::vector co first_rg_in_part, batch_list, rg_to_part, + single_streams_table, bounce_buffer); update_compression_statistics(comp_stats); @@ -2113,6 +2147,7 @@ void writer::impl::write_parquet_data_to_sink( host_span first_rg_in_part, host_span batch_list, host_span rg_to_part, + table_view const& single_streams_table, host_span bounce_buffer) { _agg_meta = std::move(updated_agg_meta); diff --git a/cpp/src/io/parquet/writer_impl.hpp b/cpp/src/io/parquet/writer_impl.hpp index a6c55e04b96..7b7e30a661e 100644 --- a/cpp/src/io/parquet/writer_impl.hpp +++ b/cpp/src/io/parquet/writer_impl.hpp @@ -136,6 +136,7 @@ class writer::impl { * @param first_rg_in_part The first rowgroup in each partition * @param batch_list The batches of rowgroups to encode * @param rg_to_part A map from rowgroup to partition + * @param single_streams_table input table converted to parquet columns * @param[out] bounce_buffer Temporary host output buffer */ void write_parquet_data_to_sink(std::unique_ptr& updated_agg_meta, @@ -145,6 +146,7 @@ class writer::impl { host_span first_rg_in_part, host_span batch_list, host_span rg_to_part, + table_view const& single_streams_table, host_span bounce_buffer); // Cuda stream to be used From 0fa655b80a06f16396a841af9f26ae797a684ed5 Mon Sep 17 00:00:00 2001 From: seidl Date: Fri, 28 Jul 2023 15:32:13 -0700 Subject: [PATCH 24/24] finish merge --- cpp/src/io/parquet/reader_impl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index ca1adf2bd4d..aeba740973e 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -478,12 +478,12 @@ table_with_metadata reader::impl::read_chunk() true /*uses_custom_row_bounds*/, {} /*row_group_indices, empty means read all row groups*/, std::nullopt /*filter*/); - return read_chunk_internal(true); + 
return read_chunk_internal(true, std::nullopt); } else { auto const& chunk_info = _meta_chunk_read_info[_current_read_chunk]; _file_preprocessed = false; prepare_data(chunk_info.skip_rows, chunk_info.num_rows, true, {}, std::nullopt); - return read_chunk_internal(true); + return read_chunk_internal(true, std::nullopt); } }
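
Note on consuming the experimental metadata added by this series: each column chunk that carries size information gets two key-value entries, "sizes_offset" and "sizes_size", which point at a thrift-encoded ColumnChunkSize blob written ahead of the rest of the footer metadata. The sketch below shows one way a consumer could locate that blob from a chunk's key-value list. It is illustrative only: the KeyValue struct here is a simplified stand-in rather than the cudf type, and the located blob would still have to be read from the file and decoded with the compact protocol reader, as populate_column_metadata does in the patches above.

// Illustrative sketch only: locate the serialized ColumnChunkSize for one column
// chunk from the experimental key-value entries written by this series. KeyValue
// is a simplified stand-in struct; decoding the blob itself is not shown.
#include <cstddef>
#include <optional>
#include <string>
#include <vector>

struct KeyValue {
  std::string key;
  std::string value;
};

struct SizesBlobLocation {
  std::size_t offset = 0;  // absolute file offset of the thrift-encoded ColumnChunkSize
  std::size_t length = 0;  // length of that blob in bytes
};

// Scan a chunk's key_value_metadata for the "sizes_offset"/"sizes_size" keys.
std::optional<SizesBlobLocation> find_sizes_blob(std::vector<KeyValue> const& kv_meta)
{
  SizesBlobLocation loc;
  bool have_offset = false;
  bool have_length = false;
  for (auto const& kv : kv_meta) {
    if (kv.key == "sizes_offset") {
      loc.offset  = std::stoull(kv.value);
      have_offset = true;
    } else if (kv.key == "sizes_size") {
      loc.length  = std::stoull(kv.value);
      have_length = true;
    }
  }
  if (have_offset and have_length and loc.length > 0) { return loc; }
  return std::nullopt;  // this chunk was written without size metadata
}

int main()
{
  // Metadata as a writer following this series might emit it for one chunk.
  std::vector<KeyValue> kv{{"sizes_offset", "123456"}, {"sizes_size", "42"}};
  if (auto loc = find_sizes_blob(kv)) {
    // A real consumer would now read loc->length bytes at loc->offset from the
    // source and decode the ColumnChunkSize struct from that buffer.
  }
  return 0;
}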
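
On the reader side, both split paths now reduce to the same shape of problem: given cumulative {row_count, size_bytes} entries (aggregated so each row count accounts for every column's bytes up to that row), greedily cut {skip_rows, num_rows} ranges that stay under the byte limit, falling back to an oversized range when nothing fits. The host-only sketch below illustrates that greedy cut under those assumptions; cumulative_info, split, and find_splits_host are names invented for the example, and the real implementation runs the sort/scan/search steps on the device with thrust.

// Illustrative host-only sketch of the greedy split selection used by the chunked
// reader paths above. Types and names are invented for the example.
#include <cstddef>
#include <cstdio>
#include <vector>

struct cumulative_info {
  std::size_t row_count;   // rows covered so far
  std::size_t size_bytes;  // total output bytes for all columns up to row_count
};

struct split {
  std::size_t skip_rows;
  std::size_t num_rows;
};

// Assumes `info` is sorted by row_count and size_bytes already includes every column.
std::vector<split> find_splits_host(std::vector<cumulative_info> const& info,
                                    std::size_t total_rows,
                                    std::size_t byte_limit)
{
  std::vector<split> splits;
  std::size_t start_row = 0, consumed_bytes = 0, pos = 0;
  while (start_row < total_rows and pos < info.size()) {
    // advance to the furthest entry whose incremental size still fits in the limit
    std::size_t cut = pos;
    while (cut + 1 < info.size() and info[cut + 1].size_bytes - consumed_bytes <= byte_limit) {
      ++cut;
    }
    // best effort: if even the first candidate overflows the limit, take it anyway
    splits.push_back({start_row, info[cut].row_count - start_row});
    start_row      = info[cut].row_count;
    consumed_bytes = info[cut].size_bytes;
    pos            = cut + 1;
  }
  return splits;
}

int main()
{
  // Two "pages" of 100 rows each (6 MB and 5 MB incremental), split at an 8 MB limit.
  std::vector<cumulative_info> info{{100, 6'000'000}, {200, 11'000'000}};
  for (auto const& s : find_splits_host(info, 200, 8'000'000)) {
    std::printf("skip_rows=%zu num_rows=%zu\n", s.skip_rows, s.num_rows);
  }
  return 0;
}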