Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update how we deal with type checking #1099

Merged
merged 1 commit into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,9 @@ class Spark300Shims extends SparkShims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

Expand Down Expand Up @@ -270,11 +268,8 @@ class Spark300Shims extends SparkShims {
a.dataFilters,
conf)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,10 @@ class Spark300dbShims extends Spark300Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,9 @@ class Spark301dbShims extends Spark301Shims {
GpuOverrides.exec[FileSourceScanExec](
"Reading data from files, often from Hive tables",
(fsse, conf, p, r) => new SparkPlanMeta[FileSourceScanExec](fsse, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,9 @@ class Spark310Shims extends Spark301Shims {
// partition filters and data filters are not run on the GPU
override val childExprs: Seq[ExprMeta[_]] = Seq.empty

def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this)

override def convertToGpu(): GpuExec = {
Expand Down Expand Up @@ -222,11 +220,9 @@ class Spark310Shims extends Spark301Shims {
a.dataFilters,
conf)
}
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
}),
GpuOverrides.scan[OrcScan](
"ORC parsing",
Expand Down
155 changes: 88 additions & 67 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import java.time.ZoneId

import scala.reflect.ClassTag

import ai.rapids.cudf.DType

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -437,9 +439,46 @@ object GpuOverrides {
}
}

def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType)
def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_))

