diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index ed600000135..960659c0608 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -270,6 +270,7 @@ add_library(cudf src/io/functions.cpp src/io/json/json_gpu.cu src/io/json/reader_impl.cu + src/io/orc/aggregate_orc_metadata.cpp src/io/orc/dict_enc.cu src/io/orc/orc.cpp src/io/orc/reader_impl.cu diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 511a1a22ee7..d825f40cd00 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -255,7 +255,7 @@ raw_orc_statistics read_raw_orc_statistics(source_info const& src_info) // Get column names for (auto i = 0; i < metadata.get_num_columns(); i++) { - result.column_names.push_back(metadata.get_column_name(i)); + result.column_names.push_back(metadata.column_name(i)); } // Get file-level statistics, statistics of each column of file diff --git a/cpp/src/io/orc/aggregate_orc_metadata.cpp b/cpp/src/io/orc/aggregate_orc_metadata.cpp new file mode 100644 index 00000000000..45d60605936 --- /dev/null +++ b/cpp/src/io/orc/aggregate_orc_metadata.cpp @@ -0,0 +1,274 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "aggregate_orc_metadata.hpp" + +#include +#include + +namespace cudf::io::orc::detail { + +column_hierarchy::column_hierarchy(nesting_map child_map) : children{std::move(child_map)} +{ + // Sort columns by nesting levels + std::function levelize = [&](size_type id, int32_t level) { + if (static_cast(levels.size()) == level) levels.emplace_back(); + + levels[level].push_back({id, static_cast(children[id].size())}); + + for (auto child_id : children[id]) { + levelize(child_id, level + 1); + } + }; + + std::for_each( + children[0].cbegin(), children[0].cend(), [&](auto col_id) { levelize(col_id, 0); }); +} + +namespace { + +/** + * @brief Goes up to the root to include the column with the given id and its parents. + */ +void update_parent_mapping(std::map>& selected_columns, + metadata const& metadata, + size_type id) +{ + auto current_id = id; + while (metadata.column_has_parent(current_id)) { + auto parent_id = metadata.parent_id(current_id); + if (std::find(selected_columns[parent_id].cbegin(), + selected_columns[parent_id].cend(), + current_id) == selected_columns[parent_id].end()) { + selected_columns[parent_id].push_back(current_id); + } + current_id = parent_id; + } +} + +/** + * @brief Adds all columns nested under the column with the given id to the nesting map. + */ +void add_nested_columns(std::map>& selected_columns, + std::vector const& types, + size_type id) +{ + for (auto child_id : types[id].subtypes) { + if (std::find(selected_columns[id].cbegin(), selected_columns[id].cend(), child_id) == + selected_columns[id].end()) { + selected_columns[id].push_back(child_id); + } + add_nested_columns(selected_columns, types, child_id); + } +} + +/** + * @brief Adds the column with the given id to the mapping + * + * All nested columns and direct ancestors of column `id` are included. + * Columns that are not on the direct path are excluded, which may result in pruning. 
+ */ +void add_column_to_mapping(std::map>& selected_columns, + metadata const& metadata, + size_type id) +{ + update_parent_mapping(selected_columns, metadata, id); + add_nested_columns(selected_columns, metadata.ff.types, id); +} + +/** + * @brief Create a metadata object from each element in the source vector + */ +auto metadatas_from_sources(std::vector> const& sources) +{ + std::vector metadatas; + std::transform( + sources.cbegin(), sources.cend(), std::back_inserter(metadatas), [](auto const& source) { + return metadata(source.get()); + }); + return metadatas; +} + +} // namespace + +size_type aggregate_orc_metadata::calc_num_rows() const +{ + return std::accumulate( + per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto const& sum, auto const& pfm) { + return sum + pfm.get_total_rows(); + }); +} + +size_type aggregate_orc_metadata::calc_num_stripes() const +{ + return std::accumulate( + per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto const& sum, auto const& pfm) { + return sum + pfm.get_num_stripes(); + }); +} + +aggregate_orc_metadata::aggregate_orc_metadata( + std::vector> const& sources) + : per_file_metadata(metadatas_from_sources(sources)), + num_rows(calc_num_rows()), + num_stripes(calc_num_stripes()) +{ + // Verify that the input files have the same number of columns, + // as well as matching types, compression, and names + for (auto const& pfm : per_file_metadata) { + CUDF_EXPECTS(per_file_metadata[0].get_num_columns() == pfm.get_num_columns(), + "All sources must have the same number of columns"); + CUDF_EXPECTS(per_file_metadata[0].ps.compression == pfm.ps.compression, + "All sources must have the same compression type"); + + // Check the types, column names, and decimal scale + for (size_t i = 0; i < pfm.ff.types.size(); i++) { + CUDF_EXPECTS(pfm.ff.types[i].kind == per_file_metadata[0].ff.types[i].kind, + "Column types across all input sources must be the same"); + 
CUDF_EXPECTS(std::equal(pfm.ff.types[i].fieldNames.begin(), + pfm.ff.types[i].fieldNames.end(), + per_file_metadata[0].ff.types[i].fieldNames.begin()), + "All source column names must be the same"); + CUDF_EXPECTS( + pfm.ff.types[i].scale.value_or(0) == per_file_metadata[0].ff.types[i].scale.value_or(0), + "All scale values must be the same"); + } + } +} + +std::vector aggregate_orc_metadata::select_stripes( + std::vector> const& user_specified_stripes, + size_type& row_start, + size_type& row_count) +{ + std::vector selected_stripes_mapping; + + if (!user_specified_stripes.empty()) { + CUDF_EXPECTS(user_specified_stripes.size() == per_file_metadata.size(), + "Must specify stripes for each source"); + // row_start is 0 if stripes are set. If this is not true anymore, then + // row_start needs to be subtracted to get the correct row_count + CUDF_EXPECTS(row_start == 0, "Start row index should be 0"); + + row_count = 0; + // Each vector entry represents a source file; each nested vector represents the + // user_defined_stripes to get from that source file + for (size_t src_file_idx = 0; src_file_idx < user_specified_stripes.size(); ++src_file_idx) { + std::vector stripe_infos; + + // Coalesce stripe info at the source file later since that makes downstream processing much + // easier in impl::read + for (const size_t& stripe_idx : user_specified_stripes[src_file_idx]) { + CUDF_EXPECTS(stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size(), + "Invalid stripe index"); + stripe_infos.push_back( + std::make_pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); + row_count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; + } + selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); + } + } else { + row_start = std::max(row_start, 0); + if (row_count < 0) { + row_count = static_cast( + std::min(get_num_rows(), std::numeric_limits::max())); + } + row_count = std::min(row_count, get_num_rows() - 
+ row_start); + CUDF_EXPECTS(row_count >= 0, "Invalid row count"); + CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); + + size_type count = 0; + size_type stripe_skip_rows = 0; + // Iterate all source files, each source file has corresponding metadata + for (size_t src_file_idx = 0; + src_file_idx < per_file_metadata.size() && count < row_start + row_count; + ++src_file_idx) { + std::vector stripe_infos; + + for (size_t stripe_idx = 0; stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size() && + count < row_start + row_count; + ++stripe_idx) { + count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; + if (count > row_start || count == 0) { + stripe_infos.push_back( + std::make_pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); + } else { + stripe_skip_rows = count; + } + } + + selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); + } + // Need to remove skipped rows from the stripes which are not selected. 
+ row_start -= stripe_skip_rows; + } + + // Read each stripe's stripefooter metadata + if (not selected_stripes_mapping.empty()) { + for (auto& mapping : selected_stripes_mapping) { + // Resize to all stripe_info for the source level + per_file_metadata[mapping.source_idx].stripefooters.resize(mapping.stripe_info.size()); + + for (size_t i = 0; i < mapping.stripe_info.size(); i++) { + const auto stripe = mapping.stripe_info[i].first; + const auto sf_comp_offset = stripe->offset + stripe->indexLength + stripe->dataLength; + const auto sf_comp_length = stripe->footerLength; + CUDF_EXPECTS( + sf_comp_offset + sf_comp_length < per_file_metadata[mapping.source_idx].source->size(), + "Invalid stripe information"); + const auto buffer = + per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length); + size_t sf_length = 0; + auto sf_data = per_file_metadata[mapping.source_idx].decompressor->Decompress( + buffer->data(), sf_comp_length, &sf_length); + ProtobufReader(sf_data, sf_length) + .read(per_file_metadata[mapping.source_idx].stripefooters[i]); + mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i]; + if (stripe->indexLength == 0) { row_grp_idx_present = false; } + } + } + } + + return selected_stripes_mapping; +} + +column_hierarchy aggregate_orc_metadata::select_columns( + std::vector const& column_paths) +{ + auto const& pfm = per_file_metadata[0]; + + column_hierarchy::nesting_map selected_columns; + if (column_paths.empty()) { + for (auto const& col_id : pfm.ff.types[0].subtypes) { + add_column_to_mapping(selected_columns, pfm, col_id); + } + } else { + for (const auto& path : column_paths) { + bool name_found = false; + for (auto col_id = 1; col_id < pfm.get_num_columns(); ++col_id) { + if (pfm.column_path(col_id) == path) { + name_found = true; + add_column_to_mapping(selected_columns, pfm, col_id); + break; + } + } + CUDF_EXPECTS(name_found, "Unknown column name: " + std::string(path)); + } + } + 
return {std::move(selected_columns)}; +} + +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/aggregate_orc_metadata.hpp b/cpp/src/io/orc/aggregate_orc_metadata.hpp new file mode 100644 index 00000000000..356d20843e8 --- /dev/null +++ b/cpp/src/io/orc/aggregate_orc_metadata.hpp @@ -0,0 +1,130 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "orc.h" + +#include +#include + +namespace cudf::io::orc::detail { + +/** + * @brief Describes a column hierarchy, which may exclude some input columns. + */ +struct column_hierarchy { + // Maps column IDs to the IDs of their children columns + using nesting_map = std::map>; + // Children IDs of each column + nesting_map children; + // Each element contains column at the given nesting level + std::vector> levels; + + column_hierarchy(nesting_map child_map); + auto num_levels() const { return levels.size(); } +}; + +/** + * @brief In order to support multiple input files/buffers we need to gather + * the metadata across all of those input(s). This class provides a place + * to aggregate that metadata from all the files. + */ +class aggregate_orc_metadata { + using OrcStripeInfo = std::pair; + + /** + * @brief Sums up the number of rows of each source + */ + size_type calc_num_rows() const; + + /** + * @brief Number of columns in a ORC file. 
+ */ + size_type calc_num_cols() const; + + /** + * @brief Sums up the number of stripes of each source + */ + size_type calc_num_stripes() const; + + public: + std::vector per_file_metadata; + size_type const num_rows; + size_type const num_stripes; + bool row_grp_idx_present{true}; + + aggregate_orc_metadata(std::vector> const& sources); + + auto const& get_schema(int schema_idx) const { return per_file_metadata[0].ff.types[schema_idx]; } + + auto get_col_type(int col_idx) const { return per_file_metadata[0].ff.types[col_idx]; } + + auto get_num_rows() const { return num_rows; } + + auto get_num_cols() const { return per_file_metadata[0].get_num_columns(); } + + auto get_num_stripes() const { return num_stripes; } + + auto const& get_types() const { return per_file_metadata[0].ff.types; } + + int get_row_index_stride() const { return per_file_metadata[0].ff.rowIndexStride; } + + auto is_row_grp_idx_present() const { return row_grp_idx_present; } + + /** + * @brief Returns the name of the given column from the given source. + */ + auto column_name(const int source_idx, const int column_id) const + { + CUDF_EXPECTS(source_idx <= static_cast(per_file_metadata.size()), + "Out of range source_idx provided"); + return per_file_metadata[source_idx].column_name(column_id); + } + + /** + * @brief Returns the full name of the given column from the given source. + * + * Full name includes ancestor columns' names. + */ + auto column_path(const int source_idx, const int column_id) const + { + CUDF_EXPECTS(source_idx <= static_cast(per_file_metadata.size()), + "Out of range source_idx provided"); + return per_file_metadata[source_idx].column_path(column_id); + } + + /** + * @brief Selects the stripes to read, based on the row/stripe selection parameters. + * + * Stripes are potentially selected from multiple files. 
+ */ + std::vector select_stripes( + std::vector> const& user_specified_stripes, + size_type& row_start, + size_type& row_count); + + /** + * @brief Filters ORC file to a selection of columns, based on their paths in the file. + * + * Paths are in format "grandparent_col.parent_col.child_col", where the root ORC column is + * omitted to match the cuDF table hierarchy. + * + * @param column_paths List of full column names (i.e. paths) to select from the ORC file + * @return Column hierarchy - lists of children columns and sorted columns in each nesting level + */ + column_hierarchy select_columns(std::vector const& column_paths); +}; + +} // namespace cudf::io::orc::detail diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index b275496c705..89eac0c9901 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -15,10 +15,13 @@ */ #include "orc.h" -#include #include "orc_field_reader.hpp" #include "orc_field_writer.hpp" +#include + +#include + namespace cudf { namespace io { namespace orc { @@ -459,48 +462,46 @@ metadata::metadata(datasource* const src) : source(src) auto md_data = decompressor->Decompress(buffer->data(), ps.metadataLength, &md_length); orc::ProtobufReader(md_data, md_length).read(md); - // Initialize the column names + init_parent_descriptors(); init_column_names(); } -void metadata::init_column_names() const -{ - auto const schema_idxs = get_schema_indexes(); - auto const& types = ff.types; - for (int32_t col_id = 0; col_id < get_num_columns(); ++col_id) { - std::string col_name; - if (schema_idxs[col_id].parent >= 0 and schema_idxs[col_id].field >= 0) { - auto const parent_idx = static_cast(schema_idxs[col_id].parent); - auto const field_idx = static_cast(schema_idxs[col_id].field); - if (field_idx < types[parent_idx].fieldNames.size()) { - col_name = types[parent_idx].fieldNames[field_idx]; - } - } - // If we have no name (root column), generate a name - column_names.push_back(col_name.empty() ? 
"col" + std::to_string(col_id) : col_name); - } -} - -std::vector metadata::get_schema_indexes() const -{ - std::vector result(ff.types.size()); - - auto const schema_size = static_cast(result.size()); - for (uint32_t i = 0; i < schema_size; i++) { - auto const& subtypes = ff.types[i].subtypes; - auto const num_children = static_cast(subtypes.size()); - if (result[i].parent == -1) { // Not initialized - result[i].parent = i; // set root node as its own parent - } - for (uint32_t j = 0; j < num_children; j++) { - auto const column_id = subtypes[j]; - CUDF_EXPECTS(column_id > i && column_id < schema_size, "Invalid column id"); - CUDF_EXPECTS(result[column_id].parent == -1, "Same node referenced twice"); - result[column_id].parent = i; - result[column_id].field = j; +void metadata::init_column_names() +{ + column_names.resize(get_num_columns()); + thrust::tabulate(column_names.begin(), column_names.end(), [&](auto col_id) { + if (not column_has_parent(col_id)) return std::string{}; + auto const& parent_field_names = ff.types[parent_id(col_id)].fieldNames; + // Child columns of lists don't have a name in ORC files, generate placeholder in that case + return field_index(col_id) < static_cast(parent_field_names.size()) + ? parent_field_names[field_index(col_id)] + : std::to_string(col_id); + }); + + column_paths.resize(get_num_columns()); + thrust::tabulate(column_paths.begin(), column_paths.end(), [&](auto col_id) { + if (not column_has_parent(col_id)) return std::string{}; + // Don't include ORC root column name in path + return (parent_id(col_id) == 0 ? 
"" : column_paths[parent_id(col_id)] + ".") + + column_names[col_id]; + }); +} + +void metadata::init_parent_descriptors() +{ + auto const num_columns = static_cast(ff.types.size()); + parents.resize(num_columns); + + for (size_type col_id = 0; col_id < num_columns; ++col_id) { + auto const& subtypes = ff.types[col_id].subtypes; + auto const num_children = static_cast(subtypes.size()); + for (size_type field_idx = 0; field_idx < num_children; ++field_idx) { + auto const child_id = static_cast(subtypes[field_idx]); + CUDF_EXPECTS(child_id > col_id && child_id < num_columns, "Invalid column id"); + CUDF_EXPECTS(not column_has_parent(child_id), "Same node referenced twice"); + parents[child_id] = {col_id, field_idx}; } } - return result; } } // namespace orc diff --git a/cpp/src/io/orc/orc.h b/cpp/src/io/orc/orc.h index 405bf7c2ecc..d75b76a0341 100644 --- a/cpp/src/io/orc/orc.h +++ b/cpp/src/io/orc/orc.h @@ -555,8 +555,8 @@ class OrcDecompressor { * */ struct orc_column_meta { - uint32_t id; // orc id for the column - uint32_t num_children; // number of children at the same level of nesting in case of struct + size_type id; // orc id for the column + size_type num_children; // number of children at the same level of nesting in case of struct }; /** @@ -586,13 +586,51 @@ class metadata { size_t get_total_rows() const { return ff.numberOfRows; } int get_num_stripes() const { return ff.stripes.size(); } int get_num_columns() const { return ff.types.size(); } - std::string const& get_column_name(int32_t column_id) const + /** + * @brief Returns the name of the column with the given ID. + * + * Name might not be unique in the ORC file, since columns with different parents are allowed to + * have the same names. 
+ */ + std::string const& column_name(size_type column_id) const { - if (column_names.empty() && get_num_columns() != 0) { init_column_names(); } + CUDF_EXPECTS(column_id < get_num_columns(), "Out of range column id provided"); return column_names[column_id]; } + /** + * @brief Returns the full name of the column with the given ID - includes the ancestor columns + * names. + * + * Each column in the ORC file has a unique path. + */ + std::string const& column_path(size_type column_id) const + { + CUDF_EXPECTS(column_id < get_num_columns(), "Out of range column id provided"); + return column_paths[column_id]; + } int get_row_index_stride() const { return ff.rowIndexStride; } + /** + * @brief Returns the ID of the parent column of the given column. + */ + size_type parent_id(size_type column_id) const { return parents.at(column_id).value().id; } + + /** + * @brief Returns the index the given column has in its parent's children list. + */ + size_type field_index(size_type column_id) const + { + return parents.at(column_id).value().field_idx; + } + + /** + * @brief Returns whether the given column has a parent. 
+ */ + size_type column_has_parent(size_type column_id) const + { + return parents.at(column_id).has_value(); + } + public: PostScript ps; FileFooter ff; @@ -602,14 +640,19 @@ class metadata { datasource* const source; private: - struct schema_indexes { - int32_t parent = -1; - int32_t field = -1; + struct column_parent { + // parent's ID + size_type id; + // Index of this column in the parent's list of children + size_type field_idx; + column_parent(size_type parent_id, size_type field_idx) : id{parent_id}, field_idx{field_idx} {} }; - std::vector get_schema_indexes() const; - void init_column_names() const; + void init_parent_descriptors(); + std::vector> parents; - mutable std::vector column_names; + void init_column_names(); + std::vector column_names; + std::vector column_paths; }; /** diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index d1c8c3661f4..da525ab8592 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -19,13 +19,13 @@ * @brief cuDF-IO ORC reader class implementation */ -#include "io/orc/orc_gpu.h" +#include "orc.h" +#include "orc_gpu.h" #include "reader_impl.hpp" #include "timezone.cuh" #include #include -#include "orc.h" #include #include @@ -33,7 +33,6 @@ #include #include -#include #include #include #include @@ -42,15 +41,13 @@ #include #include -#include +#include namespace cudf { namespace io { namespace detail { namespace orc { -// Import functionality that's independent of legacy code using namespace cudf::io::orc; -using namespace cudf::io; namespace { /** @@ -116,9 +113,6 @@ constexpr std::pair get_index_type_and_pos( } } -} // namespace - -namespace { /** * @brief struct to store buffer data and size of list buffer */ @@ -241,294 +235,11 @@ bool should_convert_decimal_column_to_float(const std::vector& colu { return (std::find(columns_to_convert.begin(), columns_to_convert.end(), - metadata.get_column_name(column_index)) != columns_to_convert.end()); + 
metadata.column_name(column_index)) != columns_to_convert.end()); } } // namespace -/** - * @brief In order to support multiple input files/buffers we need to gather - * the metadata across all of those input(s). This class provides a place - * to aggregate that metadata from all the files. - */ -class aggregate_orc_metadata { - using OrcStripeInfo = std::pair; - - public: - mutable std::vector per_file_metadata; - size_type const num_rows; - size_type const num_columns; - size_type const num_stripes; - bool row_grp_idx_present = true; - - /** - * @brief Create a metadata object from each element in the source vector - */ - auto metadatas_from_sources(std::vector> const& sources) - { - std::vector metadatas; - std::transform( - sources.cbegin(), sources.cend(), std::back_inserter(metadatas), [](auto const& source) { - return cudf::io::orc::metadata(source.get()); - }); - return metadatas; - } - - /** - * @brief Sums up the number of rows of each source - */ - size_type calc_num_rows() const - { - return std::accumulate( - per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto& sum, auto& pfm) { - return sum + pfm.get_total_rows(); - }); - } - - /** - * @brief Number of columns in a ORC file. 
- */ - size_type calc_num_cols() const - { - if (not per_file_metadata.empty()) { return per_file_metadata[0].get_num_columns(); } - return 0; - } - - /** - * @brief Sums up the number of stripes of each source - */ - size_type calc_num_stripes() const - { - return std::accumulate( - per_file_metadata.begin(), per_file_metadata.end(), 0, [](auto& sum, auto& pfm) { - return sum + pfm.get_num_stripes(); - }); - } - - aggregate_orc_metadata(std::vector> const& sources) - : per_file_metadata(metadatas_from_sources(sources)), - num_rows(calc_num_rows()), - num_columns(calc_num_cols()), - num_stripes(calc_num_stripes()) - { - // Verify that the input files have the same number of columns, - // as well as matching types, compression, and names - for (auto const& pfm : per_file_metadata) { - CUDF_EXPECTS(per_file_metadata[0].get_num_columns() == pfm.get_num_columns(), - "All sources must have the same number of columns"); - CUDF_EXPECTS(per_file_metadata[0].ps.compression == pfm.ps.compression, - "All sources must have the same compression type"); - - // Check the types, column names, and decimal scale - for (size_t i = 0; i < pfm.ff.types.size(); i++) { - CUDF_EXPECTS(pfm.ff.types[i].kind == per_file_metadata[0].ff.types[i].kind, - "Column types across all input sources must be the same"); - CUDF_EXPECTS(std::equal(pfm.ff.types[i].fieldNames.begin(), - pfm.ff.types[i].fieldNames.end(), - per_file_metadata[0].ff.types[i].fieldNames.begin()), - "All source column names must be the same"); - CUDF_EXPECTS( - pfm.ff.types[i].scale.value_or(0) == per_file_metadata[0].ff.types[i].scale.value_or(0), - "All scale values must be the same"); - } - } - } - - auto const& get_schema(int schema_idx) const { return per_file_metadata[0].ff.types[schema_idx]; } - - auto get_col_type(int col_idx) const { return per_file_metadata[0].ff.types[col_idx]; } - - auto get_num_rows() const { return num_rows; } - - auto get_num_cols() const { return per_file_metadata[0].get_num_columns(); } - - auto 
get_num_stripes() const { return num_stripes; } - - auto get_num_source_files() const { return per_file_metadata.size(); } - - auto const& get_types() const { return per_file_metadata[0].ff.types; } - - int get_row_index_stride() const { return per_file_metadata[0].ff.rowIndexStride; } - - auto get_column_name(const int source_idx, const int column_idx) const - { - CUDF_EXPECTS(source_idx <= static_cast(per_file_metadata.size()), - "Out of range source_idx provided"); - CUDF_EXPECTS(column_idx <= per_file_metadata[source_idx].get_num_columns(), - "Out of range column_idx provided"); - return per_file_metadata[source_idx].get_column_name(column_idx); - } - - auto is_row_grp_idx_present() const { return row_grp_idx_present; } - - std::vector select_stripes( - std::vector> const& user_specified_stripes, - size_type& row_start, - size_type& row_count) - { - std::vector selected_stripes_mapping; - - if (!user_specified_stripes.empty()) { - CUDF_EXPECTS(user_specified_stripes.size() == get_num_source_files(), - "Must specify stripes for each source"); - // row_start is 0 if stripes are set. 
If this is not true anymore, then - // row_start needs to be subtracted to get the correct row_count - CUDF_EXPECTS(row_start == 0, "Start row index should be 0"); - - row_count = 0; - // Each vector entry represents a source file; each nested vector represents the - // user_defined_stripes to get from that source file - for (size_t src_file_idx = 0; src_file_idx < user_specified_stripes.size(); ++src_file_idx) { - std::vector stripe_infos; - - // Coalesce stripe info at the source file later since that makes downstream processing much - // easier in impl::read - for (const size_t& stripe_idx : user_specified_stripes[src_file_idx]) { - CUDF_EXPECTS(stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size(), - "Invalid stripe index"); - stripe_infos.push_back( - std::make_pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); - row_count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; - } - selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); - } - } else { - row_start = std::max(row_start, 0); - if (row_count < 0) { - row_count = static_cast( - std::min(get_num_rows(), std::numeric_limits::max())); - } - row_count = std::min(row_count, get_num_rows() - row_start); - CUDF_EXPECTS(row_count >= 0, "Invalid row count"); - CUDF_EXPECTS(row_start <= get_num_rows(), "Invalid row start"); - - size_type count = 0; - size_type stripe_skip_rows = 0; - // Iterate all source files, each source file has corelating metadata - for (size_t src_file_idx = 0; - src_file_idx < per_file_metadata.size() && count < row_start + row_count; - ++src_file_idx) { - std::vector stripe_infos; - - for (size_t stripe_idx = 0; - stripe_idx < per_file_metadata[src_file_idx].ff.stripes.size() && - count < row_start + row_count; - ++stripe_idx) { - count += per_file_metadata[src_file_idx].ff.stripes[stripe_idx].numberOfRows; - if (count > row_start || count == 0) { - stripe_infos.push_back( - 
std::make_pair(&per_file_metadata[src_file_idx].ff.stripes[stripe_idx], nullptr)); - } else { - stripe_skip_rows = count; - } - } - - selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); - } - // Need to remove skipped rows from the stripes which are not selected. - row_start -= stripe_skip_rows; - } - - // Read each stripe's stripefooter metadata - if (not selected_stripes_mapping.empty()) { - for (auto& mapping : selected_stripes_mapping) { - // Resize to all stripe_info for the source level - per_file_metadata[mapping.source_idx].stripefooters.resize(mapping.stripe_info.size()); - - for (size_t i = 0; i < mapping.stripe_info.size(); i++) { - const auto stripe = mapping.stripe_info[i].first; - const auto sf_comp_offset = stripe->offset + stripe->indexLength + stripe->dataLength; - const auto sf_comp_length = stripe->footerLength; - CUDF_EXPECTS( - sf_comp_offset + sf_comp_length < per_file_metadata[mapping.source_idx].source->size(), - "Invalid stripe information"); - const auto buffer = - per_file_metadata[mapping.source_idx].source->host_read(sf_comp_offset, sf_comp_length); - size_t sf_length = 0; - auto sf_data = per_file_metadata[mapping.source_idx].decompressor->Decompress( - buffer->data(), sf_comp_length, &sf_length); - ProtobufReader(sf_data, sf_length) - .read(per_file_metadata[mapping.source_idx].stripefooters[i]); - mapping.stripe_info[i].second = &per_file_metadata[mapping.source_idx].stripefooters[i]; - if (stripe->indexLength == 0) { row_grp_idx_present = false; } - } - } - } - - return selected_stripes_mapping; - } - - /** - * @brief Adds column as per the request and saves metadata about children. - * Children of a column will be added to the next level. - * - * @param selection A vector that saves list of columns as per levels of nesting. - * @param types A vector of schema types of columns. - * @param level current level of nesting. - * @param id current column id that needs to be added. 
- * @param has_timestamp_column True if timestamp column present and false otherwise. - */ - void add_column(std::vector>& selection, - std::vector const& types, - const size_t level, - const uint32_t id, - bool& has_timestamp_column) - { - if (level == selection.size()) { selection.emplace_back(); } - selection[level].push_back({id, static_cast(types[id].subtypes.size())}); - if (types[id].kind == orc::TIMESTAMP) { has_timestamp_column = true; } - - for (const auto child_id : types[id].subtypes) { - // Since nested column needs to be processed before its child can be processed, - // child column is being added to next level - add_column(selection, types, level + 1, child_id, has_timestamp_column); - } - } - - /** - * @brief Filters and reduces down to a selection of columns - * - * @param use_names List of column names to select - * @param has_timestamp_column True if timestamp column present and false otherwise - * - * @return Vector of list of ORC column meta-data - */ - std::vector> select_columns( - std::vector const& use_names, bool& has_timestamp_column) - { - auto const& pfm = per_file_metadata[0]; - std::vector> selection; - - if (not use_names.empty()) { - uint32_t index = 0; - // Have to check only parent columns - auto const num_columns = pfm.ff.types[0].subtypes.size(); - - for (const auto& use_name : use_names) { - bool name_found = false; - for (uint32_t i = 0; i < num_columns; ++i, ++index) { - if (index >= num_columns) { index = 0; } - auto col_id = pfm.ff.types[0].subtypes[index]; - if (pfm.get_column_name(col_id) == use_name) { - name_found = true; - add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column); - // Should start with next index - index = i + 1; - break; - } - } - CUDF_EXPECTS(name_found, "Unknown column name : " + std::string(use_name)); - } - } else { - for (auto const& col_id : pfm.ff.types[0].subtypes) { - add_column(selection, pfm.ff.types, 0, col_id, has_timestamp_column); - } - } - - return selection; - } -}; - void 
snappy_decompress(device_span comp_in, device_span comp_stat, size_t max_uncomp_page_size, @@ -924,18 +635,18 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan row_groups, std::vector& out_buffers, std::vector const& list_col, - const int32_t level) + const size_type level) { const auto num_of_stripes = chunks.size().first; const auto num_of_rowgroups = row_groups.size().first; - const auto num_parent_cols = _selected_columns[level].size(); - const auto num_child_cols = _selected_columns[level + 1].size(); + const auto num_parent_cols = selected_columns.levels[level].size(); + const auto num_child_cols = selected_columns.levels[level + 1].size(); const auto number_of_child_chunks = num_child_cols * num_of_stripes; auto& num_child_rows = _col_meta.num_child_rows; auto& parent_column_data = _col_meta.parent_column_data; // Reset the meta to store child column details. - num_child_rows.resize(_selected_columns[level + 1].size()); + num_child_rows.resize(selected_columns.levels[level + 1].size()); std::fill(num_child_rows.begin(), num_child_rows.end(), 0); parent_column_data.resize(number_of_child_chunks); _col_meta.parent_column_index.resize(number_of_child_chunks); @@ -967,7 +678,7 @@ void reader::impl::aggregate_child_meta(cudf::detail::host_2dspan reader::impl::create_empty_column(const int32_t orc_col_id, +std::unique_ptr reader::impl::create_empty_column(const size_type orc_col_id, column_name_info& schema_info, rmm::cuda_stream_view stream) { - schema_info.name = _metadata->get_column_name(0, orc_col_id); + schema_info.name = _metadata.column_name(0, orc_col_id); // If the column type is orc::DECIMAL see if the user // desires it to be converted to float64 or not auto const decimal_as_float64 = should_convert_decimal_column_to_float( - _decimal_cols_as_float, _metadata->per_file_metadata[0], orc_col_id); + _decimal_cols_as_float, _metadata.per_file_metadata[0], orc_col_id); auto const type = to_type_id( - _metadata->get_schema(orc_col_id), 
_use_np_dtypes, _timestamp_type.id(), decimal_as_float64); + _metadata.get_schema(orc_col_id), _use_np_dtypes, _timestamp_type.id(), decimal_as_float64); int32_t scale = 0; std::vector> child_columns; std::unique_ptr out_col = nullptr; - auto kind = _metadata->get_col_type(orc_col_id).kind; + auto kind = _metadata.get_col_type(orc_col_id).kind; switch (kind) { case orc::LIST: @@ -1037,7 +748,7 @@ std::unique_ptr reader::impl::create_empty_column(const int32_t orc_col_ 0, make_empty_column(data_type(type_id::INT32)), create_empty_column( - _metadata->get_col_type(orc_col_id).subtypes[0], schema_info.children.back(), stream), + _metadata.get_col_type(orc_col_id).subtypes[0], schema_info.children.back(), stream), 0, rmm::device_buffer{0, stream}, stream); @@ -1045,8 +756,8 @@ std::unique_ptr reader::impl::create_empty_column(const int32_t orc_col_ case orc::MAP: { schema_info.children.emplace_back("offsets"); schema_info.children.emplace_back("struct"); - const auto child_column_ids = _metadata->get_col_type(orc_col_id).subtypes; - for (size_t idx = 0; idx < _metadata->get_col_type(orc_col_id).subtypes.size(); idx++) { + const auto child_column_ids = _metadata.get_col_type(orc_col_id).subtypes; + for (size_t idx = 0; idx < _metadata.get_col_type(orc_col_id).subtypes.size(); idx++) { auto& children_schema = schema_info.children.back().children; children_schema.emplace_back(""); child_columns.push_back(create_empty_column( @@ -1065,7 +776,7 @@ std::unique_ptr reader::impl::create_empty_column(const int32_t orc_col_ } break; case orc::STRUCT: - for (const auto col : _metadata->get_col_type(orc_col_id).subtypes) { + for (const auto col : _metadata.get_col_type(orc_col_id).subtypes) { schema_info.children.emplace_back(""); child_columns.push_back(create_empty_column(col, schema_info.children.back(), stream)); } @@ -1075,7 +786,7 @@ std::unique_ptr reader::impl::create_empty_column(const int32_t orc_col_ case orc::DECIMAL: if (type == type_id::DECIMAL64) { - scale = 
-static_cast(_metadata->get_types()[orc_col_id].scale.value_or(0)); + scale = -static_cast(_metadata.get_types()[orc_col_id].scale.value_or(0)); } out_col = make_empty_column(data_type(type, scale)); break; @@ -1087,7 +798,7 @@ std::unique_ptr reader::impl::create_empty_column(const int32_t orc_col_ } // Adds child column buffers to parent column -column_buffer&& reader::impl::assemble_buffer(const int32_t orc_col_id, +column_buffer&& reader::impl::assemble_buffer(const size_type orc_col_id, std::vector>& col_buffers, const size_t level, rmm::cuda_stream_view stream) @@ -1095,12 +806,12 @@ column_buffer&& reader::impl::assemble_buffer(const int32_t orc_col_id, auto const col_id = _col_meta.orc_col_map[level][orc_col_id]; auto& col_buffer = col_buffers[level][col_id]; - col_buffer.name = _metadata->get_column_name(0, orc_col_id); - auto kind = _metadata->get_col_type(orc_col_id).kind; + col_buffer.name = _metadata.column_name(0, orc_col_id); + auto kind = _metadata.get_col_type(orc_col_id).kind; switch (kind) { case orc::LIST: case orc::STRUCT: - for (auto const& col : _metadata->get_col_type(orc_col_id).subtypes) { + for (auto const& col : selected_columns.children[orc_col_id]) { col_buffer.children.emplace_back(assemble_buffer(col, col_buffers, level + 1, stream)); } @@ -1108,9 +819,9 @@ column_buffer&& reader::impl::assemble_buffer(const int32_t orc_col_id, case orc::MAP: { std::vector child_col_buffers; // Get child buffers - for (size_t idx = 0; idx < _metadata->get_col_type(orc_col_id).subtypes.size(); idx++) { + for (size_t idx = 0; idx < selected_columns.children[orc_col_id].size(); idx++) { auto name = get_map_child_col_name(idx); - auto col = _metadata->get_col_type(orc_col_id).subtypes[idx]; + auto col = selected_columns.children[orc_col_id][idx]; child_col_buffers.emplace_back(assemble_buffer(col, col_buffers, level + 1, stream)); child_col_buffers.back().name = name; } @@ -1136,8 +847,8 @@ void reader::impl::create_columns(std::vector>&& col_ 
std::vector& schema_info, rmm::cuda_stream_view stream) { - std::transform(_selected_columns[0].begin(), - _selected_columns[0].end(), + std::transform(selected_columns.levels[0].begin(), + selected_columns.levels[0].end(), std::back_inserter(out_columns), [&](auto const col_meta) { schema_info.emplace_back(""); @@ -1149,14 +860,11 @@ void reader::impl::create_columns(std::vector>&& col_ reader::impl::impl(std::vector>&& sources, orc_reader_options const& options, rmm::mr::device_memory_resource* mr) - : _mr(mr), _sources(std::move(sources)) + : _mr(mr), + _sources(std::move(sources)), + _metadata{_sources}, + selected_columns{_metadata.select_columns(options.get_columns())} { - // Open and parse the source(s) dataset metadata - _metadata = std::make_unique(_sources); - - // Select only columns required by the options - _selected_columns = _metadata->select_columns(options.get_columns(), _has_timestamp_column); - // Override output timestamp resolution if requested if (options.get_timestamp_type().id() != type_id::EMPTY) { _timestamp_type = options.get_timestamp_type(); @@ -1172,58 +880,80 @@ reader::impl::impl(std::vector>&& sources, _decimal_cols_as_float = options.get_decimal_cols_as_float(); } +timezone_table reader::impl::compute_timezone_table( + const std::vector& selected_stripes, + rmm::cuda_stream_view stream) +{ + if (selected_stripes.empty()) return {}; + + auto const has_timestamp_column = std::any_of( + selected_columns.levels.cbegin(), selected_columns.levels.cend(), [&](auto& col_lvl) { + return std::any_of(col_lvl.cbegin(), col_lvl.cend(), [&](auto& col_meta) { + return _metadata.get_col_type(col_meta.id).kind == TypeKind::TIMESTAMP; + }); + }); + if (not has_timestamp_column) return {}; + + return build_timezone_transition_table(selected_stripes[0].stripe_info[0].second->writerTimezone, + stream); +} + table_with_metadata reader::impl::read(size_type skip_rows, size_type num_rows, const std::vector>& stripes, rmm::cuda_stream_view stream) { // 
Selected columns at different levels of nesting are stored in different elements - // of `_selected_columns`; thus, size == 1 means no nested columns - CUDF_EXPECTS(skip_rows == 0 or _selected_columns.size() == 1, + // of `selected_columns`; thus, size == 1 means no nested columns + CUDF_EXPECTS(skip_rows == 0 or selected_columns.num_levels() == 1, "skip_rows is not supported by nested columns"); std::vector> out_columns; // buffer and stripe data are stored as per nesting level - std::vector> out_buffers(_selected_columns.size()); + std::vector> out_buffers(selected_columns.num_levels()); std::vector schema_info; - std::vector> lvl_stripe_data(_selected_columns.size()); + std::vector> lvl_stripe_data(selected_columns.num_levels()); std::vector>> null_count_prefix_sums; table_metadata out_metadata; // There are no columns in the table - if (_selected_columns.size() == 0) return {std::make_unique(), std::move(out_metadata)}; + if (selected_columns.num_levels() == 0) + return {std::make_unique
(), std::move(out_metadata)}; // Select only stripes required (aka row groups) - const auto selected_stripes = _metadata->select_stripes(stripes, skip_rows, num_rows); + const auto selected_stripes = _metadata.select_stripes(stripes, skip_rows, num_rows); + + auto const tz_table = compute_timezone_table(selected_stripes, stream); // Iterates through levels of nested columns, child column will be one level down // compared to parent column. - for (size_t level = 0; level < _selected_columns.size(); level++) { - auto& selected_columns = _selected_columns[level]; + for (size_t level = 0; level < selected_columns.num_levels(); level++) { + auto& columns_level = selected_columns.levels[level]; // Association between each ORC column and its cudf::column - _col_meta.orc_col_map.emplace_back(_metadata->get_num_cols(), -1); + _col_meta.orc_col_map.emplace_back(_metadata.get_num_cols(), -1); std::vector nested_col; bool is_data_empty = false; // Get a list of column data types std::vector column_types; - for (auto& col : selected_columns) { + for (auto& col : columns_level) { // If the column type is orc::DECIMAL see if the user // desires it to be converted to float64 or not auto const decimal_as_float64 = should_convert_decimal_column_to_float( - _decimal_cols_as_float, _metadata->per_file_metadata[0], col.id); + _decimal_cols_as_float, _metadata.per_file_metadata[0], col.id); auto col_type = to_type_id( - _metadata->get_col_type(col.id), _use_np_dtypes, _timestamp_type.id(), decimal_as_float64); + _metadata.get_col_type(col.id), _use_np_dtypes, _timestamp_type.id(), decimal_as_float64); CUDF_EXPECTS(col_type != type_id::EMPTY, "Unknown type"); // Remove this once we support Decimal128 data type CUDF_EXPECTS( - (col_type != type_id::DECIMAL64) or (_metadata->get_col_type(col.id).precision <= 18), + (col_type != type_id::DECIMAL64) or (_metadata.get_col_type(col.id).precision <= 18), "Decimal data has precision > 18, Decimal64 data type doesn't support it."); if (col_type 
== type_id::DECIMAL64) { // sign of the scale is changed since cuDF follows c++ libraries like CNL // which uses negative scaling, but liborc and other libraries // follow positive scaling. - auto const scale = -static_cast(_metadata->get_col_type(col.id).scale.value_or(0)); + auto const scale = + -static_cast(_metadata.get_col_type(col.id).scale.value_or(0)); column_types.emplace_back(col_type, scale); } else { column_types.emplace_back(col_type); @@ -1237,8 +967,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, // If no rows or stripes to read, return empty columns if (num_rows <= 0 || selected_stripes.empty()) { - std::transform(_selected_columns[0].begin(), - _selected_columns[0].end(), + std::transform(selected_columns.levels[0].begin(), + selected_columns.levels[0].end(), std::back_inserter(out_columns), [&](auto const col_meta) { schema_info.emplace_back(""); @@ -1254,7 +984,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, [](size_t sum, auto& stripe_source_mapping) { return sum + stripe_source_mapping.stripe_info.size(); }); - const auto num_columns = selected_columns.size(); + const auto num_columns = columns_level.size(); cudf::detail::hostdevice_2dvector chunks( total_num_stripes, num_columns, stream); memset(chunks.base_host_ptr(), 0, chunks.memory_size()); @@ -1262,11 +992,11 @@ table_with_metadata reader::impl::read(size_type skip_rows, const bool use_index = (_use_index == true) && // Do stripes have row group index - _metadata->is_row_grp_idx_present() && + _metadata.is_row_grp_idx_present() && // Only use if we don't have much work with complete columns & stripes // TODO: Consider nrows, gpu, and tune the threshold - (num_rows > _metadata->get_row_index_stride() && !(_metadata->get_row_index_stride() & 7) && - _metadata->get_row_index_stride() > 0 && num_columns * total_num_stripes < 8 * 128) && + (num_rows > _metadata.get_row_index_stride() && !(_metadata.get_row_index_stride() & 7) && + 
_metadata.get_row_index_stride() > 0 && num_columns * total_num_stripes < 8 * 128) && // Only use if first row is aligned to a stripe boundary // TODO: Fix logic to handle unaligned rows (skip_rows == 0); @@ -1275,12 +1005,13 @@ table_with_metadata reader::impl::read(size_type skip_rows, std::vector stream_info; null_count_prefix_sums.emplace_back(); - null_count_prefix_sums.back().reserve(_selected_columns[level].size()); - std::generate_n( - std::back_inserter(null_count_prefix_sums.back()), _selected_columns[level].size(), [&]() { - return cudf::detail::make_zeroed_device_uvector_async(total_num_stripes, - stream); - }); + null_count_prefix_sums.back().reserve(selected_columns.levels[level].size()); + std::generate_n(std::back_inserter(null_count_prefix_sums.back()), + selected_columns.levels[level].size(), + [&]() { + return cudf::detail::make_zeroed_device_uvector_async( + total_num_stripes, stream); + }); // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; @@ -1302,7 +1033,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, stripe_info, stripe_footer, _col_meta.orc_col_map[level], - _metadata->get_types(), + _metadata.get_types(), use_index, &num_dict_entries, chunks, @@ -1334,16 +1065,16 @@ table_with_metadata reader::impl::read(size_type skip_rows, len += stream_info[stream_count].length; stream_count++; } - if (_metadata->per_file_metadata[stripe_source_mapping.source_idx] + if (_metadata.per_file_metadata[stripe_source_mapping.source_idx] .source->is_device_read_preferred(len)) { read_tasks.push_back( - std::make_pair(_metadata->per_file_metadata[stripe_source_mapping.source_idx] + std::make_pair(_metadata.per_file_metadata[stripe_source_mapping.source_idx] .source->device_read_async(offset, len, d_dst, stream), len)); } else { const auto buffer = - _metadata->per_file_metadata[stripe_source_mapping.source_idx].source->host_read( + 
_metadata.per_file_metadata[stripe_source_mapping.source_idx].source->host_read( offset, len); CUDF_EXPECTS(buffer->size() == len, "Unexpected discrepancy in bytes read."); CUDA_TRY(cudaMemcpyAsync( @@ -1356,8 +1087,8 @@ table_with_metadata reader::impl::read(size_type skip_rows, const auto rowgroup_id = num_rowgroups; auto stripe_num_rowgroups = 0; if (use_index) { - stripe_num_rowgroups = (num_rows_per_stripe + _metadata->get_row_index_stride() - 1) / - _metadata->get_row_index_stride(); + stripe_num_rowgroups = (num_rows_per_stripe + _metadata.get_row_index_stride() - 1) / + _metadata.get_row_index_stride(); } // Update chunks to reference streams pointers for (size_t col_idx = 0; col_idx < num_columns; col_idx++) { @@ -1378,19 +1109,17 @@ table_with_metadata reader::impl::read(size_type skip_rows, (level == 0) ? nullptr : null_count_prefix_sums[level - 1][_col_meta.parent_column_index[col_idx]].data(); - chunk.encoding_kind = stripe_footer->columns[selected_columns[col_idx].id].kind; - chunk.type_kind = _metadata->per_file_metadata[stripe_source_mapping.source_idx] - .ff.types[selected_columns[col_idx].id] + chunk.encoding_kind = stripe_footer->columns[columns_level[col_idx].id].kind; + chunk.type_kind = _metadata.per_file_metadata[stripe_source_mapping.source_idx] + .ff.types[columns_level[col_idx].id] .kind; // num_child_rows for a struct column will be same, for other nested types it will be // calculated. - chunk.num_child_rows = (chunk.type_kind != orc::STRUCT) ? 0 : chunk.num_rows; - auto const decimal_as_float64 = - should_convert_decimal_column_to_float(_decimal_cols_as_float, - _metadata->per_file_metadata[0], - selected_columns[col_idx].id); - chunk.decimal_scale = _metadata->per_file_metadata[stripe_source_mapping.source_idx] - .ff.types[selected_columns[col_idx].id] + chunk.num_child_rows = (chunk.type_kind != orc::STRUCT) ? 
0 : chunk.num_rows; + auto const decimal_as_float64 = should_convert_decimal_column_to_float( + _decimal_cols_as_float, _metadata.per_file_metadata[0], columns_level[col_idx].id); + chunk.decimal_scale = _metadata.per_file_metadata[stripe_source_mapping.source_idx] + .ff.types[columns_level[col_idx].id] .scale.value_or(0) | (decimal_as_float64 ? orc::gpu::orc_decimal2float64_scale : 0); @@ -1399,7 +1128,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, ? sizeof(string_index_pair) : ((column_types[col_idx].id() == type_id::LIST) or (column_types[col_idx].id() == type_id::STRUCT)) - ? sizeof(int32_t) + ? sizeof(size_type) : cudf::size_of(column_types[col_idx]); chunk.num_rowgroups = stripe_num_rowgroups; if (chunk.type_kind == orc::TIMESTAMP) { @@ -1442,15 +1171,15 @@ table_with_metadata reader::impl::read(size_type skip_rows, }); } // Setup row group descriptors if using indexes - if (_metadata->per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) { + if (_metadata.per_file_metadata[0].ps.compression != orc::NONE and not is_data_empty) { auto decomp_data = decompress_stripe_data(chunks, stripe_data, - _metadata->per_file_metadata[0].decompressor.get(), + _metadata.per_file_metadata[0].decompressor.get(), stream_info, total_num_stripes, row_groups, - _metadata->get_row_index_stride(), + _metadata.get_row_index_stride(), level == 0, stream); stripe_data.clear(); @@ -1465,19 +1194,12 @@ table_with_metadata reader::impl::read(size_type skip_rows, num_columns, total_num_stripes, num_rowgroups, - _metadata->get_row_index_stride(), + _metadata.get_row_index_stride(), level == 0, stream); } } - // Setup table for converting timestamp columns from local to UTC time - auto const tz_table = - _has_timestamp_column - ? 
build_timezone_transition_table( - selected_stripes[0].stripe_info[0].second->writerTimezone, stream) - : timezone_table{}; - for (size_t i = 0; i < column_types.size(); ++i) { bool is_nullable = false; for (size_t j = 0; j < total_num_stripes; ++j) { @@ -1499,7 +1221,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, skip_rows, tz_table.view(), row_groups, - _metadata->get_row_index_stride(), + _metadata.get_row_index_stride(), out_buffers[level], level, stream); @@ -1548,7 +1270,7 @@ table_with_metadata reader::impl::read(size_type skip_rows, out_metadata.schema_info = std::move(schema_info); - for (const auto& meta : _metadata->per_file_metadata) { + for (const auto& meta : _metadata.per_file_metadata) { for (const auto& kv : meta.ff.metadata) { out_metadata.user_data.insert({kv.name, kv.value}); } @@ -1575,6 +1297,7 @@ table_with_metadata reader::read(orc_reader_options const& options, rmm::cuda_st return _impl->read( options.get_skip_rows(), options.get_num_rows(), options.get_stripes(), stream); } + } // namespace orc } // namespace detail } // namespace io diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 8fed64ed64c..c9de2211d48 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -16,6 +16,7 @@ #pragma once +#include "aggregate_orc_metadata.hpp" #include "orc.h" #include "orc_gpu.h" @@ -38,7 +39,6 @@ namespace io { namespace detail { namespace orc { using namespace cudf::io::orc; -using namespace cudf::io; // Forward declarations class metadata; @@ -46,13 +46,12 @@ namespace { struct orc_stream_info; struct stripe_source_mapping; } // namespace -class aggregate_orc_metadata; /** * @brief Keeps track of orc mapping and child column details. */ struct reader_column_meta { - std::vector> + std::vector> orc_col_map; // Mapping between column id in orc to processing order. 
std::vector num_child_rows; // number of rows in child columns @@ -174,7 +173,7 @@ class reader::impl { * @param col_buffers Column buffers for columns and children. * @param level Current nesting level. */ - column_buffer&& assemble_buffer(const int32_t orc_col_id, + column_buffer&& assemble_buffer(const size_type orc_col_id, std::vector>& col_buffers, const size_t level, rmm::cuda_stream_view stream); @@ -201,20 +200,27 @@ class reader::impl { * * @return An empty column equivalent to orc column type. */ - std::unique_ptr create_empty_column(const int32_t orc_col_id, + std::unique_ptr create_empty_column(const size_type orc_col_id, column_name_info& schema_info, rmm::cuda_stream_view stream); + /** + * @brief Setup table for converting timestamp columns from local to UTC time + * + * @return Timezone table with timestamp offsets + */ + timezone_table compute_timezone_table( + const std::vector& selected_stripes, + rmm::cuda_stream_view stream); + private: rmm::mr::device_memory_resource* _mr = nullptr; std::vector> _sources; - std::unique_ptr _metadata; - // _output_columns associated schema indices - std::vector> _selected_columns; + cudf::io::orc::detail::aggregate_orc_metadata _metadata; + cudf::io::orc::detail::column_hierarchy selected_columns; - bool _use_index = true; - bool _use_np_dtypes = true; - bool _has_timestamp_column = false; + bool _use_index = true; + bool _use_np_dtypes = true; std::vector _decimal_cols_as_float; data_type _timestamp_type{type_id::EMPTY}; reader_column_meta _col_meta; diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index f2d5952d0ed..86cd48de417 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -1000,7 +1000,7 @@ TEST_F(OrcStatisticsTest, Basic) auto const stats = cudf_io::read_parsed_orc_statistics(cudf_io::source_info{filepath}); auto const expected_column_names = - std::vector{"col0", "_col0", "_col1", "_col2", "_col3", "_col4"}; + std::vector{"", "_col0", "_col1", "_col2", "_col3", 
"_col4"}; EXPECT_EQ(stats.column_names, expected_column_names); auto validate_statistics = [&](std::vector const& stats) { @@ -1371,4 +1371,40 @@ TEST_F(OrcWriterTest, TestMap) cudf::test::expect_metadata_equal(expected_metadata, result.metadata); } +TEST_F(OrcReaderTest, NestedColumnSelection) +{ + auto const num_rows = 1000; + auto child_col1_data = random_values(num_rows); + auto child_col2_data = random_values(num_rows); + auto validity = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 3; }); + column_wrapper child_col1 = {child_col1_data.begin(), child_col1_data.end(), validity}; + column_wrapper child_col2 = {child_col2_data.begin(), child_col2_data.end(), validity}; + auto struct_col = cudf::test::structs_column_wrapper{child_col1, child_col2}; + table_view expected({struct_col}); + + cudf_io::table_input_metadata expected_metadata(expected); + expected_metadata.column_metadata[0].set_name("struct_s"); + expected_metadata.column_metadata[0].child(0).set_name("field_a"); + expected_metadata.column_metadata[0].child(1).set_name("field_b"); + + auto filepath = temp_env->get_temp_filepath("OrcNestedSelection.orc"); + cudf_io::orc_writer_options out_opts = + cudf_io::orc_writer_options::builder(cudf_io::sink_info{filepath}, expected) + .metadata(&expected_metadata); + cudf_io::write_orc(out_opts); + + cudf_io::orc_reader_options in_opts = + cudf_io::orc_reader_options::builder(cudf_io::source_info{filepath}) + .use_index(false) + .columns({"struct_s.field_b"}); + auto result = cudf_io::read_orc(in_opts); + + // Verify that only one child column is included in the output table + ASSERT_EQ(1, result.tbl->view().column(0).num_children()); + // Verify that the first child column is `field_b` + column_wrapper expected_col = {child_col2_data.begin(), child_col2_data.end(), validity}; + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(expected_col, result.tbl->view().column(0).child(0)); + ASSERT_EQ("field_b", 
result.metadata.schema_info[0].children[0].name); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/tests/test_orc.py b/python/cudf/cudf/tests/test_orc.py index d94198bcd4d..dfec1eaadc1 100644 --- a/python/cudf/cudf/tests/test_orc.py +++ b/python/cudf/cudf/tests/test_orc.py @@ -1492,3 +1492,25 @@ def test_empty_statistics(): assert stats[0]["i"].get("minimum") == 1 assert stats[0]["i"].get("maximum") == 1 assert stats[0]["i"].get("sum") == 1 + + +@pytest.mark.filterwarnings("ignore:.*struct.*experimental") +@pytest.mark.parametrize( + "equivalent_columns", + [ + (["lvl1_struct.a", "lvl1_struct.b"], ["lvl1_struct"]), + (["lvl1_struct", "lvl1_struct.a"], ["lvl1_struct"]), + (["lvl1_struct.a", "lvl1_struct"], ["lvl1_struct"]), + (["lvl1_struct.b", "lvl1_struct.a"], ["lvl1_struct.b", "lvl1_struct"]), + (["lvl2_struct.lvl1_struct", "lvl2_struct"], ["lvl2_struct"]), + ( + ["lvl2_struct.a", "lvl2_struct.lvl1_struct.c", "lvl2_struct"], + ["lvl2_struct"], + ), + ], +) +def test_select_nested(list_struct_buff, equivalent_columns): + # The two column selections should be equivalent + df_cols1 = cudf.read_orc(list_struct_buff, columns=equivalent_columns[0]) + df_cols2 = cudf.read_orc(list_struct_buff, columns=equivalent_columns[1]) + assert_eq(df_cols1, df_cols2)