Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Exponential nature of ExecutionPlans output_partitioning and equivalence_properties #9084

Closed
gruuya opened this issue Jan 31, 2024 · 3 comments · Fixed by #9346
Closed

Exponential nature of ExecutionPlans output_partitioning and equivalence_properties #9084

gruuya opened this issue Jan 31, 2024 · 3 comments · Fixed by #9346
Assignees
Labels
bug Something isn't working

Comments

@gruuya
Copy link
Contributor

gruuya commented Jan 31, 2024

Describe the bug

I have been troubleshooting the TPCH-DS query 64, and have found some performance issues, seemingly stemming from ExecutionPlan::output_partitioning, ExecutionPlan::equivalence_properties and their inherent branching nature throughout various ExecutionPlan implementations.

In particular the problem manifests by the process pinning the CPU at 100%, and getting stuck in the EnforceDistribution physical optimizer, which aggravates the underlying problem by inter-leaving RepartitionExec into the existing plan tree.

To Reproduce

Setup

You can use the existing tpcds_physical_q64 test, just un-ignore it and set env_logger::init() somewhere to capture the logs. You'll also need to set RUST_MIN_STACK=3000000 to alleviate an unrelated problem described in #4786.

Flamegraph

Next you can optionally capture the flamegraph to get an initial sense of what's happening

sudo cargo flamegraph --dev -v --test tpcds_planning -- tpcds_physical_q64

I see something like flamegraph.svg.zip.

Notably, there is a large build-up of (likely under-sampled) output_partitioning and equivalence_properties calls.

Investigation

I used some counters

pub struct Counter {
    count: std::sync::Mutex<u32>,
}

impl Counter {
    pub fn new() -> Counter {
        Counter {
            count: std::sync::Mutex::new(0),
        }
    }

    pub fn reset(&self) {
        let mut counter = self.count.lock().unwrap();
        *counter = 0;
    }

    pub fn increment(&self) {
        let mut counter = self.count.lock().unwrap();
        *counter += 1;
    }

    pub fn get(&self) -> u32 {
        let counter = self.count.lock().unwrap();
        *counter
    }
}

use once_cell::sync::Lazy;

pub static OUT_PART_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new()));
pub static EQ_PROP_COUNTER: Lazy<Arc<Counter>> = Lazy::new(|| Arc::new(Counter::new()));

and then sprinkled crate::OUT_PART_COUNTER.increment() and crate::EQ_PROP_COUNTER.increment() at the begining of fn output_partitioning(&self) and fn equivalence_properties(&self) respectively, for MemoryExec, RepartitionExec, HashJoinExec, ProjectionExec and AggregateExec.

I also added the following

diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
index 0c5c2d78b..d4c0ddc22 100644
--- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs
+++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs
@@ -208,9 +208,16 @@ impl PhysicalOptimizerRule for EnforceDistribution {
         };
 
         let distribution_context = DistributionContext::new_default(adjusted);
+        use datafusion_physical_plan::{OUT_PART_COUNTER, EQ_PROP_COUNTER};
+        OUT_PART_COUNTER.reset();
+        EQ_PROP_COUNTER.reset();
         // Distribution enforcement needs to be applied bottom-up.
         let distribution_context =
             distribution_context.transform_up(&|distribution_context| {
+                log::warn!("output_partitioning {}, equivalence_properties {}", OUT_PART_COUNTER.get(), EQ_PROP_COUNTER.get());
+                OUT_PART_COUNTER.reset();
+                EQ_PROP_COUNTER.reset();
+
                 ensure_distribution(distribution_context, config)
             })?;
         Ok(distribution_context.plan)

Results

Soon enough after starting the test you'll notice these numbers reach values on the order of 100K and then millions and more as the slowdown in the iterations becomes appreciable