def isSupportedType(dataType: DataType): Boolean = dataType match {
/**
* Is this particular type supported or not.
* @param dataType the type to check
* @param allowNull should NullType be allowed
* @param allowDecimal should DecimalType be allowed
* @param allowBinary should BinaryType be allowed
* @param allowCalendarInterval should CalendarIntervalType be allowed
* @param allowArray should ArrayType be allowed
* @param allowStruct should StructType be allowed
* @param allowStringMaps should a Map[String, String] specifically be allowed
* @param allowMaps should MapType be allowed generically
* @param allowNesting should nested types like array, struct, and map allow nested types
* within them, or only primitive types.
* @return true if it is allowed else false
*/
def isSupportedType(dataType: DataType,
allowNull: Boolean = false,
allowDecimal: Boolean = false,
allowBinary: Boolean = false,
allowCalendarInterval: Boolean = false,
allowArray: Boolean = false,
allowStruct: Boolean = false,
allowStringMaps: Boolean = false,
allowMaps: Boolean = false,
allowNesting: Boolean = false): Boolean = {
def checkNested(dataType: DataType): Boolean = {
isSupportedType(dataType,
allowNull = allowNull,
allowDecimal = allowDecimal,
allowBinary = allowBinary && allowNesting,
allowCalendarInterval = allowCalendarInterval && allowNesting,
allowArray = allowArray && allowNesting,
allowStruct = allowStruct && allowNesting,
allowStringMaps = allowStringMaps && allowNesting,
allowMaps = allowMaps && allowNesting,
allowNesting = allowNesting)
}
dataType match {
case BooleanType => true
case ByteType => true
case ShortType => true
Expand All @@ -450,8 +489,19 @@ object GpuOverrides {
case DateType => true
case TimestampType => ZoneId.systemDefault().normalized() == GpuOverrides.UTC_TIMEZONE_ID
case StringType => true
case dt: DecimalType if allowDecimal => dt.precision <= DType.DECIMAL64_MAX_PRECISION
case NullType => allowNull
case BinaryType => allowBinary
case CalendarIntervalType => allowCalendarInterval
case ArrayType(elementType, _) if allowArray => checkNested(elementType)
case MapType(StringType, StringType, _) if allowStringMaps => true
case MapType(keyType, valueType, _) if allowMaps =>
checkNested(keyType) && checkNested(valueType)
case StructType(fields) if allowStruct =>
fields.map(_.dataType).forall(checkNested)
case _ => false
}
}

/**
* Checks to see if any expressions are a String Literal
Expand Down Expand Up @@ -547,15 +597,8 @@ object GpuOverrides {
}
}

/**
* We are overriding this method because currently we only support CalendarIntervalType
* as a Literal
*/
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case CalendarIntervalType => true
case x => isSupportedType(x)
}

override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowCalendarInterval = true)
}),
expr[Signum](
"Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive",
Expand All @@ -565,37 +608,35 @@ object GpuOverrides {
expr[Alias](
"Gives a column a name",
(a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case BinaryType => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true, allowBinary = true)

override def convertToGpu(child: Expression): GpuExpression =
GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata)
}),
expr[AttributeReference](
"References an input column",
(att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

// This is the only NOOP operator. It goes away when things are bound
override def convertToGpu(): Expression = att

// There are so many of these that we don't need to print them out.
override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = {}
// There are so many of these that we don't need to print them out, unless one
// of them will not work on the GPU
override def print(append: StringBuilder, depth: Int, all: Boolean): Unit = {
if (!this.canThisBeReplaced) {
super.print(append, depth, all)
}
}
}),
expr[Cast](
"Convert a column of one type of data into another type",
(cast, conf, p, r) => new CastExprMeta[Cast](cast, SparkSession.active.sessionState.conf
.ansiEnabled, conf, p, r) {
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case BinaryType => true
case x => isSupportedType(x)
}
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowBinary = true)
}),
expr[AnsiCast](
"Convert a column of one type of data into another type",
Expand Down Expand Up @@ -627,11 +668,8 @@ object GpuOverrides {
(currentRow, conf, p, r) => new ExprMeta[CurrentRow.type](currentRow, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(currentRow)

// CURRENT ROW needs to support NullType.
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case _: NullType => true
case anythingElse => isSupportedType(anythingElse)
}
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowNull = true)
}
),
expr[UnboundedPreceding.type](
Expand All @@ -640,11 +678,8 @@ object GpuOverrides {
new ExprMeta[UnboundedPreceding.type](unboundedPreceding, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(unboundedPreceding)

// UnboundedPreceding needs to support NullType.
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case _: NullType => true
case anythingElse => isSupportedType(anythingElse)
}
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowNull = true)
}
),
expr[UnboundedFollowing.type](
Expand All @@ -653,11 +688,8 @@ object GpuOverrides {
new ExprMeta[UnboundedFollowing.type](unboundedFollowing, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuSpecialFrameBoundary(unboundedFollowing)

// UnboundedFollowing needs to support NullType.
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case _: NullType => true
case anythingElse => isSupportedType(anythingElse)
}
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowNull = true)
}
),
expr[RowNumber](
Expand Down Expand Up @@ -783,11 +815,8 @@ object GpuOverrides {
expr[IsNotNull](
"Checks if a value is not null",
(a, conf, p, r) => new UnaryExprMeta[IsNotNull](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)
override def convertToGpu(child: Expression): GpuExpression = GpuIsNotNull(child)
}),
expr[IsNaN](
Expand Down Expand Up @@ -1162,11 +1191,9 @@ object GpuOverrides {
expr[EqualTo](
"Check if the values are equal",
(a, conf, p, r) => new BinaryExprMeta[EqualTo](a, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuEqualTo(lhs, rhs)
}),
Expand Down Expand Up @@ -1445,10 +1472,8 @@ object GpuOverrides {
(a, conf, p, r) => new UnaryExprMeta[Md5](a, conf, p, r) {
override def convertToGpu(child: Expression): GpuExpression = GpuMd5(child)

override def areAllSupportedTypes(types: DataType*): Boolean = types.forall {
case BinaryType => true
case x => isSupportedType(x)
}
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowBinary = true)
}
),
expr[Upper](
Expand Down Expand Up @@ -1773,11 +1798,9 @@ object GpuOverrides {
"The backend for most select, withColumn and dropColumn statements",
(proj, conf, p, r) => {
new SparkPlanMeta[ProjectExec](proj, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def convertToGpu(): GpuExec =
GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded())
}
Expand Down Expand Up @@ -1860,11 +1883,9 @@ object GpuOverrides {
exec[FilterExec](
"The backend for most filter statements",
(filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) {
def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def convertToGpu(): GpuExec =
GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded())
}),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,14 @@ abstract class RapidsMeta[INPUT <: BASE, BASE, OUTPUT <: BASE](
/**
* Check if all the types are supported in this Meta
*/
def areAllSupportedTypes(types: DataType*): Boolean = {
GpuOverrides.areAllSupportedTypes(types: _*)
}
final def areAllSupportedTypes(types: DataType*): Boolean =
types.forall(isSupportedType)

/**
* Check if this type is supported or not.
*/
def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t)

/**
* Keep this on the CPU, but possibly convert its children under it to run on the GPU if enabled.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,8 @@ class GpuHashAggregateMeta(
aggregateAttributes ++
resultExpressions

def isSupported(t: DataType) = t match {
case MapType(StringType, StringType, _) => true
case _ => isSupportedType(t)
}
override def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupported)
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t, allowStringMaps = true)

override def tagPlanForGpu(): Unit = {
if (agg.resultExpressions.isEmpty) {
Expand Down
Loading