Skip to content

Commit

Permalink
feat: Add allow_missing_columns option to read/scan_parquet (#18922)
Browse files (browse the repository at this point in the history)
  • Loading branch information
nameexhaustion authored Sep 27, 2024
1 parent 79fcd53 commit 13e9717
Show file tree
Hide file tree
Showing 23 changed files with 256 additions and 59 deletions.
15 changes: 9 additions & 6 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ async fn download_projection(
let mut offsets = Vec::with_capacity(fields.len());
fields.iter().for_each(|name| {
// A single column can have multiple matches (structs).
let iter = row_group.columns_under_root_iter(name).map(|meta| {
let byte_range = meta.byte_range();
let offset = byte_range.start;
let byte_range = byte_range.start as usize..byte_range.end as usize;
(offset, byte_range)
});
let iter = row_group
.columns_under_root_iter(name)
.unwrap()
.map(|meta| {
let byte_range = meta.byte_range();
let offset = byte_range.start;
let byte_range = byte_range.start as usize..byte_range.end as usize;
(offset, byte_range)
});

for (offset, range) in iter {
offsets.push(offset);
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) fn collect_statistics(
let stats = schema
.iter_values()
.map(|field| {
let iter = md.columns_under_root_iter(&field.name);
let iter = md.columns_under_root_iter(&field.name).unwrap();

Ok(if iter.len() == 0 {
ColumnStats::new(field.into(), None, None, None)
Expand Down
86 changes: 65 additions & 21 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,19 @@ fn rg_to_dfs_prefiltered(
.map(|i| {
let col_idx = live_idx_to_col_idx[i];

let name = schema.get_at_index(col_idx).unwrap().0;
let field_md = file_metadata.row_groups[rg_idx]
.columns_under_root_iter(name)
.collect::<Vec<_>>();
let (name, field) = schema.get_at_index(col_idx).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
md.num_rows(),
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
.map(Column::from)
})
.collect::<PolarsResult<Vec<_>>>()?;
Expand Down Expand Up @@ -384,20 +391,30 @@ fn rg_to_dfs_prefiltered(
.then(|| calc_prefilter_cost(&filter_mask))
.unwrap_or_default();

#[cfg(debug_assertions)]
{
let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
}

let n_rows_in_result = filter_mask.set_bits();

let mut dead_columns = (0..num_dead_columns)
.into_par_iter()
.map(|i| {
let col_idx = dead_idx_to_col_idx[i];
let name = schema.get_at_index(col_idx).unwrap().0;

#[cfg(debug_assertions)]
{
let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
}
let field_md = file_metadata.row_groups[rg_idx]
.columns_under_root_iter(name)
.collect::<Vec<_>>();
let (name, field) = schema.get_at_index(col_idx).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
n_rows_in_result,
&DataType::from_arrow(&field.dtype, true),
));
};

let field_md = iter.collect::<Vec<_>>();

let pre = || {
column_idx_to_series(
Expand Down Expand Up @@ -556,8 +573,17 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.par_iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
Expand All @@ -574,8 +600,17 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
Expand Down Expand Up @@ -672,12 +707,21 @@ fn rg_to_dfs_par_over_rg(
let columns = projection
.iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let field_md = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
md.num_rows(),
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
field_md.as_slice(),
part.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
Expand Down
14 changes: 12 additions & 2 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@ impl<R: MmapBytesReader> ParquetReader<R> {
/// dtype, and sets the projection indices.
pub fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
self.schema.replace(first_schema.clone());
}

let schema = self.schema()?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
Expand Down Expand Up @@ -301,9 +306,14 @@ impl ParquetAsyncReader {

pub async fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
self.schema.replace(first_schema.clone());
}

let schema = self.schema().await?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl LazyFileListReader for LazyJsonLineReader {
},
glob: true,
include_file_paths: self.include_file_paths,
allow_missing_columns: false,
};

let options = NDJsonReadOptions {
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct ScanArgsParquet {
/// Expand path given via globbing rules.
pub glob: bool,
pub include_file_paths: Option<PlSmallStr>,
pub allow_missing_columns: bool,
}

impl Default for ScanArgsParquet {
Expand All @@ -37,6 +38,7 @@ impl Default for ScanArgsParquet {
cache: true,
glob: true,
include_file_paths: None,
allow_missing_columns: false,
}
}
}
Expand Down Expand Up @@ -74,6 +76,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.hive_options,
self.args.glob,
self.args.include_file_paths,
self.args.allow_missing_columns,
)?
.build()
.into();
Expand Down
9 changes: 7 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ impl ParquetExec {
})
.collect::<Vec<_>>();

let allow_missing_columns = self.file_options.allow_missing_columns;

let out = POOL.install(|| {
readers_and_metadata
.into_par_iter()
Expand All @@ -217,8 +219,9 @@ impl ParquetExec {
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_arrow_schema_projection(
first_schema.as_ref(),
&first_schema,
projected_arrow_schema.as_deref(),
allow_missing_columns,
)?
.finish()?;

Expand Down Expand Up @@ -395,6 +398,7 @@ impl ParquetExec {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();
let allow_missing_columns = self.file_options.allow_missing_columns;

if verbose {
eprintln!("reading of {}/{} file...", processed, paths.len());
Expand Down Expand Up @@ -422,8 +426,9 @@ impl ParquetExec {
.with_slice(Some(slice))
.with_row_index(row_index)
.with_arrow_schema_projection(
first_schema.as_ref(),
&first_schema,
projected_arrow_schema.as_deref(),
allow_missing_columns,
)
.await?
.use_statistics(use_statistics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Filter {
}
}

pub(crate) fn num_rows(&self) -> usize {
pub fn num_rows(&self) -> usize {
match self {
Filter::Range(range) => range.len(),
Filter::Mask(bitmap) => bitmap.set_bits(),
Expand Down
8 changes: 3 additions & 5 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ impl RowGroupMetadata {
self.columns.len()
}

/// Fetch all columns under this root name.
/// Fetch all columns under this root name if it exists.
pub fn columns_under_root_iter(
&self,
root_name: &str,
) -> impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator {
) -> Option<impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator> {
self.column_lookup
.get(root_name)
.unwrap()
.iter()
.map(|&x| &self.columns[x])
.map(|x| x.iter().map(|&x| &self.columns[x]))
}

/// Number of rows in this row group.
Expand Down
1 change: 1 addition & 0 deletions crates/polars-parquet/src/parquet/read/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn get_column_iterator<'a>(
) -> ColumnIterator<'a> {
let columns = row_group
.columns_under_root_iter(field_name)
.unwrap()
.rev()
.collect::<UnitVec<_>>();
ColumnIterator::new(reader, columns, max_page_size)
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl ParquetSource {
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)?
.with_row_index(file_options.row_index)
.with_predicate(predicate.clone())
Expand Down Expand Up @@ -199,6 +200,7 @@ impl ParquetSource {
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)
.await?
.with_predicate(predicate.clone())
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl DslBuilder {
},
glob: false,
include_file_paths: None,
allow_missing_columns: false,
};

Ok(DslPlan::Scan {
Expand Down Expand Up @@ -87,6 +88,7 @@ impl DslBuilder {
hive_options: HiveOptions,
glob: bool,
include_file_paths: Option<PlSmallStr>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
let options = FileScanOptions {
with_columns: None,
Expand All @@ -98,6 +100,7 @@ impl DslBuilder {
hive_options,
glob,
include_file_paths,
allow_missing_columns,
};
Ok(DslPlan::Scan {
sources,
Expand Down Expand Up @@ -143,6 +146,7 @@ impl DslBuilder {
hive_options,
glob: true,
include_file_paths,
allow_missing_columns: false,
},
scan_type: FileScan::Ipc {
options,
Expand Down Expand Up @@ -181,6 +185,7 @@ impl DslBuilder {
},
glob,
include_file_paths,
allow_missing_columns: false,
};
Ok(DslPlan::Scan {
sources,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct FileScanOptions {
pub hive_options: HiveOptions,
pub glob: bool,
pub include_file_paths: Option<PlSmallStr>,
pub allow_missing_columns: bool,
}

#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl PyLazyFrame {
#[cfg(feature = "parquet")]
#[staticmethod]
#[pyo3(signature = (source, sources, n_rows, cache, parallel, rechunk, row_index,
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths)
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
)]
fn new_from_parquet(
source: Option<PyObject>,
Expand All @@ -259,6 +259,7 @@ impl PyLazyFrame {
retries: usize,
glob: bool,
include_file_paths: Option<String>,
allow_missing_columns: bool,
) -> PyResult<Self> {
let parallel = parallel.0;
let hive_schema = hive_schema.map(|s| Arc::new(s.0));
Expand Down Expand Up @@ -287,6 +288,7 @@ impl PyLazyFrame {
hive_options,
glob,
include_file_paths: include_file_paths.map(|x| x.into()),
allow_missing_columns,
};

let sources = sources.0;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/lazyframe/visit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl NodeTraverser {
// Increment major on breaking changes to the IR (e.g. renaming
// fields, reordering tuples), minor on backwards compatible
// changes (e.g. exposing a new expression node).
const VERSION: Version = (2, 0);
const VERSION: Version = (2, 1);

pub fn new(root: Node, lp_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
Self {
Expand Down
Loading

0 comments on commit 13e9717

Please sign in to comment.