Skip to content

Commit

Permalink
rename eliminate_aggregate.rs as eliminate_distinct.rs
Browse files Browse the repository at this point in the history
implement as rewrite function
  • Loading branch information
Mert Akkaya committed Aug 2, 2024
1 parent c3257a6 commit 7484eee
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,52 +16,51 @@
// under the License.

//! Optimizer rule that eliminates redundant aggregations from a plan.
//! This saves time in planning and executing the query.
//! This reduces redundant aggregations in the physical plan.
//!
//! This optimizer changes this kind of query
//!
//! SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;
//! to this
//! SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::tree_node::TreeNode;
use datafusion_common::tree_node::{Transformed};
use datafusion_common::Result;
use datafusion_expr::{logical_plan::LogicalPlan, Aggregate, Distinct, Join};
use std::ops::Deref;
use datafusion_expr::{logical_plan::LogicalPlan, Distinct};

#[derive(Default)]
pub struct EliminateAggregate {
group_bys: Vec<LogicalPlan>,
}
pub struct EliminateDistinct {}

impl EliminateAggregate {
impl EliminateDistinct {
#[allow(missing_docs)]
pub fn new() -> Self {
Self {
group_bys: Vec::new(),
}
Self {}
}
}

impl OptimizerRule for EliminateAggregate {
fn try_optimize(
impl OptimizerRule for EliminateDistinct {
fn name(&self) -> &str {
"eliminate_distinct"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}

fn supports_rewrite(&self) -> bool {
true
}

fn rewrite(
&self,
plan: &LogicalPlan,
plan: LogicalPlan,
_config: &dyn OptimizerConfig,
) -> Result<Option<LogicalPlan>> {
// SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;
// SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5;

// SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5;
// SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5;

// logical_plan
// Limit: skip=0, fetch=5
// --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
// ----TableScan: aggregate_test_100 projection=[c3]
//
// Limit: skip=0, fetch=5
// --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
// ----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]]
// ------TableScan: aggregate_test_100 projection=[c3]

) -> Result<
datafusion_common::tree_node::Transformed<LogicalPlan>,
datafusion_common::DataFusionError,
> {
match plan {
LogicalPlan::Distinct(Distinct::All(distinct)) => {
let fields = distinct.schema().fields();
Expand All @@ -70,10 +69,10 @@ impl OptimizerRule for EliminateAggregate {

for func_dep in func_deps.iter() {
if func_dep.source_indices == all_fields {
return Ok(Some(distinct.inputs()[0].clone()));
return Ok(Transformed::yes(distinct.inputs()[0].clone()));
}
}
return Ok(None);
return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All(distinct))));
}
LogicalPlan::Distinct(Distinct::On(distinct)) => {
let fields = distinct.schema.fields();
Expand All @@ -82,29 +81,19 @@ impl OptimizerRule for EliminateAggregate {

for func_dep in func_deps.iter() {
if func_dep.source_indices == all_fields {
return Ok(Some(distinct.input.as_ref().clone()));
return Ok(Transformed::yes(distinct.input.as_ref().clone()));
}
}
return Ok(None);
return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::On(distinct))));
}
LogicalPlan::Aggregate(Aggregate { .. }) => Ok(None),
LogicalPlan::Join(Join { .. }) => Ok(None),
_ => Ok(None),
_ => Ok(Transformed::no(plan)),
}
}

fn name(&self) -> &str {
"eliminate_aggregate"
}

fn apply_order(&self) -> Option<ApplyOrder> {
Some(ApplyOrder::BottomUp)
}
}

#[cfg(test)]
mod tests {
use crate::eliminate_aggregate::EliminateAggregate;
use crate::eliminate_distinct::EliminateDistinct;
use datafusion_common::Result;
use datafusion_expr::{
col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan,
Expand All @@ -115,7 +104,7 @@ mod tests {

fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
assert_optimized_plan_eq(
Arc::new(EliminateAggregate::new()),
Arc::new(EliminateDistinct::new()),
plan.clone(),
expected,
)
Expand Down Expand Up @@ -145,7 +134,7 @@ mod tests {
.build()?;

// No aggregate / scan / limit
let expected = "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test";
let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test";
assert_optimized_plan_equal(&plan, expected)
}
}
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub mod analyzer;
pub mod common_subexpr_eliminate;
pub mod decorrelate;
pub mod decorrelate_predicate_subquery;
pub mod eliminate_aggregate;
pub mod eliminate_distinct;
pub mod eliminate_cross_join;
pub mod eliminate_duplicated_expr;
pub mod eliminate_filter;
Expand Down
4 changes: 2 additions & 2 deletions datafusion/optimizer/src/optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use datafusion_expr::logical_plan::LogicalPlan;

use crate::common_subexpr_eliminate::CommonSubexprEliminate;
use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
use crate::eliminate_aggregate::EliminateAggregate;
use crate::eliminate_distinct::EliminateDistinct;
use crate::eliminate_cross_join::EliminateCrossJoin;
use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr;
use crate::eliminate_filter::EliminateFilter;
Expand Down Expand Up @@ -258,7 +258,7 @@ impl Optimizer {
Arc::new(RewriteDisjunctivePredicate::new()),
Arc::new(EliminateDuplicatedExpr::new()),
Arc::new(EliminateFilter::new()),
Arc::new(EliminateAggregate::new()),
Arc::new(EliminateDistinct::new()),
Arc::new(EliminateCrossJoin::new()),
Arc::new(CommonSubexprEliminate::new()),
Arc::new(EliminateLimit::new()),
Expand Down

0 comments on commit 7484eee

Please sign in to comment.