Update partitioning logic in ShuffledBatchRDD #319

Merged 3 commits on Aug 3, 2020
@@ -98,10 +98,6 @@ case class GpuShuffleExchangeExec(
       writeMetrics)
   }
 
-  def createShuffledBatchRDD(partitionStartIndices: Option[Array[Int]]): ShuffledBatchRDD = {
-    new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics, partitionStartIndices)
-  }
-
   /**
    * Caches the created ShuffleBatchRDD so we can reuse that.
    */
@@ -113,7 +109,7 @@ case class GpuShuffleExchangeExec(
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = attachTree(this, "execute") {
     // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
     if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = createShuffledBatchRDD(None)
+      cachedShuffleRDD = new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics)
     }
     cachedShuffleRDD
   }
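Note (illustrative, not part of the diff): the new two-argument ShuffledBatchRDD call above is intended to behave exactly like the removed createShuffledBatchRDD(None) path, because the auxiliary constructor added later in this PR falls back to one identity CoalescedPartitionSpec per reducer. A minimal sketch of that default; the object name, numReducers parameter, and main method are hypothetical and exist only for this example.

import org.apache.spark.sql.execution.CoalescedPartitionSpec

// Sketch: the identity partition specs the two-argument constructor falls back to,
// one CoalescedPartitionSpec per reducer (start inclusive, end exclusive).
object DefaultSpecsSketch {
  def defaultSpecs(numReducers: Int): Array[CoalescedPartitionSpec] =
    Array.tabulate(numReducers)(i => CoalescedPartitionSpec(i, i + 1))

  def main(args: Array[String]): Unit = {
    // For 4 reducers this yields [0,1), [1,2), [2,3), [3,4) -- the same mapping the
    // removed createShuffledBatchRDD(None) produced, where every post-shuffle
    // partition covered exactly one pre-shuffle partition.
    defaultSpecs(4).foreach(println)
  }
}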
@@ -20,7 +20,7 @@ import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.format.TableMeta
 import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.internal.{config, Logging}
@@ -29,6 +29,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage._
 
@@ -198,12 +199,18 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boolean
   private lazy val env = SparkEnv.get
   private lazy val blockManager = env.blockManager
   private lazy val shouldFallThroughOnEverything = {
-    val fallThroughDueToExternalShuffle = !GpuShuffleEnv.isRapidsShuffleEnabled
-    if (fallThroughDueToExternalShuffle) {
-      logWarning("Rapids Shuffle Plugin is falling back to SortShuffleManager because " +
-        "external shuffle is enabled")
+    val fallThroughReasons = new ListBuffer[String]()
+    if (!GpuShuffleEnv.isRapidsShuffleEnabled) {
+      fallThroughReasons += "external shuffle is enabled"
     }
-    fallThroughDueToExternalShuffle
+    if (conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key).toBoolean) {
+      fallThroughReasons += "adaptive query execution is enabled"
+    }
+    if (fallThroughReasons.nonEmpty) {
+      logWarning(s"Rapids Shuffle Plugin is falling back to SortShuffleManager " +
+        s"because: ${fallThroughReasons.mkString(", ")}")
+    }
+    fallThroughReasons.nonEmpty
   }
 
   private lazy val localBlockManagerId = blockManager.blockManagerId
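Note (illustrative, not part of the diff): the adaptive-execution check added above looks the SQL flag up in the SparkConf handed to the shuffle manager by its SQLConf key. A standalone sketch of the same "collect reasons, then fall back" decision, assuming a rapidsShuffleEnabled argument in place of GpuShuffleEnv.isRapidsShuffleEnabled and a "false" default when the key was never set; the object and method names are hypothetical.

import scala.collection.mutable.ListBuffer

import org.apache.spark.SparkConf
import org.apache.spark.sql.internal.SQLConf

// Sketch: decide whether to fall back to SortShuffleManager, gathering every reason first.
object FallThroughSketch {
  def shouldFallThrough(conf: SparkConf, rapidsShuffleEnabled: Boolean): Boolean = {
    val reasons = new ListBuffer[String]()
    if (!rapidsShuffleEnabled) {
      reasons += "external shuffle is enabled"
    }
    // Read the SQL flag by its key; default to "false" when it is unset in this conf.
    if (conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false").toBoolean) {
      reasons += "adaptive query execution is enabled"
    }
    reasons.nonEmpty
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false).set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
    // AQE being enabled forces a fallback even when the RAPIDS shuffle itself is on.
    println(shouldFallThrough(conf, rapidsShuffleEnabled = true))
  }
}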
@@ -24,20 +24,11 @@ import com.nvidia.spark.rapids.{GpuMetricNames, NvtxWithMetrics}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.{CoalescedPartitioner, CoalescedPartitionSpec, PartialMapperPartitionSpec, PartialReducerPartitionSpec, ShuffledRowRDDPartition, ShufflePartitionSpec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-/**
- * The [[Partition]] used by [[ShuffledBatchRDD]]. A post-shuffle partition
- * (identified by `postShufflePartitionIndex`) contains a range of pre-shuffle partitions
- * (`startPreShufflePartitionIndex` to `endPreShufflePartitionIndex - 1`, inclusive).
- */
-private final class ShuffledBatchRDDPartition(
-    val postShufflePartitionIndex: Int,
-    val startPreShufflePartitionIndex: Int,
-    val endPreShufflePartitionIndex: Int) extends Partition {
-  override val index: Int = postShufflePartitionIndex
-}
+case class ShuffledBatchRDDPartition(index: Int, spec: ShufflePartitionSpec) extends Partition
 
 /**
  * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
@@ -115,44 +106,52 @@ class CoalescedBatchPartitioner(val parent: Partitioner, val partitionStartIndices
 class ShuffledBatchRDD(
     var dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
     metrics: Map[String, SQLMetric],
-    specifiedPartitionStartIndices: Option[Array[Int]] = None)
-  extends RDD[ColumnarBatch](dependency.rdd.context, Nil) {
-
-  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
-
-  private[this] val partitionStartIndices = specifiedPartitionStartIndices match {
-    case Some(indices) => indices
-    case None =>
-      // When specifiedPartitionStartIndices is not defined, every post-shuffle partition
-      // corresponds to a pre-shuffle partition.
-      (0 until numPreShufflePartitions).toArray
+    partitionSpecs: Array[ShufflePartitionSpec])
+  extends RDD[ColumnarBatch](dependency.rdd.context, Nil) {
+
+  def this(
+      dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+      metrics: Map[String, SQLMetric]) = {
+    this(dependency, metrics,
+      Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
   }
 
-  private[this] val part =
-    new CoalescedBatchPartitioner(dependency.partitioner, partitionStartIndices)
-
   override def getDependencies = List(dependency)
 
-  override val partitioner = Some(part)
+  override val partitioner: Option[Partitioner] =
+    if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
+      val indices = partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
+      // TODO this check is based on assumptions of callers' behavior but is sufficient for now.
+      if (indices.toSet.size == partitionSpecs.length) {
+        Some(new CoalescedPartitioner(dependency.partitioner, indices))
+      } else {
+        None
+      }
+    } else {
+      None
+    }
 
   override def getPartitions: Array[Partition] = {
-    assert(partitionStartIndices.length == part.numPartitions)
-    Array.tabulate[Partition](partitionStartIndices.length) { i =>
-      val startIndex = partitionStartIndices(i)
-      val endIndex =
-        if (i < partitionStartIndices.length - 1) {
-          partitionStartIndices(i + 1)
-        } else {
-          numPreShufflePartitions
-        }
-      new ShuffledBatchRDDPartition(i, startIndex, endIndex)
+    Array.tabulate[Partition](partitionSpecs.length) { i =>
+      ShuffledBatchRDDPartition(i, partitionSpecs(i))
     }
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-    tracker.getPreferredLocationsForShuffle(dep, partition.index)
+    partition.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        // TODO order by partition size.
+        startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
+          tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
+        }
+
+      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex) =>
+        tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
+
+      case PartialMapperPartitionSpec(mapIndex, _, _) =>
+        tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+    }
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
@@ -161,15 +160,35 @@ class ShuffledBatchRDD(
     // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
     val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
-    // The range of pre-shuffle partitions that we are fetching at here is
-    // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
-    val reader =
-      SparkEnv.get.shuffleManager.getReader(
-        dependency.shuffleHandle,
-        shuffledRowPartition.startPreShufflePartitionIndex,
-        shuffledRowPartition.endPreShufflePartitionIndex,
-        context,
-        sqlMetricsReporter)
+    val reader = split.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReader(
+          dependency.shuffleHandle,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+
+      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          startMapIndex,
+          endMapIndex,
+          reducerIndex,
+          reducerIndex + 1,
+          context,
+          sqlMetricsReporter)
+
+      case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          mapIndex,
+          mapIndex + 1,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+    }
     var ret : Iterator[ColumnarBatch] = null
     val nvtxRange = new NvtxWithMetrics(
       "Shuffle getPartitions", NvtxColor.DARK_GREEN, metrics(GpuMetricNames.TOTAL_TIME))
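Note (illustrative, not part of the diff): with the array-of-specs constructor, a caller can coalesce post-shuffle partitions by passing non-identity specs instead of the one-spec-per-reducer default. A hypothetical helper under assumed names: boundaries, numReducers, and CoalesceSpecsSketch are invented for this example, and dep/metrics stand for an existing ShuffleDependency and metrics map.

import org.apache.spark.sql.execution.{CoalescedPartitionSpec, ShufflePartitionSpec}

// Sketch: turn a list of reducer start indices into contiguous CoalescedPartitionSpecs.
object CoalesceSpecsSketch {
  def coalescedSpecs(boundaries: Seq[Int], numReducers: Int): Array[ShufflePartitionSpec] =
    boundaries.indices.map { i =>
      val start = boundaries(i)
      val end = if (i == boundaries.length - 1) numReducers else boundaries(i + 1)
      CoalescedPartitionSpec(start, end): ShufflePartitionSpec
    }.toArray

  def main(args: Array[String]): Unit = {
    // Yields [0,3), [3,7), [7,10); an array like this could be handed to
    // new ShuffledBatchRDD(dep, metrics, specs) in place of the identity default.
    coalescedSpecs(Seq(0, 3, 7), numReducers = 10).foreach(println)
  }
}

As the partitioner override in the diff shows, such an RDD only reports a CoalescedPartitioner when every spec is a CoalescedPartitionSpec with a distinct start reducer index; otherwise it reports no partitioner.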