
Commit

fix: Raise when parquet file has extra columns and no select() was done (#18843)
nameexhaustion authored Sep 23, 2024
1 parent 292ac84 commit b86eed3
Showing 6 changed files with 48 additions and 21 deletions.
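For context, a minimal sketch of the new behavior against the Python API exercised by the updated test below; the file names are hypothetical, and the error text comes from the diff:

import polars as pl

# Two hypothetical files; the second one carries an extra column "c".
pl.DataFrame({"a": [1, 2], "b": [10, 11]}).write_parquet("part-0.parquet")
pl.DataFrame({"a": [3], "b": [12], "c": [99]}).write_parquet("part-1.parquet")

lf = pl.scan_parquet(["part-0.parquet", "part-1.parquet"])

# Previously the extra column was silently dropped; collecting without a
# selection now raises SchemaError with the message added in this commit.
try:
    lf.collect()
except pl.exceptions.SchemaError as exc:
    print(exc)  # parquet file contained extra columns and no selection was given

# Selecting only the shared columns is still expected to work, since extra
# columns remain permitted once a projection is given.
print(lf.select("a", "b").collect())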
18 changes: 16 additions & 2 deletions crates/polars-io/src/parquet/read/reader.rs
@@ -83,7 +83,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {

/// Checks that the file contains all the columns in `projected_arrow_schema` with the same
/// dtype, and sets the projection indices.
-pub fn with_projected_arrow_schema(
+pub fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
@@ -96,6 +96,13 @@ impl<R: MmapBytesReader> ParquetReader<R> {
projected_arrow_schema,
)?;
} else {
+if schema.len() > first_schema.len() {
+polars_bail!(
+SchemaMismatch:
+"parquet file contained extra columns and no selection was given"
+)
+}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}
@@ -292,7 +299,7 @@ impl ParquetAsyncReader {
})
}

-pub async fn with_projected_arrow_schema(
+pub async fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
@@ -305,6 +312,13 @@ impl ParquetAsyncReader {
projected_arrow_schema,
)?;
} else {
+if schema.len() > first_schema.len() {
+polars_bail!(
+SchemaMismatch:
+"parquet file contained extra columns and no selection was given"
+)
+}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}
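In both the sync and async readers the new guard is a simple width comparison: when the caller supplied no projected schema, a file whose schema is longer than the first (unified) schema must contain columns the scan does not know about. A rough Python rendering of that check, assuming that column name/dtype agreement is validated separately (as projected_arrow_schema_to_projection_indices does in the code above):

def check_no_extra_columns(file_schema: dict, first_schema: dict, has_projection: bool) -> None:
    # Sketch only: reject files wider than the unified schema when no
    # selection was given. Column name/dtype agreement is checked elsewhere,
    # so a length comparison is enough to detect unknown extras.
    if not has_projection and len(file_schema) > len(first_schema):
        raise ValueError("parquet file contained extra columns and no selection was given")

check_no_extra_columns({"a": "i64", "b": "i64"}, {"a": "i64", "b": "i64"}, has_projection=False)  # ok
check_no_extra_columns({"a": "i64", "b": "i64", "c": "i64"}, {"a": "i64"}, has_projection=True)   # ok: projection given
# check_no_extra_columns({"a": "i64", "b": "i64", "c": "i64"}, {"a": "i64"}, has_projection=False)  # raises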
4 changes: 2 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -216,7 +216,7 @@ impl ParquetExec {
.with_slice(Some(slice))
.with_row_index(row_index)
.with_predicate(predicate.clone())
-.with_projected_arrow_schema(
+.with_arrow_schema_projection(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)?
@@ -421,7 +421,7 @@ impl ParquetExec {
let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
-.with_projected_arrow_schema(
+.with_arrow_schema_projection(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)
4 changes: 2 additions & 2 deletions crates/polars-pipe/src/executors/sources/parquet.rs
@@ -131,7 +131,7 @@ impl ParquetSource {
}

let mut reader = reader
-.with_projected_arrow_schema(
+.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
)?
@@ -196,7 +196,7 @@ impl ParquetSource {
ParquetAsyncReader::from_uri(&uri, cloud_options.as_ref(), metadata)
.await?
.with_row_index(file_options.row_index)
-.with_projected_arrow_schema(
+.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
)
27 changes: 21 additions & 6 deletions crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -1,14 +1,14 @@
use std::sync::Arc;

use futures::StreamExt;
-use polars_error::PolarsResult;
+use polars_error::{polars_bail, PolarsResult};
use polars_io::prelude::FileMetadata;
use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
use polars_io::utils::slice::SplitSlicePosition;
use polars_utils::mmap::MemSlice;
use polars_utils::pl_str::PlSmallStr;

-use super::metadata_utils::{ensure_metadata_has_projected_fields, read_parquet_metadata_bytes};
+use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
use super::ParquetSourceNode;
use crate::async_executor;
use crate::async_primitives::connector::connector;
@@ -107,6 +107,15 @@ impl ParquetSourceNode {
};

let first_metadata = self.first_metadata.clone();
+let reader_schema_len = self
+.file_info
+.reader_schema
+.as_ref()
+.unwrap()
+.as_ref()
+.unwrap_left()
+.len();
+let has_projection = self.file_options.with_columns.is_some();

let process_metadata_bytes = {
move |handle: task_handles_ext::AbortOnDropHandle<
@@ -127,10 +136,16 @@
)?,
};

-ensure_metadata_has_projected_fields(
-&metadata,
-projected_arrow_schema.as_ref(),
-)?;
+let schema = polars_parquet::arrow::read::infer_schema(&metadata)?;
+
+if !has_projection && schema.len() > reader_schema_len {
+polars_bail!(
+SchemaMismatch:
+"parquet file contained extra columns and no selection was given"
+)
+}
+
+ensure_schema_has_projected_fields(&schema, projected_arrow_schema.as_ref())?;

PolarsResult::Ok((path_index, byte_source, metadata))
});
7 changes: 2 additions & 5 deletions crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
@@ -1,6 +1,5 @@
use polars_core::prelude::{ArrowSchema, DataType};
use polars_error::{polars_bail, PolarsResult};
-use polars_io::prelude::FileMetadata;
use polars_io::utils::byte_source::{ByteSource, DynByteSource};
use polars_utils::mmap::MemSlice;

@@ -122,12 +121,10 @@ pub(super) async fn read_parquet_metadata_bytes(

/// Ensures that a parquet file has all the necessary columns for a projection with the correct
/// dtype. There are no ordering requirements and extra columns are permitted.
-pub(super) fn ensure_metadata_has_projected_fields(
-metadata: &FileMetadata,
+pub(super) fn ensure_schema_has_projected_fields(
+schema: &ArrowSchema,
projected_fields: &ArrowSchema,
) -> PolarsResult<()> {
-let schema = polars_parquet::arrow::read::infer_schema(metadata)?;

for field in projected_fields.iter_values() {
// Note: We convert to Polars-native dtypes for timezone normalization.
let expected_dtype = DataType::from_arrow(&field.dtype, true);
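ensure_schema_has_projected_fields keeps its previous contract (no ordering requirements, extra file columns permitted); it now receives the already-inferred Arrow schema instead of re-inferring it from the file metadata, so the caller infers once and can reuse the schema for the extra-columns check. A simplified dict-based sketch of that containment check, with hypothetical error wording:

def ensure_schema_has_projected_fields(file_schema: dict, projected_fields: dict) -> None:
    # Every projected column must exist in the file with a matching dtype;
    # reordered and extra file columns are fine.
    for name, dtype in projected_fields.items():
        if name not in file_schema:
            raise ValueError(f"did not find column in file: {name}")
        if file_schema[name] != dtype:
            raise ValueError(f"dtype mismatch for column {name}: expected {dtype}, found {file_schema[name]}")

ensure_schema_has_projected_fields(
    {"b": "i64", "a": "i64", "extra": "str"},  # file schema: reordered, plus an extra column
    {"a": "i64", "b": "i64"},                  # projection: satisfied
)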
9 changes: 5 additions & 4 deletions py-polars/tests/unit/io/test_lazy_parquet.py
@@ -597,10 +597,11 @@ def test_parquet_unaligned_schema_read(tmp_path: Path, streaming: bool) -> None:
pl.DataFrame({"a": [1, 2], "b": [10, 11]}),
)

-assert_frame_equal(
-lf.collect(streaming=streaming),
-pl.DataFrame({"a": [1, 2, 3], "b": [10, 11, 12]}),
-)
+with pytest.raises(
+pl.exceptions.SchemaError,
+match="parquet file contained extra columns and no selection was given",
+):
+lf.collect(streaming=streaming)


@pytest.mark.write_disk
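The existing test now expects the new error. For completeness, a hypothetical companion test (not part of this commit) sketching the case that keeps working: with an explicit select(), the extra column in the second file is permitted.

import polars as pl
from polars.testing import assert_frame_equal

def test_parquet_extra_columns_with_selection(tmp_path) -> None:
    # The second file carries an extra column "c"; an explicit selection makes this legal.
    pl.DataFrame({"a": [1, 2], "b": [10, 11]}).write_parquet(tmp_path / "1.parquet")
    pl.DataFrame({"a": [3], "b": [12], "c": [99]}).write_parquet(tmp_path / "2.parquet")

    lf = pl.scan_parquet([tmp_path / "1.parquet", tmp_path / "2.parquet"])

    assert_frame_equal(
        lf.select("a", "b").collect(),
        pl.DataFrame({"a": [1, 2, 3], "b": [10, 11, 12]}),
    )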

