Skip to content

Commit

Permalink
refactor: Mention allow_missing_columns in error message when colum…
Browse files Browse the repository at this point in the history
…n not found (parquet) (#18972)
  • Loading branch information
nameexhaustion authored Sep 27, 2024
1 parent 2dbb444 commit 6abc2f1
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 36 deletions.
92 changes: 62 additions & 30 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,22 +95,38 @@ impl<R: MmapBytesReader> ParquetReader<R> {

let schema = self.schema()?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
(|| {
if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
};
Ok(())
})()
.map_err(|e| {
if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
e.wrap_msg(|s| {
format!(
"error with column selection, \
consider enabling `allow_missing_columns`: {}",
s
)
})
} else {
e
}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}
})?;

Ok(self)
}
Expand Down Expand Up @@ -316,22 +332,38 @@ impl ParquetAsyncReader {

let schema = self.schema().await?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
(|| {
if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
if schema.len() > first_schema.len() {
polars_bail!(
SchemaMismatch:
"parquet file contained extra columns and no selection was given"
)
}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
};
Ok(())
})()
.map_err(|e| {
if !allow_missing_columns && matches!(e, PolarsError::ColumnNotFound(_)) {
e.wrap_msg(|s| {
format!(
"error with column selection, \
consider enabling `allow_missing_columns`: {}",
s
)
})
} else {
e
}

self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}
})?;

Ok(self)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(super) fn projected_arrow_schema_to_projection_indices(
for (i, field) in projected_arrow_schema.iter_values().enumerate() {
let dtype = {
let Some((idx, _, field)) = schema.get_full(&field.name) else {
polars_bail!(SchemaMismatch: "did not find column in file: {}", field.name)
polars_bail!(ColumnNotFound: "did not find column in file: {}", field.name)
};

projection_indices.push(idx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(super) fn ensure_schema_has_projected_fields(
let expected_dtype = DataType::from_arrow(&field.dtype, true);
let dtype = {
let Some(field) = schema.get(&field.name) else {
polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
polars_bail!(ColumnNotFound: "error with column selection, consider enabling `allow_missing_columns`: did not find column in file: {}", field.name)
};
DataType::from_arrow(&field.dtype, true)
};
Expand Down
7 changes: 5 additions & 2 deletions py-polars/tests/unit/io/test_lazy_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ def test_parquet_schema_mismatch_panic_17067(tmp_path: Path, streaming: bool) ->
pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}).write_parquet(tmp_path / "1.parquet")
pl.DataFrame({"c": [1, 2, 3], "d": [4, 5, 6]}).write_parquet(tmp_path / "2.parquet")

with pytest.raises(pl.exceptions.SchemaError):
with pytest.raises(pl.exceptions.ColumnNotFoundError):
pl.scan_parquet(tmp_path).collect(streaming=streaming)


Expand Down Expand Up @@ -642,5 +642,8 @@ def test_parquet_unaligned_schema_read_missing_cols_from_first(

lf = pl.scan_parquet(paths)

with pytest.raises(pl.exceptions.SchemaError, match="did not find column"):
with pytest.raises(
pl.exceptions.ColumnNotFoundError,
match="did not find column in file: a",
):
lf.collect(streaming=streaming)
10 changes: 8 additions & 2 deletions py-polars/tests/unit/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -1932,10 +1932,16 @@ def test_allow_missing_columns(

expected = pl.DataFrame({"a": [1, 2], "b": [1, None]}).select(projection)

with pytest.raises(pl.exceptions.SchemaError, match="did not find column"):
with pytest.raises(
pl.exceptions.ColumnNotFoundError,
match="error with column selection, consider enabling `allow_missing_columns`: did not find column in file: b",
):
pl.read_parquet(paths, parallel=parallel) # type: ignore[arg-type]

with pytest.raises(pl.exceptions.SchemaError, match="did not find column"):
with pytest.raises(
pl.exceptions.ColumnNotFoundError,
match="error with column selection, consider enabling `allow_missing_columns`: did not find column in file: b",
):
pl.scan_parquet(paths, parallel=parallel).select(projection).collect( # type: ignore[arg-type]
streaming=streaming
)
Expand Down

0 comments on commit 6abc2f1

Please sign in to comment.