From 37612ee7946c3851bf3915b957d27ee78d3fad83 Mon Sep 17 00:00:00 2001
From: nvdbaranec <56695930+nvdbaranec@users.noreply.github.com>
Date: Thu, 8 Sep 2022 10:11:13 -0500
Subject: [PATCH] Revert removal of skip_rows / num_rows options from the
Parquet reader. (#11657)
Reverts:
https://github.com/rapidsai/cudf/pull/11480
https://github.com/rapidsai/cudf/pull/11503
Authors:
- https://github.com/nvdbaranec
Approvers:
- Nghia Truong (https://github.com/ttnghia)
- Yunsong Wang (https://github.com/PointKernel)
URL: https://github.com/rapidsai/cudf/pull/11657
---
cpp/include/cudf/io/parquet.hpp | 74 +++++++
cpp/src/io/parquet/page_data.cu | 332 ++++++++++++++++++++++-------
cpp/src/io/parquet/parquet_gpu.hpp | 20 ++
cpp/src/io/parquet/reader_impl.cu | 69 ++++--
cpp/src/io/parquet/reader_impl.hpp | 20 +-
cpp/tests/io/parquet_test.cpp | 207 ++++++++++++++++++
6 files changed, 616 insertions(+), 106 deletions(-)
diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp
index 5d95bf9812f..ff5b9f5c457 100644
--- a/cpp/include/cudf/io/parquet.hpp
+++ b/cpp/include/cudf/io/parquet.hpp
@@ -57,6 +57,10 @@ class parquet_reader_options {
// List of individual row groups to read (ignored if empty)
std::vector<std::vector<size_type>> _row_groups;
+ // Number of rows to skip from the start
+ size_type _skip_rows = 0;
+ // Number of rows to read; -1 is all
+ size_type _num_rows = -1;
// Whether to store string data as categorical type
bool _convert_strings_to_categories = false;
@@ -127,6 +131,20 @@ class parquet_reader_options {
return _reader_column_schema;
}
+ /**
+ * @brief Returns number of rows to skip from the start.
+ *
+ * @return Number of rows to skip from the start
+ */
+ [[nodiscard]] size_type get_skip_rows() const { return _skip_rows; }
+
+ /**
+ * @brief Returns number of rows to read.
+ *
+ * @return Number of rows to read
+ */
+ [[nodiscard]] size_type get_num_rows() const { return _num_rows; }
+
/**
* @brief Returns names of column to be read, if set.
*
@@ -162,6 +180,10 @@ class parquet_reader_options {
*/
void set_row_groups(std::vector<std::vector<size_type>> row_groups)
{
+ if ((!row_groups.empty()) and ((_skip_rows != 0) or (_num_rows != -1))) {
+ CUDF_FAIL("row_groups can't be set along with skip_rows and num_rows");
+ }
+
_row_groups = std::move(row_groups);
}
@@ -190,6 +212,34 @@ class parquet_reader_options {
_reader_column_schema = std::move(val);
}
+ /**
+ * @brief Sets number of rows to skip.
+ *
+ * @param val Number of rows to skip from start
+ */
+ void set_skip_rows(size_type val)
+ {
+ if ((val != 0) and (!_row_groups.empty())) {
+ CUDF_FAIL("skip_rows can't be set along with a non-empty row_groups");
+ }
+
+ _skip_rows = val;
+ }
+
+ /**
+ * @brief Sets number of rows to read.
+ *
+ * @param val Number of rows to read after skip
+ */
+ void set_num_rows(size_type val)
+ {
+ if ((val != -1) and (!_row_groups.empty())) {
+ CUDF_FAIL("num_rows can't be set along with a non-empty row_groups");
+ }
+
+ _num_rows = val;
+ }
+
/**
* @brief Sets timestamp_type used to cast timestamp columns.
*
@@ -279,6 +329,30 @@ class parquet_reader_options_builder {
return *this;
}
+ /**
+ * @brief Sets number of rows to skip.
+ *
+ * @param val Number of rows to skip from start
+ * @return this for chaining
+ */
+ parquet_reader_options_builder& skip_rows(size_type val)
+ {
+ options.set_skip_rows(val);
+ return *this;
+ }
+
+ /**
+ * @brief Sets number of rows to read.
+ *
+ * @param val Number of rows to read after skip
+ * @return this for chaining
+ */
+ parquet_reader_options_builder& num_rows(size_type val)
+ {
+ options.set_num_rows(val);
+ return *this;
+ }
+
/**
* @brief timestamp_type used to cast timestamp columns.
*
diff --git a/cpp/src/io/parquet/page_data.cu b/cpp/src/io/parquet/page_data.cu
index 424882f45bf..531733a7df7 100644
--- a/cpp/src/io/parquet/page_data.cu
+++ b/cpp/src/io/parquet/page_data.cu
@@ -56,13 +56,15 @@ struct page_state_s {
const uint8_t* data_start;
const uint8_t* data_end;
const uint8_t* lvl_end;
- const uint8_t* dict_base; // ptr to dictionary page data
- int32_t dict_size; // size of dictionary data
- int32_t num_rows; // Rows in page to decode
- int32_t num_input_values; // total # of input/level values in the page
- int32_t dtype_len; // Output data type length
- int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit
- int32_t dict_bits; // # of bits to store dictionary indices
+ const uint8_t* dict_base; // ptr to dictionary page data
+ int32_t dict_size; // size of dictionary data
+ int32_t first_row; // First row in page to output
+ int32_t num_rows; // Rows in page to decode (including rows to be skipped)
+ int32_t first_output_value; // First value in page to output
+ int32_t num_input_values; // total # of input/level values in the page
+ int32_t dtype_len; // Output data type length
+ int32_t dtype_len_in; // Can be larger than dtype_len if truncating 32-bit into 8-bit
+ int32_t dict_bits; // # of bits to store dictionary indices
uint32_t dict_run;
int32_t dict_val;
uint32_t initial_rle_run[NUM_LEVEL_TYPES]; // [def,rep]
@@ -88,6 +90,7 @@ struct page_state_s {
uint32_t def[non_zero_buffer_size]; // circular buffer of definition level values
const uint8_t* lvl_start[NUM_LEVEL_TYPES]; // [def,rep]
int32_t lvl_count[NUM_LEVEL_TYPES]; // how many of each of the streams we've decoded
+ int32_t row_index_lower_bound; // lower bound of row indices we should process
};
/**
@@ -811,14 +814,17 @@ static __device__ void gpuOutputGeneric(volatile page_state_s* s,
* @param[in, out] s The local page state to be filled in
* @param[in] p The global page to be copied from
* @param[in] chunks The global list of chunks
- * @param[in] num_rows Maximum number of rows to process
+ * @param[in] num_rows Maximum number of rows to read
+ * @param[in] min_row Crop all rows below min_row
*/
static __device__ bool setupLocalPageInfo(page_state_s* const s,
PageInfo const* p,
device_span<ColumnChunkDesc const> chunks,
+ size_t min_row,
size_t num_rows)
{
- int const t = threadIdx.x;
+ int t = threadIdx.x;
+ int chunk_idx;
// Fetch page info
if (t == 0) s->page = *p;
@@ -826,7 +832,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
if (s->page.flags & PAGEINFO_FLAGS_DICTIONARY) { return false; }
// Fetch column chunk info
- int const chunk_idx = s->page.chunk_idx;
+ chunk_idx = s->page.chunk_idx;
if (t == 0) { s->col = chunks[chunk_idx]; }
// zero nested value and valid counts
@@ -847,18 +853,19 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
// our starting row (absolute index) is
// col.start_row == absolute row index
// page.chunk-row == relative row index within the chunk
- size_t const page_start_row = s->col.start_row + s->page.chunk_row;
+ size_t page_start_row = s->col.start_row + s->page.chunk_row;
// IMPORTANT : nested schemas can have 0 rows in a page but still have
// values. The case is:
// - On page N-1, the last row starts, with 2/6 values encoded
// - On page N, the remaining 4/6 values are encoded, but there are no new rows.
+ // if (s->page.num_input_values > 0 && s->page.num_rows > 0) {
if (s->page.num_input_values > 0) {
- uint8_t const* cur = s->page.page_data;
- uint8_t const* const end = cur + s->page.uncompressed_page_size;
+ uint8_t* cur = s->page.page_data;
+ uint8_t* end = cur + s->page.uncompressed_page_size;
- uint32_t const dtype_len_out = s->col.data_type >> 3;
- s->ts_scale = 0;
+ uint32_t dtype_len_out = s->col.data_type >> 3;
+ s->ts_scale = 0;
// Validate data type
auto const data_type = s->col.data_type & 7;
switch (data_type) {
@@ -907,10 +914,17 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dtype_len = 8; // Convert to 64-bit timestamp
}
+ // first row within the page to output
+ if (page_start_row >= min_row) {
+ s->first_row = 0;
+ } else {
+ s->first_row = (int32_t)min(min_row - page_start_row, (size_t)s->page.num_rows);
+ }
// # of rows within the page to output
s->num_rows = s->page.num_rows;
- if (page_start_row + s->num_rows > num_rows) {
- s->num_rows = (int32_t)max((int64_t)(num_rows - page_start_row), INT64_C(0));
+ if ((page_start_row + s->first_row) + s->num_rows > min_row + num_rows) {
+ s->num_rows =
+ (int32_t)max((int64_t)(min_row + num_rows - (page_start_row + s->first_row)), INT64_C(0));
}
// during the decoding step we need to offset the global output buffers
@@ -919,18 +933,25 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
// - for flat schemas, we can do this directly by using row counts
// - for nested schemas, these offsets are computed during the preprocess step
if (s->col.column_data_base != nullptr) {
- int const max_depth = s->col.max_nesting_depth;
+ int max_depth = s->col.max_nesting_depth;
for (int idx = 0; idx < max_depth; idx++) {
PageNestingInfo* pni = &s->page.nesting[idx];
- size_t const output_offset =
- s->col.max_level[level_type::REPETITION] == 0 ? page_start_row : pni->page_start_value;
+ size_t output_offset;
+ // schemas without lists
+ if (s->col.max_level[level_type::REPETITION] == 0) {
+ output_offset = page_start_row >= min_row ? page_start_row - min_row : 0;
+ }
+ // for schemas with lists, we've already got the exactly value precomputed
+ else {
+ output_offset = pni->page_start_value;
+ }
pni->data_out = static_cast<uint8_t*>(s->col.column_data_base[idx]);
if (pni->data_out != nullptr) {
// anything below max depth with a valid data pointer must be a list, so the
// element size is the size of the offset type.
- uint32_t const len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
+ uint32_t len = idx < max_depth - 1 ? sizeof(cudf::size_type) : s->dtype_len;
pni->data_out += (output_offset * len);
}
pni->valid_map = s->col.valid_map_base[idx];
@@ -940,6 +961,7 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
}
}
}
+ s->first_output_value = 0;
// Find the compressed size of repetition levels
cur += InitLevelSection(s, cur, end, level_type::REPETITION);
@@ -992,9 +1014,53 @@ static __device__ bool setupLocalPageInfo(page_state_s* const s,
s->dict_pos = 0;
s->src_pos = 0;
- s->input_value_count = 0;
- s->input_row_count = 0;
- s->input_leaf_count = 0;
+ // for flat hierarchies, we can't know how many leaf values to skip unless we do a full
+ // preprocess of the definition levels (since nulls will have no actual decodable value, there
+ // is no direct correlation between # of rows and # of decodable values). so we will start
+ // processing at the beginning of the value stream and disregard any indices that start
+ // before the first row.
+ if (s->col.max_level[level_type::REPETITION] == 0) {
+ s->page.skipped_values = 0;
+ s->page.skipped_leaf_values = 0;
+ s->input_value_count = 0;
+ s->input_row_count = 0;
+
+ s->row_index_lower_bound = -1;
+ }
+ // for nested hierarchies, we have run a preprocess that lets us skip directly to the values
+ // we need to start decoding at
+ else {
+ // input_row_count translates to "how many rows we have processed so far", so since we are
+ // skipping directly to where we want to start decoding, set it to first_row
+ s->input_row_count = s->first_row;
+
+ // return the lower bound to compare (page-relative) thread row index against. Explanation:
+ // In the case of nested schemas, rows can span page boundaries. That is to say,
+ // we can encounter the first value for row X on page M, but the last value for page M
+ // might not be the last value for row X. page M+1 (or further) may contain the last value.
+ //
+ // This means that the first values we encounter for a given page (M+1) may not belong to the
+ // row indicated by chunk_row, but to the row before it that spanned page boundaries. If that
+ // previous row is within the overall row bounds, include the values by allowing relative row
+ // index -1
+ int const max_row = (min_row + num_rows) - 1;
+ if (min_row < page_start_row && max_row >= page_start_row - 1) {
+ s->row_index_lower_bound = -1;
+ } else {
+ s->row_index_lower_bound = s->first_row;
+ }
+
+ // if we're in the decoding step, jump directly to the first
+ // value we care about
+ if (s->col.column_data_base != nullptr) {
+ s->input_value_count = s->page.skipped_values > -1 ? s->page.skipped_values : 0;
+ } else {
+ s->input_value_count = 0;
+ s->input_leaf_count = 0;
+ s->page.skipped_values = -1;
+ s->page.skipped_leaf_values = -1;
+ }
+ }
__threadfence_block();
}
@@ -1140,7 +1206,10 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1);
input_row_count += __popc(warp_row_count_mask);
// is this thread within read row bounds?
- int const in_row_bounds = thread_row_index < s->num_rows;
+ int const in_row_bounds = thread_row_index >= s->row_index_lower_bound &&
+ thread_row_index < (s->first_row + s->num_rows)
+ ? 1
+ : 0;
// compute warp and thread value counts
uint32_t const warp_count_mask =
@@ -1215,7 +1284,9 @@ static __device__ void gpuUpdateValidityOffsetsAndRowIndices(int32_t target_inpu
// the correct position to start reading. since we are about to write the validity vector here
// we need to adjust our computed mask to take into account the write row bounds.
int const in_write_row_bounds =
- max_depth == 1 ? thread_row_index < s->num_rows : in_row_bounds;
+ max_depth == 1
+ ? thread_row_index >= s->first_row && thread_row_index < (s->first_row + s->num_rows)
+ : in_row_bounds;
int const first_thread_in_write_range =
max_depth == 1 ? __ffs(ballot(in_write_row_bounds)) - 1 : 0;
// # of bits to of the validity mask to write out
@@ -1303,11 +1374,16 @@ __device__ void gpuDecodeLevels(page_state_s* s, int32_t target_leaf_count, int
* @param[in] s The local page info
* @param[in] target_input_value_count The # of repetition/definition levels to process up to
* @param[in] t Thread index
+ * @param[in] bounds_set Whether or not s->row_index_lower_bound, s->first_row and s->num_rows
+ * have been computed for this page (they will only be set in the second/trim pass).
*/
-static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_value_count, int t)
+static __device__ void gpuUpdatePageSizes(page_state_s* s,
+ int32_t target_input_value_count,
+ int t,
+ bool bounds_set)
{
// max nesting depth of the column
- int const max_depth = s->col.max_nesting_depth;
+ int max_depth = s->col.max_nesting_depth;
// bool has_repetition = s->col.max_level[level_type::REPETITION] > 0 ? true : false;
// how many input level values we've processed in the page so far
int input_value_count = s->input_value_count;
@@ -1322,23 +1398,44 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_
start_depth, end_depth, d, s, input_value_count, target_input_value_count, t);
// count rows and leaf values
- int const is_new_row = start_depth == 0 ? 1 : 0;
- uint32_t const warp_row_count_mask = ballot(is_new_row);
- int const is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0;
- uint32_t const warp_leaf_count_mask = ballot(is_new_leaf);
-
- // is this thread within row bounds?
- int32_t const thread_row_index =
- input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1);
- int const in_row_bounds = thread_row_index < s->num_rows;
+ int is_new_row = start_depth == 0 ? 1 : 0;
+ uint32_t warp_row_count_mask = ballot(is_new_row);
+ int is_new_leaf = (d >= s->page.nesting[max_depth - 1].max_def_level) ? 1 : 0;
+ uint32_t warp_leaf_count_mask = ballot(is_new_leaf);
+
+ // is this thread within row bounds? on the first pass we don't know the bounds, so we will be
+ // computing the full size of the column. on the second pass, we will know our actual row
+ // bounds, so the computation will cap sizes properly.
+ int in_row_bounds = 1;
+ if (bounds_set) {
+ // absolute row index
+ int32_t thread_row_index =
+ input_row_count + ((__popc(warp_row_count_mask & ((1 << t) - 1)) + is_new_row) - 1);
+ in_row_bounds = thread_row_index >= s->row_index_lower_bound &&
+ thread_row_index < (s->first_row + s->num_rows)
+ ? 1
+ : 0;
+
+ uint32_t row_bounds_mask = ballot(in_row_bounds);
+ int first_thread_in_range = __ffs(row_bounds_mask) - 1;
+
+ // if we've found the beginning of the first row, mark down the position
+ // in the def/repetition buffer (skipped_values) and the data buffer (skipped_leaf_values)
+ if (!t && first_thread_in_range >= 0 && s->page.skipped_values < 0) {
+ // how many values we've skipped in the rep/def levels
+ s->page.skipped_values = input_value_count + first_thread_in_range;
+ // how many values we've skipped in the actual data stream
+ s->page.skipped_leaf_values =
+ input_leaf_count + __popc(warp_leaf_count_mask & ((1 << first_thread_in_range) - 1));
+ }
+ }
// increment counts across all nesting depths
for (int s_idx = 0; s_idx < max_depth; s_idx++) {
// if we are within the range of nesting levels we should be adding value indices for
- int const in_nesting_bounds =
- (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0;
+ int in_nesting_bounds = (s_idx >= start_depth && s_idx <= end_depth && in_row_bounds) ? 1 : 0;
- uint32_t const count_mask = ballot(in_nesting_bounds);
+ uint32_t count_mask = ballot(in_nesting_bounds);
if (!t) { s->page.nesting[s_idx].size += __popc(count_mask); }
}
@@ -1362,18 +1459,29 @@ static __device__ void gpuUpdatePageSizes(page_state_s* s, int32_t target_input_
*
* @param pages List of pages
* @param chunks List of column chunks
+ * @param min_row Row index to start reading at
+ * @param num_rows Maximum number of rows to read. Pass as INT_MAX to guarantee reading all rows.
+ * @param trim_pass Whether or not this is the trim pass. We first have to compute
+ * the full size information of every page before we come through in a second (trim) pass
+ * to determine what subset of rows in this page we should be reading.
*/
__global__ void __launch_bounds__(block_size)
- gpuComputePageSizes(PageInfo* pages, device_span<ColumnChunkDesc const> chunks)
+ gpuComputePageSizes(PageInfo* pages,
+ device_span<ColumnChunkDesc const> chunks,
+ size_t min_row,
+ size_t num_rows,
+ bool trim_pass)
{
__shared__ __align__(16) page_state_s state_g;
page_state_s* const s = &state_g;
- int const page_idx = blockIdx.x;
- int const t = threadIdx.x;
- PageInfo* const pp = &pages[page_idx];
+ int page_idx = blockIdx.x;
+ int t = threadIdx.x;
+ PageInfo* pp = &pages[page_idx];
- if (!setupLocalPageInfo(s, pp, chunks, INT_MAX)) { return; }
+ if (!setupLocalPageInfo(s, pp, chunks, trim_pass ? min_row : 0, trim_pass ? num_rows : INT_MAX)) {
+ return;
+ }
// zero sizes
int d = 0;
@@ -1382,12 +1490,21 @@ __global__ void __launch_bounds__(block_size)
d += blockDim.x;
}
if (!t) {
- s->input_row_count = 0;
- s->input_value_count = 0;
+ s->page.skipped_values = -1;
+ s->page.skipped_leaf_values = -1;
+ s->input_row_count = 0;
+ s->input_value_count = 0;
+
+ // if this isn't the trim pass, make sure we visit absolutely everything
+ if (!trim_pass) {
+ s->first_row = 0;
+ s->num_rows = INT_MAX;
+ s->row_index_lower_bound = -1;
+ }
}
__syncthreads();
- bool const has_repetition = s->col.max_level[level_type::REPETITION] > 0;
+ bool has_repetition = s->col.max_level[level_type::REPETITION] > 0;
// optimization : it might be useful to have a version of gpuDecodeStream that could go wider than
// 1 warp. Currently it only uses 1 warp so that it can overlap work with the value decoding step
@@ -1406,18 +1523,22 @@ __global__ void __launch_bounds__(block_size)
__syncwarp();
// we may have decoded different amounts from each stream, so only process what we've been
- int const actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION],
- s->lvl_count[level_type::DEFINITION])
- : s->lvl_count[level_type::DEFINITION];
+ int actual_input_count = has_repetition ? min(s->lvl_count[level_type::REPETITION],
+ s->lvl_count[level_type::DEFINITION])
+ : s->lvl_count[level_type::DEFINITION];
// process what we got back
- gpuUpdatePageSizes(s, actual_input_count, t);
+ gpuUpdatePageSizes(s, actual_input_count, t, trim_pass);
target_input_count = actual_input_count + batch_size;
__syncwarp();
}
}
// update # rows in the actual page
- if (!t) { pp->num_rows = s->page.nesting[0].size; }
+ if (!t) {
+ pp->num_rows = s->page.nesting[0].size;
+ pp->skipped_values = s->page.skipped_values;
+ pp->skipped_leaf_values = s->page.skipped_leaf_values;
+ }
}
/**
@@ -1430,19 +1551,20 @@ __global__ void __launch_bounds__(block_size)
*
* @param pages List of pages
* @param chunks List of column chunks
+ * @param min_row Row index to start reading at
* @param num_rows Maximum number of rows to read
*/
-__global__ void __launch_bounds__(block_size)
- gpuDecodePageData(PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t num_rows)
+__global__ void __launch_bounds__(block_size) gpuDecodePageData(
+ PageInfo* pages, device_span<ColumnChunkDesc const> chunks, size_t min_row, size_t num_rows)
{
__shared__ __align__(16) page_state_s state_g;
page_state_s* const s = &state_g;
- int const page_idx = blockIdx.x;
- int const t = threadIdx.x;
+ int page_idx = blockIdx.x;
+ int t = threadIdx.x;
int out_thread0;
- if (!setupLocalPageInfo(s, &pages[page_idx], chunks, num_rows)) { return; }
+ if (!setupLocalPageInfo(s, &pages[page_idx], chunks, min_row, num_rows)) { return; }
if (s->dict_base) {
out_thread0 = (s->dict_bits > 0) ? 64 : 32;
@@ -1451,6 +1573,8 @@ __global__ void __launch_bounds__(block_size)
((s->col.data_type & 7) == BOOLEAN || (s->col.data_type & 7) == BYTE_ARRAY) ? 64 : 32;
}
+ // skipped_leaf_values will always be 0 for flat hierarchies.
+ uint32_t skipped_leaf_values = s->page.skipped_leaf_values;
while (!s->error && (s->input_value_count < s->num_input_values || s->src_pos < s->nz_count)) {
int target_pos;
int src_pos = s->src_pos;
@@ -1470,7 +1594,8 @@ __global__ void __launch_bounds__(block_size)
// - produces non-NULL value indices in s->nz_idx for subsequent decoding
gpuDecodeLevels(s, target_pos, t);
} else if (t < out_thread0) {
- uint32_t src_target_pos = target_pos;
+ // skipped_leaf_values will always be 0 for flat hierarchies.
+ uint32_t src_target_pos = target_pos + skipped_leaf_values;
// WARP1: Decode dictionary indices, booleans or string positions
if (s->dict_base) {
@@ -1483,51 +1608,70 @@ __global__ void __launch_bounds__(block_size)
if (t == 32) { *(volatile int32_t*)&s->dict_pos = src_target_pos; }
} else {
// WARP1..WARP3: Decode values
- int const dtype = s->col.data_type & 7;
+ int dtype = s->col.data_type & 7;
src_pos += t - out_thread0;
// the position in the output column/buffer
- int const dst_pos = s->nz_idx[rolling_index(src_pos)];
+ int dst_pos = s->nz_idx[rolling_index(src_pos)];
+
+ // for the flat hierarchy case we will be reading from the beginning of the value stream,
+ // regardless of the value of first_row. so adjust our destination offset accordingly.
+ // example:
+ // - user has passed skip_rows = 2, so our first_row to output is 2
+ // - the row values we get from nz_idx will be
+ // 0, 1, 2, 3, 4 ....
+ // - by shifting these values by first_row, the sequence becomes
+ // -1, -2, 0, 1, 2 ...
+ // - so we will end up ignoring the first two input rows, and input rows 2..n will
+ // get written to the output starting at position 0.
+ //
+ if (s->col.max_nesting_depth == 1) { dst_pos -= s->first_row; }
// target_pos will always be properly bounded by num_rows, but dst_pos may be negative (values
// before first_row) in the flat hierarchy case.
if (src_pos < target_pos && dst_pos >= 0) {
+ // src_pos represents the logical row position we want to read from. But in the case of
+ // nested hierarchies, there is no 1:1 mapping of rows to values. So our true read position
+ // has to take into account the # of values we have to skip in the page to get to the
+ // desired logical row. For flat hierarchies, skipped_leaf_values will always be 0.
+ uint32_t val_src_pos = src_pos + skipped_leaf_values;
+
// nesting level that is storing actual leaf values
- int const leaf_level_index = s->col.max_nesting_depth - 1;
+ int leaf_level_index = s->col.max_nesting_depth - 1;
- uint32_t const dtype_len = s->dtype_len;
+ uint32_t dtype_len = s->dtype_len;
void* dst =
s->page.nesting[leaf_level_index].data_out + static_cast<size_t>(dst_pos) * dtype_len;
if (dtype == BYTE_ARRAY) {
- gpuOutputString(s, src_pos, dst);
+ gpuOutputString(s, val_src_pos, dst);
} else if (dtype == BOOLEAN) {
- gpuOutputBoolean(s, src_pos, static_cast<uint8_t*>(dst));
+ gpuOutputBoolean(s, val_src_pos, static_cast<uint8_t*>(dst));
} else if (s->col.converted_type == DECIMAL) {
switch (dtype) {
- case INT32: gpuOutputFast(s, src_pos, static_cast<uint32_t*>(dst)); break;
- case INT64: gpuOutputFast(s, src_pos, static_cast<uint2*>(dst)); break;
+ case INT32: gpuOutputFast(s, val_src_pos, static_cast<uint32_t*>(dst)); break;
+ case INT64: gpuOutputFast(s, val_src_pos, static_cast<uint2*>(dst)); break;
default:
if (s->dtype_len_in <= sizeof(int32_t)) {
- gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<int32_t*>(dst));
+ gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int32_t*>(dst));
} else if (s->dtype_len_in <= sizeof(int64_t)) {
- gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<int64_t*>(dst));
+ gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<int64_t*>(dst));
} else {
- gpuOutputFixedLenByteArrayAsInt(s, src_pos, static_cast<__int128_t*>(dst));
+ gpuOutputFixedLenByteArrayAsInt(s, val_src_pos, static_cast<__int128_t*>(dst));
}
break;
}
} else if (dtype == INT96) {
- gpuOutputInt96Timestamp(s, src_pos, static_cast<int64_t*>(dst));
+ gpuOutputInt96Timestamp(s, val_src_pos, static_cast<int64_t*>(dst));
} else if (dtype_len == 8) {
if (s->ts_scale) {
- gpuOutputInt64Timestamp(s, src_pos, static_cast<int64_t*>(dst));
+ gpuOutputInt64Timestamp(s, val_src_pos, static_cast<int64_t*>(dst));
} else {
- gpuOutputFast(s, src_pos, static_cast<uint2*>(dst));
+ gpuOutputFast(s, val_src_pos, static_cast<uint2*>(dst));
}
} else if (dtype_len == 4) {
- gpuOutputFast(s, src_pos, static_cast<uint32_t*>(dst));
+ gpuOutputFast(s, val_src_pos, static_cast<uint32_t*>(dst));
} else {
- gpuOutputGeneric(s, src_pos, static_cast<uint8_t*>(dst), dtype_len);
+ gpuOutputGeneric(s, val_src_pos, static_cast<uint8_t*>(dst), dtype_len);
}
}
@@ -1598,6 +1742,8 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
std::vector<input_column_info>& input_columns,
std::vector<cudf::io::detail::column_buffer>& output_columns,
size_t num_rows,
+ size_t min_row,
+ bool uses_custom_row_bounds,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr)
{
@@ -1606,7 +1752,16 @@ void PreprocessColumnData(hostdevice_vector& pages,
// computes:
// PageNestingInfo::size for each level of nesting, for each page.
- gpuComputePageSizes<<<dim_grid, dim_block, 0, stream.value()>>>(pages.device_ptr(), chunks);
+ // This computes the size for the entire page, not taking row bounds into account.
+ // If uses_custom_row_bounds is set to true, we have to do a second pass later that "trims"
+ // the starting and ending read values to account for these bounds.
+ gpuComputePageSizes<<<dim_grid, dim_block, 0, stream.value()>>>(
+ pages.device_ptr(),
+ chunks,
+ // if uses_custom_row_bounds is false, include all possible rows.
+ uses_custom_row_bounds ? min_row : 0,
+ uses_custom_row_bounds ? num_rows : INT_MAX,
+ !uses_custom_row_bounds);
// computes:
// PageInfo::chunk_row for all pages
@@ -1620,6 +1775,16 @@ void PreprocessColumnData(hostdevice_vector& pages,
page_input,
chunk_row_output_iter{pages.device_ptr()});
+ // computes:
+ // PageNestingInfo::size for each level of nesting, for each page, taking row bounds into account.
+ // PageInfo::skipped_values, which tells us where to start decoding in the input .
+ // It is only necessary to do this second pass if uses_custom_row_bounds is set (if the user has
+ // specified artifical bounds).
+ if (uses_custom_row_bounds) {
+ gpuComputePageSizes<<<dim_grid, dim_block, 0, stream.value()>>>(
+ pages.device_ptr(), chunks, min_row, num_rows, true);
+ }
+
// ordering of pages is by input column schema, repeated across row groups. so
// if we had 3 columns, each with 2 pages, and 1 row group, our schema values might look like
//
@@ -1684,11 +1849,13 @@ void PreprocessColumnData(hostdevice_vector& pages,
// Handle a specific corner case. It is possible to construct a parquet file such that
// a column within a row group contains more rows than the row group itself. This may be
// invalid, but we have seen instances of this in the wild, including how they were created
- // using the apache parquet tools. So we need to cap the number of rows we will
- // allocate/read from the file with the amount specified in the associated row group. This
- // only applies to columns that are not children of lists as those may have an arbitrary
- // number of rows in them.
- if (!(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) &&
+ // using the apache parquet tools. Normally, the trim pass would handle this case quietly,
+ // but if we are not running the trim pass (which is most of the time) we need to cap the
+ // number of rows we will allocate/read from the file with the amount specified in the
+ // associated row group. This only applies to columns that are not children of lists as
+ // those may have an arbitrary number of rows in them.
+ if (!uses_custom_row_bounds &&
+ !(out_buf.user_data & PARQUET_COLUMN_BUFFER_FLAG_HAS_LIST_PARENT) &&
size > static_cast<size_type>(num_rows)) {
size = static_cast<size_type>(num_rows);
}
@@ -1723,13 +1890,14 @@ void PreprocessColumnData(hostdevice_vector& pages,
void __host__ DecodePageData(hostdevice_vector<PageInfo>& pages,
hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
+ size_t min_row,
rmm::cuda_stream_view stream)
{
dim3 dim_block(block_size, 1);
dim3 dim_grid(pages.size(), 1); // 1 threadblock per page
gpuDecodePageData<<<dim_grid, dim_block, 0, stream.value()>>>(
- pages.device_ptr(), chunks, num_rows);
+ pages.device_ptr(), chunks, min_row, num_rows);
}
} // namespace gpu
diff --git a/cpp/src/io/parquet/parquet_gpu.hpp b/cpp/src/io/parquet/parquet_gpu.hpp
index 823cb8fcc2b..610275ee26b 100644
--- a/cpp/src/io/parquet/parquet_gpu.hpp
+++ b/cpp/src/io/parquet/parquet_gpu.hpp
@@ -135,6 +135,19 @@ struct PageInfo {
Encoding definition_level_encoding; // Encoding used for definition levels (data page)
Encoding repetition_level_encoding; // Encoding used for repetition levels (data page)
+ // for nested types, we run a preprocess step in order to determine output
+ // column sizes. Because of this, we can jump directly to the position in the
+ // input data to start decoding instead of reading all of the data and discarding
+ // rows we don't care about.
+ //
+ // NOTE: for flat hierarchies we do not do the preprocess step, so skipped_values and
+ // skipped_leaf_values will always be 0.
+ //
+ // # of values skipped in the repetition/definition level stream
+ int skipped_values;
+ // # of values skipped in the actual data stream.
+ int skipped_leaf_values;
+
// nesting information (input/output) for each page
int num_nesting_levels;
PageNestingInfo* nesting;
@@ -416,6 +429,9 @@ void BuildStringDictionaryIndex(ColumnChunkDesc* chunks,
* @param input_columns Input column information
* @param output_columns Output column information
* @param num_rows Maximum number of rows to read
+ * @param min_rows crop all rows below min_row
+ * @param uses_custom_row_bounds Whether or not num_rows and min_rows represents user-specific
+ * bounds
* @param stream Cuda stream
*/
void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
@@ -423,6 +439,8 @@ void PreprocessColumnData(hostdevice_vector<PageInfo>& pages,
std::vector<input_column_info>& input_columns,
std::vector<cudf::io::detail::column_buffer>& output_columns,
size_t num_rows,
+ size_t min_row,
+ bool uses_custom_row_bounds,
rmm::cuda_stream_view stream,
rmm::mr::device_memory_resource* mr);
@@ -435,11 +453,13 @@ void PreprocessColumnData(hostdevice_vector& pages,
* @param[in,out] pages All pages to be decoded
* @param[in] chunks All chunks to be decoded
* @param[in] num_rows Total number of rows to read
+ * @param[in] min_row Minimum number of rows to read
* @param[in] stream CUDA stream to use, default 0
*/
void DecodePageData(hostdevice_vector<PageInfo>& pages,
hostdevice_vector<ColumnChunkDesc> const& chunks,
size_t num_rows,
+ size_t min_row,
rmm::cuda_stream_view stream);
/**
diff --git a/cpp/src/io/parquet/reader_impl.cu b/cpp/src/io/parquet/reader_impl.cu
index 6645c31536b..2553b375e72 100644
--- a/cpp/src/io/parquet/reader_impl.cu
+++ b/cpp/src/io/parquet/reader_impl.cu
@@ -540,14 +540,15 @@ class aggregate_reader_metadata {
* @brief Filters and reduces down to a selection of row groups
*
* @param row_groups Lists of row groups to read, one per source
+ * @param row_start Starting row of the selection
+ * @param row_count Total number of rows selected
*
- * @return List of row group info structs and the total number of rows
+ * @return List of row group indexes and their starting rows
*/
- [[nodiscard]] std::pair, size_type> select_row_groups(
- std::vector> const& row_groups) const
+ [[nodiscard]] auto select_row_groups(std::vector> const& row_groups,
+ size_type& row_start,
+ size_type& row_count) const
{
- size_type row_count = 0;
-
if (!row_groups.empty()) {
std::vector selection;
CUDF_EXPECTS(row_groups.size() == per_file_metadata.size(),
@@ -564,12 +565,17 @@ class aggregate_reader_metadata {
row_count += get_row_group(rowgroup_idx, src_idx).num_rows;
}
}
- return {selection, row_count};
+ return selection;
}
- row_count = static_cast(
- std::min(get_num_rows(), std::numeric_limits::max()));
+ row_start = std::max(row_start, 0);
+ if (row_count < 0) {
+ row_count = static_cast(
+ std::min(get_num_rows(), std::numeric_limits::max()));
+ }
+ row_count = min(row_count, get_num_rows() - row_start);
CUDF_EXPECTS(row_count >= 0, "Invalid row count");
+ CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start");
std::vector selection;
size_type count = 0;
@@ -577,12 +583,14 @@ class aggregate_reader_metadata {
for (size_t rg_idx = 0; rg_idx < per_file_metadata[src_idx].row_groups.size(); ++rg_idx) {
auto const chunk_start_row = count;
count += get_row_group(rg_idx, src_idx).num_rows;
- selection.emplace_back(rg_idx, chunk_start_row, src_idx);
- if (count >= row_count) { break; }
+ if (count > row_start || count == 0) {
+ selection.emplace_back(rg_idx, chunk_start_row, src_idx);
+ }
+ if (count >= row_start + row_count) { break; }
}
}
- return {selection, row_count};
+ return selection;
}
/**
@@ -1342,7 +1350,9 @@ void reader::impl::allocate_nesting_info(hostdevice_vector
*/
void reader::impl::preprocess_columns(hostdevice_vector& chunks,
hostdevice_vector& pages,
- size_t num_rows,
+ size_t min_row,
+ size_t total_rows,
+ bool uses_custom_row_bounds,
bool has_lists)
{
// TODO : we should be selectively preprocessing only columns that have
@@ -1355,15 +1365,22 @@ void reader::impl::preprocess_columns(hostdevice_vector& c
[&](std::vector& cols) {
for (size_t idx = 0; idx < cols.size(); idx++) {
auto& col = cols[idx];
- col.create(num_rows, _stream, _mr);
+ col.create(total_rows, _stream, _mr);
create_columns(col.children);
}
};
create_columns(_output_columns);
} else {
// preprocess per-nesting level sizes by page
- gpu::PreprocessColumnData(
- pages, chunks, _input_columns, _output_columns, num_rows, _stream, _mr);
+ gpu::PreprocessColumnData(pages,
+ chunks,
+ _input_columns,
+ _output_columns,
+ total_rows,
+ min_row,
+ uses_custom_row_bounds,
+ _stream,
+ _mr);
_stream.synchronize();
}
}
@@ -1374,6 +1391,7 @@ void reader::impl::preprocess_columns(hostdevice_vector& c
void reader::impl::decode_page_data(hostdevice_vector& chunks,
hostdevice_vector& pages,
hostdevice_vector& page_nesting,
+ size_t min_row,
size_t total_rows)
{
auto is_dict_chunk = [](const gpu::ColumnChunkDesc& chunk) {
@@ -1495,7 +1513,7 @@ void reader::impl::decode_page_data(hostdevice_vector& chu
gpu::BuildStringDictionaryIndex(chunks.device_ptr(), chunks.size(), _stream);
}
- gpu::DecodePageData(pages, chunks, total_rows, _stream);
+ gpu::DecodePageData(pages, chunks, total_rows, min_row, _stream);
pages.device_to_host(_stream);
page_nesting.device_to_host(_stream);
_stream.synchronize();
@@ -1587,10 +1605,14 @@ reader::impl::impl(std::vector>&& sources,
_timestamp_type.id());
}
-table_with_metadata reader::impl::read(std::vector> const& row_group_list)
+table_with_metadata reader::impl::read(size_type skip_rows,
+ size_type num_rows,
+ bool uses_custom_row_bounds,
+ std::vector> const& row_group_list)
{
// Select only row groups required
- const auto [selected_row_groups, num_rows] = _metadata->select_row_groups(row_group_list);
+ const auto selected_row_groups =
+ _metadata->select_row_groups(row_group_list, skip_rows, num_rows);
table_metadata out_metadata;
@@ -1732,10 +1754,10 @@ table_with_metadata reader::impl::read(std::vector> const
//
// - for nested schemas, output buffer offset values per-page, per nesting-level for the
// purposes of decoding.
- preprocess_columns(chunks, pages, num_rows, has_lists);
+ preprocess_columns(chunks, pages, skip_rows, num_rows, uses_custom_row_bounds, has_lists);
// decoding of column data itself
- decode_page_data(chunks, pages, page_nesting_info, num_rows);
+ decode_page_data(chunks, pages, page_nesting_info, skip_rows, num_rows);
// create the final output cudf columns
for (size_t i = 0; i < _output_columns.size(); ++i) {
@@ -1786,7 +1808,12 @@ reader::~reader() = default;
// Forward to implementation
table_with_metadata reader::read(parquet_reader_options const& options)
{
- return _impl->read(options.get_row_groups());
+ // if the user has specified custom row bounds
+ bool const uses_custom_row_bounds = options.get_num_rows() >= 0 || options.get_skip_rows() != 0;
+ return _impl->read(options.get_skip_rows(),
+ options.get_num_rows(),
+ uses_custom_row_bounds,
+ options.get_row_groups());
}
} // namespace parquet
diff --git a/cpp/src/io/parquet/reader_impl.hpp b/cpp/src/io/parquet/reader_impl.hpp
index b46fe042a13..e1f275bb8e8 100644
--- a/cpp/src/io/parquet/reader_impl.hpp
+++ b/cpp/src/io/parquet/reader_impl.hpp
@@ -69,11 +69,18 @@ class reader::impl {
/**
* @brief Read an entire set or a subset of data and returns a set of columns
*
+ * @param skip_rows Number of rows to skip from the start
+ * @param num_rows Number of rows to read
+ * @param uses_custom_row_bounds Whether or not num_rows and skip_rows represent user-specified
+ * bounds
* @param row_group_indices Lists of row groups to read, one per source
*
* @return The set of columns along with metadata
*/
- table_with_metadata read(std::vector> const& row_group_indices);
+ table_with_metadata read(size_type skip_rows,
+ size_type num_rows,
+ bool uses_custom_row_bounds,
+ std::vector> const& row_group_indices);
private:
/**
@@ -152,13 +159,18 @@ class reader::impl {
*
* @param chunks All chunks to be decoded
* @param pages All pages to be decoded
- * @param num_rows The number of rows to be decoded
+ * @param min_row Crop all rows below this row index
+ * @param total_rows Maximum number of rows to read
+ * @param uses_custom_row_bounds Whether or not total_rows and min_row represent user-specified
+ * bounds
* @param has_lists Whether or not this data contains lists and requires
* a preprocess.
*/
void preprocess_columns(hostdevice_vector& chunks,
hostdevice_vector& pages,
- size_t num_rows,
+ size_t min_row,
+ size_t total_rows,
+ bool uses_custom_row_bounds,
bool has_lists);
/**
@@ -167,11 +179,13 @@ class reader::impl {
* @param chunks List of column chunk descriptors
* @param pages List of page information
* @param page_nesting Page nesting array
+ * @param min_row Number of rows to skip from the start
* @param total_rows Number of rows to output
*/
void decode_page_data(hostdevice_vector& chunks,
hostdevice_vector& pages,
hostdevice_vector& page_nesting,
+ size_t min_row,
size_t total_rows);
private:
diff --git a/cpp/tests/io/parquet_test.cpp b/cpp/tests/io/parquet_test.cpp
index b65488e9e43..c5000bc0add 100644
--- a/cpp/tests/io/parquet_test.cpp
+++ b/cpp/tests/io/parquet_test.cpp
@@ -2483,6 +2483,213 @@ TEST_F(ParquetWriterStressTest, DeviceWriteLargeTableWithValids)
CUDF_TEST_EXPECT_TABLES_EQUAL(custom_tbl.tbl->view(), expected->view());
}
+TEST_F(ParquetReaderTest, UserBounds)
+{
+ // trying to read more rows than there are should result in
+ // receiving the properly capped # of rows
+ {
+ srand(31337);
+ auto expected = create_random_fixed_table(4, 4, false);
+
+ auto filepath = temp_env->get_temp_filepath("TooManyRows.parquet");
+ cudf_io::parquet_writer_options args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected);
+ cudf_io::write_parquet(args);
+
+ // attempt to read more rows than there actually are
+ cudf_io::parquet_reader_options read_opts =
+ cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(16);
+ auto result = cudf_io::read_parquet(read_opts);
+
+ // we should only get back 4 rows
+ EXPECT_EQ(result.tbl->view().column(0).size(), 4);
+ }
+
+ // trying to read past the end of the # of actual rows should result
+ // in empty columns.
+ {
+ srand(31337);
+ auto expected = create_random_fixed_table(4, 4, false);
+
+ auto filepath = temp_env->get_temp_filepath("PastBounds.parquet");
+ cudf_io::parquet_writer_options args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected);
+ cudf_io::write_parquet(args);
+
+ // attempt to skip past the end of the actual rows
+ cudf_io::parquet_reader_options read_opts =
+ cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).skip_rows(4);
+ auto result = cudf_io::read_parquet(read_opts);
+
+ // we should get empty columns back
+ EXPECT_EQ(result.tbl->view().num_columns(), 4);
+ EXPECT_EQ(result.tbl->view().column(0).size(), 0);
+ }
+
+ // trying to read 0 rows should result in reading the whole file
+ // at the moment we get back 0 rows. when that bug gets fixed, this
+ // test can be flipped.
+ {
+ srand(31337);
+ auto expected = create_random_fixed_table(4, 4, false);
+
+ auto filepath = temp_env->get_temp_filepath("ZeroRows.parquet");
+ cudf_io::parquet_writer_options args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected);
+ cudf_io::write_parquet(args);
+
+ // attempt to read 0 rows
+ cudf_io::parquet_reader_options read_opts =
+ cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath}).num_rows(0);
+ auto result = cudf_io::read_parquet(read_opts);
+
+ EXPECT_EQ(result.tbl->view().num_columns(), 4);
+ EXPECT_EQ(result.tbl->view().column(0).size(), 0);
+ }
+
+ // trying to read 0 rows past the end of the # of actual rows should result
+ // in empty columns.
+ {
+ srand(31337);
+ auto expected = create_random_fixed_table(4, 4, false);
+
+ auto filepath = temp_env->get_temp_filepath("ZeroRowsPastBounds.parquet");
+ cudf_io::parquet_writer_options args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, *expected);
+ cudf_io::write_parquet(args);
+
+ // attempt to read 0 rows starting past the end of the actual rows
+ cudf_io::parquet_reader_options read_opts =
+ cudf_io::parquet_reader_options::builder(cudf_io::source_info{filepath})
+ .skip_rows(4)
+ .num_rows(0);
+ auto result = cudf_io::read_parquet(read_opts);
+
+ // we should get empty columns back
+ EXPECT_EQ(result.tbl->view().num_columns(), 4);
+ EXPECT_EQ(result.tbl->view().column(0).size(), 0);
+ }
+}
+
+TEST_F(ParquetReaderTest, UserBoundsWithNulls)
+{
+ // clang-format off
+ cudf::test::fixed_width_column_wrapper col{{1,1,1,1,1,1,1,1, 2,2,2,2,2,2,2,2, 3,3,3,3,3,3,3,3, 4,4,4,4,4,4,4,4, 5,5,5,5,5,5,5,5, 6,6,6,6,6,6,6,6, 7,7,7,7,7,7,7,7, 8,8,8,8,8,8,8,8}
+ ,{1,1,1,0,0,0,1,1, 1,1,1,1,1,1,1,1, 0,0,0,0,0,0,0,0, 1,1,1,1,1,1,0,0, 1,0,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,1, 1,1,1,1,1,1,1,0}};
+ // clang-format on
+ cudf::table_view tbl({col});
+ auto filepath = temp_env->get_temp_filepath("UserBoundsWithNulls.parquet");
+ cudf_io::parquet_writer_options out_args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl);
+ cudf_io::write_parquet(out_args);
+
+ // skip_rows / num_rows
+ // clang-format off
+ std::vector> params{ {-1, -1}, {1, 3}, {3, -1},
+ {31, -1}, {32, -1}, {33, -1},
+ {31, 5}, {32, 5}, {33, 5},
+ {-1, 7}, {-1, 31}, {-1, 32}, {-1, 33},
+ {62, -1}, {63, -1},
+ {62, 2}, {63, 1}};
+ // clang-format on
+ for (auto p : params) {
+ cudf_io::parquet_reader_options read_args =
+ cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath});
+ if (p.first >= 0) { read_args.set_skip_rows(p.first); }
+ if (p.second >= 0) { read_args.set_num_rows(p.second); }
+ auto result = cudf_io::read_parquet(read_args);
+
+ p.first = p.first < 0 ? 0 : p.first;
+ p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second;
+ std::vector slice_indices{p.first, p.first + p.second};
+ auto expected = cudf::slice(col, slice_indices);
+
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]);
+ }
+}
+
+TEST_F(ParquetReaderTest, UserBoundsWithNullsLarge)
+{
+ constexpr int num_rows = 30 * 1000000;
+
+ std::mt19937 gen(6747);
+ std::bernoulli_distribution bn(0.7f);
+ auto valids =
+ cudf::detail::make_counting_transform_iterator(0, [&](int index) { return bn(gen); });
+ auto values = thrust::make_counting_iterator(0);
+
+ cudf::test::fixed_width_column_wrapper col(values, values + num_rows, valids);
+
+ // this file will have row groups of 1,000,000 each
+ cudf::table_view tbl({col});
+ auto filepath = temp_env->get_temp_filepath("UserBoundsWithNullsLarge.parquet");
+ cudf_io::parquet_writer_options out_args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl);
+ cudf_io::write_parquet(out_args);
+
+ // skip_rows / num_rows
+ // clang-format off
+ std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {1613470, -1}, {1999999, -1},
+ {31, 1}, {32, 1}, {33, 1},
+ // deliberately span some row group boundaries
+ {999000, 1001}, {999000, 2000}, {2999999, 2}, {13999997, -1},
+ {16785678, 3}, {22996176, 31},
+ {24001231, 17}, {29000001, 989999}, {29999999, 1} };
+ // clang-format on
+ for (auto p : params) {
+ cudf_io::parquet_reader_options read_args =
+ cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath});
+ if (p.first >= 0) { read_args.set_skip_rows(p.first); }
+ if (p.second >= 0) { read_args.set_num_rows(p.second); }
+ auto result = cudf_io::read_parquet(read_args);
+
+ p.first = p.first < 0 ? 0 : p.first;
+ p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second;
+ std::vector slice_indices{p.first, p.first + p.second};
+ auto expected = cudf::slice(col, slice_indices);
+
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]);
+ }
+}
+
+TEST_F(ParquetReaderTest, ListUserBoundsWithNullsLarge)
+{
+ constexpr int num_rows = 5 * 1000000;
+ auto colp = make_parquet_list_col(0, num_rows, 5, 8, true);
+ cudf::column_view col = *colp;
+
+ // this file will have row groups of 1,000,000 each
+ cudf::table_view tbl({col});
+ auto filepath = temp_env->get_temp_filepath("ListUserBoundsWithNullsLarge.parquet");
+ cudf_io::parquet_writer_options out_args =
+ cudf_io::parquet_writer_options::builder(cudf_io::sink_info{filepath}, tbl);
+ cudf_io::write_parquet(out_args);
+
+ // skip_rows / num_rows
+ // clang-format off
+ std::vector> params{ {-1, -1}, {31, -1}, {32, -1}, {33, -1}, {161470, -1}, {4499997, -1},
+ {31, 1}, {32, 1}, {33, 1},
+ // deliberately span some row group boundaries
+ {999000, 1001}, {999000, 2000}, {2999999, 2},
+ {1678567, 3}, {4299676, 31},
+ {4001231, 17}, {1900000, 989999}, {4999999, 1} };
+ // clang-format on
+ for (auto p : params) {
+ cudf_io::parquet_reader_options read_args =
+ cudf::io::parquet_reader_options::builder(cudf_io::source_info{filepath});
+ if (p.first >= 0) { read_args.set_skip_rows(p.first); }
+ if (p.second >= 0) { read_args.set_num_rows(p.second); }
+ auto result = cudf_io::read_parquet(read_args);
+
+ p.first = p.first < 0 ? 0 : p.first;
+ p.second = p.second < 0 ? static_cast(col).size() - p.first : p.second;
+ std::vector slice_indices{p.first, p.first + p.second};
+ auto expected = cudf::slice(col, slice_indices);
+
+ CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(0), expected[0]);
+ }
+}
+
TEST_F(ParquetReaderTest, ReorderedColumns)
{
{