Skip to content

Commit

Permalink
Fix ORC reading of files with struct columns that have null values (#9005)
Browse files Browse the repository at this point in the history

Fixes #8910

The number of values in the null stream of a child column depends on the number of valid elements in the parent column.

This PR changes the reading logic to account for the number of parent null values when parsing child null streams.
Specifically, each output row is offset by the total number of null values in the parent column across all previous stripes. To allow efficient parsing, the per-stripe null counts are inclusive_scan'd before the columns in the level are parsed.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)

Approvers:
  - Mike Wilson (https://github.com/hyperbolic2346)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

URL: #9005
  • Loading branch information
vuule authored Aug 17, 2021
1 parent abe57f8 commit 62c9312
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 13 deletions.
1 change: 1 addition & 0 deletions cpp/src/io/orc/orc_gpu.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ struct ColumnDesc {
int32_t decimal_scale; // number of fractional decimal digits for decimal type
int32_t ts_clock_rate; // output timestamp clock frequency (0=default, 1000=ms, 1000000000=ns)
column_validity_info parent_validity_info; // consists of parent column valid_map and null count
uint32_t* parent_null_count_prefix_sums; // per-stripe prefix sums of parent column's null count
};

/**
Expand Down
69 changes: 62 additions & 7 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,49 @@ void update_null_mask(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks
}
}

/**
 * @brief Compute the per-stripe prefix sum of null count, for each struct column in the current
 * layer.
 *
 * Only columns whose `type_kind` is STRUCT are processed; per-stripe running totals of their
 * null counts are written so that child columns can later offset their rows by the number of
 * parent nulls in all preceding stripes.
 *
 * @param chunks Per-[stripe][column] column decode state; `null_count` of each chunk is read
 * on the device
 * @param prefix_sums One device vector per column in the level; for STRUCT columns, entry
 * [stripe] receives the inclusive sum of null counts over stripes 0..stripe (other columns'
 * vectors are left untouched)
 * @param stream CUDA stream on which the copy and kernel are enqueued
 */
void scan_null_counts(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc> const& chunks,
                      cudf::host_span<rmm::device_uvector<uint32_t>> prefix_sums,
                      rmm::cuda_stream_view stream)
{
  auto const num_stripes = chunks.size().first;
  if (num_stripes == 0) return;

  auto const num_columns = chunks.size().second;
  // Gather (column index, output span) pairs for the columns that need prefix sums
  std::vector<thrust::pair<size_type, cudf::device_span<uint32_t>>> prefix_sums_to_update;
  for (auto col_idx = 0ul; col_idx < num_columns; ++col_idx) {
    // Null counts sums are only needed for children of struct columns
    if (chunks[0][col_idx].type_kind == STRUCT) {
      prefix_sums_to_update.emplace_back(col_idx, prefix_sums[col_idx]);
    }
  }
  // Asynchronous H2D copy; the source host vector must remain alive until it completes
  // (hence the synchronize() before returning, below)
  auto const d_prefix_sums_to_update =
    cudf::detail::make_device_uvector_async(prefix_sums_to_update, stream);

  // One device-side iteration per selected column: sequentially gather that column's
  // per-stripe null counts, then scan them in place
  thrust::for_each(rmm::exec_policy(stream),
                   d_prefix_sums_to_update.begin(),
                   d_prefix_sums_to_update.end(),
                   [chunks = cudf::detail::device_2dspan<gpu::ColumnDesc const>{chunks}] __device__(
                     auto const& idx_psums) {
                     auto const col_idx = idx_psums.first;
                     auto const psums   = idx_psums.second;

                     // psums[stripe] = null count of this column within that stripe
                     thrust::transform(
                       thrust::seq,
                       thrust::make_counting_iterator(0),
                       thrust::make_counting_iterator(0) + psums.size(),
                       psums.begin(),
                       [&](auto stripe_idx) { return chunks[stripe_idx][col_idx].null_count; });

                     // In-place inclusive scan -> running totals across stripes
                     thrust::inclusive_scan(thrust::seq, psums.begin(), psums.end(), psums.begin());
                   });
  // `prefix_sums_to_update` goes out of scope, copy has to be done before we return
  stream.synchronize();
}

void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::ColumnDesc>& chunks,
size_t num_dicts,
size_t skip_rows,
Expand Down Expand Up @@ -817,8 +860,6 @@ void reader::impl::decode_stream_data(cudf::detail::hostdevice_2dvector<gpu::Col
[&](auto null_count, auto const stripe_idx) {
return null_count + chunks[stripe_idx][col_idx].null_count;
});
// Add parent null count in case this is a child column of a struct
out_buffers[col_idx].null_count() += chunks[0][col_idx].parent_validity_info.null_count;
});
}

Expand All @@ -841,6 +882,7 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDes
num_child_rows.resize(_selected_columns[level + 1].size());
std::fill(num_child_rows.begin(), num_child_rows.end(), 0);
parent_column_data.resize(number_of_child_chunks);
_col_meta.parent_column_index.resize(number_of_child_chunks);
_col_meta.child_start_row.resize(number_of_child_chunks);
_col_meta.num_child_rows_per_stripe.resize(number_of_child_chunks);
_col_meta.rwgrp_meta.resize(num_of_rowgroups * num_child_cols);
Expand Down Expand Up @@ -899,7 +941,8 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan<gpu::ColumnDes
auto num_rows = out_buffers[parent_col_idx].size;

