Skip to content

Commit

Permalink
Move analyzer out of optimizer
Browse files Browse the repository at this point in the history
DataFusion is a SQL query engine and also a reusable library for
building query engines. The optimizer part is generic, but analyzer's
role is to support the DataFusion SQL frontend. Separate the concerns,
so that the optimizer crate is truly reusable.
  • Loading branch information
findepi committed Oct 2, 2024
1 parent b46ca2e commit 68d696d
Show file tree
Hide file tree
Showing 25 changed files with 140 additions and 95 deletions.
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion datafusion-examples/examples/analyzer_rule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::Result;
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion_optimizer::analyzer::AnalyzerRule;
use datafusion_sql::analyzer::AnalyzerRule;
use std::sync::{Arc, Mutex};

/// This example demonstrates how to add your own [`AnalyzerRule`] to
Expand Down
5 changes: 2 additions & 3 deletions datafusion-examples/examples/sql_frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,8 @@ use datafusion_expr::{
AggregateUDF, Expr, LogicalPlan, ScalarUDF, TableProviderFilterPushDown, TableSource,
WindowUDF,
};
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule,
};
use datafusion_optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::analyzer::{Analyzer, AnalyzerRule};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::dialect::PostgreSqlDialect;
use datafusion_sql::sqlparser::parser::Parser;
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/datasource/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use std::{any::Any, borrow::Cow, sync::Arc};

use crate::datasource::{TableProvider, TableType};
use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan},
Expand All @@ -30,10 +31,8 @@ use datafusion_catalog::Session;
use datafusion_common::config::ConfigOptions;
use datafusion_common::Column;
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_optimizer::Analyzer;

use crate::datasource::{TableProvider, TableType};
use datafusion_sql::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use datafusion_sql::analyzer::Analyzer;

/// An implementation of `TableProvider` that uses another logical plan.
#[derive(Debug)]
Expand Down
3 changes: 2 additions & 1 deletion datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ use datafusion_catalog::{DynamicFileCatalog, SessionStore, UrlTableFactory};
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_optimizer::{AnalyzerRule, OptimizerRule};
use datafusion_optimizer::OptimizerRule;
use datafusion_sql::analyzer::AnalyzerRule;
use object_store::ObjectStore;
use parking_lot::RwLock;
use url::Url;
Expand Down
5 changes: 2 additions & 3 deletions datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,12 @@ use datafusion_expr::{
AggregateUDF, Explain, Expr, ExprSchemable, LogicalPlan, ScalarUDF, TableSource,
WindowUDF,
};
use datafusion_optimizer::{
Analyzer, AnalyzerRule, Optimizer, OptimizerConfig, OptimizerRule,
};
use datafusion_optimizer::{Optimizer, OptimizerConfig, OptimizerRule};
use datafusion_physical_expr::create_physical_expr;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_sql::analyzer::{Analyzer, AnalyzerRule};
use datafusion_sql::parser::{DFParser, Statement};
use datafusion_sql::planner::{ContextProvider, ParserOptions, PlannerContext, SqlToRel};
use itertools::Itertools;
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use datafusion_expr::{
ScalarUDF, TableSource, WindowUDF,
};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::simplify_expressions::GuaranteeRewriter;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext};
Expand All @@ -45,6 +44,7 @@ use datafusion_sql::TableReference;

use chrono::DateTime;
use datafusion_functions::datetime;
use datafusion_sql::analyzer::Analyzer;

#[cfg(test)]
#[ctor::ctor]
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/tests/user_defined/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ use datafusion_common::ScalarValue;
use datafusion_expr::tree_node::replace_sort_expression;
use datafusion_expr::{Projection, SortExpr};
use datafusion_optimizer::optimizer::ApplyOrder;
use datafusion_optimizer::AnalyzerRule;
use datafusion_sql::analyzer::AnalyzerRule;

/// Execute the specified sql and return the resulting record batches
/// pretty printed as a String.
Expand Down
19 changes: 18 additions & 1 deletion datafusion/expr/src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

//! Logical Expressions: [`Expr`]

use std::collections::{HashMap, HashSet};
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fmt::{self, Display, Formatter, Write};
use std::hash::{Hash, Hasher};
use std::mem;
Expand Down Expand Up @@ -1837,6 +1837,23 @@ fn rewrite_placeholder(expr: &mut Expr, other: &Expr, schema: &DFSchema) -> Resu
Ok(())
}

