Add in support for the SQL functions Least and Greatest (#1037)
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
revans2 authored Oct 29, 2020
1 parent c923bde commit a855df7
Showing 4 changed files with 246 additions and 1 deletion.
2 changes: 2 additions & 0 deletions docs/configs.md
@@ -144,6 +144,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.GetMapValue"></a>spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
<a name="sql.expression.GreaterThan"></a>spark.rapids.sql.expression.GreaterThan|`>`|> operator|true|None|
<a name="sql.expression.GreaterThanOrEqual"></a>spark.rapids.sql.expression.GreaterThanOrEqual|`>=`|>= operator|true|None|
<a name="sql.expression.Greatest"></a>spark.rapids.sql.expression.Greatest|`greatest`|Returns the greatest value of all parameters, skipping null values|true|None|
<a name="sql.expression.Hour"></a>spark.rapids.sql.expression.Hour|`hour`|Returns the hour component of the string/timestamp|true|None|
<a name="sql.expression.If"></a>spark.rapids.sql.expression.If|`if`|IF expression|true|None|
<a name="sql.expression.In"></a>spark.rapids.sql.expression.In|`in`|IN operator|true|None|
@@ -160,6 +161,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.Lag"></a>spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
<a name="sql.expression.LastDay"></a>spark.rapids.sql.expression.LastDay|`last_day`|Returns the last day of the month which the date belongs to|true|None|
<a name="sql.expression.Lead"></a>spark.rapids.sql.expression.Lead|`lead`|Window function that returns N entries ahead of this one|true|None|
<a name="sql.expression.Least"></a>spark.rapids.sql.expression.Least|`least`|Returns the least value of all parameters, skipping null values|true|None|
<a name="sql.expression.Length"></a>spark.rapids.sql.expression.Length|`length`, `character_length`, `char_length`|String character length|true|None|
<a name="sql.expression.LessThan"></a>spark.rapids.sql.expression.LessThan|`<`|< operator|true|None|
<a name="sql.expression.LessThanOrEqual"></a>spark.rapids.sql.expression.LessThanOrEqual|`<=`|<= operator|true|None|
30 changes: 30 additions & 0 deletions integration_tests/src/main/python/arithmetic_ops_test.py
@@ -427,3 +427,33 @@ def test_scalar_pow():
def test_columnar_pow(data_gen):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : binary_op_df(spark, data_gen).selectExpr('pow(a, b)'))

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_least(data_gen):
num_cols = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
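    # the non-null scalar is appended to the column arguments below so the GPU
    # column/scalar code path is exercised in addition to column/column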
# we want lots of nulls
gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
for x in range(0, num_cols)], nullable=False)

command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
command_args.append(s1)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gen).select(
f.least(*command_args)))

@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
def test_greatest(data_gen):
num_cols = 20
s1 = gen_scalar(data_gen, force_no_nulls=True)
# we want lots of nulls
gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
for x in range(0, num_cols)], nullable=False)
command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
command_args.append(s1)
assert_gpu_and_cpu_are_equal_collect(
lambda spark : gen_df(spark, gen).select(
f.greatest(*command_args)))

12 changes: 12 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -882,6 +882,18 @@ object GpuOverrides {
override def convertToGpu(): GpuExpression = GpuCoalesce(childExprs.map(_.convertToGpu()))
}
),
expr[Least](
"Returns the least value of all parameters, skipping null values",
(a, conf, p, r) => new ExprMeta[Least](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuLeast(childExprs.map(_.convertToGpu()))
}
),
expr[Greatest](
"Returns the greatest value of all parameters, skipping null values",
(a, conf, p, r) => new ExprMeta[Greatest](a, conf, p, r) {
override def convertToGpu(): GpuExpression = GpuGreatest(childExprs.map(_.convertToGpu()))
}
),
expr[Atan](
"Inverse tangent",
(a, conf, p, r) => new UnaryExprMeta[Atan](a, conf, p, r) {
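
The two registrations above are what surface the matching spark.rapids.sql.expression entries in docs/configs.md, so either expression can be sent back to the CPU at runtime without rebuilding the plugin. A sketch, assuming a session with the RAPIDS plugin enabled:

// Fall back to the CPU for just these expressions; everything else stays on the GPU.
spark.conf.set("spark.rapids.sql.expression.Greatest", "false")
spark.conf.set("spark.rapids.sql.expression.Least", "false")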
203 changes: 202 additions & 1 deletion sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
@@ -18,9 +18,13 @@ package org.apache.spark.sql.rapids

import ai.rapids.cudf._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.RapidsPluginImplicits._

import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
import org.apache.spark.sql.catalyst.expressions.{ComplexTypeMergingExpression, ExpectsInputTypes, Expression, NullIntolerant}
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuUnaryMinus(child: Expression) extends GpuUnaryExpression
with ExpectsInputTypes with NullIntolerant {
@@ -225,3 +229,200 @@ case class GpuPmod(left: Expression, right: Expression) extends GpuDivModLike {

override def dataType: DataType = left.dataType
}

trait GpuGreatestLeastBase extends ComplexTypeMergingExpression with GpuExpression {
override def nullable: Boolean = children.forall(_.nullable)
override def foldable: Boolean = children.forall(_.foldable)

/**
* The binary operation that should be performed when combining two values together.
*/
def binaryOp: BinaryOp

/**
* In the case of floating point values should NaN win and become the output if NaN is
* the value for either input, or lose and not be the output unless the other choice is
* null.
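* For example, under Spark's semantics greatest(NaN, 1.0) and greatest(NaN, null) both
* return NaN, while least(NaN, 1.0) returns 1.0 and least(NaN, null) returns NaN.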
*/
def shouldNanWin: Boolean

private[this] def isFp = dataType == FloatType || dataType == DoubleType
// TODO need a better way to do this for nested types
protected lazy val dtype: DType = GpuColumnVector.getRapidsType(dataType)

override def checkInputDataTypes(): TypeCheckResult = {
if (children.length <= 1) {
TypeCheckResult.TypeCheckFailure(
s"input to function $prettyName requires at least two arguments")
} else if (!TypeCoercion.haveSameType(inputTypesForMerging)) {
TypeCheckResult.TypeCheckFailure(
s"The expressions should all have the same type," +
s" got LEAST(${children.map(_.dataType.catalogString).mkString(", ")}).")
} else {
TypeUtils.checkForOrderingExpr(dataType, s"function $prettyName")
}
}

/**
* Convert the input into either a ColumnVector or a Scalar.
* @param a what to convert
* @param expandScalar if we get a scalar, should we expand it out to a ColumnVector to
*                     avoid scalar-scalar math?
* @param rows the number of rows to use if a scalar is expanded
* @return the resulting ColumnVector or Scalar
*/
private[this] def convertAndCloseIfNeeded(
a: Any,
expandScalar: Boolean,
rows: Int): AutoCloseable =
a match {
case gcv: GpuColumnVector => gcv.getBase
case cv: ColumnVector => cv
case s: Scalar =>
if (expandScalar) {
withResource(s) { s =>
ColumnVector.fromScalar(s, rows)
}
} else {
s
}
case a =>
if (expandScalar) {
withResource(GpuScalar.from(a, dataType)) { s =>
ColumnVector.fromScalar(s, rows)
}
} else {
GpuScalar.from(a, dataType)
}
}

/**
* Take two inputs that are either a Scalar or a ColumnVector and combine them with the
* correct operator. Both inputs must not be scalars at the same time: that case would
* throw a MatchError, and columnarEval avoids it by expanding one side to a ColumnVector.
* @param r first value
* @param c second value
* @return the combined value
*/
private[this] def combineButNoClose(r: Any, c: Any): Any = (r, c) match {
case (r: ColumnVector, c: ColumnVector) =>
r.binaryOp(binaryOp, c, dtype)
case (r: ColumnVector, c: Scalar) =>
r.binaryOp(binaryOp, c, dtype)
case (r: Scalar, c: ColumnVector) =>
r.binaryOp(binaryOp, c, dtype)
}

private[this] def makeNanWin(checkForNans: ColumnVector, result: ColumnVector): ColumnVector = {
withResource(checkForNans.isNan) { shouldReplace =>
shouldReplace.ifElse(checkForNans, result)
}
}

private[this] def makeNanWin(checkForNans: Scalar, result: ColumnVector): ColumnVector = {
if (GpuScalar.isNan(checkForNans)) {
ColumnVector.fromScalar(checkForNans, result.getRowCount.toInt)
} else {
result.incRefCount()
}
}

private[this] def makeNanLose(resultIfNotNull: ColumnVector,
checkForNans: ColumnVector): ColumnVector = {
withResource(checkForNans.isNan) { isNan =>
withResource(resultIfNotNull.isNotNull) { isNotNull =>
withResource(isNan.and(isNotNull)) { shouldReplace =>
shouldReplace.ifElse(resultIfNotNull, checkForNans)
}
}
}
}

private[this] def makeNanLose(resultIfNotNull: Scalar,
checkForNans: ColumnVector): ColumnVector = {
if (resultIfNotNull.isValid) {
withResource(checkForNans.isNan) { shouldReplace =>
shouldReplace.ifElse(resultIfNotNull, checkForNans)
}
} else {
// Nothing to replace because the scalar is null
checkForNans.incRefCount()
}
}

/**
* cudf does not handle floating point the way Spark wants when it comes to NaN values.
* Spark wants NaN to compare greater than anything except null, and null to be skipped:
* null effectively acts as the smallest value for the greatest operator and the largest
* value for the least operator. To get that answer this first computes the cudf result,
* then patches any position where a NaN should have won (or lost) against each original
* input. This does more computation, but gets the right answer in those cases.
* @param r first value
* @param c second value
* @return the combined value
*/
private[this] def combineButNoCloseFp(r: Any, c: Any): Any = (r, c) match {
case (r: ColumnVector, c: ColumnVector) =>
withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
if (shouldNanWin) {
withResource(makeNanWin(r, tmp)) { tmp2 =>
makeNanWin(c, tmp2)
}
} else {
withResource(makeNanLose(r, tmp)) { tmp2 =>
makeNanLose(c, tmp2)
}
}
}
case (r: ColumnVector, c: Scalar) =>
withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
if (shouldNanWin) {
withResource(makeNanWin(r, tmp)) { tmp2 =>
makeNanWin(c, tmp2)
}
} else {
withResource(makeNanLose(r, tmp)) { tmp2 =>
makeNanLose(c, tmp2)
}
}
}
case (r: Scalar, c: ColumnVector) =>
withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
if (shouldNanWin) {
withResource(makeNanWin(r, tmp)) { tmp2 =>
makeNanWin(c, tmp2)
}
} else {
withResource(makeNanLose(r, tmp)) { tmp2 =>
makeNanLose(c, tmp2)
}
}
}
}

override def columnarEval(batch: ColumnarBatch): Any = {
val numRows = batch.numRows()

val result = children.foldLeft[Any](null) { (r, c) =>
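      // The accumulator 'r' starts as null, which becomes a null scalar (or a fully null
      // column); NULL_MIN and NULL_MAX skip nulls, so the first child's values pass through.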
withResource(
convertAndCloseIfNeeded(c.columnarEval(batch), false, numRows)) { cVal =>
withResource(convertAndCloseIfNeeded(r, cVal.isInstanceOf[Scalar], numRows)) { rVal =>
if (isFp) {
combineButNoCloseFp(rVal, cVal)
} else {
combineButNoClose(rVal, cVal)
}
}
}
}
// The result should always be a ColumnVector at this point
GpuColumnVector.from(result.asInstanceOf[ColumnVector], dataType)
}
}

case class GpuLeast(children: Seq[Expression]) extends GpuGreatestLeastBase {
override def binaryOp: BinaryOp = BinaryOp.NULL_MIN
override def shouldNanWin: Boolean = false
}

case class GpuGreatest(children: Seq[Expression]) extends GpuGreatestLeastBase {
override def binaryOp: BinaryOp = BinaryOp.NULL_MAX
override def shouldNanWin: Boolean = true
}
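
GpuLeast and GpuGreatest differ only in the cudf binary op and the NaN policy, so both rules can be eyeballed end to end with one query (illustrative; assumes the plugin is active in the session):

spark.sql(
  "SELECT least(a, b), greatest(a, b) FROM VALUES " +
  "(1.0, CAST('NaN' AS DOUBLE)), (2.0, NULL) AS t(a, b)").show()
// least(1.0, NaN) = 1.0 while greatest(1.0, NaN) = NaN -- NaN wins only for greatest.
// least(2.0, NULL) = 2.0 and greatest(2.0, NULL) = 2.0 -- null is skipped by both.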