[2024-01-31T13:58:01Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 40955, equivalence_properties 58363
[2024-01-31T13:58:01Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 245749, equivalence_properties 350184
[2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 81915, equivalence_properties 116731
[2024-01-31T13:58:03Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:06Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 491509, equivalence_properties 700392
[2024-01-31T13:58:07Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 163835, equivalence_properties 233467
[2024-01-31T13:58:07Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:12Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 983029, equivalence_properties 1400808
[2024-01-31T13:58:14Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 327675, equivalence_properties 466939
[2024-01-31T13:58:14Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:25Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 1966069, equivalence_properties 2801640
[2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 655355, equivalence_properties 933883
[2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 0
[2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
[2024-01-31T13:58:29Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 0, equivalence_properties 1
[2024-01-31T13:58:51Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932157, equivalence_properties 5603310
[2024-01-31T13:59:13Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 3932151, equivalence_properties 5603324
[2024-01-31T13:59:50Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 6553589, equivalence_properties 9338876
[2024-01-31T14:02:12Z WARN  datafusion::physical_optimizer::enforce_distribution] output_partitioning 24903668, equivalence_properties 35487723

Explanation

WARNING: A lot of gritty details below
Taking a look at one of the plans that appears early in the optimization process:

[
    "AggregateExec: mode=Partial, gby=[cs_item_sk@0 as cs_item_sk], aggr=[SUM(catalog_sales.cs_ext_list_price), SUM(catalog_returns.cr_refunded_cash + catalog_returns.cr_reversed_charge + catalog_returns.cr_store_credit)]",
    "  ProjectionExec: expr=[cs_item_sk@0 as cs_item_sk, cs_ext_list_price@2 as cs_ext_list_price, cr_refunded_cash@5 as cr_refunded_cash, cr_reversed_charge@6 as cr_reversed_charge, cr_store_credit@7 as cr_store_credit]",
    "    HashJoinExec: mode=Partitioned, join_type=Inner, on=[(cs_item_sk@0, cr_item_sk@0), (cs_order_number@1, cr_order_number@1)]",
    "      RepartitionExec: partitioning=Hash([cs_item_sk@0, cs_order_number@1], 12), input_partitions=0",
    "        MemoryExec: partitions=0, partition_sizes=[]",
    "      RepartitionExec: partitioning=Hash([cr_item_sk@0, cr_order_number@1], 12), input_partitions=0",
    "        MemoryExec: partitions=0, partition_sizes=[]",
]

and exploring the relevant implementations of output_partitioning and equivalence_properties, you can see that they have the potential to branch off into calling two methods on the input, thus leading to a exponential call tree.

In particular calling just output_partitioning() on the top-most plan leads to:

  • in AggregateExec::output_partitioning
    1 x out_part() + 2 x input.out_part() + input.eq_prop()
  • for the ProjectionExec node below this gets expanded to
    1 x out_part() + 2 x (out_part() + input.out_part() + input.eq_prop()) + (eq_prop() + input.eq_prop()) = 
    3 x out_part() + eq_prop() + 2 x input.out_part() + 3 x input.eq_prop()
  • Subsequently, HashJoinExec will expand these calls to left and right inputs
    3 x out_part() + eq_prop() + 2 x (out_part() + left.out_part() + right.out_part()) + 3 x (eq_prop() + left.eq_prop() + right.eq_prop()) = 
    5 x out_part() + 4 x eq_prop() + 2 x (left.out_part() + right.out_part()) + 3 x (left.eq_prop() + right.eq_prop())
  • Next, given that both left and right inputs above are RepartitionExecs which are leaf call-nodes for out_part(), but also expand eq_prop into 2 of inputs methods we get
    5 x out_part() + 4 x eq_prop() + 2 x (out_part() + out_part()) + 3 x (eq_prop() + l_input.eq_prop() + l_input.out_part() + eq_prop() + r_input.eq_prop() + r_input.out_part()) =
    9 x out_part() + 10 x eq_prop() + 6 x input.eq_prop() + 6 x input.eq_prop() =
  • Lastly the bottom plan nodes are MemoryExecs which terminate the eq_prop() call chain as well
    9 x out_part() + 10 x eq_prop() + 6 x eq_prop() + 6 x eq_prop() =
    15 x out_part() + 16 x eq_prop()

So for computing 1 thing (Partitioning), from a total of 7 plans the total invocation count for the above two methods was 31, thus with some overlapping. Note that some of these calls involve allocating stuff on the heap as well as some other computations, which can add up when the invocation count grows substantially.

Finally, given that these 2 methods are liberally called inside ensure_distribution and its sub-routines, I think this explains the enormous call count and ultimately the slow-down.

Expected behavior

Some potential mitigations:

  • Delay calling and re-use results from output_partitioning and equivalence_properties within methods. For instance in AggregateExec the second call to input.output_partitioning is redundant and the call to input.equivalence_properties() can also be delayed only in case of a match (and likewise for ProjectionExec).
  • A proper solution would probably involve EnforceDistribution interleaving some helper plans alongside with RepartitionExec which serve to cache/short-circuit the output_partitioning and equivalence_properties already computed for the input below.
  • Alternatively, some kind of visitor/memoization implementation that records the output the two methods bottom-up in the node tree might also be viable.

Additional context

No response

@gruuya gruuya added the bug Something isn't working label Jan 31, 2024
@alamb
Copy link
Contributor

alamb commented Jan 31, 2024

cc @mustafasrepo @metesynnada

@gruuya
Copy link
Contributor Author

gruuya commented Feb 1, 2024

Actually it seems that just storing the output equivalence properties when creating ProjectionExec (which were already being calculated) is sufficient to prevent the runaway overlapping fan-out of these methods in this case: #9097

@metesynnada
Copy link
Contributor

metesynnada commented Feb 1, 2024

Hi @gruuya, this is a good observation and a well-done benchmark, thanks for the effort.

With the help of #8817, the EnforceDistribution and EnforceSorting can be revamped to cache the necessary fields like output partitioning. We should deal with this problem soon. @mustafasrepo may give ideas on this subject since it is his expertise.

Also, he and I will check your #9097 as soon as possible to merge our ideas.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
4 participants