Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for reading ORC file with no row group index #9060

Merged
merged 30 commits into from
Aug 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
710e2f6
code changes and test cases
rgsl888prabhu Jul 9, 2021
c2efba9
test file
rgsl888prabhu Jul 9, 2021
73df774
review changes and doc
rgsl888prabhu Jul 9, 2021
896fc8e
primary changes
rgsl888prabhu Jul 17, 2021
6618164
primary changes
rgsl888prabhu Jul 18, 2021
3494a79
test cases working
rgsl888prabhu Jul 19, 2021
97ad267
clean-up
rgsl888prabhu Jul 19, 2021
1bd9607
added test case and docs
rgsl888prabhu Jul 19, 2021
61e9247
Merge branch 'branch-21.08' of https://github.com/rapidsai/cudf into …
rgsl888prabhu Jul 20, 2021
1499459
Fixes for pyspark generated orc file reading
rgsl888prabhu Jul 21, 2021
5546b32
changes
rgsl888prabhu Jul 21, 2021
bcc7dcb
changes
rgsl888prabhu Jul 21, 2021
470b7c0
review changes
rgsl888prabhu Jul 22, 2021
40c8115
review changes
rgsl888prabhu Jul 22, 2021
31a2c51
review changes
rgsl888prabhu Jul 22, 2021
2d3842a
review changes
rgsl888prabhu Jul 22, 2021
bf57b6b
review changes
rgsl888prabhu Jul 22, 2021
723c788
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_stru…
rgsl888prabhu Aug 2, 2021
400a4a3
changes
rgsl888prabhu Aug 18, 2021
311ed2a
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_stru…
rgsl888prabhu Aug 18, 2021
3c75f87
changes
rgsl888prabhu Aug 18, 2021
117347e
adding missing test file
rgsl888prabhu Aug 18, 2021
72de035
Merge branch 'branch-21.10' of github.com:rapidsai/cudf into orc_stru…
rgsl888prabhu Aug 19, 2021
156c659
testing
rgsl888prabhu Aug 19, 2021
2bebc3e
testing
rgsl888prabhu Aug 19, 2021
d09c855
testing prints
rgsl888prabhu Aug 20, 2021
860757e
some more testing
rgsl888prabhu Aug 20, 2021
e15ac38
changes and removing prints
rgsl888prabhu Aug 20, 2021
a21d9b8
review changes
rgsl888prabhu Aug 20, 2021
3c51111
review changes
rgsl888prabhu Aug 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 36 additions & 15 deletions cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ class aggregate_orc_metadata {
size_type const num_rows;
size_type const num_columns;
size_type const num_stripes;
bool row_grp_idx_present = true;

/**
* @brief Create a metadata object from each element in the source vector
Expand Down Expand Up @@ -368,6 +369,8 @@ class aggregate_orc_metadata {
return per_file_metadata[source_idx].get_column_name(column_idx);
}

auto is_row_grp_idx_present() const { return row_grp_idx_present; }

std::vector<cudf::io::orc::metadata::stripe_source_mapping> select_stripes(
std::vector<std::vector<size_type>> const& user_specified_stripes,
size_type& row_start,
Expand Down Expand Up @@ -457,6 +460,7 @@ class aggregate_orc_metadata {
ProtobufReader(sf_data, sf_length)
.read(per_file_metadata[mapping.source_idx].stripefooters[i]);
mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i];
if (stripe->indexLength == 0) { row_grp_idx_present = false; }
vuule marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -1101,6 +1105,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
// Association between each ORC column and its cudf::column
_col_meta.orc_col_map.emplace_back(_metadata->get_num_cols(), -1);
std::vector<orc_column_meta> nested_col;
bool is_data_empty = false;

// Get a list of column data types
std::vector<data_type> column_types;
Expand Down Expand Up @@ -1157,6 +1162,8 @@ table_with_metadata reader::impl::read(size_type skip_rows,

const bool use_index =
(_use_index == true) &&
// Do stripes have row group index
_metadata->is_row_grp_idx_present() &&
// Only use if we don't have much work with complete columns & stripes
// TODO: Consider nrows, gpu, and tune the threshold
(num_rows > _metadata->get_row_index_stride() && !(_metadata->get_row_index_stride() & 7) &&
Expand Down Expand Up @@ -1204,13 +1211,21 @@ table_with_metadata reader::impl::read(size_type skip_rows,
stream_info,
level == 0);

CUDF_EXPECTS(total_data_size > 0, "Expected streams data within stripe");
if (total_data_size == 0) {
CUDF_EXPECTS(stripe_info->indexLength == 0, "Invalid index rowgroup stream data");
// In case ROW GROUP INDEX is not present and all columns are structs with no null
// stream, there is nothing to read at this level.
auto fn_check_dtype = [](auto dtype) { return dtype.id() == type_id::STRUCT; };
CUDF_EXPECTS(std::all_of(column_types.begin(), column_types.end(), fn_check_dtype),
"Expected streams data within stripe");
is_data_empty = true;
}

stripe_data.emplace_back(total_data_size, stream);
auto dst_base = static_cast<uint8_t*>(stripe_data.back().data());

// Coalesce consecutive streams into one read
while (stream_count < stream_info.size()) {
while (not is_data_empty and stream_count < stream_info.size()) {
const auto d_dst = dst_base + stream_info[stream_count].dst_pos;
const auto offset = stream_info[stream_count].offset;
auto len = stream_info[stream_count].length;
Expand Down Expand Up @@ -1292,8 +1307,10 @@ table_with_metadata reader::impl::read(size_type skip_rows,
if (chunk.type_kind == orc::TIMESTAMP) {
chunk.ts_clock_rate = to_clockrate(_timestamp_type.id());
}
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
if (not is_data_empty) {
for (int k = 0; k < gpu::CI_NUM_STREAMS; k++) {
chunk.streams[k] = dst_base + stream_info[chunk.strm_id[k]].dst_pos;
}
}
}
stripe_start_row += num_rows_per_stripe;
Expand Down Expand Up @@ -1327,7 +1344,7 @@ table_with_metadata reader::impl::read(size_type skip_rows,
});
}
// Setup row group descriptors if using indexes
if (_metadata->per_file_metadata[0].ps.compression != orc::NONE) {
if (_metadata->per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) {
auto decomp_data =
decompress_stripe_data(chunks,
stripe_data,
Expand Down Expand Up @@ -1378,19 +1395,23 @@ table_with_metadata reader::impl::read(size_type skip_rows,
out_buffers[level].emplace_back(column_types[i], n_rows, is_nullable, stream, _mr);
}

decode_stream_data(chunks,
num_dict_entries,
skip_rows,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers[level],
level,
stream);
if (not is_data_empty) {
decode_stream_data(chunks,
num_dict_entries,
skip_rows,
tz_table.view(),
row_groups,
_metadata->get_row_index_stride(),
out_buffers[level],
level,
stream);
}

// Extract information to process nested child columns
if (nested_col.size()) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
if (not is_data_empty) {
scan_null_counts(chunks, null_count_prefix_sums[level], stream);
}
row_groups.device_to_host(stream, true);
aggregate_child_meta(chunks, row_groups, out_buffers[level], nested_col, level);
}
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
18 changes: 18 additions & 0 deletions python/cudf/cudf/tests/test_orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1171,3 +1171,21 @@ def test_writer_timestamp_stream_size(datadir, tmpdir):
got = pa.orc.ORCFile(gdf_fname).read().to_pandas()

assert_eq(expect, got)


@pytest.mark.parametrize(
"fname",
[
"TestOrcFile.NoIndStrm.StructWithNoNulls.orc",
"TestOrcFile.NoIndStrm.StructAndIntWithNulls.orc",
"TestOrcFile.NoIndStrm.StructAndIntWithNulls.TwoStripes.orc",
"TestOrcFile.NoIndStrm.IntWithNulls.orc",
],
)
def test_no_row_group_index_orc_read(datadir, fname):
fpath = datadir / fname

got = pa.orc.ORCFile(fpath).read()
expect = cudf.read_orc(fpath)

assert got.equals(expect.to_arrow())