From f8e16d2cdf0a53d33f7367b257bd164aa5a3e183 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 27 Mar 2024 00:10:24 +0000 Subject: [PATCH 01/12] refactor read_parquet_metadata + extract RowGroup metadata --- cpp/include/cudf/io/parquet_metadata.hpp | 52 +++++++++++++++- cpp/src/io/parquet/reader_impl.cpp | 4 +- cpp/src/io/parquet/reader_impl_helpers.cpp | 23 ++++++- cpp/src/io/parquet/reader_impl_helpers.hpp | 13 ++++ .../cudf/_lib/cpp/io/parquet_metadata.pxd | 34 +++++++++++ python/cudf/cudf/_lib/parquet.pyx | 40 ++++++++++++ python/cudf/cudf/io/parquet.py | 61 ++++++++++++++++--- python/cudf/cudf/tests/test_parquet.py | 48 +++++++++++++-- python/cudf/cudf/utils/ioutils.py | 4 +- 9 files changed, 260 insertions(+), 19 deletions(-) create mode 100644 python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index 3149b5b5945..c3393f96fdb 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -59,6 +59,13 @@ enum class TypeKind : int8_t { */ struct parquet_column_schema { public: + /** + * @brief Default constructor. + * + * This has been added since Cython requires a default constructor to create objects on stack. + */ + explicit parquet_column_schema() = default; + /** * @brief constructor * @@ -134,6 +141,13 @@ struct parquet_column_schema { */ struct parquet_schema { public: + /** + * @brief Default constructor. + * + * This has been added since Cython requires a default constructor to create objects on stack. + */ + explicit parquet_schema() = default; + /** * @brief constructor * @@ -165,6 +179,15 @@ class parquet_metadata { public: /// Key-value metadata in the file footer. using key_value_metadata = std::unordered_map; + /// row group metadata from RowGroup element. + using row_group_metadata = std::pair; + + /** + * @brief Default constructor. + * + * This has been added since Cython requires a default constructor to create objects on stack. + */ + explicit parquet_metadata() = default; /** * @brief constructor @@ -172,16 +195,22 @@ class parquet_metadata { * @param schema parquet schema * @param num_rows number of rows * @param num_rowgroups number of row groups + * @param num_columns number of columns * @param file_metadata key-value metadata in the file footer + * @param rg_metadata vector of pairs of number of rows and total byte size for each row group */ parquet_metadata(parquet_schema schema, int64_t num_rows, size_type num_rowgroups, - key_value_metadata file_metadata) + size_type num_columns, + key_value_metadata file_metadata, + std::vector rg_metadata) : _schema{std::move(schema)}, _num_rows{num_rows}, _num_rowgroups{num_rowgroups}, - _file_metadata{std::move(file_metadata)} + _num_columns{num_columns}, + _file_metadata{std::move(file_metadata)}, + _rowgroup_metadata{std::move(rg_metadata)} { } @@ -207,6 +236,14 @@ class parquet_metadata { * @return Number of row groups */ [[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; } + + /** + * @brief Returns the number of columns in the file. + * + * @return Number of row groups + */ + [[nodiscard]] auto num_columns() const { return _num_columns; } + /** * @brief Returns the Key value metadata in the file footer. * @@ -214,11 +251,20 @@ class parquet_metadata { */ [[nodiscard]] auto const& metadata() const { return _file_metadata; } + /** + * @brief Returns the row group metadata in the file footer. + * + * @return pairs of number of rows and total byte size for each row group as a vector + */ + [[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; } + private: parquet_schema _schema; int64_t _num_rows; size_type _num_rowgroups; + size_type _num_columns; key_value_metadata _file_metadata; + std::vector _rowgroup_metadata; }; /** diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 8112328d962..c4d26c8d763 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -593,7 +593,9 @@ parquet_metadata read_parquet_metadata(host_span con return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)}, metadata.get_num_rows(), metadata.get_num_row_groups(), - metadata.get_key_value_metadata()[0]}; + metadata.get_num_columns(), + metadata.get_key_value_metadata()[0], + metadata.get_rowgroup_metadata()}; } } // namespace cudf::io::parquet::detail diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 776caa99ac9..72a8e2ffda7 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -367,6 +367,11 @@ size_type aggregate_reader_metadata::calc_num_row_groups() const }); } +size_type aggregate_reader_metadata::calc_num_columns() const +{ + return per_file_metadata[0].row_groups[0].columns.size(); +} + // Copies info from the column and offset indexes into the passed in row_group_info. void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_info, size_type chunk_start_row) const @@ -508,7 +513,8 @@ aggregate_reader_metadata::aggregate_reader_metadata( : per_file_metadata(metadatas_from_sources(sources)), keyval_maps(collect_keyval_metadata()), num_rows(calc_num_rows()), - num_row_groups(calc_num_row_groups()) + num_row_groups(calc_num_row_groups()), + num_columns(calc_num_columns()) { if (per_file_metadata.size() > 0) { auto const& first_meta = per_file_metadata.front(); @@ -548,6 +554,21 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t return col->meta_data; } +std::vector> aggregate_reader_metadata::get_rowgroup_metadata() const +{ + std::vector> rg_metadata; + + std::for_each( + per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) { + std::transform( + pfm.row_groups.cbegin(), + pfm.row_groups.cend(), + std::back_inserter(rg_metadata), + [](auto const& rg) { return std::make_pair(rg.num_rows, rg.total_byte_size); }); + }); + return rg_metadata; +} + std::string aggregate_reader_metadata::get_pandas_index() const { // Assumes that all input files have the same metadata diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 8295654764e..78dd373d80d 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -126,6 +126,7 @@ class aggregate_reader_metadata { std::vector> keyval_maps; int64_t num_rows; size_type num_row_groups; + size_type num_columns; /** * @brief Create a metadata object from each element in the source vector @@ -149,6 +150,8 @@ class aggregate_reader_metadata { */ [[nodiscard]] size_type calc_num_row_groups() const; + [[nodiscard]] size_type calc_num_columns() const; + /** * @brief Calculate column index info for the given `row_group_info` * @@ -166,10 +169,19 @@ class aggregate_reader_metadata { size_type src_idx, int schema_idx) const; + /** + * @brief Extracts high-level metadata for all row groups + * + * @return List of pairs of number of rows and total byte size for each row group + */ + [[nodiscard]] std::vector> get_rowgroup_metadata() const; + [[nodiscard]] auto get_num_rows() const { return num_rows; } [[nodiscard]] auto get_num_row_groups() const { return num_row_groups; } + [[nodiscard]] auto get_num_columns() const { return num_columns; } + [[nodiscard]] auto const& get_schema(int schema_idx) const { return per_file_metadata[0].schema[schema_idx]; @@ -178,6 +190,7 @@ class aggregate_reader_metadata { [[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; } [[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); } + /** * @brief Gets the concrete nesting depth of output cudf columns * diff --git a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd new file mode 100644 index 00000000000..10700e87927 --- /dev/null +++ b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd @@ -0,0 +1,34 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +from libc.stdint cimport int64_t +from libcpp.pair cimport pair +from libcpp.string cimport string +from libcpp.unordered_map cimport unordered_map +from libcpp.vector cimport vector + +cimport cudf._lib.cpp.io.types as cudf_io_types +from cudf._lib.cpp.types cimport size_type + + +cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil: + cdef cppclass parquet_column_schema: + parquet_column_schema() except+ + string name() except+ + size_type num_children() except+ + parquet_column_schema child(int idx) except+ + vector[parquet_column_schema] children() except+ + + cdef cppclass parquet_schema: + parquet_schema() except+ + parquet_column_schema root() except+ + + cdef cppclass parquet_metadata: + parquet_metadata() except+ + parquet_schema schema() except+ + int64_t num_rows() except+ + size_type num_rowgroups() except+ + size_type num_columns() except+ + unordered_map[string, string] metadata() except+ + vector[pair[int64_t, int64_t]] rowgroup_metadata() except+ + + cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+ diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index ce1cba59bec..de2dd8cfb97 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -46,6 +46,10 @@ from cudf._lib.cpp.io.parquet cimport ( read_parquet as parquet_reader, write_parquet as parquet_writer, ) +from cudf._lib.cpp.io.parquet_metadata cimport ( + parquet_metadata, + read_parquet_metadata as parquet_metadata_reader, +) from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata from cudf._lib.cpp.table.table_view cimport table_view from cudf._lib.cpp.types cimport data_type, size_type @@ -316,6 +320,42 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None, df._data.label_dtype = cudf.dtype(column_index_type) return df +cpdef read_parquet_metadata(filepaths_or_buffers): + """ + Cython function to call into libcudf API, see `read_parquet_metadata`. + + filters, if not None, should be an Expression that evaluates to a + boolean predicate as a function of columns being read. + + See Also + -------- + cudf.io.parquet.read_parquet + cudf.io.parquet.to_parquet + """ + # Convert NativeFile buffers to NativeFileDatasource, + for i, datasource in enumerate(filepaths_or_buffers): + if isinstance(datasource, NativeFile): + filepaths_or_buffers[i] = NativeFileDatasource(datasource) + + cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers) + + args = move(source) + + cdef parquet_metadata c_result + + # Read Parquet metadata + with nogil: + c_result = move(parquet_metadata_reader(args)) + + # access and return results + num_columns = c_result.num_columns() + num_rows = c_result.num_rows() + num_rowgroups = c_result.num_rowgroups() + names = [info.name().decode() for info in c_result.schema().root().children()] + row_group_metadata = c_result.rowgroup_metadata() + + return num_rows, num_rowgroups, names, num_columns, row_group_metadata + @acquire_spill_lock() def write_parquet( diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index bead9c352ef..537d8b7b41a 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -267,17 +267,64 @@ def write_to_dataset( @ioutils.doc_read_parquet_metadata() @_cudf_nvtx_annotate -def read_parquet_metadata(path): +def read_parquet_metadata(filepath_or_buffer): """{docstring}""" - import pyarrow.parquet as pq + # Multiple sources are passed as a list. If a single source is passed, + # wrap it in a list for unified processing downstream. + if not is_list_like(filepath_or_buffer): + filepath_or_buffer = [filepath_or_buffer] - pq_file = pq.ParquetFile(path) + # Start by trying construct a filesystem object, so we + # can apply filters on remote file-systems + fs, paths = ioutils._get_filesystem_and_paths( + path_or_data=filepath_or_buffer, storage_options=None + ) - num_rows = pq_file.metadata.num_rows - num_row_groups = pq_file.num_row_groups - col_names = pq_file.schema.names + # Use pyarrow dataset to detect/process directory-partitioned + # data and apply filters. Note that we can only support partitioned + # data and filtering if the input is a single directory or list of + # paths. + partition_keys = [] + partition_categories = {} + if fs and paths: + ( + paths, + row_groups, + partition_keys, + partition_categories, + ) = _process_dataset( + paths=paths, + fs=fs, + filters=None, + row_groups=None, + categorical_partitions=True, + dataset_kwargs=None, + ) + filepath_or_buffer = paths if paths else filepath_or_buffer + + filepaths_or_buffers = [] + + for source in filepath_or_buffer: + tmp_source, compression = ioutils.get_reader_filepath_or_buffer( + path_or_data=source, + compression=None, + fs=fs, + use_python_file_object=True, + open_file_options=None, + storage_options=None, + bytes_per_thread=None, + ) + + if compression is not None: + raise ValueError( + "URL content-encoding decompression is not supported" + ) + if isinstance(tmp_source, list): + filepath_or_buffer.extend(tmp_source) + else: + filepaths_or_buffers.append(tmp_source) - return num_rows, num_row_groups, col_names + return libparquet.read_parquet_metadata(filepaths_or_buffers) @_cudf_nvtx_annotate diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 9ba71b28637..7c7afcc5173 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -415,8 +415,16 @@ def num_row_groups(rows, group_size): row_group_size = 5 pdf.to_parquet(fname, compression="snappy", row_group_size=row_group_size) - num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname) - + ( + num_rows, + row_groups, + col_names, + num_columns, + _, + _, + ) = cudf.io.read_parquet_metadata(fname) + + assert num_columns == len(pdf.columns) assert num_rows == len(pdf.index) assert row_groups == num_row_groups(num_rows, row_group_size) for a, b in zip(col_names, pdf.columns): @@ -561,7 +569,9 @@ def test_parquet_read_row_groups(tmpdir, pdf, row_group_size): fname = tmpdir.join("row_group.parquet") pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size) - num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname) + num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata( + fname + ) gdf = [cudf.read_parquet(fname, row_groups=[i]) for i in range(row_groups)] gdf = cudf.concat(gdf) @@ -586,7 +596,9 @@ def test_parquet_read_row_groups_non_contiguous(tmpdir, pdf, row_group_size): fname = tmpdir.join("row_group.parquet") pdf.to_parquet(fname, compression="gzip", row_group_size=row_group_size) - num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname) + num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata( + fname + ) # alternate rows between the two sources gdf = cudf.read_parquet( @@ -1803,7 +1815,9 @@ def test_parquet_writer_row_group_size(tmpdir, row_group_size_kwargs): writer.write_table(gdf) # Simple check for multiple row-groups - nrows, nrow_groups, columns = cudf.io.parquet.read_parquet_metadata(fname) + nrows, nrow_groups, columns, _, _ = cudf.io.parquet.read_parquet_metadata( + fname + ) assert nrows == size assert nrow_groups > 1 assert columns == ["a", "b"] @@ -2853,7 +2867,9 @@ def test_to_parquet_row_group_size( fname, row_group_size_bytes=size_bytes, row_group_size_rows=size_rows ) - num_rows, row_groups, col_names = cudf.io.read_parquet_metadata(fname) + num_rows, row_groups, col_names, _, _ = cudf.io.read_parquet_metadata( + fname + ) # 8 bytes per row, as the column is int64 expected_num_rows = max( math.ceil(num_rows / size_rows), math.ceil(8 * num_rows / size_bytes) @@ -2861,6 +2877,26 @@ def test_to_parquet_row_group_size( assert expected_num_rows == row_groups +@pytest.mark.parametrize("size_rows", [500_000, 100_000, 10_000]) +def test_parquet_row_group_metadata(tmpdir, large_int64_gdf, size_rows): + fname = tmpdir.join("row_group_size.parquet") + large_int64_gdf.to_parquet(fname, row_group_size_rows=size_rows) + + # read file metadata from parquet + ( + num_rows, + row_groups, + _, # col_names + _, # rowgroup_metadata + row_group_metadata, + ) = cudf.io.read_parquet_metadata(fname) + + # length(RowGroupsMetaData) == number of row groups + assert len(row_group_metadata) == row_groups + # sum of rows in row groups == total rows + assert num_rows == sum([row_group[0] for row_group in row_group_metadata]) + + def test_parquet_reader_decimal_columns(): df = cudf.DataFrame( { diff --git a/python/cudf/cudf/utils/ioutils.py b/python/cudf/cudf/utils/ioutils.py index 0a0ee4f592c..6268cee675b 100644 --- a/python/cudf/cudf/utils/ioutils.py +++ b/python/cudf/cudf/utils/ioutils.py @@ -101,11 +101,13 @@ Total number of rows Number of row groups List of column names +Number of columns +List of metadata of row groups Examples -------- >>> import cudf ->>> num_rows, num_row_groups, names = cudf.io.read_parquet_metadata(filename) +>>> num_rows, num_row_groups, names, num_columns, row_group_metadata = cudf.io.read_parquet_metadata(filename) >>> df = [cudf.read_parquet(fname, row_group=i) for i in range(row_groups)] >>> df = cudf.concat(df) >>> df From 0ed60f14e54d63d3c209f9d6c4b38c415a6a6c1c Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 27 Mar 2024 01:30:56 +0000 Subject: [PATCH 02/12] refactor rg-metadata as kv-map instead of pair --- cpp/include/cudf/io/parquet_metadata.hpp | 8 +++--- cpp/src/io/parquet/reader_impl_helpers.cpp | 25 +++++++++++-------- cpp/src/io/parquet/reader_impl_helpers.hpp | 4 +-- .../cudf/_lib/cpp/io/parquet_metadata.pxd | 3 +-- python/cudf/cudf/_lib/parquet.pyx | 7 +++++- python/cudf/cudf/tests/test_parquet.py | 4 ++- 6 files changed, 31 insertions(+), 20 deletions(-) diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index c3393f96fdb..20af973a7f8 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -179,8 +179,8 @@ class parquet_metadata { public: /// Key-value metadata in the file footer. using key_value_metadata = std::unordered_map; - /// row group metadata from RowGroup element. - using row_group_metadata = std::pair; + /// row group metadata from each RowGroup element. + using row_group_metadata = std::unordered_map; /** * @brief Default constructor. @@ -197,7 +197,7 @@ class parquet_metadata { * @param num_rowgroups number of row groups * @param num_columns number of columns * @param file_metadata key-value metadata in the file footer - * @param rg_metadata vector of pairs of number of rows and total byte size for each row group + * @param rg_metadata vector of maps containing metadata for each row group */ parquet_metadata(parquet_schema schema, int64_t num_rows, @@ -254,7 +254,7 @@ class parquet_metadata { /** * @brief Returns the row group metadata in the file footer. * - * @return pairs of number of rows and total byte size for each row group as a vector + * @return vector of row group metadata as maps */ [[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 2890188270d..468f8a3b1fa 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -566,17 +566,22 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t return col->meta_data; } -std::vector> aggregate_reader_metadata::get_rowgroup_metadata() const +std::vector> +aggregate_reader_metadata::get_rowgroup_metadata() const { - std::vector> rg_metadata; - - std::for_each( - per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) { - std::transform( - pfm.row_groups.cbegin(), - pfm.row_groups.cend(), - std::back_inserter(rg_metadata), - [](auto const& rg) { return std::make_pair(rg.num_rows, rg.total_byte_size); }); + std::vector> rg_metadata; + + std::transform( + per_file_metadata.cbegin(), + per_file_metadata.cend(), + std::back_inserter(rg_metadata), + [](auto const& pfm) { + std::unordered_map rg_meta_map; + std::for_each(pfm.row_groups.cbegin(), pfm.row_groups.cend(), [&rg_meta_map](auto const& rg) { + rg_meta_map["num_rows"] = rg.num_rows; + rg_meta_map["total_byte_size"] = rg.total_byte_size; + }); + return rg_meta_map; }); return rg_metadata; } diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index 78dd373d80d..bcd8782ea22 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -172,9 +172,9 @@ class aggregate_reader_metadata { /** * @brief Extracts high-level metadata for all row groups * - * @return List of pairs of number of rows and total byte size for each row group + * @return List of maps containing metadata information for each row group */ - [[nodiscard]] std::vector> get_rowgroup_metadata() const; + [[nodiscard]] std::vector> get_rowgroup_metadata() const; [[nodiscard]] auto get_num_rows() const { return num_rows; } diff --git a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd index 10700e87927..9dc0db8f972 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd @@ -1,7 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from libc.stdint cimport int64_t -from libcpp.pair cimport pair from libcpp.string cimport string from libcpp.unordered_map cimport unordered_map from libcpp.vector cimport vector @@ -29,6 +28,6 @@ cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil: size_type num_rowgroups() except+ size_type num_columns() except+ unordered_map[string, string] metadata() except+ - vector[pair[int64_t, int64_t]] rowgroup_metadata() except+ + vector[unordered_map[string, int64_t]] rowgroup_metadata() except+ cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+ diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index de2dd8cfb97..cd70a04e5a9 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -352,7 +352,12 @@ cpdef read_parquet_metadata(filepaths_or_buffers): num_rows = c_result.num_rows() num_rowgroups = c_result.num_rowgroups() names = [info.name().decode() for info in c_result.schema().root().children()] - row_group_metadata = c_result.rowgroup_metadata() + + # extract row group metadata and sanitize keys + row_group_metadata=[] + row_group_metadata_unsanitized = c_result.rowgroup_metadata() + for meta in row_group_metadata_unsanitized: + row_group_metadata.append({k.decode(): v for k, v in meta}) return num_rows, num_rowgroups, names, num_columns, row_group_metadata diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 7c7afcc5173..92091385fe0 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -2894,7 +2894,9 @@ def test_parquet_row_group_metadata(tmpdir, large_int64_gdf, size_rows): # length(RowGroupsMetaData) == number of row groups assert len(row_group_metadata) == row_groups # sum of rows in row groups == total rows - assert num_rows == sum([row_group[0] for row_group in row_group_metadata]) + assert num_rows == sum( + [row_group["num_rows"] for row_group in row_group_metadata] + ) def test_parquet_reader_decimal_columns(): From c5dfa98025e970bd2eb3ed70d5cfc689d90eccc4 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 28 Mar 2024 01:04:22 +0000 Subject: [PATCH 03/12] fixing num_columns and empty rg metadata vector when empty table --- cpp/src/io/parquet/reader_impl_helpers.cpp | 26 ++++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 468f8a3b1fa..529e8066d47 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -381,7 +381,9 @@ size_type aggregate_reader_metadata::calc_num_row_groups() const size_type aggregate_reader_metadata::calc_num_columns() const { - return per_file_metadata[0].row_groups[0].columns.size(); + return per_file_metadata.size() + ? (per_file_metadata[0].schema.size() ? per_file_metadata[0].schema[0].num_children : 0) + : 0; } // Copies info from the column and offset indexes into the passed in row_group_info. @@ -571,17 +573,17 @@ aggregate_reader_metadata::get_rowgroup_metadata() const { std::vector> rg_metadata; - std::transform( - per_file_metadata.cbegin(), - per_file_metadata.cend(), - std::back_inserter(rg_metadata), - [](auto const& pfm) { - std::unordered_map rg_meta_map; - std::for_each(pfm.row_groups.cbegin(), pfm.row_groups.cend(), [&rg_meta_map](auto const& rg) { - rg_meta_map["num_rows"] = rg.num_rows; - rg_meta_map["total_byte_size"] = rg.total_byte_size; - }); - return rg_meta_map; + std::for_each( + per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) { + std::transform(pfm.row_groups.cbegin(), + pfm.row_groups.cend(), + std::back_inserter(rg_metadata), + [&rg_metadata](auto const& rg) { + std::unordered_map rg_meta_map; + rg_meta_map["num_rows"] = rg.num_rows; + rg_meta_map["total_byte_size"] = rg.total_byte_size; + return rg_meta_map; + }); }); return rg_metadata; } From 9b6336eb2a2089bcd0bf9cc3bbd8b9874ca7e184 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Thu, 28 Mar 2024 01:49:12 +0000 Subject: [PATCH 04/12] remove an extra `_`. minor bug fix. --- python/cudf/cudf/tests/test_parquet.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/cudf/cudf/tests/test_parquet.py b/python/cudf/cudf/tests/test_parquet.py index 92091385fe0..56a4281aad9 100644 --- a/python/cudf/cudf/tests/test_parquet.py +++ b/python/cudf/cudf/tests/test_parquet.py @@ -420,8 +420,7 @@ def num_row_groups(rows, group_size): row_groups, col_names, num_columns, - _, - _, + _, # rowgroup_metadata ) = cudf.io.read_parquet_metadata(fname) assert num_columns == len(pdf.columns) @@ -2887,7 +2886,7 @@ def test_parquet_row_group_metadata(tmpdir, large_int64_gdf, size_rows): num_rows, row_groups, _, # col_names - _, # rowgroup_metadata + _, # num_columns row_group_metadata, ) = cudf.io.read_parquet_metadata(fname) From 2a8555b7633ee8ec4b80c2af2bd647d3b64507ec Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Apr 2024 00:34:51 +0000 Subject: [PATCH 05/12] handle index columns in metadata if present --- cpp/include/cudf/io/parquet_metadata.hpp | 11 ------- cpp/src/io/parquet/reader_impl.cpp | 1 - cpp/src/io/parquet/reader_impl_helpers.cpp | 10 +----- cpp/src/io/parquet/reader_impl_helpers.hpp | 5 --- python/cudf/cudf/_lib/parquet.pyx | 37 +++++++++++++++++++--- 5 files changed, 33 insertions(+), 31 deletions(-) diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index 20af973a7f8..e0c406c180c 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -195,20 +195,17 @@ class parquet_metadata { * @param schema parquet schema * @param num_rows number of rows * @param num_rowgroups number of row groups - * @param num_columns number of columns * @param file_metadata key-value metadata in the file footer * @param rg_metadata vector of maps containing metadata for each row group */ parquet_metadata(parquet_schema schema, int64_t num_rows, size_type num_rowgroups, - size_type num_columns, key_value_metadata file_metadata, std::vector rg_metadata) : _schema{std::move(schema)}, _num_rows{num_rows}, _num_rowgroups{num_rowgroups}, - _num_columns{num_columns}, _file_metadata{std::move(file_metadata)}, _rowgroup_metadata{std::move(rg_metadata)} { @@ -237,13 +234,6 @@ class parquet_metadata { */ [[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; } - /** - * @brief Returns the number of columns in the file. - * - * @return Number of row groups - */ - [[nodiscard]] auto num_columns() const { return _num_columns; } - /** * @brief Returns the Key value metadata in the file footer. * @@ -262,7 +252,6 @@ class parquet_metadata { parquet_schema _schema; int64_t _num_rows; size_type _num_rowgroups; - size_type _num_columns; key_value_metadata _file_metadata; std::vector _rowgroup_metadata; }; diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index ff80661fd67..d3e109cdb88 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -607,7 +607,6 @@ parquet_metadata read_parquet_metadata(host_span con return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)}, metadata.get_num_rows(), metadata.get_num_row_groups(), - metadata.get_num_columns(), metadata.get_key_value_metadata()[0], metadata.get_rowgroup_metadata()}; } diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 529e8066d47..911307ec9a3 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -379,13 +379,6 @@ size_type aggregate_reader_metadata::calc_num_row_groups() const }); } -size_type aggregate_reader_metadata::calc_num_columns() const -{ - return per_file_metadata.size() - ? (per_file_metadata[0].schema.size() ? per_file_metadata[0].schema[0].num_children : 0) - : 0; -} - // Copies info from the column and offset indexes into the passed in row_group_info. void aggregate_reader_metadata::column_info_for_row_group(row_group_info& rg_info, size_type chunk_start_row) const @@ -527,8 +520,7 @@ aggregate_reader_metadata::aggregate_reader_metadata( : per_file_metadata(metadatas_from_sources(sources)), keyval_maps(collect_keyval_metadata()), num_rows(calc_num_rows()), - num_row_groups(calc_num_row_groups()), - num_columns(calc_num_columns()) + num_row_groups(calc_num_row_groups()) { if (per_file_metadata.size() > 0) { auto const& first_meta = per_file_metadata.front(); diff --git a/cpp/src/io/parquet/reader_impl_helpers.hpp b/cpp/src/io/parquet/reader_impl_helpers.hpp index bcd8782ea22..09f65f9c388 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.hpp +++ b/cpp/src/io/parquet/reader_impl_helpers.hpp @@ -126,7 +126,6 @@ class aggregate_reader_metadata { std::vector> keyval_maps; int64_t num_rows; size_type num_row_groups; - size_type num_columns; /** * @brief Create a metadata object from each element in the source vector @@ -150,8 +149,6 @@ class aggregate_reader_metadata { */ [[nodiscard]] size_type calc_num_row_groups() const; - [[nodiscard]] size_type calc_num_columns() const; - /** * @brief Calculate column index info for the given `row_group_info` * @@ -180,8 +177,6 @@ class aggregate_reader_metadata { [[nodiscard]] auto get_num_row_groups() const { return num_row_groups; } - [[nodiscard]] auto get_num_columns() const { return num_columns; } - [[nodiscard]] auto const& get_schema(int schema_idx) const { return per_file_metadata[0].schema[schema_idx]; diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index cd70a04e5a9..d80c195909a 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -348,18 +348,45 @@ cpdef read_parquet_metadata(filepaths_or_buffers): c_result = move(parquet_metadata_reader(args)) # access and return results - num_columns = c_result.num_columns() num_rows = c_result.num_rows() num_rowgroups = c_result.num_rowgroups() - names = [info.name().decode() for info in c_result.schema().root().children()] # extract row group metadata and sanitize keys row_group_metadata=[] row_group_metadata_unsanitized = c_result.rowgroup_metadata() - for meta in row_group_metadata_unsanitized: - row_group_metadata.append({k.decode(): v for k, v in meta}) + for metadata in row_group_metadata_unsanitized: + row_group_metadata.append({k.decode(): v for k, v in metadata}) - return num_rows, num_rowgroups, names, num_columns, row_group_metadata + # read all column names including index column, if any + col_names = [info.name().decode() for info in c_result.schema().root().children()] + + # access the Parquet file_footer to find the index + index_col = None + cdef unordered_map[string, string] file_footer = c_result.metadata() + + # get index column name(s) + index_col_names = None + json_str = file_footer[b'pandas'].decode('utf-8') + meta = None + if json_str != "": + meta = json.loads(json_str) + file_is_range_index, index_col, _ = _parse_metadata(meta) + if not file_is_range_index and index_col is not None \ + and index_col_names is None: + index_col_names = {} + for idx_col in index_col: + for c in meta['columns']: + if c['field_name'] == idx_col: + index_col_names[idx_col] = c['name'] + + # remove the index column from the list of column names + col_names = [name for name in col_names if name not in index_col_names] + + # num_columns = length of list(col_names) + num_columns = len(col_names) + + # return the metadata + return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata @acquire_spill_lock() From 884de54cb60bfb7926518d3f90604c71ecfbe94b Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Apr 2024 03:16:59 +0000 Subject: [PATCH 06/12] modify move semantics for gtests --- cpp/include/cudf/io/parquet_metadata.hpp | 2 +- cpp/src/io/parquet/reader_impl.cpp | 2 +- python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index e0c406c180c..ee1dbd19bae 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -207,7 +207,7 @@ class parquet_metadata { _num_rows{num_rows}, _num_rowgroups{num_rowgroups}, _file_metadata{std::move(file_metadata)}, - _rowgroup_metadata{std::move(rg_metadata)} + _rowgroup_metadata{rg_metadata} { } diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index d3e109cdb88..87844826079 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -608,7 +608,7 @@ parquet_metadata read_parquet_metadata(host_span con metadata.get_num_rows(), metadata.get_num_row_groups(), metadata.get_key_value_metadata()[0], - metadata.get_rowgroup_metadata()}; + std::move(metadata.get_rowgroup_metadata())}; } } // namespace cudf::io::parquet::detail diff --git a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd index 9dc0db8f972..e9def2aea5d 100644 --- a/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd +++ b/python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd @@ -26,7 +26,6 @@ cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil: parquet_schema schema() except+ int64_t num_rows() except+ size_type num_rowgroups() except+ - size_type num_columns() except+ unordered_map[string, string] metadata() except+ vector[unordered_map[string, int64_t]] rowgroup_metadata() except+ From 421bfe3fe5020979c08a18052d76f2810785fb55 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb <14217455+mhaseeb123@users.noreply.github.com> Date: Mon, 1 Apr 2024 20:21:06 -0700 Subject: [PATCH 07/12] Update cpp/src/io/parquet/reader_impl_helpers.cpp Co-authored-by: Vukasin Milovanovic --- cpp/src/io/parquet/reader_impl_helpers.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/io/parquet/reader_impl_helpers.cpp b/cpp/src/io/parquet/reader_impl_helpers.cpp index 911307ec9a3..402ccef7a15 100644 --- a/cpp/src/io/parquet/reader_impl_helpers.cpp +++ b/cpp/src/io/parquet/reader_impl_helpers.cpp @@ -570,7 +570,7 @@ aggregate_reader_metadata::get_rowgroup_metadata() const std::transform(pfm.row_groups.cbegin(), pfm.row_groups.cend(), std::back_inserter(rg_metadata), - [&rg_metadata](auto const& rg) { + [](auto const& rg) { std::unordered_map rg_meta_map; rg_meta_map["num_rows"] = rg.num_rows; rg_meta_map["total_byte_size"] = rg.total_byte_size; From 7f3b13165128ca9e0bcac2313f442558055fbffe Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Apr 2024 04:56:13 +0000 Subject: [PATCH 08/12] minor bug fix when intersecting column names and index column name --- python/cudf/cudf/_lib/parquet.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d80c195909a..555240f620d 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -380,12 +380,14 @@ cpdef read_parquet_metadata(filepaths_or_buffers): index_col_names[idx_col] = c['name'] # remove the index column from the list of column names - col_names = [name for name in col_names if name not in index_col_names] + # only if index_col_names is not None + if index_col_names is not None: + col_names = [name for name in col_names if name not in index_col_names] # num_columns = length of list(col_names) num_columns = len(col_names) - # return the metadata + # return the metadata return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata From 319bc342a9145a76625dab0391fd71dd8a6ba33a Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 2 Apr 2024 04:57:50 +0000 Subject: [PATCH 09/12] minor comment indendation fix --- python/cudf/cudf/_lib/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 555240f620d..aab70cdf4eb 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -387,7 +387,7 @@ cpdef read_parquet_metadata(filepaths_or_buffers): # num_columns = length of list(col_names) num_columns = len(col_names) - # return the metadata + # return the metadata return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata From 1a0b45c8c343de6c2c202c8699bb44b722ecab90 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Mon, 8 Apr 2024 20:06:47 +0000 Subject: [PATCH 10/12] minor improvements --- python/cudf/cudf/_lib/parquet.pyx | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index aab70cdf4eb..4940ac8edbe 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -352,10 +352,8 @@ cpdef read_parquet_metadata(filepaths_or_buffers): num_rowgroups = c_result.num_rowgroups() # extract row group metadata and sanitize keys - row_group_metadata=[] - row_group_metadata_unsanitized = c_result.rowgroup_metadata() - for metadata in row_group_metadata_unsanitized: - row_group_metadata.append({k.decode(): v for k, v in metadata}) + row_group_metadata = [{k.decode(): v for k, v in metadata} + for metadata in c_result.rowgroup_metadata()] # read all column names including index column, if any col_names = [info.name().decode() for info in c_result.schema().root().children()] From a8a028d33abe627b804d044f6f7f3c8d377b9745 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Tue, 16 Apr 2024 23:27:18 +0000 Subject: [PATCH 11/12] suggestions from reviews --- cpp/include/cudf/io/parquet_metadata.hpp | 2 +- cpp/src/io/parquet/reader_impl.cpp | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/include/cudf/io/parquet_metadata.hpp b/cpp/include/cudf/io/parquet_metadata.hpp index ee1dbd19bae..e0c406c180c 100644 --- a/cpp/include/cudf/io/parquet_metadata.hpp +++ b/cpp/include/cudf/io/parquet_metadata.hpp @@ -207,7 +207,7 @@ class parquet_metadata { _num_rows{num_rows}, _num_rowgroups{num_rowgroups}, _file_metadata{std::move(file_metadata)}, - _rowgroup_metadata{rg_metadata} + _rowgroup_metadata{std::move(rg_metadata)} { } diff --git a/cpp/src/io/parquet/reader_impl.cpp b/cpp/src/io/parquet/reader_impl.cpp index 87844826079..d3e109cdb88 100644 --- a/cpp/src/io/parquet/reader_impl.cpp +++ b/cpp/src/io/parquet/reader_impl.cpp @@ -608,7 +608,7 @@ parquet_metadata read_parquet_metadata(host_span con metadata.get_num_rows(), metadata.get_num_row_groups(), metadata.get_key_value_metadata()[0], - std::move(metadata.get_rowgroup_metadata())}; + metadata.get_rowgroup_metadata()}; } } // namespace cudf::io::parquet::detail From 98036ed2d942546e8e10e7dd0d408f2225eaa425 Mon Sep 17 00:00:00 2001 From: Muhammad Haseeb Date: Wed, 17 Apr 2024 19:15:09 +0000 Subject: [PATCH 12/12] minor comments updates and remove a redundant code snippet --- python/cudf/cudf/_lib/parquet.pyx | 5 +---- python/cudf/cudf/io/parquet.py | 25 +++---------------------- 2 files changed, 4 insertions(+), 26 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 4940ac8edbe..9ce9aad18f7 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -324,15 +324,12 @@ cpdef read_parquet_metadata(filepaths_or_buffers): """ Cython function to call into libcudf API, see `read_parquet_metadata`. - filters, if not None, should be an Expression that evaluates to a - boolean predicate as a function of columns being read. - See Also -------- cudf.io.parquet.read_parquet cudf.io.parquet.to_parquet """ - # Convert NativeFile buffers to NativeFileDatasource, + # Convert NativeFile buffers to NativeFileDatasource for i, datasource in enumerate(filepaths_or_buffers): if isinstance(datasource, NativeFile): filepaths_or_buffers[i] = NativeFileDatasource(datasource) diff --git a/python/cudf/cudf/io/parquet.py b/python/cudf/cudf/io/parquet.py index 8a48997b547..e7f1ad0751f 100644 --- a/python/cudf/cudf/io/parquet.py +++ b/python/cudf/cudf/io/parquet.py @@ -274,34 +274,15 @@ def read_parquet_metadata(filepath_or_buffer): if not is_list_like(filepath_or_buffer): filepath_or_buffer = [filepath_or_buffer] - # Start by trying construct a filesystem object, so we - # can apply filters on remote file-systems + # Start by trying to construct a filesystem object fs, paths = ioutils._get_filesystem_and_paths( path_or_data=filepath_or_buffer, storage_options=None ) - # Use pyarrow dataset to detect/process directory-partitioned - # data and apply filters. Note that we can only support partitioned - # data and filtering if the input is a single directory or list of - # paths. - partition_keys = [] - partition_categories = {} - if fs and paths: - ( - paths, - row_groups, - partition_keys, - partition_categories, - ) = _process_dataset( - paths=paths, - fs=fs, - filters=None, - row_groups=None, - categorical_partitions=True, - dataset_kwargs=None, - ) + # Check if filepath or buffer filepath_or_buffer = paths if paths else filepath_or_buffer + # List of filepaths or buffers filepaths_or_buffers = [] for source in filepath_or_buffer: