update shuffle partitioning logic (NVIDIA#319)
Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Aug 3, 2020
1 parent 51a1f14 commit 5086e66
Showing 3 changed files with 80 additions and 58 deletions.
@@ -98,10 +98,6 @@ case class GpuShuffleExchangeExec(
       writeMetrics)
   }

-  def createShuffledBatchRDD(partitionStartIndices: Option[Array[Int]]): ShuffledBatchRDD = {
-    new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics, partitionStartIndices)
-  }
-
   /**
    * Caches the created ShuffleBatchRDD so we can reuse that.
    */
@@ -113,7 +109,7 @@ case class GpuShuffleExchangeExec(
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = attachTree(this, "execute") {
     // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
     if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = createShuffledBatchRDD(None)
+      cachedShuffleRDD = new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics)
     }
     cachedShuffleRDD
   }
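The change above removes the createShuffledBatchRDD helper from GpuShuffleExchangeExec in favor of calling the new two-argument ShuffledBatchRDD constructor directly. A minimal sketch of what that default amounts to, assuming only the CoalescedPartitionSpec case class from org.apache.spark.sql.execution (values below are illustrative):

import org.apache.spark.sql.execution.CoalescedPartitionSpec

// The secondary constructor (third file in this commit) builds one
// CoalescedPartitionSpec per reducer: post-shuffle partition i reads the
// half-open reducer range [i, i + 1), i.e. exactly one pre-shuffle partition.
val numReducers = 4 // illustrative
val defaultSpecs = Array.tabulate(numReducers)(i => CoalescedPartitionSpec(i, i + 1))
// Array(CoalescedPartitionSpec(0,1), CoalescedPartitionSpec(1,2),
//       CoalescedPartitionSpec(2,3), CoalescedPartitionSpec(3,4))

This matches the old createShuffledBatchRDD(None) path, where every post-shuffle partition corresponded to a single pre-shuffle partition.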
@@ -20,7 +20,7 @@ import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.format.TableMeta
 import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}

 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.internal.{config, Logging}
@@ -29,6 +29,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage._

@@ -198,12 +199,18 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole
   private lazy val env = SparkEnv.get
   private lazy val blockManager = env.blockManager
   private lazy val shouldFallThroughOnEverything = {
-    val fallThroughDueToExternalShuffle = !GpuShuffleEnv.isRapidsShuffleEnabled
-    if (fallThroughDueToExternalShuffle) {
-      logWarning("Rapids Shuffle Plugin is falling back to SortShuffleManager because " +
-        "external shuffle is enabled")
+    val fallThroughReasons = new ListBuffer[String]()
+    if (!GpuShuffleEnv.isRapidsShuffleEnabled) {
+      fallThroughReasons += "external shuffle is enabled"
     }
-    fallThroughDueToExternalShuffle
+    if (conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key).toBoolean) {
+      fallThroughReasons += "adaptive query execution is enabled"
+    }
+    if (fallThroughReasons.nonEmpty) {
+      logWarning(s"Rapids Shuffle Plugin is falling back to SortShuffleManager " +
+        s"because: ${fallThroughReasons.mkString(", ")}")
+    }
+    fallThroughReasons.nonEmpty
   }

   private lazy val localBlockManagerId = blockManager.blockManagerId
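The hunk above replaces a single-reason boolean with a ListBuffer of fall-through reasons, so one warning lists everything that forced the fallback, and adds adaptive query execution as a second reason. A self-contained sketch of the accumulate-then-log pattern, with the GpuShuffleEnv and SQLConf lookups replaced by plain boolean parameters (hypothetical stand-ins, not the plugin's API):

import scala.collection.mutable.ListBuffer

// rapidsShuffleEnabled / aqeEnabled stand in for GpuShuffleEnv.isRapidsShuffleEnabled
// and conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key).toBoolean in the real code;
// println stands in for logWarning.
def shouldFallThrough(rapidsShuffleEnabled: Boolean, aqeEnabled: Boolean): Boolean = {
  val fallThroughReasons = new ListBuffer[String]()
  if (!rapidsShuffleEnabled) fallThroughReasons += "external shuffle is enabled"
  if (aqeEnabled) fallThroughReasons += "adaptive query execution is enabled"
  if (fallThroughReasons.nonEmpty) {
    println(s"Rapids Shuffle Plugin is falling back to SortShuffleManager " +
      s"because: ${fallThroughReasons.mkString(", ")}")
  }
  fallThroughReasons.nonEmpty
}

For example, shouldFallThrough(rapidsShuffleEnabled = true, aqeEnabled = true) logs "... because: adaptive query execution is enabled" and returns true.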
@@ -24,20 +24,11 @@ import com.nvidia.spark.rapids.{GpuMetricNames, NvtxWithMetrics}

 import org.apache.spark._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.{CoalescedPartitioner, CoalescedPartitionSpec, PartialMapperPartitionSpec, PartialReducerPartitionSpec, ShuffledRowRDDPartition, ShufflePartitionSpec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch

-/**
- * The [[Partition]] used by [[ShuffledBatchRDD]]. A post-shuffle partition
- * (identified by `postShufflePartitionIndex`) contains a range of pre-shuffle partitions
- * (`startPreShufflePartitionIndex` to `endPreShufflePartitionIndex - 1`, inclusive).
- */
-private final class ShuffledBatchRDDPartition(
-    val postShufflePartitionIndex: Int,
-    val startPreShufflePartitionIndex: Int,
-    val endPreShufflePartitionIndex: Int) extends Partition {
-  override val index: Int = postShufflePartitionIndex
-}
+case class ShuffledBatchRDDPartition(index: Int, spec: ShufflePartitionSpec) extends Partition

 /**
  * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
@@ -115,44 +106,52 @@ class CoalescedBatchPartitioner(val parent: Partitioner, val partitionStartIndic
 class ShuffledBatchRDD(
     var dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
     metrics: Map[String, SQLMetric],
-    specifiedPartitionStartIndices: Option[Array[Int]] = None)
+    partitionSpecs: Array[ShufflePartitionSpec])
   extends RDD[ColumnarBatch](dependency.rdd.context, Nil) {

-  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
-
-  private[this] val partitionStartIndices = specifiedPartitionStartIndices match {
-    case Some(indices) => indices
-    case None =>
-      // When specifiedPartitionStartIndices is not defined, every post-shuffle partition
-      // corresponds to a pre-shuffle partition.
-      (0 until numPreShufflePartitions).toArray
+  def this(
+      dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+      metrics: Map[String, SQLMetric]) = {
+    this(dependency, metrics,
+      Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
   }

-  private[this] val part =
-    new CoalescedBatchPartitioner(dependency.partitioner, partitionStartIndices)
-
   override def getDependencies = List(dependency)

-  override val partitioner = Some(part)
+  override val partitioner: Option[Partitioner] =
+    if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
+      val indices = partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
+      // TODO this check is based on assumptions of callers' behavior but is sufficient for now.
+      if (indices.toSet.size == partitionSpecs.length) {
+        Some(new CoalescedPartitioner(dependency.partitioner, indices))
+      } else {
+        None
+      }
+    } else {
+      None
+    }

   override def getPartitions: Array[Partition] = {
-    assert(partitionStartIndices.length == part.numPartitions)
-    Array.tabulate[Partition](partitionStartIndices.length) { i =>
-      val startIndex = partitionStartIndices(i)
-      val endIndex =
-        if (i < partitionStartIndices.length - 1) {
-          partitionStartIndices(i + 1)
-        } else {
-          numPreShufflePartitions
-        }
-      new ShuffledBatchRDDPartition(i, startIndex, endIndex)
+    Array.tabulate[Partition](partitionSpecs.length) { i =>
+      ShuffledBatchRDDPartition(i, partitionSpecs(i))
     }
   }

   override def getPreferredLocations(partition: Partition): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-    tracker.getPreferredLocationsForShuffle(dep, partition.index)
+    partition.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        // TODO order by partition size.
+        startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
+          tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
+        }
+
+      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex) =>
+        tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
+
+      case PartialMapperPartitionSpec(mapIndex, _, _) =>
+        tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+    }
   }

   override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
@@ -161,15 +160,35 @@ class ShuffledBatchRDD(
     // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
     val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
-    // The range of pre-shuffle partitions that we are fetching at here is
-    // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
-    val reader =
-      SparkEnv.get.shuffleManager.getReader(
-        dependency.shuffleHandle,
-        shuffledRowPartition.startPreShufflePartitionIndex,
-        shuffledRowPartition.endPreShufflePartitionIndex,
-        context,
-        sqlMetricsReporter)
+    val reader = split.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReader(
+          dependency.shuffleHandle,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+
+      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          startMapIndex,
+          endMapIndex,
+          reducerIndex,
+          reducerIndex + 1,
+          context,
+          sqlMetricsReporter)
+
+      case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          mapIndex,
+          mapIndex + 1,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+    }
     var ret : Iterator[ColumnarBatch] = null
     val nvtxRange = new NvtxWithMetrics(
       "Shuffle getPartitions", NvtxColor.DARK_GREEN, metrics(GpuMetricNames.TOTAL_TIME))
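The compute hunk above dispatches on the partition's ShufflePartitionSpec to pick a reader: contiguous reducer ranges go through getReader, while the two partial specs (used by AQE's skew-join and local-read optimizations) go through getReaderForRange. A sketch of the mapping, assuming the Spark 3.0.x spec definitions these hunks compile against; blockRanges is an illustrative helper, not part of the commit:

import org.apache.spark.sql.execution.{CoalescedPartitionSpec, PartialMapperPartitionSpec, PartialReducerPartitionSpec, ShufflePartitionSpec}

// Reduce each spec to the (map range, reducer range) of shuffle blocks it
// reads; all ranges are half-open and None means "all map outputs".
def blockRanges(spec: ShufflePartitionSpec): (Option[(Int, Int)], (Int, Int)) = spec match {
  case CoalescedPartitionSpec(startReducer, endReducer) =>
    (None, (startReducer, endReducer))                           // whole reducer range
  case PartialReducerPartitionSpec(reducer, startMap, endMap) =>
    (Some((startMap, endMap)), (reducer, reducer + 1))           // skew-split of one reducer
  case PartialMapperPartitionSpec(mapIndex, startReducer, endReducer) =>
    (Some((mapIndex, mapIndex + 1)), (startReducer, endReducer)) // one mapper's output slice
}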

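Earlier in the same file, the partitioner override only exposes a CoalescedPartitioner when every spec is a CoalescedPartitionSpec and the start indices are all distinct. A small sketch of why the distinctness check matters, with illustrative values (per the TODO in the hunk, the real check leans on assumptions about callers):

import org.apache.spark.sql.execution.CoalescedPartitionSpec

// Disjoint specs have pairwise-distinct startReducerIndex values and can be
// described by a CoalescedPartitioner; overlapping specs cannot, so the RDD
// reports no partitioner in that case.
val disjoint = Array(CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(2, 4))
val overlapping = Array(CoalescedPartitionSpec(0, 2), CoalescedPartitionSpec(0, 4))
disjoint.map(_.startReducerIndex).toSet.size == disjoint.length       // true  -> Some(partitioner)
overlapping.map(_.startReducerIndex).toSet.size == overlapping.length // false -> None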