Skip to content

Commit

Permalink
Change Hash Aggregate to allow pass through on MapType
Browse files Browse the repository at this point in the history
Signed-off-by: Kuhu Shukla <kuhus@nvidia.com>
  • Loading branch information
kuhushukla committed Oct 23, 2020
1 parent 3a96362 commit b98dcda
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ class Spark300Shims extends SparkShims {
val ignoreNulls: BaseExprMeta[_] =
GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this))
override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls)
/**
 * Reports through `willNotWorkOnGpu` that First cannot be used when any of the
 * aggregate's child expressions has a MapType data type.
 */
override def tagExprForGpu(): Unit = {
  // A typed pattern stands in for isInstanceOf; `exists` still stops at the first map input.
  val hasMapInput = a.children.exists { input =>
    input.dataType match {
      case _: MapType => true
      case _ => false
    }
  }
  if (hasMapInput) {
    willNotWorkOnGpu("First on MapType is not supported")
  }
}

/** Builds a GpuFirst from the converted `child` and `ignoreNulls` metas, in that order. */
override def convertToGpu(): GpuExpression = {
  val gpuChild = child.convertToGpu()
  val gpuIgnoreNulls = ignoreNulls.convertToGpu()
  GpuFirst(gpuChild, gpuIgnoreNulls)
}
Expand All @@ -228,6 +233,11 @@ class Spark300Shims extends SparkShims {
val ignoreNulls: BaseExprMeta[_] =
GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this))
override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls)
/**
 * Reports through `willNotWorkOnGpu` that Last cannot be used when any of the
 * aggregate's child expressions has a MapType data type.
 */
override def tagExprForGpu(): Unit = {
  // A typed pattern stands in for isInstanceOf; `exists` still stops at the first map input.
  val hasMapInput = a.children.exists { input =>
    input.dataType match {
      case _: MapType => true
      case _ => false
    }
  }
  if (hasMapInput) {
    willNotWorkOnGpu("Last on MapType is not supported")
  }
}

/** Builds a GpuLast from the converted `child` and `ignoreNulls` metas, in that order. */
override def convertToGpu(): GpuExpression = {
  val gpuChild = child.convertToGpu()
  val gpuIgnoreNulls = ignoreNulls.convertToGpu()
  GpuLast(gpuChild, gpuIgnoreNulls)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuShuffleExchangeExecBase}
import org.apache.spark.sql.types.MapType
import org.apache.spark.storage.{BlockId, BlockManagerId}

