refactor: Remove extra hashmap construction in new-streaming parquet (#…
nameexhaustion authored Sep 17, 2024
1 parent f631502 commit 5d81d4a
Showing 2 changed files with 10 additions and 17 deletions.
```diff
@@ -128,8 +128,8 @@ impl ParquetSourceNode {
         };
 
         ensure_metadata_has_projected_fields(
-            projected_arrow_schema.as_ref(),
             &metadata,
+            projected_arrow_schema.as_ref(),
         )?;
 
         PolarsResult::Ok((path_index, byte_source, metadata))
```
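The call-site change above is mechanical: it adopts the helper's new parameter order, `(metadata, projected_fields)`. The signature change and the removal of the intermediate hashmap are in `metadata_utils.rs` below.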
25 changes: 9 additions & 16 deletions crates/polars-stream/src/nodes/parquet_source/metadata_utils.rs
```diff
@@ -1,9 +1,8 @@
-use polars_core::prelude::{ArrowSchema, DataType, PlHashMap};
+use polars_core::prelude::{ArrowSchema, DataType};
 use polars_error::{polars_bail, PolarsResult};
 use polars_io::prelude::FileMetadata;
 use polars_io::utils::byte_source::{ByteSource, DynByteSource};
 use polars_utils::mmap::MemSlice;
-use polars_utils::pl_str::PlSmallStr;
 
 /// Read the metadata bytes of a parquet file, does not decode the bytes. If during metadata fetch
 /// the bytes of the entire file are loaded, it is returned in the second return value.
@@ -124,26 +123,20 @@ pub(super) async fn read_parquet_metadata_bytes(
 /// Ensures that a parquet file has all the necessary columns for a projection with the correct
 /// dtype. There are no ordering requirements and extra columns are permitted.
 pub(super) fn ensure_metadata_has_projected_fields(
-    projected_fields: &ArrowSchema,
     metadata: &FileMetadata,
+    projected_fields: &ArrowSchema,
 ) -> PolarsResult<()> {
     let schema = polars_parquet::arrow::read::infer_schema(metadata)?;
 
-    // Note: We convert to Polars-native dtypes for timezone normalization.
-    let mut schema = schema
-        .into_iter_values()
-        .map(|x| {
-            let dtype = DataType::from_arrow(&x.dtype, true);
-            (x.name, dtype)
-        })
-        .collect::<PlHashMap<PlSmallStr, DataType>>();
-
     for field in projected_fields.iter_values() {
-        let Some(dtype) = schema.remove(&field.name) else {
-            polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
-        };
-
+        // Note: We convert to Polars-native dtypes for timezone normalization.
         let expected_dtype = DataType::from_arrow(&field.dtype, true);
+        let dtype = {
+            let Some(field) = schema.get(&field.name) else {
+                polars_bail!(SchemaMismatch: "did not find column: {}", field.name)
+            };
+            DataType::from_arrow(&field.dtype, true)
+        };
 
         if dtype != expected_dtype {
             polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
```
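The point of the refactor is visible in the hunk above: `infer_schema` already yields an `ArrowSchema` that can be probed by column name via `get`, so collecting it into an intermediate `PlHashMap<PlSmallStr, DataType>` (and converting every file column's dtype up front) was redundant allocation and work; the new code converts only the dtypes of the projected columns it actually looks up. Below is a minimal standalone sketch of the same before/after pattern; the `DType` enum, `Schema` alias, and both functions are hypothetical stand-ins for illustration, not the actual polars types or API.

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the polars types; illustration only.
#[derive(Clone, Debug, PartialEq)]
enum DType {
    Int64,
    Utf8,
}

// Models a schema that already supports name-based lookup,
// as the inferred ArrowSchema does.
type Schema = HashMap<String, DType>;

/// Before: copy the whole schema into a second map, then probe it.
/// The extra map costs an allocation plus work per file column.
fn check_projection_with_extra_map(
    schema: &Schema,
    projected: &[(String, DType)],
) -> Result<(), String> {
    let mut by_name: HashMap<String, DType> = schema.clone(); // redundant copy
    for (name, expected) in projected {
        let Some(found) = by_name.remove(name) else {
            return Err(format!("did not find column: {name}"));
        };
        if &found != expected {
            return Err(format!("data type mismatch for column {name}"));
        }
    }
    Ok(())
}

/// After: probe the original schema directly; no intermediate map,
/// and only the projected columns are ever touched.
fn check_projection_direct(
    schema: &Schema,
    projected: &[(String, DType)],
) -> Result<(), String> {
    for (name, expected) in projected {
        let Some(found) = schema.get(name) else {
            return Err(format!("did not find column: {name}"));
        };
        if found != expected {
            return Err(format!("data type mismatch for column {name}"));
        }
    }
    Ok(())
}

fn main() {
    let schema: Schema = [
        ("id".to_string(), DType::Int64),
        ("name".to_string(), DType::Utf8),
    ]
    .into_iter()
    .collect();

    let projected = vec![("id".to_string(), DType::Int64)];
    assert!(check_projection_with_extra_map(&schema, &projected).is_ok());
    assert!(check_projection_direct(&schema, &projected).is_ok());
    println!("projection validated");
}
```

One subtle difference: the old code used `remove`, so a column name repeated in the projection would fail on its second lookup, while `get` tolerates repeats. Since the projection here is itself an `ArrowSchema` keyed by name, duplicates cannot occur and the two behave the same.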
