better naming
cloud-fan committed Nov 23, 2020
1 parent 5383910 commit 5f8f4ed
Showing 4 changed files with 30 additions and 32 deletions.
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.streaming.{InternalOutputModes, StreamingRe
 import org.apache.spark.sql.execution.aggregate.AggUtils
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.exchange.{PartitioningFlexibility, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleOrigin}
 import org.apache.spark.sql.execution.python._
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.MemoryPlan
@@ -670,7 +670,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.Repartition(numPartitions, shuffle, child) =>
         if (shuffle) {
           ShuffleExchangeExec(RoundRobinPartitioning(numPartitions),
-            planLater(child), PartitioningFlexibility.STRICT) :: Nil
+            planLater(child), ShuffleOrigin.REPARTITION_WITH_NUM) :: Nil
         } else {
           execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
         }
@@ -703,15 +703,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case r: logical.Range =>
         execution.RangeExec(r) :: Nil
       case r: logical.RepartitionByExpression =>
-        val partitionFlexibility = if (r.optNumPartitions.isEmpty) {
-          PartitioningFlexibility.PRESERVE_CLUSTERING
+        val shuffleOrigin = if (r.optNumPartitions.isEmpty) {
+          ShuffleOrigin.REPARTITION
         } else {
-          PartitioningFlexibility.STRICT
+          ShuffleOrigin.REPARTITION_WITH_NUM
         }
-        exchange.ShuffleExchangeExec(
-          r.partitioning,
-          planLater(r.child),
-          partitionFlexibility) :: Nil
+        exchange.ShuffleExchangeExec(r.partitioning, planLater(r.child), shuffleOrigin) :: Nil
      case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
      case r: LogicalRDD =>
        RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
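
The strategy changes above map each user-facing repartition API onto one of the new origins. A quick sketch of which origin each call produces (illustrative snippet, not part of the commit; assumes a SparkSession named `spark`):

  import org.apache.spark.sql.functions.col

  // repartition(5): round-robin shuffle with an explicit number
  // (logical.Repartition) -> ShuffleOrigin.REPARTITION_WITH_NUM,
  // so AQE must keep exactly 5 partitions.
  val byNum = spark.range(100).repartition(5)

  // repartition(col("id")): clustering only, no number
  // (RepartitionByExpression, optNumPartitions empty) -> ShuffleOrigin.REPARTITION,
  // so AQE may coalesce partitions but must keep rows clustered by `id`.
  val byExpr = spark.range(100).repartition(col("id"))

  // repartition(5, col("id")): explicit number and clustering
  // (RepartitionByExpression, optNumPartitions = Some(5)) -> REPARTITION_WITH_NUM.
  val byBoth = spark.range(100).repartition(5, col("id"))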
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala
@@ -136,9 +136,9 @@ object OptimizeLocalShuffleReader extends Rule[SparkPlan] {
 
   def canUseLocalShuffleReader(plan: SparkPlan): Boolean = plan match {
     case s: ShuffleQueryStageExec =>
-      s.shuffle.canChangeClustering && s.mapStats.isDefined
+      s.shuffle.canChangePartitioning && s.mapStats.isDefined
     case CustomShuffleReaderExec(s: ShuffleQueryStageExec, partitionSpecs) =>
-      s.shuffle.canChangeClustering && s.mapStats.isDefined && partitionSpecs.nonEmpty
+      s.shuffle.canChangePartitioning && s.mapStats.isDefined && partitionSpecs.nonEmpty
     case _ => false
   }
 }
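
A local shuffle reader lets each reducer task consume the complete output of one mapper instead of fetching one bucket from every mapper, which destroys the key-based clustering of the shuffle output. The rule may therefore only fire for shuffles whose partitioning Spark is free to change. A standalone restatement of the guard (illustrative helper, mirrors canUseLocalShuffleReader above; not part of the commit):

  // A shuffle stage qualifies for a local reader only when Spark inserted the
  // shuffle itself (ENSURE_REQUIREMENTS, checked via canChangePartitioning)
  // and map-side statistics are available to plan the local reads.
  def safeForLocalReader(stage: ShuffleQueryStageExec): Boolean =
    stage.shuffle.canChangePartitioning && stage.mapStats.isDefined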
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -56,26 +56,24 @@ trait ShuffleExchangeLike extends Exchange {
    */
   def numPartitions: Int
 
-  def partitioningFlexibility: PartitioningFlexibility.Value
+  def shuffleOrigin: ShuffleOrigin.Value
 
   /**
    * Returns whether the shuffle partition number can be changed.
    */
   final def canChangeNumPartitions: Boolean = {
     // If users specify the num partitions via APIs like `repartition(5, col)`, we shouldn't change
     // it. For `SinglePartition`, it requires exactly one partition and we can't change it either.
-    partitioningFlexibility != PartitioningFlexibility.STRICT &&
-      outputPartitioning != SinglePartition
+    shuffleOrigin != ShuffleOrigin.REPARTITION_WITH_NUM && outputPartitioning != SinglePartition
   }
 
   /**
-   * Returns whether the shuffle output clustering can be changed.
+   * Returns whether the shuffle output data partitioning can be changed.
    */
-  final def canChangeClustering: Boolean = {
+  final def canChangePartitioning: Boolean = {
     // If users specify the partitioning via APIs like `repartition(col)`, we shouldn't change it.
     // For `SinglePartition`, itself is a special partitioning and we can't change it either.
-    partitioningFlexibility == PartitioningFlexibility.UNSPECIFIED &&
-      outputPartitioning != SinglePartition
+    shuffleOrigin == ShuffleOrigin.ENSURE_REQUIREMENTS && outputPartitioning != SinglePartition
   }
 
   /**
@@ -94,16 +92,19 @@ trait ShuffleExchangeLike extends Exchange {
   def runtimeStatistics: Statistics
 }
 
-object PartitioningFlexibility extends Enumeration {
-  type PartitioningFlexibility = Value
-  // STRICT means we can't change the partitioning at all, including the partition number, even if
-  // we lose performance improvement opportunity.
-  val STRICT = Value
-  // PRESERVE_CLUSTERING means we must preserve the data clustering even if it's useless to the
-  // downstream operators. Shuffle partition number can be changed.
-  val PRESERVE_CLUSTERING = Value
-  // UNSPECIFIED means the partitioning can be changed as long as it doesn't break query semantic.
-  val UNSPECIFIED = Value
+// Describes where the shuffle operator comes from.
+object ShuffleOrigin extends Enumeration {
+  type ShuffleOrigin = Value
+  // Indicates that the shuffle operator was added by the internal `EnsureRequirements` rule. It
+  // means that the shuffle operator is used to ensure internal data partitioning requirements and
+  // Spark is free to optimize it as long as the requirements are still ensured.
+  val ENSURE_REQUIREMENTS = Value
+  // Indicates that the shuffle operator was added by the user-specified repartition operator. Spark
+  // can still optimize it via changing shuffle partition number, as data partitioning won't change.
+  val REPARTITION = Value
+  // Indicates that the shuffle operator was added by the user-specified repartition operator with
+  // a certain partition number. Spark can't optimize it.
+  val REPARTITION_WITH_NUM = Value
 }
 
 /**
@@ -112,7 +113,7 @@ object PartitioningFlexibility extends Enumeration {
 case class ShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
-    partitioningFlexibility: PartitioningFlexibility.Value = PartitioningFlexibility.UNSPECIFIED)
+    shuffleOrigin: ShuffleOrigin.Value = ShuffleOrigin.ENSURE_REQUIREMENTS)
   extends ShuffleExchangeLike {
 
   private lazy val writeMetrics =
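
Taken together, the new enum and the two predicates in ShuffleExchangeLike form a small decision table. A self-contained sketch of that table (standalone reproduction for illustration, not the Spark source; SinglePartition handling omitted):

  object ShuffleOriginTable {
    // Same three origins as the commit introduces.
    object Origin extends Enumeration {
      val ENSURE_REQUIREMENTS, REPARTITION, REPARTITION_WITH_NUM = Value
    }
    import Origin._

    // Mirrors canChangeNumPartitions: only repartition(n, ...) pins the partition count.
    def canChangeNumPartitions(o: Origin.Value): Boolean = o != REPARTITION_WITH_NUM

    // Mirrors canChangePartitioning: only Spark-inserted shuffles may drop their clustering.
    def canChangePartitioning(o: Origin.Value): Boolean = o == ENSURE_REQUIREMENTS

    def main(args: Array[String]): Unit = {
      // Prints:
      //   ENSURE_REQUIREMENTS: numPartitions=true, partitioning=true
      //   REPARTITION: numPartitions=true, partitioning=false
      //   REPARTITION_WITH_NUM: numPartitions=false, partitioning=false
      Origin.values.foreach { o =>
        println(s"$o: numPartitions=${canChangeNumPartitions(o)}, partitioning=${canChangePartitioning(o)}")
      }
    }
  }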
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec}
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, PartitioningFlexibility, ShuffleExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.COLUMN_BATCH_SIZE
@@ -766,8 +766,8 @@ case class PreRuleReplaceAddWithBrokenVersion() extends Rule[SparkPlan] {
 case class MyShuffleExchangeExec(delegate: ShuffleExchangeExec) extends ShuffleExchangeLike {
   override def numMappers: Int = delegate.numMappers
   override def numPartitions: Int = delegate.numPartitions
-  override def partitioningFlexibility: PartitioningFlexibility.Value = {
-    delegate.partitioningFlexibility
+  override def shuffleOrigin: ShuffleOrigin.Value = {
+    delegate.shuffleOrigin
   }
   override def mapOutputStatisticsFuture: Future[MapOutputStatistics] =
     delegate.mapOutputStatisticsFuture
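
One way to observe the renamed field end to end (illustrative snippet, not part of the commit; assumes a SparkSession named `spark` with adaptive execution disabled so the shuffle node is directly visible in the executed plan):

  import org.apache.spark.sql.execution.exchange.{ShuffleExchangeExec, ShuffleOrigin}
  import org.apache.spark.sql.functions.col

  val plan = spark.range(100).repartition(col("id")).queryExecution.executedPlan
  // Collect the origin of every shuffle in the physical plan; a single
  // RepartitionByExpression without a number, so we expect Seq(REPARTITION).
  val origins: Seq[ShuffleOrigin.Value] = plan.collect {
    case s: ShuffleExchangeExec => s.shuffleOrigin
  }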
