diff --git a/crates/polars-plan/src/dsl/functions/horizontal.rs b/crates/polars-plan/src/dsl/functions/horizontal.rs
index c713eb813068..13837fc59796 100644
--- a/crates/polars-plan/src/dsl/functions/horizontal.rs
+++ b/crates/polars-plan/src/dsl/functions/horizontal.rs
@@ -197,7 +197,7 @@ pub fn all_horizontal<E: AsRef<[Expr]>>(exprs: E) -> PolarsResult<Expr> {
     polars_ensure!(!exprs.is_empty(), ComputeError: "cannot return empty fold because the number of output rows is unknown");
 
     // We prefer this path as the optimizer can better deal with the binary operations.
-    // However if we have a single expression, we might loose information.
+    // However if we have a single expression, we might lose information.
     // E.g. `all().is_null()` would reduce to `all().is_null()` (the & is not needed as there is no rhs (yet)
     // And upon expansion, it becomes
     // `col(i).is_null() for i in len(df))`
diff --git a/crates/polars-plan/src/logical_plan/builder_dsl.rs b/crates/polars-plan/src/logical_plan/builder_dsl.rs
index 8887c2f55df2..5c7f032fbe80 100644
--- a/crates/polars-plan/src/logical_plan/builder_dsl.rs
+++ b/crates/polars-plan/src/logical_plan/builder_dsl.rs
@@ -277,7 +277,19 @@ impl DslBuilder {
     }
 
     pub fn drop_nulls(self, subset: Option<Vec<Expr>>) -> Self {
-        self.map_private(DslFunction::DropNulls(subset))
+        if let Some(subset) = subset {
+            self.filter(
+                all_horizontal(
+                    subset
+                        .into_iter()
+                        .map(|v| v.is_not_null())
+                        .collect::<Vec<_>>(),
+                )
+                .unwrap(),
+            )
+        } else {
+            self.filter(all_horizontal([all().is_not_null()]).unwrap())
+        }
     }
 
     pub fn fill_nan(self, fill_value: Expr) -> Self {
diff --git a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
index f13b705dcc6a..32ca8966fadd 100644
--- a/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
+++ b/crates/polars-plan/src/logical_plan/conversion/dsl_plan_to_ir_plan.rs
@@ -344,24 +344,6 @@ pub fn to_alp(
                 },
             }
         },
-        DslFunction::DropNulls(subset) => {
-            let predicate = match subset {
-                None => all_horizontal([col("*").is_not_null()]),
-                Some(subset) => all_horizontal(
-                    subset
-                        .into_iter()
-                        .map(|e| e.is_not_null())
-                        .collect::<Vec<_>>(),
-                ),
-            }
-            .map_err(|e| e.context(failed_here!(drop_nulls)))?;
-            let predicate = rewrite_projections(vec![predicate], &input_schema, &[])
-                .map_err(|e| e.context(failed_here!(drop_nulls)))?
-                .pop()
-                .unwrap();
-            let predicate = to_expr_ir(predicate, expr_arena);
-            IR::Filter { predicate, input }
-        },
         DslFunction::Drop(to_drop) => {
             let mut output_schema =
                 Schema::with_capacity(input_schema.len().saturating_sub(to_drop.len()));
diff --git a/crates/polars-plan/src/logical_plan/functions/dsl.rs b/crates/polars-plan/src/logical_plan/functions/dsl.rs
index 4f362075cdd5..849555d28003 100644
--- a/crates/polars-plan/src/logical_plan/functions/dsl.rs
+++ b/crates/polars-plan/src/logical_plan/functions/dsl.rs
@@ -23,7 +23,6 @@ pub enum DslFunction {
     Stats(StatsFunction),
     /// FillValue
     FillNan(Expr),
-    DropNulls(Option<Vec<Expr>>),
     Drop(PlHashSet<Arc<str>>),
 }
 
@@ -93,10 +92,7 @@ impl DslFunction {
                     schema: Default::default(),
                 }
             },
-            DslFunction::Stats(_)
-            | DslFunction::FillNan(_)
-            | DslFunction::DropNulls(_)
-            | DslFunction::Drop(_) => {
+            DslFunction::Stats(_) | DslFunction::FillNan(_) | DslFunction::Drop(_) => {
                 // We should not reach this.
                 panic!("impl error")
             },
@@ -121,32 +117,7 @@ impl Display for DslFunction {
             RowIndex { .. } => write!(f, "WITH ROW INDEX"),
             Stats(_) => write!(f, "STATS"),
             FillNan(_) => write!(f, "FILL NAN"),
-            DropNulls(_) => write!(f, "DROP NULLS"),
             Drop(_) => write!(f, "DROP"),
-            // DropNulls { subset } => {
-            //     write!(f, "DROP_NULLS by: ")?;
-            //     let subset = subset.as_ref();
-            //     fmt_column_delimited(f, subset, "[", "]")
-            // },
-            // Rechunk => write!(f, "RECHUNK"),
-            // Count { .. } => write!(f, "FAST COUNT(*)"),
-            // Unnest { columns } => {
-            //     write!(f, "UNNEST by:")?;
-            //     let columns = columns.as_ref();
-            //     fmt_column_delimited(f, columns, "[", "]")
-            // },
-            // #[cfg(feature = "merge_sorted")]
-            // MergeSorted { .. } => write!(f, "MERGE SORTED"),
-            // Pipeline { original, .. } => {
-            //     if let Some(original) = original {
-            //         writeln!(f, "--- STREAMING")?;
-            //         write!(f, "{:?}", original.as_ref())?;
-            //         let indent = 2;
-            //         writeln!(f, "{:indent$}--- END STREAMING", "")
-            //     } else {
-            //         writeln!(f, "STREAMING")
-            //     }
-            // },
             Rename { .. } => write!(f, "RENAME"),
         }
     }
diff --git a/crates/polars-plan/src/logical_plan/functions/mod.rs b/crates/polars-plan/src/logical_plan/functions/mod.rs
index 189c7ed46bd1..7de90431ab17 100644
--- a/crates/polars-plan/src/logical_plan/functions/mod.rs
+++ b/crates/polars-plan/src/logical_plan/functions/mod.rs
@@ -66,9 +66,6 @@ pub enum FunctionNode {
     Unnest {
         columns: Arc<[Arc<str>]>,
     },
-    DropNulls {
-        subset: Arc<[Arc<str>]>,
-    },
     Rechunk,
     // The two DataFrames are temporary concatenated
     // this indicates until which chunk the data is from the left df
@@ -112,7 +109,6 @@ impl PartialEq for FunctionNode {
     fn eq(&self, other: &Self) -> bool {
         use FunctionNode::*;
         match (self, other) {
-            (DropNulls { subset: l }, DropNulls { subset: r }) => l == r,
             (Rechunk, Rechunk) => true,
             (Count { paths: paths_l, .. }, Count { paths: paths_r, .. }) => paths_l == paths_r,
             (
@@ -155,7 +151,6 @@ impl Hash for FunctionNode {
             },
             FunctionNode::Pipeline { .. } => {},
             FunctionNode::Unnest { columns } => columns.hash(state),
-            FunctionNode::DropNulls { subset } => subset.hash(state),
             FunctionNode::Rechunk => {},
             #[cfg(feature = "merge_sorted")]
             FunctionNode::MergeSorted { column } => column.hash(state),
@@ -190,9 +185,7 @@ impl FunctionNode {
             Rechunk | Pipeline { .. } => false,
             #[cfg(feature = "merge_sorted")]
            MergeSorted { .. } => false,
-            DropNulls { .. } | Count { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => {
-                true
-            },
+            Count { .. } | Unnest { .. } | Rename { .. } | Explode { .. } => true,
             Melt { args, .. } => args.streamable,
             Opaque { streamable, .. } => *streamable,
             #[cfg(feature = "python")]
@@ -218,12 +211,7 @@ impl FunctionNode {
             Opaque { predicate_pd, .. } => *predicate_pd,
             #[cfg(feature = "python")]
             OpaquePython { predicate_pd, .. } => *predicate_pd,
-            DropNulls { .. }
-            | Rechunk
-            | Unnest { .. }
-            | Rename { .. }
-            | Explode { .. }
-            | Melt { .. } => true,
+            Rechunk | Unnest { .. } | Rename { .. } | Explode { .. } | Melt { .. } => true,
             #[cfg(feature = "merge_sorted")]
             MergeSorted { .. } => true,
             RowIndex { .. } | Count { .. } => false,
@@ -237,8 +225,7 @@ impl FunctionNode {
             Opaque { projection_pd, .. } => *projection_pd,
             #[cfg(feature = "python")]
             OpaquePython { projection_pd, .. } => *projection_pd,
-            DropNulls { .. }
-            | Rechunk
+            Rechunk
             | Count { .. }
             | Unnest { .. }
             | Rename { .. }
@@ -273,7 +260,6 @@ impl FunctionNode {
                 schema,
                 ..
             } => python_udf::call_python_udf(function, df, *validate_output, schema.as_deref()),
-            DropNulls { subset } => df.drop_nulls(Some(subset.as_ref())),
             Count {
                 paths, scan_type, ..
             } => count::count_rows(paths, scan_type),
@@ -330,11 +316,6 @@ impl Display for FunctionNode {
             Opaque { fmt_str, .. } => write!(f, "{fmt_str}"),
             #[cfg(feature = "python")]
             OpaquePython { .. } => write!(f, "python dataframe udf"),
-            DropNulls { subset } => {
-                write!(f, "DROP_NULLS by: ")?;
-                let subset = subset.as_ref();
-                fmt_column_delimited(f, subset, "[", "]")
-            },
             Rechunk => write!(f, "RECHUNK"),
             Count { .. } => write!(f, "FAST COUNT(*)"),
             Unnest { columns } => {
diff --git a/crates/polars-plan/src/logical_plan/functions/schema.rs b/crates/polars-plan/src/logical_plan/functions/schema.rs
index ba6e7acdfc4c..c534f16e2035 100644
--- a/crates/polars-plan/src/logical_plan/functions/schema.rs
+++ b/crates/polars-plan/src/logical_plan/functions/schema.rs
@@ -38,7 +38,6 @@ impl FunctionNode {
                 .map(|schema| Cow::Owned(schema.clone()))
                 .unwrap_or_else(|| Cow::Borrowed(input_schema))),
             Pipeline { schema, .. } => Ok(Cow::Owned(schema.clone())),
-            DropNulls { .. } => Ok(Cow::Borrowed(input_schema)),
             Count { alias, .. } => {
                 let mut schema: Schema = Schema::with_capacity(1);
                 let name = SmartString::from(
diff --git a/crates/polars-plan/src/logical_plan/optimizer/drop_nulls.rs b/crates/polars-plan/src/logical_plan/optimizer/drop_nulls.rs
deleted file mode 100644
index 420ffff925e2..000000000000
--- a/crates/polars-plan/src/logical_plan/optimizer/drop_nulls.rs
+++ /dev/null
@@ -1,83 +0,0 @@
-use super::*;
-use crate::logical_plan::iterator::*;
-
-/// If we realize that a predicate drops nulls on a subset
-/// we replace it with an explicit df.drop_nulls call, as this
-/// has a fast path for the no null case
-pub(super) struct ReplaceDropNulls {}
-
-impl OptimizationRule for ReplaceDropNulls {
-    fn optimize_plan(
-        &mut self,
-        lp_arena: &mut Arena<IR>,
-        expr_arena: &mut Arena<AExpr>,
-        node: Node,
-    ) -> Option<IR> {
-        let lp = lp_arena.get(node);
-
-        use IR::*;
-        match lp {
-            Filter { input, predicate } => {
-                // We want to make sure we find this pattern
-                // A != null AND B != null AND C != null .. etc.
-                // the outer expression always is a binary and operation and the inner
-                let iter = (&*expr_arena).iter(predicate.node());
-                let is_binary_and = |e: &AExpr| {
-                    matches!(
-                        e,
-                        &AExpr::BinaryExpr {
-                            op: Operator::And,
-                            ..
-                        }
-                    )
-                };
-                let is_not_null = |e: &AExpr| {
-                    matches!(
-                        e,
-                        &AExpr::Function {
-                            function: FunctionExpr::Boolean(BooleanFunction::IsNotNull),
-                            ..
-                        }
-                    )
-                };
-                let is_column = |e: &AExpr| matches!(e, &AExpr::Column(_));
-                let is_lit_true =
-                    |e: &AExpr| matches!(e, &AExpr::Literal(LiteralValue::Boolean(true)));
-
-                let mut binary_and_count = 0;
-                let mut not_null_count = 0;
-                let mut column_count = 0;
-                for (_, e) in iter {
-                    if is_binary_and(e) {
-                        binary_and_count += 1;
-                    } else if is_column(e) {
-                        column_count += 1;
-                    } else if is_not_null(e) {
-                        not_null_count += 1;
-                    } else if is_lit_true(e) {
-                        // do nothing
-                    } else {
-                        // only expected
-                        // - binary and
-                        // - column
-                        // - is not null
-                        // - lit true
-                        // so we can return early
-                        return None;
-                    }
-                }
-                if not_null_count == column_count && binary_and_count < column_count {
-                    let subset = Arc::from(aexpr_to_leaf_names(predicate.node(), expr_arena));
-
-                    Some(IR::MapFunction {
-                        input: *input,
-                        function: FunctionNode::DropNulls { subset },
-                    })
-                } else {
-                    None
-                }
-            },
-            _ => None,
-        }
-    }
-}
diff --git a/crates/polars-plan/src/logical_plan/optimizer/mod.rs b/crates/polars-plan/src/logical_plan/optimizer/mod.rs
index c465bf6651d0..250eac00484f 100644
--- a/crates/polars-plan/src/logical_plan/optimizer/mod.rs
+++ b/crates/polars-plan/src/logical_plan/optimizer/mod.rs
@@ -4,7 +4,6 @@ use crate::prelude::*;
 
 mod cache_states;
 mod delay_rechunk;
-mod drop_nulls;
 
 mod collapse_and_project;
 mod collect_members;
@@ -25,7 +24,6 @@ mod type_coercion;
 
 use collapse_and_project::SimpleProjectionAndCollapse;
 use delay_rechunk::DelayRechunk;
-use drop_nulls::ReplaceDropNulls;
 use polars_core::config::verbose;
 use polars_io::predicates::PhysicalIoExpr;
 pub use predicate_pushdown::PredicatePushDown;
@@ -181,7 +179,6 @@ pub fn optimize(
         rules.push(Box::new(SimplifyBooleanRule {}));
     }
 
-    rules.push(Box::new(ReplaceDropNulls {}));
     if !eager {
         rules.push(Box::new(FlattenUnionRule {}));
     }
diff --git a/py-polars/tests/unit/test_cse.py b/py-polars/tests/unit/test_cse.py
index db9ba46e0b7f..e0668768e42f 100644
--- a/py-polars/tests/unit/test_cse.py
+++ b/py-polars/tests/unit/test_cse.py
@@ -715,3 +715,11 @@ def test_cse_manual_cache_15688() -> None:
         "id": [3],
         "foo": [1],
     }
+
+
+def test_cse_drop_nulls_15795() -> None:
+    A = pl.LazyFrame({"X": 1})
+    B = pl.LazyFrame({"X": 1, "Y": 0}).filter(pl.col("Y").is_not_null())
+    C = A.join(B, on="X").select("X")
+    D = B.select("X")
+    assert C.join(D, on="X").collect().shape == (1, 1)
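Note (not part of the patch): the `DslBuilder::drop_nulls` change above lowers `drop_nulls` to a plain filter over `all_horizontal(is_not_null)` expressions instead of the opaque `DropNulls` node. A minimal Python sketch of that equivalence, assuming the current py-polars API; the example frame and column names are invented for illustration:

```python
import polars as pl
from polars.testing import assert_frame_equal

# Hypothetical data; column names "a"/"b" are only for illustration.
lf = pl.LazyFrame({"a": [1, None, 3], "b": [None, "x", "y"]})

# Without a subset, drop_nulls keeps rows where every column is non-null ...
assert_frame_equal(
    lf.drop_nulls().collect(),
    lf.filter(pl.all_horizontal(pl.all().is_not_null())).collect(),
)

# ... and with a subset, only the listed columns are checked.
assert_frame_equal(
    lf.drop_nulls(subset=["a"]).collect(),
    lf.filter(pl.all_horizontal(pl.col("a").is_not_null())).collect(),
)
```

Because the filter form now survives into the IR, passes such as CSE see an ordinary predicate rather than an opaque map function, which is what the new `test_cse_drop_nulls_15795` exercises.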