minor fix: Enforce sorting #31

Closed
wants to merge 30 commits
Changes from 7 commits
Commits (30)
510b16c
Tmp
mustafasrepo Aug 7, 2024
6ef4369
Minor changes
mustafasrepo Aug 7, 2024
c3efafc
Minor changes
mustafasrepo Aug 7, 2024
2bf220d
Minor changes
mustafasrepo Aug 7, 2024
eb83917
Implement top down recursion with delete check
mustafasrepo Aug 7, 2024
0b66b15
Minor changes
mustafasrepo Aug 7, 2024
c769f9f
Minor changes
mustafasrepo Aug 7, 2024
0ad7063
Address reviews
mustafasrepo Aug 7, 2024
3661f06
Update comments
mustafasrepo Aug 7, 2024
60967c1
Minor changes
mustafasrepo Aug 7, 2024
6b87c4c
Make test deterministic
mustafasrepo Aug 7, 2024
8dd7e0a
Add fetch info to the statistics
mustafasrepo Aug 8, 2024
15423ae
Make EnforceDistribution use the inexact count estimate as well
mustafasrepo Aug 8, 2024
94fb83d
Minor changes
mustafasrepo Aug 8, 2024
9053b9f
Minor changes
mustafasrepo Aug 8, 2024
1171584
Minor changes
mustafasrepo Aug 8, 2024
711038d
Do not add unnecessary hash partitioning
mustafasrepo Aug 9, 2024
7e598e5
Minor changes
mustafasrepo Aug 9, 2024
12ad2c2
Add config option to use inexact row number estimates during planning
mustafasrepo Aug 9, 2024
2e3cc5d
Update config
mustafasrepo Aug 9, 2024
34af8ba
Minor changes
mustafasrepo Aug 9, 2024
98760bc
Minor changes
mustafasrepo Aug 9, 2024
1e4dada
Final review
ozankabak Aug 9, 2024
9fc4f3d
Address reviews
mustafasrepo Aug 9, 2024
1116058
Add handling for sort removal with fetch
mustafasrepo Aug 9, 2024
44dc292
Fix linter errors
mustafasrepo Aug 9, 2024
c6d2de6
Minor changes
mustafasrepo Aug 9, 2024
c7c85f4
Update config
mustafasrepo Aug 9, 2024
7c8967d
Cleanup stats under fetch
ozankabak Aug 9, 2024
ed35660
Update SLT comment
ozankabak Aug 10, 2024
10 changes: 5 additions & 5 deletions datafusion/core/src/dataframe/mod.rs
@@ -3019,11 +3019,11 @@ mod tests {

assert_batches_sorted_eq!(
[
"+-----+-----+----+-------+",
"| one | two | c3 | total |",
"+-----+-----+----+-------+",
"| a | 3 | 13 | 16 |",
"+-----+-----+----+-------+"
"+-----+-----+-----+-------+",
"| one | two | c3 | total |",
"+-----+-----+-----+-------+",
"| a | 3 | -72 | -69 |",
"+-----+-----+-----+-------+",
],
&df_sum_renamed
);
139 changes: 125 additions & 14 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -61,6 +61,7 @@ use crate::physical_plan::{Distribution, ExecutionPlan, InputOrderMode};

use datafusion_common::plan_err;
use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
use datafusion_physical_expr::Partitioning;
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
use datafusion_physical_plan::repartition::RepartitionExec;
use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
@@ -172,7 +173,6 @@ impl PhysicalOptimizerRule for EnforceSorting {
} else {
adjusted.plan
};

let plan_with_pipeline_fixer = OrderPreservationContext::new_default(new_plan);
let updated_plan = plan_with_pipeline_fixer
.transform_up(|plan_with_pipeline_fixer| {
@@ -189,12 +189,12 @@
// missed by the bottom-up traversal:
let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
let adjusted = sort_pushdown.transform_down(pushdown_sorts)?.data;

adjusted
let adjusted = pushdown_sorts(sort_pushdown)?;
let plan = adjusted
.plan
.transform_up(|plan| Ok(Transformed::yes(replace_with_partial_sort(plan)?)))
.data()
.data()?;
Ok(plan)
}

fn name(&self) -> &str {
@@ -513,8 +513,25 @@ fn remove_corresponding_coalesce_in_sub_plan(
})
.collect::<Result<_>>()?;
}

requirements.update_plan_from_children()
let mut new_req = requirements.update_plan_from_children()?;
if let Some(repartition) = new_req.plan.as_any().downcast_ref::<RepartitionExec>() {
let mut can_remove = false;
if repartition
.input()
.output_partitioning()
.eq(repartition.partitioning())
{
// The partitionings are the same
can_remove = true;
} else if let Partitioning::RoundRobinBatch(n_out) = repartition.partitioning() {
can_remove =
*n_out == repartition.input().output_partitioning().partition_count();
}
if can_remove {
new_req = new_req.children.swap_remove(0)
}
}
Ok(new_req)
}
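
The new branch above drops a `RepartitionExec` that would not change anything: either its input already has the same partitioning, or a round-robin repartition targets the same number of partitions its input already produces. A minimal standalone sketch of that check, using a simplified `Partitioning` enum rather than DataFusion's actual types:

// Hypothetical, simplified stand-in for `datafusion_physical_expr::Partitioning`,
// modeling only what the no-op check needs.
#[derive(PartialEq)]
enum Partitioning {
    RoundRobinBatch(usize),
    Hash(Vec<String>, usize),
    UnknownPartitioning(usize),
}

impl Partitioning {
    fn partition_count(&self) -> usize {
        match self {
            Partitioning::RoundRobinBatch(n)
            | Partitioning::Hash(_, n)
            | Partitioning::UnknownPartitioning(n) => *n,
        }
    }
}

// Returns true when repartitioning an input with partitioning `input` into
// `output` would not change anything: the partitionings are equal, or a
// round-robin repartition keeps the same partition count.
fn repartition_is_noop(input: &Partitioning, output: &Partitioning) -> bool {
    if input == output {
        return true;
    }
    matches!(output, Partitioning::RoundRobinBatch(n) if *n == input.partition_count())
}

fn main() {
    // Hash(4) -> RoundRobinBatch(4) keeps the partition count, so it is removable.
    let input = Partitioning::Hash(vec!["a".to_string()], 4);
    assert!(repartition_is_noop(&input, &Partitioning::RoundRobinBatch(4)));
    // Hash(4) -> Hash(8) actually changes the partitioning, so it stays.
    assert!(!repartition_is_noop(&input, &Partitioning::Hash(vec!["a".to_string()], 8)));
}

In the PR itself this check runs on the `RepartitionExec` inside `remove_corresponding_coalesce_in_sub_plan`, and a redundant node is replaced by its only child.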

/// Updates child to remove the unnecessary sort below it.
@@ -540,8 +557,12 @@ fn remove_corresponding_sort_from_sub_plan(
requires_single_partition: bool,
) -> Result<PlanWithCorrespondingSort> {
// A `SortExec` is always at the bottom of the tree.
if is_sort(&node.plan) {
node = node.children.swap_remove(0);
if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
if sort_exec.fetch().is_none() {
node = node.children.swap_remove(0);
} else {
// Do not remove a sort that has a fetch (TopK); it also limits the row count
}
} else {
let mut any_connection = false;
let required_dist = node.plan.required_input_distribution();
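
The downcast above replaces the old `is_sort` check so that only a `SortExec` without a fetch is removed: a sort with a fetch acts as a TopK and also limits how many rows come out, so dropping it would change the query result, not just the ordering. A simplified sketch of the rule, with a hypothetical `SortNode` standing in for `SortExec`:

// Hypothetical stand-in for a `SortExec`; only the field relevant to the
// removal rule is modeled.
struct SortNode {
    fetch: Option<usize>,
}

fn can_remove_sort(sort: &SortNode) -> bool {
    // A plain sort only re-orders rows, so it is redundant when its ordering
    // is not needed. A sort with a fetch also limits the row count, so it
    // must be kept (or merged into another TopK), never silently dropped.
    sort.fetch.is_none()
}

fn main() {
    assert!(can_remove_sort(&SortNode { fetch: None }));
    assert!(!can_remove_sort(&SortNode { fetch: Some(2) }));
}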
@@ -632,8 +653,9 @@ mod tests {
use datafusion_common::Result;
use datafusion_expr::JoinType;
use datafusion_physical_expr::expressions::{col, Column, NotExpr};

use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};

use rstest::rstest;

fn create_test_schema() -> Result<SchemaRef> {
@@ -716,10 +738,7 @@ mod tests {

let mut sort_pushdown = SortPushDown::new_default(updated_plan.plan);
assign_initial_requirements(&mut sort_pushdown);
sort_pushdown
.transform_down(pushdown_sorts)
.data()
.and_then(check_integrity)?;
check_integrity(pushdown_sorts(sort_pushdown)?)?;
// TODO: End state payloads will be checked here.
}

@@ -1049,6 +1068,98 @@
Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_sort6() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let input = Arc::new(
SortExec::new(vec![sort_expr("non_nullable_col", &schema)], source)
.with_fetch(Some(2)),
);
let physical_plan = sort_exec(
vec![
sort_expr("non_nullable_col", &schema),
sort_expr("nullable_col", &schema),
],
input,
);

let expected_input = [
"SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_sort7() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
let input = Arc::new(SortExec::new(
vec![sort_expr("non_nullable_col", &schema)],
source,
));
let limit = Arc::new(LocalLimitExec::new(input, 2));
let physical_plan = sort_exec(
vec![
sort_expr("non_nullable_col", &schema),
sort_expr("nullable_col", &schema),
],
limit,
);

let expected_input = [
"SortExec: expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" LocalLimitExec: fetch=2",
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"LocalLimitExec: fetch=2",
" SortExec: TopK(fetch=2), expr=[non_nullable_col@1 ASC,nullable_col@0 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_do_not_pushdown_through_limit() -> Result<()> {
let schema = create_test_schema()?;
let source = memory_exec(&schema);
// let input = sort_exec(vec![sort_expr("non_nullable_col", &schema)], source);
let input = Arc::new(SortExec::new(
vec![sort_expr("non_nullable_col", &schema)],
source,
));
let limit = Arc::new(GlobalLimitExec::new(input, 0, Some(5))) as _;
let physical_plan = sort_exec(vec![sort_expr("nullable_col", &schema)], limit);

let expected_input = [
"SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
let expected_optimized = [
"SortExec: expr=[nullable_col@0 ASC], preserve_partitioning=[false]",
" GlobalLimitExec: skip=0, fetch=5",
" SortExec: expr=[non_nullable_col@1 ASC], preserve_partitioning=[false]",
" MemoryExec: partitions=1, partition_sizes=[0]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan, true);

Ok(())
}

#[tokio::test]
async fn test_remove_unnecessary_spm1() -> Result<()> {
let schema = create_test_schema()?;