Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add allow_missing_columns option to read/scan_parquet #18922

Merged
merged 4 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions crates/polars-io/src/parquet/read/async_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,15 @@ async fn download_projection(
let mut offsets = Vec::with_capacity(fields.len());
fields.iter().for_each(|name| {
// A single column can have multiple matches (structs).
let iter = row_group.columns_under_root_iter(name).map(|meta| {
let byte_range = meta.byte_range();
let offset = byte_range.start;
let byte_range = byte_range.start as usize..byte_range.end as usize;
(offset, byte_range)
});
let iter = row_group
.columns_under_root_iter(name)
.unwrap()
.map(|meta| {
let byte_range = meta.byte_range();
let offset = byte_range.start;
let byte_range = byte_range.start as usize..byte_range.end as usize;
(offset, byte_range)
});

for (offset, range) in iter {
offsets.push(offset);
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) fn collect_statistics(
let stats = schema
.iter_values()
.map(|field| {
let iter = md.columns_under_root_iter(&field.name);
let iter = md.columns_under_root_iter(&field.name).unwrap();

Ok(if iter.len() == 0 {
ColumnStats::new(field.into(), None, None, None)
Expand Down
86 changes: 65 additions & 21 deletions crates/polars-io/src/parquet/read/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,19 @@ fn rg_to_dfs_prefiltered(
.map(|i| {
let col_idx = live_idx_to_col_idx[i];

let name = schema.get_at_index(col_idx).unwrap().0;
let field_md = file_metadata.row_groups[rg_idx]
.columns_under_root_iter(name)
.collect::<Vec<_>>();
let (name, field) = schema.get_at_index(col_idx).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
md.num_rows(),
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(col_idx, field_md.as_slice(), None, schema, store)
column_idx_to_series(col_idx, part.as_slice(), None, schema, store)
.map(Column::from)
})
.collect::<PolarsResult<Vec<_>>>()?;
Expand Down Expand Up @@ -384,20 +391,30 @@ fn rg_to_dfs_prefiltered(
.then(|| calc_prefilter_cost(&filter_mask))
.unwrap_or_default();

#[cfg(debug_assertions)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if cfg!(debug_assertions)

{
let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
}

let n_rows_in_result = filter_mask.set_bits();

let mut dead_columns = (0..num_dead_columns)
.into_par_iter()
.map(|i| {
let col_idx = dead_idx_to_col_idx[i];
let name = schema.get_at_index(col_idx).unwrap().0;

#[cfg(debug_assertions)]
{
let md = &file_metadata.row_groups[rg_idx];
debug_assert_eq!(md.num_rows(), mask.len());
}
let field_md = file_metadata.row_groups[rg_idx]
.columns_under_root_iter(name)
.collect::<Vec<_>>();
let (name, field) = schema.get_at_index(col_idx).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
n_rows_in_result,
&DataType::from_arrow(&field.dtype, true),
));
};

let field_md = iter.collect::<Vec<_>>();

let pre = || {
column_idx_to_series(
Expand Down Expand Up @@ -556,8 +573,17 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.par_iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
Expand All @@ -574,8 +600,17 @@ fn rg_to_dfs_optionally_par_over_columns(
projection
.iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let part = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
rg_slice.1,
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
Expand Down Expand Up @@ -672,12 +707,21 @@ fn rg_to_dfs_par_over_rg(
let columns = projection
.iter()
.map(|column_i| {
let name = schema.get_at_index(*column_i).unwrap().0;
let field_md = md.columns_under_root_iter(name).collect::<Vec<_>>();
let (name, field) = schema.get_at_index(*column_i).unwrap();

let Some(iter) = md.columns_under_root_iter(name) else {
return Ok(Column::full_null(
name.clone(),
md.num_rows(),
&DataType::from_arrow(&field.dtype, true),
));
};

let part = iter.collect::<Vec<_>>();

column_idx_to_series(
*column_i,
field_md.as_slice(),
part.as_slice(),
Some(Filter::new_ranged(slice.0, slice.0 + slice.1)),
schema,
store,
Expand Down
14 changes: 12 additions & 2 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,14 @@ impl<R: MmapBytesReader> ParquetReader<R> {
/// dtype, and sets the projection indices.
pub fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
self.schema.replace(first_schema.clone());
}

let schema = self.schema()?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
Expand Down Expand Up @@ -301,9 +306,14 @@ impl ParquetAsyncReader {

pub async fn with_arrow_schema_projection(
mut self,
first_schema: &ArrowSchema,
first_schema: &Arc<ArrowSchema>,
projected_arrow_schema: Option<&ArrowSchema>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
if allow_missing_columns {
self.schema.replace(first_schema.clone());
}

let schema = self.schema().await?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
Expand Down
1 change: 1 addition & 0 deletions crates/polars-lazy/src/scan/ndjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ impl LazyFileListReader for LazyJsonLineReader {
},
glob: true,
include_file_paths: self.include_file_paths,
allow_missing_columns: false,
};

let options = NDJsonReadOptions {
Expand Down
3 changes: 3 additions & 0 deletions crates/polars-lazy/src/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub struct ScanArgsParquet {
/// Expand path given via globbing rules.
pub glob: bool,
pub include_file_paths: Option<PlSmallStr>,
pub allow_missing_columns: bool,
}

impl Default for ScanArgsParquet {
Expand All @@ -37,6 +38,7 @@ impl Default for ScanArgsParquet {
cache: true,
glob: true,
include_file_paths: None,
allow_missing_columns: false,
}
}
}
Expand Down Expand Up @@ -74,6 +76,7 @@ impl LazyFileListReader for LazyParquetReader {
self.args.hive_options,
self.args.glob,
self.args.include_file_paths,
self.args.allow_missing_columns,
)?
.build()
.into();
Expand Down
9 changes: 7 additions & 2 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ impl ParquetExec {
})
.collect::<Vec<_>>();

let allow_missing_columns = self.file_options.allow_missing_columns;

let out = POOL.install(|| {
readers_and_metadata
.into_par_iter()
Expand All @@ -217,8 +219,9 @@ impl ParquetExec {
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_arrow_schema_projection(
first_schema.as_ref(),
&first_schema,
projected_arrow_schema.as_deref(),
allow_missing_columns,
)?
.finish()?;

Expand Down Expand Up @@ -395,6 +398,7 @@ impl ParquetExec {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();
let allow_missing_columns = self.file_options.allow_missing_columns;

if verbose {
eprintln!("reading of {}/{} file...", processed, paths.len());
Expand Down Expand Up @@ -422,8 +426,9 @@ impl ParquetExec {
.with_slice(Some(slice))
.with_row_index(row_index)
.with_arrow_schema_projection(
first_schema.as_ref(),
&first_schema,
projected_arrow_schema.as_deref(),
allow_missing_columns,
)
.await?
.use_statistics(use_statistics)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Filter {
}
}

pub(crate) fn num_rows(&self) -> usize {
pub fn num_rows(&self) -> usize {
match self {
Filter::Range(range) => range.len(),
Filter::Mask(bitmap) => bitmap.set_bits(),
Expand Down
8 changes: 3 additions & 5 deletions crates/polars-parquet/src/parquet/metadata/row_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,14 @@ impl RowGroupMetadata {
self.columns.len()
}

/// Fetch all columns under this root name.
/// Fetch all columns under this root name if it exists.
pub fn columns_under_root_iter(
&self,
root_name: &str,
) -> impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator {
) -> Option<impl ExactSizeIterator<Item = &ColumnChunkMetadata> + DoubleEndedIterator> {
self.column_lookup
.get(root_name)
.unwrap()
.iter()
.map(|&x| &self.columns[x])
.map(|x| x.iter().map(|&x| &self.columns[x]))
}

/// Number of rows in this row group.
Expand Down
1 change: 1 addition & 0 deletions crates/polars-parquet/src/parquet/read/column/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub fn get_column_iterator<'a>(
) -> ColumnIterator<'a> {
let columns = row_group
.columns_under_root_iter(field_name)
.unwrap()
.rev()
.collect::<UnitVec<_>>();
ColumnIterator::new(reader, columns, max_page_size)
Expand Down
2 changes: 2 additions & 0 deletions crates/polars-pipe/src/executors/sources/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ impl ParquetSource {
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)?
.with_row_index(file_options.row_index)
.with_predicate(predicate.clone())
Expand Down Expand Up @@ -199,6 +200,7 @@ impl ParquetSource {
.with_arrow_schema_projection(
&self.first_schema,
self.projected_arrow_schema.as_deref(),
self.file_options.allow_missing_columns,
)
.await?
.with_predicate(predicate.clone())
Expand Down
5 changes: 5 additions & 0 deletions crates/polars-plan/src/plans/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl DslBuilder {
},
glob: false,
include_file_paths: None,
allow_missing_columns: false,
};

Ok(DslPlan::Scan {
Expand Down Expand Up @@ -87,6 +88,7 @@ impl DslBuilder {
hive_options: HiveOptions,
glob: bool,
include_file_paths: Option<PlSmallStr>,
allow_missing_columns: bool,
) -> PolarsResult<Self> {
let options = FileScanOptions {
with_columns: None,
Expand All @@ -98,6 +100,7 @@ impl DslBuilder {
hive_options,
glob,
include_file_paths,
allow_missing_columns,
};
Ok(DslPlan::Scan {
sources,
Expand Down Expand Up @@ -143,6 +146,7 @@ impl DslBuilder {
hive_options,
glob: true,
include_file_paths,
allow_missing_columns: false,
},
scan_type: FileScan::Ipc {
options,
Expand Down Expand Up @@ -181,6 +185,7 @@ impl DslBuilder {
},
glob,
include_file_paths,
allow_missing_columns: false,
};
Ok(DslPlan::Scan {
sources,
Expand Down
1 change: 1 addition & 0 deletions crates/polars-plan/src/plans/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub struct FileScanOptions {
pub hive_options: HiveOptions,
pub glob: bool,
pub include_file_paths: Option<PlSmallStr>,
pub allow_missing_columns: bool,
}

#[derive(Clone, Debug, Copy, Default, Eq, PartialEq, Hash)]
Expand Down
4 changes: 3 additions & 1 deletion crates/polars-python/src/lazyframe/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ impl PyLazyFrame {
#[cfg(feature = "parquet")]
#[staticmethod]
#[pyo3(signature = (source, sources, n_rows, cache, parallel, rechunk, row_index,
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths)
low_memory, cloud_options, use_statistics, hive_partitioning, hive_schema, try_parse_hive_dates, retries, glob, include_file_paths, allow_missing_columns)
)]
fn new_from_parquet(
source: Option<PyObject>,
Expand All @@ -259,6 +259,7 @@ impl PyLazyFrame {
retries: usize,
glob: bool,
include_file_paths: Option<String>,
allow_missing_columns: bool,
) -> PyResult<Self> {
let parallel = parallel.0;
let hive_schema = hive_schema.map(|s| Arc::new(s.0));
Expand Down Expand Up @@ -287,6 +288,7 @@ impl PyLazyFrame {
hive_options,
glob,
include_file_paths: include_file_paths.map(|x| x.into()),
allow_missing_columns,
};

let sources = sources.0;
Expand Down
2 changes: 1 addition & 1 deletion crates/polars-python/src/lazyframe/visit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl NodeTraverser {
// Increment major on breaking changes to the IR (e.g. renaming
// fields, reordering tuples), minor on backwards compatible
// changes (e.g. exposing a new expression node).
const VERSION: Version = (2, 0);
const VERSION: Version = (2, 1);

pub fn new(root: Node, lp_arena: Arena<IR>, expr_arena: Arena<AExpr>) -> Self {
Self {
Expand Down
Loading
Loading