diff --git a/crates/polars-core/src/datatypes/field.rs b/crates/polars-core/src/datatypes/field.rs index f3bc3571505c..b85caeec0a2e 100644 --- a/crates/polars-core/src/datatypes/field.rs +++ b/crates/polars-core/src/datatypes/field.rs @@ -165,7 +165,7 @@ impl DataType { ArrowDataType::Extension(name, _, _) if name.as_str() == "POLARS_EXTENSION_TYPE" => { #[cfg(feature = "object")] { - DataType::Object("extension", None) + DataType::Object("object", None) } #[cfg(not(feature = "object"))] { diff --git a/crates/polars-expr/src/expressions/apply.rs b/crates/polars-expr/src/expressions/apply.rs index f9993473099a..fca35b589e65 100644 --- a/crates/polars-expr/src/expressions/apply.rs +++ b/crates/polars-expr/src/expressions/apply.rs @@ -23,11 +23,11 @@ pub struct ApplyExpr { function_operates_on_scalar: bool, allow_rename: bool, pass_name_to_apply: bool, - input_schema: Option, + input_schema: SchemaRef, allow_threading: bool, check_lengths: bool, allow_group_aware: bool, - output_dtype: Option, + output_field: Field, } impl ApplyExpr { @@ -38,8 +38,8 @@ impl ApplyExpr { expr: Expr, options: FunctionOptions, allow_threading: bool, - input_schema: Option, - output_dtype: Option, + input_schema: SchemaRef, + output_field: Field, returns_scalar: bool, ) -> Self { #[cfg(debug_assertions)] @@ -62,30 +62,7 @@ impl ApplyExpr { allow_threading, check_lengths: options.check_lengths(), allow_group_aware: options.flags.contains(FunctionFlags::ALLOW_GROUP_AWARE), - output_dtype, - } - } - - pub(crate) fn new_minimal( - inputs: Vec>, - function: SpecialEq>, - expr: Expr, - collect_groups: ApplyOptions, - ) -> Self { - Self { - inputs, - function, - expr, - collect_groups, - function_returns_scalar: false, - function_operates_on_scalar: false, - allow_rename: false, - pass_name_to_apply: false, - input_schema: None, - allow_threading: true, - check_lengths: true, - allow_group_aware: true, - output_dtype: None, + output_field, } } @@ -123,11 +100,8 @@ impl ApplyExpr { Ok(ac) } - fn get_input_schema(&self, df: &DataFrame) -> Cow { - match &self.input_schema { - Some(schema) => Cow::Borrowed(schema.as_ref()), - None => Cow::Owned(df.schema()), - } + fn get_input_schema(&self, _df: &DataFrame) -> Cow { + Cow::Borrowed(self.input_schema.as_ref()) } /// Evaluates and flattens `Option` to `Column`. @@ -135,7 +109,7 @@ impl ApplyExpr { if let Some(out) = self.function.call_udf(inputs)? { Ok(out) } else { - let field = self.to_field(self.input_schema.as_ref().unwrap()).unwrap(); + let field = self.to_field(self.input_schema.as_ref()).unwrap(); Ok(Column::full_null(field.name().clone(), 1, field.dtype())) } } @@ -179,9 +153,11 @@ impl ApplyExpr { }; let ca: ListChunked = if self.allow_threading { - let dtype = match &self.output_dtype { - Some(dtype) if dtype.is_known() && !dtype.is_null() => Some(dtype.clone()), - _ => None, + let dtype = if self.output_field.dtype.is_known() && !self.output_field.dtype.is_null() + { + Some(self.output_field.dtype.clone()) + } else { + None }; let lst = agg.list().unwrap(); @@ -287,6 +263,7 @@ impl ApplyExpr { } builder.finish() } else { + // We still need this branch to materialize unknown/ data dependent types in eager. :( (0..len) .map(|_| { container.clear(); @@ -303,6 +280,13 @@ impl ApplyExpr { .collect::>()? 
.with_name(field.name.clone()) }; + #[cfg(debug_assertions)] + { + let inner = ca.dtype().inner_dtype().unwrap(); + if field.dtype.is_known() { + assert_eq!(inner, &field.dtype); + } + } drop(iters); diff --git a/crates/polars-expr/src/expressions/column.rs b/crates/polars-expr/src/expressions/column.rs index 6bac214f140c..8a59d6c25ddb 100644 --- a/crates/polars-expr/src/expressions/column.rs +++ b/crates/polars-expr/src/expressions/column.rs @@ -9,11 +9,11 @@ use crate::expressions::{AggregationContext, PartitionedAggregation, PhysicalExp pub struct ColumnExpr { name: PlSmallStr, expr: Expr, - schema: Option, + schema: SchemaRef, } impl ColumnExpr { - pub fn new(name: PlSmallStr, expr: Expr, schema: Option) -> Self { + pub fn new(name: PlSmallStr, expr: Expr, schema: SchemaRef) -> Self { Self { name, expr, schema } } } @@ -141,42 +141,37 @@ impl PhysicalExpr for ColumnExpr { Some(&self.expr) } fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult { - let out = match &self.schema { - None => self.process_by_linear_search(df, state, false), - Some(schema) => { - match schema.get_full(&self.name) { - Some((idx, _, _)) => { - // check if the schema was correct - // if not do O(n) search - match df.get_columns().get(idx) { - Some(out) => self.process_by_idx( - out.as_materialized_series(), - state, - schema, - df, - true, - ), - None => { - // partitioned group_by special case - if let Some(schema) = state.get_schema() { - self.process_from_state_schema(df, state, &schema) - } else { - self.process_by_linear_search(df, state, true) - } - }, - } - }, - // in the future we will throw an error here - // now we do a linear search first as the lazy reported schema may still be incorrect - // in debug builds we panic so that it can be fixed when occurring + let out = match self.schema.get_full(&self.name) { + Some((idx, _, _)) => { + // check if the schema was correct + // if not do O(n) search + match df.get_columns().get(idx) { + Some(out) => self.process_by_idx( + out.as_materialized_series(), + state, + &self.schema, + df, + true, + ), None => { - if self.name.starts_with(CSE_REPLACED) { - return self.process_cse(df, schema); + // partitioned group_by special case + if let Some(schema) = state.get_schema() { + self.process_from_state_schema(df, state, &schema) + } else { + self.process_by_linear_search(df, state, true) } - self.process_by_linear_search(df, state, true) }, } }, + // in the future we will throw an error here + // now we do a linear search first as the lazy reported schema may still be incorrect + // in debug builds we panic so that it can be fixed when occurring + None => { + if self.name.starts_with(CSE_REPLACED) { + return self.process_cse(df, &self.schema); + } + self.process_by_linear_search(df, state, true) + }, }; self.check_external_context(out, state) } diff --git a/crates/polars-expr/src/planner.rs b/crates/polars-expr/src/planner.rs index 8878b420af02..620f8bf87089 100644 --- a/crates/polars-expr/src/planner.rs +++ b/crates/polars-expr/src/planner.rs @@ -25,7 +25,7 @@ pub fn create_physical_expressions_from_irs( exprs: &[ExprIR], context: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, state: &mut ExpressionConversionState, ) -> PolarsResult>> { create_physical_expressions_check_state(exprs, context, expr_arena, schema, state, ok_checker) @@ -35,7 +35,7 @@ pub(crate) fn create_physical_expressions_check_state( exprs: &[ExprIR], context: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, 
state: &mut ExpressionConversionState, checker: F, ) -> PolarsResult>> @@ -57,7 +57,7 @@ pub(crate) fn create_physical_expressions_from_nodes( exprs: &[Node], context: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, state: &mut ExpressionConversionState, ) -> PolarsResult>> { create_physical_expressions_from_nodes_check_state( @@ -69,7 +69,7 @@ pub(crate) fn create_physical_expressions_from_nodes_check_state( exprs: &[Node], context: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, state: &mut ExpressionConversionState, checker: F, ) -> PolarsResult>> @@ -165,7 +165,7 @@ pub fn create_physical_expr( expr_ir: &ExprIR, ctxt: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, state: &mut ExpressionConversionState, ) -> PolarsResult> { let phys_expr = create_physical_expr_inner(expr_ir.node(), ctxt, expr_arena, schema, state)?; @@ -185,7 +185,7 @@ fn create_physical_expr_inner( expression: Node, ctxt: Context, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, state: &mut ExpressionConversionState, ) -> PolarsResult> { use AExpr::*; @@ -309,7 +309,7 @@ fn create_physical_expr_inner( Column(column) => Ok(Arc::new(ColumnExpr::new( column.clone(), node_to_expr(expression, expr_arena), - schema.cloned(), + schema.clone(), ))), Sort { expr, options } => { let phys_expr = create_physical_expr_inner(*expr, ctxt, expr_arena, schema, state)?; @@ -410,22 +410,18 @@ fn create_physical_expr_inner( return Ok(Arc::new(AggQuantileExpr::new(input, quantile, *interpol))); } - let field = schema - .map(|schema| { - expr_arena.get(expression).to_field( - schema, - Context::Aggregation, - expr_arena, - ) - }) - .transpose()?; + let field = expr_arena.get(expression).to_field( + schema, + Context::Aggregation, + expr_arena, + )?; let groupby = GroupByMethod::from(agg.clone()); let agg_type = AggregationType { groupby, allow_threading: false, }; - Ok(Arc::new(AggregationExpr::new(input, agg_type, field))) + Ok(Arc::new(AggregationExpr::new(input, agg_type, Some(field)))) }, } }, @@ -475,12 +471,10 @@ fn create_physical_expr_inner( options, } => { let is_scalar = is_scalar_ae(expression, expr_arena); - let output_dtype = schema.and_then(|schema| { + let output_dtype = expr_arena .get(expression) - .to_dtype(schema, Context::Default, expr_arena) - .ok() - }); + .to_field(schema, Context::Default, expr_arena)?; let is_reducing_aggregation = options.flags.contains(FunctionFlags::RETURNS_SCALAR) && matches!(options.collect_groups, ApplyOptions::GroupWise); @@ -504,7 +498,7 @@ fn create_physical_expr_inner( node_to_expr(expression, expr_arena), *options, state.allow_threading, - schema.cloned(), + schema.clone(), output_dtype, is_scalar, ))) @@ -516,12 +510,10 @@ fn create_physical_expr_inner( .. } => { let is_scalar = is_scalar_ae(expression, expr_arena); - let output_dtype = schema.and_then(|schema| { + let output_field = expr_arena .get(expression) - .to_dtype(schema, Context::Default, expr_arena) - .ok() - }); + .to_field(schema, Context::Default, expr_arena)?; let is_reducing_aggregation = options.flags.contains(FunctionFlags::RETURNS_SCALAR) && matches!(options.collect_groups, ApplyOptions::GroupWise); // Will be reset in the function so get that here. 
@@ -544,8 +536,8 @@ fn create_physical_expr_inner( node_to_expr(expression, expr_arena), *options, state.allow_threading, - schema.cloned(), - output_dtype, + schema.clone(), + output_field, is_scalar, ))) }, @@ -570,11 +562,25 @@ fn create_physical_expr_inner( let function = SpecialEq::new(Arc::new( move |c: &mut [polars_core::frame::column::Column]| c[0].explode().map(Some), ) as Arc); - Ok(Arc::new(ApplyExpr::new_minimal( + + let field = expr_arena + .get(expression) + .to_field(schema, ctxt, expr_arena)?; + Ok(Arc::new(ApplyExpr::new( vec![input], function, node_to_expr(expression, expr_arena), - ApplyOptions::GroupWise, + FunctionOptions { + collect_groups: ApplyOptions::GroupWise, + fmt_str: "", + cast_to_supertypes: None, + check_lengths: Default::default(), + flags: Default::default(), + }, + state.allow_threading, + schema.clone(), + field, + false, ))) }, Alias(input, name) => { diff --git a/crates/polars-lazy/src/dsl/eval.rs b/crates/polars-lazy/src/dsl/eval.rs index 460889ef01a6..b94295ac12e4 100644 --- a/crates/polars-lazy/src/dsl/eval.rs +++ b/crates/polars-lazy/src/dsl/eval.rs @@ -52,6 +52,7 @@ pub trait ExprEvalExtension: IntoExpr + Sized { // Ensure we get the new schema. let output_field = eval_field_to_dtype(c.field().as_ref(), &expr, false); + let schema = Arc::new(Schema::from_iter(std::iter::once(output_field.clone()))); let expr = expr.clone(); let mut arena = Arena::with_capacity(10); @@ -60,7 +61,7 @@ pub trait ExprEvalExtension: IntoExpr + Sized { &aexpr, Context::Default, &arena, - None, + &schema, &mut ExpressionConversionState::new(true, 0), )?; diff --git a/crates/polars-lazy/src/frame/mod.rs b/crates/polars-lazy/src/frame/mod.rs index 92efbd1d2173..fd4334cad066 100644 --- a/crates/polars-lazy/src/frame/mod.rs +++ b/crates/polars-lazy/src/frame/mod.rs @@ -612,12 +612,12 @@ impl LazyFrame { lp_arena, expr_arena, scratch, - Some(&|expr, expr_arena| { + Some(&|expr, expr_arena, schema| { let phys_expr = create_physical_expr( expr, Context::Default, expr_arena, - None, + schema, &mut ExpressionConversionState::new(true, 0), ) .ok()?; diff --git a/crates/polars-lazy/src/physical_plan/exotic.rs b/crates/polars-lazy/src/physical_plan/exotic.rs index 453337e616f8..e3c2b9e58120 100644 --- a/crates/polars-lazy/src/physical_plan/exotic.rs +++ b/crates/polars-lazy/src/physical_plan/exotic.rs @@ -24,12 +24,13 @@ pub(crate) fn prepare_expression_for_context( // create a dummy lazyframe and run a very simple optimization run so that // type coercion and simplify expression optimizations run. 
let column = Series::full_null(name, 0, dtype); - let lf = column + let mut lf = column .into_frame() .lazy() .without_optimizations() .with_simplify_expr(true) .select([expr.clone()]); + let schema = lf.collect_schema()?; let optimized = lf.optimize(&mut lp_arena, &mut expr_arena)?; let lp = lp_arena.get(optimized); let aexpr = lp.get_exprs().pop().unwrap(); @@ -38,7 +39,7 @@ pub(crate) fn prepare_expression_for_context( &aexpr, ctxt, &expr_arena, - None, + &schema, &mut ExpressionConversionState::new(true, 0), ) } diff --git a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs index 777f769866d0..ad4d8cd1fb48 100644 --- a/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs +++ b/crates/polars-lazy/src/physical_plan/streaming/construct_pipeline.rs @@ -50,7 +50,7 @@ impl PhysicalPipedExpr for Wrap { fn to_physical_piped_expr( expr: &ExprIR, expr_arena: &Arena, - schema: Option<&SchemaRef>, + schema: &SchemaRef, ) -> PolarsResult> { // this is a double Arc explore if we can create a single of it. create_physical_expr( diff --git a/crates/polars-mem-engine/src/planner/lp.rs b/crates/polars-mem-engine/src/planner/lp.rs index e1b53bea2151..9bbe73207a3c 100644 --- a/crates/polars-mem-engine/src/planner/lp.rs +++ b/crates/polars-mem-engine/src/planner/lp.rs @@ -168,7 +168,7 @@ fn create_physical_plan_impl( e, Context::Default, expr_arena, - Some(&options.schema), + &options.schema, &mut state, ) }; @@ -264,7 +264,7 @@ fn create_physical_plan_impl( &predicate, Context::Default, expr_arena, - Some(&input_schema), + &input_schema, &mut state, )?; Ok(Box::new(executors::FilterExec::new( @@ -297,7 +297,7 @@ fn create_physical_plan_impl( &pred, Context::Default, expr_arena, - output_schema.as_ref(), + output_schema.as_ref().unwrap_or(&file_info.schema), &mut state, ) }) @@ -381,7 +381,7 @@ fn create_physical_plan_impl( &expr, Context::Default, expr_arena, - Some(&input_schema), + &input_schema, &mut state, )?; Ok(Box::new(executors::ProjectionExec { @@ -419,13 +419,7 @@ fn create_physical_plan_impl( let mut state = ExpressionConversionState::new(true, state.expr_depth); let selection = predicate .map(|pred| { - create_physical_expr( - &pred, - Context::Default, - expr_arena, - Some(&schema), - &mut state, - ) + create_physical_expr(&pred, Context::Default, expr_arena, &schema, &mut state) }) .transpose()?; Ok(Box::new(executors::DataFrameExec { @@ -446,7 +440,7 @@ fn create_physical_plan_impl( &by_column, Context::Default, expr_arena, - Some(input_schema.as_ref()), + input_schema.as_ref(), &mut ExpressionConversionState::new(true, state.expr_depth), )?; let input = create_physical_plan_impl(input, lp_arena, expr_arena, state)?; @@ -488,14 +482,14 @@ fn create_physical_plan_impl( &keys, Context::Default, expr_arena, - Some(&input_schema), + &input_schema, &mut ExpressionConversionState::new(true, state.expr_depth), )?; let phys_aggs = create_physical_expressions_from_irs( &aggs, Context::Aggregation, expr_arena, - Some(&input_schema), + &input_schema, &mut ExpressionConversionState::new(true, state.expr_depth), )?; @@ -594,21 +588,24 @@ fn create_physical_plan_impl( } else { false }; + let schema_left = lp_arena.get(input_left).schema(lp_arena).into_owned(); + let schema_right = lp_arena.get(input_right).schema(lp_arena).into_owned(); let input_left = create_physical_plan_impl(input_left, lp_arena, expr_arena, state)?; let input_right = create_physical_plan_impl(input_right, lp_arena, expr_arena, 
state)?; + let left_on = create_physical_expressions_from_irs( &left_on, Context::Default, expr_arena, - None, + &schema_left, &mut ExpressionConversionState::new(true, state.expr_depth), )?; let right_on = create_physical_expressions_from_irs( &right_on, Context::Default, expr_arena, - None, + &schema_right, &mut ExpressionConversionState::new(true, state.expr_depth), )?; let options = Arc::try_unwrap(options).unwrap_or_else(|options| (*options).clone()); @@ -642,7 +639,7 @@ fn create_physical_plan_impl( &exprs, Context::Default, expr_arena, - Some(&input_schema), + &input_schema, &mut state, )?; Ok(Box::new(executors::StackExec { diff --git a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs index 4e81e8531bac..e6711b7a339b 100644 --- a/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs +++ b/crates/polars-pipe/src/executors/sinks/group_by/aggregates/convert.rs @@ -135,7 +135,7 @@ pub(crate) fn convert_to_hash_agg( to_physical: &F, ) -> (DataType, Arc, AggregateFunction) where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { match expr_arena.get(node) { AExpr::Alias(input, _) => convert_to_hash_agg(*input, expr_arena, schema, to_physical), @@ -146,12 +146,9 @@ where ), AExpr::Agg(agg) => match agg { IRAggExpr::Min { input, .. } => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; let agg_fn = match logical_dtype.to_physical() { @@ -170,12 +167,9 @@ where (logical_dtype, phys_expr, agg_fn) }, IRAggExpr::Max { input, .. 
} => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; let agg_fn = match logical_dtype.to_physical() { @@ -194,12 +188,9 @@ where (logical_dtype, phys_expr, agg_fn) }, IRAggExpr::Sum(input) => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; #[cfg(feature = "dtype-categorical")] @@ -240,12 +231,9 @@ where (logical_dtype, phys_expr, agg_fn) }, IRAggExpr::Mean(input) => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; #[cfg(feature = "dtype-categorical")] @@ -270,12 +258,9 @@ where (logical_dtype, phys_expr, agg_fn) }, IRAggExpr::First(input) => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; ( logical_dtype.clone(), @@ -284,12 +269,9 @@ where ) }, IRAggExpr::Last(input) => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; ( logical_dtype.clone(), @@ -298,12 +280,9 @@ where ) }, IRAggExpr::Count(input, _) => { - let phys_expr = to_physical( - &ExprIR::from_node(*input, expr_arena), - expr_arena, - Some(schema), - ) - .unwrap(); + let phys_expr = + to_physical(&ExprIR::from_node(*input, expr_arena), expr_arena, schema) + .unwrap(); let logical_dtype = phys_expr.field(schema).unwrap().dtype; ( logical_dtype, diff --git a/crates/polars-pipe/src/pipeline/convert.rs b/crates/polars-pipe/src/pipeline/convert.rs index 0a6a8946feba..23df59fa59f7 100644 --- a/crates/polars-pipe/src/pipeline/convert.rs +++ b/crates/polars-pipe/src/pipeline/convert.rs @@ -26,10 +26,10 @@ fn exprs_to_physical( exprs: &[ExprIR], expr_arena: &Arena, to_physical: &F, - schema: Option<&SchemaRef>, + schema: &SchemaRef, ) -> PolarsResult>> where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { exprs .iter() @@ -47,7 +47,7 @@ fn get_source( verbose: bool, ) -> PolarsResult> where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { use IR::*; match source { @@ -58,9 +58,12 @@ where .. 
} => { let mut df = (*df).clone(); + let schema = output_schema + .clone() + .unwrap_or_else(|| Arc::new(df.schema())); if push_predicate { if let Some(predicate) = selection { - let predicate = to_physical(&predicate, expr_arena, output_schema.as_ref())?; + let predicate = to_physical(&predicate, expr_arena, &schema)?; let op = operators::FilterOperator { predicate }; let op = Box::new(op) as Box; operator_objects.push(op) @@ -83,6 +86,7 @@ where scan_type, } => { let paths = sources.into_paths(); + let schema = output_schema.as_ref().unwrap_or(&file_info.schema); // Add predicate to operators. // Except for parquet, as that format can use statistics to prune file/row-groups. @@ -95,7 +99,7 @@ where { #[cfg(feature = "parquet")] debug_assert!(!matches!(scan_type, FileScan::Parquet { .. })); - let predicate = to_physical(&predicate, expr_arena, output_schema.as_ref())?; + let predicate = to_physical(&predicate, expr_arena, schema)?; let op = operators::FilterOperator { predicate }; let op = Box::new(op) as Box; operator_objects.push(op) @@ -121,7 +125,7 @@ where let predicate = predicate .as_ref() .map(|predicate| { - let p = to_physical(predicate, expr_arena, output_schema.as_ref())?; + let p = to_physical(predicate, expr_arena, schema)?; // Arc's all the way down. :( // Temporarily until: https://github.com/rust-lang/rust/issues/65991 // stabilizes @@ -173,7 +177,7 @@ pub fn get_sink( callbacks: &mut CallBacks, ) -> PolarsResult> where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { use IR::*; let out = match lp_arena.get(node) { @@ -272,14 +276,14 @@ where left_on, expr_arena, to_physical, - Some(input_schema_left.as_ref()), + input_schema_left.as_ref(), )?); let input_schema_right = lp_arena.get(*input_right).schema(lp_arena); let join_columns_right = Arc::new(exprs_to_physical( right_on, expr_arena, to_physical, - Some(input_schema_right.as_ref()), + input_schema_right.as_ref(), )?); let swap_eval = || { @@ -448,7 +452,7 @@ where &keys, expr_arena, to_physical, - Some(&input_schema), + &input_schema, )?); let mut aggregation_columns = Vec::with_capacity(aggs.len()); @@ -488,7 +492,7 @@ where keys, expr_arena, to_physical, - Some(&input_schema), + &input_schema, )?); let mut aggregation_columns = Vec::with_capacity(aggs.len()); @@ -568,10 +572,10 @@ fn get_hstack( options: ProjectionOptions, ) -> PolarsResult where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { - Ok(operators::HstackOperator { - exprs: exprs_to_physical(exprs, expr_arena, &to_physical, Some(&input_schema))?, + Ok(HstackOperator { + exprs: exprs_to_physical(exprs, expr_arena, &to_physical, &input_schema)?, input_schema, options, }) @@ -584,7 +588,7 @@ pub fn get_operator( to_physical: &F, ) -> PolarsResult> where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { use IR::*; let op = match lp_arena.get(node) { @@ -602,7 +606,7 @@ where } => { let input_schema = lp_arena.get(*input).schema(lp_arena); let op = operators::ProjectionOperator { - exprs: exprs_to_physical(expr, expr_arena, &to_physical, Some(&input_schema))?, + exprs: exprs_to_physical(expr, expr_arena, &to_physical, input_schema.as_ref())?, options: *options, }; Box::new(op) as Box @@ -627,7 +631,7 @@ where }, Filter { predicate, input } => { let input_schema = lp_arena.get(*input).schema(lp_arena); - let predicate = to_physical(predicate, 
expr_arena, Some(input_schema.as_ref()))?; + let predicate = to_physical(predicate, expr_arena, input_schema.as_ref())?; let op = operators::FilterOperator { predicate }; Box::new(op) as Box }, @@ -662,7 +666,7 @@ pub fn create_pipeline( callbacks: &mut CallBacks, ) -> PolarsResult where - F: Fn(&ExprIR, &Arena, Option<&SchemaRef>) -> PolarsResult>, + F: Fn(&ExprIR, &Arena, &SchemaRef) -> PolarsResult>, { use IR::*; diff --git a/crates/polars-plan/src/plans/aexpr/schema.rs b/crates/polars-plan/src/plans/aexpr/schema.rs index 88c44233175a..d74fd054a644 100644 --- a/crates/polars-plan/src/plans/aexpr/schema.rs +++ b/crates/polars-plan/src/plans/aexpr/schema.rs @@ -72,6 +72,7 @@ impl AExpr { }, Explode(expr) => { let field = arena.get(*expr).to_field_impl(schema, arena, nested)?; + *nested = nested.saturating_sub(1); if let List(inner) = field.dtype() { Ok(Field::new(field.name().clone(), *inner.clone())) diff --git a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs index 7cb0753e5a6d..ed3f3e0376bd 100644 --- a/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs +++ b/crates/polars-plan/src/plans/optimizer/predicate_pushdown/mod.rs @@ -17,7 +17,7 @@ use crate::prelude::optimizer::predicate_pushdown::rename::process_rename; use crate::utils::{check_input_node, has_aexpr}; pub type ExprEval<'a> = - Option<&'a dyn Fn(&ExprIR, &Arena) -> Option>>; + Option<&'a dyn Fn(&ExprIR, &Arena, &SchemaRef) -> Option>>; pub struct PredicatePushDown<'a> { expr_eval: ExprEval<'a>, @@ -364,7 +364,9 @@ impl<'a> PredicatePushDown<'a> { let predicate = predicate_at_scan(acc_predicates, predicate.clone(), expr_arena); if let (Some(hive_parts), Some(predicate)) = (&scan_hive_parts, &predicate) { - if let Some(io_expr) = self.expr_eval.unwrap()(predicate, expr_arena) { + if let Some(io_expr) = + self.expr_eval.unwrap()(predicate, expr_arena, &file_info.schema) + { if let Some(stats_evaluator) = io_expr.as_stats_evaluator() { let paths = sources.as_paths().ok_or_else(|| { polars_err!(nyi = "Hive partitioning of in-memory buffers") diff --git a/crates/polars-stream/src/physical_plan/lower_expr.rs b/crates/polars-stream/src/physical_plan/lower_expr.rs index 618ec358f209..6b61a98979f2 100644 --- a/crates/polars-stream/src/physical_plan/lower_expr.rs +++ b/crates/polars-stream/src/physical_plan/lower_expr.rs @@ -348,7 +348,7 @@ fn build_fallback_node_with_ctx( expr, Context::Default, ctx.expr_arena, - Some(&ctx.phys_sm[input_node].output_schema), + &ctx.phys_sm[input_node].output_schema, &mut conv_state, ) }) diff --git a/crates/polars-stream/src/physical_plan/to_graph.rs b/crates/polars-stream/src/physical_plan/to_graph.rs index a4d58847033e..34692aa10b9a 100644 --- a/crates/polars-stream/src/physical_plan/to_graph.rs +++ b/crates/polars-stream/src/physical_plan/to_graph.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use parking_lot::Mutex; +use polars_core::schema::Schema; use polars_error::PolarsResult; use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState}; use polars_expr::reduce::into_reduction; @@ -33,13 +34,14 @@ fn has_potential_recurring_entrance(node: Node, arena: &Arena) -> bool { fn create_stream_expr( expr_ir: &ExprIR, ctx: &mut GraphConversionContext<'_>, + schema: &Arc, ) -> PolarsResult { let reentrant = has_potential_recurring_entrance(expr_ir.node(), ctx.expr_arena); let phys = create_physical_expr( expr_ir, Context::Default, ctx.expr_arena, - None, + schema, &mut 
ctx.expr_conversion_state, )?; Ok(StreamExpr::new(phys, reentrant)) @@ -103,7 +105,8 @@ fn to_graph_rec<'a>( }, Filter { predicate, input } => { - let phys_predicate_expr = create_stream_expr(predicate, ctx)?; + let input_schema = &ctx.phys_sm[*input].output_schema; + let phys_predicate_expr = create_stream_expr(predicate, ctx, input_schema)?; let input_key = to_graph_rec(*input, ctx)?; ctx.graph.add_node( nodes::filter::FilterNode::new(phys_predicate_expr), @@ -116,9 +119,10 @@ fn to_graph_rec<'a>( input, extend_original, } => { + let input_schema = &ctx.phys_sm[*input].output_schema; let phys_selectors = selectors .iter() - .map(|selector| create_stream_expr(selector, ctx)) + .map(|selector| create_stream_expr(selector, ctx, input_schema)) .collect::>()?; let input_key = to_graph_rec(*input, ctx)?; ctx.graph.add_node( @@ -144,9 +148,10 @@ fn to_graph_rec<'a>( }, InputIndependentSelect { selectors } => { + let empty_schema = Default::default(); let phys_selectors = selectors .iter() - .map(|selector| create_stream_expr(selector, ctx)) + .map(|selector| create_stream_expr(selector, ctx, &empty_schema)) .collect::>()?; ctx.graph.add_node( nodes::input_independent_select::InputIndependentSelectNode::new(phys_selectors), @@ -165,8 +170,11 @@ fn to_graph_rec<'a>( let (red, input_node) = into_reduction(e.node(), ctx.expr_arena, input_schema)?; reductions.push(red); - let input_phys = - create_stream_expr(&ExprIR::from_node(input_node, ctx.expr_arena), ctx)?; + let input_phys = create_stream_expr( + &ExprIR::from_node(input_node, ctx.expr_arena), + ctx, + input_schema, + )?; inputs.push(input_phys) } @@ -304,7 +312,7 @@ fn to_graph_rec<'a>( &pred, Context::Default, ctx.expr_arena, - output_schema.as_ref(), + output_schema.as_ref().unwrap_or(&file_info.schema), &mut ctx.expr_conversion_state, ) }) diff --git a/crates/polars/tests/it/lazy/aggregation.rs b/crates/polars/tests/it/lazy/aggregation.rs index 85ded9c742d0..ad043e698e2e 100644 --- a/crates/polars/tests/it/lazy/aggregation.rs +++ b/crates/polars/tests/it/lazy/aggregation.rs @@ -35,33 +35,3 @@ fn test_lazy_agg() { let min = new.column("min").unwrap(); assert_eq!(min, &Column::new("min".into(), [0.1f64, 0.01, 0.1])); } - -#[test] -#[should_panic(expected = "hardcoded error")] -/// Test where apply_multiple returns an error -fn test_apply_multiple_error() { - fn issue() -> Expr { - apply_multiple( - move |_| polars_bail!(ComputeError: "hardcoded error"), - &[col("x"), col("y")], - GetOutput::from_type(DataType::Float64), - true, - ) - } - - let df = df![ - "rf" => ["App", "App", "Gg", "App"], - "x" => ["Hey", "There", "Ante", "R"], - "y" => [Some(-1.11), Some(2.),None, Some(3.4)], - "z" => [Some(-1.11), Some(2.),None, Some(3.4)], - ] - .unwrap(); - - let _res = df - .lazy() - .with_streaming(false) - .group_by_stable([col("rf")]) - .agg([issue()]) - .collect() - .unwrap(); -} diff --git a/crates/polars/tests/it/lazy/queries.rs b/crates/polars/tests/it/lazy/queries.rs index f140a0461639..50e5c70be047 100644 --- a/crates/polars/tests/it/lazy/queries.rs +++ b/crates/polars/tests/it/lazy/queries.rs @@ -203,7 +203,7 @@ fn test_apply_multiple_columns() -> PolarsResult<()> { .select([map_multiple( multiply, [col("A"), col("B")], - GetOutput::from_type(DataType::Float64), + GetOutput::from_type(DataType::Int32), )]) .collect()?; let out = out.column("A")?; @@ -219,7 +219,7 @@ fn test_apply_multiple_columns() -> PolarsResult<()> { .agg([apply_multiple( multiply, [col("A"), col("B")], - GetOutput::from_type(DataType::Float64), + 
GetOutput::from_type(DataType::Int32), true, )]) .collect()?; diff --git a/py-polars/polars/functions/lazy.py b/py-polars/polars/functions/lazy.py index 61cead9871d8..7a473048bc23 100644 --- a/py-polars/polars/functions/lazy.py +++ b/py-polars/polars/functions/lazy.py @@ -1014,6 +1014,7 @@ def map_groups( ... function=lambda list_of_series: list_of_series[0] ... / list_of_series[0].sum() ... + list_of_series[1], + ... return_dtype=pl.Float64, ... ).alias("my_custom_aggregation") ... ) ... ).sort("group") diff --git a/py-polars/tests/unit/operations/map/test_map_groups.py b/py-polars/tests/unit/operations/map/test_map_groups.py index 772f4d088249..cffc78c93ef1 100644 --- a/py-polars/tests/unit/operations/map/test_map_groups.py +++ b/py-polars/tests/unit/operations/map/test_map_groups.py @@ -86,6 +86,7 @@ def test_map_groups_none() -> None: pl.map_groups( exprs=["a", pl.col("b") ** 4, pl.col("a") / 4], function=lambda x: x[0] * x[1] + x[2].sum(), + return_dtype=pl.Float64, ).alias("multiple") ) )["multiple"] @@ -127,7 +128,9 @@ def __init__(self, payload: Any) -> None: result = df.group_by("groups").agg( pl.map_groups( - [pl.col("dates"), pl.col("names")], lambda s: Foo(dict(zip(s[0], s[1]))) + [pl.col("dates"), pl.col("names")], + lambda s: Foo(dict(zip(s[0], s[1]))), + return_dtype=pl.Object, ) )
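
Usage sketch (illustrative, not part of the patch): since physical expression creation now resolves a concrete output field from the schema, `map_groups` calls are expected to declare `return_dtype` up front, mirroring the doc and test updates above. The DataFrame and column names below are assumptions for the example only.

import polars as pl

# Illustrative data; names are assumptions, not taken from the patch.
df = pl.DataFrame({"group": ["a", "a", "b"], "value": [1.0, 2.0, 3.0]})

out = df.group_by("group", maintain_order=True).agg(
    pl.map_groups(
        exprs=[pl.col("value")],
        function=lambda s: s[0] / s[0].sum(),
        # Declaring the dtype lets the planner build the output schema
        # without having to run the UDF first.
        return_dtype=pl.Float64,
    ).alias("normalized")
)
print(out)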