Skip to content

Commit

Permalink
fix: drop-nulls edge case; remove drop-nulls special case (#15815)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 authored Apr 21, 2024
1 parent efa92f9 commit 522e659
Show file tree
Hide file tree
Showing 9 changed files with 26 additions and 159 deletions.
2 changes: 1 addition & 1 deletion crates/polars-plan/src/dsl/functions/horizontal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ pub fn all_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");

// We prefer this path as the optimizer can better deal with the binary operations.
// However if we have a single expression, we might loose information.
// However if we have a single expression, we might lose information.
// E.g. `all().is_null()` would reduce to `all().is_null()` (the & is not needed as there is no rhs (yet)
// And upon expansion, it becomes
// `col(i).is_null() for i in len(df))`
Expand Down
14 changes: 13 additions & 1 deletion crates/polars-plan/src/logical_plan/builder_dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,19 @@ impl DslBuilder {
}

pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
self.map_private(DslFunction::DropNulls(subset))
if let Some(subset) = subset {
self.filter(
all_horizontal(
subset
.into_iter()
.map(|v| v.is_not_null())
.collect::<Vec<_>>(),
)
.unwrap(),
)
} else {
self.filter(all_horizontal([all().is_not_null()]).unwrap())
}
}

pub fn fill_nan(self, fill_value: Expr) -> Self {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,24 +344,6 @@ pub fn to_alp(
},
}
},
DslFunction::DropNulls(subset) => {
let predicate = match subset {
None => all_horizontal([col("*").is_not_null()]),
Some(subset) => all_horizontal(
subset
.into_iter()
.map(|e| e.is_not_null())
.collect::<Vec<_>>(),
),
}
.map_err(|e| e.context(failed_here!(drop_nulls)))?;
let predicate = rewrite_projections(vec![predicate], &input_schema, &[])
.map_err(|e| e.context(failed_here!(drop_nulls)))?
.pop()
.unwrap();
let predicate = to_expr_ir(predicate, expr_arena);
IR::Filter { predicate, input }
},
DslFunction::Drop(to_drop) => {
let mut output_schema =
Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len()));
Expand Down
31 changes: 1 addition & 30 deletions crates/polars-plan/src/logical_plan/functions/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub enum DslFunction {
Stats(StatsFunction),
/// FillValue
FillNan(Expr),
DropNulls(Option<Vec<Expr>>),
Drop(PlHashSet<String>),
}

