Skip to content

Commit

Permalink
[SPARK-29375][SPARK-28940][SPARK-32041][SQL] Whole plan exchange and …
Browse files Browse the repository at this point in the history
…subquery reuse

### What changes were proposed in this pull request?
This PR:
1. Fixes an issue in `ReuseExchange` rule that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate traversals in `ReuseExchange` when the 2nd traversal modifies an exchange that has already been referenced (reused) in the 1st traversal.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   )
   SELECT * FROM t AS a JOIN t AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#298]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#289]
   :           :                 +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#179]
   :              +- *(1) Project [k#17L]
   :                 +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :                    +- *(1) ColumnarToRow
   :                       +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#184] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(7) SortMergeJoin [id#14L], [id#18L], Inner
   :- *(3) Sort [id#14L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   :     +- *(2) Project [id#14L, k#17L]
   :        +- *(2) BroadcastHashJoin [k#15L], [k#17L], Inner, BuildRight
   :           :- *(2) Project [id#14L, k#15L]
   :           :  +- *(2) Filter isnotnull(id#14L)
   :           :     +- *(2) ColumnarToRow
   :           :        +- FileScan parquet default.df1[id#14L,k#15L] Batched: true, DataFilters: [isnotnull(id#14L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#15L), dynamicpruningexpression(k#15L IN dynamicpruning#26)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :           :              +- SubqueryBroadcast dynamicpruning#26, 0, [k#17L], [id=#103]
   :           :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   :           :                    +- *(1) Project [k#17L]
   :           :                       +- *(1) Filter ((isnotnull(id#16L) AND (id#16L < 2)) AND isnotnull(k#17L))
   :           :                          +- *(1) ColumnarToRow
   :           :                             +- FileScan parquet default.df2[id#16L,k#17L] Batched: true, DataFilters: [isnotnull(id#16L), (id#16L < 2), isnotnull(k#17L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :           +- ReusedExchange [k#17L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#102]
   +- *(6) Sort [id#18L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#18L, k#21L], Exchange hashpartitioning(id#14L, 5), true, [id=#231]
   ```
2. Fixes an issue with separate consecutive `ReuseExchange` and `ReuseSubquery` rules that can result a `ReusedExchange` node pointing to an invalid exchange. This can happen due to the 2 separate rules when `ReuseSubquery` rule modifies an exchange that has already been referenced (reused) in `ReuseExchange` rule.
   Consider the following query:
   ```
   WITH t AS (
     SELECT df1.id, df2.k
     FROM df1 JOIN df2 ON df1.k = df2.k
     WHERE df2.id < 2
   ),
   t2 AS (
     SELECT * FROM t
     UNION
     SELECT * FROM t
   )
   SELECT * FROM t2 AS a JOIN t2 AS b ON a.id = b.id
   ```
   Before this PR the plan of the query was (note the `<== this reuse node points to a non-existing node` marker):
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#979]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#975]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                 :     :                 +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   :                 :        +- *(1) Project [k#49L]
   :                 :           +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :              +- *(1) ColumnarToRow
   :                 :                 +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#926]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#656]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#761] <== this reuse node points to a non-existing node
   ```
   After this PR:
   ```
   == Physical Plan ==
   *(15) SortMergeJoin [id#46L], [id#58L], Inner
   :- *(7) Sort [id#46L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   :     +- *(6) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :        +- Exchange hashpartitioning(id#46L, k#49L, 5), true, [id=#789]
   :           +- *(5) HashAggregate(keys=[id#46L, k#49L], functions=[])
   :              +- Union
   :                 :- *(2) Project [id#46L, k#49L]
   :                 :  +- *(2) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                 :     :- *(2) Project [id#46L, k#47L]
   :                 :     :  +- *(2) Filter isnotnull(id#46L)
   :                 :     :     +- *(2) ColumnarToRow
   :                 :     :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                 :     :              +- SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                 :     :                 +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 :     :                    +- *(1) Project [k#49L]
   :                 :     :                       +- *(1) Filter ((isnotnull(id#48L) AND (id#48L < 2)) AND isnotnull(k#49L))
   :                 :     :                          +- *(1) ColumnarToRow
   :                 :     :                             +- FileScan parquet default.df2[id#48L,k#49L] Batched: true, DataFilters: [isnotnull(id#48L), (id#48L < 2), isnotnull(k#49L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [], PushedFilters: [IsNotNull(id), LessThan(id,2), IsNotNull(k)], ReadSchema: struct<id:bigint,k:bigint>
   :                 :     +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   :                 +- *(4) Project [id#46L, k#49L]
   :                    +- *(4) BroadcastHashJoin [k#47L], [k#49L], Inner, BuildRight
   :                       :- *(4) Project [id#46L, k#47L]
   :                       :  +- *(4) Filter isnotnull(id#46L)
   :                       :     +- *(4) ColumnarToRow
   :                       :        +- FileScan parquet default.df1[id#46L,k#47L] Batched: true, DataFilters: [isnotnull(id#46L)], Format: Parquet, Location: InMemoryFileIndex[file:/Users/petertoth/git/apache/spark/sql/core/spark-warehouse/org.apache.spar..., PartitionFilters: [isnotnull(k#47L), dynamicpruningexpression(k#47L IN dynamicpruning#66)], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>
   :                       :              +- ReusedSubquery SubqueryBroadcast dynamicpruning#66, 0, [k#49L], [id=#485]
   :                       +- ReusedExchange [k#49L], BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true])), [id=#484]
   +- *(14) Sort [id#58L ASC NULLS FIRST], false, 0
      +- ReusedExchange [id#58L, k#61L], Exchange hashpartitioning(id#46L, 5), true, [id=#793]
   ```
   (This example contains issue 1 as well.)

3. Improves the reuse of exchanges and subqueries by enabling reuse across the whole plan. This means that the new combined rule utilizes the reuse opportunities between parent and subqueries by traversing the whole plan. The traversal is started on the top level query only.

4. Due to the order of traversal this PR does while adding reuse nodes, the reuse nodes appear in parent queries if reuse is possible between different levels of queries (typical for DPP). This is not an issue from execution perspective, but this also means "forward references" in explain formatted output where parent queries come first. The changes I made to `ExplainUtils` are to handle these references properly.

This PR fixes the above 3 issues by unifying the separate rules into a `ReuseExchangeAndSubquery` rule that does a 1 pass, whole-plan, bottom-up traversal.

### Why are the changes needed?
Performance improvement.

### How was this patch tested?
- New UTs in `ReuseExchangeAndSubquerySuite` to cover 1. and 2.
- New UTs in `DynamicPartitionPruningSuite`, `SubquerySuite` and `ExchangeSuite` to cover 3.
- New `ReuseMapSuite` to test `ReuseMap`.
- Checked new golden files of `PlanStabilitySuite`s for invalid reuse references.
- TPCDS benchmarks.

Closes #28885 from peter-toth/SPARK-29375-SPARK-28940-whole-plan-reuse.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
peter-toth authored and cloud-fan committed Jun 21, 2021
1 parent 248fda3 commit 682e7f2
Show file tree
Hide file tree
Showing 490 changed files with 51,841 additions and 52,460 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import scala.collection.mutable.{ArrayBuffer, Map}

import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.types.StructType

/**
* Map of canonicalized plans that can be used to find reuse possibilities.
*
* To avoid costly canonicalization of a plan:
* - we use its schema first to check if it can be replaced to a reused one at all
* - we insert it into the map of canonicalized plans only when at least 2 have the same schema
*
* @tparam T the type of the node we want to reuse
* @tparam T2 the type of the canonicalized node
*/
class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
private val map = Map[StructType, ArrayBuffer[T]]()

/**
* Find a matching plan with the same canonicalized form in the map or add the new plan to the
* map otherwise.
*
* @param plan the input plan
* @return the matching plan or the input plan
*/
private def lookupOrElseAdd(plan: T): T = {
val sameSchema = map.getOrElseUpdate(plan.schema, ArrayBuffer())
val samePlan = sameSchema.find(plan.sameResult)
if (samePlan.isDefined) {
samePlan.get
} else {
sameSchema += plan
plan
}
}

/**
* Find a matching plan with the same canonicalized form in the map and apply `f` on it or add
* the new plan to the map otherwise.
*
* @param plan the input plan
* @param f the function to apply
* @tparam T2 the type of the reuse node
* @return the matching plan with `f` applied or the input plan
*/
def reuseOrElseAdd[T2 >: T](plan: T, f: T => T2): T2 = {
val found = lookupOrElseAdd(plan)
if (found eq plan) {
plan
} else {
f(found)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
import org.apache.spark.sql.types.IntegerType

case class TestNode(children: Seq[TestNode], output: Seq[Attribute]) extends LogicalPlan {
override protected def withNewChildrenInternal(
newChildren: IndexedSeq[LogicalPlan]): LogicalPlan = copy(children = children)
}
case class TestReuseNode(child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output

override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
copy(child = newChild)
}

class ReuseMapSuite extends SparkFunSuite {
private val leafNode1 = TestNode(Nil, Seq(AttributeReference("a", IntegerType)()))
private val leafNode2 = TestNode(Nil, Seq(AttributeReference("b", IntegerType)()))
private val parentNode1 = TestNode(Seq(leafNode1), Seq(AttributeReference("a", IntegerType)()))
private val parentNode2 = TestNode(Seq(leafNode2), Seq(AttributeReference("b", IntegerType)()))

private def reuse(testNode: TestNode) = TestReuseNode(testNode)

test("no reuse if same instance") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()

reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode1, reuse) == leafNode1)
assert(reuseMap.reuseOrElseAdd(parentNode1, reuse) == parentNode1)
}

test("reuse if different instance with same canonicalized plan") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode1.clone.asInstanceOf[TestNode], reuse) ==
reuse(leafNode1))
assert(reuseMap.reuseOrElseAdd(parentNode1.clone.asInstanceOf[TestNode], reuse) ==
reuse(parentNode1))
}

test("no reuse if different canonicalized plan") {
val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
reuseMap.reuseOrElseAdd(leafNode1, reuse)
reuseMap.reuseOrElseAdd(parentNode1, reuse)

assert(reuseMap.reuseOrElseAdd(leafNode2, reuse) == leafNode2)
assert(reuseMap.reuseOrElseAdd(parentNode2, reuse) == parentNode2)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.execution

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
Expand All @@ -28,11 +27,9 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, AdaptiveS
object ExplainUtils extends AdaptiveSparkPlanHelper {
/**
* Given a input physical plan, performs the following tasks.
* 1. Computes the operator id for current operator and records it in the operator
* by setting a tag.
* 2. Computes the whole stage codegen id for current operator and records it in the
* 1. Computes the whole stage codegen id for current operator and records it in the
* operator by setting a tag.
* 3. Generate the two part explain output for this plan.
* 2. Generate the two part explain output for this plan.
* 1. First part explains the operator tree with each operator tagged with an unique
* identifier.
* 2. Second part explains each operator in a verbose manner.
Expand All @@ -41,22 +38,11 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
*
* @param plan Input query plan to process
* @param append function used to append the explain output
* @param startOperatorID The start value of operation id. The subsequent operations will
* be assigned higher value.
*
* @return The last generated operation id for this input plan. This is to ensure we
* always assign incrementing unique id to each operator.
*
*/
private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit,
startOperatorID: Int): Int = {

val operationIDs = new mutable.ArrayBuffer[(Int, QueryPlan[_])]()
var currentOperatorID = startOperatorID
plan: T,
append: String => Unit): Unit = {
try {
currentOperatorID = generateOperatorIDs(plan, currentOperatorID, operationIDs)
generateWholeStageCodegenIds(plan)

QueryPlan.append(
Expand All @@ -67,31 +53,36 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
printOperatorId = true)

append("\n")
var i: Integer = 0
for ((opId, curPlan) <- operationIDs) {
append(curPlan.verboseStringWithOperatorId())
}

val operationsWithID = ArrayBuffer.empty[QueryPlan[_]]
collectOperatorsWithID(plan, operationsWithID)
operationsWithID.foreach(p => append(p.verboseStringWithOperatorId()))

} catch {
case e: AnalysisException => append(e.toString)
}
currentOperatorID
}

/**
* Given a input physical plan, performs the following tasks.
* 1. Generates the explain output for the input plan excluding the subquery plans.
* 2. Generates the explain output for each subquery referenced in the plan.
*/
def processPlan[T <: QueryPlan[T]](
plan: => QueryPlan[T],
append: String => Unit): Unit = {
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
try {
val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
var currentOperatorID = 0
currentOperatorID = processPlanSkippingSubqueries(plan, append, currentOperatorID)
currentOperatorID = generateOperatorIDs(plan, currentOperatorID)

val subqueries = ArrayBuffer.empty[(SparkPlan, Expression, BaseSubqueryExec)]
getSubqueries(plan, subqueries)
var i = 0

subqueries.foldLeft(currentOperatorID) {
(curId, plan) => generateOperatorIDs(plan._3.child, curId)
}

processPlanSkippingSubqueries(plan, append)

var i = 0
for (sub <- subqueries) {
if (i == 0) {
append("\n===== Subqueries =====\n\n")
Expand All @@ -104,10 +95,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
// the explain output. In case of subquery reuse, we don't print subquery plan more
// than once. So we skip [[ReusedSubqueryExec]] here.
if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
currentOperatorID = processPlanSkippingSubqueries(
sub._3.child,
append,
currentOperatorID)
processPlanSkippingSubqueries(sub._3.child, append)
}
append("\n")
}
Expand All @@ -117,59 +105,85 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
}

/**
* Traverses the supplied input plan in a bottom-up fashion does the following :
* 1. produces a map : operator identifier -> operator
* 2. Records the operator id via setting a tag in the operator.
* Traverses the supplied input plan in a bottom-up fashion and records the operator id via
* setting a tag in the operator.
* Note :
* 1. Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
* appear in the explain output.
* 2. operator identifier starts at startOperatorID + 1
* - Operator such as WholeStageCodegenExec and InputAdapter are skipped as they don't
* appear in the explain output.
* - Operator identifier starts at startOperatorID + 1
*
* @param plan Input query plan to process
* @param startOperatorID The start value of operation id. The subsequent operations will
* be assigned higher value.
* @param operatorIDs A output parameter that contains a map of operator id and query plan. This
* is used by caller to print the detail portion of the plan.
* @return The last generated operation id for this input plan. This is to ensure we
* always assign incrementing unique id to each operator.
* @param startOperatorID The start value of operation id. The subsequent operations will be
* assigned higher value.
* @return The last generated operation id for this input plan. This is to ensure we always
* assign incrementing unique id to each operator.
*/
private def generateOperatorIDs(
plan: QueryPlan[_],
startOperatorID: Int,
operatorIDs: mutable.ArrayBuffer[(Int, QueryPlan[_])]): Int = {
private def generateOperatorIDs(plan: QueryPlan[_], startOperatorID: Int): Int = {
var currentOperationID = startOperatorID
// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return currentOperationID
}
plan.foreachUp {
case p: WholeStageCodegenExec =>
case p: InputAdapter =>
case other: QueryPlan[_] =>

def setOpId(): Unit = if (other.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
other.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
operatorIDs += ((currentOperationID, other))
}
def setOpId(plan: QueryPlan[_]): Unit = if (plan.getTagValue(QueryPlan.OP_ID_TAG).isEmpty) {
currentOperationID += 1
plan.setTagValue(QueryPlan.OP_ID_TAG, currentOperationID)
}

other match {
case p: AdaptiveSparkPlanExec =>
currentOperationID =
generateOperatorIDs(p.executedPlan, currentOperationID, operatorIDs)
setOpId()
case p: QueryStageExec =>
currentOperationID = generateOperatorIDs(p.plan, currentOperationID, operatorIDs)
setOpId()
case _ =>
setOpId()
other.innerChildren.foldLeft(currentOperationID) {
(curId, plan) => generateOperatorIDs(plan, curId, operatorIDs)
}
plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec =>
currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID)
setOpId(p)
case p: QueryStageExec =>
currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
setOpId(p)
case other: QueryPlan[_] =>
setOpId(other)
other.innerChildren.foldLeft(currentOperationID) {
(curId, plan) => generateOperatorIDs(plan, curId)
}
}
currentOperationID
}

/**
* Traverses the supplied input plan in a bottom-up fashion and collects operators with assigned
* ids.
*
* @param plan Input query plan to process
* @param operators An output parameter that contains the operators.
*/
private def collectOperatorsWithID(
plan: QueryPlan[_],
operators: ArrayBuffer[QueryPlan[_]]): Unit = {
// Skip the subqueries as they are not printed as part of main query block.
if (plan.isInstanceOf[BaseSubqueryExec]) {
return
}

def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
if (plan.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
operators += plan
}
}

plan.foreachUp {
case _: WholeStageCodegenExec =>
case _: InputAdapter =>
case p: AdaptiveSparkPlanExec =>
collectOperatorsWithID(p.executedPlan, operators)
collectOperatorWithID(p)
case p: QueryStageExec =>
collectOperatorsWithID(p.plan, operators)
collectOperatorWithID(p)
case other: QueryPlan[_] =>
collectOperatorWithID(other)
other.innerChildren.foreach(collectOperatorsWithID(_, operators))
}
}

/**
* Traverses the supplied input plan in a top-down fashion and records the
* whole stage code gen id in the plan via setting a tag.
Expand Down
Loading

0 comments on commit 682e7f2

Please sign in to comment.