Add LimitPushdown optimization rule and CoalesceBatchesExec fetch #27
Conversation
…c as supporting limit pushdown
Rebased the fork because of conflicts
07)------------AggregateExec: mode=Partial, gby=[date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@0) as date_bin(Utf8("15 minutes"),unbounded_csv_with_timestamps.ts)], aggr=[], ordering_mode=Sorted
08)--------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
09)----------------StreamingTableExec: partition_sizes=1, projection=[ts], infinite_source=true, output_ordering=[ts@0 DESC]
03)----LocalLimitExec: fetch=5
This LocalLimitExec seems to be redundant, since it is pushed down below projection.
03)----ProjectionExec: expr=[name@0 as name, date_bin(IntervalMonthDayNano { months: 0, days: 0, nanoseconds: 900000000000 }, ts@1) as time_chunks]
04)------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
05)--------StreamingTableExec: partition_sizes=1, projection=[name, ts], infinite_source=true, output_ordering=[name@0 DESC, ts@1 DESC]
03)----LocalLimitExec: fetch=5
Same problem as above.
03)----LocalLimitExec: fetch=10
04)------ProjectionExec: expr=[a@0 as a2, b@1 as b]
This limit is pushed down below the projection as a fetch inside the CoalesceBatchesExec, so this LocalLimitExec is redundant.
This looks very cool 👓
I think it would also help if we could document somewhere why we need both this physical optimizer and the logical optimizer here: https://github.com/apache/datafusion/blob/main/datafusion/optimizer/src/push_down_limit.rs
I wonder perhaps if we should remove the logical pushdown entirely in favor of physical pushdown 🤔
// specific language governing permissions and limitations
// under the License.

//! This rule reduces the amount of data transferred by pushing down limits as much as possible.
//! This rule reduces the amount of data transferred by pushing down limits as much as possible.
//! [`LimitPushdown`]: Pushes limits into the plan as much as possible.
"| | TableScan: t projection=[a, b] |",
"| physical_plan | GlobalLimitExec: skip=0, fetch=10 |",
"| | SortPreservingMergeExec: [a@0 ASC NULLS LAST,b@1 ASC NULLS LAST], fetch=10 |",
"| | LocalLimitExec: fetch=10 |",
Another way to get this plan would be to support limit pushdown into MemoryExec (rather than applying the limit with a new LocalLimitExec).
@@ -319,6 +376,86 @@ mod tests {
Ok(())
}
#[tokio::test] |
👍
11)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c3], has_header=true
03)----LocalLimitExec: fetch=5
04)------UnionExec
05)--------SortExec: expr=[c9@1 DESC], preserve_partitioning=[true]
I think SortExec can be limited, which would likely be better to use too (it has a special implementation with a limit).
I think you could also push the limit all the way down to the CSVExec in this plan too
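The "special implementation with limit" the reviewer refers to is a top-k sort: instead of sorting all n rows and then discarding most of them, the operator keeps at most k rows in a heap, for O(n log k) work and O(k) memory. The following is a standalone sketch of that idea on plain integers (not DataFusion's actual SortExec code; the function name `top_k_desc` is made up for illustration):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Return the k largest values in descending order (like ORDER BY ... DESC
// LIMIT k), keeping only k elements in memory at any time.
fn top_k_desc(values: &[i64], k: usize) -> Vec<i64> {
    // Min-heap of the k largest values seen so far (Reverse flips the
    // max-heap into a min-heap, so pop() evicts the smallest).
    let mut heap: BinaryHeap<Reverse<i64>> = BinaryHeap::with_capacity(k + 1);
    for &v in values {
        heap.push(Reverse(v));
        if heap.len() > k {
            heap.pop(); // evict the current smallest of the candidates
        }
    }
    let mut out: Vec<i64> = heap.into_iter().map(|Reverse(v)| v).collect();
    out.sort_unstable_by(|a, b| b.cmp(a)); // final descending order
    out
}

fn main() {
    let data = [5, 1, 9, 3, 7, 2, 8];
    println!("{:?}", top_k_desc(&data, 3)); // the three largest values
}
```

This is why absorbing the limit into the sort beats placing a LocalLimitExec above a full sort: the full sort has already paid O(n log n) before the limit discards rows.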
I actually would have expected the logical optimizer to have already pushed the limit into the inputs 🤔
Congrats again, @alihandroid. We only have two remaining issues to address. You can go ahead and open this PR to the upstream repo now. While you work on the remaining tasks, we might receive additional feedback from the upstream reviewers, so there's no need to wait any longer.
@berkaysynnada Still working on the coalesce batches test cases but the limit merging is done. The PR at the upstream repo is apache#11652
Great job. The tests are failing because there are two …
Remove redundant lines in docstring
Merged upstream.
Which issue does this PR close?
Closes #.
Rationale for this change

Physical plans can be optimized further by pushing `GlobalLimitExec` and `LocalLimitExec` down through certain nodes, or by using versions of their child nodes with fetch limits, without changing the result. This reduces unnecessary data transfer and processing for more efficient plan execution. `CoalesceBatchesExec` can also benefit from this improvement, and as such, fetch limit functionality is implemented for it.

For example,

can be turned into

and

can be turned into

without changing the result, but using fewer resources and finishing faster. Other examples can be found in the tests provided in `limit_pushdown.rs`.
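The resource savings come from limiting how many rows downstream operators ever see. A tiny standalone Rust sketch of the principle (not DataFusion code; `rows_projected` and its setup are invented for illustration), contrasting a limit applied above an already-materialized projection with a limit pushed below it:

```rust
use std::cell::Cell;

// Count how many times the per-row "projection" runs, with and without
// the limit pushed below it. fetch is fixed at 10 over 1000 input rows.
fn rows_projected(limit_pushed_down: bool) -> u32 {
    let calls = Cell::new(0u32);
    let project = |x: i64| {
        calls.set(calls.get() + 1); // tally every projected row
        x * 2
    };
    let fetch = 10usize;
    if limit_pushed_down {
        // Limit below the projection: only `fetch` rows are ever projected.
        let _out: Vec<i64> = (0..1000).take(fetch).map(project).collect();
    } else {
        // Limit above a materialized projection: all 1000 rows are projected
        // first, then all but `fetch` of them are thrown away.
        let materialized: Vec<i64> = (0..1000).map(project).collect();
        let _out: Vec<i64> = materialized.into_iter().take(fetch).collect();
    }
    calls.get()
}

fn main() {
    println!("without pushdown: {} rows projected", rows_projected(false));
    println!("with pushdown:    {} rows projected", rows_projected(true));
}
```

The same asymmetry applies to plan operators: work done below the limit scales with the input size unless the limit (or a fetch) is pushed beneath it.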
What changes are included in this PR?

Implement the `LimitPushdown` rule.

Add two methods to the `ExecutionPlan` trait:
- `with_fetch(&self, fetch: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>`: Returns the fetching version if supported, `None` otherwise. The default implementation returns `None`.
- `supports_limit_pushdown(&self) -> bool`: Returns `true` if a node supports limit pushdown. The default implementation returns `false`.

Add fetch support to `CoalesceBatchesExec`:
- a `fetch` field and a `with_fetch` implementation
- a `new_with_fetch` constructor
constructorAre these changes tested?
Unit tests are provided for
LimitPushdown
and the new fetching support forCoalesceBatchesExec
Are there any user-facing changes?
No. The changes only affect performance