diff --git a/cpp/include/cudf/io/orc.hpp b/cpp/include/cudf/io/orc.hpp index a3f76817f8a..5cc9ea81f29 100644 --- a/cpp/include/cudf/io/orc.hpp +++ b/cpp/include/cudf/io/orc.hpp @@ -200,6 +200,7 @@ class orc_reader_options { void set_skip_rows(uint64_t rows) { CUDF_EXPECTS(rows == 0 or _stripes.empty(), "Can't set both skip_rows along with stripes"); + CUDF_EXPECTS(rows <= std::numeric_limits::max(), "skip_rows is too large"); _skip_rows = rows; } diff --git a/cpp/include/cudf/io/orc_metadata.hpp b/cpp/include/cudf/io/orc_metadata.hpp index 19d44263d1b..8f3eb1dff3c 100644 --- a/cpp/include/cudf/io/orc_metadata.hpp +++ b/cpp/include/cudf/io/orc_metadata.hpp @@ -331,7 +331,7 @@ class orc_metadata { * @param num_rows number of rows * @param num_stripes number of stripes */ - orc_metadata(orc_schema schema, size_type num_rows, size_type num_stripes) + orc_metadata(orc_schema schema, uint64_t num_rows, size_type num_stripes) : _schema{std::move(schema)}, _num_rows{num_rows}, _num_stripes{num_stripes} { } @@ -362,7 +362,7 @@ class orc_metadata { private: orc_schema _schema; - size_type _num_rows; + uint64_t _num_rows; size_type _num_stripes; }; diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 315562e9183..b8353d312fe 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -404,7 +404,7 @@ orc_metadata read_orc_metadata(source_info const& src_info, rmm::cuda_stream_vie auto const footer = orc::metadata(sources.front().get(), stream).ff; return {{make_orc_column_schema(footer.types, 0, "")}, - static_cast(footer.numberOfRows), + footer.numberOfRows, static_cast(footer.stripes.size())}; } diff --git a/cpp/src/io/orc/aggregate_orc_metadata.cpp b/cpp/src/io/orc/aggregate_orc_metadata.cpp index ea091099b6e..f5f540bc3a4 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.cpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.cpp @@ -155,7 +155,7 @@ aggregate_orc_metadata::aggregate_orc_metadata( std::tuple> aggregate_orc_metadata::select_stripes( std::vector> const& user_specified_stripes, - uint64_t skip_rows, + int64_t skip_rows, std::optional const& num_rows, rmm::cuda_stream_view stream) { @@ -163,7 +163,7 @@ aggregate_orc_metadata::select_stripes( "Can't use both the row selection and the stripe selection"); auto [rows_to_skip, rows_to_read] = [&]() { - if (not user_specified_stripes.empty()) { return std::pair{0, 0}; } + if (not user_specified_stripes.empty()) { return std::pair{0, 0}; } return cudf::io::detail::skip_rows_num_rows_from_options(skip_rows, num_rows, get_num_rows()); }(); @@ -192,8 +192,8 @@ aggregate_orc_metadata::select_stripes( selected_stripes_mapping.push_back({static_cast(src_file_idx), stripe_infos}); } } else { - uint64_t count = 0; - size_type stripe_skip_rows = 0; + int64_t count = 0; + int64_t stripe_skip_rows = 0; // Iterate all source files, each source file has corelating metadata for (size_t src_file_idx = 0; src_file_idx < per_file_metadata.size() && count < rows_to_skip + rows_to_read; diff --git a/cpp/src/io/orc/aggregate_orc_metadata.hpp b/cpp/src/io/orc/aggregate_orc_metadata.hpp index f05946a4346..d1e053be481 100644 --- a/cpp/src/io/orc/aggregate_orc_metadata.hpp +++ b/cpp/src/io/orc/aggregate_orc_metadata.hpp @@ -79,9 +79,11 @@ class aggregate_orc_metadata { [[nodiscard]] auto const& get_types() const { return per_file_metadata[0].ff.types; } - [[nodiscard]] int get_row_index_stride() const + [[nodiscard]] size_type get_row_index_stride() const { - return static_cast(per_file_metadata[0].ff.rowIndexStride); + CUDF_EXPECTS(per_file_metadata[0].ff.rowIndexStride <= std::numeric_limits::max(), + "Row index stride exceeds size_type max"); + return per_file_metadata[0].ff.rowIndexStride; } [[nodiscard]] auto is_row_grp_idx_present() const { return row_grp_idx_present; } @@ -115,7 +117,7 @@ class aggregate_orc_metadata { */ [[nodiscard]] std::tuple> select_stripes(std::vector> const& user_specified_stripes, - uint64_t skip_rows, + int64_t skip_rows, std::optional const& num_rows, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/orc/orc.cpp b/cpp/src/io/orc/orc.cpp index de0d7a88614..1fe5e5aa41e 100644 --- a/cpp/src/io/orc/orc.cpp +++ b/cpp/src/io/orc/orc.cpp @@ -69,7 +69,7 @@ void ProtobufReader::read(PostScript& s, size_t maxlen) function_builder(s, maxlen, op); } -void ProtobufReader::read(FileFooter& s, size_t maxlen) +void ProtobufReader::read(Footer& s, size_t maxlen) { auto op = std::tuple(field_reader(1, s.headerLength), field_reader(2, s.contentLength), @@ -307,7 +307,7 @@ size_t ProtobufWriter::write(PostScript const& s) return w.value(); } -size_t ProtobufWriter::write(FileFooter const& s) +size_t ProtobufWriter::write(Footer const& s) { ProtobufFieldWriter w(this); w.field_uint(1, s.headerLength); @@ -393,7 +393,8 @@ size_t ProtobufWriter::write(Metadata const& s) return w.value(); } -OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_blockSize(blockSize) +OrcDecompressor::OrcDecompressor(CompressionKind kind, uint64_t block_size) + : m_blockSize(block_size) { switch (kind) { case NONE: diff --git a/cpp/src/io/orc/orc.hpp b/cpp/src/io/orc/orc.hpp index 6fbee2824eb..88bd260a598 100644 --- a/cpp/src/io/orc/orc.hpp +++ b/cpp/src/io/orc/orc.hpp @@ -73,7 +73,7 @@ static constexpr int32_t DEFAULT_MAX_NANOS = 999'999; struct PostScript { uint64_t footerLength = 0; // the length of the footer section in bytes CompressionKind compression = NONE; // the kind of generic compression used - uint32_t compressionBlockSize{}; // the maximum size of each compression chunk + uint64_t compressionBlockSize{}; // the maximum size of each compression chunk std::vector version; // the version of the file format [major, minor] uint64_t metadataLength = 0; // the length of the metadata section in bytes std::optional writerVersion; // The version of the writer that wrote the file @@ -84,8 +84,8 @@ struct StripeInformation { uint64_t offset = 0; // the start of the stripe within the file uint64_t indexLength = 0; // the length of the indexes in bytes uint64_t dataLength = 0; // the length of the data in bytes - uint32_t footerLength = 0; // the length of the footer in bytes - uint32_t numberOfRows = 0; // the number of rows in the stripe + uint64_t footerLength = 0; // the length of the footer in bytes + uint64_t numberOfRows = 0; // the number of rows in the stripe }; struct SchemaType { @@ -105,7 +105,7 @@ struct UserMetadataItem { using ColStatsBlob = std::vector; // Column statistics blob -struct FileFooter { +struct Footer { uint64_t headerLength = 0; // the length of the file header in bytes (always 3) uint64_t contentLength = 0; // the length of the file header and body in bytes std::vector stripes; // the information about the stripes @@ -237,7 +237,7 @@ class ProtobufReader { read(s, m_end - m_cur); } void read(PostScript&, size_t maxlen); - void read(FileFooter&, size_t maxlen); + void read(Footer&, size_t maxlen); void read(StripeInformation&, size_t maxlen); void read(SchemaType&, size_t maxlen); void read(UserMetadataItem&, size_t maxlen); @@ -519,7 +519,7 @@ class ProtobufWriter { public: size_t write(PostScript const&); - size_t write(FileFooter const&); + size_t write(Footer const&); size_t write(StripeInformation const&); size_t write(SchemaType const&); size_t write(UserMetadataItem const&); @@ -540,7 +540,7 @@ class ProtobufWriter { class OrcDecompressor { public: - OrcDecompressor(CompressionKind kind, uint32_t blockSize); + OrcDecompressor(CompressionKind kind, uint64_t blockSize); /** * @brief ORC block decompression @@ -553,17 +553,17 @@ class OrcDecompressor { host_span decompress_blocks(host_span src, rmm::cuda_stream_view stream); [[nodiscard]] uint32_t GetLog2MaxCompressionRatio() const { return m_log2MaxRatio; } - [[nodiscard]] uint32_t GetMaxUncompressedBlockSize(uint32_t block_len) const + [[nodiscard]] uint64_t GetMaxUncompressedBlockSize(uint32_t block_len) const { - return std::min(block_len << m_log2MaxRatio, m_blockSize); + return std::min(static_cast(block_len) << m_log2MaxRatio, m_blockSize); } [[nodiscard]] compression_type compression() const { return _compression; } - [[nodiscard]] uint32_t GetBlockSize() const { return m_blockSize; } + [[nodiscard]] auto GetBlockSize() const { return m_blockSize; } protected: compression_type _compression; uint32_t m_log2MaxRatio = 24; // log2 of maximum compression ratio - uint32_t m_blockSize; + uint64_t m_blockSize; std::vector m_buf; }; @@ -613,9 +613,9 @@ class metadata { public: explicit metadata(datasource* const src, rmm::cuda_stream_view stream); - [[nodiscard]] size_t get_total_rows() const { return ff.numberOfRows; } - [[nodiscard]] int get_num_stripes() const { return ff.stripes.size(); } - [[nodiscard]] int get_num_columns() const { return ff.types.size(); } + [[nodiscard]] auto get_total_rows() const { return ff.numberOfRows; } + [[nodiscard]] size_type get_num_stripes() const { return ff.stripes.size(); } + [[nodiscard]] size_type get_num_columns() const { return ff.types.size(); } /** * @brief Returns the name of the column with the given ID. * @@ -638,7 +638,7 @@ class metadata { CUDF_EXPECTS(column_id < get_num_columns(), "Out of range column id provided"); return column_paths[column_id]; } - [[nodiscard]] int get_row_index_stride() const { return ff.rowIndexStride; } + [[nodiscard]] auto get_row_index_stride() const { return ff.rowIndexStride; } /** * @brief Returns the ID of the parent column of the given column. @@ -666,7 +666,7 @@ class metadata { public: PostScript ps; - FileFooter ff; + Footer ff; Metadata md; std::vector stripefooters; std::unique_ptr decompressor; diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index b69722bbded..8c7ccf0527f 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -101,18 +101,18 @@ struct DictionaryEntry { struct ColumnDesc { uint8_t const* streams[CI_NUM_STREAMS]; // ptr to data stream index uint32_t strm_id[CI_NUM_STREAMS]; // stream ids - uint32_t strm_len[CI_NUM_STREAMS]; // stream length + int64_t strm_len[CI_NUM_STREAMS]; // stream length uint32_t* valid_map_base; // base pointer of valid bit map for this column void* column_data_base; // base pointer of column data - uint32_t start_row; // starting row of the stripe - uint32_t num_rows; // number of rows in stripe - uint32_t column_num_rows; // number of rows in whole column - uint32_t num_child_rows; // store number of child rows if it's list column + int64_t start_row; // starting row of the stripe + int64_t num_rows; // number of rows in stripe + int64_t column_num_rows; // number of rows in whole column + int64_t num_child_rows; // store number of child rows if it's list column uint32_t num_rowgroups; // number of rowgroups in the chunk - uint32_t dictionary_start; // start position in global dictionary + int64_t dictionary_start; // start position in global dictionary uint32_t dict_len; // length of local dictionary - uint32_t null_count; // number of null values in this stripe's column - uint32_t skip_count; // number of non-null values to skip + int64_t null_count; // number of null values in this stripe's column + int64_t skip_count; // number of non-null values to skip uint32_t rowgroup_id; // row group position ColumnEncodingKind encoding_kind; // column encoding kind TypeKind type_kind; // column data type @@ -129,10 +129,10 @@ struct ColumnDesc { */ struct RowGroup { uint32_t chunk_id; // Column chunk this entry belongs to - uint32_t strm_offset[2]; // Index offset for CI_DATA and CI_DATA2 streams + int64_t strm_offset[2]; // Index offset for CI_DATA and CI_DATA2 streams uint16_t run_pos[2]; // Run position for CI_DATA and CI_DATA2 uint32_t num_rows; // number of rows in rowgroup - uint32_t start_row; // starting row of the rowgroup + int64_t start_row; // starting row of the rowgroup uint32_t num_child_rows; // number of rows of children in rowgroup in case of list type }; @@ -140,9 +140,9 @@ struct RowGroup { * @brief Struct to describe an encoder data chunk */ struct EncChunk { - uint32_t start_row; // start row of this chunk + int64_t start_row; // start row of this chunk uint32_t num_rows; // number of rows in this chunk - uint32_t null_mask_start_row; // adjusted to multiple of 8 + int64_t null_mask_start_row; // adjusted to multiple of 8 uint32_t null_mask_num_rows; // adjusted to multiple of 8 ColumnEncodingKind encoding_kind; // column encoding kind TypeKind type_kind; // column data type @@ -253,7 +253,7 @@ constexpr uint32_t encode_block_size = 512; */ void ParseCompressedStripeData(CompressedStreamInfo* strm_info, int32_t num_streams, - uint32_t compression_block_size, + uint64_t compression_block_size, uint32_t log2maxcr, rmm::cuda_stream_view stream); @@ -276,7 +276,6 @@ void PostDecompressionReassemble(CompressedStreamInfo* strm_info, * @param[in] chunks ColumnDesc device array [stripe][column] * @param[in] num_columns Number of columns * @param[in] num_stripes Number of stripes - * @param[in] num_rowgroups Number of row groups * @param[in] rowidx_stride Row index stride * @param[in] use_base_stride Whether to use base stride obtained from meta or use the computed * value @@ -285,10 +284,9 @@ void PostDecompressionReassemble(CompressedStreamInfo* strm_info, void ParseRowGroupIndex(RowGroup* row_groups, CompressedStreamInfo* strm_info, ColumnDesc* chunks, - uint32_t num_columns, - uint32_t num_stripes, - uint32_t num_rowgroups, - uint32_t rowidx_stride, + size_type num_columns, + size_type num_stripes, + size_type rowidx_stride, bool use_base_stride, rmm::cuda_stream_view stream); @@ -304,9 +302,9 @@ void ParseRowGroupIndex(RowGroup* row_groups, */ void DecodeNullsAndStringDictionaries(ColumnDesc* chunks, DictionaryEntry* global_dictionary, - uint32_t num_columns, - uint32_t num_stripes, - size_t first_row, + size_type num_columns, + size_type num_stripes, + int64_t first_row, rmm::cuda_stream_view stream); /** @@ -329,12 +327,12 @@ void DecodeNullsAndStringDictionaries(ColumnDesc* chunks, void DecodeOrcColumnData(ColumnDesc* chunks, DictionaryEntry* global_dictionary, device_2dspan row_groups, - uint32_t num_columns, - uint32_t num_stripes, - size_t first_row, + size_type num_columns, + size_type num_stripes, + int64_t first_row, table_device_view tz_table, - uint32_t num_rowgroups, - uint32_t rowidx_stride, + int64_t num_rowgroups, + size_type rowidx_stride, size_t level, size_type* error_count, rmm::cuda_stream_view stream); @@ -364,8 +362,8 @@ void EncodeOrcColumnData(device_2dspan chunks, void EncodeStripeDictionaries(stripe_dictionary const* stripes, device_span columns, device_2dspan chunks, - uint32_t num_string_columns, - uint32_t num_stripes, + size_type num_string_columns, + size_type num_stripes, device_2dspan enc_streams, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/orc/reader_impl.cu b/cpp/src/io/orc/reader_impl.cu index cf3121fe659..f078e20f7e6 100644 --- a/cpp/src/io/orc/reader_impl.cu +++ b/cpp/src/io/orc/reader_impl.cu @@ -37,7 +37,7 @@ reader::impl::impl(std::vector>&& sources, { } -table_with_metadata reader::impl::read(uint64_t skip_rows, +table_with_metadata reader::impl::read(int64_t skip_rows, std::optional const& num_rows_opt, std::vector> const& stripes) { diff --git a/cpp/src/io/orc/reader_impl.hpp b/cpp/src/io/orc/reader_impl.hpp index 7746bacd188..ab8eaebeb61 100644 --- a/cpp/src/io/orc/reader_impl.hpp +++ b/cpp/src/io/orc/reader_impl.hpp @@ -60,7 +60,7 @@ class reader::impl { * @param stripes Indices of individual stripes to load if non-empty * @return The set of columns along with metadata */ - table_with_metadata read(uint64_t skip_rows, + table_with_metadata read(int64_t skip_rows, std::optional const& num_rows_opt, std::vector> const& stripes); @@ -72,7 +72,7 @@ class reader::impl { * @param num_rows_opt Optional number of rows to read, or `std::nullopt` to read all rows * @param stripes Indices of individual stripes to load if non-empty */ - void prepare_data(uint64_t skip_rows, + void prepare_data(int64_t skip_rows, std::optional const& num_rows_opt, std::vector> const& stripes); diff --git a/cpp/src/io/orc/reader_impl_helpers.hpp b/cpp/src/io/orc/reader_impl_helpers.hpp index 48742b5fc8c..22482bad486 100644 --- a/cpp/src/io/orc/reader_impl_helpers.hpp +++ b/cpp/src/io/orc/reader_impl_helpers.hpp @@ -38,7 +38,7 @@ struct reader_column_meta { std::vector> orc_col_map; // Number of rows in child columns. - std::vector num_child_rows; + std::vector num_child_rows; // Consists of parent column valid_map and null count. std::vector parent_column_data; @@ -46,14 +46,14 @@ struct reader_column_meta { std::vector parent_column_index; // Start row of child columns [stripe][column]. - std::vector child_start_row; + std::vector child_start_row; // Number of rows of child columns [stripe][column]. - std::vector num_child_rows_per_stripe; + std::vector num_child_rows_per_stripe; struct row_group_meta { - uint32_t num_rows; // number of rows in a column in a row group - uint32_t start_row; // start row in a column in a row group + size_type num_rows; // number of rows in a column in a row group + int64_t start_row; // start row in a column in a row group }; // Row group metadata [rowgroup][column]. diff --git a/cpp/src/io/orc/reader_impl_preprocess.cu b/cpp/src/io/orc/reader_impl_preprocess.cu index ea191f67785..6c59f83bc46 100644 --- a/cpp/src/io/orc/reader_impl_preprocess.cu +++ b/cpp/src/io/orc/reader_impl_preprocess.cu @@ -77,7 +77,7 @@ std::size_t gather_stream_info(std::size_t stripe_index, host_span types, bool use_index, bool apply_struct_map, - std::size_t* num_dictionary_entries, + int64_t* num_dictionary_entries, std::vector& stream_info, cudf::detail::hostdevice_2dvector& chunks) { @@ -174,8 +174,8 @@ rmm::device_buffer decompress_stripe_data( host_span stream_info, cudf::detail::hostdevice_2dvector& chunks, cudf::detail::hostdevice_2dvector& row_groups, - std::size_t num_stripes, - std::size_t row_index_stride, + size_type num_stripes, + size_type row_index_stride, bool use_base_stride, rmm::cuda_stream_view stream) { @@ -350,15 +350,15 @@ rmm::device_buffer decompress_stripe_data( // We can check on host after stream synchronize CUDF_EXPECTS(not any_block_failure[0], "Error during decompression"); - auto const num_columns = chunks.size().second; + size_type const num_columns = chunks.size().second; // Update the stream information with the updated uncompressed info // TBD: We could update the value from the information we already // have in stream_info[], but using the gpu results also updates // max_uncompressed_size to the actual uncompressed size, or zero if // decompression failed. - for (std::size_t i = 0; i < num_stripes; ++i) { - for (std::size_t j = 0; j < num_columns; ++j) { + for (size_type i = 0; i < num_stripes; ++i) { + for (size_type j = 0; j < num_columns; ++j) { auto& chunk = chunks[i][j]; for (int k = 0; k < gpu::CI_NUM_STREAMS; ++k) { if (chunk.strm_len[k] > 0 && chunk.strm_id[k] < compinfo.size()) { @@ -377,7 +377,6 @@ rmm::device_buffer decompress_stripe_data( chunks.base_device_ptr(), num_columns, num_stripes, - row_groups.size().first, row_index_stride, use_base_stride, stream); @@ -485,8 +484,8 @@ void update_null_mask(cudf::detail::hostdevice_2dvector& chunks * @param mr Device memory resource to use for device memory allocation */ void decode_stream_data(std::size_t num_dicts, - std::size_t skip_rows, - std::size_t row_index_stride, + int64_t skip_rows, + size_type row_index_stride, std::size_t level, table_view const& tz_table, cudf::detail::hostdevice_2dvector& chunks, @@ -622,9 +621,9 @@ void aggregate_child_meta(std::size_t level, col_meta.num_child_rows_per_stripe.resize(number_of_child_chunks); col_meta.rwgrp_meta.resize(num_of_rowgroups * num_child_cols); - auto child_start_row = cudf::detail::host_2dspan( + auto child_start_row = cudf::detail::host_2dspan( col_meta.child_start_row.data(), num_of_stripes, num_child_cols); - auto num_child_rows_per_stripe = cudf::detail::host_2dspan( + auto num_child_rows_per_stripe = cudf::detail::host_2dspan( col_meta.num_child_rows_per_stripe.data(), num_of_stripes, num_child_cols); auto rwgrp_meta = cudf::detail::host_2dspan( col_meta.rwgrp_meta.data(), num_of_rowgroups, num_child_cols); @@ -634,7 +633,7 @@ void aggregate_child_meta(std::size_t level, // For each parent column, update its child column meta for each stripe. std::for_each(nested_cols.begin(), nested_cols.end(), [&](auto const p_col) { auto const parent_col_idx = col_meta.orc_col_map[level][p_col.id]; - auto start_row = 0; + int64_t start_row = 0; auto processed_row_groups = 0; for (std::size_t stripe_id = 0; stripe_id < num_of_stripes; stripe_id++) { @@ -711,7 +710,7 @@ void generate_offsets_for_list(host_span buff_data, rmm::cuda_ } // namespace -void reader::impl::prepare_data(uint64_t skip_rows, +void reader::impl::prepare_data(int64_t skip_rows, std::optional const& num_rows_opt, std::vector> const& stripes) { @@ -813,7 +812,7 @@ void reader::impl::prepare_data(uint64_t skip_rows, // Only use if we don't have much work with complete columns & stripes // TODO: Consider nrows, gpu, and tune the threshold (rows_to_read > _metadata.get_row_index_stride() && !(_metadata.get_row_index_stride() & 7) && - _metadata.get_row_index_stride() > 0 && num_columns * total_num_stripes < 8 * 128) && + _metadata.get_row_index_stride() != 0 && num_columns * total_num_stripes < 8 * 128) && // Only use if first row is aligned to a stripe boundary // TODO: Fix logic to handle unaligned rows (rows_to_skip == 0); @@ -833,10 +832,10 @@ void reader::impl::prepare_data(uint64_t skip_rows, // Tracker for eventually deallocating compressed and uncompressed data auto& stripe_data = lvl_stripe_data[level]; - std::size_t stripe_start_row = 0; - std::size_t num_dict_entries = 0; - std::size_t num_rowgroups = 0; - int stripe_idx = 0; + int64_t stripe_start_row = 0; + int64_t num_dict_entries = 0; + int64_t num_rowgroups = 0; + size_type stripe_idx = 0; std::vector, std::size_t>> read_tasks; for (auto const& stripe_source_mapping : selected_stripes) { @@ -1003,7 +1002,6 @@ void reader::impl::prepare_data(uint64_t skip_rows, chunks.base_device_ptr(), num_columns, total_num_stripes, - num_rowgroups, _metadata.get_row_index_stride(), level == 0, _stream); diff --git a/cpp/src/io/orc/stripe_data.cu b/cpp/src/io/orc/stripe_data.cu index 5e10d90ae9b..1572b7246c0 100644 --- a/cpp/src/io/orc/stripe_data.cu +++ b/cpp/src/io/orc/stripe_data.cu @@ -94,8 +94,8 @@ struct orc_strdict_state_s { }; struct orc_datadec_state_s { - uint32_t cur_row; // starting row of current batch - uint32_t end_row; // ending row of this chunk (start_row + num_rows) + int64_t cur_row; // starting row of current batch + int64_t end_row; // ending row of this chunk (start_row + num_rows) uint32_t max_vals; // max # of non-zero values to decode in this batch uint32_t nrows; // # of rows in current batch (up to block_size) uint32_t buffered_count; // number of buffered values in the secondary data stream @@ -108,7 +108,7 @@ struct orcdec_state_s { orc_bytestream_s bs; orc_bytestream_s bs2; int is_string; - uint64_t num_child_rows; + int64_t num_child_rows; union { orc_strdict_state_s dict; uint32_t nulls_desc_row; // number of rows processed for nulls. @@ -1086,9 +1086,9 @@ template CUDF_KERNEL void __launch_bounds__(block_size) gpuDecodeNullsAndStringDictionaries(ColumnDesc* chunks, DictionaryEntry* global_dictionary, - uint32_t num_columns, - uint32_t num_stripes, - size_t first_row) + size_type num_columns, + size_type num_stripes, + int64_t first_row) { __shared__ __align__(16) orcdec_state_s state_g; using warp_reduce = cub::WarpReduce; @@ -1132,12 +1132,13 @@ CUDF_KERNEL void __launch_bounds__(block_size) : 0; auto const num_elems = s->chunk.num_rows - parent_null_count; while (s->top.nulls_desc_row < num_elems) { - uint32_t nrows_max = min(num_elems - s->top.nulls_desc_row, blockDim.x * 32); - uint32_t nrows; - size_t row_in; + auto const nrows_max = + static_cast(min(num_elems - s->top.nulls_desc_row, blockDim.x * 32ul)); bytestream_fill(&s->bs, t); __syncthreads(); + + uint32_t nrows; if (s->chunk.strm_len[CI_PRESENT] > 0) { uint32_t nbytes = Byte_RLE(&s->bs, &s->u.rle8, s->vals.u8, (nrows_max + 7) >> 3, t); nrows = min(nrows_max, nbytes * 8u); @@ -1151,7 +1152,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) } __syncthreads(); - row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count; + auto const row_in = s->chunk.start_row + s->top.nulls_desc_row - prev_parent_null_count; if (row_in + nrows > first_row && row_in < first_row + max_num_rows && s->chunk.valid_map_base != nullptr) { int64_t dst_row = row_in - first_row; @@ -1284,7 +1285,10 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s, if (t == 0) { if (s->chunk.skip_count != 0) { - s->u.rowdec.nz_count = min(min(s->chunk.skip_count, s->top.data.max_vals), blockDim.x); + s->u.rowdec.nz_count = + min(static_cast( + min(s->chunk.skip_count, static_cast(s->top.data.max_vals))), + blockDim.x); s->chunk.skip_count -= s->u.rowdec.nz_count; s->top.data.nrows = s->u.rowdec.nz_count; } else { @@ -1297,11 +1301,12 @@ static __device__ void DecodeRowPositions(orcdec_state_s* s, } while (s->u.rowdec.nz_count < s->top.data.max_vals && s->top.data.cur_row + s->top.data.nrows < s->top.data.end_row) { - uint32_t nrows = min(s->top.data.end_row - (s->top.data.cur_row + s->top.data.nrows), - min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x)); + uint32_t const remaining_rows = s->top.data.end_row - (s->top.data.cur_row + s->top.data.nrows); + uint32_t nrows = + min(remaining_rows, min((row_decoder_buffer_size - s->u.rowdec.nz_count) * 2, blockDim.x)); if (s->chunk.valid_map_base != nullptr) { // We have a present stream - uint32_t rmax = s->top.data.end_row - min((uint32_t)first_row, s->top.data.end_row); + uint32_t rmax = s->top.data.end_row - min(first_row, s->top.data.end_row); auto r = (uint32_t)(s->top.data.cur_row + s->top.data.nrows + t - first_row); uint32_t valid = (t < nrows && r < rmax) ? (((uint8_t const*)s->chunk.valid_map_base)[r >> 3] >> (r & 7)) & 1 @@ -1364,8 +1369,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) DictionaryEntry* global_dictionary, table_device_view tz_table, device_2dspan row_groups, - size_t first_row, - uint32_t rowidx_stride, + int64_t first_row, + size_type rowidx_stride, size_t level, size_type* error_count) { @@ -1405,8 +1410,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (s->top.data.index.strm_offset[1] > s->chunk.strm_len[CI_DATA2]) { atomicAdd(error_count, 1); } - uint32_t ofs0 = min(s->top.data.index.strm_offset[0], s->chunk.strm_len[CI_DATA]); - uint32_t ofs1 = min(s->top.data.index.strm_offset[1], s->chunk.strm_len[CI_DATA2]); + auto const ofs0 = min(s->top.data.index.strm_offset[0], s->chunk.strm_len[CI_DATA]); + auto const ofs1 = min(s->top.data.index.strm_offset[1], s->chunk.strm_len[CI_DATA2]); uint32_t rowgroup_rowofs = (level == 0) ? (blockIdx.y - min(s->chunk.rowgroup_id, blockIdx.y)) * rowidx_stride : s->top.data.index.start_row; @@ -1415,14 +1420,13 @@ CUDF_KERNEL void __launch_bounds__(block_size) s->chunk.strm_len[CI_DATA] -= ofs0; s->chunk.streams[CI_DATA2] += ofs1; s->chunk.strm_len[CI_DATA2] -= ofs1; - rowgroup_rowofs = min(rowgroup_rowofs, s->chunk.num_rows); + rowgroup_rowofs = min(static_cast(rowgroup_rowofs), s->chunk.num_rows); s->chunk.start_row += rowgroup_rowofs; s->chunk.num_rows -= rowgroup_rowofs; } - s->is_string = (s->chunk.type_kind == STRING || s->chunk.type_kind == BINARY || + s->is_string = (s->chunk.type_kind == STRING || s->chunk.type_kind == BINARY || s->chunk.type_kind == VARCHAR || s->chunk.type_kind == CHAR); - s->top.data.cur_row = - max(s->chunk.start_row, max((int32_t)(first_row - s->chunk.skip_count), 0)); + s->top.data.cur_row = max(s->chunk.start_row, max(first_row - s->chunk.skip_count, 0ul)); s->top.data.end_row = s->chunk.start_row + s->chunk.num_rows; s->top.data.buffered_count = 0; if (s->top.data.end_row > first_row + max_num_rows) { @@ -1824,7 +1828,8 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (num_rowgroups > 0) { row_groups[blockIdx.y][blockIdx.x].num_child_rows = s->num_child_rows; } - atomicAdd(&chunks[chunk_id].num_child_rows, s->num_child_rows); + cuda::atomic_ref ref{chunks[chunk_id].num_child_rows}; + ref.fetch_add(s->num_child_rows, cuda::std::memory_order_relaxed); } } @@ -1840,9 +1845,9 @@ CUDF_KERNEL void __launch_bounds__(block_size) */ void __host__ DecodeNullsAndStringDictionaries(ColumnDesc* chunks, DictionaryEntry* global_dictionary, - uint32_t num_columns, - uint32_t num_stripes, - size_t first_row, + size_type num_columns, + size_type num_stripes, + int64_t first_row, rmm::cuda_stream_view stream) { dim3 dim_block(block_size, 1); @@ -1869,17 +1874,17 @@ void __host__ DecodeNullsAndStringDictionaries(ColumnDesc* chunks, void __host__ DecodeOrcColumnData(ColumnDesc* chunks, DictionaryEntry* global_dictionary, device_2dspan row_groups, - uint32_t num_columns, - uint32_t num_stripes, - size_t first_row, + size_type num_columns, + size_type num_stripes, + int64_t first_row, table_device_view tz_table, - uint32_t num_rowgroups, - uint32_t rowidx_stride, + int64_t num_rowgroups, + size_type rowidx_stride, size_t level, size_type* error_count, rmm::cuda_stream_view stream) { - uint32_t num_chunks = num_columns * num_stripes; + auto const num_chunks = num_columns * num_stripes; dim3 dim_block(block_size, 1); // 1024 threads per chunk dim3 dim_grid((num_rowgroups > 0) ? num_columns : num_chunks, (num_rowgroups > 0) ? num_rowgroups : 1); diff --git a/cpp/src/io/orc/stripe_enc.cu b/cpp/src/io/orc/stripe_enc.cu index 748e4d2c27b..b6fc4e3510f 100644 --- a/cpp/src/io/orc/stripe_enc.cu +++ b/cpp/src/io/orc/stripe_enc.cu @@ -647,8 +647,8 @@ static __device__ void encode_null_mask(orcenc_state_s* s, if (t_nrows == 0) return 0; if (mask == nullptr) return 0xff; - auto const begin_offset = row + offset; - auto const end_offset = min(begin_offset + 8, offset + column.size()); + size_type const begin_offset = row + offset; + auto const end_offset = min(begin_offset + 8, offset + column.size()); auto const mask_word = cudf::detail::get_mask_offset_word(mask, 0, begin_offset, end_offset); return mask_word & 0xff; }; @@ -1309,8 +1309,8 @@ void EncodeOrcColumnData(device_2dspan chunks, void EncodeStripeDictionaries(stripe_dictionary const* stripes, device_span columns, device_2dspan chunks, - uint32_t num_string_columns, - uint32_t num_stripes, + size_type num_string_columns, + size_type num_stripes, device_2dspan enc_streams, rmm::cuda_stream_view stream) { diff --git a/cpp/src/io/orc/stripe_init.cu b/cpp/src/io/orc/stripe_init.cu index 350700a22fd..dd44b779402 100644 --- a/cpp/src/io/orc/stripe_init.cu +++ b/cpp/src/io/orc/stripe_init.cu @@ -42,8 +42,11 @@ struct compressed_stream_s { }; // blockDim {128,1,1} -CUDF_KERNEL void __launch_bounds__(128, 8) gpuParseCompressedStripeData( - CompressedStreamInfo* strm_info, int32_t num_streams, uint32_t block_size, uint32_t log2maxcr) +CUDF_KERNEL void __launch_bounds__(128, 8) + gpuParseCompressedStripeData(CompressedStreamInfo* strm_info, + int32_t num_streams, + uint64_t compression_block_size, + uint32_t log2maxcr) { __shared__ compressed_stream_s strm_g[4]; @@ -60,18 +63,18 @@ CUDF_KERNEL void __launch_bounds__(128, 8) gpuParseCompressedStripeData( uint8_t const* end = cur + s->info.compressed_data_size; uint8_t* uncompressed = s->info.uncompressed_data; size_t max_uncompressed_size = 0; - uint32_t max_uncompressed_block_size = 0; + uint64_t max_uncompressed_block_size = 0; uint32_t num_compressed_blocks = 0; uint32_t num_uncompressed_blocks = 0; while (cur + block_header_size < end) { uint32_t block_len = shuffle((lane_id == 0) ? cur[0] | (cur[1] << 8) | (cur[2] << 16) : 0); auto const is_uncompressed = static_cast(block_len & 1); - uint32_t uncompressed_size; + uint64_t uncompressed_size; device_span* init_in_ctl = nullptr; device_span* init_out_ctl = nullptr; block_len >>= 1; cur += block_header_size; - if (block_len > block_size || cur + block_len > end) { + if (block_len > compression_block_size || cur + block_len > end) { // Fatal num_compressed_blocks = 0; max_uncompressed_size = 0; @@ -81,9 +84,10 @@ CUDF_KERNEL void __launch_bounds__(128, 8) gpuParseCompressedStripeData( // TBD: For some codecs like snappy, it wouldn't be too difficult to get the actual // uncompressed size and avoid waste due to block size alignment For now, rely on the max // compression ratio to limit waste for the most extreme cases (small single-block streams) - uncompressed_size = (is_uncompressed) ? block_len - : (block_len < (block_size >> log2maxcr)) ? block_len << log2maxcr - : block_size; + uncompressed_size = (is_uncompressed) ? block_len + : (block_len < (compression_block_size >> log2maxcr)) + ? block_len << log2maxcr + : compression_block_size; if (is_uncompressed) { if (uncompressed_size <= 32) { // For short blocks, copy the uncompressed data to output @@ -446,10 +450,9 @@ static __device__ void gpuMapRowIndexToUncompressed(rowindex_state_s* s, CUDF_KERNEL void __launch_bounds__(128, 8) gpuParseRowGroupIndex(RowGroup* row_groups, CompressedStreamInfo* strm_info, ColumnDesc* chunks, - uint32_t num_columns, - uint32_t num_stripes, - uint32_t num_rowgroups, - uint32_t rowidx_stride, + size_type num_columns, + size_type num_stripes, + size_type rowidx_stride, bool use_base_stride) { __shared__ __align__(16) rowindex_state_s state_g; @@ -554,7 +557,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) void __host__ ParseCompressedStripeData(CompressedStreamInfo* strm_info, int32_t num_streams, - uint32_t compression_block_size, + uint64_t compression_block_size, uint32_t log2maxcr, rmm::cuda_stream_view stream) { @@ -577,23 +580,16 @@ void __host__ PostDecompressionReassemble(CompressedStreamInfo* strm_info, void __host__ ParseRowGroupIndex(RowGroup* row_groups, CompressedStreamInfo* strm_info, ColumnDesc* chunks, - uint32_t num_columns, - uint32_t num_stripes, - uint32_t num_rowgroups, - uint32_t rowidx_stride, + size_type num_columns, + size_type num_stripes, + size_type rowidx_stride, bool use_base_stride, rmm::cuda_stream_view stream) { dim3 dim_block(128, 1); dim3 dim_grid(num_columns, num_stripes); // 1 column chunk per block - gpuParseRowGroupIndex<<>>(row_groups, - strm_info, - chunks, - num_columns, - num_stripes, - num_rowgroups, - rowidx_stride, - use_base_stride); + gpuParseRowGroupIndex<<>>( + row_groups, strm_info, chunks, num_columns, num_stripes, rowidx_stride, use_base_stride); } void __host__ reduce_pushdown_masks(device_span columns, diff --git a/cpp/src/io/orc/writer_impl.cu b/cpp/src/io/orc/writer_impl.cu index f0235e13422..ade0e75de35 100644 --- a/cpp/src/io/orc/writer_impl.cu +++ b/cpp/src/io/orc/writer_impl.cu @@ -1309,15 +1309,15 @@ intermediate_statistics gather_statistic_blobs(statistics_freq const stats_freq, * @param stream CUDA stream used for device memory operations and kernel launches * @return The encoded statistic blobs */ -encoded_footer_statistics finish_statistic_blobs(FileFooter const& file_footer, +encoded_footer_statistics finish_statistic_blobs(Footer const& footer, persisted_statistics& per_chunk_stats, rmm::cuda_stream_view stream) { auto stripe_size_iter = thrust::make_transform_iterator(per_chunk_stats.stripe_stat_merge.begin(), [](auto const& s) { return s.size(); }); - auto const num_columns = file_footer.types.size() - 1; - auto const num_stripes = file_footer.stripes.size(); + auto const num_columns = footer.types.size() - 1; + auto const num_stripes = footer.stripes.size(); auto const num_stripe_blobs = thrust::reduce(stripe_size_iter, stripe_size_iter + per_chunk_stats.stripe_stat_merge.size()); @@ -1333,7 +1333,7 @@ encoded_footer_statistics finish_statistic_blobs(FileFooter const& file_footer, // Fill in stats_merge and stat_chunks on the host for (auto i = 0u; i < num_file_blobs; ++i) { stats_merge[i].col_dtype = per_chunk_stats.col_types[i]; - stats_merge[i].stats_dtype = kind_to_stats_type(file_footer.types[i + 1].kind); + stats_merge[i].stats_dtype = kind_to_stats_type(footer.types[i + 1].kind); // Write the sum for empty columns, equal to zero h_stat_chunks[i].has_sum = true; } @@ -2632,21 +2632,21 @@ void writer::impl::write_orc_data_to_sink(encoded_data const& enc_data, void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table, std::vector& stripes) { - if (_ffooter.headerLength == 0) { + if (_footer.headerLength == 0) { // First call - _ffooter.headerLength = std::strlen(MAGIC); - _ffooter.writer = cudf_writer_code; - _ffooter.rowIndexStride = _row_index_stride; - _ffooter.types.resize(1 + orc_table.num_columns()); - _ffooter.types[0].kind = STRUCT; + _footer.headerLength = std::strlen(MAGIC); + _footer.writer = cudf_writer_code; + _footer.rowIndexStride = _row_index_stride; + _footer.types.resize(1 + orc_table.num_columns()); + _footer.types[0].kind = STRUCT; for (auto const& column : orc_table.columns) { if (!column.is_child()) { - _ffooter.types[0].subtypes.emplace_back(column.id()); - _ffooter.types[0].fieldNames.emplace_back(column.orc_name()); + _footer.types[0].subtypes.emplace_back(column.id()); + _footer.types[0].fieldNames.emplace_back(column.orc_name()); } } for (auto const& column : orc_table.columns) { - auto& schema_type = _ffooter.types[column.id()]; + auto& schema_type = _footer.types[column.id()]; schema_type.kind = column.orc_kind(); if (column.orc_kind() == DECIMAL) { schema_type.scale = static_cast(column.scale()); @@ -2667,18 +2667,18 @@ void writer::impl::add_table_to_footer_data(orc_table_view const& orc_table, } } else { // verify the user isn't passing mismatched tables - CUDF_EXPECTS(_ffooter.types.size() == 1 + orc_table.num_columns(), + CUDF_EXPECTS(_footer.types.size() == 1 + orc_table.num_columns(), "Mismatch in table structure between multiple calls to write"); CUDF_EXPECTS( std::all_of(orc_table.columns.cbegin(), orc_table.columns.cend(), - [&](auto const& col) { return _ffooter.types[col.id()].kind == col.orc_kind(); }), + [&](auto const& col) { return _footer.types[col.id()].kind == col.orc_kind(); }), "Mismatch in column types between multiple calls to write"); } - _ffooter.stripes.insert(_ffooter.stripes.end(), - std::make_move_iterator(stripes.begin()), - std::make_move_iterator(stripes.end())); - _ffooter.numberOfRows += orc_table.num_rows(); + _footer.stripes.insert(_footer.stripes.end(), + std::make_move_iterator(stripes.begin()), + std::make_move_iterator(stripes.end())); + _footer.numberOfRows += orc_table.num_rows(); } void writer::impl::close() @@ -2689,11 +2689,11 @@ void writer::impl::close() if (_stats_freq != statistics_freq::STATISTICS_NONE) { // Write column statistics - auto statistics = finish_statistic_blobs(_ffooter, _persisted_stripe_statistics, _stream); + auto statistics = finish_statistic_blobs(_footer, _persisted_stripe_statistics, _stream); // File-level statistics { - _ffooter.statistics.reserve(_ffooter.types.size()); + _footer.statistics.reserve(_footer.types.size()); ProtobufWriter pbw; // Root column: number of rows @@ -2702,32 +2702,32 @@ void writer::impl::close() // Root column: has nulls pbw.put_uint(encode_field_number(10)); pbw.put_uint(0); - _ffooter.statistics.emplace_back(pbw.release()); + _footer.statistics.emplace_back(pbw.release()); // Add file stats, stored after stripe stats in `column_stats` - _ffooter.statistics.insert(_ffooter.statistics.end(), - std::make_move_iterator(statistics.file_level.begin()), - std::make_move_iterator(statistics.file_level.end())); + _footer.statistics.insert(_footer.statistics.end(), + std::make_move_iterator(statistics.file_level.begin()), + std::make_move_iterator(statistics.file_level.end())); } // Stripe-level statistics if (_stats_freq == statistics_freq::STATISTICS_ROWGROUP or _stats_freq == statistics_freq::STATISTICS_PAGE) { - _orc_meta.stripeStats.resize(_ffooter.stripes.size()); - for (size_t stripe_id = 0; stripe_id < _ffooter.stripes.size(); stripe_id++) { - _orc_meta.stripeStats[stripe_id].colStats.resize(_ffooter.types.size()); + _orc_meta.stripeStats.resize(_footer.stripes.size()); + for (size_t stripe_id = 0; stripe_id < _footer.stripes.size(); stripe_id++) { + _orc_meta.stripeStats[stripe_id].colStats.resize(_footer.types.size()); ProtobufWriter pbw; // Root column: number of rows pbw.put_uint(encode_field_number(1)); - pbw.put_uint(_ffooter.stripes[stripe_id].numberOfRows); + pbw.put_uint(_footer.stripes[stripe_id].numberOfRows); // Root column: has nulls pbw.put_uint(encode_field_number(10)); pbw.put_uint(0); _orc_meta.stripeStats[stripe_id].colStats[0] = pbw.release(); - for (size_t col_idx = 0; col_idx < _ffooter.types.size() - 1; col_idx++) { - size_t idx = _ffooter.stripes.size() * col_idx + stripe_id; + for (size_t col_idx = 0; col_idx < _footer.types.size() - 1; col_idx++) { + size_t idx = _footer.stripes.size() * col_idx + stripe_id; _orc_meta.stripeStats[stripe_id].colStats[1 + col_idx] = std::move(statistics.stripe_level[idx]); } @@ -2737,13 +2737,11 @@ void writer::impl::close() _persisted_stripe_statistics.clear(); - _ffooter.contentLength = _out_sink->bytes_written(); - std::transform(_kv_meta.begin(), - _kv_meta.end(), - std::back_inserter(_ffooter.metadata), - [&](auto const& udata) { - return UserMetadataItem{udata.first, udata.second}; - }); + _footer.contentLength = _out_sink->bytes_written(); + std::transform( + _kv_meta.begin(), _kv_meta.end(), std::back_inserter(_footer.metadata), [&](auto const& udata) { + return UserMetadataItem{udata.first, udata.second}; + }); // Write statistics metadata if (not _orc_meta.stripeStats.empty()) { @@ -2756,7 +2754,7 @@ void writer::impl::close() ps.metadataLength = 0; } ProtobufWriter pbw((_compression_kind != NONE) ? 3 : 0); - pbw.write(_ffooter); + pbw.write(_footer); add_uncompressed_block_headers(_compression_kind, _compression_blocksize, pbw.buffer()); // Write postscript metadata diff --git a/cpp/src/io/orc/writer_impl.hpp b/cpp/src/io/orc/writer_impl.hpp index f1dc45087d5..417d29efb58 100644 --- a/cpp/src/io/orc/writer_impl.hpp +++ b/cpp/src/io/orc/writer_impl.hpp @@ -360,7 +360,7 @@ class writer::impl { // Internal states, filled during `write()` and written to sink during `write` and `close()`. std::unique_ptr _table_meta; - FileFooter _ffooter; + Footer _footer; Metadata _orc_meta; persisted_statistics _persisted_stripe_statistics; // Statistics data saved between calls. bool _closed = false; // To track if the output has been written to sink. diff --git a/cpp/src/io/utilities/row_selection.cpp b/cpp/src/io/utilities/row_selection.cpp index bb5565d8ce7..f136cd11ff7 100644 --- a/cpp/src/io/utilities/row_selection.cpp +++ b/cpp/src/io/utilities/row_selection.cpp @@ -23,8 +23,8 @@ namespace cudf::io::detail { -std::pair skip_rows_num_rows_from_options( - uint64_t skip_rows, std::optional const& num_rows, uint64_t num_source_rows) +std::pair skip_rows_num_rows_from_options( + int64_t skip_rows, std::optional const& num_rows, int64_t num_source_rows) { auto const rows_to_skip = std::min(skip_rows, num_source_rows); if (not num_rows.has_value()) { @@ -36,7 +36,7 @@ std::pair skip_rows_num_rows_from_options( // Limit the number of rows to the end of the input return { rows_to_skip, - static_cast(std::min(num_rows.value(), num_source_rows - rows_to_skip))}; + static_cast(std::min(num_rows.value(), num_source_rows - rows_to_skip))}; } } // namespace cudf::io::detail diff --git a/cpp/src/io/utilities/row_selection.hpp b/cpp/src/io/utilities/row_selection.hpp index 211726816de..0b5d3aef8bd 100644 --- a/cpp/src/io/utilities/row_selection.hpp +++ b/cpp/src/io/utilities/row_selection.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -34,7 +34,7 @@ namespace cudf::io::detail { * * @throw std::overflow_exception The requested number of rows exceeds the column size limit */ -std::pair skip_rows_num_rows_from_options( - uint64_t skip_rows, std::optional const& num_rows, uint64_t num_source_rows); +std::pair skip_rows_num_rows_from_options( + int64_t skip_rows, std::optional const& num_rows, int64_t num_source_rows); } // namespace cudf::io::detail diff --git a/cpp/tests/io/orc_test.cpp b/cpp/tests/io/orc_test.cpp index 0b34b39f739..24e2e2cfea0 100644 --- a/cpp/tests/io/orc_test.cpp +++ b/cpp/tests/io/orc_test.cpp @@ -2111,4 +2111,62 @@ TEST_F(OrcWriterTest, BounceBufferBug) cudf::io::write_orc(out_opts); } +TEST_F(OrcReaderTest, SizeTypeRowsOverflow) +{ + using cudf::test::iterators::no_nulls; + constexpr auto num_rows = 500'000'000l; + constexpr auto num_reps = 5; + constexpr auto total_rows = num_rows * num_reps; + static_assert(total_rows > std::numeric_limits::max()); + + auto sequence = cudf::detail::make_counting_transform_iterator(0, [](auto i) { return i % 127; }); + column_wrapper col(sequence, + sequence + num_rows); + table_view chunk_table({col}); + + std::vector out_buffer; + { + cudf::io::chunked_orc_writer_options write_opts = + cudf::io::chunked_orc_writer_options::builder(cudf::io::sink_info{&out_buffer}); + + auto writer = cudf::io::orc_chunked_writer(write_opts); + for (int i = 0; i < num_reps; i++) { + writer.write(chunk_table); + } + } + + // Test reading the metadata + auto metadata = read_orc_metadata(cudf::io::source_info{out_buffer.data(), out_buffer.size()}); + EXPECT_EQ(metadata.num_rows(), total_rows); + EXPECT_EQ(metadata.num_stripes(), total_rows / 1'000'000); + + constexpr auto num_rows_to_read = 1'000'000; + const auto num_rows_to_skip = metadata.num_rows() - num_rows_to_read; + + // Read the last million rows + cudf::io::orc_reader_options skip_opts = + cudf::io::orc_reader_options::builder( + cudf::io::source_info{out_buffer.data(), out_buffer.size()}) + .use_index(false) + .skip_rows(num_rows_to_skip); + const auto got_with_skip = cudf::io::read_orc(skip_opts).tbl; + + const auto sequence_start = num_rows_to_skip % num_rows; + column_wrapper skipped_col( + sequence + sequence_start, sequence + sequence_start + num_rows_to_read, no_nulls()); + table_view expected({skipped_col}); + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_with_skip->view()); + + // Read the last stripe (still the last million rows) + cudf::io::orc_reader_options stripe_opts = + cudf::io::orc_reader_options::builder( + cudf::io::source_info{out_buffer.data(), out_buffer.size()}) + .use_index(false) + .stripes({{metadata.num_stripes() - 1}}); + const auto got_with_stripe_selection = cudf::io::read_orc(stripe_opts).tbl; + + CUDF_TEST_EXPECT_TABLES_EQUAL(expected, got_with_stripe_selection->view()); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 16feccc12d0..3fc9823b914 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -3,6 +3,7 @@ import cudf from cudf.core.buffer import acquire_spill_lock +from libc.stdint cimport int64_t from libcpp cimport bool, int from libcpp.map cimport map from libcpp.memory cimport unique_ptr @@ -98,8 +99,8 @@ cpdef read_orc(object filepaths_or_buffers, filepaths_or_buffers, columns, stripes or [], - get_size_t_arg(skip_rows, "skip_rows"), - get_size_t_arg(num_rows, "num_rows"), + get_skiprows_arg(skip_rows), + get_num_rows_arg(num_rows), ( type_id.EMPTY if timestamp_type is None else @@ -318,15 +319,16 @@ def write_orc( libcudf_write_orc(c_orc_writer_options) -cdef size_type get_size_t_arg(object arg, str name) except*: - if name == "skip_rows": - arg = 0 if arg is None else arg - if not isinstance(arg, int) or arg < 0: - raise TypeError(f"{name} must be an int >= 0") - else: - arg = -1 if arg is None else arg - if not isinstance(arg, int) or arg < -1: - raise TypeError(f"{name} must be an int >= -1") +cdef int64_t get_skiprows_arg(object arg) except*: + arg = 0 if arg is None else arg + if not isinstance(arg, int) or arg < 0: + raise TypeError("skiprows must be an int >= 0") + return arg + +cdef size_type get_num_rows_arg(object arg) except*: + arg = -1 if arg is None else arg + if not isinstance(arg, int) or arg < -1: + raise TypeError("num_rows must be an int >= -1") return arg @@ -334,7 +336,7 @@ cdef orc_reader_options make_orc_reader_options( object filepaths_or_buffers, object column_names, object stripes, - size_type skip_rows, + int64_t skip_rows, size_type num_rows, type_id timestamp_type, bool use_index