Use page statistics in Parquet reader #14973

Merged Mar 7, 2024 (29 commits)

Changes from all commits:
a76d7a1 use page statistics (etseidl, Feb 5, 2024)
679c67c Merge remote-tracking branch 'origin/branch-24.04' into page_stats (etseidl, Feb 5, 2024)
b2952d5 remove fixme (etseidl, Feb 5, 2024)
94aa6d6 add read test (etseidl, Feb 6, 2024)
66ed34d tweak metadata reading (etseidl, Feb 6, 2024)
0616ff4 Merge branch 'branch-24.04' into page_stats (etseidl, Feb 6, 2024)
c8f07e5 Merge branch 'rapidsai:branch-24.04' into page_stats (etseidl, Feb 14, 2024)
6bd23d2 Merge branch 'branch-24.04' into page_stats (etseidl, Feb 17, 2024)
09c4a06 Merge branch 'branch-24.04' into page_stats (etseidl, Feb 20, 2024)
ef83bf3 Merge branch 'rapidsai:branch-24.04' into page_stats (etseidl, Feb 21, 2024)
642103b Merge branch 'rapidsai:branch-24.04' into page_stats (etseidl, Feb 22, 2024)
2688627 Merge branch 'rapidsai:branch-24.04' into page_stats (etseidl, Feb 24, 2024)
a840850 Merge branch 'branch-24.04' into page_stats (etseidl, Feb 26, 2024)
c41b817 reduce naming confusion (etseidl, Feb 27, 2024)
ce4d58e forgot some (etseidl, Feb 27, 2024)
c23783b Merge branch 'branch-24.04' into page_stats (vuule, Feb 27, 2024)
96175d4 clean up comment (etseidl, Feb 27, 2024)
d24a6f8 Merge branch 'page_stats' of github.com:etseidl/cudf into page_stats (etseidl, Feb 27, 2024)
3de4261 Merge branch 'rapidsai:branch-24.04' into page_stats (etseidl, Feb 27, 2024)
cc9929c Merge branch 'branch-24.04' into page_stats (etseidl, Feb 28, 2024)
76fc5a7 Merge branch 'branch-24.04' into page_stats (etseidl, Feb 29, 2024)
b71fc63 Merge remote-tracking branch 'origin/branch-24.04' into page_stats (etseidl, Mar 1, 2024)
cd65580 need to push chunks to device when using page index (etseidl, Mar 1, 2024)
5fc0186 Merge branch 'branch-24.04' into page_stats (etseidl, Mar 1, 2024)
b4d863a Merge branch 'branch-24.04' into page_stats (etseidl, Mar 4, 2024)
2ee6ee2 Merge branch 'branch-24.04' into page_stats (etseidl, Mar 5, 2024)
7c69d4f Merge branch 'branch-24.04' into page_stats (etseidl, Mar 6, 2024)
4570e2f implement suggestion from review (etseidl, Mar 6, 2024)
ca078e1 Merge branch 'branch-24.04' into page_stats (PointKernel, Mar 6, 2024)
9 changes: 5 additions & 4 deletions cpp/src/io/parquet/decode_preprocess.cu
@@ -375,9 +375,10 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   if (!t) {
     s->page.skipped_values = -1;
     s->page.skipped_leaf_values = 0;
-    s->page.str_bytes = 0;
-    s->input_row_count = 0;
-    s->input_value_count = 0;
+    // str_bytes_from_index will be 0 if no page stats are present
+    s->page.str_bytes = s->page.str_bytes_from_index;
+    s->input_row_count = 0;
+    s->input_value_count = 0;
 
     // in the base pass, we're computing the number of rows, make sure we visit absolutely
     // everything
@@ -462,7 +463,7 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size)
   }
 
   // retrieve total string size.
-  if (compute_string_sizes) {
+  if (compute_string_sizes && !pp->has_page_index) {
     auto const str_bytes = gpuDecodeTotalPageStringSize(s, t);
     if (t == 0) { s->page.str_bytes = str_bytes; }
   }
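The two hunks above work together: seeding str_bytes from str_bytes_from_index lets the base preprocess pass skip gpuDecodeTotalPageStringSize whenever the page index already supplied the size. A minimal host-side sketch of the seeding idea; PageSketch, seed_string_sizes, and index_sizes are hypothetical stand-ins, not cudf's real PageInfo or API:

#include <cstddef>
#include <cstdint>
#include <optional>
#include <vector>

// Simplified stand-in; only the str_bytes / str_bytes_from_index /
// has_page_index relationship mirrors the fields touched in this PR.
struct PageSketch {
  int32_t str_bytes            = 0;
  int32_t str_bytes_from_index = 0;
  bool has_page_index          = false;
};

// Seed each page's string size from index-derived stats when available.
// Pages without stats keep 0 and fall through to the kernel computation,
// matching the "str_bytes_from_index will be 0" comment in the diff.
void seed_string_sizes(std::vector<PageSketch>& pages,
                       std::vector<std::optional<int32_t>> const& index_sizes)
{
  for (std::size_t i = 0; i < pages.size(); ++i) {
    if (index_sizes[i].has_value()) {
      pages[i].str_bytes_from_index = *index_sizes[i];
      pages[i].has_page_index       = true;
    }
    pages[i].str_bytes = pages[i].str_bytes_from_index;  // 0 when no stats present
  }
}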
21 changes: 13 additions & 8 deletions cpp/src/io/parquet/page_hdr.cu
@@ -385,14 +385,19 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
       // this computation is only valid for flat schemas. for nested schemas,
       // they will be recomputed in the preprocess step by examining repetition and
       // definition levels
-      bs->page.chunk_row = 0;
-      bs->page.num_rows = 0;
-      bs->page.skipped_values = -1;
-      bs->page.skipped_leaf_values = 0;
-      bs->page.str_bytes = 0;
-      bs->page.temp_string_size = 0;
-      bs->page.temp_string_buf = nullptr;
-      bs->page.kernel_mask = decode_kernel_mask::NONE;
+      bs->page.chunk_row = 0;
+      bs->page.num_rows = 0;
+      bs->page.skipped_values = -1;
+      bs->page.skipped_leaf_values = 0;
+      bs->page.str_bytes = 0;
+      bs->page.str_bytes_from_index = 0;
+      bs->page.num_valids = 0;
+      bs->page.start_val = 0;
+      bs->page.end_val = 0;
+      bs->page.has_page_index = false;
+      bs->page.temp_string_size = 0;
+      bs->page.temp_string_buf = nullptr;
+      bs->page.kernel_mask = decode_kernel_mask::NONE;
     }
     num_values = bs->ck.num_values;
     page_info = chunk_pages ? chunk_pages[chunk].pages : nullptr;
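The expanded reset list above matters because PageInfo storage is revisited across decode passes and chunked-read iterations, so every field the page-index path later fills must start from a known state. A tiny sketch of that invariant with a simplified stand-in struct (not the real PageInfo):

#include <cstdint>

// Stand-in for the PageInfo fields added in this PR.
struct PageStateSketch {
  int32_t str_bytes_from_index;
  int32_t num_valids;
  int32_t start_val;
  int32_t end_val;
  bool has_page_index;
};

// Mirror of the header-decode reset: clear everything so a page left over
// from a previous iteration can never masquerade as having index-derived sizes.
inline void reset_page_state(PageStateSketch& p)
{
  p.str_bytes_from_index = 0;
  p.num_valids           = 0;
  p.start_val            = 0;
  p.end_val              = 0;
  p.has_page_index       = false;
}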
34 changes: 31 additions & 3 deletions cpp/src/io/parquet/page_string_decode.cu
@@ -599,10 +599,12 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo
   PageInfo* const pp = &pages[page_idx];
 
   if (t == 0) {
-    s->page.num_nulls = 0;
-    s->page.num_valids = 0;
+    // don't clobber these if they're already computed from the index
+    if (!pp->has_page_index) {
+      s->page.num_nulls = 0;
+      s->page.num_valids = 0;
+    }
     // reset str_bytes to 0 in case it's already been calculated (esp needed for chunked reads).
-    // TODO: need to rethink this once str_bytes is in the statistics
     pp->str_bytes = 0;
   }
@@ -632,6 +634,9 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputeStringPageBo
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) { return; }
+
   // find start/end value indices
   auto const [start_value, end_value] =
     page_bounds(s, min_row, num_rows, is_bounds_pg, has_repetition, decoders);
@@ -698,6 +703,15 @@ CUDF_KERNEL void __launch_bounds__(delta_preproc_block_size) gpuComputeDeltaPage
       }
     }
   } else {
+    bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
+
+    // if we have size info, then we only need to do this for bounds pages
+    if (pp->has_page_index && !is_bounds_pg) {
+      // check if we need to store values from the index
+      if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+      return;
+    }
+
     // now process string info in the range [start_value, end_value)
     // set up for decoding strings...can be either plain or dictionary
     uint8_t const* data = s->data_start;
@@ -759,6 +773,13 @@ CUDF_KERNEL void __launch_bounds__(delta_length_block_size) gpuComputeDeltaLengt
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   // for DELTA_LENGTH_BYTE_ARRAY, string size is page_data_size - size_of_delta_binary_block.
   // so all we need to do is skip the encoded string size info and then do pointer arithmetic,
   // if this isn't a bounds page.
@@ -850,6 +871,13 @@ CUDF_KERNEL void __launch_bounds__(preprocess_block_size) gpuComputePageStringSi
 
   bool const is_bounds_pg = is_bounds_page(s, min_row, num_rows, has_repetition);
 
+  // if we have size info, then we only need to do this for bounds pages
+  if (pp->has_page_index && !is_bounds_pg) {
+    // check if we need to store values from the index
+    if (is_page_contained(s, min_row, num_rows)) { pp->str_bytes = pp->str_bytes_from_index; }
+    return;
+  }
+
   auto const& col = s->col;
   size_t str_bytes = 0;
   // short circuit for FIXED_LEN_BYTE_ARRAY
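Each of the four kernels above gained the same early exit: with page-index stats, only bounds pages — those straddling an edge of the requested row window — still need their string sizes computed, while pages fully contained in the window take str_bytes directly from the index, and pages outside the window return without storing anything since none of their values decode. A hedged sketch of the window tests; the real is_bounds_page/is_page_contained operate on decoder state, so these simplified signatures are assumptions for illustration:

#include <cstddef>

bool page_overlaps(std::size_t page_begin, std::size_t page_end,
                   std::size_t win_begin, std::size_t win_end)
{
  return page_begin < win_end && page_end > win_begin;
}

bool page_contained(std::size_t page_begin, std::size_t page_end,
                    std::size_t win_begin, std::size_t win_end)
{
  // fully inside the window: every value decodes, so the index size is exact
  return page_begin >= win_begin && page_end <= win_end;
}

// a bounds page straddles a window edge; only part of it decodes, so an
// index-derived size would overcount and must be recomputed by the kernel
bool page_is_bounds(std::size_t page_begin, std::size_t page_end,
                    std::size_t win_begin, std::size_t win_end)
{
  return page_overlaps(page_begin, page_end, win_begin, win_end) &&
         !page_contained(page_begin, page_end, win_begin, win_end);
}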
15 changes: 14 additions & 1 deletion cpp/src/io/parquet/parquet_gpu.hpp
@@ -316,7 +316,8 @@ struct PageInfo {
   // for string columns only, the size of all the chars in the string for
   // this page. only valid/computed during the base preprocess pass
   int32_t str_bytes;
-  int32_t str_offset;  // offset into string data for this page
+  int32_t str_offset;   // offset into string data for this page
+  bool has_page_index;  // true if str_bytes, num_valids, etc are derivable from page indexes
 
   // nesting information (input/output) for each page. this array contains
   // input column nesting information, output column nesting information and
@@ -335,8 +336,15 @@ struct PageInfo {
   uint8_t* temp_string_buf;
 
   decode_kernel_mask kernel_mask;
+
+  // str_bytes from page index. because str_bytes needs to be reset each iteration
+  // while doing chunked reads, persist the value from the page index here.
+  int32_t str_bytes_from_index;
 };
 
+// forward declaration
+struct column_chunk_info;
+
 /**
  * @brief Return the column schema id as the key for a PageInfo struct.
  */
@@ -376,6 +384,7 @@ struct ColumnChunkDesc {
                   int32_t ts_clock_rate_,
                   int32_t src_col_index_,
                   int32_t src_col_schema_,
+                  column_chunk_info const* chunk_info_,
                   float list_bytes_per_row_est_)
     : compressed_data(compressed_data_),
       compressed_size(compressed_size_),
@@ -400,6 +409,7 @@ struct ColumnChunkDesc {
       ts_clock_rate(ts_clock_rate_),
       src_col_index(src_col_index_),
       src_col_schema(src_col_schema_),
+      h_chunk_info(chunk_info_),
       list_bytes_per_row_est(list_bytes_per_row_est_)
   {
   }
@@ -430,6 +440,9 @@ struct ColumnChunkDesc {
   int32_t src_col_index{};   // my input column index
   int32_t src_col_schema{};  // my schema index in the file
 
+  // pointer to column_chunk_info struct for this chunk (host only)
+  column_chunk_info const* h_chunk_info{};
+
   float list_bytes_per_row_est{};  // for LIST columns, an estimate on number of bytes per row
 };
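A note on the h_chunk_info design: ColumnChunkDesc is copied to the device, so the new member is kept as a raw pointer that is only meaningful on the host (hence the h_ prefix), and column_chunk_info needs only a forward declaration in this header. A minimal sketch of that pattern with stand-in types, not the real cudf definitions:

#include <cstdint>

struct column_chunk_info;  // forward declaration is enough for a pointer member

// Stand-in for the descriptor: device code receives a bitwise copy, so the
// pointer travels along but must only be dereferenced on the host.
struct ChunkDescSketch {
  int32_t src_col_schema{};
  column_chunk_info const* h_chunk_info{};  // host-only; nullptr when no page index
};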
29 changes: 19 additions & 10 deletions cpp/src/io/parquet/reader_impl.cpp
@@ -28,7 +28,7 @@
 
 namespace cudf::io::parquet::detail {
 
-void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
+void reader::impl::decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows)
 {
   auto& pass = *_pass_itm_data;
   auto& subpass = *pass.subpass;
@@ -62,14 +62,23 @@ void reader::impl::decode_page_data(size_t skip_rows, size_t num_rows)
   auto const has_strings = (kernel_mask & STRINGS_MASK) != 0;
   std::vector<size_t> col_string_sizes(_input_columns.size(), 0L);
   if (has_strings) {
-    ComputePageStringSizes(subpass.pages,
-                           pass.chunks,
-                           delta_temp_buf,
-                           skip_rows,
-                           num_rows,
-                           level_type_size,
-                           kernel_mask,
-                           _stream);
+    // need to compute pages bounds/sizes if we lack page indexes or are using custom bounds
+    // TODO: we could probably dummy up size stats for FLBA data since we know the width
+    auto const has_flba =
+      std::any_of(pass.chunks.begin(), pass.chunks.end(), [](auto const& chunk) {
+        return (chunk.data_type & 7) == FIXED_LEN_BYTE_ARRAY && chunk.converted_type != DECIMAL;
+      });
+
+    if (!_has_page_index || uses_custom_row_bounds || has_flba) {
+      ComputePageStringSizes(subpass.pages,
+                             pass.chunks,
+                             delta_temp_buf,
+                             skip_rows,
+                             num_rows,
+                             level_type_size,
+                             kernel_mask,
+                             _stream);
+    }
 
     col_string_sizes = calculate_page_string_offsets();
 
@@ -426,7 +435,7 @@ table_with_metadata reader::impl::read_chunk_internal(
   allocate_columns(read_info.skip_rows, read_info.num_rows, uses_custom_row_bounds);
 
   // Parse data into the output buffers.
-  decode_page_data(read_info.skip_rows, read_info.num_rows);
+  decode_page_data(uses_custom_row_bounds, read_info.skip_rows, read_info.num_rows);
 
   // Create the final output cudf columns.
   for (size_t i = 0; i < _output_buffers.size(); ++i) {
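The new gate means the string-size kernels run only when sizes cannot be taken from the index: no usable page index, user-supplied row bounds (pages may be partially decoded), or FLBA columns, whose size stats aren't usable yet per the TODO. A standalone sketch of the predicate; the enum values follow Parquet's Thrift definitions (FIXED_LEN_BYTE_ARRAY = 7, DECIMAL = 5), while the struct and function names are illustrative stand-ins:

#include <algorithm>
#include <cstdint>
#include <vector>

// Physical type lives in the low 3 bits of data_type, hence the `& 7` below.
enum PhysicalType : int32_t { FIXED_LEN_BYTE_ARRAY = 7 };
enum ConvertedType : int32_t { NONE = 0, DECIMAL = 5 };

struct ChunkSketch {
  int32_t data_type;       // physical type packed into bits 0-2
  int32_t converted_type;  // converted/logical type
};

bool must_compute_string_sizes(std::vector<ChunkSketch> const& chunks,
                               bool has_page_index,
                               bool uses_custom_row_bounds)
{
  // FLBA string columns (DECIMAL excluded, since it decodes as fixed-point
  // rather than strings) still force the kernel pass
  bool const has_flba = std::any_of(chunks.begin(), chunks.end(), [](auto const& c) {
    return (c.data_type & 7) == FIXED_LEN_BYTE_ARRAY && c.converted_type != DECIMAL;
  });
  return !has_page_index || uses_custom_row_bounds || has_flba;
}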
8 changes: 7 additions & 1 deletion cpp/src/io/parquet/reader_impl.hpp
@@ -311,10 +311,12 @@ class reader::impl {
   /**
    * @brief Converts the page data and outputs to columns.
    *
+   * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represents user-specific
+   * bounds
    * @param skip_rows Minimum number of rows from start
    * @param num_rows Number of rows to output
    */
-  void decode_page_data(size_t skip_rows, size_t num_rows);
+  void decode_page_data(bool uses_custom_row_bounds, size_t skip_rows, size_t num_rows);
 
   /**
    * @brief Creates file-wide parquet chunk information.
@@ -365,6 +367,10 @@ class reader::impl {
   std::unique_ptr<table_metadata> _output_metadata;
 
   bool _strings_to_categorical = false;
+
+  // are there usable page indexes available
+  bool _has_page_index = false;
+
   std::optional<std::vector<reader_column_schema>> _reader_column_schema;
   data_type _timestamp_type{type_id::EMPTY};
 
27 changes: 27 additions & 0 deletions cpp/src/io/parquet/reader_impl_chunking.cu
@@ -1476,6 +1476,28 @@ void reader::impl::create_global_chunk_info()
   auto const num_input_columns = _input_columns.size();
   auto const num_chunks = row_groups_info.size() * num_input_columns;
 
+  // Mapping of input column to page index column
+  std::vector<size_type> column_mapping;
+
+  if (_has_page_index and not row_groups_info.empty()) {
+    // use first row group to define mappings (assumes same schema for each file)
+    auto const& rg = row_groups_info[0];
+    auto const& columns = _metadata->get_row_group(rg.index, rg.source_index).columns;
+    column_mapping.resize(num_input_columns);
+    std::transform(
+      _input_columns.begin(), _input_columns.end(), column_mapping.begin(), [&](auto const& col) {
+        // translate schema_idx into something we can use for the page indexes
+        if (auto it = std::find_if(
+              columns.begin(),
+              columns.end(),
+              [&col](auto const& col_chunk) { return col_chunk.schema_idx == col.schema_idx; });
+            it != columns.end()) {
+          return std::distance(columns.begin(), it);
+        }
+        CUDF_FAIL("cannot find column mapping");
+      });
+  }
+
   // Initialize column chunk information
   auto remaining_rows = num_rows;
   for (auto const& rg : row_groups_info) {
@@ -1505,6 +1527,10 @@ void reader::impl::create_global_chunk_info()
                                static_cast<float>(row_group.num_rows)
                              : 0.0f;
 
+      // grab the column_chunk_info for each chunk (if it exists)
+      column_chunk_info const* const chunk_info =
+        _has_page_index ? &rg.column_chunks.value()[column_mapping[i]] : nullptr;
+
       chunks.push_back(ColumnChunkDesc(col_meta.total_compressed_size,
                                        nullptr,
                                        col_meta.num_values,
@@ -1524,6 +1550,7 @@
                                        clock_rate,
                                        i,
                                        col.schema_idx,
+                                       chunk_info,
                                        list_bytes_per_row_est));
 
     }
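The mapping step above is easiest to see in isolation: for each input column, find the position of the row-group column chunk with the same schema index, so page-index entries (stored in file order) can be looked up per input column. A self-contained sketch with simplified stand-in types (InputCol, ChunkMeta, and map_columns are hypothetical; std::runtime_error stands in for CUDF_FAIL):

#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <vector>

struct InputCol { int32_t schema_idx; };   // selected column from the reader
struct ChunkMeta { int32_t schema_idx; };  // column chunk within a row group

// For each input column, return the index of the matching column chunk,
// using the first row group to define the mapping (assumes a uniform schema).
std::vector<int64_t> map_columns(std::vector<InputCol> const& input_columns,
                                 std::vector<ChunkMeta> const& columns)
{
  std::vector<int64_t> mapping(input_columns.size());
  std::transform(
    input_columns.begin(), input_columns.end(), mapping.begin(), [&](auto const& col) {
      auto const it = std::find_if(columns.begin(), columns.end(), [&](auto const& chunk) {
        return chunk.schema_idx == col.schema_idx;
      });
      if (it == columns.end()) { throw std::runtime_error("cannot find column mapping"); }
      return std::distance(columns.begin(), it);
    });
  return mapping;
}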