From 6c09216ebf1bb637ba067995ce251976817dc1f8 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 11 Nov 2020 11:37:40 -0600 Subject: [PATCH] Update how we deal with type checking Signed-off-by: Robert (Bobby) Evans --- .../rapids/shims/spark300/Spark300Shims.scala | 15 +- .../shims/spark300db/Spark300dbShims.scala | 9 +- .../shims/spark301db/Spark301dbShims.scala | 8 +- .../rapids/shims/spark310/Spark310Shims.scala | 16 +- .../nvidia/spark/rapids/GpuOverrides.scala | 155 ++++++++++-------- .../com/nvidia/spark/rapids/RapidsMeta.scala | 11 +- .../com/nvidia/spark/rapids/aggregate.scala | 7 +- .../sql/rapids/complexTypeExtractors.scala | 17 +- .../InternalColumnarRddConverter.scala | 2 +- .../spark/sql/rapids/stringFunctions.scala | 4 +- 10 files changed, 123 insertions(+), 121 deletions(-) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index d032f53e3b3..5cbeb507227 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -136,11 +136,9 @@ class Spark300Shims extends SparkShims { GpuOverrides.exec[FileSourceScanExec]( "Reading data from files, often from Hive tables", (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + // partition filters and data filters are not run on the GPU override val childExprs: Seq[ExprMeta[_]] = Seq.empty @@ -270,11 +268,8 @@ class Spark300Shims extends SparkShims { a.dataFilters, conf) } - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) }), GpuOverrides.scan[OrcScan]( "ORC parsing", diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index a02c33da995..fbe20aa9a9a 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -89,11 +89,10 @@ class Spark300dbShims extends Spark300Shims { GpuOverrides.exec[FileSourceScanExec]( "Reading data from files, often from Hive tables", (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + // partition filters and data filters are not run on the GPU override val childExprs: Seq[ExprMeta[_]] = Seq.empty diff --git a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala index aceb7479a8e..9406513340b 100644 --- a/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala +++ b/shims/spark301db/src/main/scala/com/nvidia/spark/rapids/shims/spark301db/Spark301dbShims.scala @@ -83,11 +83,9 @@ class Spark301dbShims extends Spark301Shims { GpuOverrides.exec[FileSourceScanExec]( "Reading data from files, often from Hive tables", (fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + // partition filters and data filters are not run on the GPU override val childExprs: Seq[ExprMeta[_]] = Seq.empty diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala index ba464bb6d72..6c065afe858 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala @@ -141,11 +141,9 @@ class Spark310Shims extends Spark301Shims { // partition filters and data filters are not run on the GPU override val childExprs: Seq[ExprMeta[_]] = Seq.empty - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) override def convertToGpu(): GpuExec = { @@ -222,11 +220,9 @@ class Spark310Shims extends Spark301Shims { a.dataFilters, conf) } - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) }), GpuOverrides.scan[OrcScan]( "ORC parsing", diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 6410b2d297f..cfcfaad3f12 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -20,6 +20,8 @@ import java.time.ZoneId import scala.reflect.ClassTag +import ai.rapids.cudf.DType + import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.expressions._ @@ -437,9 +439,46 @@ object GpuOverrides { } } - def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType) + def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_)) - def isSupportedType(dataType: DataType): Boolean = dataType match { + /** + * Is this particular type supported or not. + * @param dataType the type to check + * @param allowNull should NullType be allowed + * @param allowDecimal should DecimalType be allowed + * @param allowBinary should BinaryType be allowed + * @param allowCalendarInterval should CalendarIntervalType be allowed + * @param allowArray should ArrayType be allowed + * @param allowStruct should StructType be allowed + * @param allowStringMaps should a Map[String, String] specifically be allowed + * @param allowMaps should MapType be allowed generically + * @param allowNesting should nested types like array struct and map allow nested types + * within them, or only primitive types. + * @return true if it is allowed else false + */ + def isSupportedType(dataType: DataType, + allowNull: Boolean = false, + allowDecimal: Boolean = false, + allowBinary: Boolean = false, + allowCalendarInterval: Boolean = false, + allowArray: Boolean = false, + allowStruct: Boolean = false, + allowStringMaps: Boolean = false, + allowMaps: Boolean = false, + allowNesting: Boolean = false): Boolean = { + def checkNested(dataType: DataType): Boolean = { + isSupportedType(dataType, + allowNull = allowNull, + allowDecimal = allowDecimal, + allowBinary = allowBinary && allowNesting, + allowCalendarInterval = allowCalendarInterval && allowNesting, + allowArray = allowArray && allowNesting, + allowStruct = allowStruct && allowNesting, + allowStringMaps = allowStringMaps && allowNesting, + allowMaps = allowMaps && allowNesting, + allowNesting = allowNesting) + } + dataType match { case BooleanType => true case ByteType => true case ShortType => true @@ -450,8 +489,19 @@ object GpuOverrides { case DateType => true case TimestampType => ZoneId.systemDefault().normalized() == GpuOverrides.UTC_TIMEZONE_ID case StringType => true + case dt: DecimalType if allowDecimal => dt.precision <= DType.DECIMAL64_MAX_PRECISION + case NullType => allowNull + case BinaryType => allowBinary + case CalendarIntervalType => allowCalendarInterval + case ArrayType(elementType, _) if allowArray => checkNested(elementType) + case MapType(StringType, StringType, _) if allowStringMaps => true + case MapType(keyType, valueType, _) if allowMaps => + checkNested(keyType) && checkNested(valueType) + case StructType(fields) if allowStruct => + fields.map(_.dataType).forall(checkNested) case _ => false } + } /** * Checks to see if any expressions are a String Literal @@ -547,15 +597,8 @@ object GpuOverrides { } } - /** - * We are overriding this method because currently we only support CalendarIntervalType - * as a Literal - */ - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case CalendarIntervalType => true - case x => isSupportedType(x) - } - + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowCalendarInterval = true) }), expr[Signum]( "Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive", @@ -565,37 +608,35 @@ object GpuOverrides { expr[Alias]( "Gives a column a name", (a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case BinaryType => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true, allowBinary = true) + override def convertToGpu(child: Expression): GpuExpression = GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata) }), expr[AttributeReference]( "References an input column", (att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + // This is the only NOOP operator. It goes away when things are bound override def convertToGpu(): Expression = att - // There are so many of these that we don't need to print them out. - override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = {} + // There are so many of these that we don't need to print them out, unless it + // will not work on the GPU + override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = { + if (!this.canThisBeReplaced) { + super.print(append, depth, all) + } + } }), expr[Cast]( "Convert a column of one type of data into another type", (cast, conf, p, r) => new CastExprMeta[Cast](cast, SparkSession.active.sessionState.conf .ansiEnabled, conf, p, r) { - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case BinaryType => true - case x => isSupportedType(x) - } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowBinary = true) }), expr[AnsiCast]( "Convert a column of one type of data into another type", @@ -627,11 +668,8 @@ object GpuOverrides { (currentRow, conf, p, r) => new ExprMeta[CurrentRow.type](currentRow, conf, p, r) { override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(currentRow) - // CURRENT ROW needs to support NullType. - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case _: NullType => true - case anythingElse => isSupportedType(anythingElse) - } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) } ), expr[UnboundedPreceding.type]( @@ -640,11 +678,8 @@ object GpuOverrides { new ExprMeta[UnboundedPreceding.type](unboundedPreceding, conf, p, r) { override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(unboundedPreceding) - // UnboundedPreceding needs to support NullType. - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case _: NullType => true - case anythingElse => isSupportedType(anythingElse) - } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) } ), expr[UnboundedFollowing.type]( @@ -653,11 +688,8 @@ object GpuOverrides { new ExprMeta[UnboundedFollowing.type](unboundedFollowing, conf, p, r) { override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(unboundedFollowing) - // UnboundedFollowing needs to support NullType. - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case _: NullType => true - case anythingElse => isSupportedType(anythingElse) - } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowNull = true) } ), expr[RowNumber]( @@ -783,11 +815,8 @@ object GpuOverrides { expr[IsNotNull]( "Checks if a value is not null", (a, conf, p, r) => new UnaryExprMeta[IsNotNull](a, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) override def convertToGpu(child: Expression): GpuExpression = GpuIsNotNull(child) }), expr[IsNaN]( @@ -1162,11 +1191,9 @@ object GpuOverrides { expr[EqualTo]( "Check if the values are equal", (a, conf, p, r) => new BinaryExprMeta[EqualTo](a, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuEqualTo(lhs, rhs) }), @@ -1445,10 +1472,8 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[Md5](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuMd5(child) - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall { - case BinaryType => true - case x => isSupportedType(x) - } + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowBinary = true) } ), expr[Upper]( @@ -1773,11 +1798,9 @@ object GpuOverrides { "The backend for most select, withColumn and dropColumn statements", (proj, conf, p, r) => { new SparkPlanMeta[ProjectExec](proj, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + override def convertToGpu(): GpuExec = GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded()) } @@ -1860,11 +1883,9 @@ object GpuOverrides { exec[FilterExec]( "The backend for most filter statements", (filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) { - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) + override def convertToGpu(): GpuExec = GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded()) }), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index f3f5b7425eb..68e324242de 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -102,9 +102,14 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE]( /** * Check if all the types are supported in this Meta */ - def areAllSupportedTypes(types: DataType*): Boolean = { - GpuOverrides.areAllSupportedTypes(types: _*) - } + final def areAllSupportedTypes(types: DataType*): Boolean = + types.forall(isSupportedType) + + /** + * Check if this type is supported or not. + */ + def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t) /** * Keep this on the CPU, but possibly convert its children under it to run on the GPU if enabled. diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala index 0b73a5ef1d0..419ceee30a8 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/aggregate.scala @@ -98,11 +98,8 @@ class GpuHashAggregateMeta( aggregateAttributes ++ resultExpressions - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case _ => isSupportedType(t) - } - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) override def tagPlanForGpu(): Unit = { if (agg.resultExpressions.isEmpty) { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala index fc336084f6f..7f9aaf1db73 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/complexTypeExtractors.scala @@ -42,13 +42,8 @@ class GpuGetArrayItemMeta( ordinal: Expression): GpuExpression = GpuGetArrayItem(arr, ordinal) - def isSupported(t: DataType) = t match { - // For now we will only do one level of array type support - case a : ArrayType => isSupportedType(a.elementType) - case _ => isSupportedType(t) - } - - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowArray = true, allowNesting = false) } /** @@ -115,12 +110,8 @@ class GpuGetMapValueMeta( key: Expression): GpuExpression = GpuGetMapValue(child, key) - def isSupported(t: DataType) = t match { - case MapType(StringType, StringType, _) => true - case StringType => true - } - - override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported) + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowStringMaps = true) } case class GpuGetMapValue(child: Expression, key: Expression) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala index 361f3ddca70..d552d8d4822 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/InternalColumnarRddConverter.scala @@ -475,7 +475,7 @@ object InternalColumnarRddConverter extends Logging { def convert(df: DataFrame): RDD[Table] = { val schema = df.schema if (!GpuOverrides.areAllSupportedTypes(schema.map(_.dataType) :_*)) { - val unsupported = schema.map(_.dataType).filter(!GpuOverrides.areAllSupportedTypes(_)).toSet + val unsupported = schema.map(_.dataType).filter(!GpuOverrides.isSupportedType(_)).toSet throw new IllegalArgumentException(s"Cannot convert $df to GPU columnar $unsupported are " + s"not currently supported data types for columnar.") } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala index f4313dfb7b3..fffc82cc306 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/stringFunctions.scala @@ -834,8 +834,8 @@ class GpuStringSplitMeta( limit: Expression): GpuExpression = GpuStringSplit(str, regexp, limit) - // For now we support all of the possible input and output types for this operator - override def areAllSupportedTypes(types: DataType*): Boolean = true + override def isSupportedType(t: DataType): Boolean = + GpuOverrides.isSupportedType(t, allowArray = true, allowNesting = false) } case class GpuStringSplit(str: Expression, regex: Expression, limit: Expression)