for (uint32_t id = 0; id < p_col.num_children; id++) {
const auto child_col_idx = index + id;
const auto child_col_idx = index + id;
_col_meta.parent_column_index[child_col_idx] = parent_col_idx;
if (type == type_id::STRUCT) {
parent_column_data[child_col_idx] = {parent_valid_map, parent_null_count};
// Number of rows in child will remain same as parent in case of struct column
Expand Down Expand Up @@ -1042,6 +1085,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
std::vector<std::vector<column_buffer>> out_buffers(_selected_columns.size());
std::vector<column_name_info> schema_info;
std::vector<std::vector<rmm::device_buffer>> lvl_stripe_data(_selected_columns.size());
std::vector<std::vector<rmm::device_uvector<uint32_t>>> null_count_prefix_sums;
table_metadata out_metadata;

// There are no columns in the table
Expand Down Expand Up @@ -1124,6 +1168,14 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Logically view streams as columns
std::vector<orc_stream_info> stream_info;

null_count_prefix_sums.emplace_back();
null_count_prefix_sums.back().reserve(_selected_columns[level].size());
std::generate_n(
std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() {
return cudf::detail::make_zeroed_device_uvector_async<uint32_t>(total_num_stripes,
stream);
});

// Tracker for eventually deallocating compressed and uncompressed data
auto& stripe_data = lvl_stripe_data[level];

Expand Down Expand Up @@ -1207,10 +1259,12 @@ table_with_metadata reader::impl::read(size_type skip_rows,
? stripe_info->numberOfRows
: _col_meta.num_child_rows_per_stripe[stripe_idx * num_columns + col_idx];
chunk.column_num_rows = (level == 0) ? num_rows : _col_meta.num_child_rows[col_idx];
chunk.parent_validity_info.valid_map_base =
(level == 0) ? nullptr : _col_meta.parent_column_data[col_idx].valid_map_base;
chunk.parent_validity_info.null_count =
(level == 0) ? 0 : _col_meta.parent_column_data[col_idx].null_count;
chunk.parent_validity_info =
(level == 0) ? column_validity_info{} : _col_meta.parent_column_data[col_idx];
chunk.parent_null_count_prefix_sums =
(level == 0)
? nullptr
: null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data();
chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind;
chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx]
.ff.types[selected_columns[col_idx].id]
Expand Down Expand Up @@ -1336,6 +1390,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,

// Extract information to process nested child columns
if (nested_col.size()) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
row_groups.device_to_host(stream, true);
aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level);
}
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct reader_column_meta {

std::vector<column_validity_info>
parent_column_data; // consists of parent column valid_map and null count
std::vector<size_type> parent_column_index;

std::vector<uint32_t> child_start_row; // start row of child columns [stripe][column]
std::vector<uint32_t>
Expand Down
17 changes: 13 additions & 4 deletions cpp/src/io/orc/stripe_data.cu
Original file line number Diff line number Diff line change
Expand Up @@ -1167,8 +1167,17 @@ __global__ void __launch_bounds__(block_size)
// No present stream: all rows are valid
s->vals.u32[t] = ~0;
}
while (s->top.nulls_desc_row < s->chunk.num_rows) {
uint32_t nrows_max = min(s->chunk.num_rows - s->top.nulls_desc_row, blockDim.x * 32);
auto const prev_parent_null_count =
(s->chunk.parent_null_count_prefix_sums != nullptr && stripe > 0)
? s->chunk.parent_null_count_prefix_sums[stripe - 1]
: 0;
auto const parent_null_count =
(s->chunk.parent_null_count_prefix_sums != nullptr)
? s->chunk.parent_null_count_prefix_sums[stripe] - prev_parent_null_count
: 0;
auto const num_elems = s->chunk.num_rows - parent_null_count;
while (s->top.nulls_desc_row < num_elems) {
uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32);
uint32_t nrows;
size_t row_in;

Expand All @@ -1187,7 +1196,7 @@ __global__ void __launch_bounds__(block_size)
}
__syncthreads();

row_in = s->chunk.start_row + s->top.nulls_desc_row;
row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count;
if (row_in + nrows > first_row && row_in < first_row + max_num_rows &&
s->chunk.valid_map_base != NULL) {
int64_t dst_row = row_in - first_row;
Expand Down Expand Up @@ -1251,7 +1260,7 @@ __global__ void __launch_bounds__(block_size)
// Sum up the valid counts and infer null_count
null_count = block_reduce(temp_storage.bk_storage).Sum(null_count);
if (t == 0) {
chunks[chunk_id].null_count = null_count;
chunks[chunk_id].null_count = parent_null_count + null_count;
chunks[chunk_id].skip_count = s->chunk.skip_count;
}
} else {
Expand Down
4 changes: 2 additions & 2 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -844,7 +844,7 @@ def test_orc_string_stream_offset_issue():


# Data is generated using pyorc module
def generate_list_struct_buff(size=28000):
def generate_list_struct_buff(size=100_000):
rd = random.Random(1)
np.random.seed(seed=1)

Expand Down Expand Up @@ -963,7 +963,7 @@ def generate_list_struct_buff(size=28000):
["lvl2_struct", "lvl1_struct"],
],
)
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 28000])
@pytest.mark.parametrize("num_rows", [0, 15, 1005, 10561, 100_000])
@pytest.mark.parametrize("use_index", [True, False])
def test_lists_struct_nests(
columns, num_rows, use_index,
Expand Down

0 comments on commit 62c9312

Please sign in to comment.