From f7edbe07aa9f2a3d19559698849ad4672b08fb9d Mon Sep 17 00:00:00 2001 From: MithunR Date: Tue, 8 Sep 2020 12:00:25 -0700 Subject: [PATCH] [window] Add GpuWindowExec requiredChildOrdering (#645) WindowExec has checks for child ordering and distribution. This commit adds requiredChildOrdering and requiredChildDistribution to GpuWindowExec. Signed-off-by: Mithun RK --- .../nvidia/spark/rapids/GpuWindowExec.scala | 25 +++++++++++++++++-- 1 file changed, 23 insertions(+), 2 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala index 60111b059c3..fb3f5fcc899 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuWindowExec.scala @@ -21,8 +21,8 @@ import com.nvidia.spark.rapids.GpuMetricNames._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning} import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.execution.window.WindowExec @@ -69,6 +69,11 @@ class GpuWindowExecMeta(windowExec: WindowExec, val windowExpressions: Seq[BaseExprMeta[NamedExpression]] = inputWindowExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val partitionSpec: Seq[BaseExprMeta[Expression]] = + windowExec.partitionSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + val orderSpec: Seq[BaseExprMeta[SortOrder]] = + windowExec.orderSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this))) + override def tagPlanForGpu(): Unit = { // Implementation depends on receiving a `NamedExpression` wrapped WindowExpression. @@ -83,6 +88,8 @@ class GpuWindowExecMeta(windowExec: WindowExec, override def convertToGpu(): GpuExec = { GpuWindowExec( windowExpressions.map(_.convertToGpu()), + partitionSpec.map(_.convertToGpu()), + orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]), childPlans.head.convertIfNeeded(), resultColumnsOnly ) @@ -91,6 +98,8 @@ class GpuWindowExecMeta(windowExec: WindowExec, case class GpuWindowExec( windowExpressionAliases: Seq[Expression], + partitionSpec: Seq[Expression], + orderSpec: Seq[SortOrder], child: SparkPlan, resultColumnsOnly: Boolean ) extends UnaryExecNode with GpuExec { @@ -101,6 +110,18 @@ case class GpuWindowExec( child.output ++ windowExpressionAliases.map(_.asInstanceOf[NamedExpression].toAttribute) } + override def requiredChildDistribution: Seq[Distribution] = { + if (partitionSpec.isEmpty) { + // Only show warning when the number of bytes is larger than 100 MiB? + logWarning("No Partition Defined for Window operation! Moving all data to a single " + + "partition, this can cause serious performance degradation.") + AllTuples :: Nil + } else ClusteredDistribution(partitionSpec) :: Nil + } + + override def requiredChildOrdering: Seq[Seq[SortOrder]] = + Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec) + override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning