diff --git a/py-polars/polars/io/_utils.py b/py-polars/polars/io/_utils.py
index 7590f8c871b9..55440d481ecf 100644
--- a/py-polars/polars/io/_utils.py
+++ b/py-polars/polars/io/_utils.py
@@ -31,33 +31,48 @@ def _is_local_file(file: str) -> bool:
 
 @overload
 def _prepare_file_arg(
-    file: str | list[str] | Path | BinaryIO | bytes, **kwargs: Any
+    file: str | list[str] | Path | BinaryIO | bytes,
+    encoding: str | None = ...,
+    *,
+    use_pyarrow: bool = ...,
+    raise_if_empty: bool = ...,
+    storage_options: dict[str, Any] | None = ...,
 ) -> ContextManager[str | BinaryIO]:
     ...
 
 
 @overload
 def _prepare_file_arg(
-    file: str | TextIO | Path | BinaryIO | bytes, **kwargs: Any
+    file: str | TextIO | Path | BinaryIO | bytes,
+    encoding: str | None = ...,
+    *,
+    use_pyarrow: bool = ...,
+    raise_if_empty: bool = ...,
+    storage_options: dict[str, Any] | None = ...,
 ) -> ContextManager[str | BinaryIO]:
     ...
 
 
 @overload
 def _prepare_file_arg(
-    file: str | list[str] | TextIO | Path | BinaryIO | bytes, **kwargs: Any
+    file: str | list[str] | Path | TextIO | BinaryIO | bytes,
+    encoding: str | None = ...,
+    *,
+    use_pyarrow: bool = ...,
+    raise_if_empty: bool = ...,
+    storage_options: dict[str, Any] | None = ...,
 ) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]:
     ...
 
 
 def _prepare_file_arg(
-    file: str | list[str] | TextIO | Path | BinaryIO | bytes,
+    file: str | list[str] | Path | TextIO | BinaryIO | bytes,
     encoding: str | None = None,
     *,
-    use_pyarrow: bool | None = None,
+    use_pyarrow: bool = False,
     raise_if_empty: bool = True,
-    **kwargs: Any,
-) -> ContextManager[str | BinaryIO | list[str] | list[BinaryIO]]:
+    storage_options: dict[str, Any] | None = None,
+) -> ContextManager[str | list[str] | BinaryIO | list[BinaryIO]]:
     """
     Prepare file argument.
 
@@ -80,6 +95,7 @@ def _prepare_file_arg(
     fsspec too.
 
     """
+    storage_options = storage_options or {}
 
     # Small helper to use a variable as context
    @contextmanager
@@ -167,8 +183,8 @@ def managed_file(file: Any) -> Iterator[Any]:
                     context=f"{file!r}",
                     raise_if_empty=raise_if_empty,
                 )
-            kwargs["encoding"] = encoding
-            return fsspec.open(file, **kwargs)
+            storage_options["encoding"] = encoding
+            return fsspec.open(file, **storage_options)
 
     if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file):
         if _FSSPEC_AVAILABLE:
@@ -182,8 +198,8 @@ def managed_file(file: Any) -> Iterator[Any]:
                     for f in file
                 ]
             )
-            kwargs["encoding"] = encoding
-            return fsspec.open_files(file, **kwargs)
+            storage_options["encoding"] = encoding
+            return fsspec.open_files(file, **storage_options)
 
     if isinstance(file, str):
         file = normalize_filepath(file, check_not_directory=check_not_dir)
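
For orientation, a minimal sketch of how the reworked helper is meant to be called after this change: `storage_options` travels as one explicit keyword dict instead of being splatted through `**kwargs`. The path and options below are made up for illustration, and `_prepare_file_arg` is a private helper, not public API.

    # Hypothetical usage sketch (not part of the diff).
    from polars.io._utils import _prepare_file_arg

    opts = {"anon": True}  # illustrative fsspec-style options; ignored for local paths
    with _prepare_file_arg("data.csv", use_pyarrow=False, storage_options=opts) as data:
        print(type(data))  # a normalized path or file-like object ready for the reader
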
""" + storage_options = storage_options or {} # Small helper to use a variable as context @contextmanager @@ -167,8 +183,8 @@ def managed_file(file: Any) -> Iterator[Any]: context=f"{file!r}", raise_if_empty=raise_if_empty, ) - kwargs["encoding"] = encoding - return fsspec.open(file, **kwargs) + storage_options["encoding"] = encoding + return fsspec.open(file, **storage_options) if isinstance(file, list) and bool(file) and all(isinstance(f, str) for f in file): if _FSSPEC_AVAILABLE: @@ -182,8 +198,8 @@ def managed_file(file: Any) -> Iterator[Any]: for f in file ] ) - kwargs["encoding"] = encoding - return fsspec.open_files(file, **kwargs) + storage_options["encoding"] = encoding + return fsspec.open_files(file, **storage_options) if isinstance(file, str): file = normalize_filepath(file, check_not_directory=check_not_dir) diff --git a/py-polars/polars/io/csv/functions.py b/py-polars/polars/io/csv/functions.py index cf0e93522512..299414adc1a7 100644 --- a/py-polars/polars/io/csv/functions.py +++ b/py-polars/polars/io/csv/functions.py @@ -230,7 +230,7 @@ def read_csv( encoding=None, use_pyarrow=True, raise_if_empty=raise_if_empty, - **storage_options, + storage_options=storage_options, ) as data: import pyarrow as pa import pyarrow.csv @@ -364,7 +364,7 @@ def read_csv( encoding=encoding, use_pyarrow=False, raise_if_empty=raise_if_empty, - **storage_options, + storage_options=storage_options, ) as data: df = pl.DataFrame._read_csv( data, diff --git a/py-polars/polars/io/ipc/anonymous_scan.py b/py-polars/polars/io/ipc/anonymous_scan.py index db8cfc802a4b..b6f59b44783f 100644 --- a/py-polars/polars/io/ipc/anonymous_scan.py +++ b/py-polars/polars/io/ipc/anonymous_scan.py @@ -18,7 +18,7 @@ def _scan_ipc_fsspec( func = partial(_scan_ipc_impl, source, storage_options=storage_options) storage_options = storage_options or {} - with _prepare_file_arg(source, **storage_options) as data: + with _prepare_file_arg(source, storage_options=storage_options) as data: schema = polars.io.ipc.read_ipc_schema(data) return pl.LazyFrame._scan_python_function(schema, func) diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py index 3d520b5cc388..a9563f032c82 100644 --- a/py-polars/polars/io/ipc/functions.py +++ b/py-polars/polars/io/ipc/functions.py @@ -79,8 +79,9 @@ def read_ipc( "`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`" ) - storage_options = storage_options or {} - with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data: + with _prepare_file_arg( + source, use_pyarrow=use_pyarrow, storage_options=storage_options + ) as data: if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( @@ -154,8 +155,9 @@ def read_ipc_stream( DataFrame """ - storage_options = storage_options or {} - with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data: + with _prepare_file_arg( + source, use_pyarrow=use_pyarrow, storage_options=storage_options + ) as data: if use_pyarrow: if not _PYARROW_AVAILABLE: raise ModuleNotFoundError( diff --git a/py-polars/polars/io/parquet/anonymous_scan.py b/py-polars/polars/io/parquet/anonymous_scan.py index 1b698094d7bd..eea410c72b9c 100644 --- a/py-polars/polars/io/parquet/anonymous_scan.py +++ b/py-polars/polars/io/parquet/anonymous_scan.py @@ -17,8 +17,7 @@ def _scan_parquet_fsspec( ) -> LazyFrame: func = partial(_scan_parquet_impl, source, storage_options=storage_options) - storage_options = storage_options or {} - with _prepare_file_arg(source, **storage_options) as 
diff --git a/py-polars/polars/io/ipc/anonymous_scan.py b/py-polars/polars/io/ipc/anonymous_scan.py
index db8cfc802a4b..b6f59b44783f 100644
--- a/py-polars/polars/io/ipc/anonymous_scan.py
+++ b/py-polars/polars/io/ipc/anonymous_scan.py
@@ -18,7 +18,7 @@ def _scan_ipc_fsspec(
     func = partial(_scan_ipc_impl, source, storage_options=storage_options)
 
     storage_options = storage_options or {}
-    with _prepare_file_arg(source, **storage_options) as data:
+    with _prepare_file_arg(source, storage_options=storage_options) as data:
         schema = polars.io.ipc.read_ipc_schema(data)
 
     return pl.LazyFrame._scan_python_function(schema, func)
diff --git a/py-polars/polars/io/ipc/functions.py b/py-polars/polars/io/ipc/functions.py
index 3d520b5cc388..a9563f032c82 100644
--- a/py-polars/polars/io/ipc/functions.py
+++ b/py-polars/polars/io/ipc/functions.py
@@ -79,8 +79,9 @@ def read_ipc(
             "`n_rows` cannot be used with `use_pyarrow=True` and `memory_map=False`"
         )
 
-    storage_options = storage_options or {}
-    with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data:
+    with _prepare_file_arg(
+        source, use_pyarrow=use_pyarrow, storage_options=storage_options
+    ) as data:
         if use_pyarrow:
             if not _PYARROW_AVAILABLE:
                 raise ModuleNotFoundError(
@@ -154,8 +155,9 @@ def read_ipc_stream(
     DataFrame
 
     """
-    storage_options = storage_options or {}
-    with _prepare_file_arg(source, use_pyarrow=use_pyarrow, **storage_options) as data:
+    with _prepare_file_arg(
+        source, use_pyarrow=use_pyarrow, storage_options=storage_options
+    ) as data:
         if use_pyarrow:
             if not _PYARROW_AVAILABLE:
                 raise ModuleNotFoundError(
diff --git a/py-polars/polars/io/parquet/anonymous_scan.py b/py-polars/polars/io/parquet/anonymous_scan.py
index 1b698094d7bd..eea410c72b9c 100644
--- a/py-polars/polars/io/parquet/anonymous_scan.py
+++ b/py-polars/polars/io/parquet/anonymous_scan.py
@@ -17,8 +17,7 @@ def _scan_parquet_fsspec(
 ) -> LazyFrame:
     func = partial(_scan_parquet_impl, source, storage_options=storage_options)
 
-    storage_options = storage_options or {}
-    with _prepare_file_arg(source, **storage_options) as data:
+    with _prepare_file_arg(source, storage_options=storage_options) as data:
         schema = polars.io.parquet.read_parquet_schema(data)
 
     return pl.LazyFrame._scan_python_function(schema, func)
diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py
index cfce8e1d085c..82b46aee45f1 100644
--- a/py-polars/polars/io/parquet/functions.py
+++ b/py-polars/polars/io/parquet/functions.py
@@ -1,6 +1,7 @@
 from __future__ import annotations
 
 import contextlib
+from io import BytesIO
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, BinaryIO
 
@@ -8,117 +9,132 @@
 from polars.convert import from_arrow
 from polars.dependencies import _PYARROW_AVAILABLE
 from polars.io._utils import _prepare_file_arg
-from polars.utils.various import normalize_filepath
+from polars.utils.various import is_int_sequence, normalize_filepath
 
 with contextlib.suppress(ImportError):
     from polars.polars import read_parquet_schema as _read_parquet_schema
 
 if TYPE_CHECKING:
-    from io import BytesIO
-
     from polars import DataFrame, DataType, LazyFrame
     from polars.type_aliases import ParallelStrategy
 
 
 def read_parquet(
-    source: str | Path | BinaryIO | BytesIO | bytes,
+    source: str | Path | list[str] | list[Path] | BinaryIO | BytesIO | bytes,
     *,
     columns: list[int] | list[str] | None = None,
     n_rows: int | None = None,
-    use_pyarrow: bool = False,
-    memory_map: bool = True,
-    storage_options: dict[str, Any] | None = None,
-    parallel: ParallelStrategy = "auto",
     row_count_name: str | None = None,
     row_count_offset: int = 0,
-    low_memory: bool = False,
-    pyarrow_options: dict[str, Any] | None = None,
+    parallel: ParallelStrategy = "auto",
     use_statistics: bool = True,
+    hive_partitioning: bool = True,
     rechunk: bool = True,
+    low_memory: bool = False,
+    storage_options: dict[str, Any] | None = None,
+    retries: int = 0,
+    use_pyarrow: bool = False,
+    pyarrow_options: dict[str, Any] | None = None,
+    memory_map: bool = True,
 ) -> DataFrame:
     """
     Read into a DataFrame from a parquet file.
 
-    Notes
-    -----
-    * Partitioned files:
-        If you have a directory-nested (hive-style) partitioned dataset, you should
-        use the :func:`scan_pyarrow_dataset` method instead.
-    * When benchmarking:
-        This operation defaults to a `rechunk` operation at the end, meaning that all
-        data will be stored continuously in memory. Set `rechunk=False` if you are
-        benchmarking the parquet-reader as `rechunk` can be an expensive operation
-        that should not contribute to the timings.
-
     Parameters
     ----------
     source
         Path to a file, or a file-like object. If the path is a directory, files in that
-        directory will all be read. If `fsspec` is installed, it will be used to open
-        remote files.
+        directory will all be read.
     columns
         Columns to select. Accepts a list of column indices (starting at zero) or a list
         of column names.
     n_rows
         Stop reading from parquet file after reading `n_rows`.
         Only valid when `use_pyarrow=False`.
-    use_pyarrow
-        Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is
-        more stable.
-    memory_map
-        Memory map underlying file. This will likely increase performance.
-        Only used when `use_pyarrow=True`.
-    storage_options
-        Extra options that make sense for `fsspec.open()` or a particular storage
-        connection, e.g. host, port, username, password, etc.
-    parallel : {'auto', 'columns', 'row_groups', 'none'}
-        This determines the direction of parallelism. 'auto' will try to determine the
-        optimal direction.
     row_count_name
         If not None, this will insert a row count column with give name into the
         DataFrame.
     row_count_offset
         Offset to start the row_count column (only use if the name is set).
-    low_memory
-        Reduce memory pressure at the expense of performance.
-    pyarrow_options
-        Keyword arguments for `pyarrow.parquet.read_table
-        `_.
+    parallel : {'auto', 'columns', 'row_groups', 'none'}
+        This determines the direction of parallelism. 'auto' will try to determine the
+        optimal direction.
     use_statistics
         Use statistics in the parquet to determine if pages can be skipped from reading.
+    hive_partitioning
+        Infer statistics and schema from hive partitioned URL and use them
+        to prune reads.
     rechunk
         Make sure that all columns are contiguous in memory by aggregating the chunks
         into a single array.
+    low_memory
+        Reduce memory pressure at the expense of performance.
+    storage_options
+        Options that indicate how to connect to a cloud provider.
+        If the cloud provider is not supported by Polars, the storage options
+        are passed to `fsspec.open()`.
 
-    See Also
-    --------
-    scan_parquet
-    scan_pyarrow_dataset
+        The cloud providers currently supported are AWS, GCP, and Azure.
+        See supported keys here:
+
+        * `aws `_
+        * `gcp `_
+        * `azure `_
+
+        If `storage_options` is not provided, Polars will try to infer the information
+        from environment variables.
+    retries
+        Number of retries if accessing a cloud instance fails.
+    use_pyarrow
+        Use pyarrow instead of the Rust native parquet reader. The pyarrow reader is
+        more stable.
+    pyarrow_options
+        Keyword arguments for `pyarrow.parquet.read_table
+        `_.
+    memory_map
+        Memory map underlying file. This will likely increase performance.
+        Only used when `use_pyarrow=True`.
 
     Returns
     -------
     DataFrame
 
+    See Also
+    --------
+    scan_parquet
+    scan_pyarrow_dataset
+
+    Notes
+    -----
+    * Partitioned files:
+        If you have a directory-nested (hive-style) partitioned dataset, you should
+        use the :func:`scan_pyarrow_dataset` method instead.
+    * When benchmarking:
+        This operation defaults to a `rechunk` operation at the end, meaning that all
+        data will be stored continuously in memory. Set `rechunk=False` if you are
+        benchmarking the parquet-reader as `rechunk` can be an expensive operation
+        that should not contribute to the timings.
+
     """
-    if use_pyarrow and n_rows:
-        raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`")
-
-    storage_options = storage_options or {}
-    pyarrow_options = pyarrow_options or {}
-
-    with _prepare_file_arg(
-        source, use_pyarrow=use_pyarrow, **storage_options
-    ) as source_prep:
-        if use_pyarrow:
-            if not _PYARROW_AVAILABLE:
-                raise ModuleNotFoundError(
-                    "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
-                )
+    # Dispatch to pyarrow if requested
+    if use_pyarrow:
+        if not _PYARROW_AVAILABLE:
+            raise ModuleNotFoundError(
+                "'pyarrow' is required when using `read_parquet(..., use_pyarrow=True)`"
+            )
+        if n_rows is not None:
+            raise ValueError("`n_rows` cannot be used with `use_pyarrow=True`")
+
+        import pyarrow as pa
+        import pyarrow.parquet
 
-            import pyarrow as pa
-            import pyarrow.parquet
+        pyarrow_options = pyarrow_options or {}
 
+        with _prepare_file_arg(
+            source,  # type: ignore[arg-type]
+            use_pyarrow=True,
+            storage_options=storage_options,
+        ) as source_prep:
             return from_arrow(  # type: ignore[return-value]
                 pa.parquet.read_table(
                     source_prep,
@@ -128,17 +144,43 @@
                 )
             )
 
-        return pl.DataFrame._read_parquet(
-            source_prep,
-            columns=columns,
-            n_rows=n_rows,
-            parallel=parallel,
-            row_count_name=row_count_name,
-            row_count_offset=row_count_offset,
-            low_memory=low_memory,
-            use_statistics=use_statistics,
-            rechunk=rechunk,
-        )
+    # Read binary types using `read_parquet`
+    elif isinstance(source, (BinaryIO, BytesIO, bytes)):
+        with _prepare_file_arg(source, use_pyarrow=False) as source_prep:
+            return pl.DataFrame._read_parquet(
+                source_prep,
+                columns=columns,
+                n_rows=n_rows,
+                parallel=parallel,
+                row_count_name=row_count_name,
+                row_count_offset=row_count_offset,
+                low_memory=low_memory,
+                use_statistics=use_statistics,
+                rechunk=rechunk,
+            )
+
+    # For other inputs, defer to `scan_parquet`
+    lf = scan_parquet(
+        source,
+        n_rows=n_rows,
+        row_count_name=row_count_name,
+        row_count_offset=row_count_offset,
+        parallel=parallel,
+        use_statistics=use_statistics,
+        hive_partitioning=hive_partitioning,
+        rechunk=rechunk,
+        low_memory=low_memory,
+        cache=False,
+        storage_options=storage_options,
+        retries=retries,
+    )
+
+    if columns is not None:
+        if is_int_sequence(columns):
+            columns = [lf.columns[i] for i in columns]
+        lf = lf.select(columns)
+
+    return lf.collect(no_optimization=True)
 
 
 def read_parquet_schema(
@@ -170,15 +212,15 @@ def scan_parquet(
     source: str | Path | list[str] | list[Path],
     *,
     n_rows: int | None = None,
-    cache: bool = True,
-    parallel: ParallelStrategy = "auto",
-    rechunk: bool = True,
     row_count_name: str | None = None,
     row_count_offset: int = 0,
-    storage_options: dict[str, Any] | None = None,
-    low_memory: bool = False,
+    parallel: ParallelStrategy = "auto",
     use_statistics: bool = True,
     hive_partitioning: bool = True,
+    rechunk: bool = True,
+    low_memory: bool = False,
+    cache: bool = True,
+    storage_options: dict[str, Any] | None = None,
     retries: int = 0,
 ) -> LazyFrame:
     """
@@ -194,40 +236,41 @@
         If a single path is given, it can be a globbing pattern.
     n_rows
         Stop reading from parquet file after reading `n_rows`.
-    cache
-        Cache the result after reading.
+    row_count_name
+        If not None, this will insert a row count column with the given name into the
+        DataFrame
+    row_count_offset
+        Offset to start the row_count column (only used if the name is set)
     parallel : {'auto', 'columns', 'row_groups', 'none'}
         This determines the direction of parallelism. 'auto' will try to determine the
         optimal direction.
+    use_statistics
+        Use statistics in the parquet to determine if pages
+        can be skipped from reading.
+    hive_partitioning
+        Infer statistics and schema from hive partitioned URL and use them
+        to prune reads.
     rechunk
         In case of reading multiple files via a glob pattern rechunk the final DataFrame
         into contiguous memory chunks.
-    row_count_name
-        If not None, this will insert a row count column with give name into the
-        DataFrame
-    row_count_offset
-        Offset to start the row_count column (only use if the name is set)
+    low_memory
+        Reduce memory pressure at the expense of performance.
+    cache
+        Cache the result after reading.
     storage_options
-        Options that inform use how to connect to the cloud provider.
-        If the cloud provider is not supported by us, the storage options
+        Options that indicate how to connect to a cloud provider.
+        If the cloud provider is not supported by Polars, the storage options
         are passed to `fsspec.open()`.
-        Currently supported providers are: {'aws', 'gcp', 'azure' }.
+
+        The cloud providers currently supported are AWS, GCP, and Azure.
         See supported keys here:
 
         * `aws `_
         * `gcp `_
         * `azure `_
 
-        If `storage_options` are not provided we will try to infer them from the
-        environment variables.
-    low_memory
-        Reduce memory pressure at the expense of performance.
-    use_statistics
-        Use statistics in the parquet to determine if pages
-        can be skipped from reading.
-    hive_partitioning
-        Infer statistics and schema from hive partitioned URL and use them
-        to prune reads.
+        If `storage_options` is not provided, Polars will try to infer the information
+        from environment variables.
     retries
         Number of retries if accessing a cloud instance fails.
 
diff --git a/py-polars/polars/lazyframe/frame.py b/py-polars/polars/lazyframe/frame.py
index 7e5443be62ff..ece772e9bac2 100644
--- a/py-polars/polars/lazyframe/frame.py
+++ b/py-polars/polars/lazyframe/frame.py
@@ -447,8 +447,11 @@ def _scan_parquet(
                 scan = scan.with_row_count(row_count_name, row_count_offset)
             return scan  # type: ignore[return-value]
 
-        if storage_options is not None:
+        if storage_options:
             storage_options = list(storage_options.items())  # type: ignore[assignment]
+        else:
+            # Handle empty dict input
+            storage_options = None
 
         self = cls.__new__(cls)
         self._ldf = PyLazyFrame.new_from_parquet(
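
The `frame.py` tweak means an empty `storage_options` dict is now treated the same as passing nothing, so credential inference from environment variables still applies. Illustrative only: the URI is made up, and actually resolving it needs valid credentials or a public bucket.

    import polars as pl

    # With the change above, both calls leave credential discovery to the environment.
    lf_a = pl.scan_parquet("s3://my-bucket/data.parquet", storage_options=None)
    lf_b = pl.scan_parquet("s3://my-bucket/data.parquet", storage_options={})
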
diff --git a/py-polars/tests/unit/io/cloud/test_aws.py b/py-polars/tests/unit/io/cloud/test_aws.py
index d48ad8ec575e..03fb72b7eff4 100644
--- a/py-polars/tests/unit/io/cloud/test_aws.py
+++ b/py-polars/tests/unit/io/cloud/test_aws.py
@@ -58,7 +58,6 @@ def s3(s3_base: str, io_files_path: Path) -> str:
     [
         (pl.read_csv, "csv"),
         (pl.read_ipc, "ipc"),
-        (pl.read_parquet, "parquet"),
     ],
 )
 def test_read_s3(s3: str, function: Callable[..., Any], extension: str) -> None:
diff --git a/py-polars/tests/unit/io/test_lazy_parquet.py b/py-polars/tests/unit/io/test_lazy_parquet.py
index 5257c355e807..dcf77d9b29c4 100644
--- a/py-polars/tests/unit/io/test_lazy_parquet.py
+++ b/py-polars/tests/unit/io/test_lazy_parquet.py
@@ -97,17 +97,6 @@ def test_categorical_parquet_statistics(tmp_path: Path) -> None:
     assert df.shape == (4, 3)
 
 
-@pytest.mark.write_disk()
-def test_null_parquet(tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)])
-    file_path = tmp_path / "null.parquet"
-    df.write_parquet(file_path)
-    out = pl.read_parquet(file_path)
-    assert_frame_equal(out, df)
-
-
 @pytest.mark.write_disk()
 def test_parquet_eq_stats(tmp_path: Path) -> None:
     tmp_path.mkdir(exist_ok=True)
@@ -353,25 +342,6 @@ def test_streaming_categorical(tmp_path: Path) -> None:
     assert_frame_equal(result, expected)
 
 
-@pytest.mark.write_disk()
-def test_parquet_struct_categorical(tmp_path: Path) -> None:
-    tmp_path.mkdir(exist_ok=True)
-
-    df = pl.DataFrame(
-        [
-            pl.Series("a", ["bob"], pl.Categorical),
-            pl.Series("b", ["foo"], pl.Categorical),
-        ]
-    )
-
-    file_path = tmp_path / "categorical.parquet"
-    df.write_parquet(file_path)
-
-    with pl.StringCache():
-        out = pl.read_parquet(file_path).select(pl.col("b").value_counts())
-        assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]}
-
-
 def test_glob_n_rows(io_files_path: Path) -> None:
     file_path = io_files_path / "foods*.parquet"
     df = pl.scan_parquet(file_path, n_rows=40).collect()
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 26a8e33549d5..bfee08f9f9fe 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -588,3 +588,33 @@ def test_parquet_12831() -> None:
     df.write_parquet(f, row_group_size=int(1e8), data_page_size=512)
     f.seek(0)
     assert_frame_equal(pl.from_arrow(pq.read_table(f)), df)  # type: ignore[arg-type]
+
+
+@pytest.mark.write_disk()
+def test_parquet_struct_categorical(tmp_path: Path) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    df = pl.DataFrame(
+        [
+            pl.Series("a", ["bob"], pl.Categorical),
+            pl.Series("b", ["foo"], pl.Categorical),
+        ]
+    )
+
+    file_path = tmp_path / "categorical.parquet"
+    df.write_parquet(file_path)
+
+    with pl.StringCache():
+        out = pl.read_parquet(file_path).select(pl.col("b").value_counts())
+        assert out.to_dict(as_series=False) == {"b": [{"b": "foo", "count": 1}]}
+
+
+@pytest.mark.write_disk()
+def test_null_parquet(tmp_path: Path) -> None:
+    tmp_path.mkdir(exist_ok=True)
+
+    df = pl.DataFrame([pl.Series("foo", [], dtype=pl.Int8)])
+    file_path = tmp_path / "null.parquet"
+    df.write_parquet(file_path)
+    out = pl.read_parquet(file_path)
+    assert_frame_equal(out, df)