Do not use local shuffle reader for repartition
cloud-fan committed Nov 19, 2020
1 parent 3695e99 commit 7b556cf
Showing 5 changed files with 56 additions and 14 deletions.
SparkStrategies.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRe
 import org.apache.spark.sql.execution.aggregate.AggUtils
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.{PartitioningFlexibility, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
@@ -670,7 +670,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
           ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
-            planLater(child), noUserSpecifiedNumPartition = false) :: Nil
+            planLater(child), PartitioningFlexibility.STRICT) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
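
In user-facing terms, the STRICT case above is what `df.repartition(5)` compiles to: the user asked for an exact partition count, so AQE must not coalesce or split those partitions. A minimal sketch of the observable contract (assuming a running `SparkSession` named `spark`):

    // repartition(5) pins the partition count; even with AQE enabled the
    // final plan must produce exactly 5 partitions, hence STRICT.
    val df = spark.range(100).repartition(5)
    assert(df.rdd.getNumPartitions == 5)
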
@@ -703,10 +703,15 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
+        val partitionFlexibility = if (r.optNumPartitions.isEmpty) {
+          PartitioningFlexibility.PRESERVE_CLUSTERING
+        } else {
+          PartitioningFlexibility.STRICT
+        }
         exchange.ShuffleExchangeExec(
           r.partitioning,
           planLater(r.child),
-          noUserSpecifiedNumPartition = r.optNumPartitions.isEmpty) :: Nil
+          partitionFlexibility) :: Nil
       case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
       case r: LogicalRDD =>
         RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
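
This split is the heart of the fix: `repartition(col)` promises a clustering but no particular count, while `repartition(n, col)` promises both. A hedged illustration of the two API shapes (assuming `spark` and `import org.apache.spark.sql.functions.col`):

    // repartition(col("key")): optNumPartitions is empty, so the exchange is
    // PRESERVE_CLUSTERING -- AQE may coalesce partitions, but rows that share
    // a key must stay in the same partition.
    val byKey = spark.range(100).toDF("key").repartition(col("key"))

    // repartition(10, col("key")): optNumPartitions = Some(10), so the exchange
    // is STRICT -- neither the count nor the clustering may change.
    val byKeyFixed = spark.range(100).toDF("key").repartition(10, col("key"))
    assert(byKeyFixed.rdd.getNumPartitions == 10)
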
OptimizeLocalShuffleReader.scala
@@ -136,9 +136,9 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] {

   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
-      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined
+      s.shuffle.canChangeClustering && s.mapStats.isDefined
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
-      s.shuffle.canChangeNumPartitions && s.mapStats.isDefined && partitionSpecs.nonEmpty
+      s.shuffle.canChangeClustering && s.mapStats.isDefined && partitionSpecs.nonEmpty
     case _ => false
   }
 }
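
The rule now keys off `canChangeClustering` rather than `canChangeNumPartitions`: a local shuffle reader regroups map output by mapper instead of by reducer, which destroys the hash clustering even when the partition count happens to stay acceptable. A hedged restatement of the first gate above (hypothetical helper, using only names from this rule):

    // A stage is eligible only when the exchange tolerates a clustering change
    // AND map-side statistics exist from which to plan local partitions.
    def whyNotLocal(s: ShuffleQueryStageExec): String =
      (s.shuffle.canChangeClustering, s.mapStats.isDefined) match {
        case (false, _) => "skip: output clustering must be preserved"
        case (_, false) => "skip: no map output statistics available"
        case _          => "eligible for a local shuffle reader"
      }
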
ShuffleExchangeExec.scala
@@ -56,10 +56,27 @@ trait ShuffleExchangeLike extends Exchange {
    */
   def numPartitions: Int
 
+  def partitioningFlexibility: PartitioningFlexibility.Value
+
   /**
    * Returns whether the shuffle partition number can be changed.
    */
-  def canChangeNumPartitions: Boolean
+  def canChangeNumPartitions: Boolean = {
+    // If users specify the num partitions via APIs like `repartition(5, col)`, we shouldn't change
+    // it. For `SinglePartition`, it requires exactly one partition and we can't change it either.
+    partitioningFlexibility != PartitioningFlexibility.STRICT &&
+      outputPartitioning != SinglePartition
+  }
+
+  /**
+   * Returns whether the shuffle output clustering can be changed.
+   */
+  def canChangeClustering: Boolean = {
+    // If users specify the partitioning via APIs like `repartition(col)`, we shouldn't change it.
+    // `SinglePartition` is itself a special partitioning, so we can't change it either.
+    partitioningFlexibility == PartitioningFlexibility.UNSPECIFIED &&
+      outputPartitioning != SinglePartition
+  }
 
   /**
    * The asynchronous job that materializes the shuffle.
@@ -77,18 +94,26 @@
   def runtimeStatistics: Statistics
 }
 
+object PartitioningFlexibility extends Enumeration {
+  type PartitioningFlexibility = Value
+  // STRICT means we can't change the partitioning at all, including the partition number, even if
+  // we lose performance-improvement opportunities.
+  val STRICT = Value
+  // PRESERVE_CLUSTERING means we must preserve the data clustering even if it's useless to the
+  // downstream operators. The shuffle partition number can still be changed.
+  val PRESERVE_CLUSTERING = Value
+  // UNSPECIFIED means the partitioning can be changed as long as it doesn't break query semantics.
+  val UNSPECIFIED = Value
+}
+
 /**
  * Performs a shuffle that will result in the desired partitioning.
  */
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    noUserSpecifiedNumPartition: Boolean = true) extends ShuffleExchangeLike {
-
-  // If users specify the num partitions via APIs like `repartition`, we shouldn't change it.
-  // For `SinglePartition`, it requires exactly one partition and we can't change it either.
-  override def canChangeNumPartitions: Boolean =
-    noUserSpecifiedNumPartition && outputPartitioning != SinglePartition
+    partitioningFlexibility: PartitioningFlexibility.Value = PartitioningFlexibility.UNSPECIFIED)
+  extends ShuffleExchangeLike {
 
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
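
Putting the trait and the enumeration together: the two `can*` predicates are now derived from a single declarative knob instead of a boolean flag. A hedged summary in code form (hypothetical helper; the `SinglePartition` caveat from the trait applies on top of this):

    import PartitioningFlexibility._
    // (canChangeNumPartitions, canChangeClustering) before the SinglePartition check:
    def allowed(f: PartitioningFlexibility): (Boolean, Boolean) = f match {
      case STRICT              => (false, false) // repartition(n) / repartition(n, col)
      case PRESERVE_CLUSTERING => (true, false)  // repartition(col)
      case UNSPECIFIED         => (true, true)   // planner-inserted shuffles
    }
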
SparkSessionExtensionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, PartitioningFlexibility, ShuffleExchangeExec, ShuffleExchangeLike}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE
@@ -766,7 +766,9 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
 case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleExchangeLike {
   override def numMappers: Int = delegate.numMappers
   override def numPartitions: Int = delegate.numPartitions
-  override def canChangeNumPartitions: Boolean = delegate.canChangeNumPartitions
+  override def partitioningFlexibility: PartitioningFlexibility.Value = {
+    delegate.partitioningFlexibility
+  }
   override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
     delegate.mapOutputStatisticsFuture
   override def getShuffleRDD(partitionSpecs: Array[ShufflePartitionSpec]): RDD[_] =
AdaptiveQueryExecSuite.scala
@@ -1307,4 +1307,14 @@ class AdaptiveQueryExecSuite
       spark.listenerManager.unregister(listener)
     }
   }
+
+  test("SPARK-33494: Do not use local shuffle reader for repartition") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
+      val df = spark.table("testData").repartition('key)
+      df.collect()
+      // The local shuffle reader breaks the partitioning, so it must not be used for a
+      // repartition operation whose partitioning was specified by the user.
+      checkNumLocalShuffleReaders(df.queryExecution.executedPlan, numShufflesWithoutLocalReader = 1)
+    }
+  }
 }
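
Beyond the plan-level assertion, the data-level guarantee can be checked directly. A hedged sketch (assuming the suite's `testData` view with an integer `key` column and the same AQE config): after `repartition('key)`, every key must land in exactly one output partition, which is precisely what a local shuffle reader would break.

    val keySets = spark.table("testData").repartition('key)
      .rdd
      .mapPartitions(rows => Iterator(rows.map(_.getInt(0)).toSet))
      .collect()
    // Partitions are pairwise disjoint in keys iff the clustering survived.
    assert(keySets.map(_.size).sum == keySets.flatten.distinct.length)
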
