
initialize fetch() api for execution plan
remove unnecessary limit plans when used with sort + fetch
add test case for Sort and Limit with offset
push down the limit even when an intermediate child has no fetch, as long as that child supports limit pushdown
mertak-synnada committed Aug 7, 2024
1 parent 07dca3a commit 9192ca9
Showing 27 changed files with 508 additions and 476 deletions.
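
At the core of the commit is a small accessor pair on the physical-plan trait: operators that already carry a row limit can now report it (`fetch`), complementing the existing ability to absorb one (`with_fetch`). A minimal sketch of the pairing — the standalone trait below is illustrative, not DataFusion's actual `ExecutionPlan` trait:

```rust
use std::sync::Arc;

/// Illustrative stand-in for the relevant slice of `ExecutionPlan`.
trait PlanWithFetch {
    /// The number of rows this operator is limited to produce, if any.
    /// The default says "no inherent limit".
    fn fetch(&self) -> Option<usize> {
        None
    }

    /// Try to build a copy of this operator that stops after `limit` rows.
    /// `None` means the operator cannot absorb a limit, so the optimizer
    /// must keep an explicit limit node above it.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn PlanWithFetch>> {
        let _ = limit;
        None
    }
}
```

With both directions available, optimizer rules can read a limit out of one node and sink it into another, which is what the rest of the diff does.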
26 changes: 12 additions & 14 deletions datafusion/core/src/dataframe/mod.rs
@@ -2920,13 +2920,12 @@ mod tests {
 assert_eq!(
 "\
 Projection: t1.c1, t2.c1, Boolean(true) AS new_column\
-\n  Limit: skip=0, fetch=1\
-\n    Sort: t1.c1 ASC NULLS FIRST, fetch=1\
-\n      Inner Join: t1.c1 = t2.c1\
-\n        SubqueryAlias: t1\
-\n          TableScan: aggregate_test_100 projection=[c1]\
-\n        SubqueryAlias: t2\
-\n          TableScan: aggregate_test_100 projection=[c1]",
+\n  Sort: t1.c1 ASC NULLS FIRST, fetch=1\
+\n    Inner Join: t1.c1 = t2.c1\
+\n      SubqueryAlias: t1\
+\n        TableScan: aggregate_test_100 projection=[c1]\
+\n      SubqueryAlias: t2\
+\n        TableScan: aggregate_test_100 projection=[c1]",
 format!("{}", df_with_column.clone().into_optimized_plan()?)
 );

@@ -3114,13 +3113,12 @@ mod tests {

 assert_eq!("\
 Projection: t1.c1 AS AAA, t1.c2, t1.c3, t2.c1, t2.c2, t2.c3\
-\n  Limit: skip=0, fetch=1\
-\n    Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
-\n      Inner Join: t1.c1 = t2.c1\
-\n        SubqueryAlias: t1\
-\n          TableScan: aggregate_test_100 projection=[c1, c2, c3]\
-\n        SubqueryAlias: t2\
-\n          TableScan: aggregate_test_100 projection=[c1, c2, c3]",
+\n  Sort: t1.c1 ASC NULLS FIRST, t1.c2 ASC NULLS FIRST, t1.c3 ASC NULLS FIRST, t2.c1 ASC NULLS FIRST, t2.c2 ASC NULLS FIRST, t2.c3 ASC NULLS FIRST, fetch=1\
+\n    Inner Join: t1.c1 = t2.c1\
+\n      SubqueryAlias: t1\
+\n        TableScan: aggregate_test_100 projection=[c1, c2, c3]\
+\n      SubqueryAlias: t2\
+\n        TableScan: aggregate_test_100 projection=[c1, c2, c3]",
 format!("{}", df_renamed.clone().into_optimized_plan()?)
 );

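The updated expectations above can be reproduced end to end. A sketch under stated assumptions — the CSV path and column name are placeholders, and the `datafusion` and `tokio` crates are assumed:

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    // "data.csv" and column "c1" are placeholders for any registered table.
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    let df = ctx.sql("SELECT c1 FROM t ORDER BY c1 LIMIT 1").await?;
    // After this commit the optimized logical plan should print a single
    // `Sort: t.c1 ASC NULLS LAST, fetch=1` node with no `Limit` above it.
    println!("{}", df.into_optimized_plan()?.display_indent());
    Ok(())
}
```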
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/arrow_file.rs
@@ -196,6 +196,10 @@ impl ExecutionPlan for ArrowExec {
         Ok(self.projected_statistics.clone())
     }

+    fn fetch(&self) -> Option<usize> {
+        self.base_config.limit
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let new_config = self.base_config.clone().with_limit(limit);
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/avro.rs
@@ -165,6 +165,10 @@ impl ExecutionPlan for AvroExec {
         Some(self.metrics.clone_inner())
     }

+    fn fetch(&self) -> Option<usize> {
+        self.base_config.limit
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let new_config = self.base_config.clone().with_limit(limit);
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/csv.rs
@@ -427,6 +427,10 @@ impl ExecutionPlan for CsvExec {
         Some(self.metrics.clone_inner())
     }

+    fn fetch(&self) -> Option<usize> {
+        self.base_config.limit
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let new_config = self.base_config.clone().with_limit(limit);
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/json.rs
@@ -207,6 +207,10 @@ impl ExecutionPlan for NdJsonExec {
         Some(self.metrics.clone_inner())
     }

+    fn fetch(&self) -> Option<usize> {
+        self.base_config.limit
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let new_config = self.base_config.clone().with_limit(limit);
4 changes: 4 additions & 0 deletions datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -729,6 +729,10 @@ impl ExecutionPlan for ParquetExec {
         Ok(self.projected_statistics.clone())
     }

+    fn fetch(&self) -> Option<usize> {
+        self.base_config.limit
+    }
+
     fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
         let new_config = self.base_config.clone().with_limit(limit);
12 changes: 12 additions & 0 deletions datafusion/core/src/physical_optimizer/enforce_sorting.rs
@@ -68,6 +68,7 @@ use datafusion_physical_plan::sorts::partial_sort::PartialSortExec;
 use datafusion_physical_plan::ExecutionPlanProperties;

 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
 use itertools::izip;

 /// This rule inspects [`SortExec`]'s in the given physical plan and removes the
@@ -388,6 +389,7 @@ fn analyze_immediate_sort_removal(
 ) -> Transformed<PlanWithCorrespondingSort> {
     if let Some(sort_exec) = node.plan.as_any().downcast_ref::<SortExec>() {
         let sort_input = sort_exec.input();
+        let fetch = sort_exec.fetch();
         // If this sort is unnecessary, we should remove it:
         if sort_input
             .equivalence_properties()
@@ -399,6 +401,16 @@
             // Replace the sort with a sort-preserving merge:
             let expr = sort_exec.expr().to_vec();
             Arc::new(SortPreservingMergeExec::new(expr, sort_input.clone())) as _
+        } else if let Some(fetch) = fetch {
+            // Remove the sort:
+            node.children = node.children.swap_remove(0).children;
+            // Add limit
+            if sort_input.output_partitioning().partition_count() > 1 {
+                Arc::new(LocalLimitExec::new(sort_input.clone(), fetch)) as _
+            } else {
+                Arc::new(GlobalLimitExec::new(sort_input.clone(), 0, Some(fetch)))
+                    as _
+            }
         } else {
             // Remove the sort:
             node.children = node.children.swap_remove(0).children;
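The new `else if let Some(fetch) = fetch` branch preserves the row cut-off even though the ordering work is redundant. A minimal sketch of just that substitution, with hypothetical `input` and `fetch` arguments supplied by the caller (the free function is illustrative; in the rule this logic is inline):

```rust
use std::sync::Arc;

use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Replace an already-satisfied `SortExec` that only mattered for its
/// `fetch` with a plain limit over its input.
fn sort_to_limit(
    input: Arc<dyn ExecutionPlan>,
    fetch: usize,
) -> Arc<dyn ExecutionPlan> {
    if input.output_partitioning().partition_count() > 1 {
        // Several partitions: cap each one locally; a later merge/limit
        // stage produces the final global cut-off.
        Arc::new(LocalLimitExec::new(input, fetch))
    } else {
        // Single partition: the global limit (with skip = 0) can be
        // applied directly.
        Arc::new(GlobalLimitExec::new(input, 0, Some(fetch)))
    }
}
```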
55 changes: 55 additions & 0 deletions datafusion/core/src/physical_optimizer/limit_pushdown.rs
@@ -127,13 +127,36 @@ pub fn push_down_limits(
         } else {
             add_fetch_to_child(&limit_exec, child.clone())
         }
+    } else if plan.supports_limit_pushdown() {
+        let new_children = add_fetch_to_children(plan.fetch(), &plan.children());
+        if new_children.iter().all(|item| item.is_none()) {
+            None
+        } else {
+            let new_children = updated_children(new_children, plan.children());
+            Some(plan.clone().with_new_children(new_children)?)
+        }
     } else {
         None
     };

     Ok(maybe_modified.map_or(Transformed::no(plan), Transformed::yes))
 }

+fn updated_children(
+    new_children: Vec<Option<Arc<dyn ExecutionPlan>>>,
+    children: Vec<&Arc<dyn ExecutionPlan>>,
+) -> Vec<Arc<dyn ExecutionPlan>> {
+    let mut updated_children = vec![];
+    for (new_child, old_child) in new_children.into_iter().zip(children) {
+        if let Some(new_child) = new_child {
+            updated_children.push(new_child)
+        } else {
+            updated_children.push(old_child.clone())
+        }
+    }
+    updated_children
+}
+
 /// Transforms the [`ExecutionPlan`] into a [`LimitExec`] if it is a
 /// [`GlobalLimitExec`] or a [`LocalLimitExec`].
 fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
@@ -250,6 +273,38 @@ fn add_fetch_to_child(
     }
 }

+fn add_fetch_to_children(
+    fetch: Option<usize>,
+    children: &[&Arc<dyn ExecutionPlan>],
+) -> Vec<Option<Arc<dyn ExecutionPlan>>> {
+    if fetch.is_none() {
+        return vec![None];
+    }
+    let mut latest_fetch: Option<usize> = fetch;
+
+    children
+        .iter()
+        .map(|child| {
+            if let Some(child_with_fetch) = child.with_fetch(fetch) {
+                latest_fetch = fetch;
+                Some(child_with_fetch)
+            } else if child.supports_limit_pushdown() {
+                if latest_fetch.is_some() {
+                    let new_children =
+                        add_fetch_to_children(latest_fetch, &child.children());
+                    let new_children =
+                        updated_children(new_children, children.to_owned());
+                    Some(Arc::clone(child).with_new_children(new_children).ok()?)
+                } else {
+                    Some(Arc::clone(child))
+                }
+            } else {
+                None
+            }
+        })
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
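This realizes the commit message's third bullet: a limit now keeps traveling through any operator whose `supports_limit_pushdown()` is true, even when that operator cannot absorb a fetch itself, until some descendant's `with_fetch` accepts it. A hypothetical end-to-end check (placeholder CSV path; `datafusion` and `tokio` crates assumed):

```rust
use datafusion::physical_plan::displayable;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    // The projection cannot absorb a fetch, but it supports pushdown, so
    // the limit should reach the CSV scan and surface there as `limit=5`.
    let df = ctx.sql("SELECT c1, c2 FROM t LIMIT 5").await?;
    let plan = df.create_physical_plan().await?;
    println!("{}", displayable(plan.as_ref()).indent(false));
    Ok(())
}
```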
21 changes: 9 additions & 12 deletions datafusion/core/tests/memory_limit/mod.rs
@@ -247,18 +247,15 @@ async fn sort_preserving_merge() {
         // SortPreservingMergeExec (not a Sort which would compete
         // with the SortPreservingMergeExec for memory)
         &[
-            "+---------------+---------------------------------------------------------------------------------------------------------------+",
-            "| plan_type | plan |",
-            "+---------------+---------------------------------------------------------------------------------------------------------------+",
-            "| logical_plan | Limit: skip=0, fetch=10 |",
-            "| | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
-            "| | TableScan: t projection=[a, b] |",
-            "| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
-            "| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
-            "| | LocalLimitExec: fetch=10 |",
-            "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
-            "| | |",
-            "+---------------+---------------------------------------------------------------------------------------------------------------+",
+            "+---------------+-----------------------------------------------------------------------------------------------------------+",
+            "| plan_type | plan |",
+            "+---------------+-----------------------------------------------------------------------------------------------------------+",
+            "| logical_plan | Sort: t.a ASC NULLS LAST, t.b ASC NULLS LAST, fetch=10 |",
+            "| | TableScan: t projection=[a, b] |",
+            "| physical_plan | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
+            "| | MemoryExec: partitions=2, partition_sizes=[5, 5], output_ordering=a@0 ASC NULLS LAST,b@1 ASC NULLS LAST |",
+            "| | |",
+            "+---------------+-----------------------------------------------------------------------------------------------------------+",
         ]
     )
     .run()
23 changes: 11 additions & 12 deletions datafusion/core/tests/sql/explain_analyze.rs
@@ -612,18 +612,17 @@ async fn test_physical_plan_display_indent() {
     let dataframe = ctx.sql(sql).await.unwrap();
     let physical_plan = dataframe.create_physical_plan().await.unwrap();
     let expected = vec![
-        "GlobalLimitExec: skip=0, fetch=10",
-        "  SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
-        "    SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
-        "      ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
-        "        AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
-        "          CoalesceBatchesExec: target_batch_size=4096",
-        "            RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
-        "              AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
-        "                CoalesceBatchesExec: target_batch_size=4096",
-        "                  FilterExec: c12@1 < 10",
-        "                    RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
-        "                      CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
+        "SortPreservingMergeExec: [the_min@2 DESC], fetch=10",
+        "  SortExec: TopK(fetch=10), expr=[the_min@2 DESC], preserve_partitioning=[true]",
+        "    ProjectionExec: expr=[c1@0 as c1, max(aggregate_test_100.c12)@1 as max(aggregate_test_100.c12), min(aggregate_test_100.c12)@2 as the_min]",
+        "      AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
+        "        CoalesceBatchesExec: target_batch_size=4096",
+        "          RepartitionExec: partitioning=Hash([c1@0], 9000), input_partitions=9000",
+        "            AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[max(aggregate_test_100.c12), min(aggregate_test_100.c12)]",
+        "              CoalesceBatchesExec: target_batch_size=4096",
+        "                FilterExec: c12@1 < 10",
+        "                  RepartitionExec: partitioning=RoundRobinBatch(9000), input_partitions=1",
+        "                    CsvExec: file_groups={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, projection=[c1, c12], has_header=true",
     ];

let normalizer = ExplainNormalizer::new();
6 changes: 5 additions & 1 deletion datafusion/optimizer/src/push_down_limit.rs
@@ -128,7 +128,11 @@ impl OptimizerRule for PushDownLimit {
             Some(sort.fetch.map(|f| f.min(sort_fetch)).unwrap_or(sort_fetch))
         };
         if new_fetch == sort.fetch {
-            original_limit(skip, fetch, LogicalPlan::Sort(sort))
+            if skip > 0 {
+                original_limit(skip, fetch, LogicalPlan::Sort(sort))
+            } else {
+                Ok(Transformed::yes(LogicalPlan::Sort(sort)))
+            }
         } else {
             sort.fetch = new_fetch;
             limit.input = Arc::new(LogicalPlan::Sort(sort));
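The new `skip > 0` guard matches the commit message's offset test case: a `Sort` can cap its output via `fetch` but cannot skip rows, so a plain `LIMIT` folds away entirely while `LIMIT ... OFFSET ...` keeps its `Limit` node. A hypothetical illustration (placeholder table and column; exact plan text may differ by version):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    // No offset: the Limit node folds away, leaving `Sort: ..., fetch=3`.
    let no_offset = ctx.sql("SELECT c1 FROM t ORDER BY c1 LIMIT 3").await?;
    println!("{}", no_offset.into_optimized_plan()?.display_indent());

    // With an offset: the sort still fetches skip + fetch = 5 rows, but a
    // `Limit: skip=2, fetch=3` node must remain above it to drop the first 2.
    let with_offset = ctx
        .sql("SELECT c1 FROM t ORDER BY c1 LIMIT 3 OFFSET 2")
        .await?;
    println!("{}", with_offset.into_optimized_plan()?.display_indent());
    Ok(())
}
```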
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -238,6 +238,10 @@ impl ExecutionPlan for PartialSortExec {
         &self.cache
     }

+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         if self.preserve_partitioning {
             vec![Distribution::UnspecifiedDistribution]
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/sorts/sort_preserving_merge.rs
@@ -163,6 +163,10 @@ impl ExecutionPlan for SortPreservingMergeExec {
         &self.cache
     }

+    fn fetch(&self) -> Option<usize> {
+        self.fetch
+    }
+
     fn required_input_distribution(&self) -> Vec<Distribution> {
         vec![Distribution::UnspecifiedDistribution]
     }
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/streaming.rs
@@ -217,6 +217,10 @@ impl ExecutionPlan for StreamingTableExec {
         &self.cache
     }

+    fn fetch(&self) -> Option<usize> {
+        self.limit
+    }
+
     fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
         vec![]
     }
13 changes: 8 additions & 5 deletions datafusion/sql/src/relation/mod.rs
@@ -166,13 +166,16 @@ fn optimize_subquery_sort(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>>
             has_limit = true;
             return Ok(Transformed::no(c));
         }
-        if let LogicalPlan::Sort(_) = c {
-            if !has_limit {
-                has_limit = false;
-                return Ok(Transformed::yes(c.inputs()[0].clone()));
-            }
-        }
-        Ok(Transformed::no(c))
+        match c {
+            LogicalPlan::Sort(s) => {
+                if !has_limit {
+                    has_limit = false;
+                    return Ok(Transformed::yes(s.input.as_ref().clone()));
+                }
+                Ok(Transformed::no(LogicalPlan::Sort(s)))
+            }
+            _ => Ok(Transformed::no(c)),
+        }
     });
     new_plan
 }
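
`optimize_subquery_sort` now hands the `Sort` node back intact when a limit sits above it, instead of reconstructing it from `c.inputs()`; an `ORDER BY` in a subquery with no `LIMIT` is still discarded, since it constrains nothing observable in the outer query. A hypothetical illustration (placeholder table and column):

```rust
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_csv("t", "data.csv", CsvReadOptions::new()).await?;

    // The inner ORDER BY constrains nothing observable, so the planner may
    // drop the Sort node entirely.
    let dropped = ctx
        .sql("SELECT c1 FROM (SELECT c1 FROM t ORDER BY c1) s")
        .await?;
    println!("{}", dropped.into_optimized_plan()?.display_indent());

    // With a LIMIT, the ORDER BY determines *which* rows survive, so the
    // Sort (with its fetch) must be kept.
    let kept = ctx
        .sql("SELECT c1 FROM (SELECT c1 FROM t ORDER BY c1 LIMIT 2) s")
        .await?;
    println!("{}", kept.into_optimized_plan()?.display_indent());
    Ok(())
}
```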