diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleExchangeExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleExchangeExec.scala
index f548848ceaf..fbb62da3a89 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleExchangeExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleExchangeExec.scala
@@ -98,10 +98,6 @@ case class GpuShuffleExchangeExec(
       writeMetrics)
   }
 
-  def createShuffledBatchRDD(partitionStartIndices: Option[Array[Int]]): ShuffledBatchRDD = {
-    new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics, partitionStartIndices)
-  }
-
   /**
    * Caches the created ShuffleBatchRDD so we can reuse that.
    */
@@ -113,7 +109,7 @@ case class GpuShuffleExchangeExec(
   protected override def doExecuteColumnar(): RDD[ColumnarBatch] = attachTree(this, "execute") {
     // Returns the same ShuffleRowRDD if this plan is used by multiple plans.
     if (cachedShuffleRDD == null) {
-      cachedShuffleRDD = createShuffledBatchRDD(None)
+      cachedShuffleRDD = new ShuffledBatchRDD(shuffleBatchDependency, metrics ++ readMetrics)
     }
     cachedShuffleRDD
   }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala
index d8d602205e9..de2ad6da97d 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/RapidsShuffleInternalManager.scala
@@ -20,7 +20,7 @@ import ai.rapids.cudf.{NvtxColor, NvtxRange}
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.format.TableMeta
 import com.nvidia.spark.rapids.shuffle.{RapidsShuffleRequestHandler, RapidsShuffleServer, RapidsShuffleTransport}
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.internal.{config, Logging}
@@ -29,6 +29,7 @@ import org.apache.spark.network.buffer.ManagedBuffer
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.sort.SortShuffleManager
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.storage._
 
@@ -198,12 +199,18 @@ abstract class RapidsShuffleInternalManagerBase(conf: SparkConf, isDriver: Boole
   private lazy val env = SparkEnv.get
   private lazy val blockManager = env.blockManager
   private lazy val shouldFallThroughOnEverything = {
-    val fallThroughDueToExternalShuffle = !GpuShuffleEnv.isRapidsShuffleEnabled
-    if (fallThroughDueToExternalShuffle) {
-      logWarning("Rapids Shuffle Plugin is falling back to SortShuffleManager because " +
-        "external shuffle is enabled")
+    val fallThroughReasons = new ListBuffer[String]()
+    if (!GpuShuffleEnv.isRapidsShuffleEnabled) {
+      fallThroughReasons += "external shuffle is enabled"
     }
-    fallThroughDueToExternalShuffle
+    if (conf.get(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key).toBoolean) {
+      fallThroughReasons += "adaptive query execution is enabled"
+    }
+    if (fallThroughReasons.nonEmpty) {
+      logWarning(s"Rapids Shuffle Plugin is falling back to SortShuffleManager " +
+        s"because: ${fallThroughReasons.mkString(", ")}")
+    }
+    fallThroughReasons.nonEmpty
   }
 
   private lazy val localBlockManagerId = blockManager.blockManagerId
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala
index 81e8b89d10a..f9c1f3e3c5d 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/ShuffledBatchRDD.scala
@@ -24,20 +24,11 @@ import com.nvidia.spark.rapids.{GpuMetricNames, NvtxWithMetrics}
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.execution.{CoalescedPartitioner, CoalescedPartitionSpec, PartialMapperPartitionSpec, PartialReducerPartitionSpec, ShuffledRowRDDPartition, ShufflePartitionSpec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
-/**
- * The [[Partition]] used by [[ShuffledBatchRDD]]. A post-shuffle partition
- * (identified by `postShufflePartitionIndex`) contains a range of pre-shuffle partitions
- * (`startPreShufflePartitionIndex` to `endPreShufflePartitionIndex - 1`, inclusive).
- */
-private final class ShuffledBatchRDDPartition(
-    val postShufflePartitionIndex: Int,
-    val startPreShufflePartitionIndex: Int,
-    val endPreShufflePartitionIndex: Int) extends Partition {
-  override val index: Int = postShufflePartitionIndex
-}
+case class ShuffledBatchRDDPartition(index: Int, spec: ShufflePartitionSpec) extends Partition
 
 /**
  * A dummy partitioner for use with records whose partition ids have been pre-computed (i.e. for
@@ -115,44 +106,52 @@ class CoalescedBatchPartitioner(val parent: Partitioner, val partitionStartIndic
 class ShuffledBatchRDD(
     var dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
     metrics: Map[String, SQLMetric],
-    specifiedPartitionStartIndices: Option[Array[Int]] = None)
-  extends RDD[ColumnarBatch](dependency.rdd.context, Nil) {
-
-  private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions
-
-  private[this] val partitionStartIndices = specifiedPartitionStartIndices match {
-    case Some(indices) => indices
-    case None =>
-      // When specifiedPartitionStartIndices is not defined, every post-shuffle partition
-      // corresponds to a pre-shuffle partition.
-      (0 until numPreShufflePartitions).toArray
+    partitionSpecs: Array[ShufflePartitionSpec])
+  extends RDD[ColumnarBatch](dependency.rdd.context, Nil) {
+
+  def this(
+      dependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+      metrics: Map[String, SQLMetric]) = {
+    this(dependency, metrics,
+      Array.tabulate(dependency.partitioner.numPartitions)(i => CoalescedPartitionSpec(i, i + 1)))
   }
 
-  private[this] val part =
-    new CoalescedBatchPartitioner(dependency.partitioner, partitionStartIndices)
-
   override def getDependencies = List(dependency)
 
-  override val partitioner = Some(part)
+  override val partitioner: Option[Partitioner] =
+    if (partitionSpecs.forall(_.isInstanceOf[CoalescedPartitionSpec])) {
+      val indices = partitionSpecs.map(_.asInstanceOf[CoalescedPartitionSpec].startReducerIndex)
+      // TODO this check is based on assumptions of callers' behavior but is sufficient for now.
+      if (indices.toSet.size == partitionSpecs.length) {
+        Some(new CoalescedPartitioner(dependency.partitioner, indices))
+      } else {
+        None
+      }
+    } else {
+      None
+    }
 
   override def getPartitions: Array[Partition] = {
-    assert(partitionStartIndices.length == part.numPartitions)
-    Array.tabulate[Partition](partitionStartIndices.length) { i =>
-      val startIndex = partitionStartIndices(i)
-      val endIndex =
-        if (i < partitionStartIndices.length - 1) {
-          partitionStartIndices(i + 1)
-        } else {
-          numPreShufflePartitions
-        }
-      new ShuffledBatchRDDPartition(i, startIndex, endIndex)
+    Array.tabulate[Partition](partitionSpecs.length) { i =>
+      ShuffledBatchRDDPartition(i, partitionSpecs(i))
     }
   }
 
   override def getPreferredLocations(partition: Partition): Seq[String] = {
     val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster]
-    val dep = dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
-    tracker.getPreferredLocationsForShuffle(dep, partition.index)
+    partition.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        // TODO order by partition size.
+        startReducerIndex.until(endReducerIndex).flatMap { reducerIndex =>
+          tracker.getPreferredLocationsForShuffle(dependency, reducerIndex)
+        }
+
+      case PartialReducerPartitionSpec(_, startMapIndex, endMapIndex) =>
+        tracker.getMapLocation(dependency, startMapIndex, endMapIndex)
+
+      case PartialMapperPartitionSpec(mapIndex, _, _) =>
+        tracker.getMapLocation(dependency, mapIndex, mapIndex + 1)
+    }
   }
 
   override def compute(split: Partition, context: TaskContext): Iterator[ColumnarBatch] = {
@@ -161,15 +160,35 @@ class ShuffledBatchRDD(
     // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
     val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
-    // The range of pre-shuffle partitions that we are fetching at here is
-    // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
-    val reader =
-      SparkEnv.get.shuffleManager.getReader(
-        dependency.shuffleHandle,
-        shuffledRowPartition.startPreShufflePartitionIndex,
-        shuffledRowPartition.endPreShufflePartitionIndex,
-        context,
-        sqlMetricsReporter)
+    val reader = split.asInstanceOf[ShuffledBatchRDDPartition].spec match {
+      case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReader(
+          dependency.shuffleHandle,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+
+      case PartialReducerPartitionSpec(reducerIndex, startMapIndex, endMapIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          startMapIndex,
+          endMapIndex,
+          reducerIndex,
+          reducerIndex + 1,
+          context,
+          sqlMetricsReporter)
+
+      case PartialMapperPartitionSpec(mapIndex, startReducerIndex, endReducerIndex) =>
+        SparkEnv.get.shuffleManager.getReaderForRange(
+          dependency.shuffleHandle,
+          mapIndex,
+          mapIndex + 1,
+          startReducerIndex,
+          endReducerIndex,
+          context,
+          sqlMetricsReporter)
+    }
     var ret : Iterator[ColumnarBatch] = null
     val nvtxRange = new NvtxWithMetrics(
       "Shuffle getPartitions", NvtxColor.DARK_GREEN, metrics(GpuMetricNames.TOTAL_TIME))
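Illustration (not part of the patch): the compute() dispatch above maps each ShufflePartitionSpec to the map-index and reducer-index ranges handed to the shuffle reader. A minimal standalone Scala sketch of that mapping, using hypothetical stand-in case classes rather than Spark's actual org.apache.spark.sql.execution.ShufflePartitionSpec hierarchy:

    // Hypothetical stand-ins for Spark's ShufflePartitionSpec variants; the real
    // classes live in org.apache.spark.sql.execution and carry the same fields.
    sealed trait SpecSketch
    case class Coalesced(startReducer: Int, endReducer: Int) extends SpecSketch
    case class PartialReducer(reducer: Int, startMap: Int, endMap: Int) extends SpecSketch
    case class PartialMapper(map: Int, startReducer: Int, endReducer: Int) extends SpecSketch

    // Returns (optional map-index range, reducer-index range) that a shuffle reader
    // would fetch for the given spec; None for the map range means "all map outputs",
    // mirroring the plain getReader call versus getReaderForRange in the patch.
    def readerRanges(spec: SpecSketch): (Option[(Int, Int)], (Int, Int)) = spec match {
      case Coalesced(startReducer, endReducer) =>
        (None, (startReducer, endReducer))                 // contiguous reducers, all maps
      case PartialReducer(reducer, startMap, endMap) =>
        (Some((startMap, endMap)), (reducer, reducer + 1)) // one reducer, a slice of maps
      case PartialMapper(map, startReducer, endReducer) =>
        (Some((map, map + 1)), (startReducer, endReducer)) // one map, a range of reducers
    }

    // Example: an AQE-style coalesce of reducers [0, 4) into one output partition.
    println(readerRanges(Coalesced(0, 4))) // (None,(0,4))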