
Commit

Merge branch 'branch-0.5' into map-columnar
firestarman committed Apr 2, 2021
2 parents 9971a4c + e05347c commit 274078c
Showing 8 changed files with 59 additions and 142 deletions.
9 changes: 0 additions & 9 deletions docs/compatibility.md
@@ -309,15 +309,6 @@ Also, the GPU does not support casting from strings containing hex values.

To enable this operation on the GPU, set
[`spark.rapids.sql.castStringToFloat.enabled`](configs.md#sql.castStringToFloat.enabled) to `true`.

-### String to Integral Types
-
-The GPU will return incorrect results for strings representing values greater than Long.MaxValue or
-less than Long.MinValue. The correct behavior would be to return null for these values, but the GPU
-currently overflows and returns an incorrect integer value.
-
-To enable this operation on the GPU, set
-[`spark.rapids.sql.castStringToInteger.enabled`](configs.md#sql.castStringToInteger.enabled) to `true`.
-
### String to Date

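For readers skimming the removed compatibility note: the key point is that Spark's CPU cast neither clamps nor wraps, it simply returns null for unrepresentable values. A minimal standalone sketch of that semantic, using plain JVM parsing (illustrative, not code from this commit):

```scala
// Sketch of the CPU semantics the removed section describes: CAST(string AS BIGINT)
// yields null when the text cannot be represented as a Long, never a wrapped value.
def cpuStyleStringToLong(s: String): Option[Long] =
  try Some(java.lang.Long.parseLong(s.trim))
  catch { case _: NumberFormatException => None }

assert(cpuStyleStringToLong("123").contains(123L))
// Exceeds Long.MaxValue: the CPU returns null, while the old GPU path overflowed,
// which is why the operation was gated behind a config until this commit.
assert(cpuStyleStringToLong("99999999999999999999").isEmpty)
```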
1 change: 0 additions & 1 deletion docs/configs.md
@@ -56,7 +56,6 @@ Name | Description | Default Value
<a name="sql.castFloatToString.enabled"></a>spark.rapids.sql.castFloatToString.enabled|Casting from floating point types to string on the GPU returns results that have a different precision than the default results of Spark.|false
<a name="sql.castStringToDecimal.enabled"></a>spark.rapids.sql.castStringToDecimal.enabled|When set to true, enables casting from strings to decimal type on the GPU. Currently string to decimal type on the GPU might produce results which slightly differed from the correct results when the string represents any number exceeding the max precision that CAST_STRING_TO_FLOAT can keep. For instance, the GPU returns 99999999999999987 given input string "99999999999999999". The cause of divergence is that we can not cast strings containing scientific notation to decimal directly. So, we have to cast strings to floats firstly. Then, cast floats to decimals. The first step may lead to precision loss.|false
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToInteger.enabled"></a>spark.rapids.sql.castStringToInteger.enabled|When set to true, enables casting from strings to integer types (byte, short, int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect results when the string represents a number larger than Long.MaxValue or smaller than Long.MinValue.|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
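As a usage note for the table above, these opt-in flags are ordinary Spark configs. A hypothetical session setup might look like this (the plugin class name is real; the rest of the setup is illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative: opt in to one of the cast behaviors listed above at session build time.
val spark = SparkSession.builder()
  .appName("rapids-cast-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.castStringToFloat.enabled", "true")
  .getOrCreate()
```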
1 change: 0 additions & 1 deletion docs/tuning-guide.md
@@ -209,5 +209,4 @@ performance.
- [`spark.rapids.sql.variableFloatAgg.enabled`](configs.md#sql.variableFloatAgg.enabled)
- [`spark.rapids.sql.hasNans`](configs.md#sql.hasNans)
- [`spark.rapids.sql.castFloatToString.enabled`](configs.md#sql.castFloatToString.enabled)
-- [`spark.rapids.sql.castStringToInteger.enabled`](configs.md#sql.castStringToInteger.enabled)
- [`spark.rapids.sql.castStringToFloat.enabled`](configs.md#sql.castStringToFloat.enabled)
165 changes: 49 additions & 116 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala
@@ -69,15 +69,7 @@ class CastExprMeta[INPUT <: CastBase](
"CPU returns \"+Infinity\" and \"-Infinity\" respectively. To enable this operation on " +
"the GPU, set" + s" ${RapidsConf.ENABLE_CAST_STRING_TO_FLOAT} to true.")
}
-if (!conf.isCastStringToIntegerEnabled && cast.child.dataType == DataTypes.StringType &&
-Seq(DataTypes.ByteType, DataTypes.ShortType, DataTypes.IntegerType, DataTypes.LongType)
-.contains(cast.dataType)) {
-willNotWorkOnGpu("the GPU will return incorrect results for strings representing" +
-"values greater than Long.MaxValue or less than Long.MinValue. To enable this " +
-"operation on the GPU, set" +
-s" ${RapidsConf.ENABLE_CAST_STRING_TO_INTEGER} to true.")
-}
-if (!conf.isCastStringToTimestampEnabled && fromDataType == DataTypes.StringType
+if (!conf.isCastStringToTimestampEnabled && fromType == DataTypes.StringType
&& toType == DataTypes.TimestampType) {
willNotWorkOnGpu("the GPU only supports a subset of formats " +
"when casting strings to timestamps. Refer to the CAST documentation " +
@@ -133,18 +125,6 @@ object GpuCast {
*/
private val FULL_TIMESTAMP_LENGTH = 27

-/**
-* Regex for identifying strings that contain numeric values that can be casted to integral
-* types. This includes floating point numbers but not numbers containing exponents.
-*/
-private val CASTABLE_TO_INT_REGEX = "\\s*[+\\-]?[0-9]*(\\.)?[0-9]+\\s*$"
-
-/**
-* Regex for identifying strings that contain numeric values that can be casted to integral
-* types when ansi is enabled.
-*/
-private val ANSI_CASTABLE_TO_INT_REGEX = "\\s*[+\\-]?[0-9]+\\s*$"
-
/**
* Regex to match timestamps with or without trailing zeros.
*/
@@ -398,44 +378,8 @@ case class GpuCast(
castStringToFloats(trimmed, ansiMode,
GpuColumnVector.getNonNestedRapidsType(dataType))
case ByteType | ShortType | IntegerType | LongType =>
-// filter out values that are not valid longs or nulls
-val regex = if (ansiMode) {
-GpuCast.ANSI_CASTABLE_TO_INT_REGEX
-} else {
-GpuCast.CASTABLE_TO_INT_REGEX
-}
-val longStrings = withResource(trimmed.matchesRe(regex)) { regexMatches =>
-if (ansiMode) {
-withResource(regexMatches.all()) { allRegexMatches =>
-if (!allRegexMatches.getBoolean) {
-throw new NumberFormatException(GpuCast.INVALID_INPUT_MESSAGE)
-}
-}
-}
-withResource(Scalar.fromNull(DType.STRING)) { nullString =>
-regexMatches.ifElse(trimmed, nullString)
-}
-}
-// cast to specific integral type after filtering out values that are not in range
-// for that type. Note that the scalar values here are named parameters so are not
-// created until they are needed
-withResource(longStrings) { longStrings =>
-GpuColumnVector.getNonNestedRapidsType(dataType) match {
-case DType.INT8 =>
-castStringToIntegralType(longStrings, DType.INT8,
-Scalar.fromInt(Byte.MinValue), Scalar.fromInt(Byte.MaxValue))
-case DType.INT16 =>
-castStringToIntegralType(longStrings, DType.INT16,
-Scalar.fromInt(Short.MinValue), Scalar.fromInt(Short.MaxValue))
-case DType.INT32 =>
-castStringToIntegralType(longStrings, DType.INT32,
-Scalar.fromInt(Int.MinValue), Scalar.fromInt(Int.MaxValue))
-case DType.INT64 =>
-longStrings.castTo(DType.INT64)
-case _ =>
-throw new IllegalStateException("Invalid integral type")
-}
-}
+castStringToInts(trimmed, ansiMode,
+GpuColumnVector.getNonNestedRapidsType(dataType))
}
}
case (StringType, dt: DecimalType) =>
@@ -733,6 +677,52 @@ case class GpuCast(
}
}

+def castStringToInts(
+input: ColumnVector,
+ansiEnabled: Boolean,
+dType: DType): ColumnVector = {
+val cleaned = if (!ansiEnabled) {
+// TODO would be great to get rid of this regex, but the overflow checks don't work
+// on the more lenient pattern.
+// To avoid doing the expensive regex all the time, we will first check to see if we need
+// to do it. The only time we do need to do it is when we have a '.' in any of the strings.
+val data = input.getData
+val hasDot = withResource(
+ColumnView.fromDeviceBuffer(data, 0, DType.INT8, data.getLength.toInt)) { childData =>
+withResource(GpuScalar.from('.'.toByte, ByteType)) { dot =>
+childData.contains(dot)
+}
+}
+if (hasDot) {
+withResource(input.extractRe("^([+\\-]?[0-9]+)(?:\\.[0-9]*)?$")) { table =>
+table.getColumn(0).incRefCount()
+}
+} else {
+input.incRefCount()
+}
+} else {
+input.incRefCount()
+}
+withResource(cleaned) { cleaned =>
+withResource(cleaned.isInteger(dType)) { isInt =>
+if (ansiEnabled) {
+withResource(isInt.all()) { allInts =>
+if (!allInts.getBoolean) {
+throw new NumberFormatException(GpuCast.INVALID_INPUT_MESSAGE)
+}
+}
+cleaned.castTo(dType)
+} else {
+withResource(cleaned.castTo(dType)) { parsedInt =>
+withResource(GpuScalar.from(null, dataType)) { nullVal =>
+isInt.ifElse(parsedInt, nullVal)
+}
+}
+}
+}
+}
+}
+
def castStringToFloats(
input: ColumnVector,
ansiEnabled: Boolean,
@@ -1109,63 +1099,6 @@
}
}

-/**
-* Cast column of long values to a smaller integral type (bytes, short, int).
-*
-* @param longStrings Long values in string format
-* @param castToType Type to cast to
-* @param minValue Named parameter for function to create Scalar representing range minimum value
-* @param maxValue Named parameter for function to create Scalar representing range maximum value
-* @return Values cast to specified integral type
-*/
-private def castStringToIntegralType(longStrings: ColumnVector,
-castToType: DType,
-minValue: => Scalar,
-maxValue: => Scalar): ColumnVector = {
-
-// evaluate min and max named parameters once since they are used in multiple places
-withResource(minValue) { minValue: Scalar =>
-withResource(maxValue) { maxValue: Scalar =>
-withResource(Scalar.fromNull(DType.INT64)) { nulls =>
-withResource(longStrings.castTo(DType.INT64)) { values =>
-
-// replace values less than minValue with null
-val gtEqMinOrNull = withResource(values.greaterOrEqualTo(minValue)) { isGtEqMin =>
-if (ansiMode) {
-withResource(isGtEqMin.all()) { all =>
-if (!all.getBoolean) {
-throw new NumberFormatException(GpuCast.INVALID_INPUT_MESSAGE)
-}
-}
-}
-isGtEqMin.ifElse(values, nulls)
-}
-
-// replace values greater than maxValue with null
-val ltEqMaxOrNull = withResource(gtEqMinOrNull) { gtEqMinOrNull =>
-withResource(gtEqMinOrNull.lessOrEqualTo(maxValue)) { isLtEqMax =>
-if (ansiMode) {
-withResource(isLtEqMax.all()) { all =>
-if (!all.getBoolean) {
-throw new NumberFormatException(GpuCast.INVALID_INPUT_MESSAGE)
-}
-}
-}
-isLtEqMax.ifElse(gtEqMinOrNull, nulls)
-}
-}
-
-// cast the final values
-withResource(ltEqMaxOrNull) { ltEqMaxOrNull =>
-ltEqMaxOrNull.castTo(castToType)
-}
-}
-}
-}
-
-}
-}
-
private def castIntegralsToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {

// Use INT64 bounds instead of FLOAT64 bounds, which enables precise comparison.
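Two implementation details in the new castStringToInts are worth spelling out. First, in non-ANSI mode the expensive extractRe pass runs only when some string contains a '.', and its first capture group keeps just the integral part. A rough CPU-side sketch of the pattern's behavior, using plain JVM regex as a stand-in for the GPU kernel:

```scala
// Assumption: JVM regex approximates cudf's extractRe closely enough for illustration.
val IntegralPart = "^([+\\-]?[0-9]+)(?:\\.[0-9]*)?$".r

def integralPart(s: String): Option[String] = s match {
  case IntegralPart(digits) => Some(digits) // group 1: optional sign plus digits
  case _                    => None         // no match -> null, as in a non-ANSI cast
}

assert(integralPart("123.45").contains("123")) // fraction dropped, like CAST('123.45' AS INT)
assert(integralPart("-7.").contains("-7"))
assert(integralPart("1e5").isEmpty)            // exponents fall through to null
```

Second, the withResource wrapper that appears on nearly every line above is the plugin's RAII helper (from its Arm trait); conceptually it is just:

```scala
// Close the native resource even when the body throws.
def withResource[T <: AutoCloseable, V](resource: T)(body: T => V): V =
  try body(resource) finally resource.close()
```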
10 changes: 0 additions & 10 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -582,14 +582,6 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

-val ENABLE_CAST_STRING_TO_INTEGER = conf("spark.rapids.sql.castStringToInteger.enabled")
-.doc("When set to true, enables casting from strings to integer types (byte, short, " +
-"int, long) on the GPU. Casting from string to integer types on the GPU returns incorrect " +
-"results when the string represents a number larger than Long.MaxValue or smaller than " +
-"Long.MinValue.")
-.booleanConf
-.createWithDefault(false)
-
val ENABLE_CSV_TIMESTAMPS = conf("spark.rapids.sql.csvTimestamps.enabled")
.doc("When set to true, enables the CSV parser to read timestamps. The default output " +
"format for Spark includes a timezone at the end. Anything except the UTC timezone is not " +
@@ -1191,8 +1183,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isCastStringToTimestampEnabled: Boolean = get(ENABLE_CAST_STRING_TO_TIMESTAMP)

-lazy val isCastStringToIntegerEnabled: Boolean = get(ENABLE_CAST_STRING_TO_INTEGER)
-
lazy val isCastStringToFloatEnabled: Boolean = get(ENABLE_CAST_STRING_TO_FLOAT)

lazy val isCastStringToDecimalEnabled: Boolean = get(ENABLE_CAST_STRING_TO_DECIMAL)
2 changes: 0 additions & 2 deletions tests/src/test/scala/com/nvidia/spark/rapids/AdaptiveQueryExecSuite.scala
@@ -335,7 +335,6 @@ class AdaptiveQueryExecSuite
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.LOCAL_SHUFFLE_READER_ENABLED.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "400")
-.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, "50")
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
@@ -370,7 +369,6 @@ class AdaptiveQueryExecSuite
// disable DemoteBroadcastHashJoin rule from removing BHJ due to empty partitions
.set(SQLConf.NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN.key, "0")
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
-.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true")
.set(RapidsConf.TEST_ALLOWED_NONGPU.key, "DataWritingCommandExec")

3 changes: 1 addition & 2 deletions tests/src/test/scala/com/nvidia/spark/rapids/AnsiCastOpSuite.scala
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2020, NVIDIA CORPORATION.
+* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,7 +37,6 @@ class AnsiCastOpSuite extends GpuExpressionTestSuite {
.set("spark.sql.storeAssignmentPolicy", "ANSI") // note this is the default in 3.0.0
.set(RapidsConf.ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES.key, "true")
.set(RapidsConf.ENABLE_CAST_FLOAT_TO_STRING.key, "true")
-.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(RapidsConf.ENABLE_CAST_STRING_TO_FLOAT.key, "true")
.set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "true")

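Context for the ANSI-mode suite above: with ANSI on, an invalid numeric string must raise an error rather than become null, which is exactly the NumberFormatException branch in the new castStringToInts. A hedged sketch of the observable difference (exception type and message vary across Spark versions; `spark` is an assumed active session):

```scala
// Non-ANSI: the invalid cast quietly yields null.
spark.conf.set("spark.sql.ansi.enabled", "false")
assert(spark.sql("SELECT CAST('not-a-number' AS INT)").first().isNullAt(0))

// ANSI: the same cast is expected to throw (a NumberFormatException-style error
// in Spark 3.x), so the call is only sketched here.
spark.conf.set("spark.sql.ansi.enabled", "true")
// spark.sql("SELECT CAST('not-a-number' AS INT)").collect()  // would throw
```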
10 changes: 9 additions & 1 deletion tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala
@@ -63,7 +63,6 @@ class CastOpSuite extends GpuExpressionTestSuite {
.set(RapidsConf.ENABLE_CAST_FLOAT_TO_INTEGRAL_TYPES.key, "true")
.set(RapidsConf.ENABLE_CAST_FLOAT_TO_STRING.key, "true")
.set(RapidsConf.ENABLE_CAST_STRING_TO_TIMESTAMP.key, "true")
-.set(RapidsConf.ENABLE_CAST_STRING_TO_INTEGER.key, "true")
.set(RapidsConf.ENABLE_CAST_STRING_TO_FLOAT.key, "true")
.set("spark.sql.ansi.enabled", String.valueOf(ansiEnabled))

@@ -362,6 +361,15 @@
col("doubles").cast(TimestampType))
}

testSparkResultsAreEqual("Test cast from strings to int", doublesAsStrings,
conf = sparkConf) {
frame => frame.select(
col("c0").cast(LongType),
col("c0").cast(IntegerType),
col("c0").cast(ShortType),
col("c0").cast(ByteType))
}

testSparkResultsAreEqual("Test cast from strings to doubles", doublesAsStrings,
conf = sparkConf, maxFloatDiff = 0.0001) {
frame => frame.select(
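The added case mirrors the neighboring string-to-double test. For readers unfamiliar with the harness, testSparkResultsAreEqual runs the given transformation once on the CPU and once with the plugin enabled, then compares the results. A conceptual sketch (names and details assumed, not the suite's real implementation):

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

// Conceptual only: run the same transformation with the RAPIDS plugin off and on,
// then compare the collected rows (ordering concerns ignored for brevity).
def compareCpuAndGpu(spark: SparkSession)(
    mkDf: SparkSession => DataFrame)(
    transform: DataFrame => DataFrame): Unit = {
  def run(gpu: Boolean): Seq[Row] = {
    spark.conf.set("spark.rapids.sql.enabled", gpu.toString)
    transform(mkDf(spark)).collect().toSeq
  }
  assert(run(gpu = false) == run(gpu = true))
}
```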
