From a7ee5c3cf1302b83b18fa603127dab93310089fe Mon Sep 17 00:00:00 2001
From: Kuhu Shukla
Date: Fri, 23 Oct 2020 11:03:05 -0500
Subject: [PATCH] Change Hash Aggregate to allow pass through on MapType

---
 .../rapids/shims/spark300/Spark300Shims.scala     | 10 ++++++++++
 .../rapids/shims/spark301/Spark301Shims.scala     | 11 +++++++++++
 .../com/nvidia/spark/rapids/GpuOverrides.scala    | 15 +++++++++++++++
 .../scala/com/nvidia/spark/rapids/aggregate.scala |  9 ++++++++-
 4 files changed, 44 insertions(+), 1 deletion(-)

diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
index c9eb9e3a4f3f..d5b15af0582c 100644
--- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
+++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala
@@ -217,6 +217,11 @@ class Spark300Shims extends SparkShims {
         val ignoreNulls: BaseExprMeta[_] =
           GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this))
         override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls)
+        override def tagExprForGpu(): Unit = {
+          if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+            willNotWorkOnGpu("First on MapType is not supported")
+          }
+        }
 
         override def convertToGpu(): GpuExpression =
           GpuFirst(child.convertToGpu(), ignoreNulls.convertToGpu())
@@ -228,6 +233,11 @@ class Spark300Shims extends SparkShims {
         val ignoreNulls: BaseExprMeta[_] =
           GpuOverrides.wrapExpr(a.ignoreNullsExpr, conf, Some(this))
         override val childExprs: Seq[BaseExprMeta[_]] = Seq(child, ignoreNulls)
+        override def tagExprForGpu(): Unit = {
+          if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+            willNotWorkOnGpu("Last on MapType is not supported")
+          }
+        }
 
         override def convertToGpu(): GpuExpression =
           GpuLast(child.convertToGpu(), ignoreNulls.convertToGpu())
diff --git a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala
index 1d301841ae3c..1e6d737a9b58 100644
--- a/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala
+++ b/shims/spark301/src/main/scala/com/nvidia/spark/rapids/shims/spark301/Spark301Shims.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, ShuffleExchangeLike}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec}
 import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuShuffleExchangeExecBase}
+import org.apache.spark.sql.types.MapType
 import org.apache.spark.storage.{BlockId, BlockManagerId}
 
 class Spark301Shims extends Spark300Shims {
@@ -55,12 +56,22 @@
     GpuOverrides.expr[First](
       "first aggregate operator",
       (a, conf, p, r) => new ExprMeta[First](a, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+            willNotWorkOnGpu("First on MapType is not supported")
+          }
+        }
         override def convertToGpu(): GpuExpression =
           GpuFirst(childExprs(0).convertToGpu(), a.ignoreNulls)
       }),
     GpuOverrides.expr[Last](
       "last aggregate operator",
       (a, conf, p, r) => new ExprMeta[Last](a, conf, p, r) {
+        override def tagExprForGpu(): Unit = {
+          if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+            willNotWorkOnGpu("Last on MapType is not supported")
+          }
+        }
         override def convertToGpu(): GpuExpression =
           GpuLast(childExprs(0).convertToGpu(), a.ignoreNulls)
       })
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 2db0c556f283..34cddb3d8d89 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1291,6 +1291,9 @@ object GpuOverrides {
         if (count.children.size > 1) {
           willNotWorkOnGpu("count of multiple columns not supported")
         }
+        if (count.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+          willNotWorkOnGpu("Count on MapType is not supported")
+        }
       }
       override def convertToGpu(): GpuExpression =
         GpuCount(childExprs.map(_.convertToGpu()))
@@ -1305,6 +1308,9 @@
           "will compute incorrect results. If it is known that there are no NaNs, set " +
           s" ${RapidsConf.HAS_NANS} to false.")
         }
+        if (max.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+          willNotWorkOnGpu("Max on MapType is not supported")
+        }
       }
       override def convertToGpu(child: Expression): GpuExpression = GpuMax(child)
     }),
@@ -1318,6 +1324,9 @@
           "will compute incorrect results. If it is known that there are no NaNs, set " +
           s" ${RapidsConf.HAS_NANS} to false.")
         }
+        if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+          willNotWorkOnGpu("Min on MapType is not supported")
+        }
       }
       override def convertToGpu(child: Expression): GpuExpression = GpuMin(child)
     }),
@@ -1333,6 +1342,9 @@
           " once as part of the same query. To enable this anyways set" +
           s" ${RapidsConf.ENABLE_FLOAT_AGG} to true.")
         }
+        if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+          willNotWorkOnGpu("Sum on MapType is not supported")
+        }
       }
 
       override def convertToGpu(child: Expression): GpuExpression = GpuSum(child)
@@ -1349,6 +1361,9 @@
           " computed more than once as part of the same query. To enable this anyways set" +
           s" ${RapidsConf.ENABLE_FLOAT_AGG} to true")
         }
+        if (a.children.exists(expr => expr.dataType.isInstanceOf[MapType])) {
+          willNotWorkOnGpu("Avg on MapType is not supported")
+        }
       }
 
       override def convertToGpu(child: Expression): GpuExpression = GpuAverage(child)
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
index 8386a1860f91..0d492741218e 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 import ai.rapids.cudf
 import ai.rapids.cudf.NvtxColor
 import com.nvidia.spark.rapids.GpuMetricNames._
+import com.nvidia.spark.rapids.GpuOverrides.isSupportedType
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
 import org.apache.spark.TaskContext
@@ -34,7 +35,7 @@ import org.apache.spark.sql.execution.{ExplainUtils, SortExec, SparkPlan, UnaryE
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.rapids.{CudfAggregate, GpuAggregateExpression, GpuDeclarativeAggregate}
-import org.apache.spark.sql.types.{DoubleType, FloatType}
+import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, MapType, StringType}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 object AggregateUtils {
@@ -97,6 +98,12 @@ class GpuHashAggregateMeta(
       aggregateAttributes ++
       resultExpressions
 
+  def isSupported(t: DataType) = t match {
+    case MapType(StringType, StringType, _) => true
+    case _ => isSupportedType(t)
+  }
+  override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
+
   override def tagPlanForGpu(): Unit = {
     if (agg.resultExpressions.isEmpty) {
       willNotWorkOnGpu("result expressions is empty")