pub fn collect_subquery_cols(
exprs: &[Expr],
subquery_schema: &DFSchema,
) -> Result<BTreeSet<Column>> {
exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
let mut using_cols: Vec<Column> = vec![];
for col in expr.column_refs().into_iter() {
if subquery_schema.has_column(col) {
using_cols.push(col.clone());
}
}

cols.extend(using_cols);
Result::<_>::Ok(cols)
})
}

#[macro_export]
macro_rules! expr_vec_fmt {
( $ARRAY:expr ) => {{
Expand Down
3 changes: 1 addition & 2 deletions datafusion/optimizer/src/decorrelate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ use std::ops::Deref;
use std::sync::Arc;

use crate::simplify_expressions::ExprSimplifier;
use crate::utils::collect_subquery_cols;

use datafusion_common::tree_node::{
Transformed, TransformedResult, TreeNode, TreeNodeRecursion, TreeNodeRewriter,
};
use datafusion_common::{plan_err, Column, DFSchemaRef, Result, ScalarValue};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr::{collect_subquery_cols, Alias};
use datafusion_expr::simplify::SimplifyContext;
use datafusion_expr::utils::{conjunction, find_join_exprs, split_conjunction};
use datafusion_expr::{expr, lit, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder};
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/eliminate_nested_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ fn extract_plan_from_distinct(plan: Arc<LogicalPlan>) -> Arc<LogicalPlan> {
#[cfg(test)]
mod tests {
use super::*;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::analyzer::Analyzer;
use crate::test::*;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_expr::{col, logical_plan::table_scan};
use datafusion_sql::analyzer::type_coercion::TypeCoercion;
use datafusion_sql::analyzer::Analyzer;

fn schema() -> Schema {
Schema::new(vec![
Expand Down
2 changes: 0 additions & 2 deletions datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
//!
//! [`LogicalPlan`]: datafusion_expr::LogicalPlan
//! [`TypeCoercion`]: analyzer::type_coercion::TypeCoercion
pub mod analyzer;
pub mod common_subexpr_eliminate;
pub mod decorrelate;
pub mod decorrelate_predicate_subquery;
Expand Down Expand Up @@ -61,7 +60,6 @@ pub mod utils;
#[cfg(test)]
pub mod test;

pub use analyzer::{Analyzer, AnalyzerRule};
pub use optimizer::{Optimizer, OptimizerConfig, OptimizerContext, OptimizerRule};
#[allow(deprecated)]
pub use utils::optimize_children;
Expand Down
42 changes: 1 addition & 41 deletions datafusion/optimizer/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use crate::analyzer::{Analyzer, AnalyzerRule};
use crate::optimizer::Optimizer;
use crate::{OptimizerContext, OptimizerRule};
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::{assert_contains, Result};
use datafusion_expr::{col, logical_plan::table_scan, LogicalPlan, LogicalPlanBuilder};
use datafusion_sql::analyzer::{Analyzer, AnalyzerRule};
use std::sync::Arc;

pub mod user_defined;
Expand Down Expand Up @@ -108,46 +108,6 @@ pub fn get_tpch_table_schema(table: &str) -> Schema {
}
}

pub fn assert_analyzed_plan_eq(
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
assert_analyzed_plan_with_config_eq(options, rule, plan, expected)?;

Ok(())
}

pub fn assert_analyzed_plan_with_config_eq(
options: ConfigOptions,
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = format!("{analyzed_plan}");
assert_eq!(formatted_plan, expected);

Ok(())
}


pub fn assert_analyzed_plan_eq_display_indent(
rule: Arc<dyn AnalyzerRule + Send + Sync>,
plan: LogicalPlan,
expected: &str,
) -> Result<()> {
let options = ConfigOptions::default();
let analyzed_plan =
Analyzer::with_rules(vec![rule]).execute_and_check(plan, &options, |_, _| {})?;
let formatted_plan = analyzed_plan.display_indent_schema().to_string();
assert_eq!(formatted_plan, expected);

Ok(())
}

pub fn assert_analyzer_check_err(
rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>>,
plan: LogicalPlan,
Expand Down
19 changes: 1 addition & 18 deletions datafusion/optimizer/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::{BTreeSet, HashMap, HashSet};

use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::{Column, DFSchema, Result};
use datafusion_common::{Column, Result};
use datafusion_expr::expr_rewriter::replace_col;
use datafusion_expr::{logical_plan::LogicalPlan, Expr};

Expand Down Expand Up @@ -80,23 +80,6 @@ pub(crate) fn has_all_column_refs(expr: &Expr, schema_cols: &HashSet<Column>) ->
== column_refs.len()
}

pub(crate) fn collect_subquery_cols(
exprs: &[Expr],
subquery_schema: &DFSchema,
) -> Result<BTreeSet<Column>> {
exprs.iter().try_fold(BTreeSet::new(), |mut cols, expr| {
let mut using_cols: Vec<Column> = vec![];
for col in expr.column_refs().into_iter() {
if subquery_schema.has_column(col) {
using_cols.push(col.clone());
}
}

cols.extend(using_cols);
Result::<_>::Ok(cols)
})
}

pub(crate) fn replace_qualified_name(
expr: Expr,
cols: &BTreeSet<Column>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/optimizer/tests/optimizer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use datafusion_expr::test::function_stub::sum_udaf;
use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::count::count_udaf;
use datafusion_optimizer::analyzer::Analyzer;
use datafusion_optimizer::optimizer::Optimizer;
use datafusion_optimizer::{OptimizerConfig, OptimizerContext, OptimizerRule};
use datafusion_sql::analyzer::Analyzer;
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::Statement;
use datafusion_sql::sqlparser::dialect::GenericDialect;
Expand Down
1 change: 1 addition & 0 deletions datafusion/sql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ arrow-array = { workspace = true }
arrow-schema = { workspace = true }
datafusion-common = { workspace = true, default-features = true }
datafusion-expr = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
regex = { workspace = true }
sqlparser = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@

use crate::analyzer::AnalyzerRule;

use crate::utils::NamePreserver;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_common::Result;
use datafusion_expr::expr::{AggregateFunction, WindowFunction};
use datafusion_expr::expr_rewriter::NamePreserver;
use datafusion_expr::utils::COUNT_STAR_EXPANSION;
use datafusion_expr::{lit, Expr, LogicalPlan, WindowFunctionDefinition};

Expand Down Expand Up @@ -101,7 +101,6 @@ fn analyze_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
#[cfg(test)]
mod tests {
use super::*;
use crate::test::*;
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use datafusion_expr::expr::Sort;
Expand All @@ -114,6 +113,10 @@ mod tests {
use datafusion_functions_aggregate::expr_fn::max;
use std::sync::Arc;

use crate::test::{
assert_analyzed_plan_eq_display_indent, test_table_scan,
test_table_scan_with_name,
};
use datafusion_functions_aggregate::expr_fn::{count, sum};

fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::sync::Arc;

use crate::AnalyzerRule;
use crate::analyzer::AnalyzerRule;
use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TransformedResult};
use datafusion_common::{Column, Result};
Expand Down Expand Up @@ -160,15 +160,14 @@ fn replace_columns(
mod tests {
use arrow::datatypes::{DataType, Field, Schema};

use super::*;
use crate::analyzer::Analyzer;
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
use crate::Analyzer;
use datafusion_common::{JoinType, TableReference};
use datafusion_expr::{
col, in_subquery, qualified_wildcard, table_scan, wildcard, LogicalPlanBuilder,
};

use super::*;

fn assert_plan_eq(plan: LogicalPlan, expected: &str) -> Result<()> {
assert_analyzed_plan_eq_display_indent(
Arc::new(ExpandWildcardRule::new()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@ use datafusion_common::config::ConfigOptions;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{DFSchema, Result};

use crate::utils::NamePreserver;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::expr_rewriter::{FunctionRewrite, NamePreserver};
use datafusion_expr::utils::merge_schema;
use datafusion_expr::LogicalPlan;
use std::sync::Arc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ mod tests {
use std::{borrow::Cow, sync::Arc, vec};

use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::test::assert_analyzed_plan_eq;

use crate::test::assert_analyzed_plan_eq;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_expr::{col, lit, Expr, LogicalPlan, LogicalPlanBuilder, TableSource};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use std::fmt::Debug;
use std::sync::Arc;

use log::debug;
use log::{debug, trace};

use datafusion_common::config::ConfigOptions;
use datafusion_common::instant::Instant;
Expand All @@ -36,7 +36,6 @@ use crate::analyzer::expand_wildcard_rule::ExpandWildcardRule;
use crate::analyzer::inline_table_scan::InlineTableScan;
use crate::analyzer::subquery::check_subquery_expr;
use crate::analyzer::type_coercion::TypeCoercion;
use crate::utils::log_plan;

use self::function_rewrite::ApplyFunctionRewrites;

Expand Down Expand Up @@ -191,3 +190,9 @@ fn check_plan(plan: &LogicalPlan) -> Result<()> {
})
.map(|_| ())
}

/// Log the plan in debug/tracing mode after some part of the optimizer runs
fn log_plan(description: &str, plan: &LogicalPlan) {
debug!("{description}:\n{}\n", plan.display_indent());
trace!("{description}::\n{}\n", plan.display_indent_schema());
}
Loading

0 comments on commit 68d696d

Please sign in to comment.