Skip to content

Commit

Permalink
SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecut…
Browse files Browse the repository at this point in the history
…ionEnabled
  • Loading branch information
wangyum committed Mar 23, 2020
1 parent a0cf972 commit af4248b
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2701,7 +2701,13 @@ class SQLConf extends Serializable with Logging {

def cacheVectorizedReaderEnabled: Boolean = getConf(CACHE_VECTORIZED_READER_ENABLED)

def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
def numShufflePartitions: Int = {
if (adaptiveExecutionEnabled && coalesceShufflePartitionsEnabled) {
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(getConf(SHUFFLE_PARTITIONS))
} else {
getConf(SHUFFLE_PARTITIONS)
}
}

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

Expand All @@ -2714,9 +2720,6 @@ class SQLConf extends Serializable with Logging {

def coalesceShufflePartitionsEnabled: Boolean = getConf(COALESCE_PARTITIONS_ENABLED)

def initialShufflePartitionNum: Int =
getConf(COALESCE_PARTITIONS_INITIAL_PARTITION_NUM).getOrElse(numShufflePartitions)

def minBatchesToRetain: Int = getConf(MIN_BATCHES_TO_RETAIN)

def maxBatchesToRetainInMemory: Int = getConf(MAX_BATCHES_TO_RETAIN_IN_MEMORY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ import org.apache.spark.sql.internal.SQLConf
* the input partition ordering requirements are met.
*/
case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
private def defaultNumPreShufflePartitions: Int =
if (conf.adaptiveExecutionEnabled && conf.coalesceShufflePartitionsEnabled) {
conf.initialShufflePartitionNum
} else {
conf.numShufflePartitions
}
private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions

private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = {
val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,5 +780,21 @@ class AdaptiveQueryExecSuite
}
}
}

test("SPARK-31220 repartition obeys initialPartitionNum when adaptiveExecutionEnabled") {
Seq(true, false).foreach { adaptiveExecutionEnabled =>
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> s"$adaptiveExecutionEnabled",
SQLConf.SHUFFLE_PARTITIONS.key -> "6",
SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key -> "7") {
val partitionsNum = spark.range(10).repartition($"id").rdd.collectPartitions().length
if (adaptiveExecutionEnabled) {
assert(partitionsNum === 7)
} else {
assert(partitionsNum === 6)
}
}
}
}
}

0 comments on commit af4248b

Please sign in to comment.