diff --git a/docs/configs.md b/docs/configs.md
index d5536843b73..bb55a015082 100644
--- a/docs/configs.md
+++ b/docs/configs.md
@@ -144,6 +144,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
 spark.rapids.sql.expression.GetMapValue| |Gets Value from a Map based on a key|true|None|
 spark.rapids.sql.expression.GreaterThan|`>`|> operator|true|None|
 spark.rapids.sql.expression.GreaterThanOrEqual|`>=`|>= operator|true|None|
+spark.rapids.sql.expression.Greatest|`greatest`|Returns the greatest value of all parameters, skipping null values|true|None|
 spark.rapids.sql.expression.Hour|`hour`|Returns the hour component of the string/timestamp|true|None|
 spark.rapids.sql.expression.If|`if`|IF expression|true|None|
 spark.rapids.sql.expression.In|`in`|IN operator|true|None|
@@ -160,6 +161,7 @@ Name | SQL Function(s) | Description | Default Value | Notes
 spark.rapids.sql.expression.Lag|`lag`|Window function that returns N entries behind this one|true|None|
 spark.rapids.sql.expression.LastDay|`last_day`|Returns the last day of the month which the date belongs to|true|None|
 spark.rapids.sql.expression.Lead|`lead`|Window function that returns N entries ahead of this one|true|None|
+spark.rapids.sql.expression.Least|`least`|Returns the least value of all parameters, skipping null values|true|None|
 spark.rapids.sql.expression.Length|`length`, `character_length`, `char_length`|String character length|true|None|
 spark.rapids.sql.expression.LessThan|`<`|< operator|true|None|
 spark.rapids.sql.expression.LessThanOrEqual|`<=`|<= operator|true|None|
diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py
index 840931e20ab..c47583aa831 100644
--- a/integration_tests/src/main/python/arithmetic_ops_test.py
+++ b/integration_tests/src/main/python/arithmetic_ops_test.py
@@ -427,3 +427,33 @@ def test_scalar_pow():
 def test_columnar_pow(data_gen):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : binary_op_df(spark, data_gen).selectExpr('pow(a, b)'))
+
+@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
+def test_least(data_gen):
+    num_cols = 20
+    s1 = gen_scalar(data_gen, force_no_nulls=True)
+    # we want lots of nulls
+    gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
+        for x in range(0, num_cols)], nullable=False)
+
+    command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
+    command_args.append(s1)
+    data_type = data_gen.data_type
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, gen).select(
+                f.least(*command_args)))
+
+@pytest.mark.parametrize('data_gen', all_basic_gens, ids=idfn)
+def test_greatest(data_gen):
+    num_cols = 20
+    s1 = gen_scalar(data_gen, force_no_nulls=True)
+    # we want lots of nulls
+    gen = StructGen([('_c' + str(x), data_gen.copy_special_case(None, weight=100.0))
+        for x in range(0, num_cols)], nullable=False)
+    command_args = [f.col('_c' + str(x)) for x in range(0, num_cols)]
+    command_args.append(s1)
+    data_type = data_gen.data_type
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : gen_df(spark, gen).select(
+                f.greatest(*command_args)))
+
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index f56a6bf98eb..6410b2d297f 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -882,6 +882,18 @@ object GpuOverrides {
         override def convertToGpu(): GpuExpression = GpuCoalesce(childExprs.map(_.convertToGpu()))
       }
     ),
+    expr[Least] (
+      "Returns the least value of all parameters, skipping null values",
+      (a, conf, p, r) => new ExprMeta[Least](a, conf, p, r) {
+        override def convertToGpu(): GpuExpression = GpuLeast(childExprs.map(_.convertToGpu()))
+      }
+    ),
+    expr[Greatest] (
+      "Returns the greatest value of all parameters, skipping null values",
+      (a, conf, p, r) => new ExprMeta[Greatest](a, conf, p, r) {
+        override def convertToGpu(): GpuExpression = GpuGreatest(childExprs.map(_.convertToGpu()))
+      }
+    ),
     expr[Atan](
       "Inverse tangent",
       (a, conf, p, r) => new UnaryExprMeta[Atan](a, conf, p, r) {
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
index 3a42982fce1..6e40184a65e 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
@@ -18,9 +18,13 @@ package org.apache.spark.sql.rapids
 
 import ai.rapids.cudf._
 import com.nvidia.spark.rapids._
+import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
-import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, NullIntolerant}
+import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion}
+import org.apache.spark.sql.catalyst.expressions.{ComplexTypeMergingExpression, ExpectsInputTypes, Expression, NullIntolerant}
+import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 case class GpuUnaryMinus(child: Expression) extends GpuUnaryExpression
     with ExpectsInputTypes with NullIntolerant {
@@ -225,3 +229,200 @@ case class GpuPmod(left: Expression, right: Expression) extends GpuDivModLike {
 
   override def dataType: DataType = left.dataType
 }
+
+trait GpuGreatestLeastBase extends ComplexTypeMergingExpression with GpuExpression {
+  override def nullable: Boolean = children.forall(_.nullable)
+  override def foldable: Boolean = children.forall(_.foldable)
+
+  /**
+   * The binary operation that should be performed when combining two values together.
+   */
+  def binaryOp: BinaryOp
+
+  /**
+   * For floating point values, should NaN win and become the output when NaN is the value
+   * of either input, or should it lose and not be the output unless the other choice is
+   * null.
+   */
+  def shouldNanWin: Boolean
+
+  private[this] def isFp = dataType == FloatType || dataType == DoubleType
+  // TODO need a better way to do this for nested types
+  protected lazy val dtype: DType = GpuColumnVector.getRapidsType(dataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+    if (children.length <= 1) {
+      TypeCheckResult.TypeCheckFailure(
+        s"input to function $prettyName requires at least two arguments")
+    } else if (!TypeCoercion.haveSameType(inputTypesForMerging)) {
+      TypeCheckResult.TypeCheckFailure(
+        s"The expressions should all have the same type," +
+          s" got ${prettyName.toUpperCase}(${children.map(_.dataType.catalogString).mkString(", ")}).")
+    } else {
+      TypeUtils.checkForOrderingExpr(dataType, s"function $prettyName")
+    }
+  }
+
+  /**
+   * Convert the input into either a ColumnVector or a Scalar.
+   * @param a what to convert
+   * @param expandScalar if we get a scalar, should we expand it out to a ColumnVector to
+   *                     avoid scalar-scalar math.
+   * @param rows if we expand a scalar, the number of rows to expand it to.
+   * @return the resulting ColumnVector or Scalar
+   */
+  private[this] def convertAndCloseIfNeeded(
+      a: Any,
+      expandScalar: Boolean,
+      rows: Int): AutoCloseable =
+    a match {
+      case gcv: GpuColumnVector => gcv.getBase
+      case cv: ColumnVector => cv
+      case s: Scalar =>
+        if (expandScalar) {
+          withResource(s) { s =>
+            ColumnVector.fromScalar(s, rows)
+          }
+        } else {
+          s
+        }
+      case a =>
+        if (expandScalar) {
+          withResource(GpuScalar.from(a, dataType)) { s =>
+            ColumnVector.fromScalar(s, rows)
+          }
+        } else {
+          GpuScalar.from(a, dataType)
+        }
+    }
+
+  /**
+   * Take two inputs that are either a Scalar or a ColumnVector and combine them with the
+   * correct operator. This will throw a MatchError if both of the values are scalars.
+   * @param r first value
+   * @param c second value
+   * @return the combined value
+   */
+  private[this] def combineButNoClose(r: Any, c: Any): Any = (r, c) match {
+    case (r: ColumnVector, c: ColumnVector) =>
+      r.binaryOp(binaryOp, c, dtype)
+    case (r: ColumnVector, c: Scalar) =>
+      r.binaryOp(binaryOp, c, dtype)
+    case (r: Scalar, c: ColumnVector) =>
+      r.binaryOp(binaryOp, c, dtype)
+  }
+
+  private[this] def makeNanWin(checkForNans: ColumnVector, result: ColumnVector): ColumnVector = {
+    withResource(checkForNans.isNan) { shouldReplace =>
+      shouldReplace.ifElse(checkForNans, result)
+    }
+  }
+
+  private[this] def makeNanWin(checkForNans: Scalar, result: ColumnVector): ColumnVector = {
+    if (GpuScalar.isNan(checkForNans)) {
+      ColumnVector.fromScalar(checkForNans, result.getRowCount.toInt)
+    } else {
+      result.incRefCount()
+    }
+  }
+
+  private[this] def makeNanLose(resultIfNotNull: ColumnVector,
+      checkForNans: ColumnVector): ColumnVector = {
+    withResource(checkForNans.isNan) { isNan =>
+      withResource(resultIfNotNull.isNotNull) { isNotNull =>
+        withResource(isNan.and(isNotNull)) { shouldReplace =>
+          shouldReplace.ifElse(resultIfNotNull, checkForNans)
+        }
+      }
+    }
+  }
+
+  private[this] def makeNanLose(resultIfNotNull: Scalar,
+      checkForNans: ColumnVector): ColumnVector = {
+    if (resultIfNotNull.isValid) {
+      withResource(checkForNans.isNan) { shouldReplace =>
+        shouldReplace.ifElse(resultIfNotNull, checkForNans)
+      }
+    } else {
+      // Nothing to replace because the scalar is null
+      checkForNans.incRefCount()
+    }
+  }
+
+  /**
+   * Cudf does not handle floating point the way Spark wants when it comes to NaN values.
+   * Spark wants NaN > anything except for null, and null is effectively the smallest value
+   * when used with the greatest operator or the largest value when used with the least
+   * operator. This does more computation, but gets the right answer in those cases.
+   * @param r first value
+   * @param c second value
+   * @return the combined value
+   */
+  private[this] def combineButNoCloseFp(r: Any, c: Any): Any = (r, c) match {
+    case (r: ColumnVector, c: ColumnVector) =>
+      withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
+        if (shouldNanWin) {
+          withResource(makeNanWin(r, tmp)) { tmp2 =>
+            makeNanWin(c, tmp2)
+          }
+        } else {
+          withResource(makeNanLose(r, tmp)) { tmp2 =>
+            makeNanLose(c, tmp2)
+          }
+        }
+      }
+    case (r: ColumnVector, c: Scalar) =>
+      withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
+        if (shouldNanWin) {
+          withResource(makeNanWin(r, tmp)) { tmp2 =>
+            makeNanWin(c, tmp2)
+          }
+        } else {
+          withResource(makeNanLose(r, tmp)) { tmp2 =>
+            makeNanLose(c, tmp2)
+          }
+        }
+      }
+    case (r: Scalar, c: ColumnVector) =>
+      withResource(r.binaryOp(binaryOp, c, dtype)) { tmp =>
+        if (shouldNanWin) {
+          withResource(makeNanWin(r, tmp)) { tmp2 =>
+            makeNanWin(c, tmp2)
+          }
+        } else {
+          withResource(makeNanLose(r, tmp)) { tmp2 =>
+            makeNanLose(c, tmp2)
+          }
+        }
+      }
+  }
+
+  override def columnarEval(batch: ColumnarBatch): Any = {
+    val numRows = batch.numRows()
+
+    val result = children.foldLeft[Any](null) { (r, c) =>
+      withResource(
+        convertAndCloseIfNeeded(c.columnarEval(batch), false, numRows)) { cVal =>
+        withResource(convertAndCloseIfNeeded(r, cVal.isInstanceOf[Scalar], numRows)) { rVal =>
+          if (isFp) {
+            combineButNoCloseFp(rVal, cVal)
+          } else {
+            combineButNoClose(rVal, cVal)
+          }
+        }
+      }
+    }
+    // The result should always be a ColumnVector at this point
+    GpuColumnVector.from(result.asInstanceOf[ColumnVector], dataType)
+  }
+}
+
+case class GpuLeast(children: Seq[Expression]) extends GpuGreatestLeastBase {
+  override def binaryOp: BinaryOp = BinaryOp.NULL_MIN
+  override def shouldNanWin: Boolean = false
+}
+
+case class GpuGreatest(children: Seq[Expression]) extends GpuGreatestLeastBase {
+  override def binaryOp: BinaryOp = BinaryOp.NULL_MAX
+  override def shouldNanWin: Boolean = true
+}
\ No newline at end of file
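
For reference, here is a minimal, standalone PySpark sketch (not part of the patch) of the CPU semantics the new GPU expressions have to match: `least`/`greatest` skip nulls, and for floating point a NaN input wins for `greatest` but loses to any non-null value for `least`. This is the behavior the `shouldNanWin` handling above layers on top of `BinaryOp.NULL_MIN`/`NULL_MAX`.

```python
# Plain PySpark (no plugin required); assumes a local SparkSession can be created.
from pyspark.sql import SparkSession
import pyspark.sql.functions as f

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1.0, float('nan'), None)], 'a double, b double, c double')
df.select(
    f.greatest('a', 'b', 'c').alias('g'),  # NaN: NaN beats any non-null value
    f.least('a', 'b', 'c').alias('l')      # 1.0: NaN loses unless it is the only non-null value
).show()
```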