From cb10a741329a009bbe898c9d1a08707468d65973 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Tue, 6 Sep 2022 14:25:25 -0500 Subject: [PATCH 1/2] Revert "Remove support for skip_rows / num_rows options in the parquet reader. (#11503)" This reverts commit d39b957fea7a2d4b501a88ca0ee5c879c2f22067. --- cpp/benchmarks/io/parquet/parquet_reader.cpp | 209 ++++++++++++ cpp/include/cudf/io/parquet.hpp | 74 +++++ cpp/src/io/parquet/page_data.cu | 332 ++++++++++++++----- cpp/src/io/parquet/parquet_gpu.hpp | 20 ++ cpp/src/io/parquet/reader_impl.cu | 69 ++-- cpp/src/io/parquet/reader_impl.hpp | 20 +- cpp/tests/io/parquet_test.cpp | 207 ++++++++++++ 7 files changed, 825 insertions(+), 106 deletions(-) create mode 100644 cpp/benchmarks/io/parquet/parquet_reader.cpp diff --git a/cpp/benchmarks/io/parquet/parquet_reader.cpp b/cpp/benchmarks/io/parquet/parquet_reader.cpp new file mode 100644 index 00000000000..c66f0af2b76 --- /dev/null +++ b/cpp/benchmarks/io/parquet/parquet_reader.cpp @@ -0,0 +1,209 @@ +/* + * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include + +#include + +// to enable, run cmake with -DBUILD_BENCHMARKS=ON + +constexpr size_t data_size = 512 << 20; +constexpr cudf::size_type num_cols = 64; + +namespace cudf_io = cudf::io; + +class ParquetRead : public cudf::benchmark { +}; + +void BM_parq_read_varying_input(benchmark::State& state) +{ + auto const data_types = get_type_or_group(state.range(0)); + cudf::size_type const cardinality = state.range(1); + cudf::size_type const run_length = state.range(2); + cudf_io::compression_type const compression = + state.range(3) ? cudf_io::compression_type::SNAPPY : cudf_io::compression_type::NONE; + auto const source_type = static_cast(state.range(4)); + + data_profile table_data_profile; + table_data_profile.set_cardinality(cardinality); + table_data_profile.set_avg_run_length(run_length); + auto const tbl = create_random_table( + cycle_dtypes(data_types, num_cols), table_size_bytes{data_size}, table_data_profile); + auto const view = tbl->view(); + + cuio_source_sink_pair source_sink(source_type); + cudf_io::parquet_writer_options write_opts = + cudf_io::parquet_writer_options::builder(source_sink.make_sink_info(), view) + .compression(compression); + cudf_io::write_parquet(write_opts); + + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(source_sink.make_source_info()); + + auto mem_stats_logger = cudf::memory_stats_logger(); + for (auto _ : state) { + try_drop_l3_cache(); + cuda_event_timer const raii(state, true); // flush_l2_cache = true, stream = 0 + cudf_io::read_parquet(read_opts); + } + + state.SetBytesProcessed(data_size * state.iterations()); + state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); + state.counters["encoded_file_size"] = source_sink.size(); +} + +std::vector get_col_names(cudf::io::source_info const& source) +{ + cudf_io::parquet_reader_options const read_options = + cudf_io::parquet_reader_options::builder(source).num_rows(1); + return cudf_io::read_parquet(read_options).metadata.column_names; +} + +void BM_parq_read_varying_options(benchmark::State& state) +{ + auto state_idx = 0; + auto const col_sel = static_cast(state.range(state_idx++)); + auto const row_sel = static_cast(state.range(state_idx++)); + auto const num_chunks = state.range(state_idx++); + + auto const flags = state.range(state_idx++); + auto const str_to_categories = (flags & 1) != 0; + auto const use_pandas_metadata = (flags & 2) != 0; + auto const ts_type = cudf::data_type{static_cast(state.range(state_idx++))}; + + // No nested types here, because of https://github.com/rapidsai/cudf/issues/9970 + auto const data_types = dtypes_for_column_selection( + get_type_or_group({static_cast(type_group_id::INTEGRAL), + static_cast(type_group_id::FLOATING_POINT), + static_cast(type_group_id::FIXED_POINT), + static_cast(type_group_id::TIMESTAMP), + static_cast(type_group_id::DURATION), + static_cast(cudf::type_id::STRING)}), + col_sel); + auto const tbl = create_random_table(data_types, table_size_bytes{data_size}); + auto const view = tbl->view(); + + cuio_source_sink_pair source_sink(io_type::HOST_BUFFER); + cudf_io::parquet_writer_options options = + cudf_io::parquet_writer_options::builder(source_sink.make_sink_info(), view); + cudf_io::write_parquet(options); + + auto const cols_to_read = + select_column_names(get_col_names(source_sink.make_source_info()), col_sel); + cudf_io::parquet_reader_options read_options = + cudf_io::parquet_reader_options::builder(source_sink.make_source_info()) + .columns(cols_to_read) + .convert_strings_to_categories(str_to_categories) + .use_pandas_metadata(use_pandas_metadata) + .timestamp_type(ts_type); + + auto const num_row_groups = data_size / (128 << 20); + cudf::size_type const chunk_row_cnt = view.num_rows() / num_chunks; + auto mem_stats_logger = cudf::memory_stats_logger(); + for (auto _ : state) { + try_drop_l3_cache(); + cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 + + cudf::size_type rows_read = 0; + for (int32_t chunk = 0; chunk < num_chunks; ++chunk) { + auto const is_last_chunk = chunk == (num_chunks - 1); + switch (row_sel) { + case row_selection::ALL: break; + case row_selection::ROW_GROUPS: { + auto row_groups_to_read = segments_in_chunk(num_row_groups, num_chunks, chunk); + if (is_last_chunk) { + // Need to assume that an additional "overflow" row group is present + row_groups_to_read.push_back(num_row_groups); + } + read_options.set_row_groups({row_groups_to_read}); + } break; + case row_selection::NROWS: + read_options.set_skip_rows(chunk * chunk_row_cnt); + read_options.set_num_rows(chunk_row_cnt); + if (is_last_chunk) read_options.set_num_rows(-1); + break; + default: CUDF_FAIL("Unsupported row selection method"); + } + + rows_read += cudf_io::read_parquet(read_options).tbl->num_rows(); + } + + CUDF_EXPECTS(rows_read == view.num_rows(), "Benchmark did not read the entire table"); + } + + auto const data_processed = data_size * cols_to_read.size() / view.num_columns(); + state.SetBytesProcessed(data_processed * state.iterations()); + state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); + state.counters["encoded_file_size"] = source_sink.size(); +} + +#define PARQ_RD_BM_INPUTS_DEFINE(name, type_or_group, src_type) \ + BENCHMARK_DEFINE_F(ParquetRead, name) \ + (::benchmark::State & state) { BM_parq_read_varying_input(state); } \ + BENCHMARK_REGISTER_F(ParquetRead, name) \ + ->ArgsProduct({{int32_t(type_or_group)}, {0, 1000}, {1, 32}, {true, false}, {src_type}}) \ + ->Unit(benchmark::kMillisecond) \ + ->UseManualTime(); + +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, integral, type_group_id::INTEGRAL); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, floats, type_group_id::FLOATING_POINT); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, decimal, type_group_id::FIXED_POINT); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, timestamps, type_group_id::TIMESTAMP); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, durations, type_group_id::DURATION); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, string, cudf::type_id::STRING); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, list, cudf::type_id::LIST); +RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, struct, cudf::type_id::STRUCT); + +BENCHMARK_DEFINE_F(ParquetRead, column_selection) +(::benchmark::State& state) { BM_parq_read_varying_options(state); } +BENCHMARK_REGISTER_F(ParquetRead, column_selection) + ->ArgsProduct({{int32_t(column_selection::ALL), + int32_t(column_selection::ALTERNATE), + int32_t(column_selection::FIRST_HALF), + int32_t(column_selection::SECOND_HALF)}, + {int32_t(row_selection::ALL)}, + {1}, + {0b01}, // defaults + {int32_t(cudf::type_id::EMPTY)}}) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +// row_selection::ROW_GROUPS disabled until we add an API to read metadata from a parquet file and +// determine num row groups. https://github.com/rapidsai/cudf/pull/9963#issuecomment-1004832863 +BENCHMARK_DEFINE_F(ParquetRead, row_selection) +(::benchmark::State& state) { BM_parq_read_varying_options(state); } +BENCHMARK_REGISTER_F(ParquetRead, row_selection) + ->ArgsProduct({{int32_t(column_selection::ALL)}, + {int32_t(row_selection::NROWS)}, + {1, 4}, + {0b01}, // defaults + {int32_t(cudf::type_id::EMPTY)}}) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); + +BENCHMARK_DEFINE_F(ParquetRead, misc_options) +(::benchmark::State& state) { BM_parq_read_varying_options(state); } +BENCHMARK_REGISTER_F(ParquetRead, misc_options) + ->ArgsProduct({{int32_t(column_selection::ALL)}, + {int32_t(row_selection::NROWS)}, + {1}, + {0b01, 0b00, 0b11, 0b010}, + {int32_t(cudf::type_id::EMPTY), int32_t(cudf::type_id::TIMESTAMP_NANOSECONDS)}}) + ->Unit(benchmark::kMillisecond) + ->UseManualTime(); diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index 5d95bf9812f..ff5b9f5c457 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -57,6 +57,10 @@ class parquet_reader_options { // List of individual row groups to read (ignored if empty) std::vector> _row_groups; + // Number of rows to skip from the start + size_type _skip_rows = 0; + // Number of rows to read; -1 is all + size_type _num_rows = -1; // Whether to store string data as categorical type bool _convert_strings_to_categories = false; @@ -127,6 +131,20 @@ class parquet_reader_options { return _reader_column_schema; } + /** + * @brief Returns number of rows to skip from the start. + * + * @return Number of rows to skip from the start + */ + [[nodiscard]] size_type get_skip_rows() const { return _skip_rows; } + + /** + * @brief Returns number of rows to read. + * + * @return Number of rows to read + */ + [[nodiscard]] size_type get_num_rows() const { return _num_rows; } + /** * @brief Returns names of column to be read, if set. * @@ -162,6 +180,10 @@ class parquet_reader_options { */ void set_row_groups(std::vector> row_groups) { + if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) { + CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows"); + } + _row_groups = std::move(row_groups); } @@ -190,6 +212,34 @@ class parquet_reader_options { _reader_column_schema = std::move(val); } + /** + * @brief Sets number of rows to skip. + * + * @param val Number of rows to skip from start + */ + void set_skip_rows(size_type val) + { + if ((val != 0) and (!_row_groups.empty())) { + CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups"); + } + + _skip_rows = val; + } + + /** + * @brief Sets number of rows to read. + * + * @param val Number of rows to read after skip + */ + void set_num_rows(size_type val) + { + if ((val != -1) and (!_row_groups.empty())) { + CUDF_FAIL("num_rows can't be set along with a non-empty row_groups"); + } + + _num_rows = val; + } + /** * @brief Sets timestamp_type used to cast timestamp columns. * @@ -279,6 +329,30 @@ class parquet_reader_options_builder { return *this; } + /** + * @brief Sets number of rows to skip. + * + * @param val Number of rows to skip from start + * @return this for chaining + */ + parquet_reader_options_builder& skip_rows(size_type val) + { + options.set_skip_rows(val); + return *this; + } + + /** + * @brief Sets number of rows to read. + * + * @param val Number of rows to read after skip + * @return this for chaining + */ + parquet_reader_options_builder& num_rows(size_type val) + { + options.set_num_rows(val); + return *this; + } + /** * @brief timestamp_type used to cast timestamp columns. * diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu index 424882f45bf..531733a7df7 100644 --- a/cpp/src/io/parquet/page_data.cu +++ b/cpp/src/io/parquet/page_data.cu @@ -56,13 +56,15 @@ struct page_state_s { const uint8_t* data_start; const uint8_t* data_end; const uint8_t* lvl_end; - const uint8_t* dict_base; // ptr to dictionary page data - int32_t dict_size; // size of dictionary data - int32_t num_rows; // Rows in page to decode - int32_t num_input_values; // total # of input/level values in the page - int32_t dtype_len; // Output data type length - int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit - int32_t dict_bits; // # of bits to store dictionary indices + const uint8_t* dict_base; // ptr to dictionary page data + int32_t dict_size; // size of dictionary data + int32_t first_row; // First row in page to output + int32_t num_rows; // Rows in page to decode (including rows to be skipped) + int32_t first_output_value; // First value in page to output + int32_t num_input_values; // total # of input/level values in the page + int32_t dtype_len; // Output data type length + int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit + int32_t dict_bits; // # of bits to store dictionary indices uint32_t dict_run; int32_t dict_val; uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep] @@ -88,6 +90,7 @@ struct page_state_s { uint32_t def[non_zero_buffer_size]; // circular buffer of definition level values const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep] int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded + int32_t row_index_lower_bound; // lower bound of row indices we should process }; /** @@ -811,14 +814,17 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s, * @param[in, out] s The local page state to be filled in * @param[in] p The global page to be copied from * @param[in] chunks The global list of chunks - * @param[in] num_rows Maximum number of rows to process + * @param[in] num_rows Maximum number of rows to read + * @param[in] min_row Crop all rows below min_row */ static __device__ bool setupLocalPageInfo(page_state_s* const s, PageInfo const* p, device_span chunks, + size_t min_row, size_t num_rows) { - int const t = threadIdx.x; + int t = threadIdx.x; + int chunk_idx; // Fetch page info if (t == 0) s->page = *p; @@ -826,7 +832,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; } // Fetch column chunk info - int const chunk_idx = s->page.chunk_idx; + chunk_idx = s->page.chunk_idx; if (t == 0) { s->col = chunks[chunk_idx]; } // zero nested value and valid counts @@ -847,18 +853,19 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // our starting row (absolute index) is // col.start_row == absolute row index // page.chunk-row == relative row index within the chunk - size_t const page_start_row = s->col.start_row + s->page.chunk_row; + size_t page_start_row = s->col.start_row + s->page.chunk_row; // IMPORTANT : nested schemas can have 0 rows in a page but still have // values. The case is: // - On page N-1, the last row starts, with 2/6 values encoded // - On page N, the remaining 4/6 values are encoded, but there are no new rows. + // if (s->page.num_input_values > 0 && s->page.num_rows > 0) { if (s->page.num_input_values > 0) { - uint8_t const* cur = s->page.page_data; - uint8_t const* const end = cur + s->page.uncompressed_page_size; + uint8_t* cur = s->page.page_data; + uint8_t* end = cur + s->page.uncompressed_page_size; - uint32_t const dtype_len_out = s->col.data_type >> 3; - s->ts_scale = 0; + uint32_t dtype_len_out = s->col.data_type >> 3; + s->ts_scale = 0; // Validate data type auto const data_type = s->col.data_type & 7; switch (data_type) { @@ -907,10 +914,17 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dtype_len = 8; // Convert to 64-bit timestamp } + // first row within the page to output + if (page_start_row >= min_row) { + s->first_row = 0; + } else { + s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows); + } // # of rows within the page to output s->num_rows = s->page.num_rows; - if (page_start_row + s->num_rows > num_rows) { - s->num_rows = (int32_t)max((int64_t)(num_rows - page_start_row), INT64_C(0)); + if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) { + s->num_rows = + (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0)); } // during the decoding step we need to offset the global output buffers @@ -919,18 +933,25 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, // - for flat schemas, we can do this directly by using row counts // - for nested schemas, these offsets are computed during the preprocess step if (s->col.column_data_base != nullptr) { - int const max_depth = s->col.max_nesting_depth; + int max_depth = s->col.max_nesting_depth; for (int idx = 0; idx < max_depth; idx++) { PageNestingInfo* pni = &s->page.nesting[idx]; - size_t const output_offset = - s->col.max_level[level_type::REPETITION] == 0 ? page_start_row : pni->page_start_value; + size_t output_offset; + // schemas without lists + if (s->col.max_level[level_type::REPETITION] == 0) { + output_offset = page_start_row >= min_row ? page_start_row - min_row : 0; + } + // for schemas with lists, we've already got the exactly value precomputed + else { + output_offset = pni->page_start_value; + } pni->data_out = static_cast(s->col.column_data_base[idx]); if (pni->data_out != nullptr) { // anything below max depth with a valid data pointer must be a list, so the // element size is the size of the offset type. - uint32_t const len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; + uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len; pni->data_out += (output_offset * len); } pni->valid_map = s->col.valid_map_base[idx]; @@ -940,6 +961,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, } } } + s->first_output_value = 0; // Find the compressed size of repetition levels cur += InitLevelSection(s, cur, end, level_type::REPETITION); @@ -992,9 +1014,53 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s, s->dict_pos = 0; s->src_pos = 0; - s->input_value_count = 0; - s->input_row_count = 0; - s->input_leaf_count = 0; + // for flat hierarchies, we can't know how many leaf values to skip unless we do a full + // preprocess of the definition levels (since nulls will have no actual decodable value, there + // is no direct correlation between # of rows and # of decodable values). so we will start + // processing at the beginning of the value stream and disregard any indices that start + // before the first row. + if (s->col.max_level[level_type::REPETITION] == 0) { + s->page.skipped_values = 0; + s->page.skipped_leaf_values = 0; + s->input_value_count = 0; + s->input_row_count = 0; + + s->row_index_lower_bound = -1; + } + // for nested hierarchies, we have run a preprocess that lets us skip directly to the values + // we need to start decoding at + else { + // input_row_count translates to "how many rows we have processed so far", so since we are + // skipping directly to where we want to start decoding, set it to first_row + s->input_row_count = s->first_row; + + // return the lower bound to compare (page-relative) thread row index against. Explanation: + // In the case of nested schemas, rows can span page boundaries. That is to say, + // we can encounter the first value for row X on page M, but the last value for page M + // might not be the last value for row X. page M+1 (or further) may contain the last value. + // + // This means that the first values we encounter for a given page (M+1) may not belong to the + // row indicated by chunk_row, but to the row before it that spanned page boundaries. If that + // previous row is within the overall row bounds, include the values by allowing relative row + // index -1 + int const max_row = (min_row + num_rows) - 1; + if (min_row < page_start_row && max_row >= page_start_row - 1) { + s->row_index_lower_bound = -1; + } else { + s->row_index_lower_bound = s->first_row; + } + + // if we're in the decoding step, jump directly to the first + // value we care about + if (s->col.column_data_base != nullptr) { + s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0; + } else { + s->input_value_count = 0; + s->input_leaf_count = 0; + s->page.skipped_values = -1; + s->page.skipped_leaf_values = -1; + } + } __threadfence_block(); } @@ -1140,7 +1206,10 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); input_row_count += __popc(warp_row_count_mask); // is this thread within read row bounds? - int const in_row_bounds = thread_row_index < s->num_rows; + int const in_row_bounds = thread_row_index >= s->row_index_lower_bound && + thread_row_index < (s->first_row + s->num_rows) + ? 1 + : 0; // compute warp and thread value counts uint32_t const warp_count_mask = @@ -1215,7 +1284,9 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu // the correct position to start reading. since we are about to write the validity vector here // we need to adjust our computed mask to take into account the write row bounds. int const in_write_row_bounds = - max_depth == 1 ? thread_row_index < s->num_rows : in_row_bounds; + max_depth == 1 + ? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows) + : in_row_bounds; int const first_thread_in_write_range = max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0; // # of bits to of the validity mask to write out @@ -1303,11 +1374,16 @@ __device__ void gpuDecodeLevels(page_state_s* s, int32_t target_leaf_count, int * @param[in] s The local page info * @param[in] target_input_value_count The # of repetition/definition levels to process up to * @param[in] t Thread index + * @param[in] bounds_set Whether or not s->row_index_lower_bound, s->first_row and s->num_rows + * have been computed for this page (they will only be set in the second/trim pass). */ -static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_value_count, int t) +static __device__ void gpuUpdatePageSizes(page_state_s* s, + int32_t target_input_value_count, + int t, + bool bounds_set) { // max nesting depth of the column - int const max_depth = s->col.max_nesting_depth; + int max_depth = s->col.max_nesting_depth; // bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false; // how many input level values we've processed in the page so far int input_value_count = s->input_value_count; @@ -1322,23 +1398,44 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_ start_depth, end_depth, d, s, input_value_count, target_input_value_count, t); // count rows and leaf values - int const is_new_row = start_depth == 0 ? 1 : 0; - uint32_t const warp_row_count_mask = ballot(is_new_row); - int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; - uint32_t const warp_leaf_count_mask = ballot(is_new_leaf); - - // is this thread within row bounds? - int32_t const thread_row_index = - input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); - int const in_row_bounds = thread_row_index < s->num_rows; + int is_new_row = start_depth == 0 ? 1 : 0; + uint32_t warp_row_count_mask = ballot(is_new_row); + int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0; + uint32_t warp_leaf_count_mask = ballot(is_new_leaf); + + // is this thread within row bounds? on the first pass we don't know the bounds, so we will be + // computing the full size of the column. on the second pass, we will know our actual row + // bounds, so the computation will cap sizes properly. + int in_row_bounds = 1; + if (bounds_set) { + // absolute row index + int32_t thread_row_index = + input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1); + in_row_bounds = thread_row_index >= s->row_index_lower_bound && + thread_row_index < (s->first_row + s->num_rows) + ? 1 + : 0; + + uint32_t row_bounds_mask = ballot(in_row_bounds); + int first_thread_in_range = __ffs(row_bounds_mask) - 1; + + // if we've found the beginning of the first row, mark down the position + // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values) + if (!t && first_thread_in_range >= 0 && s->page.skipped_values < 0) { + // how many values we've skipped in the rep/def levels + s->page.skipped_values = input_value_count + first_thread_in_range; + // how many values we've skipped in the actual data stream + s->page.skipped_leaf_values = + input_leaf_count + __popc(warp_leaf_count_mask & ((1 << first_thread_in_range) - 1)); + } + } // increment counts across all nesting depths for (int s_idx = 0; s_idx < max_depth; s_idx++) { // if we are within the range of nesting levels we should be adding value indices for - int const in_nesting_bounds = - (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; + int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0; - uint32_t const count_mask = ballot(in_nesting_bounds); + uint32_t count_mask = ballot(in_nesting_bounds); if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); } } @@ -1362,18 +1459,29 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_ * * @param pages List of pages * @param chunks List of column chunks + * @param min_row Row index to start reading at + * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows. + * @param trim_pass Whether or not this is the trim pass. We first have to compute + * the full size information of every page before we come through in a second (trim) pass + * to determine what subset of rows in this page we should be reading. */ __global__ void __launch_bounds__(block_size) - gpuComputePageSizes(PageInfo* pages, device_span chunks) + gpuComputePageSizes(PageInfo* pages, + device_span chunks, + size_t min_row, + size_t num_rows, + bool trim_pass) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; - PageInfo* const pp = &pages[page_idx]; + int page_idx = blockIdx.x; + int t = threadIdx.x; + PageInfo* pp = &pages[page_idx]; - if (!setupLocalPageInfo(s, pp, chunks, INT_MAX)) { return; } + if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) { + return; + } // zero sizes int d = 0; @@ -1382,12 +1490,21 @@ __global__ void __launch_bounds__(block_size) d += blockDim.x; } if (!t) { - s->input_row_count = 0; - s->input_value_count = 0; + s->page.skipped_values = -1; + s->page.skipped_leaf_values = -1; + s->input_row_count = 0; + s->input_value_count = 0; + + // if this isn't the trim pass, make sure we visit absolutely everything + if (!trim_pass) { + s->first_row = 0; + s->num_rows = INT_MAX; + s->row_index_lower_bound = -1; + } } __syncthreads(); - bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0; + bool has_repetition = s->col.max_level[level_type::REPETITION] > 0; // optimization : it might be useful to have a version of gpuDecodeStream that could go wider than // 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step @@ -1406,18 +1523,22 @@ __global__ void __launch_bounds__(block_size) __syncwarp(); // we may have decoded different amounts from each stream, so only process what we've been - int const actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], - s->lvl_count[level_type::DEFINITION]) - : s->lvl_count[level_type::DEFINITION]; + int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION], + s->lvl_count[level_type::DEFINITION]) + : s->lvl_count[level_type::DEFINITION]; // process what we got back - gpuUpdatePageSizes(s, actual_input_count, t); + gpuUpdatePageSizes(s, actual_input_count, t, trim_pass); target_input_count = actual_input_count + batch_size; __syncwarp(); } } // update # rows in the actual page - if (!t) { pp->num_rows = s->page.nesting[0].size; } + if (!t) { + pp->num_rows = s->page.nesting[0].size; + pp->skipped_values = s->page.skipped_values; + pp->skipped_leaf_values = s->page.skipped_leaf_values; + } } /** @@ -1430,19 +1551,20 @@ __global__ void __launch_bounds__(block_size) * * @param pages List of pages * @param chunks List of column chunks + * @param min_row Row index to start reading at * @param num_rows Maximum number of rows to read */ -__global__ void __launch_bounds__(block_size) - gpuDecodePageData(PageInfo* pages, device_span chunks, size_t num_rows) +__global__ void __launch_bounds__(block_size) gpuDecodePageData( + PageInfo* pages, device_span chunks, size_t min_row, size_t num_rows) { __shared__ __align__(16) page_state_s state_g; page_state_s* const s = &state_g; - int const page_idx = blockIdx.x; - int const t = threadIdx.x; + int page_idx = blockIdx.x; + int t = threadIdx.x; int out_thread0; - if (!setupLocalPageInfo(s, &pages[page_idx], chunks, num_rows)) { return; } + if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; } if (s->dict_base) { out_thread0 = (s->dict_bits > 0) ? 64 : 32; @@ -1451,6 +1573,8 @@ __global__ void __launch_bounds__(block_size) ((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32; } + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t skipped_leaf_values = s->page.skipped_leaf_values; while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) { int target_pos; int src_pos = s->src_pos; @@ -1470,7 +1594,8 @@ __global__ void __launch_bounds__(block_size) // - produces non-NULL value indices in s->nz_idx for subsequent decoding gpuDecodeLevels(s, target_pos, t); } else if (t < out_thread0) { - uint32_t src_target_pos = target_pos; + // skipped_leaf_values will always be 0 for flat hierarchies. + uint32_t src_target_pos = target_pos + skipped_leaf_values; // WARP1: Decode dictionary indices, booleans or string positions if (s->dict_base) { @@ -1483,51 +1608,70 @@ __global__ void __launch_bounds__(block_size) if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; } } else { // WARP1..WARP3: Decode values - int const dtype = s->col.data_type & 7; + int dtype = s->col.data_type & 7; src_pos += t - out_thread0; // the position in the output column/buffer - int const dst_pos = s->nz_idx[rolling_index(src_pos)]; + int dst_pos = s->nz_idx[rolling_index(src_pos)]; + + // for the flat hierarchy case we will be reading from the beginning of the value stream, + // regardless of the value of first_row. so adjust our destination offset accordingly. + // example: + // - user has passed skip_rows = 2, so our first_row to output is 2 + // - the row values we get from nz_idx will be + // 0, 1, 2, 3, 4 .... + // - by shifting these values by first_row, the sequence becomes + // -1, -2, 0, 1, 2 ... + // - so we will end up ignoring the first two input rows, and input rows 2..n will + // get written to the output starting at position 0. + // + if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; } // target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values // before first_row) in the flat hierarchy case. if (src_pos < target_pos && dst_pos >= 0) { + // src_pos represents the logical row position we want to read from. But in the case of + // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position + // has to take into account the # of values we have to skip in the page to get to the + // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0. + uint32_t val_src_pos = src_pos + skipped_leaf_values; + // nesting level that is storing actual leaf values - int const leaf_level_index = s->col.max_nesting_depth - 1; + int leaf_level_index = s->col.max_nesting_depth - 1; - uint32_t const dtype_len = s->dtype_len; + uint32_t dtype_len = s->dtype_len; void* dst = s->page.nesting[leaf_level_index].data_out + static_cast(dst_pos) * dtype_len; if (dtype == BYTE_ARRAY) { - gpuOutputString(s, src_pos, dst); + gpuOutputString(s, val_src_pos, dst); } else if (dtype == BOOLEAN) { - gpuOutputBoolean(s, src_pos, static_cast(dst)); + gpuOutputBoolean(s, val_src_pos, static_cast(dst)); } else if (s->col.converted_type == DECIMAL) { switch (dtype) { - case INT32: gpuOutputFast(s, src_pos, static_cast(dst)); break; - case INT64: gpuOutputFast(s, src_pos, static_cast(dst)); break; + case INT32: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; + case INT64: gpuOutputFast(s, val_src_pos, static_cast(dst)); break; default: if (s->dtype_len_in <= sizeof(int32_t)) { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); } else if (s->dtype_len_in <= sizeof(int64_t)) { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast(dst)); } else { - gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<__int128_t*>(dst)); + gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst)); } break; } } else if (dtype == INT96) { - gpuOutputInt96Timestamp(s, src_pos, static_cast(dst)); + gpuOutputInt96Timestamp(s, val_src_pos, static_cast(dst)); } else if (dtype_len == 8) { if (s->ts_scale) { - gpuOutputInt64Timestamp(s, src_pos, static_cast(dst)); + gpuOutputInt64Timestamp(s, val_src_pos, static_cast(dst)); } else { - gpuOutputFast(s, src_pos, static_cast(dst)); + gpuOutputFast(s, val_src_pos, static_cast(dst)); } } else if (dtype_len == 4) { - gpuOutputFast(s, src_pos, static_cast(dst)); + gpuOutputFast(s, val_src_pos, static_cast(dst)); } else { - gpuOutputGeneric(s, src_pos, static_cast(dst), dtype_len); + gpuOutputGeneric(s, val_src_pos, static_cast(dst), dtype_len); } } @@ -1598,6 +1742,8 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, + size_t min_row, + bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr) { @@ -1606,7 +1752,16 @@ void PreprocessColumnData(hostdevice_vector& pages, // computes: // PageNestingInfo::size for each level of nesting, for each page. - gpuComputePageSizes<<>>(pages.device_ptr(), chunks); + // This computes the size for the entire page, not taking row bounds into account. + // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims" + // the starting and ending read values to account for these bounds. + gpuComputePageSizes<<>>( + pages.device_ptr(), + chunks, + // if uses_custom_row_bounds is false, include all possible rows. + uses_custom_row_bounds ? min_row : 0, + uses_custom_row_bounds ? num_rows : INT_MAX, + !uses_custom_row_bounds); // computes: // PageInfo::chunk_row for all pages @@ -1620,6 +1775,16 @@ void PreprocessColumnData(hostdevice_vector& pages, page_input, chunk_row_output_iter{pages.device_ptr()}); + // computes: + // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account. + // PageInfo::skipped_values, which tells us where to start decoding in the input . + // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has + // specified artifical bounds). + if (uses_custom_row_bounds) { + gpuComputePageSizes<<>>( + pages.device_ptr(), chunks, min_row, num_rows, true); + } + // ordering of pages is by input column schema, repeated across row groups. so // if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like // @@ -1684,11 +1849,13 @@ void PreprocessColumnData(hostdevice_vector& pages, // Handle a specific corner case. It is possible to construct a parquet file such that // a column within a row group contains more rows than the row group itself. This may be // invalid, but we have seen instances of this in the wild, including how they were created - // using the apache parquet tools. So we need to cap the number of rows we will - // allocate/read from the file with the amount specified in the associated row group. This - // only applies to columns that are not children of lists as those may have an arbitrary - // number of rows in them. - if (!(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && + // using the apache parquet tools. Normally, the trim pass would handle this case quietly, + // but if we are not running the trim pass (which is most of the time) we need to cap the + // number of rows we will allocate/read from the file with the amount specified in the + // associated row group. This only applies to columns that are not children of lists as + // those may have an arbitrary number of rows in them. + if (!uses_custom_row_bounds && + !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) && size > static_cast(num_rows)) { size = static_cast(num_rows); } @@ -1723,13 +1890,14 @@ void PreprocessColumnData(hostdevice_vector& pages, void __host__ DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, + size_t min_row, rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); dim3 dim_grid(pages.size(), 1); // 1 threadblock per page gpuDecodePageData<<>>( - pages.device_ptr(), chunks, num_rows); + pages.device_ptr(), chunks, min_row, num_rows); } } // namespace gpu diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp index 823cb8fcc2b..610275ee26b 100644 --- a/cpp/src/io/parquet/parquet_gpu.hpp +++ b/cpp/src/io/parquet/parquet_gpu.hpp @@ -135,6 +135,19 @@ struct PageInfo { Encoding definition_level_encoding; // Encoding used for definition levels (data page) Encoding repetition_level_encoding; // Encoding used for repetition levels (data page) + // for nested types, we run a preprocess step in order to determine output + // column sizes. Because of this, we can jump directly to the position in the + // input data to start decoding instead of reading all of the data and discarding + // rows we don't care about. + // + // NOTE: for flat hierarchies we do not do the preprocess step, so skipped_values and + // skipped_leaf_values will always be 0. + // + // # of values skipped in the repetition/definition level stream + int skipped_values; + // # of values skipped in the actual data stream. + int skipped_leaf_values; + // nesting information (input/output) for each page int num_nesting_levels; PageNestingInfo* nesting; @@ -416,6 +429,9 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks, * @param input_columns Input column information * @param output_columns Output column information * @param num_rows Maximum number of rows to read + * @param min_rows crop all rows below min_row + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param stream Cuda stream */ void PreprocessColumnData(hostdevice_vector& pages, @@ -423,6 +439,8 @@ void PreprocessColumnData(hostdevice_vector& pages, std::vector& input_columns, std::vector& output_columns, size_t num_rows, + size_t min_row, + bool uses_custom_row_bounds, rmm::cuda_stream_view stream, rmm::mr::device_memory_resource* mr); @@ -435,11 +453,13 @@ void PreprocessColumnData(hostdevice_vector& pages, * @param[in,out] pages All pages to be decoded * @param[in] chunks All chunks to be decoded * @param[in] num_rows Total number of rows to read + * @param[in] min_row Minimum number of rows to read * @param[in] stream CUDA stream to use, default 0 */ void DecodePageData(hostdevice_vector& pages, hostdevice_vector const& chunks, size_t num_rows, + size_t min_row, rmm::cuda_stream_view stream); /** diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu index 6645c31536b..2553b375e72 100644 --- a/cpp/src/io/parquet/reader_impl.cu +++ b/cpp/src/io/parquet/reader_impl.cu @@ -540,14 +540,15 @@ class aggregate_reader_metadata { * @brief Filters and reduces down to a selection of row groups * * @param row_groups Lists of row groups to read, one per source + * @param row_start Starting row of the selection + * @param row_count Total number of rows selected * - * @return List of row group info structs and the total number of rows + * @return List of row group indexes and its starting row */ - [[nodiscard]] std::pair, size_type> select_row_groups( - std::vector> const& row_groups) const + [[nodiscard]] auto select_row_groups(std::vector> const& row_groups, + size_type& row_start, + size_type& row_count) const { - size_type row_count = 0; - if (!row_groups.empty()) { std::vector selection; CUDF_EXPECTS(row_groups.size() == per_file_metadata.size(), @@ -564,12 +565,17 @@ class aggregate_reader_metadata { row_count += get_row_group(rowgroup_idx, src_idx).num_rows; } } - return {selection, row_count}; + return selection; } - row_count = static_cast( - std::min(get_num_rows(), std::numeric_limits::max())); + row_start = std::max(row_start, 0); + if (row_count < 0) { + row_count = static_cast( + std::min(get_num_rows(), std::numeric_limits::max())); + } + row_count = min(row_count, get_num_rows() - row_start); CUDF_EXPECTS(row_count >= 0, "Invalid row count"); + CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); std::vector selection; size_type count = 0; @@ -577,12 +583,14 @@ class aggregate_reader_metadata { for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) { auto const chunk_start_row = count; count += get_row_group(rg_idx, src_idx).num_rows; - selection.emplace_back(rg_idx, chunk_start_row, src_idx); - if (count >= row_count) { break; } + if (count > row_start || count == 0) { + selection.emplace_back(rg_idx, chunk_start_row, src_idx); + } + if (count >= row_start + row_count) { break; } } } - return {selection, row_count}; + return selection; } /** @@ -1342,7 +1350,9 @@ void reader::impl::allocate_nesting_info(hostdevice_vector */ void reader::impl::preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t num_rows, + size_t min_row, + size_t total_rows, + bool uses_custom_row_bounds, bool has_lists) { // TODO : we should be selectively preprocessing only columns that have @@ -1355,15 +1365,22 @@ void reader::impl::preprocess_columns(hostdevice_vector& c [&](std::vector& cols) { for (size_t idx = 0; idx < cols.size(); idx++) { auto& col = cols[idx]; - col.create(num_rows, _stream, _mr); + col.create(total_rows, _stream, _mr); create_columns(col.children); } }; create_columns(_output_columns); } else { // preprocess per-nesting level sizes by page - gpu::PreprocessColumnData( - pages, chunks, _input_columns, _output_columns, num_rows, _stream, _mr); + gpu::PreprocessColumnData(pages, + chunks, + _input_columns, + _output_columns, + total_rows, + min_row, + uses_custom_row_bounds, + _stream, + _mr); _stream.synchronize(); } } @@ -1374,6 +1391,7 @@ void reader::impl::preprocess_columns(hostdevice_vector& c void reader::impl::decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, + size_t min_row, size_t total_rows) { auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) { @@ -1495,7 +1513,7 @@ void reader::impl::decode_page_data(hostdevice_vector& chu gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream); } - gpu::DecodePageData(pages, chunks, total_rows, _stream); + gpu::DecodePageData(pages, chunks, total_rows, min_row, _stream); pages.device_to_host(_stream); page_nesting.device_to_host(_stream); _stream.synchronize(); @@ -1587,10 +1605,14 @@ reader::impl::impl(std::vector>&& sources, _timestamp_type.id()); } -table_with_metadata reader::impl::read(std::vector> const& row_group_list) +table_with_metadata reader::impl::read(size_type skip_rows, + size_type num_rows, + bool uses_custom_row_bounds, + std::vector> const& row_group_list) { // Select only row groups required - const auto [selected_row_groups, num_rows] = _metadata->select_row_groups(row_group_list); + const auto selected_row_groups = + _metadata->select_row_groups(row_group_list, skip_rows, num_rows); table_metadata out_metadata; @@ -1732,10 +1754,10 @@ table_with_metadata reader::impl::read(std::vector> const // // - for nested schemas, output buffer offset values per-page, per nesting-level for the // purposes of decoding. - preprocess_columns(chunks, pages, num_rows, has_lists); + preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists); // decoding of column data itself - decode_page_data(chunks, pages, page_nesting_info, num_rows); + decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows); // create the final output cudf columns for (size_t i = 0; i < _output_columns.size(); ++i) { @@ -1786,7 +1808,12 @@ reader::~reader() = default; // Forward to implementation table_with_metadata reader::read(parquet_reader_options const& options) { - return _impl->read(options.get_row_groups()); + // if the user has specified custom row bounds + bool const uses_custom_row_bounds = options.get_num_rows() >= 0 || options.get_skip_rows() != 0; + return _impl->read(options.get_skip_rows(), + options.get_num_rows(), + uses_custom_row_bounds, + options.get_row_groups()); } } // namespace parquet diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp index b46fe042a13..e1f275bb8e8 100644 --- a/cpp/src/io/parquet/reader_impl.hpp +++ b/cpp/src/io/parquet/reader_impl.hpp @@ -69,11 +69,18 @@ class reader::impl { /** * @brief Read an entire set or a subset of data and returns a set of columns * + * @param skip_rows Number of rows to skip from the start + * @param num_rows Number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param row_group_indices Lists of row groups to read, one per source * * @return The set of columns along with metadata */ - table_with_metadata read(std::vector> const& row_group_indices); + table_with_metadata read(size_type skip_rows, + size_type num_rows, + bool uses_custom_row_bounds, + std::vector> const& row_group_indices); private: /** @@ -152,13 +159,18 @@ class reader::impl { * * @param chunks All chunks to be decoded * @param pages All pages to be decoded - * @param num_rows The number of rows to be decoded + * @param min_rows crop all rows below min_row + * @param total_rows Maximum number of rows to read + * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific + * bounds * @param has_lists Whether or not this data contains lists and requires * a preprocess. */ void preprocess_columns(hostdevice_vector& chunks, hostdevice_vector& pages, - size_t num_rows, + size_t min_row, + size_t total_rows, + bool uses_custom_row_bounds, bool has_lists); /** @@ -167,11 +179,13 @@ class reader::impl { * @param chunks List of column chunk descriptors * @param pages List of page information * @param page_nesting Page nesting array + * @param min_row Minimum number of rows from start * @param total_rows Number of rows to output */ void decode_page_data(hostdevice_vector& chunks, hostdevice_vector& pages, hostdevice_vector& page_nesting, + size_t min_row, size_t total_rows); private: diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp index b65488e9e43..c5000bc0add 100644 --- a/cpp/tests/io/parquet_test.cpp +++ b/cpp/tests/io/parquet_test.cpp @@ -2483,6 +2483,213 @@ TEST_F(ParquetWriterStressTest, DeviceWriteLargeTableWithValids) CUDF_TEST_EXPECT_TABLES_EQUAL(custom_tbl.tbl->view(), expected->view()); } +TEST_F(ParquetReaderTest, UserBounds) +{ + // trying to read more rows than there are should result in + // receiving the properly capped # of rows + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("TooManyRows.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(16); + auto result = cudf_io::read_parquet(read_opts); + + // we should only get back 4 rows + EXPECT_EQ(result.tbl->view().column(0).size(), 4); + } + + // trying to read past the end of the # of actual rows should result + // in empty columns. + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("PastBounds.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).skip_rows(4); + auto result = cudf_io::read_parquet(read_opts); + + // we should get empty columns back + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } + + // trying to read 0 rows should result in reading the whole file + // at the moment we get back 4. when that bug gets fixed, this + // test can be flipped. + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("ZeroRows.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(0); + auto result = cudf_io::read_parquet(read_opts); + + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } + + // trying to read 0 rows past the end of the # of actual rows should result + // in empty columns. + { + srand(31337); + auto expected = create_random_fixed_table(4, 4, false); + + auto filepath = temp_env->get_temp_filepath("ZeroRowsPastBounds.parquet"); + cudf_io::parquet_writer_options args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected); + cudf_io::write_parquet(args); + + // attempt to read more rows than there actually are + cudf_io::parquet_reader_options read_opts = + cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}) + .skip_rows(4) + .num_rows(0); + auto result = cudf_io::read_parquet(read_opts); + + // we should get empty columns back + EXPECT_EQ(result.tbl->view().num_columns(), 4); + EXPECT_EQ(result.tbl->view().column(0).size(), 0); + } +} + +TEST_F(ParquetReaderTest, UserBoundsWithNulls) +{ + // clang-format off + cudf::test::fixed_width_column_wrapper col{{1,1,1,1,1,1,1,1, 2,2,2,2,2,2,2,2, 3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4, 5,5,5,5,5,5,5,5, 6,6,6,6,6,6,6,6, 7,7,7,7,7,7,7,7, 8,8,8,8,8,8,8,8} + ,{1,1,1,0,0,0,1,1, 1,1,1,1,1,1,1,1, 0,0,0,0,0,0,0,0, 1,1,1,1,1,1,0,0, 1,0,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,0}}; + // clang-format on + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("UserBoundsWithNulls.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {1, 3}, {3, -1}, + {31, -1}, {32, -1}, {33, -1}, + {31, 5}, {32, 5}, {33, 5}, + {-1, 7}, {-1, 31}, {-1, 32}, {-1, 33}, + {62, -1}, {63, -1}, + {62, 2}, {63, 1}}; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + +TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge) +{ + constexpr int num_rows = 30 * 1000000; + + std::mt19937 gen(6747); + std::bernoulli_distribution bn(0.7f); + auto valids = + cudf::detail::make_counting_transform_iterator(0, [&](int index) { return bn(gen); }); + auto values = thrust::make_counting_iterator(0); + + cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids); + + // this file will have row groups of 1,000,000 each + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("UserBoundsWithNullsLarge.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {1613470, -1}, {1999999, -1}, + {31, 1}, {32, 1}, {33, 1}, + // deliberately span some row group boundaries + {999000, 1001}, {999000, 2000}, {2999999, 2}, {13999997, -1}, + {16785678, 3}, {22996176, 31}, + {24001231, 17}, {29000001, 989999}, {29999999, 1} }; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + +TEST_F(ParquetReaderTest, ListUserBoundsWithNullsLarge) +{ + constexpr int num_rows = 5 * 1000000; + auto colp = make_parquet_list_col(0, num_rows, 5, 8, true); + cudf::column_view col = *colp; + + // this file will have row groups of 1,000,000 each + cudf::table_view tbl({col}); + auto filepath = temp_env->get_temp_filepath("ListUserBoundsWithNullsLarge.parquet"); + cudf_io::parquet_writer_options out_args = + cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl); + cudf_io::write_parquet(out_args); + + // skip_rows / num_rows + // clang-format off + std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {161470, -1}, {4499997, -1}, + {31, 1}, {32, 1}, {33, 1}, + // deliberately span some row group boundaries + {999000, 1001}, {999000, 2000}, {2999999, 2}, + {1678567, 3}, {4299676, 31}, + {4001231, 17}, {1900000, 989999}, {4999999, 1} }; + // clang-format on + for (auto p : params) { + cudf_io::parquet_reader_options read_args = + cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath}); + if (p.first >= 0) { read_args.set_skip_rows(p.first); } + if (p.second >= 0) { read_args.set_num_rows(p.second); } + auto result = cudf_io::read_parquet(read_args); + + p.first = p.first < 0 ? 0 : p.first; + p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second; + std::vector slice_indices{p.first, p.first + p.second}; + auto expected = cudf::slice(col, slice_indices); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]); + } +} + TEST_F(ParquetReaderTest, ReorderedColumns) { { From 8a00d8da2b9b186e61f0ab91b59f60e80f88b7f9 Mon Sep 17 00:00:00 2001 From: Dave Baranec Date: Wed, 7 Sep 2022 13:54:59 -0500 Subject: [PATCH 2/2] Remove redundant parquet benchmark file. --- cpp/benchmarks/io/parquet/parquet_reader.cpp | 209 ------------------- 1 file changed, 209 deletions(-) delete mode 100644 cpp/benchmarks/io/parquet/parquet_reader.cpp diff --git a/cpp/benchmarks/io/parquet/parquet_reader.cpp b/cpp/benchmarks/io/parquet/parquet_reader.cpp deleted file mode 100644 index c66f0af2b76..00000000000 --- a/cpp/benchmarks/io/parquet/parquet_reader.cpp +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include - -// to enable, run cmake with -DBUILD_BENCHMARKS=ON - -constexpr size_t data_size = 512 << 20; -constexpr cudf::size_type num_cols = 64; - -namespace cudf_io = cudf::io; - -class ParquetRead : public cudf::benchmark { -}; - -void BM_parq_read_varying_input(benchmark::State& state) -{ - auto const data_types = get_type_or_group(state.range(0)); - cudf::size_type const cardinality = state.range(1); - cudf::size_type const run_length = state.range(2); - cudf_io::compression_type const compression = - state.range(3) ? cudf_io::compression_type::SNAPPY : cudf_io::compression_type::NONE; - auto const source_type = static_cast(state.range(4)); - - data_profile table_data_profile; - table_data_profile.set_cardinality(cardinality); - table_data_profile.set_avg_run_length(run_length); - auto const tbl = create_random_table( - cycle_dtypes(data_types, num_cols), table_size_bytes{data_size}, table_data_profile); - auto const view = tbl->view(); - - cuio_source_sink_pair source_sink(source_type); - cudf_io::parquet_writer_options write_opts = - cudf_io::parquet_writer_options::builder(source_sink.make_sink_info(), view) - .compression(compression); - cudf_io::write_parquet(write_opts); - - cudf_io::parquet_reader_options read_opts = - cudf_io::parquet_reader_options::builder(source_sink.make_source_info()); - - auto mem_stats_logger = cudf::memory_stats_logger(); - for (auto _ : state) { - try_drop_l3_cache(); - cuda_event_timer const raii(state, true); // flush_l2_cache = true, stream = 0 - cudf_io::read_parquet(read_opts); - } - - state.SetBytesProcessed(data_size * state.iterations()); - state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); - state.counters["encoded_file_size"] = source_sink.size(); -} - -std::vector get_col_names(cudf::io::source_info const& source) -{ - cudf_io::parquet_reader_options const read_options = - cudf_io::parquet_reader_options::builder(source).num_rows(1); - return cudf_io::read_parquet(read_options).metadata.column_names; -} - -void BM_parq_read_varying_options(benchmark::State& state) -{ - auto state_idx = 0; - auto const col_sel = static_cast(state.range(state_idx++)); - auto const row_sel = static_cast(state.range(state_idx++)); - auto const num_chunks = state.range(state_idx++); - - auto const flags = state.range(state_idx++); - auto const str_to_categories = (flags & 1) != 0; - auto const use_pandas_metadata = (flags & 2) != 0; - auto const ts_type = cudf::data_type{static_cast(state.range(state_idx++))}; - - // No nested types here, because of https://github.com/rapidsai/cudf/issues/9970 - auto const data_types = dtypes_for_column_selection( - get_type_or_group({static_cast(type_group_id::INTEGRAL), - static_cast(type_group_id::FLOATING_POINT), - static_cast(type_group_id::FIXED_POINT), - static_cast(type_group_id::TIMESTAMP), - static_cast(type_group_id::DURATION), - static_cast(cudf::type_id::STRING)}), - col_sel); - auto const tbl = create_random_table(data_types, table_size_bytes{data_size}); - auto const view = tbl->view(); - - cuio_source_sink_pair source_sink(io_type::HOST_BUFFER); - cudf_io::parquet_writer_options options = - cudf_io::parquet_writer_options::builder(source_sink.make_sink_info(), view); - cudf_io::write_parquet(options); - - auto const cols_to_read = - select_column_names(get_col_names(source_sink.make_source_info()), col_sel); - cudf_io::parquet_reader_options read_options = - cudf_io::parquet_reader_options::builder(source_sink.make_source_info()) - .columns(cols_to_read) - .convert_strings_to_categories(str_to_categories) - .use_pandas_metadata(use_pandas_metadata) - .timestamp_type(ts_type); - - auto const num_row_groups = data_size / (128 << 20); - cudf::size_type const chunk_row_cnt = view.num_rows() / num_chunks; - auto mem_stats_logger = cudf::memory_stats_logger(); - for (auto _ : state) { - try_drop_l3_cache(); - cuda_event_timer raii(state, true); // flush_l2_cache = true, stream = 0 - - cudf::size_type rows_read = 0; - for (int32_t chunk = 0; chunk < num_chunks; ++chunk) { - auto const is_last_chunk = chunk == (num_chunks - 1); - switch (row_sel) { - case row_selection::ALL: break; - case row_selection::ROW_GROUPS: { - auto row_groups_to_read = segments_in_chunk(num_row_groups, num_chunks, chunk); - if (is_last_chunk) { - // Need to assume that an additional "overflow" row group is present - row_groups_to_read.push_back(num_row_groups); - } - read_options.set_row_groups({row_groups_to_read}); - } break; - case row_selection::NROWS: - read_options.set_skip_rows(chunk * chunk_row_cnt); - read_options.set_num_rows(chunk_row_cnt); - if (is_last_chunk) read_options.set_num_rows(-1); - break; - default: CUDF_FAIL("Unsupported row selection method"); - } - - rows_read += cudf_io::read_parquet(read_options).tbl->num_rows(); - } - - CUDF_EXPECTS(rows_read == view.num_rows(), "Benchmark did not read the entire table"); - } - - auto const data_processed = data_size * cols_to_read.size() / view.num_columns(); - state.SetBytesProcessed(data_processed * state.iterations()); - state.counters["peak_memory_usage"] = mem_stats_logger.peak_memory_usage(); - state.counters["encoded_file_size"] = source_sink.size(); -} - -#define PARQ_RD_BM_INPUTS_DEFINE(name, type_or_group, src_type) \ - BENCHMARK_DEFINE_F(ParquetRead, name) \ - (::benchmark::State & state) { BM_parq_read_varying_input(state); } \ - BENCHMARK_REGISTER_F(ParquetRead, name) \ - ->ArgsProduct({{int32_t(type_or_group)}, {0, 1000}, {1, 32}, {true, false}, {src_type}}) \ - ->Unit(benchmark::kMillisecond) \ - ->UseManualTime(); - -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, integral, type_group_id::INTEGRAL); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, floats, type_group_id::FLOATING_POINT); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, decimal, type_group_id::FIXED_POINT); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, timestamps, type_group_id::TIMESTAMP); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, durations, type_group_id::DURATION); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, string, cudf::type_id::STRING); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, list, cudf::type_id::LIST); -RD_BENCHMARK_DEFINE_ALL_SOURCES(PARQ_RD_BM_INPUTS_DEFINE, struct, cudf::type_id::STRUCT); - -BENCHMARK_DEFINE_F(ParquetRead, column_selection) -(::benchmark::State& state) { BM_parq_read_varying_options(state); } -BENCHMARK_REGISTER_F(ParquetRead, column_selection) - ->ArgsProduct({{int32_t(column_selection::ALL), - int32_t(column_selection::ALTERNATE), - int32_t(column_selection::FIRST_HALF), - int32_t(column_selection::SECOND_HALF)}, - {int32_t(row_selection::ALL)}, - {1}, - {0b01}, // defaults - {int32_t(cudf::type_id::EMPTY)}}) - ->Unit(benchmark::kMillisecond) - ->UseManualTime(); - -// row_selection::ROW_GROUPS disabled until we add an API to read metadata from a parquet file and -// determine num row groups. https://github.com/rapidsai/cudf/pull/9963#issuecomment-1004832863 -BENCHMARK_DEFINE_F(ParquetRead, row_selection) -(::benchmark::State& state) { BM_parq_read_varying_options(state); } -BENCHMARK_REGISTER_F(ParquetRead, row_selection) - ->ArgsProduct({{int32_t(column_selection::ALL)}, - {int32_t(row_selection::NROWS)}, - {1, 4}, - {0b01}, // defaults - {int32_t(cudf::type_id::EMPTY)}}) - ->Unit(benchmark::kMillisecond) - ->UseManualTime(); - -BENCHMARK_DEFINE_F(ParquetRead, misc_options) -(::benchmark::State& state) { BM_parq_read_varying_options(state); } -BENCHMARK_REGISTER_F(ParquetRead, misc_options) - ->ArgsProduct({{int32_t(column_selection::ALL)}, - {int32_t(row_selection::NROWS)}, - {1}, - {0b01, 0b00, 0b11, 0b010}, - {int32_t(cudf::type_id::EMPTY), int32_t(cudf::type_id::TIMESTAMP_NANOSECONDS)}}) - ->Unit(benchmark::kMillisecond) - ->UseManualTime();