Skip to content

Commit

Permalink
change api design
Browse files Browse the repository at this point in the history
Signed-off-by: jayzhan211 <jayzhan211@gmail.com>
  • Loading branch information
jayzhan211 committed Mar 12, 2024
1 parent a40b0b7 commit bd3ed5f
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 70 deletions.
49 changes: 10 additions & 39 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ use crate::{
DropView, Explain, LogicalPlan, LogicalPlanBuilder, PlanType, SetVariable,
TableSource, TableType, ToStringifiedPlan, UNNAMED_TABLE,
},
optimizer::analyzer::{Analyzer, AnalyzerRule},
optimizer::analyzer::Analyzer,
optimizer::optimizer::{Optimizer, OptimizerConfig, OptimizerRule},
physical_optimizer::optimizer::{PhysicalOptimizer, PhysicalOptimizerRule},
physical_plan::{udaf::AggregateUDF, udf::ScalarUDF, ExecutionPlan},
Expand All @@ -69,6 +69,7 @@ use datafusion_common::{
OwnedTableReference, SchemaReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::analyzer::AnalyzerRuleRef;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expand Down Expand Up @@ -1702,12 +1703,6 @@ impl SessionState {
self
}

/// Override the `AnalyzerRule`s optimizer plan rules.
pub fn with_analyzer_rules(mut self, rules: Vec<AnalyzerRuleRef>) -> Self {
self.analyzer = Analyzer::with_rules(rules);
self
}

/// Replace the entire list of [`OptimizerRule`]s used to optimize plans
pub fn with_optimizer_rules(
mut self,
Expand Down Expand Up @@ -2242,23 +2237,14 @@ impl FunctionRegistry for SessionState {
})
}

/// Add `analyzer_rule` to the `index` of the list of
/// `AnalyzerRule`s used to rewrite queries.
fn register_analyzer_rule(
&mut self,
analyzer_rule: AnalyzerRuleRef,
index: usize,
) -> Result<()> {
if index > self.analyzer.rules.len() {
return exec_err!(
"index out of range, index: {index}, length: {:?}",
self.analyzer.rules.len()
);
}
let mut split = self.analyzer.rules.split_off(index);
self.analyzer.rules.push_back(analyzer_rule);
self.analyzer.rules.append(&mut split);
Ok(())
/// Override the `AnalyzerRule`s optimizer plan rules.
fn with_analyzer_rules(&mut self, rules: Vec<AnalyzerRuleRef>) {
self.analyzer = Analyzer::with_rules(rules);
}

/// return the existing analyzer rules
fn analyzer_rules(&self) -> Vec<AnalyzerRuleRef> {
self.analyzer.rules.clone()
}

fn register_udf(&mut self, udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> {
Expand Down Expand Up @@ -2314,21 +2300,6 @@ impl FunctionRegistry for SessionState {
}
Ok(udwf)
}

fn deregister_analyzer_rule(&mut self, index: usize) -> Result<()> {
if index > self.analyzer.rules.len() {
return exec_err!(
"index out of range, index: {index}, length: {:?}",
self.analyzer.rules.len()
);
}

let mut split = self.analyzer.rules.split_off(index);
// split off should includes the element at index
split.pop_front().unwrap();
self.analyzer.rules.append(&mut split);
Ok(())
}
}

impl OptimizerConfig for SessionState {
Expand Down
19 changes: 5 additions & 14 deletions datafusion/execution/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,13 @@ pub trait FunctionRegistry {
not_impl_err!("Registering WindowUDF")
}

/// Registers a new `AnalyzerRule` at the given index.
/// Returns an error if the index is out of bounds.
fn register_analyzer_rule(
&mut self,
_rule: AnalyzerRuleRef,
_index: usize,
) -> Result<()> {
not_impl_err!("Registering AnalyzerRule")
/// Return the existing analyzer rules
fn analyzer_rules(&self) -> Vec<AnalyzerRuleRef> {
vec![]
}

fn with_analyzer_rules(&mut self, _rules: Vec<AnalyzerRuleRef>) {}

/// Deregisters a [`ScalarUDF`], returning the implementation that was
/// deregistered.
///
Expand All @@ -104,12 +101,6 @@ pub trait FunctionRegistry {
fn deregister_udwf(&mut self, _name: &str) -> Result<Option<Arc<WindowUDF>>> {
not_impl_err!("Deregistering WindowUDF")
}

/// Deregsiters an `AnalyzerRule` at the given index.
/// Returns an error if the index is out of bounds.
fn deregister_analyzer_rule(&mut self, _index: usize) -> Result<()> {
not_impl_err!("DeRegistering AnalyzerRule")
}
}

/// Serializer and deserializer registry for extensions like [UserDefinedLogicalNode].
Expand Down
18 changes: 6 additions & 12 deletions datafusion/functions-array/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ mod utils;
use analyzer::rewrite::OperatorToFunction;
use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::{analyzer::AnalyzerRuleRef, ScalarUDF};
use datafusion_expr::ScalarUDF;
use log::debug;
use std::sync::Arc;

Expand Down Expand Up @@ -97,18 +97,12 @@ pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
Ok(()) as Result<()>
})?;

let rules: Vec<(AnalyzerRuleRef, usize)> = vec![
// The order here should be adjusted based on the rules defined in the optimizer
//
// OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar),
// and TypeCoercion may cast the argument types from Scalar to List.
(Arc::new(OperatorToFunction::new()), 1),
];
let mut rules = registry.analyzer_rules();
// OperatorToFunction should be run before TypeCoercion, since it rewrite based on the argument types (List or Scalar),
// and TypeCoercion may cast the argument types from Scalar to List.
rules.insert(1, Arc::new(OperatorToFunction::new()));

rules.into_iter().try_for_each(|(rule, index)| {
registry.register_analyzer_rule(rule, index)?;
Ok(()) as Result<()>
})?;
registry.with_analyzer_rules(rules);

Ok(())
}
7 changes: 2 additions & 5 deletions datafusion/optimizer/src/analyzer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@ use datafusion_expr::expr::InSubquery;
use datafusion_expr::utils::inspect_expr_pre;
use datafusion_expr::{Expr, LogicalPlan};
use log::debug;
use std::collections::LinkedList;
use std::sync::Arc;

/// A rule-based Analyzer.
#[derive(Clone)]
pub struct Analyzer {
/// All rules to apply
pub rules: LinkedList<AnalyzerRuleRef>,
pub rules: Vec<AnalyzerRuleRef>,
}

impl Default for Analyzer {
Expand All @@ -66,9 +65,7 @@ impl Analyzer {

/// Create a new analyzer with the given rules
pub fn with_rules(rules: Vec<AnalyzerRuleRef>) -> Self {
Self {
rules: LinkedList::from_iter(rules),
}
Self { rules }
}

/// Analyze the logical plan by applying analyzer rules, and
Expand Down

0 comments on commit bd3ed5f

Please sign in to comment.