Skip to content

Commit

Permalink
Add support for reading ORC file with no row group index (rapidsai#9060)
Browse files Browse the repository at this point in the history
The ORC reader in cuIO was designed on the assumption that the row group index is always available, so reading files without one failed.
The reader has been changed so that ORC files can be read even when the row group index stream is not available.

closes rapidsai#8878

Authors:
  - Ram (Ramakrishna Prabhu) (https://github.com/rgsl888prabhu)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Devavret Makkar (https://github.com/devavret)
  - Vukasin Milovanovic (https://github.com/vuule)
  - https://github.com/nvdbaranec

URL: rapidsai#9060
  • Loading branch information
rgsl888prabhu authored and firestarman committed Sep 1, 2021
1 parent 398d9e3 commit 4eed410
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
51 changes: 36 additions & 15 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ class aggregate_orc_metadata {
size_type const num_rows;
size_type const num_columns;
size_type const num_stripes;
bool row_grp_idx_present = true;

/**
* @brief Create a metadata object from each element in the source vector
Expand Down Expand Up @@ -370,6 +371,8 @@ class aggregate_orc_metadata {
return per_file_metadata[source_idx].get_column_name(column_idx);
}

auto is_row_grp_idx_present() const { return row_grp_idx_present; }

std::vector<cudf::io::orc::metadata::stripe_source_mapping> select_stripes(
std::vector<std::vector<size_type>> const& user_specified_stripes,
size_type& row_start,
Expand Down Expand Up @@ -459,6 +462,7 @@ class aggregate_orc_metadata {
ProtobufReader(sf_data, sf_length)
.read(per_file_metadata[mapping.source_idx].stripefooters[i]);
mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i];
if (stripe->indexLength == 0) { row_grp_idx_present = false; }
}
}
}
Expand Down Expand Up @@ -1140,6 +1144,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Association between each ORC column and its cudf::column
_col_meta.orc_col_map.emplace_back(_metadata->get_num_cols(), -1);
std::vector<orc_column_meta> nested_col;
bool is_data_empty = false;

// Get a list of column data types
std::vector<data_type> column_types;
Expand Down Expand Up @@ -1197,6 +1202,8 @@ table_with_metadata reader::impl::read(size_type skip_rows,

const bool use_index =
(_use_index == true) &&
// Do stripes have row group index
_metadata->is_row_grp_idx_present() &&
// Only use if we don't have much work with complete columns & stripes
// TODO: Consider nrows, gpu, and tune the threshold
(num_rows > _metadata->get_row_index_stride() && !(_metadata->get_row_index_stride() & 7) &&
Expand Down Expand Up @@ -1244,13 +1251,21 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stream_info,
level == 0);

CUDF_EXPECTS(total_data_size > 0, "Expected streams data within stripe");
if (total_data_size == 0) {
CUDF_EXPECTS(stripe_info->indexLength == 0, "Invalid index rowgroup stream data");
// In case ROW GROUP INDEX is not present and all columns are structs with no null
// stream, there is nothing to read at this level.
auto fn_check_dtype = [](auto dtype) { return dtype.id() == type_id::STRUCT; };
CUDF_EXPECTS(std::all_of(column_types.begin(), column_types.end(), fn_check_dtype),
"Expected streams data within stripe");
is_data_empty = true;
}

stripe_data.emplace_back(total_data_size, stream);
auto dst_base = static_cast<uint8_t*>(stripe_data.back().data());

// Coalesce consecutive streams into one read
while (stream_count < stream_info.size()) {
while (not is_data_empty and stream_count < stream_info.size()) {
const auto d_dst = dst_base + stream_info[stream_count].dst_pos;
const auto offset = stream_info[stream_count].offset;
auto len = stream_info[stream_count].length;
Expand Down Expand Up @@ -1332,8 +1347,10 @@ table_with_metadata reader::impl::read(size_type skip_rows,
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.ts_clock_rate = to_clockrate(_timestamp_type.id());
}
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
if (not is_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
}
}
}
stripe_start_row += num_rows_per_stripe;
Expand Down Expand Up @@ -1367,7 +1384,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
});
}
// Setup row group descriptors if using indexes
if (_metadata->per_file_metadata[0].ps.compression != orc::NONE) {
if (_metadata->per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) {
auto decomp_data =
decompress_stripe_data(chunks,
stripe_data,
Expand Down Expand Up @@ -1418,19 +1435,23 @@ table_with_metadata reader::impl::read(size_type skip_rows,
out_buffers[level].emplace_back(column_types[i], n_rows, is_nullable, stream, _mr);
}

decode_stream_data(chunks,
num_dict_entries,
skip_rows,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers[level],
level,
stream);
if (not is_data_empty) {
decode_stream_data(chunks,
num_dict_entries,
skip_rows,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers[level],
level,
stream);
}

// Extract information to process nested child columns
if (nested_col.size()) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
if (not is_data_empty) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
}
row_groups.device_to_host(stream, true);
aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level);
}
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
18 changes: 18 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1345,3 +1345,21 @@ def test_writer_timestamp_stream_size(datadir, tmpdir):
got = pa.orc.ORCFile(gdf_fname).read().to_pandas()

assert_eq(expect, got)


@pytest.mark.parametrize(
    "fname",
    [
        "TestOrcFile.NoIndStrm.StructWithNoNulls.orc",
        "TestOrcFile.NoIndStrm.StructAndIntWithNulls.orc",
        "TestOrcFile.NoIndStrm.StructAndIntWithNulls.TwoStripes.orc",
        "TestOrcFile.NoIndStrm.IntWithNulls.orc",
    ],
)
def test_no_row_group_index_orc_read(datadir, fname):
    """Check that cudf reads each listed ORC file the same way pyarrow does.

    The file names suggest these fixtures were written without a row group
    index stream ("NoIndStrm") -- presumably so; confirm against the data
    files. The pyarrow result is the reference and the cudf result is the
    value under test.
    """
    fpath = datadir / fname

    # Reference result from pyarrow's ORC reader.
    expect = pa.orc.ORCFile(fpath).read()
    # Result under test from the cudf ORC reader.
    got = cudf.read_orc(fpath)

    # Compare as Arrow tables: the pyarrow table's equals() against the
    # cudf frame converted with to_arrow().
    assert expect.equals(got.to_arrow())

0 comments on commit 4eed410

Please sign in to comment.