Skip to content

Commit

Permalink
[window] Add GpuWindowExec requiredChildOrdering
Browse files Browse the repository at this point in the history
WindowExec has checks for child ordering and distribution.
This commit adds requiredChildOrdering and requiredChildDistribution
to GpuWindowExec.

Signed-off-by: Mithun RK <mythrocks@gmail.com>
  • Loading branch information
mythrocks committed Sep 2, 2020
1 parent 78f2519 commit 99c67db
Showing 1 changed file with 23 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import com.nvidia.spark.rapids.GpuMetricNames._

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.window.WindowExec
Expand Down Expand Up @@ -69,6 +69,11 @@ class GpuWindowExecMeta(windowExec: WindowExec,
val windowExpressions: Seq[BaseExprMeta[NamedExpression]] =
inputWindowExpressions.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

val partitionSpec: Seq[BaseExprMeta[Expression]] =
windowExec.partitionSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this)))
val orderSpec: Seq[BaseExprMeta[SortOrder]] =
windowExec.orderSpec.map(GpuOverrides.wrapExpr(_, conf, Some(this)))

override def tagPlanForGpu(): Unit = {

// Implementation depends on receiving a `NamedExpression` wrapped WindowExpression.
Expand All @@ -83,6 +88,8 @@ class GpuWindowExecMeta(windowExec: WindowExec,
override def convertToGpu(): GpuExec = {
GpuWindowExec(
windowExpressions.map(_.convertToGpu()),
partitionSpec.map(_.convertToGpu()),
orderSpec.map(_.convertToGpu().asInstanceOf[SortOrder]),
childPlans.head.convertIfNeeded(),
resultColumnsOnly
)
Expand All @@ -91,6 +98,8 @@ class GpuWindowExecMeta(windowExec: WindowExec,

case class GpuWindowExec(
windowExpressionAliases: Seq[Expression],
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
child: SparkPlan,
resultColumnsOnly: Boolean
) extends UnaryExecNode with GpuExec {
Expand All @@ -101,6 +110,18 @@ case class GpuWindowExec(
child.output ++ windowExpressionAliases.map(_.asInstanceOf[NamedExpression].toAttribute)
}

override def requiredChildDistribution: Seq[Distribution] = {
if (partitionSpec.isEmpty) {
// Only show warning when the number of bytes is larger than 100 MiB?
logWarning("No Partition Defined for Window operation! Moving all data to a single "
+ "partition, this can cause serious performance degradation.")
AllTuples :: Nil
} else ClusteredDistribution(partitionSpec) :: Nil
}

override def requiredChildOrdering: Seq[Seq[SortOrder]] =
Seq(partitionSpec.map(SortOrder(_, Ascending)) ++ orderSpec)

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning
Expand Down

0 comments on commit 99c67db

Please sign in to comment.