Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/eliminate aggregate #28

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
10000fb
Delete docs.yaml
metesynnada Feb 22, 2024
6c76423
Merge pull request #5 from synnada-ai/ci-action-fixing
mustafasrepo Feb 22, 2024
599516b
Merge branch 'apache:main' into main
mustafasrepo Feb 23, 2024
b20b65c
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
6c066b9
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
6770106
Merge remote-tracking branch 'upstream/main'
mustafasrepo Feb 27, 2024
38db3d8
Merge branch 'apache:main' into main
mustafasrepo Feb 27, 2024
e21ac2b
Merge branch 'apache:main' into main
mustafasrepo Feb 28, 2024
b7e6320
Merge branch 'apache:main' into main
mustafasrepo Mar 1, 2024
d6e39a4
Merge branch 'apache:main' into main
mustafasrepo Mar 5, 2024
a2e53c2
initialize eliminate_aggregate.rs rule
Aug 1, 2024
79d82a0
Merge remote-tracking branch 'refs/remotes/origin/apache_main' into f…
Aug 1, 2024
5598fb4
remove redundant prints
Aug 1, 2024
c3257a6
Add multiple group by expression handling.
mustafasrepo Aug 2, 2024
7484eee
rename eliminate_aggregate.rs as eliminate_distinct.rs
Aug 2, 2024
b917cb3
Merge remote-tracking branch 'refs/remotes/origin/apache_main' into f…
Aug 2, 2024
37674ad
remove logic for distinct on since group by statement must exist in p…
Aug 2, 2024
06b47ec
format code
Aug 2, 2024
289d157
add eliminate_distinct rule to tests
Aug 2, 2024
a8e1b05
simplify function
Aug 2, 2024
b381368
fix child issue
Aug 2, 2024
d2da821
format
Aug 2, 2024
1bd49fd
fix docs
Aug 2, 2024
d890715
remove eliminate_distinct rule and make it a part of replace_distinct…
Aug 2, 2024
a200df9
Update datafusion/common/src/functional_dependencies.rs
mertak-synnada Aug 2, 2024
a592885
add comment and fix variable call
Aug 2, 2024
650613d
fix test cases as optimized plan
Aug 2, 2024
9341adf
format code
Aug 2, 2024
cea9595
simplify comments
mertak-synnada Aug 2, 2024
8155b4e
do not replace redundant distincts with aggregate
Aug 2, 2024
a759ff9
merge from remote
mertak-synnada Aug 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 25 additions & 15 deletions datafusion/common/src/functional_dependencies.rs
Original file line number Diff line number Diff line change
Expand Up @@ -524,22 +524,32 @@ pub fn aggregate_functional_dependencies(
}
}

