[SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and subquery reuse #28885
Conversation
I measured considerable improvement (~30%) using TPCDSQueryBenchmark on scaleFactor=5 data with the TPCDS Q14a, Q14b, Q23a, Q23b, Q47 and Q57 queries.
```diff
@@ -474,9 +475,9 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
         Inner,
         None,
         shuffle,
-        shuffle)
+        shuffle.copy())
```
If this were the same instance as the other child of `SortMergeJoinExec`, then no reuse would be required.
I don't follow this part, could you please let me know how copying here makes it different?
Yes, I think the whole point of exchange reuse is to find different exchange instances that produce the same data, keep only one instance, and reuse its output wherever we can (i.e. call `execute`/`executeBroadcast` multiple times on one exchange instance instead of calling `execute`/`executeBroadcast` once on each instance). In this example the 2 children point to the same exchange instance (`shuffle`), so there is no point in reuse here.
(We could wrap one of them in a `ReusedExchangeExec` node, but it wouldn't make any difference from a performance point of view.)
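The reuse idea described here can be illustrated with a tiny, self-contained sketch. Everything below is hypothetical and simplified (plain Scala stand-ins, not Spark's actual `Exchange`/`ReusedExchangeExec` classes; canonicalization is modelled as a string that ignores the instance id): a duplicate of an already-seen exchange gets wrapped in a reuse node, while a repeated reference to the very same instance is left alone.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's physical plan nodes.
sealed trait Plan
case class Exchange(id: Int, output: String) extends Plan {
  // Canonical form ignores the instance id: only the produced data matters.
  def canonicalized: String = s"Exchange($output)"
}
case class ReusedExchange(original: Exchange) extends Plan

object ReuseSketch {
  // Keep the first exchange per canonical form; wrap later duplicate
  // instances, but leave a repeated reference to the same instance alone.
  def reuse(plans: Seq[Exchange]): Seq[Plan] = {
    val seen = mutable.Map[String, Exchange]()
    plans.map { e =>
      seen.get(e.canonicalized) match {
        case Some(first) if first ne e => ReusedExchange(first)
        case Some(_)                   => e // same instance: nothing to gain
        case None                      => seen(e.canonicalized) = e; e
      }
    }
  }
}
```

With two distinct instances producing the same data, the second one is replaced by a reuse node pointing at the first; a second occurrence of the first instance itself is returned untouched, mirroring the point made above.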
Makes sense. Although I think the reuse-exchange logic we have doesn't check whether the instances are the same and would replace it with a reuse exchange anyway, it is good to have a different instance here.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
I've just realized that this PR partly overlaps with #28881. @prakharjain09 opened it 2 hours before I did, but my PR does a bit more than that and actually does the combined reuse in a bit different way so I wouldn't close mine yet.
Test build #124334 has finished for PR 28885 at commit
Thank you for pinging me, @peter-toth .
cc @maryannxue too FYI
Test build #124538 has finished for PR 28885 at commit
Thanks for doing this! I think the idea of whole plan reuse is good and your approach is correct, but some parts could be done differently IMO; I left some comments.
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
```diff
@@ -326,7 +327,8 @@ object QueryExecution {
    */
   private[execution] def preparations(
       sparkSession: SparkSession,
-      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None): Seq[Rule[SparkPlan]] = {
+      adaptiveExecutionRule: Option[InsertAdaptiveSparkPlan] = None,
+      subquery: Boolean): Seq[Rule[SparkPlan]] = {
```
Why do we need this boolean parameter here? What will happen if we just always run the `WholePlanReuse` rule?
Sorry, I don't get this: why would we run this rule multiple times? This new rule traverses the whole plan. Does it make sense to run it on the subqueries first and then on the main query, whose traversal already covers the subqueries?
Yes, from the performance perspective it makes sense to exclude them. I sort of don't like having another parameter and selecting rules based on it, so I was thinking that if it's not a huge performance difference we shouldn't do it, but it can be expensive with canonicalization, etc. I guess we don't have any other way of detecting locally inside the new rule whether a physical plan is a subquery, so it's fine to do it like this. Maybe we need a more explicit name for `QueryExecution.prepareExecutedPlan` in the future, like `PrepareSubqueryForExecution`, to make it clear that this method is only called for subqueries.
sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/Reuse.scala
```scala
def apply(plan: SparkPlan): SparkPlan = {
  if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
    // To avoid costly canonicalization of an exchange or a subquery:
```
Do you have some measurements of how much we save when using `Map[StructType, ...]` instead of a map from the canonicalized form? I know in theory it's beneficial when there is no match, but I was wondering if it has some tangible effect.
I think the `Map[StructType, ...]` way of caching has been there for quite some time. A simple map of canonicalized plans naturally comes to my mind too, and I feel it would do the job without any performance degradation for most queries. But I'm afraid there can be edge cases where it could introduce degradation, so just to be on the safe side I wouldn't touch this preliminary schema matching when looking up in the cache.
On the other hand, I think the old `ArrayBuffer[...]` can easily be replaced by a map of canonicalized plans to speed up lookups in the cache when the schema matches.
I saw your other comment (#28885 (comment)) on this topic, and I think the `Canonicalized[T]` wrapper would be exactly the same as the old `Map[StructType, ArrayBuffer[T]]` cache map, just a bit more complicated.
What I did in my latest commit (c49a0f9) is extract the cache code, and I think it became quite easy to follow. But I'm open to suggestions and will change the implementation if you think it is still too complicated.
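The schema-first cache discussed above can be sketched roughly like this. All names are illustrative only (`SchemaFirstCache` and `lookupOrAdd` are made up for the example, and strings stand in for `StructType` schemas and canonicalized plans): the cheap schema is the outer key, and the costly canonicalization is computed only once a second plan with the same schema shows up.

```scala
import scala.collection.mutable

// Illustrative sketch, not the PR's actual ReuseMap implementation.
class SchemaFirstCache[T] {
  // Outer key: cheap schema. Inner map: keyed by the costly canonical form,
  // built lazily only when two plans share a schema.
  private val bySchema = mutable.Map[String, (T, mutable.Map[String, T])]()

  def lookupOrAdd(schema: String, canonicalize: T => String, plan: T): T =
    bySchema.get(schema) match {
      case None =>
        // First plan with this schema: store it without canonicalizing.
        bySchema(schema) = (plan, mutable.Map.empty[String, T])
        plan
      case Some((first, inner)) =>
        // Pay the canonicalization cost lazily, for the first plan too.
        if (inner.isEmpty) inner(canonicalize(first)) = first
        inner.getOrElseUpdate(canonicalize(plan), plan)
    }
}
```

The point of the two levels is that a plan whose schema matches nothing in the cache never gets canonicalized at all.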
```scala
case class WholePlanReuse(conf: SQLConf) extends Rule[SparkPlan] {

  def apply(plan: SparkPlan): SparkPlan = {
    if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
```
Should we add a flag for this rule, like `exchangeAndSubqueryReuseEnabled`? I think this rule should only run if both flags are enabled. Do we have cases where we only want to reuse subqueries and not exchanges? Also, it will simplify the logic in your new rule. We can keep the old rules and fall back to them if only one of the flags is off, or if the new flag is off.
Hmm, I'm not sure why anyone would disable subquery or exchange reuse at all. But those flags do exist, and we shouldn't change their meaning: exchange or subquery reuse shouldn't happen when they are disabled.
IMHO if we introduce this "whole plan reuse" with this new rule, then we should still respect the old flags.
I also think this new rule can replace the old rules entirely with one whole-plan traversal, and I don't see any reason to keep the old rules. Actually, wouldn't it be confusing if we had reuse-related code in many places?
Ok, makes sense, let's keep the old behavior and just change the implementation.
```diff
@@ -1646,4 +1646,25 @@ class SubquerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
     checkAnswer(df, df2)
     checkAnswer(df, Nil)
   }
+
+  test("Subquery reuse across the whole plan") {
+    val df = sql(
```
Can you put the physical plan in a comment? It'll help to see what gets reused.
Sure, I added the plan in commit: 2b3cde2
Let me know if a sample plan would be beneficial in other test cases too.
I mentioned this to Peter offline: I don't really agree with adding the plan in a comment. Other changes will continue happening in the rest of the planner, the plan will change, and the comment will get out of date. If you actually want the plan to remain static, you should add asserts against it (though I doubt that is what you want). If somebody wants to see the plan, they should run the test themselves, right?
(I'm also unfamiliar with the norms around this part of the codebase, so it's fine if it's not unusual to put the entire plan in a comment.)
```scala
// - we insert it into the map of canonicalized plans only when at least 2 have the same
//   schema
val exchanges = Map[StructType, (Exchange, Map[SparkPlan, Exchange])]()
val subqueries = Map[StructType, (BaseSubqueryExec, Map[SparkPlan, BaseSubqueryExec])]()
```
The nested map somehow makes the logic unnecessarily complicated. Can we define a class like `Canonicalized[T]`, where `T` can be an `Exchange` or `BaseSubqueryExec`, and implement the `equals` and `hashCode` of this class to first check for schema equality? Then we can simply have a map like `Map[Canonicalized[T], T]`, which will simplify the code quite a bit.
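For reference, the suggested `Canonicalized[T]` wrapper might look roughly like this. This is a hypothetical miniature (`PlanLike`, `Ex` and the string-valued `schema`/`canonical` fields are invented for the example): `hashCode` uses only the cheap schema, so `equals` pays the canonicalization cost only when two keys' schemas actually match.

```scala
// Illustrative sketch of the suggested wrapper, not Spark's actual types.
trait PlanLike {
  def schema: String     // cheap to compute
  def canonical: String  // assumed costly in the real plan nodes
}

final class Canonicalized[T <: PlanLike](val plan: T) {
  // Hash by the cheap schema so buckets separate different schemas for free.
  override def hashCode: Int = plan.schema.hashCode
  // Only canonicalize when the schemas already match.
  override def equals(other: Any): Boolean = other match {
    case that: Canonicalized[_] =>
      plan.schema == that.plan.schema && plan.canonical == that.plan.canonical
    case _ => false
  }
}

// A toy plan node for demonstration.
case class Ex(id: Int, schema: String, canonical: String) extends PlanLike
```

A `Map[Canonicalized[Ex], Ex]` with `getOrElseUpdate` then behaves like the nested map: a second instance with the same canonical form resolves to the first-seen one.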
Please find the answer in #28885 (comment)
Test build #124572 has finished for PR 28885 at commit
Thanks for the review and comments @dbaliafroozeh!
Test build #124602 has finished for PR 28885 at commit
```scala
case sub: ExecSubqueryExpression =>
  val subquery = reuse(sub.plan).asInstanceOf[BaseSubqueryExec]
  sub.withNewPlan(
    if (conf.subqueryReuseEnabled) {
```
I like the new abstraction for hiding the loop. How about going one step further and also abstracting over the pattern where the found element is the same instance as the one in the cache, something like this:

```scala
def lookupOrElse(plan: T, f: T => T): T = {
  val res = ...
  if (res eq plan) {
    plan
  } else {
    f(res)
  }
}
```

and then the call site becomes `lookupOrElse(subquery, ReusedSubqueryExec(_))`.
The name `lookupOrElse` sounds a bit weird to me; it suggests that `f` is applied if the lookup fails (the item isn't in the map yet). But in this case `f` should be applied if the lookup finds the item in the map. So if we want that abstraction, shouldn't we call the method `putOrElse` or `addOrElse`?
I added `addOrElse` in the latest commits.
Yeah, I agree `lookupOrElse` is not the best name. I was thinking more about it; we want a method with the following semantics.
Try to find an existing node with the same canonicalized form:
- if no such node is found, add the node to the map and return the node itself
- if the found node refers to the same instance as the key, do nothing and return the node itself
- otherwise, call the given function on the node retrieved from the map and return the result

I think it's not like any common method on the Map interfaces that I know of. It somehow reminds me of Java 8's `putIfAbsent` method, but with the difference that it calls the passed function if the key is present.
On second thought, maybe we should rename the `lookup` method to `getOrElseUpdate`, because it doesn't just look up. And call the second method `applyIfPresent`. Or maybe it doesn't make sense to have the second method anymore, since it's very specific and it's hard to capture with a method name what's going on.
One last thing: what was happening before if we had two exchanges in the query plan that referred to the same instance? Were we leaving them intact or replacing one with a reuse node? Because now we just leave it as is. I'm not sure such a situation (two exchanges referring to the same instance in the query plan) can actually happen; if not, maybe we can remove the check (and the second method) altogether. Also, maybe putting in a reuse node even when the instances are the same is better: it's just a wrapper and signals the presence of the same exchange node.
I agree that the name `lookup` doesn't capture exactly what the method does, but `getOrElseUpdate` has a well-known meaning and parameter list on Scala maps, and the first method doesn't match it, so I would use a different name like `getOrElseAdd` or `lookupOrElseAdd`.
Yes, it's even harder to find a good name for the second method. How about `reuseOrElseAdd`? It doesn't match any existing map method, and it captures both the reuse (applying `f`) and the add functionality.
The old behaviour was that on the second encounter of the same instance it was wrapped into a reuse node. This actually works differently in this PR: the instance doesn't get wrapped on the second encounter. I did it this way for 3 reasons:
- From a performance perspective it doesn't make any difference.
- An exchange or subquery instance can appear 2 times in the plan only if someone manually crafted the plan, which I don't think happens in a real-world use case.
- I'm using `getOrElseUpdate` 2 times in `lookup` to simplify the code, but from the result of `getOrElseUpdate` we can't tell if the key had already been in the map or was added during the call. Besides simpler code, I also like using `getOrElseUpdate` because some concurrent map implementations support it as an atomic operation. I admit it doesn't matter in this PR, but I think it would be nice to incorporate the `ReuseAdaptiveSubquery` functionality into this rule in a separate PR in the future.
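The `reuseOrElseAdd` semantics discussed here can be captured in a few lines. This is an illustrative sketch (the string key stands in for the canonicalized plan, and `ReuseMapSketch` is not the PR's actual `ReuseMap`): `getOrElseUpdate` covers the "add if absent" case, and the `eq` check collapses both "just added" and "same instance seen before" into returning the plan unchanged, so `f` runs only for a genuinely different duplicate.

```scala
import scala.collection.mutable

// Illustrative sketch of reuseOrElseAdd, not the actual ReuseMap.
class ReuseMapSketch[T <: AnyRef] {
  private val map = mutable.Map[String, T]()

  // - unseen key: add the plan and return it unchanged
  // - same instance again: return it unchanged
  // - different instance with the same key: apply f to the first-seen one
  def reuseOrElseAdd(key: String, plan: T)(f: T => T): T = {
    val found = map.getOrElseUpdate(key, plan)
    if (found eq plan) plan else f(found)
  }
}
```

A call site in the spirit of the discussion would be `reuseOrElseAdd(canonicalKey, subquery)(ReusedSubqueryExec(_))`.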
`reuseOrElseAdd` is probably a better name, but this naming is getting hard :-) About the new behavior: sounds good to me, but if there is no performance benefit and it only happens in tests, I would have just wrapped the same instance in a reuse node to avoid the second method and the equality check. I don't have a strong opinion on this, though.
```scala
 */
case class ReuseExchangeAndSubquery(conf: SQLConf) extends Rule[SparkPlan] {

  private class ReuseCache[T <: SparkPlan] {
```
I like this new abstraction, just two nitpicky comments:
- This is not really a cache IMO, in the sense that we need to hold on to all the values; it's more of a memo table. How about renaming it to `ReuseMap`, or something along those lines?
- Can we move this class somewhere else? I know it's only used here, but I think it would be great to use it anywhere we want to look something up by its canonicalized form. If you agree to move it to a util package, let's also make the type upper bound `QueryPlan` so that we can use it for other node types.
Thanks, renamed it to `ReuseMap` and generalized it as you suggested.
Changes look good to me, added 3 minor comments.
Thanks @dbaliafroozeh. @cloud-fan, @maryannxue could you review this PR?
Test build #124663 has finished for PR 28885 at commit
Test build #124679 has finished for PR 28885 at commit
Test build #124753 has finished for PR 28885 at commit
retest this please
Test build #124810 has finished for PR 28885 at commit
@cloud-fan, @maryannxue, @maropu, @viirya could you please review this PR?
```scala
 */
class ReuseMap[T <: QueryPlan[_]] {
  // scalastyle:off structural.type
  private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
```
If the existential type `T2`, which stands for the canonicalized type of `T`, is hard to read, then we can move `T2` to the class definition like this:

```scala
class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
  private val map = Map[StructType, (T, Map[T2, T])]()
```

and initialize the `ReuseMap`s as:

```scala
val exchanges = new ReuseMap[Exchange, SparkPlan]()
val subqueries = new ReuseMap[BaseSubqueryExec, SparkPlan]()
```
I consider the second version much easier to reason about. It's slightly more types you have to put into the code, but IMO those types help with readability.
Ok, 5df4c53 adds the new T2 type param.
```scala
plan transformUp {
  case exchange: Exchange => reuse(exchange)
} transformAllExpressions {
  // Lookup inside subqueries for duplicate exchanges
  case in: InSubqueryExec =>
    val newIn = in.plan.transformUp {
      case exchange: Exchange => reuse(exchange)
    }
    in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
  }
}
```
This `transformUp` and then `transformAllExpressions` was actually a bit weird, for 2 reasons:
1. A minor issue is that `reuse` is a partial function and yet we use it as a normal function. I mean, this code probably should have been written as

```scala
plan transformUp reuse transformAllExpressions {
  // Lookup inside subqueries for duplicate exchanges
  case in: InSubqueryExec =>
    val newIn = in.plan.transformUp reuse
    in.copy(plan = newIn.asInstanceOf[BaseSubqueryExec])
}
```

and the definition of `reuse` should have been `def reuse: PartialFunction[SparkPlan, SparkPlan] = {`.
2. A bigger issue is that this part of the code traverses the plan 2 times, first with `transformUp` and then with `transformAllExpressions`, and both traversals can insert reuse references into the plan. Imagine that the first traversal (`transformUp`) inserts a reuse reference to an exchange. Let's say it inserts a `ReuseExchange` node pointing to `Exchange id=1`, but then the second traversal (`transformAllExpressions`) finds an `InSubqueryExec` expression in a `FileSourceScanExec` under the node `Exchange id=1`. If there is a reuse opportunity in the subplan of that `InSubqueryExec`, then inserting another reuse node into the subplan propagates up, with the result that `Exchange id=1` gets replaced by `Exchange id=x` in the parent plan. In that case the reuse node created in the first traversal, pointing to `Exchange id=1`, becomes invalid.
(Please note that this issue is very similar to what I showed in the description of the PR, but in that case the 2 traversals were due to the separate `ReuseSubquery` and `ReuseExchange` rules.)
I think the fix to this issue is to use a 1-pass, whole-plan, bottom-up traversal, like I did in `ReuseExchangeAndSubquery` in this PR.
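The single-pass, bottom-up traversal argued for here can be modelled on a toy plan tree. All types below are invented for the example (they are not Spark's classes): the key property is that a node's children, including subquery plans, are rewritten before the node itself is looked up in the map, so a reuse reference can never end up pointing at an exchange that a later traversal would replace.

```scala
import scala.collection.mutable

// Toy plan model: subquery plans are just children of their host node here.
sealed trait Node
case class Exchange(canonical: String, child: Node) extends Node
case class Op(name: String, children: Seq[Node]) extends Node
case class Reused(original: Exchange) extends Node
case object Leaf extends Node

object OnePassReuse {
  def apply(root: Node): Node = {
    val seen = mutable.Map[String, Exchange]()
    def rewrite(n: Node): Node = n match {
      case Exchange(c, child) =>
        // Rewrite everything below (incl. subqueries) before the lookup,
        // so the cached instance is already in its final form.
        val e = Exchange(c, rewrite(child))
        seen.get(c) match {
          case Some(first) => Reused(first)
          case None        => seen(c) = e; e
        }
      case Op(name, cs) => Op(name, cs.map(rewrite))
      case Leaf         => Leaf
      case r: Reused    => r
    }
    rewrite(root)
  }
}
```

In one traversal, the second exchange with the same canonical form is replaced by a reuse node pointing at the first, and there is no later pass that could invalidate that reference.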
Can you tell me the exact test case that demonstrates this issue?
I kinda see the issue you're talking about, but my thinking is that maybe this just requires fixing `ReuseExchange`, rather than combining both into one rule.
This is the issue described in 1. in the PR description, and it is tested with the case `SPARK-32041: No reuse interference inside ReuseExchange` in the new `ReuseExchangeAndSubquerySuite`: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR28
Combining the 2 rules is required to fix 2., tested with the case `SPARK-32041: No reuse interference between ReuseExchange and ReuseSubquery`: https://github.com/apache/spark/pull/28885/files#diff-f6f54d5cfc4254d8ed9122013394351bR67
What's still WIP? This idea LGTM and I'll take a look when the code is ready for review.
Thanks! Actually it's not WIP, I just rebased it on #32885 yesterday and waited for the tests. Results looked OK, but it seems I need to merge master and regenerate the golden files again.
# Conflicts:
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q14b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q23b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q24b/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q35/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v1_4/q8/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q14a/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q24/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6.sf100/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/explain.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q6/simplified.txt
#	sql/core/src/test/resources/tpcds-plan-stability/approved-plans-v2_7/q64/explain.txt
Kubernetes integration test starting
Kubernetes integration test status success
I updated the new rule to use pruned transforms and fixed a test suite because AQE is enabled by default now.
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #139947 has finished for PR 28885 at commit
Test build #139950 has finished for PR 28885 at commit
```diff
@@ -41,22 +38,15 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    *
    * @param plan Input query plan to process
    * @param append function used to append the explain output
-   * @param startOperatorID The start value of operation id. The subsequent operations will
-   *                        be assigned higher value.
-   *
-   * @return The last generated operation id for this input plan. This is to ensure we
```
We should remove this now.
Fixed in 7187ebd.
```scala
/**
 * Find out duplicated exchanges and subqueries in the whole spark plan including subqueries, then
 * use the same exchange or subquery for all the references.
```
Maybe we should explain why we reuse exchanges and subqueries in this single rule. Note that the Spark plan is a mutually recursive data structure:
`SparkPlan -> Expr -> Subquery -> SparkPlan -> Expr -> Subquery -> ...`
Therefore, in this rule, we recursively rewrite the exchanges and subqueries in a bottom-up way, in one go.
Ok, added the explanation in 7187ebd
```scala
 * @param plan the input plan
 * @return the matching plan or the input plan
 */
def lookupOrElseAdd(plan: T): T = {
```
This can be private
Fixed in 7187ebd
Did you see any regressions?
Let me rerun the benchmarks and come back to you.
Kubernetes integration test starting
Kubernetes integration test status success
Test build #139987 has finished for PR 28885 at commit
The previous benchmark run was quite some time ago. As AQE has been enabled by default since then, and it now works with most of the affected TPCDS queries, the improvement is only visible if I disable AQE. But in that case it is still considerable: https://github.com/peter-toth/spark/blob/64c619e798c003e9b27ba534ae370db75b1cfa1c/sql/core/benchmarks/TPCDSQueryBenchmark-results.txt In some cases non-AQE became as good as AQE. UPDATE: I've included AQE and non-AQE results as well, to show the improvement and catch regressions. I don't see any regression.
```scala
 * Therefore, in this rule, we recursively rewrite the exchanges and subqueries in a bottom-up way,
 * in one go.
 */
case object ReuseExchangeAndSubquery extends Rule[SparkPlan] {
```
AQE has a totally different way to reuse exchanges/subqueries. It has a query-global map to store created exchanges/subqueries (see `AdaptiveExecutionContext`), and AQE executes leaf subqueries first. This is exactly the same as what this rule is doing for exchange/subquery reuse.
One followup we can do is to leverage the new `ReuseMap` in `AdaptiveExecutionContext`.
Actually, the AQE way may be better: just use the canonicalized plan as the key, instead of calling `sameResult`.
`ReuseMap` has changed since the first version of this PR. Unfortunately, I rebased the PR already so only some of the discussion remained: #28885 (comment) (about reverting from the 2nd to the 3rd version).
The 1st version used a simple `Map[<canonicalized plan>, <plan>]` as `AdaptiveExecutionContext` does.
The 2nd version was `Map[<schema>, (<first plan with this schema>, Map[<canonicalized plan>, <plan>])]` with lazy initialization of the inner map, to avoid canonicalization when there are no matching schemas but still provide quick lookup by canonicalized plan.
This 3rd version reverted to the original `Map[<schema>, ArrayBuffer[<plan>]]` idea that `ReuseExchange` and `ReuseSubquery` had used.
I can open a follow-up PR to improve `ReuseMap` to the 2nd version if required, but I'm not sure that the improvement would be visible with TPCDS or real-life queries.
If we want to consolidate the reuse map logic then I think we should also take into account that `ReuseAdaptiveSubquery` uses `TrieMap`, a concurrent, lock-free map implementation, which is not required by this non-AQE rule.
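For illustration, a minimal sketch of the `Map[<schema>, ArrayBuffer[<plan>]]` idea used by the 3rd version (the class and method names here are simplified stand-ins, not Spark's actual `ReuseMap` API):

```scala
import scala.collection.mutable

// Stand-in for a physical plan: `schema` is the cheap grouping key and
// `sameResult` the (in Spark, more expensive) semantic comparison.
case class MiniPlan(schema: String, id: Int) {
  def sameResult(other: MiniPlan): Boolean = schema == other.schema // simplified
}

class ReuseMapSketch {
  private val map = mutable.Map.empty[String, mutable.ArrayBuffer[MiniPlan]]

  // Return an earlier plan with the same result if one exists,
  // otherwise register `plan` and return it.
  def lookupOrElseAdd(plan: MiniPlan): MiniPlan = {
    val bucket = map.getOrElseUpdate(plan.schema, mutable.ArrayBuffer.empty)
    bucket.find(plan.sameResult) match {
      case Some(found) => found
      case None =>
        bucket += plan
        plan
    }
  }
}
```

Grouping by schema keeps the common no-match case cheap; the semantic comparison only runs against plans within the same schema bucket.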
Yea, non-AQE doesn't need thread safety, but I feel it's still better to unify the major idea: `Map[<canonicalized plan>, <plan>]` vs `Map[<schema>, ArrayBuffer[<plan>]]`. I agree it's not a big deal for perf, but code consistency is also important. `Map[<canonicalized plan>, <plan>]` looks better as it's simpler, and we can remove `ReuseMap`.
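The simpler structure suggested here can be sketched like this (illustrative names, assuming a `canonicalized` form usable as a hash key, as in Spark):

```scala
import scala.collection.mutable

// Stand-in plan: canonicalization normalizes away instance-specific ids.
case class TinyPlan(key: String, id: Int) {
  def canonicalized: TinyPlan = TinyPlan(key, 0)
}

val reuseMap = mutable.Map.empty[TinyPlan, TinyPlan]

// One getOrElseUpdate replaces the schema-bucket scan: the first plan
// registered under a canonical form is returned for all later equal plans.
def reuseOrAdd(plan: TinyPlan): TinyPlan =
  reuseMap.getOrElseUpdate(plan.canonicalized, plan)
```

The trade-off is that every lookup pays for canonicalization up front, while the schema-keyed variant avoids it when no schema matches.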
Ok, I can file a follow-up PR today or tomorrow.
I've opened the follow-up PR here: #33021
thanks, merging to master!
Thanks for the review!
… rules

### What changes were proposed in this pull request?
This PR unifies reuse map data structures in non-AQE and AQE rules to a simple `Map[<canonicalized plan>, <plan>]` based on the discussion here: #28885 (comment)

### Why are the changes needed?
The proposed `Map[<canonicalized plan>, <plan>]` is simpler than the currently used `Map[<schema>, ArrayBuffer[<plan>]]` in `ReuseMap`/`ReuseExchangeAndSubquery` (non-AQE) and consistent with the `ReuseAdaptiveSubquery` (AQE) subquery reuse rule.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing UTs.

Closes #33021 from peter-toth/SPARK-35855-unify-reuse-map-data-structures.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
This PR:

1. Fixes an issue in the `ReuseExchange` rule that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate traversals in `ReuseExchange`, when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
Consider the following query:
Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
After this PR:

2. Fixes an issue with the separate consecutive `ReuseExchange` and `ReuseSubquery` rules that can result in a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate rules, when the `ReuseSubquery` rule modifies an exchange that has already been referenced (reused) in the `ReuseExchange` rule.
Consider the following query:
Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
After this PR:
(This example contains issue 1 as well.)

3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means that the new combined rule utilizes the reuse opportunities between parent queries and subqueries by traversing the whole plan. The traversal is started on the top-level query only.

Due to the order of traversal this PR does while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from an execution perspective, but it also means "forward references" in the explain formatted output, where parent queries come first. The changes I made to `ExplainUtils` are to handle these references properly.

This PR fixes the above 3 issues by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a 1-pass, whole-plan, bottom-up traversal.

Why are the changes needed?
Performance improvement.
How was this patch tested?
- `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- `ReuseMapSuite` to test `ReuseMap`.
- `PlanStabilitySuite`s for invalid reuse references.