Skip to content

Commit

Permalink
Rename ShuffleCoalesceExec to GpuShuffleCoalesceExec (#1196)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Nov 25, 2020
1 parent 9605903 commit 6cdf1ae
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* The serialization read path is notably different. The sequence of serialized bytes IS NOT
* deserialized into a cudf `Table` but rather tracked in host memory by a `ColumnarBatch`
* that contains a [[SerializedTableColumn]]. During query planning, each GPU columnar shuffle
* exchange is followed by a [[ShuffleCoalesceExec]] that expects to receive only these
* custom batches of [[SerializedTableColumn]]. [[ShuffleCoalesceExec]] coalesces the smaller
* exchange is followed by a [[GpuShuffleCoalesceExec]] that expects to receive only these
* custom batches of [[SerializedTableColumn]]. [[GpuShuffleCoalesceExec]] coalesces the smaller
* shuffle partitions into larger tables before placing them on the GPU for further processing.
*
* @note The RAPIDS shuffle does not use this code.
Expand Down Expand Up @@ -233,7 +233,7 @@ private class GpuColumnarBatchSerializerInstance(

/**
* A special `ColumnVector` that describes a serialized table read from shuffle.
* This appears in a `ColumnarBatch` to pass serialized tables to [[ShuffleCoalesceExec]]
* This appears in a `ColumnarBatch` to pass serialized tables to [[GpuShuffleCoalesceExec]]
* which should always appear in the query plan immediately after a shuffle.
*/
class SerializedTableColumn(
Expand All @@ -254,9 +254,10 @@ object SerializedTableColumn {
/**
* Build a `ColumnarBatch` consisting of a single [[SerializedTableColumn]] describing
* the specified serialized table.
*
* @param header header for the serialized table
* @param hostBuffer host buffer containing the table data
* @return columnar batch to be passed to [[ShuffleCoalesceExec]]
* @return columnar batch to be passed to [[GpuShuffleCoalesceExec]]
*/
def from(
header: SerializedTableHeader,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* @note This should ALWAYS appear in the plan after a GPU shuffle when RAPIDS shuffle is
* not being used.
*/
case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsConf)
case class GpuShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsConf)
extends UnaryExecNode with GpuExec {

import GpuMetricNames._
Expand All @@ -63,7 +63,7 @@ case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsCo
val sparkSchema = GpuColumnVector.extractTypes(schema)

child.executeColumnar().mapPartitions { iter =>
new ShuffleCoalesceIterator(iter, targetBatchByteSize, sparkSchema, metricsMap)
new GpuShuffleCoalesceIterator(iter, targetBatchByteSize, sparkSchema, metricsMap)
}
}
}
Expand All @@ -74,7 +74,7 @@ case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsCo
* to the target batch size and then concatenated on the host before the data
* is transferred to the GPU.
*/
class ShuffleCoalesceIterator(
class GpuShuffleCoalesceIterator(
batchIter: Iterator[ColumnarBatch],
targetBatchByteSize: Long,
sparkSchema: Array[DataType],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
if (GpuShuffleEnv.isRapidsShuffleEnabled) {
GpuCoalesceBatches(plan, TargetSize(conf.gpuTargetBatchSizeBytes))
} else {
ShuffleCoalesceExec(plan, conf)
GpuShuffleCoalesceExec(plan, conf)
}
}

Expand All @@ -64,7 +64,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
GpuRowToColumnarExec(optimizeAdaptiveTransitions(r2c.child, Some(r2c)), goal)

case ColumnarToRowExec(GpuBringBackToHost(
ShuffleCoalesceExec(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty =>
GpuShuffleCoalesceExec(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty =>
// We typically want the final operator in the plan (the operator that has no parent) to be
// wrapped in `ColumnarToRowExec(GpuBringBackToHost(ShuffleCoalesceExec(_)))` operators to
// bring the data back onto the host and be translated to rows so that it can be returned
Expand Down Expand Up @@ -136,11 +136,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
* This optimizes the plan to remove [[GpuCoalesceBatches]] nodes that are unnecessary
* or undesired in some situations.
*
* @note This does not examine [[ShuffleCoalesceExec]] nodes in the plan, as they
* @note This does not examine [[GpuShuffleCoalesceExec]] nodes in the plan, as they
* are always required after GPU columnar exchanges during normal shuffle
* to place the data after shuffle on the GPU. Those nodes also do not
* coalesce to the same goal as used by [[GpuCoalesceBatches]], so a
* [[ShuffleCoalesceExec]] immediately followed by a [[GpuCoalesceBatches]] is
* [[GpuShuffleCoalesceExec]] immediately followed by a [[GpuCoalesceBatches]] is
* not unusual.
*/
def optimizeCoalesce(plan: SparkPlan): SparkPlan = plan match {
Expand Down Expand Up @@ -301,7 +301,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] {
private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match {
case exec: GpuShuffleExchangeExecBase =>
// always follow a GPU shuffle with a shuffle coalesce
ShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), conf)
GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), conf)
case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ class AdaptiveQueryExecSuite
_.isInstanceOf[GpuShuffledHashJoinBase]).get
assert(shj.children.length == 2)
assert(shj.children.forall {
case ShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(ShuffleCoalesceExec(_, _), _) => true
case GpuShuffleCoalesceExec(_, _) => true
case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true
case _ => false
})

Expand Down

0 comments on commit 6cdf1ae

Please sign in to comment.