// If we have a single GROUP BY key, we can guarantee uniqueness after
// When we have a GROUP BY key, we can guarantee uniqueness after
// aggregation:
if group_by_expr_names.len() == 1 {
// If `source_indices` contain 0, delete this functional dependency
// as it will be added anyway with mode `Dependency::Single`:
aggregate_func_dependencies.retain(|item| !item.source_indices.contains(&0));
// Add a new functional dependency associated with the whole table:
aggregate_func_dependencies.push(
// Use nullable property of the group by expression
FunctionalDependence::new(
vec![0],
target_indices,
aggr_fields[0].is_nullable(),
)
.with_mode(Dependency::Single),
);
if !group_by_expr_names.is_empty() {
let count = group_by_expr_names.len();
let source_indices = (0..count).collect::<Vec<_>>();
let nullable = source_indices
.iter()
.any(|idx| aggr_fields[*idx].is_nullable());
// If GROUP BY expressions do not already act as a determinant:
if !aggregate_func_dependencies.iter().any(|item| {
// If `item.source_indices` is a subset of GROUP BY expressions, we shouldn't add
// them since `item.source_indices` defines this relation already.

// This simple count comparison works because GROUP BY expressions
// arrive here as a prefix of the schema; the GROUP BY indices are
// guaranteed to cover the range [0..count).
mertak-synnada marked this conversation as resolved.
Show resolved Hide resolved
item.source_indices.iter().all(|idx| idx < &count)
}) {
// Add a new functional dependency associated with the whole table:
// Use nullable property of the GROUP BY expression:
aggregate_func_dependencies.push(
// Use nullable property of the GROUP BY expression:
FunctionalDependence::new(source_indices, target_indices, nullable)
.with_mode(Dependency::Single),
);
}
}
FunctionalDependencies::new(aggregate_func_dependencies)
}
Expand Down
87 changes: 87 additions & 0 deletions datafusion/optimizer/src/replace_distinct_aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
match plan {
LogicalPlan::Distinct(Distinct::All(input)) => {
let group_expr = expand_wildcard(input.schema(), &input, None)?;

let fields = input.schema().fields();
let all_fields = (0..fields.len()).collect::<Vec<_>>();
let func_deps = input.schema().functional_dependencies().clone();

for func_dep in func_deps.iter() {
// The DISTINCT is exactly the same as the GROUP BY below it,
// so remove the redundant DISTINCT.
if func_dep.source_indices == all_fields {
return Ok(Transformed::yes(input.as_ref().clone()));
}
}

// Replace with Aggregation
ozankabak marked this conversation as resolved.
Show resolved Hide resolved
let aggr_plan = LogicalPlan::Aggregate(Aggregate::try_new(
input,
group_expr,
Expand Down Expand Up @@ -165,3 +179,76 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
Some(BottomUp)
}
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::replace_distinct_aggregate::ReplaceDistinctWithAggregate;
    use crate::test::*;

    use datafusion_common::Result;
    use datafusion_expr::{
        col, logical_plan::builder::LogicalPlanBuilder, Expr, LogicalPlan,
    };
    use datafusion_functions_aggregate::sum::sum;

    /// Applies the `ReplaceDistinctWithAggregate` rule to `plan` and checks
    /// that the optimized plan renders exactly as `expected`.
    fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> Result<()> {
        let rule = Arc::new(ReplaceDistinctWithAggregate::new());
        assert_optimized_plan_eq(rule, plan.clone(), expected)
    }

    /// A DISTINCT whose input is already grouped by the very same (single)
    /// column is redundant: the rule should drop it entirely.
    #[test]
    fn eliminate_redundant_distinct_simple() -> Result<()> {
        let scan = test_table_scan().unwrap();
        let plan = LogicalPlanBuilder::from(scan)
            .aggregate(vec![col("c")], Vec::<Expr>::new())?
            .project(vec![col("c")])?
            .distinct()?
            .build()?;

        let expected = "Projection: test.c\n Aggregate: groupBy=[[test.c]], aggr=[[]]\n TableScan: test";
        assert_optimized_plan_equal(&plan, expected)
    }

    /// Same as above, but with a two-column GROUP BY key: a DISTINCT over
    /// exactly those columns is still redundant.
    #[test]
    fn eliminate_redundant_distinct_pair() -> Result<()> {
        let scan = test_table_scan().unwrap();
        let plan = LogicalPlanBuilder::from(scan)
            .aggregate(vec![col("a"), col("b")], Vec::<Expr>::new())?
            .project(vec![col("a"), col("b")])?
            .distinct()?
            .build()?;

        let expected =
            "Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n TableScan: test";
        assert_optimized_plan_equal(&plan, expected)
    }

    /// Without an underlying GROUP BY, the DISTINCT is not redundant and must
    /// be rewritten into an Aggregate rather than removed.
    #[test]
    fn do_not_eliminate_distinct() -> Result<()> {
        let scan = test_table_scan().unwrap();
        let plan = LogicalPlanBuilder::from(scan)
            .project(vec![col("a"), col("b")])?
            .distinct()?
            .build()?;

        let expected = "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n TableScan: test";
        assert_optimized_plan_equal(&plan, expected)
    }

    /// The projection keeps only a strict subset of the GROUP BY columns, so
    /// the DISTINCT still changes the result and must become an Aggregate.
    #[test]
    fn do_not_eliminate_distinct_with_aggr() -> Result<()> {
        let scan = test_table_scan().unwrap();
        let plan = LogicalPlanBuilder::from(scan)
            .aggregate(vec![col("a"), col("b"), col("c")], vec![sum(col("c"))])?
            .project(vec![col("a"), col("b")])?
            .distinct()?
            .build()?;

        let expected =
            "Aggregate: groupBy=[[test.a, test.b]], aggr=[[]]\n Projection: test.a, test.b\n Aggregate: groupBy=[[test.a, test.b, test.c]], aggr=[[sum(test.c)]]\n TableScan: test";
        assert_optimized_plan_equal(&plan, expected)
    }
}
2 changes: 1 addition & 1 deletion datafusion/optimizer/src/single_distinct_to_groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use hashbrown::HashSet;
/// single distinct to group by optimizer rule
/// ```text
/// Before:
/// SELECT a, count(DINSTINCT b), sum(c)
/// SELECT a, count(DISTINCT b), sum(c)
/// FROM t
/// GROUP BY a
///
Expand Down
Loading