From 3b56772449eb85ca2b659b3c395c0dbfb65cd80f Mon Sep 17 00:00:00 2001 From: Niranjan Artal <50492963+nartal1@users.noreply.github.com> Date: Thu, 4 Mar 2021 16:24:32 -0800 Subject: [PATCH] Add in support for DateAddInterval (#1841) Signed-off-by: Niranjan Artal --- docs/configs.md | 1 + docs/supported_ops.md | 132 ++++++++++++++++++ .../src/main/python/date_time_test.py | 8 ++ .../nvidia/spark/rapids/GpuOverrides.scala | 25 ++++ .../sql/rapids/datetimeExpressions.scala | 67 ++++++++- 5 files changed, 228 insertions(+), 5 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 1cfd46ab609..3959eea23ec 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -139,6 +139,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values|true|None| spark.rapids.sql.expression.CurrentRow$| |Special boundary for a window frame, indicating stopping at the current row|true|None| spark.rapids.sql.expression.DateAdd|`date_add`|Returns the date that is num_days after start_date|true|None| +spark.rapids.sql.expression.DateAddInterval| |Adds interval to date|true|None| spark.rapids.sql.expression.DateDiff|`datediff`|Returns the number of days from startDate to endDate|true|None| spark.rapids.sql.expression.DateSub|`date_sub`|Returns the date that is num_days before start_date|true|None| spark.rapids.sql.expression.DayOfMonth|`dayofmonth`, `day`|Returns the day of the month from a date or timestamp|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index f18b65ffb8c..ecd8573bdf5 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -4042,6 +4042,138 @@ Accelerator support is described below. 
@pytest.mark.parametrize('data_gen', vals, ids=idfn)
def test_dateaddinterval(data_gen):
    """Check that adding and subtracting a day/second interval on a date column
    yields the same collected rows on the CPU and on the GPU."""
    days, seconds = data_gen
    # Both projections use the same interval; only the sign of the operation differs.
    add_expr = 'a + (interval {} days {} seconds)'.format(days, seconds)
    sub_expr = 'a - (interval {} days {} seconds)'.format(days, seconds)

    def build_df(spark):
        date_gen = DateGen(start=date(200, 1, 1), end=date(800, 1, 1))
        return unary_op_df(spark, date_gen, seed=1).selectExpr(add_expr, sub_expr)

    assert_gpu_and_cpu_are_equal_collect(build_df)
// Replacement rule for Spark's DateAddInterval (date +/- calendar interval).
// The interval must be a literal, its month field must be zero, and the expression's
// time zone, when set, must normalize to UTC; otherwise the expression is tagged as
// not runnable on the GPU.
expr[DateAddInterval](
  "Adds interval to date",
  ExprChecks.binaryProjectNotLambda(TypeSig.DATE, TypeSig.DATE,
    ("start", TypeSig.DATE, TypeSig.DATE),
    ("interval", TypeSig.lit(TypeEnum.CALENDAR)
      .withPsNote(TypeEnum.CALENDAR, "month intervals are not supported"),
      TypeSig.CALENDAR)),
  (a, conf, p, r) => new BinaryExprMeta[DateAddInterval](a, conf, p, r) {
    override def tagExprForGpu(): Unit = {
      // Match on the literal's value instead of casting it: a typed pattern never
      // matches null, so a null interval literal is tagged for CPU fallback rather
      // than failing planning with a NullPointerException on `intvl.months`.
      GpuOverrides.extractLit(a.interval).map(_.value).foreach {
        case intvl: CalendarInterval =>
          if (intvl.months != 0) {
            willNotWorkOnGpu("interval months isn't supported")
          }
        case _ =>
          // GpuDateAddInterval.columnarEval only accepts a CalendarInterval value.
          willNotWorkOnGpu("interval literal must be a non-null calendar interval")
      }
      a.timeZoneId.foreach {
        case zoneId if ZoneId.of(zoneId).normalized() != GpuOverrides.UTC_TIMEZONE_ID =>
          willNotWorkOnGpu(s"Only UTC zone id is supported. Actual zone id: $zoneId")
        case _ =>
      }
    }

    override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
      GpuDateAddInterval(lhs, rhs)
  }),
/**
 * GPU expression that shifts a date column by a calendar interval.
 *
 * The interval side must evaluate to a `CalendarInterval` value (not a column) and its
 * `months` field must be zero. The `days` field is applied as-is; the `microseconds`
 * field is floored to whole days, so a negative sub-day amount moves the result one
 * further day back.
 *
 * @param start expression producing the date column
 * @param interval expression producing the interval to add
 * @param timeZoneId optional time zone id, passed through to `GpuTimeMath`
 */
case class GpuDateAddInterval(start: Expression,
    interval: Expression,
    timeZoneId: Option[String] = None)
  extends GpuTimeMath(start, interval, timeZoneId) {

  override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
    copy(timeZoneId = Option(timeZoneId))
  }

  /**
   * Adds the scalar to every row of the column. In this class both operands are day
   * counts (INT32), even though the inherited parameter names say "us".
   */
  override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = {
    us.add(us_s)
  }

  override def inputTypes: Seq[AbstractDataType] = Seq(DateType, CalendarIntervalType)

  override def dataType: DataType = DateType

  /**
   * Converts a microsecond amount to whole days, rounding towards negative infinity.
   *
   * Integer floor division is exact for every Long, including `Long.MinValue` and
   * magnitudes above 2^53, where a round trip through Double would lose precision.
   * The quotient is at most about 1.07e8 in magnitude, so narrowing to Int is lossless.
   */
  private def microsToWholeDays(micros: Long): Int =
    Math.floorDiv(micros, TimeUnit.DAYS.toMicros(1)).toInt

  /**
   * Evaluates `start + interval` for one batch.
   *
   * @return a `GpuColumnVector` of dates; when the net shift is zero days the input
   *         column is returned with its reference count incremented
   * @throws UnsupportedOperationException if the interval has a non-zero month field, or
   *         if the operands are not a (column, `CalendarInterval`) pair
   */
  override def columnarEval(batch: ColumnarBatch): Any = {
    withResourceIfAllowed(left.columnarEval(batch)) { lhs =>
      withResourceIfAllowed(right.columnarEval(batch)) { rhs =>
        (lhs, rhs) match {
          case (l: GpuColumnVector, intvl: CalendarInterval) =>
            if (intvl.months != 0) {
              throw new UnsupportedOperationException("Months aren't supported at the moment")
            }
            val daysToAdd = intvl.days + microsToWholeDays(intvl.microseconds)
            if (daysToAdd != 0) {
              withResource(Scalar.fromInt(daysToAdd)) { daysScalar =>
                // View the date column as plain INT32 day counts so integer math applies.
                withResource(l.getBase.logicalCastTo(DType.INT32)) { dayCounts =>
                  withResource(intervalMath(daysScalar, dayCounts)) { intResult =>
                    GpuColumnVector.from(intResult.castTo(DType.TIMESTAMP_DAYS), dataType)
                  }
                }
              }
            } else {
              // Nothing to add: hand back the input column with an extra reference.
              l.incRefCount()
            }
          case _ =>
            throw new UnsupportedOperationException("GpuDateAddInterval takes column and " +
              "interval as an argument only")
        }
      }
    }
  }
}