diff --git a/datafusion-examples/examples/rewrite_expr.rs b/datafusion-examples/examples/rewrite_expr.rs index 541448ebf149..b95eb0175f52 100644 --- a/datafusion-examples/examples/rewrite_expr.rs +++ b/datafusion-examples/examples/rewrite_expr.rs @@ -19,10 +19,11 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::{plan_err, Result, ScalarValue}; +use datafusion_expr::analyzer::AnalyzerRule; use datafusion_expr::{ AggregateUDF, Between, Expr, Filter, LogicalPlan, ScalarUDF, TableSource, WindowUDF, }; -use datafusion_optimizer::analyzer::{Analyzer, AnalyzerRule}; +use datafusion_optimizer::analyzer::Analyzer; use datafusion_optimizer::optimizer::Optimizer; use datafusion_optimizer::{utils, OptimizerConfig, OptimizerContext, OptimizerRule}; use datafusion_sql::planner::{ContextProvider, SqlToRel}; diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 8bc65a0ca2cc..26836aba5a1c 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -50,7 +50,7 @@ use crate::{ DropView, Explain, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable, TableSource, TableType, ToStringifiedPlan, UNNAMED_TABLE, }, - optimizer::analyzer::{Analyzer, AnalyzerRule}, + optimizer::analyzer::Analyzer, optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule}, physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule}, physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan}, @@ -69,6 +69,7 @@ use datafusion_common::{ OwnedTableReference, SchemaReference, }; use datafusion_execution::registry::SerializerRegistry; +use datafusion_expr::analyzer::AnalyzerRuleRef; use datafusion_expr::{ logical_plan::{DdlStatement, Statement}, var_provider::is_system_variables, @@ -1603,15 +1604,6 @@ impl SessionState { self } - /// Override the [`AnalyzerRule`]s optimizer plan rules. - pub fn with_analyzer_rules( - mut self, - rules: Vec>, - ) -> Self { - self.analyzer = Analyzer::with_rules(rules); - self - } - /// Replace the entire list of [`OptimizerRule`]s used to optimize plans pub fn with_optimizer_rules( mut self, @@ -1630,16 +1622,6 @@ impl SessionState { self } - /// Add `analyzer_rule` to the end of the list of - /// [`AnalyzerRule`]s used to rewrite queries. - pub fn add_analyzer_rule( - mut self, - analyzer_rule: Arc, - ) -> Self { - self.analyzer.rules.push(analyzer_rule); - self - } - /// Add `optimizer_rule` to the end of the list of /// [`OptimizerRule`]s used to rewrite queries. pub fn add_optimizer_rule( @@ -2156,6 +2138,16 @@ impl FunctionRegistry for SessionState { }) } + /// Override the `AnalyzerRule`s optimizer plan rules. + fn with_analyzer_rules(&mut self, rules: Vec) { + self.analyzer = Analyzer::with_rules(rules); + } + + /// return the existing analyzer rules + fn analyzer_rules(&self) -> Vec { + self.analyzer.rules.clone() + } + fn register_udf(&mut self, udf: Arc) -> Result>> { udf.aliases().iter().for_each(|alias| { self.scalar_functions.insert(alias.clone(), udf.clone()); diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 5dc3e1ce7d3f..0a990892a347 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -158,7 +158,7 @@ //! [`WindowUDF`]: crate::logical_expr::WindowUDF //! [`QueryPlanner`]: execution::context::QueryPlanner //! [`OptimizerRule`]: datafusion_optimizer::optimizer::OptimizerRule -//! [`AnalyzerRule`]: datafusion_optimizer::analyzer::AnalyzerRule +//! [`AnalyzerRule`]: datafusion_expr::analyzer::AnalyzerRule //! [`PhysicalOptimizerRule`]: crate::physical_optimizer::optimizer::PhysicalOptimizerRule //! //! # Architecture diff --git a/datafusion/execution/src/registry.rs b/datafusion/execution/src/registry.rs index 5bc9a7a07b6f..ce2f6c5972d4 100644 --- a/datafusion/execution/src/registry.rs +++ b/datafusion/execution/src/registry.rs @@ -18,6 +18,7 @@ //! FunctionRegistry trait use datafusion_common::{not_impl_err, plan_datafusion_err, Result}; +use datafusion_expr::analyzer::AnalyzerRuleRef; use datafusion_expr::{AggregateUDF, ScalarUDF, UserDefinedLogicalNode, WindowUDF}; use std::collections::HashMap; use std::{collections::HashSet, sync::Arc}; @@ -67,6 +68,13 @@ pub trait FunctionRegistry { not_impl_err!("Registering WindowUDF") } + /// Return the existing analyzer rules + fn analyzer_rules(&self) -> Vec { + vec![] + } + + fn with_analyzer_rules(&mut self, _rules: Vec) {} + /// Deregisters a [`ScalarUDF`], returning the implementation that was /// deregistered. /// diff --git a/datafusion/expr/src/analyzer.rs b/datafusion/expr/src/analyzer.rs new file mode 100644 index 000000000000..824b5d01e712 --- /dev/null +++ b/datafusion/expr/src/analyzer.rs @@ -0,0 +1,48 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make +/// the plan valid prior to the rest of the DataFusion optimization process. +/// +/// This is different than an [`OptimizerRule`](crate::OptimizerRule) +/// which must preserve the semantics of the `LogicalPlan`, while computing +/// results in a more optimal way. +/// +/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific +/// forms such as a subquery reference, or do type coercion to ensure the types +/// of operands are correct. +/// +/// Use [`SessionState::register_analyzer_rule`] to register additional +/// `AnalyzerRule`s. +/// +/// [`SessionState::register_analyzer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.register_analyzer_rule +use datafusion_common::config::ConfigOptions; +use datafusion_common::Result; + +use crate::LogicalPlan; + +pub trait AnalyzerRule { + /// Rewrite `plan` + fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result; + + /// A human readable name for this analyzer rule + fn name(&self) -> &str; +} + +pub type AnalyzerRuleRef = Arc; diff --git a/datafusion/expr/src/lib.rs b/datafusion/expr/src/lib.rs index a297f2dc7886..b5632af61cc9 100644 --- a/datafusion/expr/src/lib.rs +++ b/datafusion/expr/src/lib.rs @@ -39,6 +39,7 @@ mod udf; mod udwf; pub mod aggregate_function; +pub mod analyzer; pub mod conditional_expressions; pub mod execution_props; pub mod expr; diff --git a/datafusion/functions-array/src/analyzer/mod.rs b/datafusion/functions-array/src/analyzer/mod.rs new file mode 100644 index 000000000000..c14e4c0a4c4f --- /dev/null +++ b/datafusion/functions-array/src/analyzer/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +pub mod rewrite; diff --git a/datafusion/optimizer/src/analyzer/rewrite_expr.rs b/datafusion/functions-array/src/analyzer/rewrite.rs similarity index 95% rename from datafusion/optimizer/src/analyzer/rewrite_expr.rs rename to datafusion/functions-array/src/analyzer/rewrite.rs index 99578e91183c..c9b1196e107e 100644 --- a/datafusion/optimizer/src/analyzer/rewrite_expr.rs +++ b/datafusion/functions-array/src/analyzer/rewrite.rs @@ -17,27 +17,26 @@ //! Analyzer rule for to replace operators with function calls (e.g `||` to array_concat`) -#[cfg(feature = "array_expressions")] use std::sync::Arc; -use super::AnalyzerRule; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNodeRewriter}; -#[cfg(feature = "array_expressions")] + use datafusion_common::{utils::list_ndims, DFSchemaRef}; use datafusion_common::{DFSchema, Result}; +use datafusion_expr::analyzer::AnalyzerRule; use datafusion_expr::expr::ScalarFunction; use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::merge_schema; use datafusion_expr::BuiltinScalarFunction; use datafusion_expr::GetFieldAccess; use datafusion_expr::GetIndexedField; -#[cfg(feature = "array_expressions")] + use datafusion_expr::{BinaryExpr, Operator, ScalarFunctionDefinition}; use datafusion_expr::{Expr, LogicalPlan}; -#[cfg(feature = "array_expressions")] -use datafusion_functions_array::expr_fn::{array_append, array_concat, array_prepend}; + +use crate::array_has::array_has_all; +use crate::concat::{array_append, array_concat, array_prepend}; #[derive(Default)] pub struct OperatorToFunction {} @@ -77,7 +76,6 @@ fn analyze_internal(plan: &LogicalPlan) -> Result { } let mut expr_rewrite = OperatorToFunctionRewriter { - #[cfg(feature = "array_expressions")] schema: Arc::new(schema), }; @@ -95,7 +93,6 @@ fn analyze_internal(plan: &LogicalPlan) -> Result { } pub(crate) struct OperatorToFunctionRewriter { - #[cfg(feature = "array_expressions")] pub(crate) schema: DFSchemaRef, } @@ -103,7 +100,6 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { type Node = Expr; fn f_up(&mut self, expr: Expr) -> Result> { - #[cfg(feature = "array_expressions")] if let Expr::BinaryExpr(BinaryExpr { ref left, op, @@ -125,7 +121,7 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { // TODO: change OperatorToFunction to OperatoToArrayFunction and configure it with array_expressions feature // after other array functions are udf-based - #[cfg(feature = "array_expressions")] + if let Some(expr) = rewrite_array_has_all_operator_to_func(left, op, right) { return Ok(Transformed::yes(expr)); } @@ -170,14 +166,12 @@ impl TreeNodeRewriter for OperatorToFunctionRewriter { // Note This rewrite is only done if the built in DataFusion `array_expressions` feature is enabled. // Even if users implement their own array functions, those functions are not equal to the DataFusion // udf based array functions, so this rewrite is not corrrect -#[cfg(feature = "array_expressions")] + fn rewrite_array_has_all_operator_to_func( left: &Expr, op: Operator, right: &Expr, ) -> Option { - use super::array_has_all; - if op != Operator::AtArrow && op != Operator::ArrowAt { return None; } @@ -198,6 +192,7 @@ fn rewrite_array_has_all_operator_to_func( let left = left.clone(); let right = right.clone(); + // TODO: run kernel function directly? let expr = if let Operator::ArrowAt = op { array_has_all(right, left) } else { @@ -220,7 +215,7 @@ fn rewrite_array_has_all_operator_to_func( /// 4) (arry concat, array append, array prepend) || array -> array concat /// /// 5) (arry concat, array append, array prepend) || scalar -> array append -#[cfg(feature = "array_expressions")] + fn rewrite_array_concat_operator_to_func( left: &Expr, op: Operator, @@ -306,7 +301,7 @@ fn rewrite_array_concat_operator_to_func( /// 1) (arry concat, array append, array prepend) || column -> (array append, array concat) /// /// 2) column1 || column2 -> (array prepend, array append, array concat) -#[cfg(feature = "array_expressions")] + fn rewrite_array_concat_operator_to_func_for_column( left: &Expr, op: Operator, diff --git a/datafusion/functions-array/src/lib.rs b/datafusion/functions-array/src/lib.rs index 95143570cc5f..d4cd0eedabcf 100644 --- a/datafusion/functions-array/src/lib.rs +++ b/datafusion/functions-array/src/lib.rs @@ -28,6 +28,7 @@ #[macro_use] pub mod macros; +mod analyzer; mod array_has; mod concat; mod kernels; @@ -35,6 +36,7 @@ mod make_array; mod udf; mod utils; +use analyzer::rewrite::OperatorToFunction; use datafusion_common::Result; use datafusion_execution::FunctionRegistry; use datafusion_expr::ScalarUDF; @@ -94,5 +96,13 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> { } Ok(()) as Result<()> })?; + + let mut rules = registry.analyzer_rules(); + // OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar), + // and TypeCoercion may cast the argument types from Scalar to List. + rules.insert(1, Arc::new(OperatorToFunction::new())); + + registry.with_analyzer_rules(rules); + Ok(()) } diff --git a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs index c07445fa7f48..fad54aede14b 100644 --- a/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs +++ b/datafusion/optimizer/src/analyzer/count_wildcard_rule.rs @@ -17,13 +17,12 @@ use std::sync::Arc; -use crate::analyzer::AnalyzerRule; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{ Transformed, TransformedResult, TreeNode, TreeNodeRewriter, }; use datafusion_common::Result; +use datafusion_expr::analyzer::AnalyzerRule; use datafusion_expr::expr::{AggregateFunction, AggregateFunctionDefinition, InSubquery}; use datafusion_expr::expr_rewriter::rewrite_preserving_name; use datafusion_expr::utils::COUNT_STAR_EXPANSION; diff --git a/datafusion/optimizer/src/analyzer/inline_table_scan.rs b/datafusion/optimizer/src/analyzer/inline_table_scan.rs index b21ec851dfcd..1458debe794b 100644 --- a/datafusion/optimizer/src/analyzer/inline_table_scan.rs +++ b/datafusion/optimizer/src/analyzer/inline_table_scan.rs @@ -19,11 +19,10 @@ //! such as DataFrames and Views and inlines the LogicalPlan. use std::sync::Arc; -use crate::analyzer::AnalyzerRule; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::Result; +use datafusion_expr::analyzer::AnalyzerRule; use datafusion_expr::expr::{Exists, InSubquery}; use datafusion_expr::{ logical_plan::LogicalPlan, Expr, Filter, LogicalPlanBuilder, TableScan, diff --git a/datafusion/optimizer/src/analyzer/mod.rs b/datafusion/optimizer/src/analyzer/mod.rs index d68521284176..6268f1fc5940 100644 --- a/datafusion/optimizer/src/analyzer/mod.rs +++ b/datafusion/optimizer/src/analyzer/mod.rs @@ -17,7 +17,6 @@ pub mod count_wildcard_rule; pub mod inline_table_scan; -pub mod rewrite_expr; pub mod subquery; pub mod type_coercion; @@ -31,6 +30,7 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::instant::Instant; use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion_common::{DataFusionError, Result}; +use datafusion_expr::analyzer::{AnalyzerRule, AnalyzerRuleRef}; use datafusion_expr::expr::Exists; use datafusion_expr::expr::InSubquery; use datafusion_expr::utils::inspect_expr_pre; @@ -38,38 +38,11 @@ use datafusion_expr::{Expr, LogicalPlan}; use log::debug; use std::sync::Arc; -#[cfg(feature = "array_expressions")] -use datafusion_functions_array::expr_fn::array_has_all; - -use self::rewrite_expr::OperatorToFunction; - -/// [`AnalyzerRule`]s transform [`LogicalPlan`]s in some way to make -/// the plan valid prior to the rest of the DataFusion optimization process. -/// -/// This is different than an [`OptimizerRule`](crate::OptimizerRule) -/// which must preserve the semantics of the `LogicalPlan`, while computing -/// results in a more optimal way. -/// -/// For example, an `AnalyzerRule` may resolve [`Expr`]s into more specific -/// forms such as a subquery reference, or do type coercion to ensure the types -/// of operands are correct. -/// -/// Use [`SessionState::add_analyzer_rule`] to register additional -/// `AnalyzerRule`s. -/// -/// [`SessionState::add_analyzer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.add_analyzer_rule -pub trait AnalyzerRule { - /// Rewrite `plan` - fn analyze(&self, plan: LogicalPlan, config: &ConfigOptions) -> Result; - - /// A human readable name for this analyzer rule - fn name(&self) -> &str; -} /// A rule-based Analyzer. #[derive(Clone)] pub struct Analyzer { /// All rules to apply - pub rules: Vec>, + pub rules: Vec, } impl Default for Analyzer { @@ -81,11 +54,9 @@ impl Default for Analyzer { impl Analyzer { /// Create a new analyzer using the recommended list of rules pub fn new() -> Self { - let rules: Vec> = vec![ + let rules: Vec = vec![ Arc::new(InlineTableScan::new()), - // OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar), - // and TypeCoercion may cast the argument types from Scalar to List. - Arc::new(OperatorToFunction::new()), + // Arc::new(OperatorToFunction::new()), register in `funcitons-array` crate Arc::new(TypeCoercion::new()), Arc::new(CountWildcardRule::new()), ]; @@ -93,7 +64,7 @@ impl Analyzer { } /// Create a new analyzer with the given rules - pub fn with_rules(rules: Vec>) -> Self { + pub fn with_rules(rules: Vec) -> Self { Self { rules } } @@ -113,9 +84,9 @@ impl Analyzer { // TODO add common rule executor for Analyzer and Optimizer for rule in &self.rules { - new_plan = rule.analyze(new_plan, config).map_err(|e| { - DataFusionError::Context(rule.name().to_string(), Box::new(e)) - })?; + new_plan = rule + .analyze(new_plan, config) + .map_err(|e| DataFusionError::Context(rule.name().into(), Box::new(e)))?; log_plan(rule.name(), &new_plan); observer(&new_plan, rule.as_ref()); } diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index fe63766fc265..542712bc20c6 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -64,7 +64,7 @@ use log::{debug, warn}; /// Use [`SessionState::add_optimizer_rule`] to register additional /// `OptimizerRule`s. /// -/// [`AnalyzerRule`]: crate::analyzer::AnalyzerRule +/// [`AnalyzerRule`]: datafusion_expr::analyzer::AnalyzerRule /// [`SessionState::add_optimizer_rule`]: https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionState.html#method.add_optimizer_rule pub trait OptimizerRule { diff --git a/datafusion/optimizer/src/test/mod.rs b/datafusion/optimizer/src/test/mod.rs index e691fe9a5351..889b2d2e34c5 100644 --- a/datafusion/optimizer/src/test/mod.rs +++ b/datafusion/optimizer/src/test/mod.rs @@ -15,12 +15,13 @@ // specific language governing permissions and limitations // under the License. -use crate::analyzer::{Analyzer, AnalyzerRule}; +use crate::analyzer::Analyzer; use crate::optimizer::{assert_schema_is_the_same, Optimizer}; use crate::{OptimizerContext, OptimizerRule}; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::config::ConfigOptions; use datafusion_common::{assert_contains, Result}; +use datafusion_expr::analyzer::AnalyzerRule; use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, LogicalPlanBuilder}; use std::sync::Arc;