Skip to content

Commit

Permalink
[SPARK-33472][SQL][3.0] Adjust RemoveRedundantSorts rule order
Browse files Browse the repository at this point in the history
Backport #30373 for branch-3.0.

### What changes were proposed in this pull request?
This PR switched the order for the rule `RemoveRedundantSorts` and `EnsureRequirements` so that `EnsureRequirements` will be invoked before `RemoveRedundantSorts` to avoid IllegalArgumentException when instantiating PartitioningCollection.

### Why are the changes needed?
`RemoveRedundantSorts` rule uses SparkPlan's `outputPartitioning` to check whether a sort node is redundant. Currently, it is added before `EnsureRequirements`. Since `PartitioningCollection` requires left and right partitioning to have the same number of partitions, which is not necessarily true before applying `EnsureRequirements`, the rule can fail with the following exception:
```
IllegalArgumentException: requirement failed: PartitioningCollection requires all of its partitionings have the same numPartitions.
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Unit test

Closes #30438 from allisonwang-db/spark-33472-3.0.

Authored-by: allisonwang-db <66282705+allisonwang-db@users.noreply.github.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
allisonwang-db authored and dongjoon-hyun committed Nov 20, 2020
1 parent d7c2dae commit 1e525c1
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,10 @@ object QueryExecution {
Seq(
PlanDynamicPruningFilters(sparkSession),
PlanSubqueries(sparkSession),
RemoveRedundantSorts(sparkSession.sessionState.conf),
EnsureRequirements(sparkSession.sessionState.conf),
// `RemoveRedundantSorts` needs to be added before `EnsureRequirements` to guarantee the same
// number of partitions when instantiating PartitioningCollection.
RemoveRedundantSorts(sparkSession.sessionState.conf),
ApplyColumnarRulesAndInsertTransitions(sparkSession.sessionState.conf,
sparkSession.sessionState.columnarRules),
CollapseCodegenStages(sparkSession.sessionState.conf),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
def longMetric(name: String): SQLMetric = metrics(name)

// TODO: Move to `DistributedPlan`
/** Specifies how data is partitioned across different nodes in the cluster. */
/**
* Specifies how data is partitioned across different nodes in the cluster.
* Note this method may fail if it is invoked before `EnsureRequirements` is applied
* since `PartitioningCollection` requires all its partitionings to have
* the same number of partitions.
*/
def outputPartitioning: Partitioning = UnknownPartitioning(0) // TODO: WRONG WIDTH!

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ case class AdaptiveSparkPlanExec(
// plan should reach a final status of query stages (i.e., no more addition or removal of
// Exchange nodes) after running these rules.
private def queryStagePreparationRules: Seq[Rule[SparkPlan]] = Seq(
removeRedundantSorts,
ensureRequirements
ensureRequirements,
removeRedundantSorts
) ++ context.session.sessionState.queryStagePrepRules

// A list of physical optimizer rules to be applied to a new stage before its execution. These
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.catalyst.plans.physical.{RangePartitioning, UnknownPartitioning}
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
import org.apache.spark.sql.execution.joins.SortMergeJoinExec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession

Expand Down Expand Up @@ -99,6 +101,29 @@ abstract class RemoveRedundantSortsSuiteBase
}
}
}

test("SPARK-33472: shuffled join with different left and right side partition numbers") {
withTempView("t1", "t2") {
spark.range(0, 100, 1, 2).select('id as "key").createOrReplaceTempView("t1")
(0 to 100).toDF("key").createOrReplaceTempView("t2")

val query = """
|SELECT /*+ MERGE(t1) */ t1.key
|FROM t1 JOIN t2 ON t1.key = t2.key
|WHERE t1.key > 10 AND t2.key < 50
|ORDER BY t1.key ASC
""".stripMargin

val df = sql(query)
val sparkPlan = df.queryExecution.sparkPlan
val join = sparkPlan.collect { case j: SortMergeJoinExec => j }.head
val leftPartitioning = join.left.outputPartitioning
assert(leftPartitioning.isInstanceOf[RangePartitioning])
assert(leftPartitioning.numPartitions == 2)
assert(join.right.outputPartitioning == UnknownPartitioning(0))
checkSorts(query, 3, 3)
}
}
}

class RemoveRedundantSortsSuite extends RemoveRedundantSortsSuiteBase
Expand Down

0 comments on commit 1e525c1

Please sign in to comment.