Expand Down Expand Up @@ -93,10 +92,7 @@ impl DslFunction {
schema: Default::default(),
}
},
DslFunction::Stats(_)
| DslFunction::FillNan(_)
| DslFunction::DropNulls(_)
| DslFunction::Drop(_) => {
DslFunction::Stats(_) | DslFunction::FillNan(_) | DslFunction::Drop(_) => {
// We should not reach this.
panic!("impl error")
},
Expand All @@ -121,32 +117,7 @@ impl Display for DslFunction {
RowIndex { .. } => write!(f, "WITH ROW INDEX"),
Stats(_) => write!(f, "STATS"),
FillNan(_) => write!(f, "FILL NAN"),
DropNulls(_) => write!(f, "DROP NULLS"),
Drop(_) => write!(f, "DROP"),
// DropNulls { subset } => {
// write!(f, "DROP_NULLS by: ")?;
// let subset = subset.as_ref();
// fmt_column_delimited(f, subset, "[", "]")
// },
// Rechunk => write!(f, "RECHUNK"),
// Count { .. } => write!(f, "FAST COUNT(*)"),
// Unnest { columns } => {
// write!(f, "UNNEST by:")?;
// let columns = columns.as_ref();
// fmt_column_delimited(f, columns, "[", "]")
// },
// #[cfg(feature = "merge_sorted")]
// MergeSorted { .. } => write!(f, "MERGE SORTED"),
// Pipeline { original, .. } => {
// if let Some(original) = original {
// writeln!(f, "--- STREAMING")?;
// write!(f, "{:?}", original.as_ref())?;
// let indent = 2;
// writeln!(f, "{:indent$}--- END STREAMING", "")
// } else {
// writeln!(f, "STREAMING")
// }
// },
Rename { .. } => write!(f, "RENAME"),
}
}
Expand Down
25 changes: 3 additions & 22 deletions crates/polars-plan/src/logical_plan/functions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ pub enum FunctionNode {
Unnest {
columns: Arc<[Arc<str>]>,
},
DropNulls {
subset: Arc<[Arc<str>]>,
},
Rechunk,
// The two DataFrames are temporary concatenated
// this indicates until which chunk the data is from the left df
Expand Down Expand Up @@ -112,7 +109,6 @@ impl PartialEq for FunctionNode {
fn eq(&self, other: &Self) -> bool {
use FunctionNode::*;
match (self, other) {
(DropNulls { subset: l }, DropNulls { subset: r }) => l == r,
(Rechunk, Rechunk) => true,
(Count { paths: paths_l, .. }, Count { paths: paths_r, .. }) => paths_l == paths_r,
(
Expand Down Expand Up @@ -155,7 +151,6 @@ impl Hash for FunctionNode {
},
FunctionNode::Pipeline { .. } => {},
FunctionNode::Unnest { columns } => columns.hash(state),
FunctionNode::DropNulls { subset } => subset.hash(state),
FunctionNode::Rechunk => {},
#[cfg(feature = "merge_sorted")]
FunctionNode::MergeSorted { column } => column.hash(state),
Expand Down Expand Up @@ -190,9 +185,7 @@ impl FunctionNode {
Rechunk | Pipeline { .. } => false,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => false,
DropNulls { .. } | Count { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => {
true
},
Count { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
Melt { args, .. } => args.streamable,
Opaque { streamable, .. } => *streamable,
#[cfg(feature = "python")]
Expand All @@ -218,12 +211,7 @@ impl FunctionNode {
Opaque { predicate_pd, .. } => *predicate_pd,
#[cfg(feature = "python")]
OpaquePython { predicate_pd, .. } => *predicate_pd,
DropNulls { .. }
| Rechunk
| Unnest { .. }
| Rename { .. }
| Explode { .. }
| Melt { .. } => true,
Rechunk | Unnest { .. } | Rename { .. } | Explode { .. } | Melt { .. } => true,
#[cfg(feature = "merge_sorted")]
MergeSorted { .. } => true,
RowIndex { .. } | Count { .. } => false,
Expand All @@ -237,8 +225,7 @@ impl FunctionNode {
Opaque { projection_pd, .. } => *projection_pd,
#[cfg(feature = "python")]
OpaquePython { projection_pd, .. } => *projection_pd,
DropNulls { .. }
| Rechunk
Rechunk
| Count { .. }
| Unnest { .. }
| Rename { .. }
Expand Down Expand Up @@ -273,7 +260,6 @@ impl FunctionNode {
schema,
..
} => python_udf::call_python_udf(function, df, *validate_output, schema.as_deref()),
DropNulls { subset } => df.drop_nulls(Some(subset.as_ref())),
Count {
paths, scan_type, ..
} => count::count_rows(paths, scan_type),
Expand Down Expand Up @@ -330,11 +316,6 @@ impl Display for FunctionNode {
Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
#[cfg(feature = "python")]
OpaquePython { .. } => write!(f, "python dataframe udf"),
DropNulls { subset } => {
write!(f, "DROP_NULLS by: ")?;
let subset = subset.as_ref();
fmt_column_delimited(f, subset, "[", "]")
},
Rechunk => write!(f, "RECHUNK"),
Count { .. } => write!(f, "FAST COUNT(*)"),
Unnest { columns } => {
Expand Down
1 change: 0 additions & 1 deletion crates/polars-plan/src/logical_plan/functions/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ impl FunctionNode {
.map(|schema| Cow::Owned(schema.clone()))
.unwrap_or_else(|| Cow::Borrowed(input_schema))),
Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
Count { alias, .. } => {
let mut schema: Schema = Schema::with_capacity(1);
let name = SmartString::from(
Expand Down
83 changes: 0 additions & 83 deletions crates/polars-plan/src/logical_plan/optimizer/drop_nulls.rs

This file was deleted.

3 changes: 0 additions & 3 deletions crates/polars-plan/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::prelude::*;

mod cache_states;
mod delay_rechunk;
mod drop_nulls;

mod collapse_and_project;
mod collect_members;
Expand All @@ -25,7 +24,6 @@ mod type_coercion;

use collapse_and_project::SimpleProjectionAndCollapse;
use delay_rechunk::DelayRechunk;
use drop_nulls::ReplaceDropNulls;
use polars_core::config::verbose;
use polars_io::predicates::PhysicalIoExpr;
pub use predicate_pushdown::PredicatePushDown;
Expand Down Expand Up @@ -181,7 +179,6 @@ pub fn optimize(
rules.push(Box::new(SimplifyBooleanRule {}));
}

rules.push(Box::new(ReplaceDropNulls {}));
if !eager {
rules.push(Box::new(FlattenUnionRule {}));
}
Expand Down
8 changes: 8 additions & 0 deletions py-polars/tests/unit/test_cse.py
Original file line number Diff line number Diff line change
Expand Up @@ -715,3 +715,11 @@ def test_cse_manual_cache_15688() -> None:
"id": [3],
"foo": [1],
}


def test_cse_drop_nulls_15795() -> None:
A = pl.LazyFrame({"X": 1})
B = pl.LazyFrame({"X": 1, "Y": 0}).filter(pl.col("Y").is_not_null())
C = A.join(B, on="X").select("X")
D = B.select("X")
assert C.join(D, on="X").collect().shape == (1, 1)

0 comments on commit 522e659

Please sign in to comment.