From 10000fb5bdb88a5e7a5cbfb81f3bd5d028bbeea7 Mon Sep 17 00:00:00 2001 From: metesynnada <100111937+metesynnada@users.noreply.github.com> Date: Thu, 22 Feb 2024 09:56:23 +0300 Subject: [PATCH 01/19] Delete docs.yaml --- .github/workflows/docs.yaml | 64 ------------------------------------- 1 file changed, 64 deletions(-) delete mode 100644 .github/workflows/docs.yaml diff --git a/.github/workflows/docs.yaml b/.github/workflows/docs.yaml deleted file mode 100644 index ab6a615ab60b..000000000000 --- a/.github/workflows/docs.yaml +++ /dev/null @@ -1,64 +0,0 @@ -on: - push: - branches: - - main - paths: - - .asf.yaml - - .github/workflows/docs.yaml - - docs/** - -name: Deploy DataFusion site - -jobs: - build-docs: - name: Build docs - runs-on: ubuntu-latest - steps: - - name: Checkout docs sources - uses: actions/checkout@v4 - - - name: Checkout asf-site branch - uses: actions/checkout@v4 - with: - ref: asf-site - path: asf-site - - - name: Setup Python - uses: actions/setup-python@v5 - with: - python-version: "3.10" - - - name: Install dependencies - run: | - set -x - python3 -m venv venv - source venv/bin/activate - pip install -r docs/requirements.txt - - - name: Build docs - run: | - set -x - source venv/bin/activate - cd docs - ./build.sh - - - name: Copy & push the generated HTML - run: | - set -x - cd asf-site/ - rsync \ - -a \ - --delete \ - --exclude '/.git/' \ - ../docs/build/html/ \ - ./ - cp ../.asf.yaml . - touch .nojekyll - git status --porcelain - if [ "$(git status --porcelain)" != "" ]; then - git config user.name "github-actions[bot]" - git config user.email "github-actions[bot]@users.noreply.github.com" - git add --all - git commit -m 'Publish built docs triggered by ${{ github.sha }}' - git push || git push --force - fi From a2e53c2402096b215487cdec51117a5bc0e73537 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Thu, 1 Aug 2024 18:11:16 +0300 Subject: [PATCH 02/19] initialize eliminate_aggregate.rs rule --- .../optimizer/src/eliminate_aggregate.rs | 158 ++++++++++++++++++ datafusion/optimizer/src/lib.rs | 1 + datafusion/optimizer/src/optimizer.rs | 2 + .../src/single_distinct_to_groupby.rs | 2 +- 4 files changed, 162 insertions(+), 1 deletion(-) create mode 100644 datafusion/optimizer/src/eliminate_aggregate.rs diff --git a/datafusion/optimizer/src/eliminate_aggregate.rs b/datafusion/optimizer/src/eliminate_aggregate.rs new file mode 100644 index 000000000000..7dd96fceafd4 --- /dev/null +++ b/datafusion/optimizer/src/eliminate_aggregate.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Optimizer rule to replaces redundant aggregations on a plan. +//! This saves time in planning and executing the query. 
+ +use std::ops::Deref; +use crate::optimizer::ApplyOrder; +use datafusion_common::{Result}; +use datafusion_common::display::{ToStringifiedPlan}; +use datafusion_common::tree_node::TreeNode; +use datafusion_expr::{logical_plan::{LogicalPlan}, Aggregate, Join, Distinct}; +use crate::{OptimizerConfig, OptimizerRule}; + +#[derive(Default)] +pub struct EliminateAggregate{ + group_bys: Vec, +} + +impl EliminateAggregate { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {group_bys: Vec::new()} + } +} + +impl OptimizerRule for EliminateAggregate { + fn try_optimize( + &self, + plan: &LogicalPlan, + _config: &dyn OptimizerConfig, + ) -> Result> { + // SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; + // SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; + + // SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5; + // SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5; + + // logical_plan + // Limit: skip=0, fetch=5 + // --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] + // ----TableScan: aggregate_test_100 projection=[c3] + // + // Limit: skip=0, fetch=5 + // --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] + // ----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] + // ------TableScan: aggregate_test_100 projection=[c3] + + match plan { + LogicalPlan::Distinct(Distinct::All(distinct)) => { + let fields = distinct.schema().fields(); + let all_fields = (0..fields.len()).collect::>(); + let func_deps = distinct.schema().functional_dependencies().clone(); + + for func_dep in func_deps.iter() { + if func_dep.source_indices == all_fields { + return Ok(Some(distinct.inputs()[0].clone())) + } + } + return Ok(None) + + }, + LogicalPlan::Distinct(Distinct::On(plan)) => { + Ok(None) + }, + LogicalPlan::Aggregate(Aggregate { + input, + .. + }) => { + println!("Aggregate! -- Plan"); + println!("{}", plan.display_indent_schema()); + println!("Aggregate! -- Input"); + println!("{}", input.display_indent_schema()); + // self.clone().group_bys.push(plan.clone()); + Ok(None) + }, + LogicalPlan::Join(Join { + .. + }) => { + println!("Join!"); + println!("{}", plan.display_indent_schema()); + Ok(None) + } + _ => { + println!("Plan!"); + println!("{}", plan.display_indent_schema()); + Ok(None) + }, + } + } + + fn name(&self) -> &str { + "eliminate_aggregate" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } +} + +#[cfg(test)] +mod tests { + use crate::eliminate_aggregate::EliminateAggregate; + use datafusion_common::{Result}; + use datafusion_expr::{ + col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, + }; + use std::sync::Arc; + + use crate::test::*; + + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq(Arc::new(EliminateAggregate::new()), plan, expected) + } + + #[test] + fn eliminate_redundant_distinct_simple() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("c")], Vec::::new())? + .project(vec![col("c")])? + .distinct()? + .build()?; + + let expected = "Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; + // No aggregate / scan / limit + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_redundant_distinct_pair() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b")], Vec::::new())? 
+ .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + // No aggregate / scan / limit + let expected = "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index b54facc5d682..3733934f9525 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -22,6 +22,7 @@ pub mod decorrelate_predicate_subquery; pub mod eliminate_cross_join; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; +pub mod eliminate_aggregate; pub mod eliminate_join; pub mod eliminate_limit; pub mod eliminate_nested_union; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 5a594e75dd8a..33159bd00f94 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -25,6 +25,7 @@ use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; use crate::eliminate_cross_join::EliminateCrossJoin; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; +use crate::eliminate_aggregate::EliminateAggregate; use crate::eliminate_join::EliminateJoin; use crate::eliminate_limit::EliminateLimit; use crate::eliminate_nested_union::EliminateNestedUnion; @@ -238,6 +239,7 @@ impl Optimizer { Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(EliminateDuplicatedExpr::new()), Arc::new(EliminateFilter::new()), + Arc::new(EliminateAggregate::new()), Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/optimizer/src/single_distinct_to_groupby.rs b/datafusion/optimizer/src/single_distinct_to_groupby.rs index 7e6fb6b355ab..00370a220e0e 100644 --- a/datafusion/optimizer/src/single_distinct_to_groupby.rs +++ b/datafusion/optimizer/src/single_distinct_to_groupby.rs @@ -38,7 +38,7 @@ use hashbrown::HashSet; /// single distinct to group by optimizer rule /// ```text /// Before: -/// SELECT a, COUNT(DINSTINCT b), SUM(c) +/// SELECT a, COUNT(DISTINCT b), SUM(c) /// FROM t /// GROUP BY a /// From 5598fb4275695f50293bbe0eea96026fd55efad5 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Thu, 1 Aug 2024 18:22:56 +0300 Subject: [PATCH 03/19] remove redundant prints --- .../optimizer/src/eliminate_aggregate.rs | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_aggregate.rs b/datafusion/optimizer/src/eliminate_aggregate.rs index 7dd96fceafd4..c7752b7ee4bf 100644 --- a/datafusion/optimizer/src/eliminate_aggregate.rs +++ b/datafusion/optimizer/src/eliminate_aggregate.rs @@ -72,32 +72,30 @@ impl OptimizerRule for EliminateAggregate { } } return Ok(None) - }, - LogicalPlan::Distinct(Distinct::On(plan)) => { - Ok(None) + LogicalPlan::Distinct(Distinct::On(distinct)) => { + let fields = distinct.schema.fields(); + let all_fields = (0..fields.len()).collect::>(); + let func_deps = distinct.schema.functional_dependencies().clone(); + + for func_dep in func_deps.iter() { + if func_dep.source_indices == all_fields { + return Ok(Some(distinct.input.as_ref().clone())) + } + } + return Ok(None) }, LogicalPlan::Aggregate(Aggregate { - input, .. }) => { - println!("Aggregate! -- Plan"); - println!("{}", plan.display_indent_schema()); - println!("Aggregate! 
-- Input"); - println!("{}", input.display_indent_schema()); - // self.clone().group_bys.push(plan.clone()); Ok(None) }, LogicalPlan::Join(Join { .. }) => { - println!("Join!"); - println!("{}", plan.display_indent_schema()); Ok(None) } _ => { - println!("Plan!"); - println!("{}", plan.display_indent_schema()); Ok(None) }, } @@ -124,7 +122,7 @@ mod tests { use crate::test::*; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq(Arc::new(EliminateAggregate::new()), plan, expected) + assert_optimized_plan_eq(Arc::new(EliminateAggregate::new()), plan.clone(), expected) } #[test] From c3257a6fe9497d18f6e774e0fda7034d91afba3b Mon Sep 17 00:00:00 2001 From: Mustafa Akur Date: Fri, 2 Aug 2024 09:23:47 +0300 Subject: [PATCH 04/19] Add multiple group by expression handling. --- .../common/src/functional_dependencies.rs | 36 +++++++------ .../optimizer/src/eliminate_aggregate.rs | 51 +++++++++---------- datafusion/optimizer/src/lib.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 2 +- 4 files changed, 46 insertions(+), 45 deletions(-) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 452f1862b274..577cb6d9e890 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -524,22 +524,28 @@ pub fn aggregate_functional_dependencies( } } - // If we have a single GROUP BY key, we can guarantee uniqueness after + // When we have a GROUP BY key, we can guarantee uniqueness after // aggregation: - if group_by_expr_names.len() == 1 { - // If `source_indices` contain 0, delete this functional dependency - // as it will be added anyway with mode `Dependency::Single`: - aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0)); - // Add a new functional dependency associated with the whole table: - aggregate_func_dependencies.push( - // Use nullable property of the group by expression - FunctionalDependence::new( - vec![0], - target_indices, - aggr_fields[0].is_nullable(), - ) - .with_mode(Dependency::Single), - ); + if !group_by_expr_names.is_empty() { + let source_indices = (0..group_by_expr_names.len()).collect::>(); + let nullable = source_indices + .iter() + .any(|idx| aggr_fields[*idx].is_nullable()); + // If `source_indices` is not already a determinant in the existing `aggregate_func_dependencies`. + if !aggregate_func_dependencies.iter().any(|item| { + // `item.source_indices` is a subset of the `source_indices`. In this case, we shouldn't add + // `source_indices` as `item.source_indices` defines this relation already. + item.source_indices + .iter() + .all(|idx| source_indices.contains(idx)) + }) { + // Add a new functional dependency associated with the whole table: + aggregate_func_dependencies.push( + // Use nullable property of the group by expression + FunctionalDependence::new(source_indices, target_indices, nullable) + .with_mode(Dependency::Single), + ); + } } FunctionalDependencies::new(aggregate_func_dependencies) } diff --git a/datafusion/optimizer/src/eliminate_aggregate.rs b/datafusion/optimizer/src/eliminate_aggregate.rs index c7752b7ee4bf..2e1f19540565 100644 --- a/datafusion/optimizer/src/eliminate_aggregate.rs +++ b/datafusion/optimizer/src/eliminate_aggregate.rs @@ -18,23 +18,25 @@ //! Optimizer rule to replaces redundant aggregations on a plan. //! This saves time in planning and executing the query. 
-use std::ops::Deref; use crate::optimizer::ApplyOrder; -use datafusion_common::{Result}; -use datafusion_common::display::{ToStringifiedPlan}; -use datafusion_common::tree_node::TreeNode; -use datafusion_expr::{logical_plan::{LogicalPlan}, Aggregate, Join, Distinct}; use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::display::ToStringifiedPlan; +use datafusion_common::tree_node::TreeNode; +use datafusion_common::Result; +use datafusion_expr::{logical_plan::LogicalPlan, Aggregate, Distinct, Join}; +use std::ops::Deref; #[derive(Default)] -pub struct EliminateAggregate{ +pub struct EliminateAggregate { group_bys: Vec, } impl EliminateAggregate { #[allow(missing_docs)] pub fn new() -> Self { - Self {group_bys: Vec::new()} + Self { + group_bys: Vec::new(), + } } } @@ -68,11 +70,11 @@ impl OptimizerRule for EliminateAggregate { for func_dep in func_deps.iter() { if func_dep.source_indices == all_fields { - return Ok(Some(distinct.inputs()[0].clone())) + return Ok(Some(distinct.inputs()[0].clone())); } } - return Ok(None) - }, + return Ok(None); + } LogicalPlan::Distinct(Distinct::On(distinct)) => { let fields = distinct.schema.fields(); let all_fields = (0..fields.len()).collect::>(); @@ -80,24 +82,14 @@ impl OptimizerRule for EliminateAggregate { for func_dep in func_deps.iter() { if func_dep.source_indices == all_fields { - return Ok(Some(distinct.input.as_ref().clone())) + return Ok(Some(distinct.input.as_ref().clone())); } } - return Ok(None) - }, - LogicalPlan::Aggregate(Aggregate { - .. - }) => { - Ok(None) - }, - LogicalPlan::Join(Join { - .. - }) => { - Ok(None) + return Ok(None); } - _ => { - Ok(None) - }, + LogicalPlan::Aggregate(Aggregate { .. }) => Ok(None), + LogicalPlan::Join(Join { .. }) => Ok(None), + _ => Ok(None), } } @@ -113,7 +105,7 @@ impl OptimizerRule for EliminateAggregate { #[cfg(test)] mod tests { use crate::eliminate_aggregate::EliminateAggregate; - use datafusion_common::{Result}; + use datafusion_common::Result; use datafusion_expr::{ col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, }; @@ -122,7 +114,11 @@ mod tests { use crate::test::*; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq(Arc::new(EliminateAggregate::new()), plan.clone(), expected) + assert_optimized_plan_eq( + Arc::new(EliminateAggregate::new()), + plan.clone(), + expected, + ) } #[test] @@ -152,5 +148,4 @@ mod tests { let expected = "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } - } diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index f38aca251899..a0a3301f9a33 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -34,11 +34,11 @@ pub mod analyzer; pub mod common_subexpr_eliminate; pub mod decorrelate; pub mod decorrelate_predicate_subquery; +pub mod eliminate_aggregate; pub mod eliminate_cross_join; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; pub mod eliminate_group_by_constant; -pub mod eliminate_aggregate; pub mod eliminate_join; pub mod eliminate_limit; pub mod eliminate_nested_union; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index ba5026822a77..9830cf58a92c 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -33,11 +33,11 @@ use datafusion_expr::logical_plan::LogicalPlan; use 
crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; +use crate::eliminate_aggregate::EliminateAggregate; use crate::eliminate_cross_join::EliminateCrossJoin; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; use crate::eliminate_group_by_constant::EliminateGroupByConstant; -use crate::eliminate_aggregate::EliminateAggregate; use crate::eliminate_join::EliminateJoin; use crate::eliminate_limit::EliminateLimit; use crate::eliminate_nested_union::EliminateNestedUnion; From 7484eee78c0e560b041d81b3b5ed2805f01f0cf7 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 09:43:24 +0300 Subject: [PATCH 05/19] rename eliminate_aggregate.rs as eliminate_distinct.rs implement as rewrite function --- ...ate_aggregate.rs => eliminate_distinct.rs} | 89 ++++++++----------- datafusion/optimizer/src/lib.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 4 +- 3 files changed, 42 insertions(+), 53 deletions(-) rename datafusion/optimizer/src/{eliminate_aggregate.rs => eliminate_distinct.rs} (64%) diff --git a/datafusion/optimizer/src/eliminate_aggregate.rs b/datafusion/optimizer/src/eliminate_distinct.rs similarity index 64% rename from datafusion/optimizer/src/eliminate_aggregate.rs rename to datafusion/optimizer/src/eliminate_distinct.rs index 2e1f19540565..e8f71ff1a091 100644 --- a/datafusion/optimizer/src/eliminate_aggregate.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -16,52 +16,51 @@ // under the License. //! Optimizer rule to replaces redundant aggregations on a plan. -//! This saves time in planning and executing the query. +//! This reduces redundant Aggregations in PyhsicalPlan. +//! +//! This optimizer changes this kind of query +//! +//! SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; +//! to this +//! 
SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::display::ToStringifiedPlan; -use datafusion_common::tree_node::TreeNode; +use datafusion_common::tree_node::{Transformed}; use datafusion_common::Result; -use datafusion_expr::{logical_plan::LogicalPlan, Aggregate, Distinct, Join}; -use std::ops::Deref; +use datafusion_expr::{logical_plan::LogicalPlan, Distinct}; #[derive(Default)] -pub struct EliminateAggregate { - group_bys: Vec, -} +pub struct EliminateDistinct {} -impl EliminateAggregate { +impl EliminateDistinct { #[allow(missing_docs)] pub fn new() -> Self { - Self { - group_bys: Vec::new(), - } + Self {} } } -impl OptimizerRule for EliminateAggregate { - fn try_optimize( +impl OptimizerRule for EliminateDistinct { + fn name(&self) -> &str { + "eliminate_distinct" + } + + fn apply_order(&self) -> Option { + Some(ApplyOrder::BottomUp) + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( &self, - plan: &LogicalPlan, + plan: LogicalPlan, _config: &dyn OptimizerConfig, - ) -> Result> { - // SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; - // SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; - - // SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5; - // SELECT DISTINCT c3 FROM (SELECT c3 FROM a1 GROUP BY c3) GROUP BY c3 LIMIT 5; - - // logical_plan - // Limit: skip=0, fetch=5 - // --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] - // ----TableScan: aggregate_test_100 projection=[c3] - // - // Limit: skip=0, fetch=5 - // --Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] - // ----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] - // ------TableScan: aggregate_test_100 projection=[c3] - + ) -> Result< + datafusion_common::tree_node::Transformed, + datafusion_common::DataFusionError, + > { match plan { LogicalPlan::Distinct(Distinct::All(distinct)) => { let fields = distinct.schema().fields(); @@ -70,10 +69,10 @@ impl OptimizerRule for EliminateAggregate { for func_dep in func_deps.iter() { if func_dep.source_indices == all_fields { - return Ok(Some(distinct.inputs()[0].clone())); + return Ok(Transformed::yes(distinct.inputs()[0].clone())); } } - return Ok(None); + return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All(distinct)))); } LogicalPlan::Distinct(Distinct::On(distinct)) => { let fields = distinct.schema.fields(); @@ -82,29 +81,19 @@ impl OptimizerRule for EliminateAggregate { for func_dep in func_deps.iter() { if func_dep.source_indices == all_fields { - return Ok(Some(distinct.input.as_ref().clone())); + return Ok(Transformed::yes(distinct.input.as_ref().clone())); } } - return Ok(None); + return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::On(distinct)))); } - LogicalPlan::Aggregate(Aggregate { .. }) => Ok(None), - LogicalPlan::Join(Join { .. 
}) => Ok(None), - _ => Ok(None), + _ => Ok(Transformed::no(plan)), } } - - fn name(&self) -> &str { - "eliminate_aggregate" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::BottomUp) - } } #[cfg(test)] mod tests { - use crate::eliminate_aggregate::EliminateAggregate; + use crate::eliminate_distinct::EliminateDistinct; use datafusion_common::Result; use datafusion_expr::{ col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, @@ -115,7 +104,7 @@ mod tests { fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq( - Arc::new(EliminateAggregate::new()), + Arc::new(EliminateDistinct::new()), plan.clone(), expected, ) @@ -145,7 +134,7 @@ mod tests { .build()?; // No aggregate / scan / limit - let expected = "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index a0a3301f9a33..bddaebf47d47 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -34,7 +34,7 @@ pub mod analyzer; pub mod common_subexpr_eliminate; pub mod decorrelate; pub mod decorrelate_predicate_subquery; -pub mod eliminate_aggregate; +pub mod eliminate_distinct; pub mod eliminate_cross_join; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 9830cf58a92c..1cccf42dcba4 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -33,7 +33,7 @@ use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; -use crate::eliminate_aggregate::EliminateAggregate; +use crate::eliminate_distinct::EliminateDistinct; use crate::eliminate_cross_join::EliminateCrossJoin; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; @@ -258,7 +258,7 @@ impl Optimizer { Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(EliminateDuplicatedExpr::new()), Arc::new(EliminateFilter::new()), - Arc::new(EliminateAggregate::new()), + Arc::new(EliminateDistinct::new()), Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), From 37674adb81b4ce12e76d98902b6e5ac42bfbc910 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 09:58:01 +0300 Subject: [PATCH 06/19] remove logic for distinct on since group by statement must exist in projection --- datafusion/optimizer/src/eliminate_distinct.rs | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index e8f71ff1a091..d22f39d57796 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -74,18 +74,6 @@ impl OptimizerRule for EliminateDistinct { } return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All(distinct)))); } - LogicalPlan::Distinct(Distinct::On(distinct)) => { - let fields = distinct.schema.fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = distinct.schema.functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - if func_dep.source_indices == 
all_fields { - return Ok(Transformed::yes(distinct.input.as_ref().clone())); - } - } - return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::On(distinct)))); - } _ => Ok(Transformed::no(plan)), } } From 06b47ec5f0a9b012390dc1a5e50f98c8ab6ac591 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 10:14:29 +0300 Subject: [PATCH 07/19] format code --- datafusion/optimizer/src/eliminate_distinct.rs | 9 ++++++--- datafusion/optimizer/src/lib.rs | 2 +- datafusion/optimizer/src/optimizer.rs | 2 +- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index d22f39d57796..a11ef11fff34 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -26,7 +26,7 @@ use crate::optimizer::ApplyOrder; use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::{Transformed}; +use datafusion_common::tree_node::Transformed; use datafusion_common::Result; use datafusion_expr::{logical_plan::LogicalPlan, Distinct}; @@ -72,7 +72,9 @@ impl OptimizerRule for EliminateDistinct { return Ok(Transformed::yes(distinct.inputs()[0].clone())); } } - return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All(distinct)))); + Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( + distinct, + )))) } _ => Ok(Transformed::no(plan)), } @@ -122,7 +124,8 @@ mod tests { .build()?; // No aggregate / scan / limit - let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + let expected = + "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } } diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index bddaebf47d47..97f9d6f29037 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -34,8 +34,8 @@ pub mod analyzer; pub mod common_subexpr_eliminate; pub mod decorrelate; pub mod decorrelate_predicate_subquery; -pub mod eliminate_distinct; pub mod eliminate_cross_join; +pub mod eliminate_distinct; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; pub mod eliminate_group_by_constant; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1cccf42dcba4..ea090bad73ea 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -33,8 +33,8 @@ use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; -use crate::eliminate_distinct::EliminateDistinct; use crate::eliminate_cross_join::EliminateCrossJoin; +use crate::eliminate_distinct::EliminateDistinct; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; use crate::eliminate_group_by_constant::EliminateGroupByConstant; From 289d157c771fd10bd6cc68a1ddad5b31dee8b32b Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 10:36:22 +0300 Subject: [PATCH 08/19] add eliminate_distinct rule to tests --- datafusion/sqllogictest/test_files/explain.slt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 5a1733460120..160a08371a3a 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -193,6 +193,7 @@ logical_plan after 
simplify_expressions SAME TEXT AS ABOVE logical_plan after rewrite_disjunctive_predicate SAME TEXT AS ABOVE logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE logical_plan after eliminate_filter SAME TEXT AS ABOVE +logical_plan after eliminate_distinct SAME TEXT AS ABOVE logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE @@ -220,6 +221,7 @@ logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after rewrite_disjunctive_predicate SAME TEXT AS ABOVE logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE logical_plan after eliminate_filter SAME TEXT AS ABOVE +logical_plan after eliminate_distinct SAME TEXT AS ABOVE logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE From a8e1b053e9bcc41bdcc5e138d417b3748e817bb0 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 11:41:02 +0300 Subject: [PATCH 09/19] simplify function add additional tests for not removing cases --- .../optimizer/src/eliminate_distinct.rs | 62 ++++++++++++------- 1 file changed, 41 insertions(+), 21 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index a11ef11fff34..6a583809bf40 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -57,27 +57,23 @@ impl OptimizerRule for EliminateDistinct { &self, plan: LogicalPlan, _config: &dyn OptimizerConfig, - ) -> Result< - datafusion_common::tree_node::Transformed, - datafusion_common::DataFusionError, - > { - match plan { - LogicalPlan::Distinct(Distinct::All(distinct)) => { - let fields = distinct.schema().fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = distinct.schema().functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - if func_dep.source_indices == all_fields { - return Ok(Transformed::yes(distinct.inputs()[0].clone())); - } - } - Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( - distinct, - )))) + ) -> Result, datafusion_common::DataFusionError> { + let LogicalPlan::Distinct(Distinct::All(distinct)) = plan else { + return Ok(Transformed::no(plan)); + }; + + let fields = distinct.schema().fields(); + let all_fields = (0..fields.len()).collect::>(); + let func_deps = distinct.schema().functional_dependencies().clone(); + + for func_dep in func_deps.iter() { + if func_dep.source_indices == all_fields { + return Ok(Transformed::yes(distinct.inputs()[0].clone())); } - _ => Ok(Transformed::no(plan)), } + Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( + distinct, + )))) } } @@ -110,7 +106,6 @@ mod tests { .build()?; let expected = "Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; - // No aggregate / scan / limit assert_optimized_plan_equal(&plan, expected) } @@ -123,9 +118,34 @@ mod tests { .distinct()? .build()?; - // No aggregate / scan / limit let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } + + #[test] + fn do_not_eliminate_distinct() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .distinct()? 
+ .build()?; + + let expected = "Distinct:\n Projection: test.a, test.b\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn do_not_eliminate_distinct_with_aggr() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b"), col("c")], Vec::::new())? + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = + "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } } From b381368a8bffdd0294cff8286a91de28a94c7efe Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 13:39:53 +0300 Subject: [PATCH 10/19] fix child issue --- datafusion/optimizer/src/eliminate_distinct.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index 6a583809bf40..be33ec6d956e 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -68,7 +68,7 @@ impl OptimizerRule for EliminateDistinct { for func_dep in func_deps.iter() { if func_dep.source_indices == all_fields { - return Ok(Transformed::yes(distinct.inputs()[0].clone())); + return Ok(Transformed::yes(distinct.as_ref().clone())); } } Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( @@ -85,7 +85,7 @@ mod tests { col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, }; use std::sync::Arc; - + use datafusion_functions_aggregate::sum::sum; use crate::test::*; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { @@ -105,7 +105,7 @@ mod tests { .distinct()? .build()?; - let expected = "Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; + let expected = "Projection: test.c\n Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } @@ -119,7 +119,7 @@ mod tests { .build()?; let expected = - "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + "Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } @@ -139,13 +139,13 @@ mod tests { fn do_not_eliminate_distinct_with_aggr() -> Result<()> { let table_scan = test_table_scan().unwrap(); let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a"), col("b"), col("c")], Vec::::new())? + .aggregate(vec![col("a"), col("b"), col("c")], vec![sum(col("c"))])? .project(vec![col("a"), col("b")])? .distinct()? 
.build()?; let expected = - "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[]]\n TableScan: test"; + "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[sum(test.c)]]\n TableScan: test"; assert_optimized_plan_equal(&plan, expected) } } From d2da8211b390f24b7fbbfa99ad4d1ac1d1e34ade Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 13:42:30 +0300 Subject: [PATCH 11/19] format --- datafusion/optimizer/src/eliminate_distinct.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index be33ec6d956e..1c123fb29dcd 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -80,13 +80,13 @@ impl OptimizerRule for EliminateDistinct { #[cfg(test)] mod tests { use crate::eliminate_distinct::EliminateDistinct; + use crate::test::*; use datafusion_common::Result; use datafusion_expr::{ col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, }; - use std::sync::Arc; use datafusion_functions_aggregate::sum::sum; - use crate::test::*; + use std::sync::Arc; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq( From 1bd49fd1a9bb05348052c7f3b75ab0e3411b4586 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 13:53:15 +0300 Subject: [PATCH 12/19] fix docs --- datafusion/optimizer/src/eliminate_distinct.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs index 1c123fb29dcd..fd646cf1616a 100644 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ b/datafusion/optimizer/src/eliminate_distinct.rs @@ -16,10 +16,8 @@ // under the License. //! Optimizer rule to replaces redundant aggregations on a plan. -//! This reduces redundant Aggregations in PyhsicalPlan. //! //! This optimizer changes this kind of query -//! //! SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; //! to this //! SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; From d890715173fed630fee25f6013a760a2ca0e8eea Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 14:51:24 +0300 Subject: [PATCH 13/19] remove eliminate_distinct rule and make it a part of replace_distinct_aggregate --- .../optimizer/src/eliminate_distinct.rs | 149 ------------------ datafusion/optimizer/src/lib.rs | 1 - datafusion/optimizer/src/optimizer.rs | 2 - .../src/replace_distinct_aggregate.rs | 87 ++++++++++ .../sqllogictest/test_files/explain.slt | 2 - 5 files changed, 87 insertions(+), 154 deletions(-) delete mode 100644 datafusion/optimizer/src/eliminate_distinct.rs diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs deleted file mode 100644 index fd646cf1616a..000000000000 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ /dev/null @@ -1,149 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Optimizer rule to replaces redundant aggregations on a plan. -//! -//! This optimizer changes this kind of query -//! SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; -//! to this -//! SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; - -use crate::optimizer::ApplyOrder; -use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::Transformed; -use datafusion_common::Result; -use datafusion_expr::{logical_plan::LogicalPlan, Distinct}; - -#[derive(Default)] -pub struct EliminateDistinct {} - -impl EliminateDistinct { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl OptimizerRule for EliminateDistinct { - fn name(&self) -> &str { - "eliminate_distinct" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::BottomUp) - } - - fn supports_rewrite(&self) -> bool { - true - } - - fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> Result, datafusion_common::DataFusionError> { - let LogicalPlan::Distinct(Distinct::All(distinct)) = plan else { - return Ok(Transformed::no(plan)); - }; - - let fields = distinct.schema().fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = distinct.schema().functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - if func_dep.source_indices == all_fields { - return Ok(Transformed::yes(distinct.as_ref().clone())); - } - } - Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All( - distinct, - )))) - } -} - -#[cfg(test)] -mod tests { - use crate::eliminate_distinct::EliminateDistinct; - use crate::test::*; - use datafusion_common::Result; - use datafusion_expr::{ - col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, - }; - use datafusion_functions_aggregate::sum::sum; - use std::sync::Arc; - - fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq( - Arc::new(EliminateDistinct::new()), - plan.clone(), - expected, - ) - } - - #[test] - fn eliminate_redundant_distinct_simple() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("c")], Vec::::new())? - .project(vec![col("c")])? - .distinct()? - .build()?; - - let expected = "Projection: test.c\n Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) - } - - #[test] - fn eliminate_redundant_distinct_pair() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a"), col("b")], Vec::::new())? - .project(vec![col("a"), col("b")])? - .distinct()? - .build()?; - - let expected = - "Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) - } - - #[test] - fn do_not_eliminate_distinct() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .project(vec![col("a"), col("b")])? - .distinct()? 
- .build()?; - - let expected = "Distinct:\n Projection: test.a, test.b\n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) - } - - #[test] - fn do_not_eliminate_distinct_with_aggr() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a"), col("b"), col("c")], vec![sum(col("c"))])? - .project(vec![col("a"), col("b")])? - .distinct()? - .build()?; - - let expected = - "Distinct:\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[sum(test.c)]]\n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) - } -} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 97f9d6f29037..3b1df3510d2a 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -35,7 +35,6 @@ pub mod common_subexpr_eliminate; pub mod decorrelate; pub mod decorrelate_predicate_subquery; pub mod eliminate_cross_join; -pub mod eliminate_distinct; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; pub mod eliminate_group_by_constant; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index ea090bad73ea..93923a4e1e74 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -34,7 +34,6 @@ use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; use crate::eliminate_cross_join::EliminateCrossJoin; -use crate::eliminate_distinct::EliminateDistinct; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; use crate::eliminate_group_by_constant::EliminateGroupByConstant; @@ -258,7 +257,6 @@ impl Optimizer { Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(EliminateDuplicatedExpr::new()), Arc::new(EliminateFilter::new()), - Arc::new(EliminateDistinct::new()), Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 430517121f2a..22d6ef066620 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -77,6 +77,20 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { match plan { LogicalPlan::Distinct(Distinct::All(input)) => { let group_expr = expand_wildcard(input.schema(), &input, None)?; + + let fields = input.schema().fields(); + let all_fields = (0..fields.len()).collect::>(); + let func_deps = input.schema().functional_dependencies().clone(); + + for func_dep in func_deps.iter() { + // Means distinct is exactly same with below Group By + // so delete the redundant distinct + if func_dep.source_indices == all_fields { + return Ok(Transformed::yes(input.as_ref().clone())); + } + } + + // Replace with Aggregation let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new( input, group_expr, @@ -165,3 +179,76 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { Some(BottomUp) } } + +#[cfg(test)] +mod tests { + use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; + use crate::test::*; + use datafusion_common::Result; + use datafusion_expr::{ + col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, + }; + use datafusion_functions_aggregate::sum::sum; + use std::sync::Arc; + + fn 
assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq( + Arc::new(ReplaceDistinctWithAggregate::new()), + plan.clone(), + expected, + ) + } + + #[test] + fn eliminate_redundant_distinct_simple() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("c")], Vec::::new())? + .project(vec![col("c")])? + .distinct()? + .build()?; + + let expected = "Projection: test.c\n Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_redundant_distinct_pair() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b")], Vec::::new())? + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = + "Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn do_not_eliminate_distinct() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn do_not_eliminate_distinct_with_aggr() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b"), col("c")], vec![sum(col("c"))])? + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = + "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[sum(test.c)]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } +} diff --git a/datafusion/sqllogictest/test_files/explain.slt b/datafusion/sqllogictest/test_files/explain.slt index 160a08371a3a..5a1733460120 100644 --- a/datafusion/sqllogictest/test_files/explain.slt +++ b/datafusion/sqllogictest/test_files/explain.slt @@ -193,7 +193,6 @@ logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after rewrite_disjunctive_predicate SAME TEXT AS ABOVE logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE logical_plan after eliminate_filter SAME TEXT AS ABOVE -logical_plan after eliminate_distinct SAME TEXT AS ABOVE logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE @@ -221,7 +220,6 @@ logical_plan after simplify_expressions SAME TEXT AS ABOVE logical_plan after rewrite_disjunctive_predicate SAME TEXT AS ABOVE logical_plan after eliminate_duplicated_expr SAME TEXT AS ABOVE logical_plan after eliminate_filter SAME TEXT AS ABOVE -logical_plan after eliminate_distinct SAME TEXT AS ABOVE logical_plan after eliminate_cross_join SAME TEXT AS ABOVE logical_plan after common_sub_expression_eliminate SAME TEXT AS ABOVE logical_plan after eliminate_limit SAME TEXT AS ABOVE From a200df9fc7e2c20d755dd1024fcd9e70e125784d Mon Sep 17 00:00:00 2001 From: mertak-synnada Date: Fri, 2 Aug 2024 15:00:13 +0300 Subject: [PATCH 14/19] Update datafusion/common/src/functional_dependencies.rs Co-authored-by: Mehmet Ozan Kabak --- 
datafusion/common/src/functional_dependencies.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 577cb6d9e890..d135ac4a87e2 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -527,21 +527,20 @@ pub fn aggregate_functional_dependencies( // When we have a GROUP BY key, we can guarantee uniqueness after // aggregation: if !group_by_expr_names.is_empty() { - let source_indices = (0..group_by_expr_names.len()).collect::>(); + let count = group_by_expr_names.len(); let nullable = source_indices .iter() .any(|idx| aggr_fields[*idx].is_nullable()); - // If `source_indices` is not already a determinant in the existing `aggregate_func_dependencies`. + // If GROUP BY expressions do not already act as a determinant: if !aggregate_func_dependencies.iter().any(|item| { - // `item.source_indices` is a subset of the `source_indices`. In this case, we shouldn't add - // `source_indices` as `item.source_indices` defines this relation already. - item.source_indices - .iter() - .all(|idx| source_indices.contains(idx)) + // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add + // them since `item.source_indices` defines this relation already. + item.source_indices.iter().all(|idx| idx < count) }) { // Add a new functional dependency associated with the whole table: aggregate_func_dependencies.push( - // Use nullable property of the group by expression + // Use nullable property of the GROUP BY expression: + let source_indices = (0..count).collect::>(); FunctionalDependence::new(source_indices, target_indices, nullable) .with_mode(Dependency::Single), ); From a59288514c72f522f6092d87d8b783ad11c281e6 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 15:07:57 +0300 Subject: [PATCH 15/19] add comment and fix variable call --- datafusion/common/src/functional_dependencies.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index d135ac4a87e2..fe0c72876a49 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -528,6 +528,7 @@ pub fn aggregate_functional_dependencies( // aggregation: if !group_by_expr_names.is_empty() { let count = group_by_expr_names.len(); + let source_indices = (0..count).collect::>(); let nullable = source_indices .iter() .any(|idx| aggr_fields[*idx].is_nullable()); @@ -535,12 +536,16 @@ pub fn aggregate_functional_dependencies( if !aggregate_func_dependencies.iter().any(|item| { // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add // them since `item.source_indices` defines this relation already. 
- item.source_indices.iter().all(|idx| idx < count) + + // This simple count comparison is working well because + // GROUP BY statement comes here as a prefix + // It is guaranteed that group by indices would cover the range: [0..count] + item.source_indices.iter().all(|idx| idx < &count) }) { // Add a new functional dependency associated with the whole table: + // Use nullable property of the GROUP BY expression: aggregate_func_dependencies.push( // Use nullable property of the GROUP BY expression: - let source_indices = (0..count).collect::>(); FunctionalDependence::new(source_indices, target_indices, nullable) .with_mode(Dependency::Single), ); From 650613d93c00d669c6c8da8116306f701a038bd2 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 15:38:00 +0300 Subject: [PATCH 16/19] fix test cases as optimized plan --- .../src/replace_distinct_aggregate.rs | 19 +++++++++---------- .../sqllogictest/test_files/aggregate.slt | 18 ++++-------------- 2 files changed, 13 insertions(+), 24 deletions(-) diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 22d6ef066620..b085802f7845 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -78,19 +78,16 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { LogicalPlan::Distinct(Distinct::All(input)) => { let group_expr = expand_wildcard(input.schema(), &input, None)?; - let fields = input.schema().fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = input.schema().functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - // Means distinct is exactly same with below Group By - // so delete the redundant distinct - if func_dep.source_indices == all_fields { + let field_count = input.schema().fields().len(); + for dep in input.schema().functional_dependencies().iter() { + // If distinct is exactly the same with a previous GROUP BY, we can + // simply remove it: + if dep.source_indices[..field_count].iter().enumerate().all(|(idx, f_idx)| idx == *f_idx) { return Ok(Transformed::yes(input.as_ref().clone())); } } - // Replace with Aggregation + // Replace with aggregation: let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new( input, group_expr, @@ -182,14 +179,16 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { #[cfg(test)] mod tests { + use std::sync::Arc; + use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; use crate::test::*; + use datafusion_common::Result; use datafusion_expr::{ col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, }; use datafusion_functions_aggregate::sum::sum; - use std::sync::Arc; fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { assert_optimized_plan_eq( diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index ee72289d66eb..abeeb767b948 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -4536,19 +4536,14 @@ EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; logical_plan 01)Limit: skip=0, fetch=5 02)--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -04)------TableScan: aggregate_test_100 projection=[c3] +03)----TableScan: aggregate_test_100 projection=[c3] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--AggregateExec: 
mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5] -07)------------CoalescePartitionsExec -08)--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true query I SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; @@ -4699,19 +4694,14 @@ EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; logical_plan 01)Limit: skip=0, fetch=5 02)--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -04)------TableScan: aggregate_test_100 projection=[c3] +03)----TableScan: aggregate_test_100 projection=[c3] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[] -07)------------CoalescePartitionsExec -08)--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true statement ok set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true; From 9341adf3f22b73e1747549d87dea8ecafa91dc84 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 15:41:13 +0300 Subject: [PATCH 17/19] format code --- datafusion/optimizer/src/replace_distinct_aggregate.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index b085802f7845..f73eeacfbf0e 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -82,7 +82,11 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { for dep in input.schema().functional_dependencies().iter() { // If distinct is exactly the same with a previous GROUP BY, we can // simply remove it: - if dep.source_indices[..field_count].iter().enumerate().all(|(idx, f_idx)| idx == *f_idx) { + if dep.source_indices[..field_count] + .iter() + .enumerate() + .all(|(idx, f_idx)| idx == *f_idx) + { return Ok(Transformed::yes(input.as_ref().clone())); } } From cea959587c5f1195cb538a38416fa07a772af7f5 Mon Sep 17 00:00:00 2001 From: mertak-synnada Date: Fri, 2 Aug 2024 15:52:22 +0300 Subject: [PATCH 18/19] simplify comments Co-authored-by: Mehmet Ozan Kabak --- datafusion/common/src/functional_dependencies.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git 
a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index fe0c72876a49..666ea73027b3 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -537,9 +537,8 @@ pub fn aggregate_functional_dependencies( // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add // them since `item.source_indices` defines this relation already. - // This simple count comparison is working well because - // GROUP BY statement comes here as a prefix - // It is guaranteed that group by indices would cover the range: [0..count] + // The following simple comparison is working well because + // GROUP BY expressions come here as a prefix. item.source_indices.iter().all(|idx| idx < &count) }) { // Add a new functional dependency associated with the whole table: From 8155b4e81b605010f1839128c2687e9753d600c6 Mon Sep 17 00:00:00 2001 From: Mert Akkaya Date: Fri, 2 Aug 2024 09:58:01 +0300 Subject: [PATCH 19/19] do not replace redundant distincts with aggregate --- .../common/src/functional_dependencies.rs | 20 ++- .../optimizer/src/eliminate_distinct.rs | 140 ------------------ datafusion/optimizer/src/lib.rs | 1 - datafusion/optimizer/src/optimizer.rs | 2 - .../src/replace_distinct_aggregate.rs | 90 +++++++++++ .../sqllogictest/test_files/aggregate.slt | 18 +-- 6 files changed, 106 insertions(+), 165 deletions(-) delete mode 100644 datafusion/optimizer/src/eliminate_distinct.rs diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 577cb6d9e890..fe0c72876a49 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -527,21 +527,25 @@ pub fn aggregate_functional_dependencies( // When we have a GROUP BY key, we can guarantee uniqueness after // aggregation: if !group_by_expr_names.is_empty() { - let source_indices = (0..group_by_expr_names.len()).collect::>(); + let count = group_by_expr_names.len(); + let source_indices = (0..count).collect::>(); let nullable = source_indices .iter() .any(|idx| aggr_fields[*idx].is_nullable()); - // If `source_indices` is not already a determinant in the existing `aggregate_func_dependencies`. + // If GROUP BY expressions do not already act as a determinant: if !aggregate_func_dependencies.iter().any(|item| { - // `item.source_indices` is a subset of the `source_indices`. In this case, we shouldn't add - // `source_indices` as `item.source_indices` defines this relation already. - item.source_indices - .iter() - .all(|idx| source_indices.contains(idx)) + // If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add + // them since `item.source_indices` defines this relation already. 
+ + // This simple count comparison is working well because + // GROUP BY statement comes here as a prefix + // It is guaranteed that group by indices would cover the range: [0..count] + item.source_indices.iter().all(|idx| idx < &count) }) { // Add a new functional dependency associated with the whole table: + // Use nullable property of the GROUP BY expression: aggregate_func_dependencies.push( - // Use nullable property of the group by expression + // Use nullable property of the GROUP BY expression: FunctionalDependence::new(source_indices, target_indices, nullable) .with_mode(Dependency::Single), ); diff --git a/datafusion/optimizer/src/eliminate_distinct.rs b/datafusion/optimizer/src/eliminate_distinct.rs deleted file mode 100644 index e8f71ff1a091..000000000000 --- a/datafusion/optimizer/src/eliminate_distinct.rs +++ /dev/null @@ -1,140 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Optimizer rule to replaces redundant aggregations on a plan. -//! This reduces redundant Aggregations in PyhsicalPlan. -//! -//! This optimizer changes this kind of query -//! -//! SELECT DISTINCT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; -//! to this -//! 
SELECT c3 FROM aggregate_test_100 GROUP BY c3 LIMIT 5; - -use crate::optimizer::ApplyOrder; -use crate::{OptimizerConfig, OptimizerRule}; -use datafusion_common::tree_node::{Transformed}; -use datafusion_common::Result; -use datafusion_expr::{logical_plan::LogicalPlan, Distinct}; - -#[derive(Default)] -pub struct EliminateDistinct {} - -impl EliminateDistinct { - #[allow(missing_docs)] - pub fn new() -> Self { - Self {} - } -} - -impl OptimizerRule for EliminateDistinct { - fn name(&self) -> &str { - "eliminate_distinct" - } - - fn apply_order(&self) -> Option { - Some(ApplyOrder::BottomUp) - } - - fn supports_rewrite(&self) -> bool { - true - } - - fn rewrite( - &self, - plan: LogicalPlan, - _config: &dyn OptimizerConfig, - ) -> Result< - datafusion_common::tree_node::Transformed, - datafusion_common::DataFusionError, - > { - match plan { - LogicalPlan::Distinct(Distinct::All(distinct)) => { - let fields = distinct.schema().fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = distinct.schema().functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - if func_dep.source_indices == all_fields { - return Ok(Transformed::yes(distinct.inputs()[0].clone())); - } - } - return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::All(distinct)))); - } - LogicalPlan::Distinct(Distinct::On(distinct)) => { - let fields = distinct.schema.fields(); - let all_fields = (0..fields.len()).collect::>(); - let func_deps = distinct.schema.functional_dependencies().clone(); - - for func_dep in func_deps.iter() { - if func_dep.source_indices == all_fields { - return Ok(Transformed::yes(distinct.input.as_ref().clone())); - } - } - return Ok(Transformed::no(LogicalPlan::Distinct(Distinct::On(distinct)))); - } - _ => Ok(Transformed::no(plan)), - } - } -} - -#[cfg(test)] -mod tests { - use crate::eliminate_distinct::EliminateDistinct; - use datafusion_common::Result; - use datafusion_expr::{ - col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, - }; - use std::sync::Arc; - - use crate::test::*; - - fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { - assert_optimized_plan_eq( - Arc::new(EliminateDistinct::new()), - plan.clone(), - expected, - ) - } - - #[test] - fn eliminate_redundant_distinct_simple() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("c")], Vec::::new())? - .project(vec![col("c")])? - .distinct()? - .build()?; - - let expected = "Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; - // No aggregate / scan / limit - assert_optimized_plan_equal(&plan, expected) - } - - #[test] - fn eliminate_redundant_distinct_pair() -> Result<()> { - let table_scan = test_table_scan().unwrap(); - let plan = LogicalPlanBuilder::from(table_scan) - .aggregate(vec![col("a"), col("b")], Vec::::new())? - .project(vec![col("a"), col("b")])? - .distinct()? 
- .build()?; - - // No aggregate / scan / limit - let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; - assert_optimized_plan_equal(&plan, expected) - } -} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index bddaebf47d47..3b1df3510d2a 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -34,7 +34,6 @@ pub mod analyzer; pub mod common_subexpr_eliminate; pub mod decorrelate; pub mod decorrelate_predicate_subquery; -pub mod eliminate_distinct; pub mod eliminate_cross_join; pub mod eliminate_duplicated_expr; pub mod eliminate_filter; diff --git a/datafusion/optimizer/src/optimizer.rs b/datafusion/optimizer/src/optimizer.rs index 1cccf42dcba4..93923a4e1e74 100644 --- a/datafusion/optimizer/src/optimizer.rs +++ b/datafusion/optimizer/src/optimizer.rs @@ -33,7 +33,6 @@ use datafusion_expr::logical_plan::LogicalPlan; use crate::common_subexpr_eliminate::CommonSubexprEliminate; use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery; -use crate::eliminate_distinct::EliminateDistinct; use crate::eliminate_cross_join::EliminateCrossJoin; use crate::eliminate_duplicated_expr::EliminateDuplicatedExpr; use crate::eliminate_filter::EliminateFilter; @@ -258,7 +257,6 @@ impl Optimizer { Arc::new(RewriteDisjunctivePredicate::new()), Arc::new(EliminateDuplicatedExpr::new()), Arc::new(EliminateFilter::new()), - Arc::new(EliminateDistinct::new()), Arc::new(EliminateCrossJoin::new()), Arc::new(CommonSubexprEliminate::new()), Arc::new(EliminateLimit::new()), diff --git a/datafusion/optimizer/src/replace_distinct_aggregate.rs b/datafusion/optimizer/src/replace_distinct_aggregate.rs index 430517121f2a..f73eeacfbf0e 100644 --- a/datafusion/optimizer/src/replace_distinct_aggregate.rs +++ b/datafusion/optimizer/src/replace_distinct_aggregate.rs @@ -77,6 +77,21 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { match plan { LogicalPlan::Distinct(Distinct::All(input)) => { let group_expr = expand_wildcard(input.schema(), &input, None)?; + + let field_count = input.schema().fields().len(); + for dep in input.schema().functional_dependencies().iter() { + // If distinct is exactly the same with a previous GROUP BY, we can + // simply remove it: + if dep.source_indices[..field_count] + .iter() + .enumerate() + .all(|(idx, f_idx)| idx == *f_idx) + { + return Ok(Transformed::yes(input.as_ref().clone())); + } + } + + // Replace with aggregation: let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new( input, group_expr, @@ -165,3 +180,78 @@ impl OptimizerRule for ReplaceDistinctWithAggregate { Some(BottomUp) } } + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate; + use crate::test::*; + + use datafusion_common::Result; + use datafusion_expr::{ + col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan, + }; + use datafusion_functions_aggregate::sum::sum; + + fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq( + Arc::new(ReplaceDistinctWithAggregate::new()), + plan.clone(), + expected, + ) + } + + #[test] + fn eliminate_redundant_distinct_simple() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("c")], Vec::::new())? + .project(vec![col("c")])? + .distinct()? 
+ .build()?; + + let expected = "Projection: test.c\n Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn eliminate_redundant_distinct_pair() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b")], Vec::::new())? + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = + "Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn do_not_eliminate_distinct() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } + + #[test] + fn do_not_eliminate_distinct_with_aggr() -> Result<()> { + let table_scan = test_table_scan().unwrap(); + let plan = LogicalPlanBuilder::from(table_scan) + .aggregate(vec![col("a"), col("b"), col("c")], vec![sum(col("c"))])? + .project(vec![col("a"), col("b")])? + .distinct()? + .build()?; + + let expected = + "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[sum(test.c)]]\n TableScan: test"; + assert_optimized_plan_equal(&plan, expected) + } +} diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index ee72289d66eb..abeeb767b948 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -4536,19 +4536,14 @@ EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; logical_plan 01)Limit: skip=0, fetch=5 02)--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -04)------TableScan: aggregate_test_100 projection=[c3] +03)----TableScan: aggregate_test_100 projection=[c3] physical_plan 01)GlobalLimitExec: skip=0, fetch=5 02)--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[], lim=[5] -07)------------CoalescePartitionsExec -08)--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[], lim=[5] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true query I SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; @@ -4699,19 +4694,14 @@ EXPLAIN SELECT DISTINCT c3 FROM aggregate_test_100 group by c3 limit 5; logical_plan 01)Limit: skip=0, fetch=5 02)--Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -03)----Aggregate: groupBy=[[aggregate_test_100.c3]], aggr=[[]] -04)------TableScan: aggregate_test_100 projection=[c3] +03)----TableScan: aggregate_test_100 projection=[c3] physical_plan 01)GlobalLimitExec: skip=0, 
fetch=5 02)--AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[] 03)----CoalescePartitionsExec 04)------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[] 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -06)----------AggregateExec: mode=Final, gby=[c3@0 as c3], aggr=[] -07)------------CoalescePartitionsExec -08)--------------AggregateExec: mode=Partial, gby=[c3@0 as c3], aggr=[] -09)----------------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1 -10)------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true +06)----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3], has_header=true statement ok set datafusion.optimizer.enable_distinct_aggregation_soft_limit = true;
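[Editor's sketch] The core check introduced by the final patch can be summarized as: a DISTINCT whose input schema carries a functional dependency whose source indices cover the prefix [0..field_count) is redundant, because a prior GROUP BY on those columns already guarantees row uniqueness, so ReplaceDistinctWithAggregate can drop the Distinct node instead of wrapping it in another Aggregate. The standalone Rust sketch below models only that prefix test on plain vectors; `FunctionalDep`, `is_redundant_distinct`, and the example indices are illustrative stand-ins rather than DataFusion's actual types or API, and the sketch adds an explicit length guard that the patch's slice expression `dep.source_indices[..field_count]` assumes implicitly.

    // Standalone model of the redundant-DISTINCT check from the last patch.
    // `FunctionalDep` is a simplified stand-in for DataFusion's
    // `FunctionalDependence`; only the source indices matter here.
    #[derive(Debug)]
    struct FunctionalDep {
        source_indices: Vec<usize>,
    }

    /// Returns true if some dependency's source indices start with the full
    /// prefix 0, 1, ..., field_count - 1, i.e. a prior GROUP BY already makes
    /// every output column part of a unique key, so DISTINCT adds nothing.
    fn is_redundant_distinct(deps: &[FunctionalDep], field_count: usize) -> bool {
        deps.iter().any(|dep| {
            dep.source_indices.len() >= field_count
                && dep.source_indices[..field_count]
                    .iter()
                    .enumerate()
                    .all(|(idx, f_idx)| idx == *f_idx)
        })
    }

    fn main() {
        // GROUP BY c3 feeding SELECT DISTINCT c3: one output column,
        // dependency sourced from index 0, so the DISTINCT is redundant.
        let deps = vec![FunctionalDep { source_indices: vec![0] }];
        assert!(is_redundant_distinct(&deps, 1));

        // No dependency covering the output prefix: DISTINCT must stay
        // (and is still rewritten into an Aggregate by the rule).
        let other = vec![FunctionalDep { source_indices: vec![1] }];
        assert!(!is_redundant_distinct(&other, 1));

        println!("prefix check behaves as expected");
    }

Under these assumptions the sketch reproduces the plan change shown in the aggregate.slt diffs above: the second Aggregate over the same GROUP BY key disappears from both the logical and physical plans.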