Internal error with repartitioning after equivalence consolidation #8043

Closed
Tracked by #8064
alamb opened this issue Nov 3, 2023 · 7 comments · Fixed by #8127

Comments

@alamb
Contributor

alamb commented Nov 3, 2023

While testing #8006 with our internal test suite, one of our tests fails because there are no sort expressions in a sort-preserving repartition.

The input plan looks like:

2023-11-02T15:58:06.601675Z TRACE log: Optimized physical plan by CombinePartialFinalAggregate:
OutputRequirementExec
  SortExec: expr=[time@1 ASC NULLS LAST]
    CoalescePartitionsExec
      ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)], ordering_mode=Sorted
          SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16, sort_exprs=time@0 ASC NULLS LAST
            AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
              RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                SortExec: expr=[time@0 ASC NULLS LAST]
                  CoalescePartitionsExec
                    ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                      AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                        RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                          AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                            RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                              ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                                FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                                  ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2

But then after EnforceSorting, the SortPreservingRepartitionExec (marked with an arrow below) seems to have no sort exprs anymore:

2023-11-02T15:58:06.605925Z TRACE log: Optimized physical plan by EnforceSorting:
OutputRequirementExec
  SortPreservingMergeExec: [time@1 ASC NULLS LAST] 
    SortExec: expr=[time@1 ASC NULLS LAST]
      ProjectionExec: expr=[cpu as iox::measurement, time@0 as time, (selector_last(sum_idle,time)@1).[value] as last, (selector_last(sum_system,time)@2).[value] as last_1]
        AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
    ----> SortPreservingRepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16 
            AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[selector_last(sum_idle,time), selector_last(sum_system,time)]
              RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=16
                ProjectionExec: expr=[time@0 as time, SUM(cpu.usage_idle)@1 as sum_idle, SUM(cpu.usage_system)@2 as sum_system]
                  AggregateExec: mode=FinalPartitioned, gby=[time@0 as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                    RepartitionExec: partitioning=Hash([time@0], 16), input_partitions=16
                      AggregateExec: mode=Partial, gby=[date_bin(10000000000, time@0, 0) as time], aggr=[SUM(cpu.usage_idle), SUM(cpu.usage_system)]
                        RepartitionExec: partitioning=RoundRobinBatch(16), input_partitions=1
                          ProjectionExec: expr=[time@1 as time, usage_idle@2 as usage_idle, usage_system@3 as usage_system]
                            FilterExec: date_bin(10000000000, time@1, 0) <= 1698940686290451000 AND time@1 <= 1698940686290451000 AND cpu@0 = cpu-total
                              ParquetExec: file_groups={1 group: [[2/8/0649f0e8b1abed092a356ec6181369fcf585431d1cc0694a0cc4ab45cf78b49d/0c5ac9b2-f6d4-4004-9036-15412da47647.parquet]]}, projection=[cpu, time, usage_idle, usage_system], predicate=date_bin(10000000000, time@2, 0) <= 1698940686290451000 AND time@2 <= 1698940686290451000 AND cpu@0 = cpu-total, pruning_predicate=time_min@0 <= 1698940686290451000 AND cpu_min@1 <= cpu-total AND cpu-total <= cpu_max@2

This then causes a failure during execution, in the streaming merge:

Internal error: Sort expressions cannot be empty for streaming merge
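
To illustrate why a merge with no sort expressions cannot proceed, here is a minimal, self-contained sketch of a k-way merge that rejects an empty sort key list. It is not DataFusion's implementation: `merge_sorted` and `sort_key_count` are hypothetical names, the partitions are plain `i64` vectors, and only the error text is taken from the message above.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical, simplified k-way merge over partitions that are each
/// already sorted ascending. `sort_key_count` stands in for the number of
/// sort expressions the merge was configured with.
fn merge_sorted(partitions: &[Vec<i64>], sort_key_count: usize) -> Result<Vec<i64>, String> {
    // Without any sort key there is nothing to compare rows on, so the
    // merge cannot decide which partition to read from next.
    if sort_key_count == 0 {
        return Err("Sort expressions cannot be empty for streaming merge".to_string());
    }

    // Min-heap of (value, partition index, offset within the partition).
    let mut heap = BinaryHeap::new();
    for (p, part) in partitions.iter().enumerate() {
        if let Some(&v) = part.first() {
            heap.push(Reverse((v, p, 0usize)));
        }
    }

    let mut out = Vec::new();
    while let Some(Reverse((v, p, i))) = heap.pop() {
        out.push(v);
        // Refill the heap from the partition we just consumed from.
        if let Some(&next) = partitions[p].get(i + 1) {
            heap.push(Reverse((next, p, i + 1)));
        }
    }
    Ok(out)
}
```

In this sketch, calling `merge_sorted` with `sort_key_count` set to 0 returns the error, which is the situation the plan above ends up in when the repartition node claims to preserve order but carries no sort expressions.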

Originally posted by @alamb in #8006 (comment)

@alamb
Contributor Author

alamb commented Nov 7, 2023

@mustafasrepo / @ozankabak do you have any idea when the fix for this might be available? It is blocking updating the use of DataFusion in IOx -- perhaps if you can explain what is wrong, I can work on a patch to fix it

@ozankabak
Contributor

@mustafasrepo is actively working on it, I think we will have the fix out in a couple days at most

@alamb
Contributor Author

alamb commented Nov 7, 2023

@mustafasrepo is actively working on it, I think we will have the fix out in a couple days at most

Thank you so much

@alamb
Contributor Author

alamb commented Nov 10, 2023

Ok, I spent a while debugging our problem today

What is happening is that a RepartitionExec was accidentally keeping the preserve_order flag even when the input wasn't sorted, which caused the wrong variant (SortPreservingRepartitionExec) to be used.

This diff fixed my problem:

diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs
index 66f7037e5c..17babcd109 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -644,16 +644,16 @@ impl RepartitionExec {
         })
     }

-    /// Set Order preserving flag
+
+    /// Set Order preserving flag, which controls if this node is
+    /// `RepartitionExec` or `SortPreservingRepartitionExec`. If the input is
+    /// not ordered, or has only one partition, remains a `RepartitionExec`.
     pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
-        // Set "preserve order" mode only if the input partition count is larger than 1
-        // Because in these cases naive `RepartitionExec` cannot maintain ordering. Using
-        // `SortPreservingRepartitionExec` is necessity. However, when input partition number
-        // is 1, `RepartitionExec` can maintain ordering. In this case, we don't need to use
-        // `SortPreservingRepartitionExec` variant to maintain ordering.
-        if self.input.output_partitioning().partition_count() > 1 {
-            self.preserve_order = preserve_order
-        }
+        self.preserve_order = preserve_order &&
+                // If the input isn't ordered, there is no ordering to preserve
+                self.input.output_ordering().is_some() &&
+                // if there is only one input partition, merging is not required to maintain order
+                self.input.output_partitioning().partition_count() > 1;
         self
     }

I will prepare a PR with this fix shortly
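
The condition in the diff above can be sketched in isolation. This is a simplified model rather than the DataFusion code: `InputProps` and `effective_preserve_order` are hypothetical names that stand in for `self.input.output_ordering().is_some()` and `self.input.output_partitioning().partition_count()`.

```rust
/// Hypothetical stand-in for the properties of the input plan node.
#[derive(Debug, Clone, Copy, PartialEq)]
struct InputProps {
    /// Whether the input reports an output ordering.
    has_output_ordering: bool,
    /// Number of partitions the input produces.
    partition_count: usize,
}

/// Mirrors the condition in the diff: the flag stays set only if it was
/// requested, the input is ordered, and there is more than one input
/// partition to merge.
fn effective_preserve_order(requested: bool, input: InputProps) -> bool {
    requested && input.has_output_ordering && input.partition_count > 1
}
```

Under this model, the failing plan corresponds to `requested = true` with an unordered 16-partition input: the old check looked only at the partition count and kept the flag, while the new condition clears it.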

@ozankabak
Contributor

Good catch. It is interesting that your test failure surfaced one pre-existing bug and one missing feature (i.e. supporting ordering of complex expressions, which @mustafasrepo is working on).

@alamb
Contributor Author

alamb commented Nov 10, 2023

Yeah, as I was telling some people internally, since IOx makes heavy use of sort orderings (and consequently of this code), I think we are more heavily impacted by such bugs than others.

I feel like we can improve DataFusion for everyone by being early testers and then feeding back both bug fixes and reports of rough edges in the APIs (like #8120). 🚀

@alamb
Contributor Author

alamb commented Nov 10, 2023

#8127 is ready for review
