Add in support for DateAddInterval (NVIDIA#1841)
Signed-off-by: Niranjan Artal <nartal@nvidia.com>
nartal1 authored Mar 5, 2021
1 parent de3db66 commit 3b56772
Showing 5 changed files with 228 additions and 5 deletions.
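
For context, a minimal sketch of the query shape this commit accelerates: adding a literal day/second interval to a DATE column. The DataFrame and column names here are assumed for illustration; month intervals still fall back to the CPU, as the docs below note.

// Assumed input: a DataFrame with a DATE column named event_date.
val df = spark.read.parquet("/data/events")
df.selectExpr(
  "event_date + INTERVAL 3 days 4 hours", // GPU: literal day/second interval
  "event_date - INTERVAL 2 days"          // subtracting an interval also works
).show()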
1 change: 1 addition & 0 deletions docs/configs.md
@@ -139,6 +139,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.CreateNamedStruct"></a>spark.rapids.sql.expression.CreateNamedStruct|`named_struct`, `struct`|Creates a struct with the given field names and values|true|None|
<a name="sql.expression.CurrentRow$"></a>spark.rapids.sql.expression.CurrentRow$| |Special boundary for a window frame, indicating stopping at the current row|true|None|
<a name="sql.expression.DateAdd"></a>spark.rapids.sql.expression.DateAdd|`date_add`|Returns the date that is num_days after start_date|true|None|
<a name="sql.expression.DateAddInterval"></a>spark.rapids.sql.expression.DateAddInterval| |Adds interval to date|true|None|
<a name="sql.expression.DateDiff"></a>spark.rapids.sql.expression.DateDiff|`datediff`|Returns the number of days from startDate to endDate|true|None|
<a name="sql.expression.DateSub"></a>spark.rapids.sql.expression.DateSub|`date_sub`|Returns the date that is num_days before start_date|true|None|
<a name="sql.expression.DayOfMonth"></a>spark.rapids.sql.expression.DayOfMonth|`dayofmonth`, `day`|Returns the day of the month from a date or timestamp|true|None|
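The new expression is enabled by default; a minimal sketch of turning it off (forcing CPU fallback) with the config key documented above:

spark.conf.set("spark.rapids.sql.expression.DateAddInterval", "false")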
132 changes: 132 additions & 0 deletions docs/supported_ops.md
@@ -4042,6 +4042,138 @@ Accelerator support is described below.
<td> </td>
</tr>
<tr>
<td rowSpan="6">DateAddInterval</td>
<td rowSpan="6"> </td>
<td rowSpan="6">Adds interval to date</td>
<td rowSpan="6">None</td>
<td rowSpan="3">project</td>
<td>start</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>interval</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><em>PS (month intervals are not supported; Literal value only)</em></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td>S</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="3">lambda</td>
<td>start</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>interval</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td>result</td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td><b>NS</b></td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
<td> </td>
</tr>
<tr>
<td rowSpan="6">DateDiff</td>
<td rowSpan="6">`datediff`</td>
<td rowSpan="6">Returns the number of days from startDate to endDate</td>
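Restating the table above in query form (table and column names assumed): only the project context with a literal, month-free interval is GPU-eligible.

spark.sql("SELECT d + INTERVAL 2 days 12 hours FROM t") // GPU: literal, no months
spark.sql("SELECT d + INTERVAL 1 month FROM t")         // CPU fallback: month interval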
8 changes: 8 additions & 0 deletions integration_tests/src/main/python/date_time_test.py
@@ -41,6 +41,14 @@ def test_timeadd(data_gen):
lambda spark: unary_op_df(spark, TimestampGen(start=datetime(5, 1, 1, tzinfo=timezone.utc), end=datetime(15, 1, 1, tzinfo=timezone.utc)), seed=1)
.selectExpr("a + (interval {} days {} seconds)".format(days, seconds)))

@pytest.mark.parametrize('data_gen', vals, ids=idfn)
def test_dateaddinterval(data_gen):
days, seconds = data_gen
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, DateGen(start=date(200, 1, 1), end=date(800, 1, 1)), seed=1)
.selectExpr('a + (interval {} days {} seconds)'.format(days, seconds),
'a - (interval {} days {} seconds)'.format(days, seconds)))

@pytest.mark.parametrize('data_gen', date_gens, ids=idfn)
def test_datediff(data_gen):
assert_gpu_and_cpu_are_equal_collect(
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -1330,6 +1330,31 @@ object GpuOverrides {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuTimeAdd(lhs, rhs)
}),
expr[DateAddInterval](
"Adds interval to date",
ExprChecks.binaryProjectNotLambda(TypeSig.DATE, TypeSig.DATE,
("start", TypeSig.DATE, TypeSig.DATE),
("interval", TypeSig.lit(TypeEnum.CALENDAR)
.withPsNote(TypeEnum.CALENDAR, "month intervals are not supported"),
TypeSig.CALENDAR)),
(a, conf, p, r) => new BinaryExprMeta[DateAddInterval](a, conf, p, r) {
override def tagExprForGpu(): Unit = {
GpuOverrides.extractLit(a.interval).foreach { lit =>
val intvl = lit.value.asInstanceOf[CalendarInterval]
if (intvl.months != 0) {
willNotWorkOnGpu("interval months isn't supported")
}
}
a.timeZoneId.foreach {
case zoneId if ZoneId.of(zoneId).normalized() != GpuOverrides.UTC_TIMEZONE_ID =>
willNotWorkOnGpu(s"Only UTC zone id is supported. Actual zone id: $zoneId")
case _ =>
}
}

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuDateAddInterval(lhs, rhs)
}),
expr[ToUnixTimestamp](
"Returns the UNIX timestamp of the given time",
ExprChecks.binaryProjectNotLambda(TypeSig.LONG, TypeSig.LONG,
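A minimal sketch of the literal check tagExprForGpu performs above: a CalendarInterval qualifies for the GPU only when its months field is zero. The interval values are assumed for illustration.

import org.apache.spark.unsafe.types.CalendarInterval

val ok  = new CalendarInterval(0, 3, 4000000L) // 3 days 4 seconds: stays on the GPU
val bad = new CalendarInterval(1, 0, 0L)       // 1 month: triggers willNotWorkOnGpu
assert(ok.months == 0 && bad.months != 0)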
sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala
@@ -17,8 +17,9 @@
package org.apache.spark.sql.rapids

import java.time.ZoneId
import java.util.concurrent.TimeUnit

import ai.rapids.cudf.{BinaryOp, ColumnVector, DType, Scalar}
import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, DType, Scalar}
import com.nvidia.spark.rapids.{Arm, BinaryExprMeta, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
import com.nvidia.spark.rapids.DateUtils.TimestampFormatConversionException
import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
@@ -149,7 +150,7 @@ abstract class GpuTimeMath(
val usToSub = intvl.days.toLong * 24 * 60 * 60 * 1000 * 1000 + intvl.microseconds
if (usToSub != 0) {
withResource(Scalar.fromLong(usToSub)) { us_s =>
withResource(l.getBase.castTo(DType.INT64)) { us =>
withResource(l.getBase.logicalCastTo(DType.INT64)) { us =>
withResource(intervalMath(us_s, us)) { longResult =>
GpuColumnVector.from(longResult.castTo(DType.TIMESTAMP_MICROSECONDS), dataType)
}
@@ -172,7 +173,7 @@
}
}

def intervalMath(us_s: Scalar, us: ColumnVector): ColumnVector
def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector
}

case class GpuTimeAdd(start: Expression,
@@ -184,7 +185,7 @@ case class GpuTimeAdd(start: Expression,
copy(timeZoneId = Option(timeZoneId))
}

override def intervalMath(us_s: Scalar, us: ColumnVector): ColumnVector = {
override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = {
us.add(us_s)
}
}
@@ -198,11 +199,67 @@ case class GpuTimeSub(start: Expression,
copy(timeZoneId = Option(timeZoneId))
}

def intervalMath(us_s: Scalar, us: ColumnVector): ColumnVector = {
override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = {
us.sub(us_s)
}
}

case class GpuDateAddInterval(start: Expression,
interval: Expression,
timeZoneId: Option[String] = None)
extends GpuTimeMath(start, interval, timeZoneId) {

override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
copy(timeZoneId = Option(timeZoneId))
}

override def intervalMath(us_s: Scalar, us: ColumnView): ColumnVector = {
us.add(us_s)
}

override def inputTypes: Seq[AbstractDataType] = Seq(DateType, CalendarIntervalType)

override def dataType: DataType = DateType

override def columnarEval(batch: ColumnarBatch): Any = {

withResourceIfAllowed(left.columnarEval(batch)) { lhs =>
withResourceIfAllowed(right.columnarEval(batch)) { rhs =>
(lhs, rhs) match {
case (l: GpuColumnVector, intvl: CalendarInterval) =>
if (intvl.months != 0) {
throw new UnsupportedOperationException("Months aren't supported at the moment")
}
val microSecondsInOneDay = TimeUnit.DAYS.toMicros(1)
val microSecToDays = if (intvl.microseconds < 0) {
// Negative microseconds mean subtraction: round the sub-day part of the
// interval up to a whole day (away from zero) so it is subtracted along
// with intvl.days, matching the CPU's truncation of DATE results.
(intvl.microseconds.abs.toDouble / microSecondsInOneDay).ceil.toInt * -1
} else {
(intvl.microseconds.toDouble / microSecondsInOneDay).toInt
}
val daysToAdd = intvl.days + microSecToDays
if (daysToAdd != 0) {
withResource(Scalar.fromInt(daysToAdd)) { us_s =>
withResource(l.getBase.logicalCastTo(DType.INT32)) { us =>
withResource(intervalMath(us_s, us)) { intResult =>
GpuColumnVector.from(intResult.castTo(DType.TIMESTAMP_DAYS), dataType)
}
}
}
} else {
l.incRefCount()
}
case _ =>
throw new UnsupportedOperationException("GpuDateAddInterval takes column and " +
"interval as an argument only")
}
}
}
}
}

case class GpuDateDiff(endDate: Expression, startDate: Expression)
extends GpuBinaryExpression with ImplicitCastInputTypes {

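A worked example of the day-rounding in GpuDateAddInterval.columnarEval above, with assumed values:

import java.util.concurrent.TimeUnit

val microsPerDay = TimeUnit.DAYS.toMicros(1) // 86400000000
val micros = -3600000000L                    // an interval of minus one hour
val extraDays = (micros.abs.toDouble / microsPerDay).ceil.toInt * -1 // -1
// ceil(3600000000 / 86400000000.0) = 1, negated to -1, so daysToAdd shrinks by
// one: "date - INTERVAL 1 hour" steps the date back a whole day, matching the
// CPU, which truncates any sub-day remainder when the result is a DATE.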
