diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala index e08777f46a1..9241ab7e778 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuColumnarBatchSerializer.scala @@ -42,8 +42,8 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * The serialization read path is notably different. The sequence of serialized bytes IS NOT * deserialized into a cudf `Table` but rather tracked in host memory by a `ColumnarBatch` * that contains a [[SerializedTableColumn]]. During query planning, each GPU columnar shuffle - * exchange is followed by a [[ShuffleCoalesceExec]] that expects to receive only these - * custom batches of [[SerializedTableColumn]]. [[ShuffleCoalesceExec]] coalesces the smaller + * exchange is followed by a [[GpuShuffleCoalesceExec]] that expects to receive only these + * custom batches of [[SerializedTableColumn]]. [[GpuShuffleCoalesceExec]] coalesces the smaller * shuffle partitions into larger tables before placing them on the GPU for further processing. * * @note The RAPIDS shuffle does not use this code. @@ -233,7 +233,7 @@ private class GpuColumnarBatchSerializerInstance( /** * A special `ColumnVector` that describes a serialized table read from shuffle. - * This appears in a `ColumnarBatch` to pass serialized tables to [[ShuffleCoalesceExec]] + * This appears in a `ColumnarBatch` to pass serialized tables to [[GpuShuffleCoalesceExec]] * which should always appear in the query plan immediately after a shuffle. */ class SerializedTableColumn( @@ -254,9 +254,10 @@ object SerializedTableColumn { /** * Build a `ColumnarBatch` consisting of a single [[SerializedTableColumn]] describing * the specified serialized table. + * * @param header header for the serialized table * @param hostBuffer host buffer containing the table data - * @return columnar batch to be passed to [[ShuffleCoalesceExec]] + * @return columnar batch to be passed to [[GpuShuffleCoalesceExec]] */ def from( header: SerializedTableHeader, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleCoalesceExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala similarity index 97% rename from sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleCoalesceExec.scala rename to sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala index 0695348dd60..400f788d161 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ShuffleCoalesceExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuShuffleCoalesceExec.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * @note This should ALWAYS appear in the plan after a GPU shuffle when RAPIDS shuffle is * not being used. */ -case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsConf) +case class GpuShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsConf) extends UnaryExecNode with GpuExec { import GpuMetricNames._ @@ -63,7 +63,7 @@ case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsCo val sparkSchema = GpuColumnVector.extractTypes(schema) child.executeColumnar().mapPartitions { iter => - new ShuffleCoalesceIterator(iter, targetBatchByteSize, sparkSchema, metricsMap) + new GpuShuffleCoalesceIterator(iter, targetBatchByteSize, sparkSchema, metricsMap) } } } @@ -74,7 +74,7 @@ case class ShuffleCoalesceExec(child: SparkPlan, @transient rapidsConf: RapidsCo * to the target batch size and then concatenated on the host before the data * is transferred to the GPU. */ -class ShuffleCoalesceIterator( +class GpuShuffleCoalesceIterator( batchIter: Iterator[ColumnarBatch], targetBatchByteSize: Long, sparkSchema: Array[DataType], diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 4c26c8de062..fabd47ba556 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -53,7 +53,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { if (GpuShuffleEnv.isRapidsShuffleEnabled) { GpuCoalesceBatches(plan, TargetSize(conf.gpuTargetBatchSizeBytes)) } else { - ShuffleCoalesceExec(plan, conf) + GpuShuffleCoalesceExec(plan, conf) } } @@ -64,7 +64,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { GpuRowToColumnarExec(optimizeAdaptiveTransitions(r2c.child, Some(r2c)), goal) case ColumnarToRowExec(GpuBringBackToHost( - ShuffleCoalesceExec(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty => + GpuShuffleCoalesceExec(e: GpuShuffleExchangeExecBase, _))) if parent.isEmpty => // We typically want the final operator in the plan (the operator that has no parent) to be // wrapped in `ColumnarToRowExec(GpuBringBackToHost(ShuffleCoalesceExec(_)))` operators to // bring the data back onto the host and be translated to rows so that it can be returned @@ -136,11 +136,11 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { * This optimizes the plan to remove [[GpuCoalesceBatches]] nodes that are unnecessary * or undesired in some situations. * - * @note This does not examine [[ShuffleCoalesceExec]] nodes in the plan, as they + * @note This does not examine [[GpuShuffleCoalesceExec]] nodes in the plan, as they * are always required after GPU columnar exchanges during normal shuffle * to place the data after shuffle on the GPU. Those nodes also do not * coalesce to the same goal as used by [[GpuCoalesceBatches]], so a - * [[ShuffleCoalesceExec]] immediately followed by a [[GpuCoalesceBatches]] is + * [[GpuShuffleCoalesceExec]] immediately followed by a [[GpuCoalesceBatches]] is * not unusual. */ def optimizeCoalesce(plan: SparkPlan): SparkPlan = plan match { @@ -301,7 +301,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { private def insertShuffleCoalesce(plan: SparkPlan): SparkPlan = plan match { case exec: GpuShuffleExchangeExecBase => // always follow a GPU shuffle with a shuffle coalesce - ShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), conf) + GpuShuffleCoalesceExec(exec.withNewChildren(exec.children.map(insertShuffleCoalesce)), conf) case exec => exec.withNewChildren(plan.children.map(insertShuffleCoalesce)) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala index 200481e01bc..2793e4e73d2 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala @@ -174,8 +174,8 @@ class AdaptiveQueryExecSuite _.isInstanceOf[GpuShuffledHashJoinBase]).get assert(shj.children.length == 2) assert(shj.children.forall { - case ShuffleCoalesceExec(_, _) => true - case GpuCoalesceBatches(ShuffleCoalesceExec(_, _), _) => true + case GpuShuffleCoalesceExec(_, _) => true + case GpuCoalesceBatches(GpuShuffleCoalesceExec(_, _), _) => true case _ => false })