-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-13523] [SQL] Reuse exchanges in a query #11403
Conversation
Test build #52081 has finished for PR 11403 at commit
|
## What changes were proposed in this pull request? This PR support visualization for subquery in SQL web UI, also improve the explain of subquery, especially when it's used together with whole stage codegen. For example: ```python >>> sqlContext.range(100).registerTempTable("range") >>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True) == Parsed Logical Plan == 'Project [unresolvedalias(('id / subquery#9), None)] : +- 'SubqueryAlias subquery#9 : +- 'Project [unresolvedalias('sum('id), None)] : +- 'UnresolvedRelation `range`, None +- 'Filter ('id > subquery#8) : +- 'SubqueryAlias subquery#8 : +- 'GlobalLimit 1 : +- 'LocalLimit 1 : +- 'Project [unresolvedalias('id, None)] : +- 'UnresolvedRelation `range`, None +- 'UnresolvedRelation `range`, None == Analyzed Logical Plan == (id / scalarsubquery()): double Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11] : +- SubqueryAlias subquery#9 : +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L] : +- SubqueryAlias range : +- Range 0, 100, 1, 4, [id#0L] +- Filter (id#0L > subquery#8) : +- SubqueryAlias subquery#8 : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [id#0L] : +- SubqueryAlias range : +- Range 0, 100, 1, 4, [id#0L] +- SubqueryAlias range +- Range 0, 100, 1, 4, [id#0L] == Optimized Logical Plan == Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11] : +- SubqueryAlias subquery#9 : +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L] : +- Range 0, 100, 1, 4, [id#0L] +- Filter (id#0L > subquery#8) : +- SubqueryAlias subquery#8 : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [id#0L] : +- Range 0, 100, 1, 4, [id#0L] +- Range 0, 100, 1, 4, [id#0L] == Physical Plan == WholeStageCodegen : +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())#11] : : +- Subquery subquery#9 : : +- WholeStageCodegen : : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L]) : : : +- INPUT : : +- Exchange SinglePartition, None : : +- WholeStageCodegen : : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L]) : : : +- Range 0, 1, 4, 100, [id#0L] : +- Filter (id#0L > subquery#8) : : +- Subquery subquery#8 : : +- CollectLimit 1 : : +- WholeStageCodegen : : : +- Project [id#0L] : : : +- Range 0, 1, 4, 100, [id#0L] : +- Range 0, 1, 4, 100, [id#0L] ``` The web UI looks like: ![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png) This PR also change the tree structure of WholeStageCodegen to make it consistent than others. Before this change, Both WholeStageCodegen and InputAdapter hold a references to the same plans, those could be updated without notify another, causing problems, this is discovered by #11403 . ## How was this patch tested? Existing tests, also manual tests with the example query, check the explain and web UI. Author: Davies Liu <davies@databricks.com> Closes #11417 from davies/viz_subquery.
Test build #52451 has finished for PR 11403 at commit
|
Test build #52452 has finished for PR 11403 at commit
|
Test build #52477 has finished for PR 11403 at commit
|
Test build #52480 has finished for PR 11403 at commit
|
@nongli @JoshRosen This PR is ready for review. |
Test build #52481 has finished for PR 11403 at commit
|
Test build #52483 has finished for PR 11403 at commit
|
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy | |||
} | |||
|
|||
override def innerChildren: Seq[PlanType] = subqueries | |||
|
|||
/** | |||
* Cleaned copy of this query plan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what does cleaned mean? It doesn't help me understand this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1; this is confusing
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does "cleaned" actually mean "canonicalized" or something similar?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, will changed to "canonicalized"
Test build #52492 has finished for PR 11403 at commit
|
Conflicts: sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@JoshRosen Do you have time to review this? |
Test build #52643 has finished for PR 11403 at commit
|
Taking a look at this now. |
return plan | ||
} | ||
// Build a hash map using schema of exchanges to avoid O(N*N) sameResult calls. | ||
val exchanges = mutable.HashMap[StructType, ArrayBuffer[Exchange]]() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Won't StructType
's equals() and hashCode() methods be affected by field names? What if the two exchanges produce logically equivalent output but assign different names to the output columns? In this case, would that lead to false-negatives when searching for exchanges that have the sameResult
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good question, it could be false negative.
But usually if two plan have the same result, they should have the same inputs also the same plan and expressions, they should generate the same name (does not include the random ExprId).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I suppose we can always follow up on this later if it turns out to be a problem in practice.
FYI, I re-organized the JIRA relationships a bit under SPARK-13756. |
@@ -237,4 +237,65 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy | |||
} | |||
|
|||
override def innerChildren: Seq[PlanType] = subqueries | |||
|
|||
/** | |||
* Canonicalized copy of this query plan. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nongli, is "canonicalized" sufficiently unambiguous here or do we need to explain what this means?
Test build #52703 has finished for PR 11403 at commit
|
Test build #52709 has finished for PR 11403 at commit
|
@JoshRosen @nongli Does this look good to you now? |
LGTM |
Merging this into master, thanks! |
## What changes were proposed in this pull request? This PR support visualization for subquery in SQL web UI, also improve the explain of subquery, especially when it's used together with whole stage codegen. For example: ```python >>> sqlContext.range(100).registerTempTable("range") >>> sqlContext.sql("select id / (select sum(id) from range) from range where id > (select id from range limit 1)").explain(True) == Parsed Logical Plan == 'Project [unresolvedalias(('id / subquery#9), None)] : +- 'SubqueryAlias subquery#9 : +- 'Project [unresolvedalias('sum('id), None)] : +- 'UnresolvedRelation `range`, None +- 'Filter ('id > subquery#8) : +- 'SubqueryAlias subquery#8 : +- 'GlobalLimit 1 : +- 'LocalLimit 1 : +- 'Project [unresolvedalias('id, None)] : +- 'UnresolvedRelation `range`, None +- 'UnresolvedRelation `range`, None == Analyzed Logical Plan == (id / scalarsubquery()): double Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())apache#11] : +- SubqueryAlias subquery#9 : +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L] : +- SubqueryAlias range : +- Range 0, 100, 1, 4, [id#0L] +- Filter (id#0L > subquery#8) : +- SubqueryAlias subquery#8 : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [id#0L] : +- SubqueryAlias range : +- Range 0, 100, 1, 4, [id#0L] +- SubqueryAlias range +- Range 0, 100, 1, 4, [id#0L] == Optimized Logical Plan == Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())apache#11] : +- SubqueryAlias subquery#9 : +- Aggregate [(sum(id#0L),mode=Complete,isDistinct=false) AS sum(id)#10L] : +- Range 0, 100, 1, 4, [id#0L] +- Filter (id#0L > subquery#8) : +- SubqueryAlias subquery#8 : +- GlobalLimit 1 : +- LocalLimit 1 : +- Project [id#0L] : +- Range 0, 100, 1, 4, [id#0L] +- Range 0, 100, 1, 4, [id#0L] == Physical Plan == WholeStageCodegen : +- Project [(cast(id#0L as double) / cast(subquery#9 as double)) AS (id / scalarsubquery())apache#11] : : +- Subquery subquery#9 : : +- WholeStageCodegen : : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Final,isDistinct=false)], output=[sum(id)#10L]) : : : +- INPUT : : +- Exchange SinglePartition, None : : +- WholeStageCodegen : : : +- TungstenAggregate(key=[], functions=[(sum(id#0L),mode=Partial,isDistinct=false)], output=[sum#14L]) : : : +- Range 0, 1, 4, 100, [id#0L] : +- Filter (id#0L > subquery#8) : : +- Subquery subquery#8 : : +- CollectLimit 1 : : +- WholeStageCodegen : : : +- Project [id#0L] : : : +- Range 0, 1, 4, 100, [id#0L] : +- Range 0, 1, 4, 100, [id#0L] ``` The web UI looks like: ![subquery](https://cloud.githubusercontent.com/assets/40902/13377963/932bcbae-dda7-11e5-82f7-03c9be85d77c.png) This PR also change the tree structure of WholeStageCodegen to make it consistent than others. Before this change, Both WholeStageCodegen and InputAdapter hold a references to the same plans, those could be updated without notify another, causing problems, this is discovered by apache#11403 . ## How was this patch tested? Existing tests, also manual tests with the example query, check the explain and web UI. Author: Davies Liu <davies@databricks.com> Closes apache#11417 from davies/viz_subquery.
## What changes were proposed in this pull request? It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache). Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query. In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan. Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning. After the rule, the plan will looks like: ``` WholeStageCodegen : +- Project [id#0L] : +- BroadcastHashJoin [id#0L], [id#2L], Inner, BuildRight, None : :- Project [id#0L] : : +- BroadcastHashJoin [id#0L], [id#1L], Inner, BuildRight, None : : :- Range 0, 1, 4, 1024, [id#0L] : : +- INPUT : +- INPUT :- BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) : +- WholeStageCodegen : : +- Range 0, 1, 4, 1024, [id#1L] +- ReusedExchange [id#2L], BroadcastExchange HashedRelationBroadcastMode(true,List(id#1L),List(id#1L)) ``` ![bjoin](https://cloud.githubusercontent.com/assets/40902/13414787/209e8c5c-df0a-11e5-8a0f-edff69d89e83.png) For three ways SortMergeJoin, ``` == Physical Plan == WholeStageCodegen : +- Project [id#0L] : +- SortMergeJoin [id#0L], [id#4L], None : :- INPUT : +- INPUT :- WholeStageCodegen : : +- Project [id#0L] : : +- SortMergeJoin [id#0L], [id#3L], None : : :- INPUT : : +- INPUT : :- WholeStageCodegen : : : +- Sort [id#0L ASC], false, 0 : : : +- INPUT : : +- Exchange hashpartitioning(id#0L, 200), None : : +- WholeStageCodegen : : : +- Range 0, 1, 4, 33554432, [id#0L] : +- WholeStageCodegen : : +- Sort [id#3L ASC], false, 0 : : +- INPUT : +- ReusedExchange [id#3L], Exchange hashpartitioning(id#0L, 200), None +- WholeStageCodegen : +- Sort [id#4L ASC], false, 0 : +- INPUT +- ReusedExchange [id#4L], Exchange hashpartitioning(id#0L, 200), None ``` ![sjoin](https://cloud.githubusercontent.com/assets/40902/13414790/27aea61c-df0a-11e5-8cbf-fbc985c31d95.png) If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents. ## How was this patch tested? Added some unit tests for this. Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in apache#11514 ). Author: Davies Liu <davies@databricks.com> Closes apache#11403 from davies/dedup.
@davies I have a question about this. Maybe you have the answer for it? Thanks. For a shuffle, although |
@davies Hi, what do you mean by "Since all the planner only work with tree, so this rule should be the last one for the entire planning."? |
What changes were proposed in this pull request?
It’s possible to have common parts in a query, for example, self join, it will be good to avoid the duplicated part to same CPUs and memory (Broadcast or cache).
Exchange will materialize the underlying RDD by shuffle or collect, it’s a great point to check duplicates and reuse them. Duplicated exchanges means they generate exactly the same result inside a query.
In order to find out the duplicated exchanges, we should be able to compare SparkPlan to check that they have same results or not. We already have that for LogicalPlan, so we should move that into QueryPlan to make it available for SparkPlan.
Once we can find the duplicated exchanges, we should replace all of them with same SparkPlan object (could be wrapped by ReusedExchage for explain), then the plan tree become a DAG. Since all the planner only work with tree, so this rule should be the last one for the entire planning.
After the rule, the plan will looks like:
For three ways SortMergeJoin,
If the same ShuffleExchange or BroadcastExchange, execute()/executeBroadcast() will be called by different parents, they should cached the RDD/Broadcast, return the same one for all the parents.
How was this patch tested?
Added some unit tests for this. Had done some manual tests on TPCDS query Q59 and Q64, we can see some exchanges are re-used (this requires a change in PhysicalRDD to for sameResult, is be done in #11514 ).