diff --git a/crates/polars-io/src/parquet/read/async_impl.rs b/crates/polars-io/src/parquet/read/async_impl.rs
index 0c1ead03b85b..da50364855da 100644
--- a/crates/polars-io/src/parquet/read/async_impl.rs
+++ b/crates/polars-io/src/parquet/read/async_impl.rs
@@ -178,12 +178,15 @@ async fn download_projection(
     let mut offsets = Vec::with_capacity(fields.len());
     fields.iter().for_each(|name| {
         // A single column can have multiple matches (structs).
-        let iter = row_group.columns_under_root_iter(name).map(|meta| {
-            let byte_range = meta.byte_range();
-            let offset = byte_range.start;
-            let byte_range = byte_range.start as usize..byte_range.end as usize;
-            (offset, byte_range)
-        });
+        let iter = row_group
+            .columns_under_root_iter(name)
+            .unwrap()
+            .map(|meta| {
+                let byte_range = meta.byte_range();
+                let offset = byte_range.start;
+                let byte_range = byte_range.start as usize..byte_range.end as usize;
+                (offset, byte_range)
+            });
 
         for (offset, range) in iter {
             offsets.push(offset);
diff --git a/crates/polars-io/src/parquet/read/predicates.rs b/crates/polars-io/src/parquet/read/predicates.rs
index 87615de1b8c2..eb8f7747f078 100644
--- a/crates/polars-io/src/parquet/read/predicates.rs
+++ b/crates/polars-io/src/parquet/read/predicates.rs
@@ -24,7 +24,7 @@ pub(crate) fn collect_statistics(
     let stats = schema
         .iter_values()
         .map(|field| {
-            let iter = md.columns_under_root_iter(&field.name);
+            let iter = md.columns_under_root_iter(&field.name).unwrap();
 
             Ok(if iter.len() == 0 {
                 ColumnStats::new(field.into(), None, None, None)
diff --git a/crates/polars-io/src/parquet/read/read_impl.rs b/crates/polars-io/src/parquet/read/read_impl.rs
index 830208776d30..45aa2260de30 100644
--- a/crates/polars-io/src/parquet/read/read_impl.rs
+++ b/crates/polars-io/src/parquet/read/read_impl.rs
@@ -326,12 +326,19 @@ fn rg_to_dfs_prefiltered(
             .map(|i| {
                 let col_idx = live_idx_to_col_idx[i];
 
-                let name = schema.get_at_index(col_idx).unwrap().0;
-                let field_md = file_metadata.row_groups[rg_idx]
-                    .columns_under_root_iter(name)
-                    .collect::<Vec<_>>();
+                let (name, field) = schema.get_at_index(col_idx).unwrap();
+
+                let Some(iter) = md.columns_under_root_iter(name) else {
+                    return Ok(Column::full_null(
+                        name.clone(),
+                        md.num_rows(),
+                        &DataType::from_arrow(&field.dtype, true),
+                    ));
+                };
+
+                let part = iter.collect::<Vec<_>>();
 
-                column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
+                column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
                     .map(Column::from)
             })
             .collect::<PolarsResult<Vec<Column>>>()?;
@@ -384,20 +391,30 @@ fn rg_to_dfs_prefiltered(
             .then(|| calc_prefilter_cost(&filter_mask))
             .unwrap_or_default();
 
+        #[cfg(debug_assertions)]
+        {
+            let md = &file_metadata.row_groups[rg_idx];
+            debug_assert_eq!(md.num_rows(), mask.len());
+        }
+
+        let n_rows_in_result = filter_mask.set_bits();
+
         let mut dead_columns = (0..num_dead_columns)
             .into_par_iter()
            .map(|i| {
                 let col_idx = dead_idx_to_col_idx[i];
-                let name = schema.get_at_index(col_idx).unwrap().0;
-                #[cfg(debug_assertions)]
-                {
-                    let md = &file_metadata.row_groups[rg_idx];
-                    debug_assert_eq!(md.num_rows(), mask.len());
-                }
-                let field_md = file_metadata.row_groups[rg_idx]
-                    .columns_under_root_iter(name)
-                    .collect::<Vec<_>>();
+                let (name, field) = schema.get_at_index(col_idx).unwrap();
+
+                let Some(iter) = md.columns_under_root_iter(name) else {
+                    return Ok(Column::full_null(
+                        name.clone(),
+                        n_rows_in_result,
+                        &DataType::from_arrow(&field.dtype, true),
+                    ));
+                };
+
+                let field_md = iter.collect::<Vec<_>>();
 
                 let pre = || {
                     column_idx_to_series(
@@ -556,8 +573,17 @@ fn rg_to_dfs_optionally_par_over_columns(
             projection
                 .par_iter()
                 .map(|column_i| {
-                    let name = schema.get_at_index(*column_i).unwrap().0;
-                    let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
+                    let (name, field) = schema.get_at_index(*column_i).unwrap();
+
+                    let Some(iter) = md.columns_under_root_iter(name) else {
+                        return Ok(Column::full_null(
+                            name.clone(),
+                            rg_slice.1,
+                            &DataType::from_arrow(&field.dtype, true),
+                        ));
+                    };
+
+                    let part = iter.collect::<Vec<_>>();
 
                     column_idx_to_series(
                         *column_i,
@@ -574,8 +600,17 @@ fn rg_to_dfs_optionally_par_over_columns(
         projection
             .iter()
             .map(|column_i| {
-                let name = schema.get_at_index(*column_i).unwrap().0;
-                let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
+                let (name, field) = schema.get_at_index(*column_i).unwrap();
+
+                let Some(iter) = md.columns_under_root_iter(name) else {
+                    return Ok(Column::full_null(
+                        name.clone(),
+                        rg_slice.1,
+                        &DataType::from_arrow(&field.dtype, true),
+                    ));
+                };
+
+                let part = iter.collect::<Vec<_>>();
 
                 column_idx_to_series(
                     *column_i,
@@ -672,12 +707,21 @@ fn rg_to_dfs_par_over_rg(
             let columns = projection
                 .iter()
                 .map(|column_i| {
-                    let name = schema.get_at_index(*column_i).unwrap().0;
-                    let field_md = md.columns_under_root_iter(name).collect::<Vec<_>>();
+                    let (name, field) = schema.get_at_index(*column_i).unwrap();
+
+                    let Some(iter) = md.columns_under_root_iter(name) else {
+                        return Ok(Column::full_null(
+                            name.clone(),
+                            md.num_rows(),
+                            &DataType::from_arrow(&field.dtype, true),
+                        ));
+                    };
+
+                    let part = iter.collect::<Vec<_>>();
 
                     column_idx_to_series(
                         *column_i,
-                        field_md.as_slice(),
+                        part.as_slice(),
                         Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
                         schema,
                         store,
diff --git a/crates/polars-io/src/parquet/read/reader.rs b/crates/polars-io/src/parquet/read/reader.rs
index 3f4328943973..3cef93a89d73 100644
--- a/crates/polars-io/src/parquet/read/reader.rs
+++ b/crates/polars-io/src/parquet/read/reader.rs
@@ -85,9 +85,14 @@ impl ParquetReader {
     /// dtype, and sets the projection indices.
     pub fn with_arrow_schema_projection(
         mut self,
-        first_schema: &ArrowSchema,
+        first_schema: &Arc<ArrowSchema>,
         projected_arrow_schema: Option<&ArrowSchema>,
+        allow_missing_columns: bool,
     ) -> PolarsResult<Self> {
+        if allow_missing_columns {
+            self.schema.replace(first_schema.clone());
+        }
+
         let schema = self.schema()?;
 
         if let Some(projected_arrow_schema) = projected_arrow_schema {
@@ -301,9 +306,14 @@ impl ParquetAsyncReader {
     pub async fn with_arrow_schema_projection(
         mut self,
-        first_schema: &ArrowSchema,
+        first_schema: &Arc<ArrowSchema>,
         projected_arrow_schema: Option<&ArrowSchema>,
+        allow_missing_columns: bool,
     ) -> PolarsResult<Self> {
+        if allow_missing_columns {
+            self.schema.replace(first_schema.clone());
+        }
+
         let schema = self.schema().await?;
 
         if let Some(projected_arrow_schema) = projected_arrow_schema {
diff --git a/crates/polars-lazy/src/scan/ndjson.rs b/crates/polars-lazy/src/scan/ndjson.rs
index a44ce9053ef5..635d23c2ee2d 100644
--- a/crates/polars-lazy/src/scan/ndjson.rs
+++ b/crates/polars-lazy/src/scan/ndjson.rs
@@ -137,6 +137,7 @@ impl LazyFileListReader for LazyJsonLineReader {
             },
             glob: true,
             include_file_paths: self.include_file_paths,
+            allow_missing_columns: false,
         };
 
         let options = NDJsonReadOptions {
diff --git a/crates/polars-lazy/src/scan/parquet.rs b/crates/polars-lazy/src/scan/parquet.rs
index eb26eafb6144..382addea7ed1 100644
--- a/crates/polars-lazy/src/scan/parquet.rs
+++ b/crates/polars-lazy/src/scan/parquet.rs
@@ -21,6 +21,7 @@ pub struct ScanArgsParquet {
     /// Expand path given via globbing rules.
     pub glob: bool,
     pub include_file_paths: Option<PlSmallStr>,
+    pub allow_missing_columns: bool,
 }
 
 impl Default for ScanArgsParquet {
@@ -37,6 +38,7 @@ impl Default for ScanArgsParquet {
             cache: true,
             glob: true,
             include_file_paths: None,
+            allow_missing_columns: false,
         }
     }
 }
@@ -74,6 +76,7 @@ impl LazyFileListReader for LazyParquetReader {
             self.args.hive_options,
             self.args.glob,
             self.args.include_file_paths,
+            self.args.allow_missing_columns,
         )?
         .build()
         .into();
diff --git a/crates/polars-mem-engine/src/executors/scan/parquet.rs b/crates/polars-mem-engine/src/executors/scan/parquet.rs
index e15f8ee8be00..49b01c471610 100644
--- a/crates/polars-mem-engine/src/executors/scan/parquet.rs
+++ b/crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -202,6 +202,8 @@ impl ParquetExec {
             })
             .collect::<Vec<_>>();
 
+        let allow_missing_columns = self.file_options.allow_missing_columns;
+
         let out = POOL.install(|| {
             readers_and_metadata
                 .into_par_iter()
@@ -217,8 +219,9 @@ impl ParquetExec {
                         .with_row_index(row_index)
                         .with_predicate(predicate.clone())
                         .with_arrow_schema_projection(
-                            first_schema.as_ref(),
+                            &first_schema,
                             projected_arrow_schema.as_deref(),
+                            allow_missing_columns,
                         )?
                         .finish()?;
@@ -395,6 +398,7 @@ impl ParquetExec {
             let first_schema = first_schema.clone();
             let projected_arrow_schema = projected_arrow_schema.clone();
             let predicate = predicate.clone();
+            let allow_missing_columns = self.file_options.allow_missing_columns;
 
             if verbose {
                 eprintln!("reading of {}/{} file...", processed, paths.len());
@@ -422,8 +426,9 @@ impl ParquetExec {
                     .with_slice(Some(slice))
                     .with_row_index(row_index)
                     .with_arrow_schema_projection(
-                        first_schema.as_ref(),
+                        &first_schema,
                         projected_arrow_schema.as_deref(),
+                        allow_missing_columns,
                     )
                     .await?
                     .use_statistics(use_statistics)
diff --git a/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs b/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs
index 03e641634467..a9f0f7b3ef87 100644
--- a/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs
+++ b/crates/polars-parquet/src/arrow/read/deserialize/utils/filter.rs
@@ -29,7 +29,7 @@ impl Filter {
         }
     }
 
-    pub(crate) fn num_rows(&self) -> usize {
+    pub fn num_rows(&self) -> usize {
         match self {
             Filter::Range(range) => range.len(),
             Filter::Mask(bitmap) => bitmap.set_bits(),
diff --git a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
index 013308ad7f12..9cca27553415 100644
--- a/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
+++ b/crates/polars-parquet/src/parquet/metadata/row_metadata.rs
@@ -49,16 +49,14 @@ impl RowGroupMetadata {
         self.columns.len()
     }
 
-    /// Fetch all columns under this root name.
+    /// Fetch all columns under this root name if it exists.
     pub fn columns_under_root_iter(
         &self,
         root_name: &str,
-    ) -> impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator {
+    ) -> Option<impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator> {
         self.column_lookup
             .get(root_name)
-            .unwrap()
-            .iter()
-            .map(|&x| &self.columns[x])
+            .map(|x| x.iter().map(|&x| &self.columns[x]))
     }
 
     /// Number of rows in this row group.
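The `row_metadata.rs` hunk above is the core API shift of this change: `columns_under_root_iter` now returns `None` for an unknown root name, where the old version called `.unwrap()` on the lookup and panicked. A minimal, self-contained sketch of the pattern the callers above adopt — the types here are simplified stand-ins for `RowGroupMetadata`/`ColumnChunkMetadata`, not the real polars definitions:

```rust
use std::collections::HashMap;

// Simplified stand-ins for the real RowGroupMetadata / ColumnChunkMetadata,
// only to show the shape of the new API.
struct ColumnChunk;

struct RowGroup {
    columns: Vec<ColumnChunk>,
    column_lookup: HashMap<&'static str, Vec<usize>>,
}

impl RowGroup {
    // Mirrors the new signature: `None` for an absent root name instead of
    // a panic inside the lookup.
    fn columns_under_root_iter(
        &self,
        root_name: &str,
    ) -> Option<impl ExactSizeIterator<Item = &ColumnChunk> + DoubleEndedIterator> {
        self.column_lookup
            .get(root_name)
            .map(|idxs| idxs.iter().map(|&i| &self.columns[i]))
    }
}

fn main() {
    let rg = RowGroup {
        columns: vec![ColumnChunk],
        column_lookup: HashMap::from([("a", vec![0])]),
    };

    // The caller-side pattern the hunks above repeat: decode when the field
    // exists in this row group, otherwise materialize a full-null column of
    // the expected length.
    match rg.columns_under_root_iter("b") {
        Some(iter) => println!("decode {} column chunk(s)", iter.len()),
        None => println!("materialize a full-null column instead"),
    }
}
```

Call sites that can only be reached when the column is known to exist (statistics collection, the low-level column iterator, the tests) keep the old panicking behavior via an explicit `.unwrap()`.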
diff --git a/crates/polars-parquet/src/parquet/read/column/mod.rs b/crates/polars-parquet/src/parquet/read/column/mod.rs
index 56f914ba568e..8e5a1f533375 100644
--- a/crates/polars-parquet/src/parquet/read/column/mod.rs
+++ b/crates/polars-parquet/src/parquet/read/column/mod.rs
@@ -23,6 +23,7 @@ pub fn get_column_iterator<'a>(
 ) -> ColumnIterator<'a> {
     let columns = row_group
         .columns_under_root_iter(field_name)
+        .unwrap()
         .rev()
         .collect::<Vec<_>>();
 
     ColumnIterator::new(reader, columns, max_page_size)
diff --git a/crates/polars-pipe/src/executors/sources/parquet.rs b/crates/polars-pipe/src/executors/sources/parquet.rs
index faed9d4b667e..efe3edac1b87 100644
--- a/crates/polars-pipe/src/executors/sources/parquet.rs
+++ b/crates/polars-pipe/src/executors/sources/parquet.rs
@@ -134,6 +134,7 @@ impl ParquetSource {
             .with_arrow_schema_projection(
                 &self.first_schema,
                 self.projected_arrow_schema.as_deref(),
+                self.file_options.allow_missing_columns,
             )?
             .with_row_index(file_options.row_index)
             .with_predicate(predicate.clone())
@@ -199,6 +200,7 @@ impl ParquetSource {
             .with_arrow_schema_projection(
                 &self.first_schema,
                 self.projected_arrow_schema.as_deref(),
+                self.file_options.allow_missing_columns,
             )
             .await?
             .with_predicate(predicate.clone())
diff --git a/crates/polars-plan/src/plans/builder_dsl.rs b/crates/polars-plan/src/plans/builder_dsl.rs
index 5458c0442abe..bc695b47a035 100644
--- a/crates/polars-plan/src/plans/builder_dsl.rs
+++ b/crates/polars-plan/src/plans/builder_dsl.rs
@@ -54,6 +54,7 @@ impl DslBuilder {
             },
             glob: false,
             include_file_paths: None,
+            allow_missing_columns: false,
         };
 
         Ok(DslPlan::Scan {
@@ -87,6 +88,7 @@ impl DslBuilder {
         hive_options: HiveOptions,
         glob: bool,
         include_file_paths: Option<PlSmallStr>,
+        allow_missing_columns: bool,
     ) -> PolarsResult<Self> {
         let options = FileScanOptions {
             with_columns: None,
@@ -98,6 +100,7 @@ impl DslBuilder {
             hive_options,
             glob,
             include_file_paths,
+            allow_missing_columns,
         };
 
         Ok(DslPlan::Scan {
             sources,
@@ -143,6 +146,7 @@ impl DslBuilder {
                 hive_options,
                 glob: true,
                 include_file_paths,
+                allow_missing_columns: false,
             },
             scan_type: FileScan::Ipc {
                 options,
@@ -181,6 +185,7 @@ impl DslBuilder {
             },
             glob,
             include_file_paths,
+            allow_missing_columns: false,
         };
 
         Ok(DslPlan::Scan {
             sources,
diff --git a/crates/polars-plan/src/plans/options.rs b/crates/polars-plan/src/plans/options.rs
index 078acbae7177..f0df191d395f 100644
--- a/crates/polars-plan/src/plans/options.rs
+++ b/crates/polars-plan/src/plans/options.rs
@@ -39,6 +39,7 @@ pub struct FileScanOptions {
     pub hive_options: HiveOptions,
     pub glob: bool,
     pub include_file_paths: Option<PlSmallStr>,
+    pub allow_missing_columns: bool,
 }
 
 #[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
diff --git a/crates/polars-python/src/lazyframe/general.rs b/crates/polars-python/src/lazyframe/general.rs
index 3d0327c8f2cd..18b9323388e7 100644
--- a/crates/polars-python/src/lazyframe/general.rs
+++ b/crates/polars-python/src/lazyframe/general.rs
@@ -240,7 +240,7 @@ impl PyLazyFrame {
     #[cfg(feature = "parquet")]
     #[staticmethod]
     #[pyo3(signature = (source, sources, n_rows, cache, parallel, rechunk, row_index,
-        low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths)
+        low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
     )]
     fn new_from_parquet(
         source: Option<PyObject>,
         sources: Wrap<ScanSources>,
         n_rows: Option<usize>,
         cache: bool,
         parallel: Wrap<ParallelStrategy>,
         rechunk: bool,
         row_index: Option<(String, IdxSize)>,
         low_memory: bool,
         cloud_options: Option<Vec<(String, String)>>,
         use_statistics: bool,
         hive_partitioning: Option<bool>,
         hive_schema: Option<Wrap<Schema>>,
         try_parse_hive_dates: bool,
         retries: usize,
         glob: bool,
         include_file_paths: Option<String>,
+        allow_missing_columns: bool,
     ) -> PyResult<Self> {
         let parallel = parallel.0;
         let hive_schema = hive_schema.map(|s| Arc::new(s.0));
@@ -287,6 +288,7 @@ impl PyLazyFrame {
             hive_options,
             glob,
             include_file_paths: include_file_paths.map(|x| x.into()),
+            allow_missing_columns,
         };
 
         let sources = sources.0;
diff --git a/crates/polars-python/src/lazyframe/visit.rs b/crates/polars-python/src/lazyframe/visit.rs
index 8507d590d84c..726b5e7debd4 100644
--- a/crates/polars-python/src/lazyframe/visit.rs
+++ b/crates/polars-python/src/lazyframe/visit.rs
@@ -57,7 +57,7 @@ impl NodeTraverser {
     // Increment major on breaking changes to the IR (e.g. renaming
     // fields, reordering tuples), minor on backwards compatible
     // changes (e.g. exposing a new expression node).
-    const VERSION: Version = (2, 0);
+    const VERSION: Version = (2, 1);
 
     pub fn new(root: Node, lp_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
         Self {
diff --git a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
index f65a4436a75f..0bee88861769 100644
--- a/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/metadata_fetch.rs
@@ -6,7 +6,6 @@ use polars_io::prelude::FileMetadata;
 use polars_io::utils::byte_source::{DynByteSource, MemSliceByteSource};
 use polars_io::utils::slice::SplitSlicePosition;
 use polars_utils::mmap::MemSlice;
-use polars_utils::pl_str::PlSmallStr;
 
 use super::metadata_utils::{ensure_schema_has_projected_fields, read_parquet_metadata_bytes};
 use super::ParquetSourceNode;
@@ -116,6 +115,7 @@ impl ParquetSourceNode {
             .unwrap_left()
             .len();
         let has_projection = self.file_options.with_columns.is_some();
+        let allow_missing_columns = self.file_options.allow_missing_columns;
 
         let process_metadata_bytes = {
             move |handle: task_handles_ext::AbortOnDropHandle<
                         )
                     }
 
-                    ensure_schema_has_projected_fields(&schema, projected_arrow_schema.as_ref())?;
+                    if !allow_missing_columns {
+                        ensure_schema_has_projected_fields(
+                            &schema,
+                            projected_arrow_schema.as_ref(),
+                        )?;
+                    }
 
                     PolarsResult::Ok((path_index, byte_source, metadata))
                 });
@@ -213,11 +218,12 @@ impl ParquetSourceNode {
                 let (path_index, byte_source, metadata) = v.map_err(|err| {
                     err.wrap_msg(|msg| {
                         format!(
-                            "error at path (index: {}, path: {:?}): {}",
+                            "error at path (index: {}, path: {}): {}",
                             current_path_index,
                             scan_sources
                                 .get(current_path_index)
-                                .map(|x| PlSmallStr::from_str(x.to_include_path_name())),
+                                .unwrap()
+                                .to_include_path_name(),
                             msg
                         )
                     })
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
index e3d2ba329d68..dfa4b11e3b02 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_data_fetch.rs
@@ -353,6 +353,10 @@ fn get_row_group_byte_ranges_for_projection<'a>(
     columns.iter().flat_map(|col_name| {
         row_group_metadata
             .columns_under_root_iter(col_name)
+            // `Option::into_iter` so that we return an empty iterator for the
+            // `allow_missing_columns` case
+            .into_iter()
+            .flatten()
             .map(|col| {
                 let byte_range = col.byte_range();
                 byte_range.start as usize..byte_range.end as usize
diff --git a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
index dc8283b7f735..eda18101d1a0 100644
--- a/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
+++ b/crates/polars-stream/src/nodes/parquet_source/row_group_decode.rs
@@ -215,6 +215,11 @@ impl RowGroupDecoder {
         filter: Option<Filter>,
     ) -> PolarsResult<()> {
         let projected_arrow_schema = &self.projected_arrow_schema;
+        let expected_num_rows = filter
+            .as_ref()
+            .map_or(row_group_data.row_group_metadata.num_rows(), |x| {
+                x.num_rows()
+            });
 
         let Some((cols_per_thread, remainder)) = calc_cols_per_thread(
             row_group_data.row_group_metadata.num_rows(),
@@ -222,10 +227,14 @@ impl RowGroupDecoder {
             self.min_values_per_thread,
         ) else {
             // Single-threaded
-            for s in projected_arrow_schema
-                .iter_values()
-                .map(|arrow_field| decode_column(arrow_field, row_group_data, filter.clone()))
-            {
+            for s in projected_arrow_schema.iter_values().map(|arrow_field| {
+                decode_column(
+                    arrow_field,
+                    row_group_data,
+                    filter.clone(),
+                    expected_num_rows,
+                )
+            }) {
                 out_vec.push(s?)
             }
@@ -253,7 +262,12 @@ impl RowGroupDecoder {
                     let (_, arrow_field) = projected_arrow_schema.get_at_index(i).unwrap();
 
-                    decode_column(arrow_field, &row_group_data, filter.clone())
+                    decode_column(
+                        arrow_field,
+                        &row_group_data,
+                        filter.clone(),
+                        expected_num_rows,
+                    )
                 })
                 .collect::<PolarsResult<Vec<Column>>>()
         }
@@ -270,7 +284,14 @@ impl RowGroupDecoder {
         for out in projected_arrow_schema
             .iter_values()
             .take(remainder)
-            .map(|arrow_field| decode_column(arrow_field, row_group_data, filter.clone()))
+            .map(|arrow_field| {
+                decode_column(
+                    arrow_field,
+                    row_group_data,
+                    filter.clone(),
+                    expected_num_rows,
+                )
+            })
         {
             out_vec.push(out?);
         }
@@ -307,10 +328,20 @@ fn decode_column(
     arrow_field: &ArrowField,
     row_group_data: &RowGroupData,
     filter: Option<Filter>,
+    expected_num_rows: usize,
 ) -> PolarsResult<Column> {
-    let columns_to_deserialize = row_group_data
+    let Some(iter) = row_group_data
         .row_group_metadata
         .columns_under_root_iter(&arrow_field.name)
+    else {
+        return Ok(Column::full_null(
+            arrow_field.name.clone(),
+            expected_num_rows,
+            &DataType::from_arrow(&arrow_field.dtype, true),
+        ));
+    };
+
+    let columns_to_deserialize = iter
         .map(|col_md| {
             let byte_range = col_md.byte_range();
@@ -329,6 +360,8 @@ fn decode_column(
         filter,
     )?;
 
+    assert_eq!(array.len(), expected_num_rows);
+
     let series = Series::try_from((arrow_field, array))?;
 
     // TODO: Also load in the metadata.
@@ -440,14 +473,12 @@ impl RowGroupDecoder {
         let row_group_data = Arc::new(row_group_data);
 
-        let mut live_columns = {
-            let capacity = self.row_index.is_some() as usize
+        let mut live_columns = Vec::with_capacity(
+            self.row_index.is_some() as usize
                 + self.predicate_arrow_field_indices.len()
                 + self.hive_partitions_width
-                + self.include_file_paths.is_some() as usize;
-
-            Vec::with_capacity(capacity)
-        };
+                + self.include_file_paths.is_some() as usize,
+        );
 
         if let Some(s) = self.materialize_row_index(
             row_group_data.as_ref(),
@@ -479,7 +510,9 @@ impl RowGroupDecoder {
             .predicate_arrow_field_indices
             .iter()
             .map(|&i| self.projected_arrow_schema.get_at_index(i).unwrap())
-            .map(|(_, arrow_field)| decode_column(arrow_field, &row_group_data, None))
+            .map(|(_, arrow_field)| {
+                decode_column(arrow_field, &row_group_data, None, projection_height)
+            })
         {
             live_columns.push(s?);
         }
@@ -514,6 +547,7 @@ impl RowGroupDecoder {
         assert_eq!(mask_bitmap.len(), projection_height);
 
         let prefilter_cost = calc_prefilter_cost(&mask_bitmap);
+        let expected_num_rows = mask_bitmap.set_bits();
 
         let dead_cols_filtered = self
             .non_predicate_arrow_field_indices
@@ -527,6 +561,7 @@ impl RowGroupDecoder {
                     prefilter_setting,
                     mask,
                     &mask_bitmap,
+                    expected_num_rows,
                 )
             })
             .collect::<PolarsResult<Vec<_>>>()?;
@@ -569,10 +604,20 @@ fn decode_column_prefiltered(
     prefilter_setting: &PrefilterMaskSetting,
     mask: &BooleanChunked,
     mask_bitmap: &Bitmap,
+    expected_num_rows: usize,
 ) -> PolarsResult<Column> {
-    let columns_to_deserialize = row_group_data
+    let Some(iter) = row_group_data
         .row_group_metadata
         .columns_under_root_iter(&arrow_field.name)
+    else {
+        return Ok(Column::full_null(
+            arrow_field.name.clone(),
+            expected_num_rows,
+            &DataType::from_arrow(&arrow_field.dtype, true),
+        ));
+    };
+
+    let columns_to_deserialize = iter
         .map(|col_md| {
             let byte_range = col_md.byte_range();
@@ -596,6 +641,8 @@ fn decode_column_prefiltered(
         deserialize_filter,
     )?;
 
+    assert_eq!(array.len(), expected_num_rows);
+
     let column = Series::try_from((arrow_field, array))?.into_column();
 
     if !prefilter {
diff --git a/crates/polars/tests/it/io/parquet/read/mod.rs b/crates/polars/tests/it/io/parquet/read/mod.rs
index 60ed6108edb7..d671e085c86a 100644
--- a/crates/polars/tests/it/io/parquet/read/mod.rs
+++ b/crates/polars/tests/it/io/parquet/read/mod.rs
@@ -205,6 +205,7 @@ pub fn read_column(
     let mut statistics = metadata.row_groups[row_group]
         .columns_under_root_iter(field.name())
+        .unwrap()
         .map(|column_meta| column_meta.statistics().transpose())
         .collect::<ParquetResult<Vec<_>>>()?;
diff --git a/crates/polars/tests/it/io/parquet/read/row_group.rs b/crates/polars/tests/it/io/parquet/read/row_group.rs
index 6d567a120c92..80478a0da958 100644
--- a/crates/polars/tests/it/io/parquet/read/row_group.rs
+++ b/crates/polars/tests/it/io/parquet/read/row_group.rs
@@ -75,6 +75,7 @@ pub fn read_columns<'a, R: Read + Seek>(
 ) -> PolarsResult<Vec<(&'a ColumnChunkMetadata, Vec<u8>)>> {
     row_group_metadata
         .columns_under_root_iter(field_name)
+        .unwrap()
         .map(|meta| _read_single_column(reader, meta))
         .collect()
 }
diff --git a/crates/polars/tests/it/io/parquet/write/mod.rs b/crates/polars/tests/it/io/parquet/write/mod.rs
index 4403277a0552..e98a0223937f 100644
--- a/crates/polars/tests/it/io/parquet/write/mod.rs
+++ b/crates/polars/tests/it/io/parquet/write/mod.rs
@@ -215,6 +215,7 @@ fn basic() -> ParquetResult<()> {
     assert_eq!(
         metadata.row_groups[0]
             .columns_under_root_iter("col")
+            .unwrap()
             .next()
             .unwrap()
             .uncompressed_size(),
diff --git a/py-polars/polars/io/parquet/functions.py b/py-polars/polars/io/parquet/functions.py
index bc434b05cc2d..687c827a9c57 100644
--- a/py-polars/polars/io/parquet/functions.py
+++ b/py-polars/polars/io/parquet/functions.py
@@ -60,6 +60,7 @@ def read_parquet(
     use_pyarrow: bool = False,
     pyarrow_options: dict[str, Any] | None = None,
     memory_map: bool = True,
+    allow_missing_columns: bool = False,
 ) -> DataFrame:
     """
     Read into a DataFrame from a parquet file.
@@ -139,6 +140,12 @@ def read_parquet(
     memory_map
         Memory map underlying file. This will likely increase performance.
         Only used when `use_pyarrow=True`.
+    allow_missing_columns
+        When reading a list of parquet files, if a column existing in the first
+        file cannot be found in subsequent files, the default behavior is to
+        raise an error. However, if `allow_missing_columns` is set to
+        `True`, a full-NULL column is returned instead of erroring for the files
+        that do not contain the column.
 
     Returns
     -------
@@ -198,6 +205,7 @@ def read_parquet(
         retries=retries,
         glob=glob,
         include_file_paths=None,
+        allow_missing_columns=allow_missing_columns,
     )
 
     if columns is not None:
@@ -307,6 +315,7 @@ def scan_parquet(
     storage_options: dict[str, Any] | None = None,
     retries: int = 2,
     include_file_paths: str | None = None,
+    allow_missing_columns: bool = False,
 ) -> LazyFrame:
     """
     Lazily read from a local or cloud-hosted parquet file (or files).
@@ -388,6 +397,12 @@ def scan_parquet(
         Number of retries if accessing a cloud instance fails.
     include_file_paths
         Include the path of the source file(s) as a column with this name.
+    allow_missing_columns
+        When reading a list of parquet files, if a column existing in the first
+        file cannot be found in subsequent files, the default behavior is to
+        raise an error. However, if `allow_missing_columns` is set to
+        `True`, a full-NULL column is returned instead of erroring for the files
+        that do not contain the column.
 
     See Also
     --------
@@ -439,6 +454,7 @@ def scan_parquet(
         retries=retries,
         glob=glob,
         include_file_paths=include_file_paths,
+        allow_missing_columns=allow_missing_columns,
     )
 
 
@@ -460,6 +476,7 @@ def _scan_parquet_impl(
     try_parse_hive_dates: bool = True,
     retries: int = 2,
     include_file_paths: str | None = None,
+    allow_missing_columns: bool = False,
 ) -> LazyFrame:
     if isinstance(source, list):
         sources = source
@@ -490,5 +507,6 @@ def _scan_parquet_impl(
         retries=retries,
         glob=glob,
         include_file_paths=include_file_paths,
+        allow_missing_columns=allow_missing_columns,
    )
     return wrap_ldf(pylf)
diff --git a/py-polars/tests/unit/io/test_parquet.py b/py-polars/tests/unit/io/test_parquet.py
index 10aa23dfa7b6..ce1a4658a9f4 100644
--- a/py-polars/tests/unit/io/test_parquet.py
+++ b/py-polars/tests/unit/io/test_parquet.py
@@ -1917,3 +1917,47 @@ def test_prefilter_with_projection() -> None:
         .select(pl.col.a)
         .collect()
     )
+
+
+@pytest.mark.parametrize("parallel", ["columns", "row_groups", "prefiltered", "none"])
+@pytest.mark.parametrize("streaming", [True, False])
+@pytest.mark.parametrize("projection", [pl.all(), pl.col("b")])
+@pytest.mark.write_disk
+def test_allow_missing_columns(
+    tmp_path: Path,
+    parallel: str,
+    streaming: bool,
+    projection: pl.Expr,
+) -> None:
+    tmp_path.mkdir(exist_ok=True)
+    dfs = [pl.DataFrame({"a": 1, "b": 1}), pl.DataFrame({"a": 2})]
+    paths = [tmp_path / "1", tmp_path / "2"]
+
+    for df, path in zip(dfs, paths):
+        df.write_parquet(path)
+
+    expected = pl.DataFrame({"a": [1, 2], "b": [1, None]}).select(projection)
+
+    with pytest.raises(pl.exceptions.SchemaError, match="did not find column"):
+        pl.read_parquet(paths, parallel=parallel)  # type: ignore[arg-type]
+
+    with pytest.raises(pl.exceptions.SchemaError, match="did not find column"):
+        pl.scan_parquet(paths, parallel=parallel).select(projection).collect(  # type: ignore[arg-type]
+            streaming=streaming
+        )
+
+    assert_frame_equal(
+        pl.read_parquet(
+            paths,
+            parallel=parallel,  # type: ignore[arg-type]
+            allow_missing_columns=True,
+        ).select(projection),
+        expected,
+    )
+
+    assert_frame_equal(
+        pl.scan_parquet(paths, parallel=parallel, allow_missing_columns=True)  # type: ignore[arg-type]
+        .select(projection)
+        .collect(streaming=streaming),
+        expected,
+    )
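The Python test above exercises the feature end to end. The same switch is reachable from the Rust API through the new `ScanArgsParquet::allow_missing_columns` field; a rough usage sketch (the glob path is hypothetical, and this assumes the `polars` crate with the `lazy` and `parquet` features enabled):

```rust
use polars::prelude::*;

fn main() -> PolarsResult<()> {
    // With `allow_missing_columns: true`, files that lack a column present in
    // the first file's schema yield a full-null column instead of raising
    // `SchemaError: did not find column`.
    let args = ScanArgsParquet {
        allow_missing_columns: true,
        ..Default::default()
    };

    // Hypothetical glob; mirrors the Python test above, where the second file
    // is missing column "b".
    let df = LazyFrame::scan_parquet("data/*.parquet", args)?.collect()?;
    println!("{df}");

    Ok(())
}
```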