From ab0d61f9432d13f4d21963d4edc8717669552aff Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Nov 2020 08:21:20 -0700 Subject: [PATCH 01/18] Support unix_timestamp on GPU for subset of formats Signed-off-by: Andy Grove --- docs/configs.md | 2 +- .../com/nvidia/spark/rapids/GpuCast.scala | 29 ++-- .../nvidia/spark/rapids/GpuOverrides.scala | 23 ++- .../sql/rapids/datetimeExpressions.scala | 154 +++++++++++++++--- .../spark/rapids/ParseDateTimeSuite.scala | 129 +++++++++++++++ 5 files changed, 299 insertions(+), 38 deletions(-) create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala diff --git a/docs/configs.md b/docs/configs.md index 8bc5ea11956..779401f2603 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -224,7 +224,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.UnaryPositive|`positive`|A numeric value with a + in front of it|true|None| spark.rapids.sql.expression.UnboundedFollowing$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| spark.rapids.sql.expression.UnboundedPreceding$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| -spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null| +spark.rapids.sql.expression.UnixTimestamp|`unix_timestamp`|Returns the UNIX timestamp of current or specified time|true|None| spark.rapids.sql.expression.Upper|`upper`, `ucase`|String uppercase operator|false|This is not 100% compatible with the Spark version because in some cases unicode characters change byte width when changing the case. The GPU string conversion does not support these characters. For a full list of unsupported characters see https://github.com/rapidsai/cudf/issues/3132| spark.rapids.sql.expression.WeekDay|`weekday`|Returns the day of the week (0 = Monday...6=Sunday)|true|None| spark.rapids.sql.expression.WindowExpression| |Calculates a return value for every input row of a table based on a group (or "window") of rows|true|None| diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 119b0147ded..b879ec4c387 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -122,6 +122,12 @@ object GpuCast { val INVALID_FLOAT_CAST_MSG = "At least one value is either null or is an invalid number" + val EPOCH = "epoch" + val NOW = "now" + val TODAY = "today" + val YESTERDAY = "yesterday" + val TOMORROW = "tomorrow" + /** * Returns true iff we can cast `from` to `to` using the GPU. 
*/ @@ -182,6 +188,17 @@ object GpuCast { case _ => false } } + + def calculateSpecialDates: Map[String, Int] = { + val now = DateTimeUtils.currentDate(ZoneId.of("UTC")) + Map( + EPOCH -> 0, + NOW -> now, + TODAY -> now, + YESTERDAY -> (now - 1), + TOMORROW -> (now + 1) + ) + } } /** @@ -655,16 +672,6 @@ case class GpuCast( } } - // special dates - val now = DateTimeUtils.currentDate(ZoneId.of("UTC")) - val specialDates: Map[String, Int] = Map( - "epoch" -> 0, - "now" -> now, - "today" -> now, - "yesterday" -> (now - 1), - "tomorrow" -> (now + 1) - ) - var sanitizedInput = input.incRefCount() // replace partial months @@ -677,6 +684,8 @@ case class GpuCast( cv.stringReplaceWithBackrefs("-([0-9])([ T](:?[\\r\\n]|.)*)?\\Z", "-0\\1") } + val specialDates = calculateSpecialDates + withResource(sanitizedInput) { sanitizedInput => // convert dates that are in valid formats yyyy, yyyy-mm, yyyy-mm-dd diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index c721ec4fd8a..6d09e7a7684 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleEx import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ import org.apache.spark.sql.execution.window.WindowExec +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids._ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand import org.apache.spark.sql.rapids.execution.{GpuBroadcastMeta, GpuBroadcastNestedLoopJoinMeta, GpuCustomShuffleReaderExec, GpuShuffleMeta} @@ -1069,9 +1070,9 @@ object GpuOverrides { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { if (conf.isImprovedTimestampOpsEnabled) { // passing the already converted strf string for a little optimization - GpuToUnixTimestampImproved(lhs, rhs, strfFormat) + GpuToUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat) } else { - GpuToUnixTimestamp(lhs, rhs, strfFormat) + GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat) } } }) @@ -1083,14 +1084,12 @@ object GpuOverrides { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = { if (conf.isImprovedTimestampOpsEnabled) { // passing the already converted strf string for a little optimization - GpuUnixTimestampImproved(lhs, rhs, strfFormat) + GpuUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat) } else { - GpuUnixTimestamp(lhs, rhs, strfFormat) + GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat) } } - }) - .incompat("Incorrectly formatted strings and bogus dates produce garbage data" + - " instead of null"), + }), expr[Hour]( "Returns the hour component of the string/timestamp", (a, conf, p, r) => new UnaryExprMeta[Hour](a, conf, p, r) { @@ -2012,6 +2011,16 @@ object GpuOverrides { ).map(r => (r.getClassFor.asSubclass(classOf[SparkPlan]), r)).toMap val execs: Map[Class[_ <: SparkPlan], ExecRule[_ <: SparkPlan]] = commonExecs ++ ShimLoader.getSparkShims.getExecs + + def getTimeParserPolicy: TimeParserPolicy = { + val policy = SQLConf.get.getConfString(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION") + policy match { + case "LEGACY" => LegacyTimeParserPolicy + case "EXCEPTION" => ExceptionTimeParserPolicy + case "CORRECTED" => CorrectedTimeParserPolicy + } + } + } /** Tag the initial plan when AQE is enabled */ case class 
GpuQueryStagePrepOverrides() extends Rule[SparkPlan] with Logging { diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 3c48dfcd2e4..017024d8a03 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -16,15 +16,17 @@ package org.apache.spark.sql.rapids +import java.sql.SQLException import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} -import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException -import com.nvidia.spark.rapids.GpuOverrides.extractStringLit +import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.CalendarInterval @@ -285,6 +287,7 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi (expr: A, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], rule: ConfKeysAndIncompat) extends BinaryExprMeta[A](expr, conf, parent, rule) { + var sparkFormat: String = _ var strfFormat: String = _ override def tagExprForGpu(): Unit = { if (ZoneId.of(expr.timeZoneId.get).normalized() != GpuOverrides.UTC_TIMEZONE_ID) { @@ -293,11 +296,24 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi // Date and Timestamp work too if (expr.right.dataType == StringType) { try { - val rightLit = extractStringLit(expr.right) - if (rightLit.isDefined) { - strfFormat = DateUtils.toStrf(rightLit.get) - } else { - willNotWorkOnGpu("format has to be a string literal") + extractStringLit(expr.right) match { + case Some(rightLit) => + if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { + willNotWorkOnGpu("legacyTimeParserPolicy is LEGACY") + } else { + val gpuSupportedFormats = Seq( + "yyyy-MM-dd", + "yyyy-MM-dd HH:mm:ss" + ) + sparkFormat = rightLit + if (gpuSupportedFormats.contains(sparkFormat)) { + strfFormat = DateUtils.toStrf(sparkFormat) + } else { + willNotWorkOnGpu(s"Unsupported GpuUnixTimestamp format: $sparkFormat") + } + } + case None => + willNotWorkOnGpu("format has to be a string literal") } } catch { case x: TimestampFormatConversionException => @@ -307,6 +323,11 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi } } +sealed trait TimeParserPolicy extends Serializable +object LegacyTimeParserPolicy extends TimeParserPolicy +object ExceptionTimeParserPolicy extends TimeParserPolicy +object CorrectedTimeParserPolicy extends TimeParserPolicy + /** * A direct conversion of Spark's ToTimestamp class which converts time to UNIX timestamp by * first converting to microseconds and 
then dividing by the downScaleFactor @@ -316,6 +337,7 @@ abstract class GpuToTimestamp def downScaleFactor = 1000000 // MICROS IN SECOND + def sparkFormat: String def strfFormat: String override def inputTypes: Seq[AbstractDataType] = @@ -326,6 +348,8 @@ abstract class GpuToTimestamp override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess + private val timeParserPolicy = getTimeParserPolicy + override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { throw new IllegalArgumentException("rhs has to be a scalar for the unixtimestamp to work") } @@ -338,7 +362,93 @@ abstract class GpuToTimestamp override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - lhs.getBase.asTimestampMicroseconds(strfFormat) + + val DAY_MICROS = 24 * 60 * 60 * 1000000L + val specialDates = GpuCast.calculateSpecialDates + .map { + case (name, days) => (name, days * DAY_MICROS) + } + + def daysScalar(name: String): Scalar = { + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDates(name)) + } + + def daysEqual(name: String): ColumnVector = { + lhs.getBase.equalTo(Scalar.fromString(name)) + } + + // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime + // and ToUnixTime and will support parsing a subset of a string so we check the length of + // the string as well which works well for fixed-length formats but if/when we want to + // support variable-length formats (such as timestamps with milliseconds) then we will need + // to use regex instead. + val isTimestamp = withResource(lhs.getBase.getCharLengths) { actualLen => + withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => + withResource(actualLen.equalTo(expectedLen)) { lengthOk => + withResource(lhs.getBase.isTimestamp(strfFormat)) { isTimestamp => + isTimestamp.and(lengthOk) + } + } + } + } + + // in addition to date/timestamp strings, we also need to check for special dates and null + // values, since anything else is invalid and should throw an error or be converted to null + // depending on the policy + withResource(isTimestamp) { isTimestamp => + withResource(daysEqual(GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(GpuCast.NOW)) { isNow => + withResource(daysEqual(GpuCast.TODAY)) { isToday => + withResource(daysEqual(GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(GpuCast.TOMORROW)) { isTomorrow => + withResource(lhs.getBase.isNull) { isNull => + val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( + isYesterday.or(isTomorrow.or(isNull)))))) + + // throw error if legacyTimeParserPolicy is EXCEPTION + if (timeParserPolicy == ExceptionTimeParserPolicy) { + withResource(Scalar.fromBool(false)) { falseScalar => + if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { + throw new RuntimeException( + s"Expression ${this.getClass.getSimpleName} failed to parse one or " + + "more values because they did not match the specified format. 
Set " + + "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + + "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + + "LEGACY will force this expression to run on CPU though)") + } + } + } + + // do the conversion + withResource(Scalar.fromNull(DType.TIMESTAMP_MICROSECONDS)) { nullValue => + withResource(lhs.getBase.asTimestampMicroseconds(strfFormat)) { converted => + withResource(daysScalar(GpuCast.EPOCH)) { epoch => + withResource(daysScalar(GpuCast.NOW)) { now => + withResource(daysScalar(GpuCast.TODAY)) { today => + withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => + withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => + isTimestamp.ifElse(converted, + isEpoch.ifElse(epoch, + isNow.ifElse(now, + isToday.ifElse(today, + isYesterday.ifElse(yesterday, + isTomorrow.ifElse(tomorrow, + nullValue)))))) + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } else { // Timestamp or DateType lhs.getBase.asTimestampMicroseconds() } @@ -397,9 +507,10 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp { } case class GpuUnixTimestamp(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestamp { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestamp { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -411,9 +522,10 @@ case class GpuUnixTimestamp(strTs: Expression, } case class GpuToUnixTimestamp(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestamp { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestamp { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -425,9 +537,10 @@ case class GpuToUnixTimestamp(strTs: Expression, } case class GpuUnixTimestampImproved(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestampImproved { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestampImproved { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) @@ -439,9 +552,10 @@ case class GpuUnixTimestampImproved(strTs: Expression, } case class GpuToUnixTimestampImproved(strTs: Expression, - format: Expression, - strf: String, - timeZoneId: Option[String] = None) extends GpuToTimestampImproved { + format: Expression, + sparkFormat: String, + strf: String, + timeZoneId: Option[String] = None) extends GpuToTimestampImproved { override def strfFormat = strf override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = { copy(timeZoneId = Option(timeZoneId)) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala new file mode 100644 index 00000000000..ac7c45cc96b --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.functions.{col, to_date, unix_timestamp} +import org.apache.spark.sql.internal.SQLConf + +class ParseDateTimeSuite extends SparkQueryCompareTestSuite { + + testSparkResultsAreEqual("to_date", + datesAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd")) + } + + testSparkResultsAreEqual("to_date default pattern", + datesAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", to_date(col("c0"))) + } + + testSparkResultsAreEqual("unix_timestamp parse date", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd")) + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp millis (fall back to CPU)", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED") + .set(RapidsConf.TEST_ALLOWED_NONGPU.key, "ProjectExec,Alias,UnixTimestamp,Literal")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss.SSS")) + } + + testSparkResultsAreEqual("unix_timestamp parse timestamp default pattern", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"))) + } + + test("fail to parse when policy is EXCEPTION") { + val e = intercept[SparkException] { + val df = withGpuSparkSession(spark => { + timestampsAsStrings(spark) + .repartition(2) + .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + }, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION")) + df.collect() + } + assert(e.getMessage.contains("failed to parse one or more values")) + } + + test("fall back to CPU when policy is LEGACY") { + val e = intercept[IllegalArgumentException] { + val df = withGpuSparkSession(spark => { + timestampsAsStrings(spark) + .repartition(2) + .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) + }, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "LEGACY")) + df.collect() + } + assert(e.getMessage.contains( + "Part of the plan is not columnar class org.apache.spark.sql.execution.ProjectExec")) + } + + private def timestampsAsStrings(spark: SparkSession) = { + import spark.implicits._ + timestampValues.toDF("c0") + } + + private def datesAsStrings(spark: SparkSession) = { + import spark.implicits._ + val values = Seq( + GpuCast.EPOCH, + GpuCast.NOW, + GpuCast.TODAY, + GpuCast.YESTERDAY, + GpuCast.TOMORROW, + ) ++ timestampValues + values.toDF("c0") + } + + private val timestampValues = Seq( + "", + "null", + null, + "\n", + 
"1999-12-31 ", + "1999-12-31 11", + "1999-12-31 11:", + "1999-12-31 11:5", + "1999-12-31 11:59", + "1999-12-31 11:59:", + "1999-12-31 11:59:5", + "1999-12-31 11:59:59", + "1999-12-31 11:59:59.", + "1999-12-31 11:59:59.9", + "1999-12-31 11:59:59.99", + "1999-12-31 11:59:59.999", + "1999-12-31", + "1999-12-31\n", + "\t1999-12-31", + "\n1999-12-31", + "1999/12/31" + ) +} + From d0768c761bf5bfca5b5cecb860ddb47379443fb1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Fri, 13 Nov 2020 10:10:18 -0700 Subject: [PATCH 02/18] close scalar value Signed-off-by: Andy Grove --- .../org/apache/spark/sql/rapids/datetimeExpressions.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 017024d8a03..a4195bd8550 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -374,7 +374,9 @@ abstract class GpuToTimestamp } def daysEqual(name: String): ColumnVector = { - lhs.getBase.equalTo(Scalar.fromString(name)) + withResource(Scalar.fromString(name)) { scalarName => + lhs.getBase.equalTo(scalarName) + } } // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime From 1c43f8d57fb93d83e104af06134ea50c06ce7bb1 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 10:18:34 -0700 Subject: [PATCH 03/18] compatible formats will now run on GPU without requiring incompatibleOps to be set Signed-off-by: Andy Grove --- .../com/nvidia/spark/rapids/DateUtils.scala | 2 + .../com/nvidia/spark/rapids/GpuCast.scala | 6 +-- .../sql/rapids/datetimeExpressions.scala | 45 +++++++++++-------- .../spark/rapids/ParseDateTimeSuite.scala | 2 +- 4 files changed, 31 insertions(+), 24 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index a879011580a..43087df9dbe 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -36,6 +36,8 @@ object DateUtils { "MM" -> "%m", "LL" -> "%m", "dd" -> "%d", "mm" -> "%M", "ss" -> "%S", "HH" -> "%H", "yy" -> "%y", "yyyy" -> "%Y", "SSSSSS" -> "%f") + val ONE_DAY_MICROSECONDS = 86400000000L + case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int) /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index b879ec4c387..bba40dcc66c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -96,8 +96,6 @@ object GpuCast { "\\A\\d{4}\\-\\d{2}\\-\\d{2}[ T]\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z\\Z" private val TIMESTAMP_REGEX_NO_DATE = "\\A[T]?(\\d{2}:\\d{2}:\\d{2}\\.\\d{6}Z)\\Z" - private val ONE_DAY_MICROSECONDS = 86400000000L - /** * Regex for identifying strings that contain numeric values that can be casted to integral * types. This includes floating point numbers but not numbers containing exponents. 
@@ -768,8 +766,8 @@ case class GpuCast( "epoch" -> 0, "now" -> today, "today" -> today, - "yesterday" -> (today - ONE_DAY_MICROSECONDS), - "tomorrow" -> (today + ONE_DAY_MICROSECONDS) + "yesterday" -> (today - DateUtils.ONE_DAY_MICROSECONDS), + "tomorrow" -> (today + DateUtils.ONE_DAY_MICROSECONDS) ) var sanitizedInput = input.incRefCount() diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index a4195bd8550..bf4dc7c5953 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -298,19 +298,15 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi try { extractStringLit(expr.right) match { case Some(rightLit) => + sparkFormat = rightLit if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { willNotWorkOnGpu("legacyTimeParserPolicy is LEGACY") + } else if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) || + conf.isIncompatEnabled) { + strfFormat = DateUtils.toStrf(sparkFormat) } else { - val gpuSupportedFormats = Seq( - "yyyy-MM-dd", - "yyyy-MM-dd HH:mm:ss" - ) - sparkFormat = rightLit - if (gpuSupportedFormats.contains(sparkFormat)) { - strfFormat = DateUtils.toStrf(sparkFormat) - } else { - willNotWorkOnGpu(s"Unsupported GpuUnixTimestamp format: $sparkFormat") - } + willNotWorkOnGpu(s"incompatible format '$sparkFormat'. Set " + + s"spark.rapids.sql.incompatibleOps.enabled=true to force onto GPU.") } case None => willNotWorkOnGpu("format has to be a string literal") @@ -328,6 +324,14 @@ object LegacyTimeParserPolicy extends TimeParserPolicy object ExceptionTimeParserPolicy extends TimeParserPolicy object CorrectedTimeParserPolicy extends TimeParserPolicy +object GpuToTimestamp { + /** We are compatible with Spark for these formats */ + val COMPATIBLE_FORMATS = Seq( + "yyyy-MM-dd", + "yyyy-MM-dd HH:mm:ss" + ) +} + /** * A direct conversion of Spark's ToTimestamp class which converts time to UNIX timestamp by * first converting to microseconds and then dividing by the downScaleFactor @@ -363,10 +367,9 @@ abstract class GpuToTimestamp val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - val DAY_MICROS = 24 * 60 * 60 * 1000000L val specialDates = GpuCast.calculateSpecialDates .map { - case (name, days) => (name, days * DAY_MICROS) + case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS) } def daysScalar(name: String): Scalar = { @@ -380,18 +383,22 @@ abstract class GpuToTimestamp } // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime - // and ToUnixTime and will support parsing a subset of a string so we check the length of - // the string as well which works well for fixed-length formats but if/when we want to + // and ToUnixTime and will support parsing a subset of a string so we check the length of + // the string as well which works well for fixed-length formats but if/when we want to // support variable-length formats (such as timestamps with milliseconds) then we will need // to use regex instead. 
- val isTimestamp = withResource(lhs.getBase.getCharLengths) { actualLen => - withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => - withResource(actualLen.equalTo(expectedLen)) { lengthOk => - withResource(lhs.getBase.isTimestamp(strfFormat)) { isTimestamp => - isTimestamp.and(lengthOk) + val isTimestamp = if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat)) { + withResource(lhs.getBase.getCharLengths) { actualLen => + withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => + withResource(actualLen.equalTo(expectedLen)) { lengthOk => + withResource(lhs.getBase.isTimestamp(strfFormat)) { isTimestamp => + isTimestamp.and(lengthOk) + } } } } + } else { + ColumnVector.fromScalar(Scalar.fromBool(true), lhs.getRowCount.toInt) } // in addition to date/timestamp strings, we also need to check for special dates and null diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index ac7c45cc96b..77451a97251 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -97,7 +97,7 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { GpuCast.NOW, GpuCast.TODAY, GpuCast.YESTERDAY, - GpuCast.TOMORROW, + GpuCast.TOMORROW ) ++ timestampValues values.toDF("c0") } From 4a8a922c64b9e232960062a77230448ba555ded3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 12:02:30 -0700 Subject: [PATCH 04/18] code cleanup and address more review comments Signed-off-by: Andy Grove --- .../com/nvidia/spark/rapids/DateUtils.scala | 2 ++ .../sql/rapids/datetimeExpressions.scala | 24 ++++++++++--------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index 43087df9dbe..bb882e9c1fd 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -36,6 +36,8 @@ object DateUtils { "MM" -> "%m", "LL" -> "%m", "dd" -> "%d", "mm" -> "%M", "ss" -> "%S", "HH" -> "%H", "yy" -> "%y", "yyyy" -> "%Y", "SSSSSS" -> "%f") + val ONE_SECOND_MICROSECONDS = 1000000 + val ONE_DAY_MICROSECONDS = 86400000000L case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index bf4dc7c5953..e3ff7406809 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -16,7 +16,6 @@ package org.apache.spark.sql.rapids -import java.sql.SQLException import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} @@ -25,8 +24,8 @@ import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import org.apache.spark.{SPARK_VERSION, SparkUpgradeException} import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression} -import org.apache.spark.sql.internal.SQLConf import 
org.apache.spark.sql.types._ import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.unsafe.types.CalendarInterval @@ -300,7 +299,7 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi case Some(rightLit) => sparkFormat = rightLit if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { - willNotWorkOnGpu("legacyTimeParserPolicy is LEGACY") + willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported") } else if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) || conf.isIncompatEnabled) { strfFormat = DateUtils.toStrf(sparkFormat) @@ -339,7 +338,7 @@ object GpuToTimestamp { abstract class GpuToTimestamp extends GpuBinaryExpression with TimeZoneAwareExpression with ExpectsInputTypes { - def downScaleFactor = 1000000 // MICROS IN SECOND + def downScaleFactor = DateUtils.ONE_SECOND_MICROSECONDS def sparkFormat: String def strfFormat: String @@ -382,12 +381,12 @@ abstract class GpuToTimestamp } } - // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime - // and ToUnixTime and will support parsing a subset of a string so we check the length of - // the string as well which works well for fixed-length formats but if/when we want to - // support variable-length formats (such as timestamps with milliseconds) then we will need - // to use regex instead. val isTimestamp = if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat)) { + // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime + // and ToUnixTime and will support parsing a subset of a string so we check the length of + // the string as well which works well for fixed-length formats but if/when we want to + // support variable-length formats (such as timestamps with milliseconds) then we will need + // to use regex instead. withResource(lhs.getBase.getCharLengths) { actualLen => withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => withResource(actualLen.equalTo(expectedLen)) { lengthOk => @@ -398,6 +397,8 @@ abstract class GpuToTimestamp } } } else { + // this is the incompatibleOps case where we do not guarantee compatibility with Spark + // and assume that all non-null inputs are valid ColumnVector.fromScalar(Scalar.fromBool(true), lhs.getRowCount.toInt) } @@ -418,12 +419,13 @@ abstract class GpuToTimestamp if (timeParserPolicy == ExceptionTimeParserPolicy) { withResource(Scalar.fromBool(false)) { falseScalar => if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { - throw new RuntimeException( + throw new SparkUpgradeException(SPARK_VERSION, s"Expression ${this.getClass.getSimpleName} failed to parse one or " + "more values because they did not match the specified format. 
Set " + "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + - "LEGACY will force this expression to run on CPU though)") + "LEGACY will force this expression to run on CPU though)", + new RuntimeException("Failed to parse one or more values")) } } } From 6b0e654628d5c1d26294a3667329f7e0d0e7277d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 13:13:40 -0700 Subject: [PATCH 05/18] add specific config option for enabling incompatible date formats on GPU --- docs/configs.md | 1 + .../com/nvidia/spark/rapids/TimeOperatorsSuite.scala | 4 +++- .../scala/com/nvidia/spark/rapids/RapidsConf.scala | 9 +++++++++ .../apache/spark/sql/rapids/datetimeExpressions.scala | 5 +++-- .../com/nvidia/spark/rapids/ParseDateTimeSuite.scala | 10 +++++++++- 5 files changed, 25 insertions(+), 4 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 779401f2603..eedcb89e284 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -71,6 +71,7 @@ Name | Description | Default Value spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows. Because this is not as compatible with spark, we have it disabled by default.|false spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false +spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, setting this to true will force all parsing onto GPU even for formats that can result in incorrect results when parsing invalid inputs.|false spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|false spark.rapids.sql.python.gpu.enabled|This is an experimental feature and is likely to change in the future. Enable (true) or disable (false) support for scheduling Python Pandas UDFs with GPU resources. When enabled, pandas UDFs are assumed to share the same GPU that the RAPIDs accelerator uses and will honor the python GPU configs|false spark.rapids.sql.reader.batchSizeBytes|Soft limit on the maximum number of bytes the reader reads per batch. The readers will read chunks of data until this limit is met or exceeded. 
Note that the reader may estimate the number of bytes that will be used on the GPU in some cases based on the schema and number of rows in each batch.|2147483647 diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala index 949a78945ad..09958a2c6b4 100644 --- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala +++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/TimeOperatorsSuite.scala @@ -16,6 +16,7 @@ package com.nvidia.spark.rapids +import org.apache.spark.SparkConf import org.apache.spark.sql.functions._ class TimeOperatorsSuite extends SparkQueryCompareTestSuite { @@ -28,7 +29,8 @@ class TimeOperatorsSuite extends SparkQueryCompareTestSuite { } testSparkResultsAreEqual( - "Test from_unixtime with alternative month and two digit year", datesPostEpochDf) { + "Test from_unixtime with alternative month and two digit year", datesPostEpochDf, + conf = new SparkConf().set(RapidsConf.INCOMPATIBLE_DATE_FORMATS.key, "true")) { frame => frame.select(from_unixtime(col("dates"),"dd/LL/yy HH:mm:ss.SSSSSS")) } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index c2da5002904..05e90232982 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -424,6 +424,13 @@ object RapidsConf { .booleanConf .createWithDefault(false) + val INCOMPATIBLE_DATE_FORMATS = conf("spark.rapids.sql.incompatibleDateFormats.enabled") + .doc("When parsing strings as dates and timestamps in functions like unix_timestamp, " + + "setting this to true will force all parsing onto GPU even for formats that can " + + "result in incorrect results when parsing invalid inputs.") + .booleanConf + .createWithDefault(false) + val IMPROVED_FLOAT_OPS = conf("spark.rapids.sql.improvedFloatOps.enabled") .doc("For some floating point operations spark uses one way to compute the value " + "and the underlying cudf implementation can use an improved algorithm. " + @@ -919,6 +926,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS) + lazy val incompatDateFormats: Boolean = get(INCOMPATIBLE_DATE_FORMATS) + lazy val includeImprovedFloat: Boolean = get(IMPROVED_FLOAT_OPS) lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index e3ff7406809..1a30e0ba959 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -301,11 +301,11 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) { willNotWorkOnGpu("legacyTimeParserPolicy LEGACY is not supported") } else if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat) || - conf.isIncompatEnabled) { + conf.incompatDateFormats) { strfFormat = DateUtils.toStrf(sparkFormat) } else { willNotWorkOnGpu(s"incompatible format '$sparkFormat'. 
Set " + - s"spark.rapids.sql.incompatibleOps.enabled=true to force onto GPU.") + s"spark.rapids.sql.incompatibleDateFormats.enabled=true to force onto GPU.") } case None => willNotWorkOnGpu("format has to be a string literal") @@ -327,6 +327,7 @@ object GpuToTimestamp { /** We are compatible with Spark for these formats */ val COMPATIBLE_FORMATS = Seq( "yyyy-MM-dd", + "dd/MM/yyyy", "yyyy-MM-dd HH:mm:ss" ) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 77451a97251..1113713800d 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -23,12 +23,18 @@ import org.apache.spark.sql.internal.SQLConf class ParseDateTimeSuite extends SparkQueryCompareTestSuite { - testSparkResultsAreEqual("to_date", + testSparkResultsAreEqual("to_date yyyy-MM-dd", datesAsStrings, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd")) } + testSparkResultsAreEqual("to_date dd/MM/yyyy", + datesAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", to_date(col("c0"), "dd/MM/yyyy")) + } + testSparkResultsAreEqual("to_date default pattern", datesAsStrings, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { @@ -119,6 +125,8 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { "1999-12-31 11:59:59.9", "1999-12-31 11:59:59.99", "1999-12-31 11:59:59.999", + "31/12/1999", + "31/12/1999 11:59:59.999", "1999-12-31", "1999-12-31\n", "\t1999-12-31", From 1d15e6be7bf5e9c65928d5c9488f95b1dd4cc4f9 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 13:33:20 -0700 Subject: [PATCH 06/18] update documentation Signed-off-by: Andy Grove --- docs/compatibility.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/compatibility.md b/docs/compatibility.md index 80720517904..e2251e0d4aa 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -212,6 +212,24 @@ window functions like `row_number`, `lead`, and `lag` can produce different resu includes both `-0.0` and `0.0`, or if the ordering is ambiguous. Spark can produce different results from one run to another if the ordering is ambiguous on a window function too. +## Parsing strings as dates or timestamps + +When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`, only a subset +of possible formats is supported on GPU with full compatibility with Spark. The supported formats are: + +- `dd/MM/yyyy` +- `yyyy-MM-dd` +- `yyyy-MM-dd HH:mm:ss` + +Other formats may result in incorrect results and will not run on GPU by default. Some of the specific issues with +other formats are: + +- Spark supports partial microseconds but the plugin does not +- The plugin will produce incorrect results for input data that is not in the correct format in some cases + +To enable all formats on GPU, set +[`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) to `true`. + ## Casting between types In general, performing `cast` and `ansi_cast` operations on the GPU is compatible with the same operations on the CPU. However, there are some exceptions. For this reason, certain casts are disabled on the GPU by default and require configuration options to be specified to enable them. 
From 900e0ba6f814b460ab8fd85f8d1436e53d7d39d0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 14:22:30 -0700 Subject: [PATCH 07/18] improve docs Signed-off-by: Andy Grove --- docs/compatibility.md | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/compatibility.md b/docs/compatibility.md index e2251e0d4aa..052c0a90e05 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -214,18 +214,20 @@ different results from one run to another if the ordering is ambiguous on a wind ## Parsing strings as dates or timestamps -When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`, only a subset -of possible formats is supported on GPU with full compatibility with Spark. The supported formats are: +When converting strings to dates or timestamps using functions like `to_date` and `unix_timestamp`, +only a subset of possible formats are supported on GPU with full compatibility with Spark. The +supported formats are: - `dd/MM/yyyy` - `yyyy-MM-dd` - `yyyy-MM-dd HH:mm:ss` -Other formats may result in incorrect results and will not run on GPU by default. Some of the specific issues with -other formats are: +Other formats may result in incorrect results and will not run on the GPU by default. Some +specific issues with other formats are: - Spark supports partial microseconds but the plugin does not -- The plugin will produce incorrect results for input data that is not in the correct format in some cases +- The plugin will produce incorrect results for input data that is not in the correct format in +some cases To enable all formats on GPU, set [`spark.rapids.sql.incompatibleDateFormats.enabled`](configs.md#sql.incompatibleDateFormats.enabled) to `true`. From 9021145457d526ec799d78eb532a3d14a385e8af Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Nov 2020 14:24:21 -0700 Subject: [PATCH 08/18] use constants for special dates Signed-off-by: Andy Grove --- .../main/scala/com/nvidia/spark/rapids/GpuCast.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index bba40dcc66c..c1fd0dd382b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -763,11 +763,11 @@ case class GpuCast( val today: Long = cal.getTimeInMillis * 1000 val todayStr = new SimpleDateFormat("yyyy-MM-dd").format(cal.getTime) val specialDates: Map[String, Long] = Map( - "epoch" -> 0, - "now" -> today, - "today" -> today, - "yesterday" -> (today - DateUtils.ONE_DAY_MICROSECONDS), - "tomorrow" -> (today + DateUtils.ONE_DAY_MICROSECONDS) + GpuCast.EPOCH -> 0, + GpuCast.NOW -> today, + GpuCast.TODAY -> today, + GpuCast.YESTERDAY -> (today - DateUtils.ONE_DAY_MICROSECONDS), + GpuCast.TOMORROW -> (today + DateUtils.ONE_DAY_MICROSECONDS) ) var sanitizedInput = input.incRefCount() From 94bfbf99bcac105582b84dc8b151d81ed92fd51a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 10:27:45 -0700 Subject: [PATCH 09/18] Add support for more date formats and remove incompat from to_unix_timestamp Signed-off-by: Andy Grove --- .../src/main/python/date_time_test.py | 19 ++++++++--------- .../nvidia/spark/rapids/GpuOverrides.scala | 4 +--- .../sql/rapids/datetimeExpressions.scala | 3 +++ .../spark/rapids/ParseDateTimeSuite.scala | 21 +++++++++++++++++++ 4 files changed, 34 insertions(+), 13 
deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 80ecc92a0c7..02e7d4ad3ac 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -160,29 +160,27 @@ def test_dayofyear(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.dayofyear(f.col('a')))) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a')))) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp(data_gen): assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)")) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf) -@incompat #Really only the string is @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf) @@ -190,16 +188,17 @@ def test_to_unix_timestamp_improved(data_gen): (StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'), (ConvertGen(DateGen(nullable=False), lambda d: d.strftime('%Y/%m').zfill(7), data_type=StringType()), 'yyyy/MM')] -@incompat @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): - print("date: " + date_form) + print("test_string_to_unix_timestamp date_form: {}".format(date_form)) + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form))) + lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf) -@incompat @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): + print("test_string_unix_timestamp date_form: {}".format(date_form)) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form))) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 6d09e7a7684..4527ed9698d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1075,9 +1075,7 @@ object GpuOverrides { GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat) } } - }) - 
.incompat("Incorrectly formatted strings and bogus dates produce garbage data" + - " instead of null"), + }), expr[UnixTimestamp]( "Returns the UNIX timestamp of current or specified time", (a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r){ diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 1a30e0ba959..80e98725f11 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -327,6 +327,9 @@ object GpuToTimestamp { /** We are compatible with Spark for these formats */ val COMPATIBLE_FORMATS = Seq( "yyyy-MM-dd", + "yyyy-MM", + "yyyy/MM/dd", + "yyyy/MM", "dd/MM/yyyy", "yyyy-MM-dd HH:mm:ss" ) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 1113713800d..05dfffef2bb 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -47,6 +47,21 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd")) } + testSparkResultsAreEqual("unix_timestamp parse yyyy/MM", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => df.withColumn("c1", unix_timestamp(col("c0"), "yyyy/MM")) + } + + testSparkResultsAreEqual("to_unix_timestamp parse yyyy/MM", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + df => { + df.createOrReplaceTempView("df") + df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df") + } + } + testSparkResultsAreEqual("unix_timestamp parse timestamp", timestampsAsStrings, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { @@ -128,6 +143,12 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { "31/12/1999", "31/12/1999 11:59:59.999", "1999-12-31", + "1999/12/31", + "1999-12", + "1999/12", + "1975/06", + "1975/06/18", + "1975/06/18 06:48:57", "1999-12-31\n", "\t1999-12-31", "\n1999-12-31", From d6c7971ca15064a0f40aa7ce2161de89ceb0c96f Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 10:30:46 -0700 Subject: [PATCH 10/18] remove debug print Signed-off-by: Andy Grove --- integration_tests/src/main/python/date_time_test.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 02e7d4ad3ac..b99b759ddf5 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -190,7 +190,6 @@ def test_to_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): - print("test_string_to_unix_timestamp date_form: {}".format(date_form)) conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} assert_gpu_and_cpu_are_equal_collect( @@ -198,7 +197,6 @@ def test_string_to_unix_timestamp(data_gen, date_form): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): - 
print("test_string_unix_timestamp date_form: {}".format(date_form)) assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form))) From b93fe90c8d015a9202e3531ea3546df2669dce9a Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 10:34:05 -0700 Subject: [PATCH 11/18] Revert unnecessary change Signed-off-by: Andy Grove --- integration_tests/src/main/python/date_time_test.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index b99b759ddf5..897d89c1ff3 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -172,15 +172,13 @@ def test_to_unix_timestamp(data_gen): @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", - "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf) @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", - "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf) @@ -190,8 +188,7 @@ def test_to_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", - "spark.sql.legacy.timeParserPolicy": "EXCEPTION"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf) From df640361366a77d94c40e42a6f93b7dc0fd8c1e0 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 11:45:03 -0700 Subject: [PATCH 12/18] Make ToUnixTimestamp consistent with UnixTimestamp Signed-off-by: Andy Grove --- docs/configs.md | 2 +- .../com/nvidia/spark/rapids/DateUtils.scala | 2 + .../sql/rapids/datetimeExpressions.scala | 172 +++++++++++++----- .../spark/rapids/ParseDateTimeSuite.scala | 10 + 4 files changed, 139 insertions(+), 47 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index eedcb89e284..417055721df 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -220,7 +220,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.TimeSub| |Subtracts interval from timestamp|true|None| spark.rapids.sql.expression.ToDegrees|`degrees`|Converts radians to degrees|true|None| spark.rapids.sql.expression.ToRadians|`radians`|Converts degrees to radians|true|None| -spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|false|This is not 100% compatible with the Spark version because Incorrectly formatted strings and bogus dates produce garbage data instead of null| +spark.rapids.sql.expression.ToUnixTimestamp|`to_unix_timestamp`|Returns the UNIX timestamp of the given time|true|None| 
spark.rapids.sql.expression.UnaryMinus|`negative`|Negate a numeric value|true|None| spark.rapids.sql.expression.UnaryPositive|`positive`|A numeric value with a + in front of it|true|None| spark.rapids.sql.expression.UnboundedFollowing$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| spark.rapids.sql.expression.UnboundedPreceding$| |Special boundary for a window frame, indicating all rows preceding the current row|true|None| diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala index bb882e9c1fd..a452dce42b7 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/DateUtils.scala @@ -38,6 +38,8 @@ object DateUtils { val ONE_SECOND_MICROSECONDS = 1000000 + val ONE_DAY_SECONDS = 86400L + val ONE_DAY_MICROSECONDS = 86400000000L case class FormatKeywordToReplace(word: String, startIndex: Int, endIndex: Int) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 80e98725f11..a3ea358e5e2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.rapids import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} +import com.nvidia.spark.RebaseHelper.withResource import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} @@ -333,6 +334,53 @@ object GpuToTimestamp { "dd/MM/yyyy", "yyyy-MM-dd HH:mm:ss" ) + + val specialDatesSeconds = GpuCast.calculateSpecialDates + .map { + case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS) + } + + val specialDatesMicros = GpuCast.calculateSpecialDates + .map { + case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS) + } + + def daysScalarSeconds(name: String): Scalar = { + Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name)) + } + + def daysScalarMicros(name: String): Scalar = { + Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDatesMicros(name)) + } + + def daysEqual(col: ColumnVector, name: String): ColumnVector = { + withResource(Scalar.fromString(name)) { scalarName => + col.equalTo(scalarName) + } + } + + def isTimestamp(col: ColumnVector, sparkFormat: String, strfFormat: String) : ColumnVector = { + if (COMPATIBLE_FORMATS.contains(sparkFormat)) { + // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime + // and ToUnixTime and will support parsing a subset of a string so we check the length of + // the string as well which works well for fixed-length formats but if/when we want to + // support variable-length formats (such as timestamps with milliseconds) then we will need + // to use regex instead. 
+ withResource(col.getCharLengths) { actualLen => + withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => + withResource(actualLen.equalTo(expectedLen)) { lengthOk => + withResource(col.isTimestamp(strfFormat)) { isTimestamp => + isTimestamp.and(lengthOk) + } + } + } + } + } else { + // this is the incompatibleOps case where we do not guarantee compatibility with Spark + // and assume that all non-null inputs are valid + ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt) + } + } } /** @@ -342,6 +390,8 @@ object GpuToTimestamp { abstract class GpuToTimestamp extends GpuBinaryExpression with TimeZoneAwareExpression with ExpectsInputTypes { + import GpuToTimestamp._ + def downScaleFactor = DateUtils.ONE_SECOND_MICROSECONDS def sparkFormat: String @@ -370,51 +420,17 @@ abstract class GpuToTimestamp val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - val specialDates = GpuCast.calculateSpecialDates - .map { - case (name, days) => (name, days * DateUtils.ONE_DAY_MICROSECONDS) - } - - def daysScalar(name: String): Scalar = { - Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, specialDates(name)) - } - - def daysEqual(name: String): ColumnVector = { - withResource(Scalar.fromString(name)) { scalarName => - lhs.getBase.equalTo(scalarName) - } - } - - val isTimestamp = if (GpuToTimestamp.COMPATIBLE_FORMATS.contains(sparkFormat)) { - // the cuDF `is_timestamp` function is less restrictive than Spark's behavior for UnixTime - // and ToUnixTime and will support parsing a subset of a string so we check the length of - // the string as well which works well for fixed-length formats but if/when we want to - // support variable-length formats (such as timestamps with milliseconds) then we will need - // to use regex instead. 
- withResource(lhs.getBase.getCharLengths) { actualLen => - withResource(Scalar.fromInt(sparkFormat.length)) { expectedLen => - withResource(actualLen.equalTo(expectedLen)) { lengthOk => - withResource(lhs.getBase.isTimestamp(strfFormat)) { isTimestamp => - isTimestamp.and(lengthOk) - } - } - } - } - } else { - // this is the incompatibleOps case where we do not guarantee compatibility with Spark - // and assume that all non-null inputs are valid - ColumnVector.fromScalar(Scalar.fromBool(true), lhs.getRowCount.toInt) - } + val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) // in addition to date/timestamp strings, we also need to check for special dates and null // values, since anything else is invalid and should throw an error or be converted to null // depending on the policy withResource(isTimestamp) { isTimestamp => - withResource(daysEqual(GpuCast.EPOCH)) { isEpoch => - withResource(daysEqual(GpuCast.NOW)) { isNow => - withResource(daysEqual(GpuCast.TODAY)) { isToday => - withResource(daysEqual(GpuCast.YESTERDAY)) { isYesterday => - withResource(daysEqual(GpuCast.TOMORROW)) { isTomorrow => + withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => + withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => + withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => withResource(lhs.getBase.isNull) { isNull => val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( isYesterday.or(isTomorrow.or(isNull)))))) @@ -437,11 +453,11 @@ abstract class GpuToTimestamp // do the conversion withResource(Scalar.fromNull(DType.TIMESTAMP_MICROSECONDS)) { nullValue => withResource(lhs.getBase.asTimestampMicroseconds(strfFormat)) { converted => - withResource(daysScalar(GpuCast.EPOCH)) { epoch => - withResource(daysScalar(GpuCast.NOW)) { now => - withResource(daysScalar(GpuCast.TODAY)) { today => - withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => - withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => + withResource(daysScalarMicros(GpuCast.EPOCH)) { epoch => + withResource(daysScalarMicros(GpuCast.NOW)) { now => + withResource(daysScalarMicros(GpuCast.TODAY)) { today => + withResource(daysScalarMicros(GpuCast.YESTERDAY)) { yesterday => + withResource(daysScalarMicros(GpuCast.TOMORROW)) { tomorrow => isTimestamp.ifElse(converted, isEpoch.ifElse(epoch, isNow.ifElse(now, @@ -489,10 +505,74 @@ abstract class GpuToTimestamp * first converting to microseconds */ abstract class GpuToTimestampImproved extends GpuToTimestamp { + import GpuToTimestamp._ + + private val timeParserPolicy = getTimeParserPolicy + override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - lhs.getBase.asTimestampSeconds(strfFormat) + + val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) + + // in addition to date/timestamp strings, we also need to check for special dates and null + // values, since anything else is invalid and should throw an error or be converted to null + // depending on the policy + withResource(isTimestamp) { isTimestamp => + withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => + withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => + withResource(daysEqual(lhs.getBase, 
GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => + withResource(lhs.getBase.isNull) { isNull => + val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( + isYesterday.or(isTomorrow.or(isNull)))))) + + // throw error if legacyTimeParserPolicy is EXCEPTION + if (timeParserPolicy == ExceptionTimeParserPolicy) { + withResource(Scalar.fromBool(false)) { falseScalar => + if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { + throw new SparkUpgradeException(SPARK_VERSION, + s"Expression ${this.getClass.getSimpleName} failed to parse one or " + + "more values because they did not match the specified format. Set " + + "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + + "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + + "LEGACY will force this expression to run on CPU though)", + new RuntimeException("Failed to parse one or more values")) + } + } + } + + // do the conversion + withResource(Scalar.fromNull(DType.TIMESTAMP_SECONDS)) { nullValue => + withResource(lhs.getBase.asTimestampSeconds(strfFormat)) { converted => + withResource(daysScalarSeconds(GpuCast.EPOCH)) { epoch => + withResource(daysScalarSeconds(GpuCast.NOW)) { now => + withResource(daysScalarSeconds(GpuCast.TODAY)) { today => + withResource(daysScalarSeconds(GpuCast.YESTERDAY)) { yesterday => + withResource(daysScalarSeconds(GpuCast.TOMORROW)) { tomorrow => + isTimestamp.ifElse(converted, + isEpoch.ifElse(epoch, + isNow.ifElse(now, + isToday.ifElse(today, + isYesterday.ifElse(yesterday, + isTomorrow.ifElse(tomorrow, + nullValue)))))) + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } else if (lhs.dataType() == DateType){ lhs.getBase.asTimestampSeconds() } else { // Timestamp diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 05dfffef2bb..3aafc245326 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -62,6 +62,16 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { } } + testSparkResultsAreEqual("to_unix_timestamp parse yyyy/MM (improvedTimeOps)", + timestampsAsStrings, + new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED") + .set(RapidsConf.IMPROVED_TIMESTAMP_OPS.key, "true")) { + df => { + df.createOrReplaceTempView("df") + df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df") + } + } + testSparkResultsAreEqual("unix_timestamp parse timestamp", timestampsAsStrings, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { From dda37b1233357724f9e7254ed5094bcc17334493 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 12:08:58 -0700 Subject: [PATCH 13/18] refactor to remove duplicate code Signed-off-by: Andy Grove --- .../src/main/python/date_time_test.py | 3 +- .../sql/rapids/datetimeExpressions.scala | 217 +++++++----------- 2 files changed, 90 insertions(+), 130 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 897d89c1ff3..e32dd70e3d9 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -188,9 +188,8 @@ def test_to_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def 
test_string_to_unix_timestamp(data_gen, date_form): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf) + lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form))) @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index a3ea358e5e2..531b228c0e9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -19,8 +19,7 @@ package org.apache.spark.sql.rapids import java.time.ZoneId import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar} -import com.nvidia.spark.RebaseHelper.withResource -import com.nvidia.spark.rapids.{BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} +import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, ConfKeysAndIncompat, DateUtils, GpuBinaryExpression, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta} import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy} import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -324,7 +323,7 @@ object LegacyTimeParserPolicy extends TimeParserPolicy object ExceptionTimeParserPolicy extends TimeParserPolicy object CorrectedTimeParserPolicy extends TimeParserPolicy -object GpuToTimestamp { +object GpuToTimestamp extends Arm { /** We are compatible with Spark for these formats */ val COMPATIBLE_FORMATS = Seq( "yyyy-MM-dd", @@ -381,6 +380,76 @@ object GpuToTimestamp { ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt) } } + + def parseStringAsTimestamp( + col: ColumnVector, + sparkFormat: String, + strfFormat: String, + timeParserPolicy: TimeParserPolicy, + dtype: DType, + daysScalar: String => Scalar, + asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = { + + val isTimestamp = GpuToTimestamp.isTimestamp(col, sparkFormat, strfFormat) + + // in addition to date/timestamp strings, we also need to check for special dates and null + // values, since anything else is invalid and should throw an error or be converted to null + // depending on the policy + withResource(isTimestamp) { isTimestamp => + withResource(daysEqual(col, GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(col, GpuCast.NOW)) { isNow => + withResource(daysEqual(col, GpuCast.TODAY)) { isToday => + withResource(daysEqual(col, GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(col, GpuCast.TOMORROW)) { isTomorrow => + withResource(col.isNull) { isNull => + val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( + isYesterday.or(isTomorrow.or(isNull)))))) + + // throw error if legacyTimeParserPolicy is EXCEPTION + if (timeParserPolicy == ExceptionTimeParserPolicy) { + withResource(Scalar.fromBool(false)) { falseScalar => + if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { + throw new 
SparkUpgradeException(SPARK_VERSION, + s"Expression ${this.getClass.getSimpleName} failed to parse one or " + + "more values because they did not match the specified format. Set " + + "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + + "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + + "LEGACY will force this expression to run on CPU though)", + new RuntimeException("Failed to parse one or more values")) + } + } + } + + // do the conversion + withResource(Scalar.fromNull(dtype)) { nullValue => + withResource(asTimestamp(col, strfFormat)) { converted => + withResource(daysScalar(GpuCast.EPOCH)) { epoch => + withResource(daysScalar(GpuCast.NOW)) { now => + withResource(daysScalar(GpuCast.TODAY)) { today => + withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => + withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => + isTimestamp.ifElse(converted, + isEpoch.ifElse(epoch, + isNow.ifElse(now, + isToday.ifElse(today, + isYesterday.ifElse(yesterday, + isTomorrow.ifElse(tomorrow, + nullValue)))))) + } + } + } + } + } + } + } + } + } + } + } + } + } + } + } } /** @@ -405,7 +474,7 @@ abstract class GpuToTimestamp override lazy val resolved: Boolean = childrenResolved && checkInputDataTypes().isSuccess - private val timeParserPolicy = getTimeParserPolicy + val timeParserPolicy = getTimeParserPolicy override def doColumnar(lhs: GpuColumnVector, rhs: GpuColumnVector): ColumnVector = { throw new IllegalArgumentException("rhs has to be a scalar for the unixtimestamp to work") @@ -419,67 +488,14 @@ abstract class GpuToTimestamp override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - - val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) - - // in addition to date/timestamp strings, we also need to check for special dates and null - // values, since anything else is invalid and should throw an error or be converted to null - // depending on the policy - withResource(isTimestamp) { isTimestamp => - withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => - withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => - withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => - withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => - withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => - withResource(lhs.getBase.isNull) { isNull => - val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( - isYesterday.or(isTomorrow.or(isNull)))))) - - // throw error if legacyTimeParserPolicy is EXCEPTION - if (timeParserPolicy == ExceptionTimeParserPolicy) { - withResource(Scalar.fromBool(false)) { falseScalar => - if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { - throw new SparkUpgradeException(SPARK_VERSION, - s"Expression ${this.getClass.getSimpleName} failed to parse one or " + - "more values because they did not match the specified format. 
Set " + - "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + - "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + - "LEGACY will force this expression to run on CPU though)", - new RuntimeException("Failed to parse one or more values")) - } - } - } - - // do the conversion - withResource(Scalar.fromNull(DType.TIMESTAMP_MICROSECONDS)) { nullValue => - withResource(lhs.getBase.asTimestampMicroseconds(strfFormat)) { converted => - withResource(daysScalarMicros(GpuCast.EPOCH)) { epoch => - withResource(daysScalarMicros(GpuCast.NOW)) { now => - withResource(daysScalarMicros(GpuCast.TODAY)) { today => - withResource(daysScalarMicros(GpuCast.YESTERDAY)) { yesterday => - withResource(daysScalarMicros(GpuCast.TOMORROW)) { tomorrow => - isTimestamp.ifElse(converted, - isEpoch.ifElse(epoch, - isNow.ifElse(now, - isToday.ifElse(today, - isYesterday.ifElse(yesterday, - isTomorrow.ifElse(tomorrow, - nullValue)))))) - } - } - } - } - } - } - } - } - } - } - } - } - } - } - + parseStringAsTimestamp( + lhs.getBase, + sparkFormat, + strfFormat, + timeParserPolicy, + DType.TIMESTAMP_MICROSECONDS, + daysScalarMicros, + (col, strfFormat) => col.asTimestampMicroseconds(strfFormat)) } else { // Timestamp or DateType lhs.getBase.asTimestampMicroseconds() } @@ -507,72 +523,17 @@ abstract class GpuToTimestamp abstract class GpuToTimestampImproved extends GpuToTimestamp { import GpuToTimestamp._ - private val timeParserPolicy = getTimeParserPolicy - override def doColumnar(lhs: GpuColumnVector, rhs: Scalar): ColumnVector = { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format - - val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) - - // in addition to date/timestamp strings, we also need to check for special dates and null - // values, since anything else is invalid and should throw an error or be converted to null - // depending on the policy - withResource(isTimestamp) { isTimestamp => - withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => - withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => - withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => - withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => - withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => - withResource(lhs.getBase.isNull) { isNull => - val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( - isYesterday.or(isTomorrow.or(isNull)))))) - - // throw error if legacyTimeParserPolicy is EXCEPTION - if (timeParserPolicy == ExceptionTimeParserPolicy) { - withResource(Scalar.fromBool(false)) { falseScalar => - if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { - throw new SparkUpgradeException(SPARK_VERSION, - s"Expression ${this.getClass.getSimpleName} failed to parse one or " + - "more values because they did not match the specified format. 
Set " + - "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + - "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + - "LEGACY will force this expression to run on CPU though)", - new RuntimeException("Failed to parse one or more values")) - } - } - } - - // do the conversion - withResource(Scalar.fromNull(DType.TIMESTAMP_SECONDS)) { nullValue => - withResource(lhs.getBase.asTimestampSeconds(strfFormat)) { converted => - withResource(daysScalarSeconds(GpuCast.EPOCH)) { epoch => - withResource(daysScalarSeconds(GpuCast.NOW)) { now => - withResource(daysScalarSeconds(GpuCast.TODAY)) { today => - withResource(daysScalarSeconds(GpuCast.YESTERDAY)) { yesterday => - withResource(daysScalarSeconds(GpuCast.TOMORROW)) { tomorrow => - isTimestamp.ifElse(converted, - isEpoch.ifElse(epoch, - isNow.ifElse(now, - isToday.ifElse(today, - isYesterday.ifElse(yesterday, - isTomorrow.ifElse(tomorrow, - nullValue)))))) - } - } - } - } - } - } - } - } - } - } - } - } - } - } - + parseStringAsTimestamp( + lhs.getBase, + sparkFormat, + strfFormat, + timeParserPolicy, + DType.TIMESTAMP_SECONDS, + daysScalarSeconds, + (col, strfFormat) => col.asTimestampSeconds(strfFormat)) } else if (lhs.dataType() == DateType){ lhs.getBase.asTimestampSeconds() } else { // Timestamp From b6545be9c5bd37a87c7bb2e5fffad949a31c0273 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 12:36:13 -0700 Subject: [PATCH 14/18] fix resource leaks and fix regressions in python tests Signed-off-by: Andy Grove --- .../src/main/python/date_time_test.py | 12 ++- .../sql/rapids/datetimeExpressions.scala | 79 ++++++++++++------- 2 files changed, 57 insertions(+), 34 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index e32dd70e3d9..0ac4cc91497 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -172,13 +172,15 @@ def test_to_unix_timestamp(data_gen): @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))), conf) @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", + "spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf) @@ -188,12 +190,14 @@ def test_to_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): + conf = {"spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form))) + lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf) @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): + conf = 
{"spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form))) + lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)), conf) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 531b228c0e9..20061e87016 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -375,14 +375,15 @@ object GpuToTimestamp extends Arm { } } } else { - // this is the incompatibleOps case where we do not guarantee compatibility with Spark - // and assume that all non-null inputs are valid + // this is the incompatibleDateFormats case where we do not guarantee compatibility with + // Spark and assume that all non-null inputs are valid ColumnVector.fromScalar(Scalar.fromBool(true), col.getRowCount.toInt) } } + // scalastyle:off line.size.limit def parseStringAsTimestamp( - col: ColumnVector, + lhs: GpuColumnVector, sparkFormat: String, strfFormat: String, timeParserPolicy: TimeParserPolicy, @@ -390,51 +391,68 @@ object GpuToTimestamp extends Arm { daysScalar: String => Scalar, asTimestamp: (ColumnVector, String) => ColumnVector): ColumnVector = { - val isTimestamp = GpuToTimestamp.isTimestamp(col, sparkFormat, strfFormat) + val isTimestamp = GpuToTimestamp.isTimestamp(lhs.getBase, sparkFormat, strfFormat) // in addition to date/timestamp strings, we also need to check for special dates and null // values, since anything else is invalid and should throw an error or be converted to null // depending on the policy withResource(isTimestamp) { isTimestamp => - withResource(daysEqual(col, GpuCast.EPOCH)) { isEpoch => - withResource(daysEqual(col, GpuCast.NOW)) { isNow => - withResource(daysEqual(col, GpuCast.TODAY)) { isToday => - withResource(daysEqual(col, GpuCast.YESTERDAY)) { isYesterday => - withResource(daysEqual(col, GpuCast.TOMORROW)) { isTomorrow => - withResource(col.isNull) { isNull => - val canBeConverted = isTimestamp.or(isEpoch.or(isNow.or(isToday.or( - isYesterday.or(isTomorrow.or(isNull)))))) + withResource(daysEqual(lhs.getBase, GpuCast.EPOCH)) { isEpoch => + withResource(daysEqual(lhs.getBase, GpuCast.NOW)) { isNow => + withResource(daysEqual(lhs.getBase, GpuCast.TODAY)) { isToday => + withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => + withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => + withResource(lhs.getBase.isNull) { isNull => // throw error if legacyTimeParserPolicy is EXCEPTION if (timeParserPolicy == ExceptionTimeParserPolicy) { withResource(Scalar.fromBool(false)) { falseScalar => - if (canBeConverted.hasNulls || canBeConverted.contains(falseScalar)) { - throw new SparkUpgradeException(SPARK_VERSION, - s"Expression ${this.getClass.getSimpleName} failed to parse one or " + - "more values because they did not match the specified format. 
Set " + - "spark.sql.legacy.timeParserPolicy to CORRECTED to return null " + - "for invalid values, or to LEGACY for pre-Spark 3.0.0 behavior (" + - "LEGACY will force this expression to run on CPU though)", - new RuntimeException("Failed to parse one or more values")) + withResource(isTomorrow.or(isNull)) { a => + withResource(isYesterday.or(a)) { b => + withResource(isToday.or(b)) { c => + withResource(isNow.or(c)) { d => + withResource(isEpoch.or(d)) { e => + withResource(isTimestamp.or(e)) { canBeConverted => + if (canBeConverted.hasNulls || + canBeConverted.contains(falseScalar)) { + throw new SparkUpgradeException(SPARK_VERSION, + s"Expression ${this.getClass.getSimpleName} failed to " + + "parse one or more values because they did not match " + + "the specified format. Set " + + "spark.sql.legacy.timeParserPolicy to CORRECTED to " + + "return null for invalid values, or to LEGACY for " + + "pre-Spark 3.0.0 behavior (LEGACY will force this " + + "expression to run on CPU though)", + new RuntimeException("Failed to parse one or more values")) + } + } + } + } + } + } } } } // do the conversion withResource(Scalar.fromNull(dtype)) { nullValue => - withResource(asTimestamp(col, strfFormat)) { converted => + withResource(asTimestamp(lhs.getBase, strfFormat)) { converted => withResource(daysScalar(GpuCast.EPOCH)) { epoch => withResource(daysScalar(GpuCast.NOW)) { now => withResource(daysScalar(GpuCast.TODAY)) { today => withResource(daysScalar(GpuCast.YESTERDAY)) { yesterday => withResource(daysScalar(GpuCast.TOMORROW)) { tomorrow => - isTimestamp.ifElse(converted, - isEpoch.ifElse(epoch, - isNow.ifElse(now, - isToday.ifElse(today, - isYesterday.ifElse(yesterday, - isTomorrow.ifElse(tomorrow, - nullValue)))))) + withResource(isTomorrow.ifElse(tomorrow, nullValue)) { a => + withResource(isYesterday.ifElse(yesterday, a)) { b => + withResource(isToday.ifElse(today, b)) { c => + withResource(isNow.ifElse(now, c)) { d => + withResource(isEpoch.ifElse(epoch, d)) { e => + isTimestamp.ifElse(converted, e) + } + } + } + } + } } } } @@ -450,6 +468,7 @@ object GpuToTimestamp extends Arm { } } } + // scalastyle:on line.size.limit } /** @@ -489,7 +508,7 @@ abstract class GpuToTimestamp val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format parseStringAsTimestamp( - lhs.getBase, + lhs, sparkFormat, strfFormat, timeParserPolicy, @@ -527,7 +546,7 @@ abstract class GpuToTimestampImproved extends GpuToTimestamp { val tmp = if (lhs.dataType == StringType) { // rhs is ignored we already parsed the format parseStringAsTimestamp( - lhs.getBase, + lhs, sparkFormat, strfFormat, timeParserPolicy, From ddf154a32e8040665da1bc91939a5aee9f9823c3 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 12:53:23 -0700 Subject: [PATCH 15/18] scalstyle Signed-off-by: Andy Grove --- .../scala/org/apache/spark/sql/rapids/datetimeExpressions.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 20061e87016..24b30a68756 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -381,7 +381,6 @@ object GpuToTimestamp extends Arm { } } - // scalastyle:off line.size.limit def parseStringAsTimestamp( lhs: GpuColumnVector, sparkFormat: String, @@ -468,7 +467,6 @@ object GpuToTimestamp extends Arm { 
} } } - // scalastyle:on line.size.limit } /** From 85cfef7f6351735f5b20fb4a1c9f01bf37864f2d Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 12:54:59 -0700 Subject: [PATCH 16/18] update docs Signed-off-by: Andy Grove --- docs/compatibility.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/compatibility.md b/docs/compatibility.md index 052c0a90e05..f8ebbc745b9 100644 --- a/docs/compatibility.md +++ b/docs/compatibility.md @@ -219,6 +219,9 @@ only a subset of possible formats are supported on GPU with full compatibility w supported formats are: - `dd/MM/yyyy` +- `yyyy/MM` +- `yyyy/MM/dd` +- `yyyy-MM` - `yyyy-MM-dd` - `yyyy-MM-dd HH:mm:ss` From 30aa7657267eabd4c8ebe67b8e5dd966958fd554 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Tue, 17 Nov 2020 14:49:50 -0700 Subject: [PATCH 17/18] fix error in handling of legacyTimeParserPolicy=EXCEPTION Signed-off-by: Andy Grove --- .../src/main/python/date_time_test.py | 9 ++--- .../sql/rapids/datetimeExpressions.scala | 36 ++----------------- .../spark/rapids/ParseDateTimeSuite.scala | 12 ------- 3 files changed, 5 insertions(+), 52 deletions(-) diff --git a/integration_tests/src/main/python/date_time_test.py b/integration_tests/src/main/python/date_time_test.py index 0ac4cc91497..a7e7405844d 100644 --- a/integration_tests/src/main/python/date_time_test.py +++ b/integration_tests/src/main/python/date_time_test.py @@ -179,8 +179,7 @@ def test_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn) def test_to_unix_timestamp_improved(data_gen): - conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true", - "spark.sql.legacy.timeParserPolicy": "CORRECTED"} + conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"} assert_gpu_and_cpu_are_equal_collect( lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"), conf) @@ -190,14 +189,12 @@ def test_to_unix_timestamp_improved(data_gen): @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_to_unix_timestamp(data_gen, date_form): - conf = {"spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form)), conf) + lambda spark : unary_op_df(spark, data_gen, seed=1).selectExpr("to_unix_timestamp(a, '{}')".format(date_form))) @pytest.mark.parametrize('data_gen,date_form', str_date_and_format_gen, ids=idfn) def test_string_unix_timestamp(data_gen, date_form): - conf = {"spark.sql.legacy.timeParserPolicy": "CORRECTED"} assert_gpu_and_cpu_are_equal_collect( - lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form)), conf) + lambda spark : unary_op_df(spark, data_gen, seed=1).select(f.unix_timestamp(f.col('a'), date_form))) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 24b30a68756..0db90cca7af 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -334,7 +334,7 @@ object GpuToTimestamp extends Arm { "yyyy-MM-dd HH:mm:ss" ) - val specialDatesSceonds = GpuCast.calculateSpecialDates + val specialDatesSeconds = GpuCast.calculateSpecialDates .map { case (name, days) => (name, days * DateUtils.ONE_DAY_SECONDS) } @@ -345,7 +345,7 @@ object 
GpuToTimestamp extends Arm { } def daysScalarSeconds(name: String): Scalar = { - Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSceonds(name)) + Scalar.timestampFromLong(DType.TIMESTAMP_SECONDS, specialDatesSeconds(name)) } def daysScalarMicros(name: String): Scalar = { @@ -402,38 +402,6 @@ object GpuToTimestamp extends Arm { withResource(daysEqual(lhs.getBase, GpuCast.YESTERDAY)) { isYesterday => withResource(daysEqual(lhs.getBase, GpuCast.TOMORROW)) { isTomorrow => withResource(lhs.getBase.isNull) { isNull => - - // throw error if legacyTimeParserPolicy is EXCEPTION - if (timeParserPolicy == ExceptionTimeParserPolicy) { - withResource(Scalar.fromBool(false)) { falseScalar => - withResource(isTomorrow.or(isNull)) { a => - withResource(isYesterday.or(a)) { b => - withResource(isToday.or(b)) { c => - withResource(isNow.or(c)) { d => - withResource(isEpoch.or(d)) { e => - withResource(isTimestamp.or(e)) { canBeConverted => - if (canBeConverted.hasNulls || - canBeConverted.contains(falseScalar)) { - throw new SparkUpgradeException(SPARK_VERSION, - s"Expression ${this.getClass.getSimpleName} failed to " + - "parse one or more values because they did not match " + - "the specified format. Set " + - "spark.sql.legacy.timeParserPolicy to CORRECTED to " + - "return null for invalid values, or to LEGACY for " + - "pre-Spark 3.0.0 behavior (LEGACY will force this " + - "expression to run on CPU though)", - new RuntimeException("Failed to parse one or more values")) - } - } - } - } - } - } - } - } - } - - // do the conversion withResource(Scalar.fromNull(dtype)) { nullValue => withResource(asTimestamp(lhs.getBase, strfFormat)) { converted => withResource(daysScalar(GpuCast.EPOCH)) { epoch => diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 3aafc245326..80bd7d72352 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -91,18 +91,6 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite { df => df.withColumn("c1", unix_timestamp(col("c0"))) } - test("fail to parse when policy is EXCEPTION") { - val e = intercept[SparkException] { - val df = withGpuSparkSession(spark => { - timestampsAsStrings(spark) - .repartition(2) - .withColumn("c1", unix_timestamp(col("c0"), "yyyy-MM-dd HH:mm:ss")) - }, new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "EXCEPTION")) - df.collect() - } - assert(e.getMessage.contains("failed to parse one or more values")) - } - test("fall back to CPU when policy is LEGACY") { val e = intercept[IllegalArgumentException] { val df = withGpuSparkSession(spark => { From dbafb3af9b1683ec06ac5c5afef15c0ba8fc1a63 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 18 Nov 2020 08:11:50 -0700 Subject: [PATCH 18/18] fix test failures against Spark 3.1.0 Signed-off-by: Andy Grove --- .../spark/rapids/ParseDateTimeSuite.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala index 80bd7d72352..558270de290 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParseDateTimeSuite.scala @@ -23,15 +23,28 @@ import org.apache.spark.sql.internal.SQLConf class ParseDateTimeSuite extends 
SparkQueryCompareTestSuite { + val execsAllowedNonGpu = ShimLoader.getSparkShims.getSparkShimVersion match { + case SparkShimVersion(3, 1, _) => + // The behavior has changed in Spark 3.1.0 and `to_date` gets translated to + // `cast(gettimestamp(c0#20108, yyyy-MM-dd, Some(UTC)) as date)` and we do + // not currently support `gettimestamp` on GPU + // https://github.com/NVIDIA/spark-rapids/issues/1157 + Seq("ProjectExec,Alias,Cast,GetTimestamp,Literal") + case _ => + Seq.empty + } + testSparkResultsAreEqual("to_date yyyy-MM-dd", datesAsStrings, - new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + conf = new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED"), + execsAllowedNonGpu = execsAllowedNonGpu) { df => df.withColumn("c1", to_date(col("c0"), "yyyy-MM-dd")) } testSparkResultsAreEqual("to_date dd/MM/yyyy", datesAsStrings, - new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")) { + conf = new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED"), + execsAllowedNonGpu = execsAllowedNonGpu) { df => df.withColumn("c1", to_date(col("c0"), "dd/MM/yyyy")) }
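
For readers following the patch series, below is a minimal CPU-only sketch, not part of any patch above and using java.time rather than cuDF, of the decision flow that the GPU-side `parseStringAsTimestamp` helper implements: special date strings ("epoch", "now", "today", "yesterday", "tomorrow") are matched first, then the input is parsed against the requested format, and unparseable values either become null or raise an error depending on the configured time parser policy. All names in the sketch are illustrative and are assumptions, not code from the plugin.

    // Illustrative sketch only; mirrors the GPU parse flow on plain JVM types.
    import java.time.{LocalDate, LocalDateTime, ZoneOffset}
    import java.time.format.DateTimeFormatter
    import scala.util.Try

    sealed trait SketchTimeParserPolicy
    case object SketchExceptionPolicy extends SketchTimeParserPolicy
    case object SketchCorrectedPolicy extends SketchTimeParserPolicy

    def parseToUnixSeconds(
        value: String,
        format: String, // e.g. "yyyy-MM-dd HH:mm:ss", one of the GPU-compatible formats
        policy: SketchTimeParserPolicy): Option[Long] = {
      val today = LocalDate.now(ZoneOffset.UTC)
      // Same special values the GPU path checks before attempting a format-based parse.
      val specialDates = Map(
        "epoch"     -> LocalDate.ofEpochDay(0),
        "now"       -> today,
        "today"     -> today,
        "yesterday" -> today.minusDays(1),
        "tomorrow"  -> today.plusDays(1))

      if (value == null) {
        None
      } else specialDates.get(value) match {
        case Some(d) => Some(d.atStartOfDay(ZoneOffset.UTC).toEpochSecond)
        case None =>
          // Attempt the format-based parse; anything that fails is an invalid value.
          val parsed = Try {
            val fmt = DateTimeFormatter.ofPattern(format)
            if (format.contains("HH")) {
              LocalDateTime.parse(value, fmt).toEpochSecond(ZoneOffset.UTC)
            } else {
              LocalDate.parse(value, fmt).atStartOfDay(ZoneOffset.UTC).toEpochSecond
            }
          }.toOption
          (parsed, policy) match {
            case (None, SketchExceptionPolicy) =>
              // Analogous to the SparkUpgradeException raised when timeParserPolicy is EXCEPTION.
              throw new RuntimeException("Failed to parse one or more values")
            case _ => parsed // CORRECTED behavior: invalid values become null (None here)
          }
      }
    }

    // Example usage of the sketch:
    //   parseToUnixSeconds("2020-11-17 12:00:00", "yyyy-MM-dd HH:mm:ss", SketchCorrectedPolicy)
    //     returns Some(1605614400)
    //   parseToUnixSeconds("bogus", "yyyy-MM-dd", SketchCorrectedPolicy) returns None

Note that this sketch only handles formats that include a day-of-month; month-only patterns such as `yyyy/MM` would additionally need a default day when parsed with java.time, whereas the cuDF path in the patches handles them directly.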