class Spark301Shims extends Spark300Shims {
Expand All @@ -55,12 +56,22 @@ class Spark301Shims extends Spark300Shims {
GpuOverrides.expr[First](
"first aggregate operator",
(a, conf, p, r) => new ExprMeta[First](a, conf, p, r) {
/**
 * Reports through `willNotWorkOnGpu` that First cannot be used when any of the
 * aggregate's child expressions has a MapType data type.
 */
override def tagExprForGpu(): Unit = {
  // A typed pattern stands in for isInstanceOf; `exists` still stops at the first map input.
  val hasMapInput = a.children.exists { input =>
    input.dataType match {
      case _: MapType => true
      case _ => false
    }
  }
  if (hasMapInput) {
    willNotWorkOnGpu("First on MapType is not supported")
  }
}
/**
 * Builds a GpuFirst from the first converted child meta, passing the wrapped
 * expression's `ignoreNulls` value through as-is.
 */
override def convertToGpu(): GpuExpression = {
  val gpuChild = childExprs(0).convertToGpu()
  GpuFirst(gpuChild, a.ignoreNulls)
}
}),
GpuOverrides.expr[Last](
"last aggregate operator",
(a, conf, p, r) => new ExprMeta[Last](a, conf, p, r) {
/**
 * Reports through `willNotWorkOnGpu` that Last cannot be used when any of the
 * aggregate's child expressions has a MapType data type.
 */
override def tagExprForGpu(): Unit = {
  // A typed pattern stands in for isInstanceOf; `exists` still stops at the first map input.
  val hasMapInput = a.children.exists { input =>
    input.dataType match {
      case _: MapType => true
      case _ => false
    }
  }
  if (hasMapInput) {
    willNotWorkOnGpu("Last on MapType is not supported")
  }
}
/**
 * Builds a GpuLast from the first converted child meta, passing the wrapped
 * expression's `ignoreNulls` value through as-is.
 */
override def convertToGpu(): GpuExpression = {
  val gpuChild = childExprs(0).convertToGpu()
  GpuLast(gpuChild, a.ignoreNulls)
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,9 @@ object GpuOverrides {
if (count.children.size > 1) {
willNotWorkOnGpu("count of multiple columns not supported")
}
if (count.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
willNotWorkOnGpu("Count on MapType is not supported")
}
}

/** Converts every child meta and wraps the resulting expressions in a GpuCount. */
override def convertToGpu(): GpuExpression = {
  val gpuChildren = childExprs.map(meta => meta.convertToGpu())
  GpuCount(gpuChildren)
}
Expand All @@ -1305,6 +1308,9 @@ object GpuOverrides {
"will compute incorrect results. If it is known that there are no NaNs, set " +
s" ${RapidsConf.HAS_NANS} to false.")
}
if (max.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
willNotWorkOnGpu("Max on MapType is not supported")
}
}
/** Wraps the supplied child expression in a GpuMax. */
override def convertToGpu(child: Expression): GpuExpression = {
  GpuMax(child)
}
}),
Expand All @@ -1318,6 +1324,9 @@ object GpuOverrides {
"will compute incorrect results. If it is known that there are no NaNs, set " +
s" ${RapidsConf.HAS_NANS} to false.")
}
if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
willNotWorkOnGpu("Min on MapType is not supported")
}
}
/** Wraps the supplied child expression in a GpuMin. */
override def convertToGpu(child: Expression): GpuExpression = {
  GpuMin(child)
}
}),
Expand All @@ -1333,6 +1342,9 @@ object GpuOverrides {
" once as part of the same query. To enable this anyways set" +
s" ${RapidsConf.ENABLE_FLOAT_AGG} to true.")
}
if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
willNotWorkOnGpu("Sum on MapType is not supported")
}
}

/** Wraps the supplied child expression in a GpuSum. */
override def convertToGpu(child: Expression): GpuExpression = {
  GpuSum(child)
}
Expand All @@ -1349,6 +1361,9 @@ object GpuOverrides {
" computed more than once as part of the same query. To enable this anyways set" +
s" ${RapidsConf.ENABLE_FLOAT_AGG} to true")
}
if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
willNotWorkOnGpu("Avg on MapType is not supported")
}
}

/** Wraps the supplied child expression in a GpuAverage. */
override def convertToGpu(child: Expression): GpuExpression = {
  GpuAverage(child)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
import ai.rapids.cudf
import ai.rapids.cudf.NvtxColor
import com.nvidia.spark.rapids.GpuMetricNames._
import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.TaskContext
Expand All @@ -34,7 +35,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryE
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate}
import org.apache.spark.sql.types.{DoubleType, FloatType}
import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, MapType, StringType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}

object AggregateUtils {
Expand Down Expand Up @@ -97,6 +98,12 @@ class GpuHashAggregateMeta(
aggregateAttributes ++
resultExpressions

/**
 * Type check used by this meta for hash aggregates.
 *
 * A map whose key and value types are both StringType is accepted whatever its
 * `valueContainsNull` flag is; every other type is delegated to
 * `GpuOverrides.isSupportedType`.
 *
 * @param t the data type to check
 * @return true if the type is accepted by this check
 */
def isSupported(t: DataType): Boolean = t match {
  // String-to-string maps are accepted here in addition to the general check.
  case MapType(StringType, StringType, _) => true
  case _ => isSupportedType(t)
}
/** Returns true only when every supplied type passes `isSupported`. */
override def areAllSupportedTypes(types: DataType*): Boolean = {
  types.forall { dataType => isSupported(dataType) }
}

override def tagPlanForGpu(): Unit = {
if (agg.resultExpressions.isEmpty) {
willNotWorkOnGpu("result expressions is empty")
Expand Down

0 comments on commit b98dcda

Please sign in to comment.