Skip to content

Commit

Permalink
Align integral types in ORC to specs (#15008)
Browse files Browse the repository at this point in the history
Use `uint64_t` where specified by the ORC specs:

- `PostScript::compressionBlockSize`
- `StripeInformation::footerLength`
- `StripeInformation::numberOfRows`

Using the same type for the derived values.

Other changes:

- Changed the num_rows in orc_metadata to uint64_t so it works with files that have more than 2B rows.
- Modified how the skiprows parameter in Python is converted to a C++ value, so now we can skip more than 2B rows.
- Renamed `FileFooter` to `Footer` to match the specs.

No measurable impact on performance or on the memory footprint of the ORC reader.

Authors:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Nghia Truong (https://github.com/ttnghia)

Approvers:
  - Nghia Truong (https://github.com/ttnghia)
  - Shruti Shivakumar (https://github.com/shrshi)
  - Yunsong Wang (https://github.com/PointKernel)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #15008
  • Loading branch information
vuule authored Feb 24, 2024
1 parent 71c9909 commit c37367e
Show file tree
Hide file tree
Showing 21 changed files with 262 additions and 203 deletions.
1 change: 1 addition & 0 deletions cpp/include/cudf/io/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ class orc_reader_options {
void set_skip_rows(uint64_t rows)
{
CUDF_EXPECTS(rows == 0 or _stripes.empty(), "Can't set both skip_rows along with stripes");
CUDF_EXPECTS(rows <= std::numeric_limits<int64_t>::max(), "skip_rows is too large");
_skip_rows = rows;
}

Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cudf/io/orc_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ class orc_metadata {
* @param num_rows number of rows
* @param num_stripes number of stripes
*/
orc_metadata(orc_schema schema, size_type num_rows, size_type num_stripes)
orc_metadata(orc_schema schema, uint64_t num_rows, size_type num_stripes)
: _schema{std::move(schema)}, _num_rows{num_rows}, _num_stripes{num_stripes}
{
}
Expand Down Expand Up @@ -362,7 +362,7 @@ class orc_metadata {

private:
orc_schema _schema;
size_type _num_rows;
uint64_t _num_rows;
size_type _num_stripes;
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ orc_metadata read_orc_metadata(source_info const& src_info, rmm::cuda_stream_vie
auto const footer = orc::metadata(sources.front().get(), stream).ff;

return {{make_orc_column_schema(footer.types, 0, "")},
static_cast<size_type>(footer.numberOfRows),
footer.numberOfRows,
static_cast<size_type>(footer.stripes.size())};
}

Expand Down
8 changes: 4 additions & 4 deletions cpp/src/io/orc/aggregate_orc_metadata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ aggregate_orc_metadata::aggregate_orc_metadata(
std::tuple<int64_t, size_type, std::vector<metadata::stripe_source_mapping>>
aggregate_orc_metadata::select_stripes(
std::vector<std::vector<size_type>> const& user_specified_stripes,
uint64_t skip_rows,
int64_t skip_rows,
std::optional<size_type> const& num_rows,
rmm::cuda_stream_view stream)
{
CUDF_EXPECTS((skip_rows == 0 and not num_rows.has_value()) or user_specified_stripes.empty(),
"Can't use both the row selection and the stripe selection");

auto [rows_to_skip, rows_to_read] = [&]() {
if (not user_specified_stripes.empty()) { return std::pair<uint64_t, size_type>{0, 0}; }
if (not user_specified_stripes.empty()) { return std::pair<int64_t, size_type>{0, 0}; }
return cudf::io::detail::skip_rows_num_rows_from_options(skip_rows, num_rows, get_num_rows());
}();

Expand Down Expand Up @@ -192,8 +192,8 @@ aggregate_orc_metadata::select_stripes(
selected_stripes_mapping.push_back({static_cast<int>(src_file_idx), stripe_infos});
}
} else {
uint64_t count = 0;
size_type stripe_skip_rows = 0;
int64_t count = 0;
int64_t stripe_skip_rows = 0;
// Iterate all source files, each source file has corelating metadata
for (size_t src_file_idx = 0;
src_file_idx < per_file_metadata.size() && count < rows_to_skip + rows_to_read;
Expand Down
8 changes: 5 additions & 3 deletions cpp/src/io/orc/aggregate_orc_metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,11 @@ class aggregate_orc_metadata {

[[nodiscard]] auto const& get_types() const { return per_file_metadata[0].ff.types; }

[[nodiscard]] int get_row_index_stride() const
[[nodiscard]] size_type get_row_index_stride() const
{
return static_cast<int>(per_file_metadata[0].ff.rowIndexStride);
CUDF_EXPECTS(per_file_metadata[0].ff.rowIndexStride <= std::numeric_limits<size_type>::max(),
"Row index stride exceeds size_type max");
return per_file_metadata[0].ff.rowIndexStride;
}

[[nodiscard]] auto is_row_grp_idx_present() const { return row_grp_idx_present; }
Expand Down Expand Up @@ -115,7 +117,7 @@ class aggregate_orc_metadata {
*/
[[nodiscard]] std::tuple<int64_t, size_type, std::vector<metadata::stripe_source_mapping>>
select_stripes(std::vector<std::vector<size_type>> const& user_specified_stripes,
uint64_t skip_rows,
int64_t skip_rows,
std::optional<size_type> const& num_rows,
rmm::cuda_stream_view stream);

Expand Down
7 changes: 4 additions & 3 deletions cpp/src/io/orc/orc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void ProtobufReader::read(PostScript& s, size_t maxlen)
function_builder(s, maxlen, op);
}

void ProtobufReader::read(FileFooter& s, size_t maxlen)
void ProtobufReader::read(Footer& s, size_t maxlen)
{
auto op = std::tuple(field_reader(1, s.headerLength),
field_reader(2, s.contentLength),
Expand Down Expand Up @@ -307,7 +307,7 @@ size_t ProtobufWriter::write(PostScript const& s)
return w.value();
}

size_t ProtobufWriter::write(FileFooter const& s)
size_t ProtobufWriter::write(Footer const& s)
{
ProtobufFieldWriter w(this);
w.field_uint(1, s.headerLength);
Expand Down Expand Up @@ -393,7 +393,8 @@ size_t ProtobufWriter::write(Metadata const& s)
return w.value();
}

OrcDecompressor::OrcDecompressor(CompressionKind kind, uint32_t blockSize) : m_blockSize(blockSize)
OrcDecompressor::OrcDecompressor(CompressionKind kind, uint64_t block_size)
: m_blockSize(block_size)
{
switch (kind) {
case NONE:
Expand Down
32 changes: 16 additions & 16 deletions cpp/src/io/orc/orc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ static constexpr int32_t DEFAULT_MAX_NANOS = 999'999;
struct PostScript {
uint64_t footerLength = 0; // the length of the footer section in bytes
CompressionKind compression = NONE; // the kind of generic compression used
uint32_t compressionBlockSize{}; // the maximum size of each compression chunk
uint64_t compressionBlockSize{}; // the maximum size of each compression chunk
std::vector<uint32_t> version; // the version of the file format [major, minor]
uint64_t metadataLength = 0; // the length of the metadata section in bytes
std::optional<uint32_t> writerVersion; // The version of the writer that wrote the file
Expand All @@ -84,8 +84,8 @@ struct StripeInformation {
uint64_t offset = 0; // the start of the stripe within the file
uint64_t indexLength = 0; // the length of the indexes in bytes
uint64_t dataLength = 0; // the length of the data in bytes
uint32_t footerLength = 0; // the length of the footer in bytes
uint32_t numberOfRows = 0; // the number of rows in the stripe
uint64_t footerLength = 0; // the length of the footer in bytes
uint64_t numberOfRows = 0; // the number of rows in the stripe
};

struct SchemaType {
Expand All @@ -105,7 +105,7 @@ struct UserMetadataItem {

using ColStatsBlob = std::vector<uint8_t>; // Column statistics blob

struct FileFooter {
struct Footer {
uint64_t headerLength = 0; // the length of the file header in bytes (always 3)
uint64_t contentLength = 0; // the length of the file header and body in bytes
std::vector<StripeInformation> stripes; // the information about the stripes
Expand Down Expand Up @@ -237,7 +237,7 @@ class ProtobufReader {
read(s, m_end - m_cur);
}
void read(PostScript&, size_t maxlen);
void read(FileFooter&, size_t maxlen);
void read(Footer&, size_t maxlen);
void read(StripeInformation&, size_t maxlen);
void read(SchemaType&, size_t maxlen);
void read(UserMetadataItem&, size_t maxlen);
Expand Down Expand Up @@ -519,7 +519,7 @@ class ProtobufWriter {

public:
size_t write(PostScript const&);
size_t write(FileFooter const&);
size_t write(Footer const&);
size_t write(StripeInformation const&);
size_t write(SchemaType const&);
size_t write(UserMetadataItem const&);
Expand All @@ -540,7 +540,7 @@ class ProtobufWriter {

class OrcDecompressor {
public:
OrcDecompressor(CompressionKind kind, uint32_t blockSize);
OrcDecompressor(CompressionKind kind, uint64_t blockSize);

/**
* @brief ORC block decompression
Expand All @@ -553,17 +553,17 @@ class OrcDecompressor {
host_span<uint8_t const> decompress_blocks(host_span<uint8_t const> src,
rmm::cuda_stream_view stream);
[[nodiscard]] uint32_t GetLog2MaxCompressionRatio() const { return m_log2MaxRatio; }
[[nodiscard]] uint32_t GetMaxUncompressedBlockSize(uint32_t block_len) const
[[nodiscard]] uint64_t GetMaxUncompressedBlockSize(uint32_t block_len) const
{
return std::min(block_len << m_log2MaxRatio, m_blockSize);
return std::min(static_cast<uint64_t>(block_len) << m_log2MaxRatio, m_blockSize);
}
[[nodiscard]] compression_type compression() const { return _compression; }
[[nodiscard]] uint32_t GetBlockSize() const { return m_blockSize; }
[[nodiscard]] auto GetBlockSize() const { return m_blockSize; }

protected:
compression_type _compression;
uint32_t m_log2MaxRatio = 24; // log2 of maximum compression ratio
uint32_t m_blockSize;
uint64_t m_blockSize;
std::vector<uint8_t> m_buf;
};

Expand Down Expand Up @@ -613,9 +613,9 @@ class metadata {
public:
explicit metadata(datasource* const src, rmm::cuda_stream_view stream);

[[nodiscard]] size_t get_total_rows() const { return ff.numberOfRows; }
[[nodiscard]] int get_num_stripes() const { return ff.stripes.size(); }
[[nodiscard]] int get_num_columns() const { return ff.types.size(); }
[[nodiscard]] auto get_total_rows() const { return ff.numberOfRows; }
[[nodiscard]] size_type get_num_stripes() const { return ff.stripes.size(); }
[[nodiscard]] size_type get_num_columns() const { return ff.types.size(); }
/**
* @brief Returns the name of the column with the given ID.
*
Expand All @@ -638,7 +638,7 @@ class metadata {
CUDF_EXPECTS(column_id < get_num_columns(), "Out of range column id provided");
return column_paths[column_id];
}
[[nodiscard]] int get_row_index_stride() const { return ff.rowIndexStride; }
[[nodiscard]] auto get_row_index_stride() const { return ff.rowIndexStride; }

/**
* @brief Returns the ID of the parent column of the given column.
Expand Down Expand Up @@ -666,7 +666,7 @@ class metadata {

public:
PostScript ps;
FileFooter ff;
Footer ff;
Metadata md;
std::vector<StripeFooter> stripefooters;
std::unique_ptr<OrcDecompressor> decompressor;
Expand Down
54 changes: 26 additions & 28 deletions cpp/src/io/orc/orc_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,18 @@ struct DictionaryEntry {
struct ColumnDesc {
uint8_t const* streams[CI_NUM_STREAMS]; // ptr to data stream index
uint32_t strm_id[CI_NUM_STREAMS]; // stream ids
uint32_t strm_len[CI_NUM_STREAMS]; // stream length
int64_t strm_len[CI_NUM_STREAMS]; // stream length
uint32_t* valid_map_base; // base pointer of valid bit map for this column
void* column_data_base; // base pointer of column data
uint32_t start_row; // starting row of the stripe
uint32_t num_rows; // number of rows in stripe
uint32_t column_num_rows; // number of rows in whole column
uint32_t num_child_rows; // store number of child rows if it's list column
int64_t start_row; // starting row of the stripe
int64_t num_rows; // number of rows in stripe
int64_t column_num_rows; // number of rows in whole column
int64_t num_child_rows; // store number of child rows if it's list column
uint32_t num_rowgroups; // number of rowgroups in the chunk
uint32_t dictionary_start; // start position in global dictionary
int64_t dictionary_start; // start position in global dictionary
uint32_t dict_len; // length of local dictionary
uint32_t null_count; // number of null values in this stripe's column
uint32_t skip_count; // number of non-null values to skip
int64_t null_count; // number of null values in this stripe's column
int64_t skip_count; // number of non-null values to skip
uint32_t rowgroup_id; // row group position
ColumnEncodingKind encoding_kind; // column encoding kind
TypeKind type_kind; // column data type
Expand All @@ -129,20 +129,20 @@ struct ColumnDesc {
*/
struct RowGroup {
uint32_t chunk_id; // Column chunk this entry belongs to
uint32_t strm_offset[2]; // Index offset for CI_DATA and CI_DATA2 streams
int64_t strm_offset[2]; // Index offset for CI_DATA and CI_DATA2 streams
uint16_t run_pos[2]; // Run position for CI_DATA and CI_DATA2
uint32_t num_rows; // number of rows in rowgroup
uint32_t start_row; // starting row of the rowgroup
int64_t start_row; // starting row of the rowgroup
uint32_t num_child_rows; // number of rows of children in rowgroup in case of list type
};

/**
* @brief Struct to describe an encoder data chunk
*/
struct EncChunk {
uint32_t start_row; // start row of this chunk
int64_t start_row; // start row of this chunk
uint32_t num_rows; // number of rows in this chunk
uint32_t null_mask_start_row; // adjusted to multiple of 8
int64_t null_mask_start_row; // adjusted to multiple of 8
uint32_t null_mask_num_rows; // adjusted to multiple of 8
ColumnEncodingKind encoding_kind; // column encoding kind
TypeKind type_kind; // column data type
Expand Down Expand Up @@ -253,7 +253,7 @@ constexpr uint32_t encode_block_size = 512;
*/
void ParseCompressedStripeData(CompressedStreamInfo* strm_info,
int32_t num_streams,
uint32_t compression_block_size,
uint64_t compression_block_size,
uint32_t log2maxcr,
rmm::cuda_stream_view stream);

Expand All @@ -276,7 +276,6 @@ void PostDecompressionReassemble(CompressedStreamInfo* strm_info,
* @param[in] chunks ColumnDesc device array [stripe][column]
* @param[in] num_columns Number of columns
* @param[in] num_stripes Number of stripes
* @param[in] num_rowgroups Number of row groups
* @param[in] rowidx_stride Row index stride
* @param[in] use_base_stride Whether to use base stride obtained from meta or use the computed
* value
Expand All @@ -285,10 +284,9 @@ void PostDecompressionReassemble(CompressedStreamInfo* strm_info,
void ParseRowGroupIndex(RowGroup* row_groups,
CompressedStreamInfo* strm_info,
ColumnDesc* chunks,
uint32_t num_columns,
uint32_t num_stripes,
uint32_t num_rowgroups,
uint32_t rowidx_stride,
size_type num_columns,
size_type num_stripes,
size_type rowidx_stride,
bool use_base_stride,
rmm::cuda_stream_view stream);

Expand All @@ -304,9 +302,9 @@ void ParseRowGroupIndex(RowGroup* row_groups,
*/
void DecodeNullsAndStringDictionaries(ColumnDesc* chunks,
DictionaryEntry* global_dictionary,
uint32_t num_columns,
uint32_t num_stripes,
size_t first_row,
size_type num_columns,
size_type num_stripes,
int64_t first_row,
rmm::cuda_stream_view stream);

/**
Expand All @@ -329,12 +327,12 @@ void DecodeNullsAndStringDictionaries(ColumnDesc* chunks,
void DecodeOrcColumnData(ColumnDesc* chunks,
DictionaryEntry* global_dictionary,
device_2dspan<RowGroup> row_groups,
uint32_t num_columns,
uint32_t num_stripes,
size_t first_row,
size_type num_columns,
size_type num_stripes,
int64_t first_row,
table_device_view tz_table,
uint32_t num_rowgroups,
uint32_t rowidx_stride,
int64_t num_rowgroups,
size_type rowidx_stride,
size_t level,
size_type* error_count,
rmm::cuda_stream_view stream);
Expand Down Expand Up @@ -364,8 +362,8 @@ void EncodeOrcColumnData(device_2dspan<EncChunk const> chunks,
void EncodeStripeDictionaries(stripe_dictionary const* stripes,
device_span<orc_column_device_view const> columns,
device_2dspan<EncChunk const> chunks,
uint32_t num_string_columns,
uint32_t num_stripes,
size_type num_string_columns,
size_type num_stripes,
device_2dspan<encoder_chunk_streams> enc_streams,
rmm::cuda_stream_view stream);

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/io/orc/reader_impl.cu
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ reader::impl::impl(std::vector<std::unique_ptr<datasource>>&& sources,
{
}

table_with_metadata reader::impl::read(uint64_t skip_rows,
table_with_metadata reader::impl::read(int64_t skip_rows,
std::optional<size_type> const& num_rows_opt,
std::vector<std::vector<size_type>> const& stripes)
{
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/io/orc/reader_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ class reader::impl {
* @param stripes Indices of individual stripes to load if non-empty
* @return The set of columns along with metadata
*/
table_with_metadata read(uint64_t skip_rows,
table_with_metadata read(int64_t skip_rows,
std::optional<size_type> const& num_rows_opt,
std::vector<std::vector<size_type>> const& stripes);

Expand All @@ -72,7 +72,7 @@ class reader::impl {
* @param num_rows_opt Optional number of rows to read, or `std::nullopt` to read all rows
* @param stripes Indices of individual stripes to load if non-empty
*/
void prepare_data(uint64_t skip_rows,
void prepare_data(int64_t skip_rows,
std::optional<size_type> const& num_rows_opt,
std::vector<std::vector<size_type>> const& stripes);

Expand Down
Loading

0 comments on commit c37367e

Please sign in to comment.