diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
index 834fd0aa905..a9b44459e55 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java
@@ -171,6 +171,16 @@ private static DType toRapidsOrNull(DataType type) {
       return DType.TIMESTAMP_MICROSECONDS;
     } else if (type instanceof StringType) {
       return DType.STRING;
+    } else if (type instanceof DecimalType) {
+      // Decimal support has already been validated during the GPU plan overriding stage,
+      // so we don't need to check it again here.
+      DecimalType dt = (DecimalType) type;
+      if (dt.precision() > DType.DECIMAL64_MAX_PRECISION) {
+        return null;
+      } else {
+        // Map all DecimalType to DECIMAL64, so the underlying DType stays the same
+        // regardless of precision.
+        return DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale());
+      }
     }
     return null;
   }
@@ -299,6 +309,14 @@ public static ColumnarBatch from(Table table, DataType[] colTypes) {
    */
   private static boolean typeConversionAllowed(ColumnViewAccess cv, DataType colType) {
     DType dt = cv.getDataType();
+    if (dt.isDecimalType()) {
+      if (!(colType instanceof DecimalType)) {
+        return false;
+      }
+      // check for overflow
+      int maxPrecision = dt.isBackedByLong() ? DType.DECIMAL64_MAX_PRECISION : DType.DECIMAL32_MAX_PRECISION;
+      return ((DecimalType) colType).precision() <= maxPrecision;
+    }
     if (!dt.isNestedType()) {
       return getRapidsType(colType).equals(dt);
     }
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java
index 22f890ab9d0..60b8834d0cf 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVector.java
@@ -75,6 +75,7 @@ public final RapidsHostColumnVector incRefCount() {
     return this;
   }
 
+  @Override
   public final ai.rapids.cudf.HostColumnVector getBase() {
     return cudfCv;
   }
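The GpuColumnVector.java hunk above encodes the convention used throughout this patch: Spark's DecimalType carries a non-negative scale (the number of fractional digits), while cudf's DType carries the power-of-ten exponent, so the scale is negated in each direction, and every supportable decimal is widened to DECIMAL64. A minimal sketch of that rule, using only cudf Java APIs that already appear in this diff (the object and helper names are illustrative, not plugin code):

```scala
import ai.rapids.cudf.DType
import org.apache.spark.sql.types.DecimalType

object DecimalMappingSketch {
  // Spark -> cudf, mirroring toRapidsOrNull: widen everything to DECIMAL64
  // and flip the sign of the scale.
  def sparkToCudf(dt: DecimalType): Option[DType] =
    if (dt.precision > DType.DECIMAL64_MAX_PRECISION) {
      None // the unscaled value would not fit into a 64-bit integer
    } else {
      Some(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale))
    }

  // cudf -> Spark: cudf tracks no precision, so it must be supplied.
  def cudfToSpark(dt: DType, precision: Int): DecimalType =
    DecimalType(precision, -dt.getScale)
}
```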
diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java
index dc47f3693a2..bed3d88d9bb 100644
--- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java
+++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/RapidsHostColumnVectorCore.java
@@ -29,6 +29,8 @@
 import org.apache.spark.sql.vectorized.ColumnarMap;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 
 /**
  * A GPU accelerated version of the Spark ColumnVector.
@@ -158,8 +160,10 @@ public final ColumnarMap getMap(int ordinal) {
   }
 
   @Override
-  public final Decimal getDecimal(int rowId, int precision, int scale) {
-    throw new IllegalStateException("The decimal type is currently not supported by rapids cudf");
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    BigDecimal bigDec = cudfCv.getBigDecimal(rowId).setScale(scale, RoundingMode.UNNECESSARY);
+    assert bigDec.precision() <= precision : "Assert " + bigDec.precision() + " <= " + precision;
+    return Decimal.apply(bigDec);
   }
 
   @Override
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 3ec10c7d8cd..1bb8de48bc5 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -598,7 +598,7 @@ object GpuOverrides {
       }
       override def isSupportedType(t: DataType): Boolean =
-        GpuOverrides.isSupportedType(t, allowCalendarInterval = true)
+        GpuOverrides.isSupportedType(t, allowCalendarInterval = true, allowDecimal = true)
     }),
     expr[Signum](
       "Returns -1.0, 0.0 or 1.0 as expr is negative, 0 or positive",
@@ -610,6 +610,7 @@
     (a, conf, p, r) => new UnaryExprMeta[Alias](a, conf, p, r) {
       override def isSupportedType(t: DataType): Boolean =
         GpuOverrides.isSupportedType(t,
+          allowDecimal = true,
          allowStringMaps = true,
          allowArray = true,
          allowNesting = true)
@@ -622,6 +623,7 @@
     (att, conf, p, r) => new BaseExprMeta[AttributeReference](att, conf, p, r) {
       override def isSupportedType(t: DataType): Boolean =
         GpuOverrides.isSupportedType(t,
+          allowDecimal = true,
          allowStringMaps = true,
          allowArray = true,
          allowNesting = true)
@@ -818,6 +820,7 @@
     (a, conf, p, r) => new UnaryExprMeta[IsNull](a, conf, p, r) {
       override def isSupportedType(t: DataType): Boolean =
         GpuOverrides.isSupportedType(t,
+          allowDecimal = true,
          allowStringMaps = true,
          allowArray = true,
          allowNesting = true)
@@ -829,6 +832,7 @@
     (a, conf, p, r) => new UnaryExprMeta[IsNotNull](a, conf, p, r) {
       override def isSupportedType(t: DataType): Boolean =
         GpuOverrides.isSupportedType(t,
+          allowDecimal = true,
          allowStringMaps = true,
          allowArray = true,
          allowNesting = true)
@@ -1177,19 +1181,19 @@
     }),
     expr[Add](
       "Addition",
-      (a, conf, p, r) => new BinaryExprMeta[Add](a, conf, p, r) {
+      (a, conf, p, r) => new BinaryExprMeta[Add](a, conf, p, r, allowDecimal = true) {
        override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
          GpuAdd(lhs, rhs)
      }),
     expr[Subtract](
       "Subtraction",
-      (a, conf, p, r) => new BinaryExprMeta[Subtract](a, conf, p, r) {
+      (a, conf, p, r) => new BinaryExprMeta[Subtract](a, conf, p, r, allowDecimal = true) {
        override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
          GpuSubtract(lhs, rhs)
      }),
     expr[Multiply](
       "Multiplication",
-      (a, conf, p, r) => new BinaryExprMeta[Multiply](a, conf, p, r) {
+      (a, conf, p, r) => new BinaryExprMeta[Multiply](a, conf, p, r, allowDecimal = true) {
        override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
          GpuMultiply(lhs, rhs)
      }),
@@ -1215,20 +1219,21 @@
       "Check if the values are equal",
       (a, conf, p, r) => new BinaryExprMeta[EqualTo](a, conf, p, r) {
        override def isSupportedType(t: DataType): Boolean =
-          GpuOverrides.isSupportedType(t, allowStringMaps = true)
+          GpuOverrides.isSupportedType(t, allowStringMaps = true,
allowDecimal = true) override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuEqualTo(lhs, rhs) }), expr[GreaterThan]( "> operator", - (a, conf, p, r) => new BinaryExprMeta[GreaterThan](a, conf, p, r) { + (a, conf, p, r) => new BinaryExprMeta[GreaterThan](a, conf, p, r, allowDecimal = true) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuGreaterThan(lhs, rhs) }), expr[GreaterThanOrEqual]( ">= operator", - (a, conf, p, r) => new BinaryExprMeta[GreaterThanOrEqual](a, conf, p, r) { + (a, conf, p, r) => new BinaryExprMeta[GreaterThanOrEqual](a, conf, p, r, + allowDecimal = true) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuGreaterThanOrEqual(lhs, rhs) }), @@ -1269,13 +1274,13 @@ object GpuOverrides { }), expr[LessThan]( "< operator", - (a, conf, p, r) => new BinaryExprMeta[LessThan](a, conf, p, r) { + (a, conf, p, r) => new BinaryExprMeta[LessThan](a, conf, p, r, allowDecimal = true) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuLessThan(lhs, rhs) }), expr[LessThanOrEqual]( "<= operator", - (a, conf, p, r) => new BinaryExprMeta[LessThanOrEqual](a, conf, p, r) { + (a, conf, p, r) => new BinaryExprMeta[LessThanOrEqual](a, conf, p, r, allowDecimal = true) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuLessThanOrEqual(lhs, rhs) }), @@ -1823,6 +1828,7 @@ object GpuOverrides { new SparkPlanMeta[ProjectExec](proj, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowDecimal = true, allowStringMaps = true, allowArray = true, allowNesting = true) @@ -1911,6 +1917,7 @@ object GpuOverrides { (filter, conf, p, r) => new SparkPlanMeta[FilterExec](filter, conf, p, r) { override def isSupportedType(t: DataType): Boolean = GpuOverrides.isSupportedType(t, + allowDecimal = true, allowStringMaps = true, allowArray = true, allowNesting = true) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala index e28ff30253d..416cd283734 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuRowToColumnarExec.scala @@ -91,6 +91,8 @@ private object GpuRowToColumnConverter { case (TimestampType, false) => NotNullLongConverter case (StringType, true) => StringConverter case (StringType, false) => NotNullStringConverter + case (dt: DecimalType, true) => new DecimalConverter(dt.precision, dt.scale) + case (dt: DecimalType, false) => new NotNullDecimalConverter(dt.precision, dt.scale) // NOT SUPPORTED YET // case CalendarIntervalType => CalendarConverter case (at: ArrayType, true) => @@ -100,8 +102,6 @@ private object GpuRowToColumnConverter { // NOT SUPPORTED YET // case st: StructType => new StructConverter(st.fields.map( // (f) => getConverterForType(f.dataType))) - // NOT SUPPORTED YET - // case dt: DecimalType => new DecimalConverter(dt) // NOT SUPPORTED YET case (MapType(k, v, vcn), true) => MapConverter(getConverterForType(k, nullable = false), @@ -289,6 +289,32 @@ private object GpuRowToColumnConverter { } } + private class DecimalConverter(precision: Int, scale: Int) extends TypeConverter { + override def append( + row: SpecializedGetters, + column: Int, + builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = { + if (row.isNullAt(column)) { + builder.appendNull() + } else { + new 
NotNullDecimalConverter(precision, scale).append(row, column, builder)
+      }
+      // Infer the storage type from the precision, because we can't access the builder's DType.
+      (if (precision > ai.rapids.cudf.DType.DECIMAL32_MAX_PRECISION) 8 else 4) + VALIDITY
+    }
+  }
+
+  private class NotNullDecimalConverter(precision: Int, scale: Int) extends TypeConverter {
+    override def append(
+        row: SpecializedGetters,
+        column: Int,
+        builder: ai.rapids.cudf.HostColumnVector.ColumnBuilder): Double = {
+      builder.append(row.getDecimal(column, precision, scale).toJavaBigDecimal)
+      // Infer the storage type from the precision, because we can't access the builder's DType.
+      if (precision > ai.rapids.cudf.DType.DECIMAL32_MAX_PRECISION) 8 else 4
+    }
+  }
+
   private[this] def mapConvert(
       keyConverter: TypeConverter,
       valueConverter: TypeConverter,
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
index 3517bb4c72a..5dec0cc7364 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostColumnarToGpu.scala
@@ -117,6 +117,25 @@ object HostColumnarToGpu {
       for (i <- 0 until rows) {
         b.appendUTF8String(cv.getUTF8String(i).getBytes)
       }
+    case (dt, nullable) if dt.isDecimalType =>
+      val precision = if (dt.isBackedByInt) {
+        DType.DECIMAL32_MAX_PRECISION
+      } else {
+        DType.DECIMAL64_MAX_PRECISION
+      }
+      if (nullable) {
+        for (i <- 0 until rows) {
+          if (cv.isNullAt(i)) {
+            b.appendNull()
+          } else {
+            b.append(cv.getDecimal(i, precision, -dt.getScale).toJavaBigDecimal)
+          }
+        }
+      } else {
+        for (i <- 0 until rows) {
+          b.append(cv.getDecimal(i, precision, -dt.getScale).toJavaBigDecimal)
+        }
+      }
     case (t, n) =>
       throw new UnsupportedOperationException(s"Converting to GPU for ${t} is not currently " +
         s"supported")
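A note on the Double that the converters above return: it is a per-row byte estimate used to size output batches, not data. Because the builder's DType is not accessible, the storage width has to be inferred from the precision. A sketch of that accounting, where VALIDITY stands in for the plugin's per-row validity-buffer estimate (its exact value here is an assumption):

```scala
import ai.rapids.cudf.DType

object DecimalSizingSketch {
  // Assumed stand-in for the plugin's validity estimate: one bit per row.
  val VALIDITY: Double = 1.0 / 8

  // Estimated bytes appended for one decimal row, mirroring the converters:
  // up to DECIMAL32_MAX_PRECISION digits fit in 4 bytes, anything wider
  // needs the 8-byte DECIMAL64 backing store.
  def decimalRowBytes(precision: Int, nullable: Boolean): Double = {
    val dataBytes = if (precision > DType.DECIMAL32_MAX_PRECISION) 8 else 4
    if (nullable) dataBytes + VALIDITY else dataBytes
  }
}
```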
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
index 9cd8e5989fd..07b15c80d7a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala
@@ -434,6 +434,11 @@ abstract class SparkPlanMeta[INPUT <: SparkPlan](plan: INPUT,
   override val childParts: Seq[PartMeta[_]] = Seq.empty
   override val childDataWriteCmds: Seq[DataWritingCommandMeta[_]] = Seq.empty
 
+  // We assume that all common plans support decimal by default, since whether
+  // decimal is allowed is determined mainly at the expression level.
+  override def isSupportedType(t: DataType): Boolean =
+    GpuOverrides.isSupportedType(t, allowDecimal = true)
+
   override def convertToCpu(): SparkPlan = {
     wrapped.withNewChildren(childPlans.map(_.convertIfNeeded()))
   }
@@ -765,9 +770,13 @@ abstract class BinaryExprMeta[INPUT <: BinaryExpression](
     expr: INPUT,
     conf: RapidsConf,
     parent: Option[RapidsMeta[_, _, _]],
-    rule: ConfKeysAndIncompat)
+    rule: ConfKeysAndIncompat,
+    allowDecimal: Boolean = false)
   extends ExprMeta[INPUT](expr, conf, parent, rule) {
 
+  override def isSupportedType(t: DataType): Boolean =
+    GpuOverrides.isSupportedType(t, allowDecimal = allowDecimal)
+
   override final def convertToGpu(): GpuExpression =
     convertToGpu(childExprs(0).convertToGpu(), childExprs(1).convertToGpu())
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala
index 15eaf7a6bcf..b299c1d4327 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/literals.scala
@@ -65,6 +65,7 @@ object GpuScalar {
     case DType.TIMESTAMP_DAYS => v.getInt
     case DType.TIMESTAMP_MICROSECONDS => v.getLong
     case DType.STRING => v.getJavaString
+    case dt: DType if dt.isDecimalType => Decimal(v.getBigDecimal)
     case t => throw new IllegalStateException(s"$t is not a supported rapids scalar type yet")
   }
 
@@ -88,12 +89,33 @@
     case b: Boolean => Scalar.fromBool(b)
     case s: String => Scalar.fromString(s)
     case s: UTF8String => Scalar.fromString(s.toString)
+    case dec: Decimal =>
+      Scalar.fromDecimal(-dec.scale, dec.toUnscaledLong)
+    case dec: BigDecimal =>
+      Scalar.fromDecimal(-dec.scale, dec.bigDecimal.unscaledValue().longValueExact())
     case _ =>
       throw new IllegalStateException(s"${v.getClass} '${v}' is not supported as a scalar yet")
   }
 
   def from(v: Any, t: DataType): Scalar = v match {
     case _ if v == null => Scalar.fromNull(GpuColumnVector.getRapidsType(t))
+    case _ if t.isInstanceOf[DecimalType] =>
+      var bigDec = v match {
+        case vv: Decimal => vv.toBigDecimal.bigDecimal
+        case vv: BigDecimal => vv.bigDecimal
+        case vv: Double => BigDecimal(vv).bigDecimal
+        case vv: Float => BigDecimal(vv).bigDecimal
+        case vv: String => BigDecimal(vv).bigDecimal
+        case vv: Long => BigDecimal(vv).bigDecimal
+        case vv: Int => BigDecimal(vv).bigDecimal
+        case vv => throw new IllegalStateException(
+          s"${vv.getClass} '${vv}' is not supported as a scalar yet")
+      }
+      bigDec = bigDec.setScale(t.asInstanceOf[DecimalType].scale)
+      if (bigDec.precision() > t.asInstanceOf[DecimalType].precision) {
+        throw new IllegalArgumentException(s"BigDecimal $bigDec exceeds precision constraint of $t")
+      }
+      Scalar.fromDecimal(-bigDec.scale(), bigDec.unscaledValue().longValueExact())
     case l: Long => t match {
       case LongType => Scalar.fromLong(l)
       case TimestampType => Scalar.timestampFromLong(DType.TIMESTAMP_MICROSECONDS, l)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
index 61f363be819..da3334b0a5c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/arithmetic.scala
@@ -144,6 +144,8 @@ object GpuDivModLike {
       case DType.INT64 => Scalar.fromLong(0L)
       case DType.FLOAT32 => Scalar.fromFloat(0f)
       case DType.FLOAT64 => Scalar.fromDouble(0)
+      case dt if dt.isDecimalType && dt.isBackedByInt =>
Scalar.fromDecimal(0, 0) + case dt if dt.isDecimalType && dt.isBackedByLong => Scalar.fromDecimal(0, 0L) case t => throw new IllegalArgumentException(s"Unexpected type: $t") } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala new file mode 100644 index 00000000000..35ab8a3e0b4 --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/DecimalBinaryOpSuite.scala @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.DType + +import org.apache.spark.sql.rapids.{GpuAdd, GpuEqualTo, GpuGreaterThan, GpuGreaterThanOrEqual, GpuLessThan, GpuLessThanOrEqual, GpuMultiply, GpuSubtract} +import org.apache.spark.sql.types.{DataType, DataTypes, Decimal, DecimalType, StructType} + +/* + unsupported operators are as below: + - GpuEqualNullSafe + - GpuDivModLike (GpuDivide/GpuIntegralDivide/GpuPmod/GpuRemainder) +*/ +class DecimalBinaryOpSuite extends GpuExpressionTestSuite { + private val schema = FuzzerUtils.createSchema(Seq( + DecimalType(DType.DECIMAL32_MAX_PRECISION, 4), + DecimalType(DType.DECIMAL32_MAX_PRECISION, 2))) + private val litValue = Decimal(12345.6789) + private val lit = GpuLiteral(litValue, DecimalType(DType.DECIMAL64_MAX_PRECISION, 5)) + private val leftExpr = GpuBoundReference(0, schema.head.dataType, nullable = true) + private val rightExpr = GpuBoundReference(1, schema(1).dataType, nullable = true) + + private val schemaSame = FuzzerUtils.createSchema(Seq( + DecimalType(DType.DECIMAL32_MAX_PRECISION, 3), + DecimalType(DType.DECIMAL32_MAX_PRECISION, 3))) + private val leftExprSame = GpuBoundReference(0, schemaSame.head.dataType, nullable = true) + private val rightExprSame = GpuBoundReference(1, schemaSame(1).dataType, nullable = true) + + test("GpuEqualTo") { + val expectedFun = (l: Decimal, r: Decimal) => Option(l == r) + checkEvaluateGpuBinaryExpression(GpuEqualTo(leftExpr, rightExpr), + schema.head.dataType, schema(1).dataType, DataTypes.BooleanType, + expectedFun, schema) + checkEvaluateGpuBinaryExpression(GpuEqualTo(leftExprSame, rightExprSame), + schemaSame.head.dataType, schemaSame.head.dataType, DataTypes.BooleanType, + expectedFun, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x == litValue) + checkEvaluateGpuUnaryExpression(GpuEqualTo(leftExpr, lit), + schema.head.dataType, DataTypes.BooleanType, expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue == x) + checkEvaluateGpuUnaryExpression(GpuEqualTo(lit, leftExpr), + schema.head.dataType, DataTypes.BooleanType, expectedFunSV, schema) + } + + test("GpuGreaterThan") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l > r) + checkEvaluateGpuBinaryExpression(GpuGreaterThan(leftExpr, rightExpr), + schema.head.dataType, schema(1).dataType, DataTypes.BooleanType, + expectedFunVV, schema) + + val expectedFunVS = (x: Decimal) => Option(x > 
litValue) + checkEvaluateGpuUnaryExpression(GpuGreaterThan(leftExpr, lit), + schema.head.dataType, DataTypes.BooleanType, expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue > x) + checkEvaluateGpuUnaryExpression(GpuGreaterThan(lit, leftExpr), + schema.head.dataType, DataTypes.BooleanType, expectedFunSV, schema) + } + + test("GpuGreaterThanOrEqual") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l >= r) + checkEvaluateGpuBinaryExpression(GpuGreaterThanOrEqual(leftExpr, rightExpr), + schema.head.dataType, schema(1).dataType, DataTypes.BooleanType, + expectedFunVV, schema) + checkEvaluateGpuBinaryExpression(GpuGreaterThanOrEqual(leftExprSame, rightExprSame), + schemaSame.head.dataType, schemaSame.head.dataType, DataTypes.BooleanType, + expectedFunVV, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x >= litValue) + checkEvaluateGpuUnaryExpression(GpuGreaterThanOrEqual(leftExpr, lit), + schema.head.dataType, DataTypes.BooleanType, expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue >= x) + checkEvaluateGpuUnaryExpression(GpuGreaterThanOrEqual(lit, leftExpr), + schema.head.dataType, DataTypes.BooleanType, expectedFunSV, schema) + } + + test("GpuLessThan") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l < r) + checkEvaluateGpuBinaryExpression(GpuLessThan(leftExpr, rightExpr), + schema.head.dataType, schema(1).dataType, DataTypes.BooleanType, + expectedFunVV, schema) + + val expectedFunVS = (x: Decimal) => Option(x < litValue) + checkEvaluateGpuUnaryExpression(GpuLessThan(leftExpr, lit), + schema.head.dataType, DataTypes.BooleanType, expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue < x) + checkEvaluateGpuUnaryExpression(GpuLessThan(lit, leftExpr), + schema.head.dataType, DataTypes.BooleanType, expectedFunSV, schema) + } + + test("GpuLessThanOrEqual") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l <= r) + checkEvaluateGpuBinaryExpression(GpuLessThanOrEqual(leftExpr, rightExpr), + schema.head.dataType, schema(1).dataType, DataTypes.BooleanType, + expectedFunVV, schema) + checkEvaluateGpuBinaryExpression(GpuLessThanOrEqual(leftExprSame, rightExprSame), + schemaSame.head.dataType, schemaSame.head.dataType, DataTypes.BooleanType, + expectedFunVV, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x <= litValue) + checkEvaluateGpuUnaryExpression(GpuLessThanOrEqual(leftExpr, lit), + schema.head.dataType, DataTypes.BooleanType, expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue <= x) + checkEvaluateGpuUnaryExpression(GpuLessThanOrEqual(lit, leftExpr), + schema.head.dataType, DataTypes.BooleanType, expectedFunSV, schema) + } + + test("GpuAdd") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l + r) + var outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale max + schema(1).dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuMathBinaryExpression(GpuAdd(leftExpr, rightExpr), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schema) + outputScale = schemaSame.head.dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuMathBinaryExpression(GpuAdd(leftExprSame, rightExprSame), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x + litValue) + outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale max + lit.dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuUnaryExpression(GpuAdd(leftExpr, 
lit), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue + x) + checkEvaluateGpuUnaryExpression(GpuAdd(lit, leftExpr), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunSV, schema) + } + + test("GpuMinus") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l - r) + var outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale max + schema(1).dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuMathBinaryExpression(GpuSubtract(leftExpr, rightExpr), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schema) + outputScale = schemaSame.head.dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuMathBinaryExpression(GpuSubtract(leftExprSame, rightExprSame), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x - litValue) + outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale max + lit.dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuUnaryExpression(GpuSubtract(leftExpr, lit), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue - x) + checkEvaluateGpuUnaryExpression(GpuSubtract(lit, leftExpr), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunSV, schema) + } + + test("GpuMultiply") { + val expectedFunVV = (l: Decimal, r: Decimal) => Option(l * r) + var outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale + + schema(1).dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuMathBinaryExpression(GpuMultiply(leftExpr, rightExpr), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schema) + outputScale = schemaSame.head.dataType.asInstanceOf[DecimalType].scale * 2 + checkEvaluateGpuMathBinaryExpression(GpuMultiply(leftExprSame, rightExprSame), + DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVV, schemaSame) + + val expectedFunVS = (x: Decimal) => Option(x * litValue) + outputScale = schema.head.dataType.asInstanceOf[DecimalType].scale + + lit.dataType.asInstanceOf[DecimalType].scale + checkEvaluateGpuUnaryExpression(GpuMultiply(leftExpr, lit), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunVS, schema) + val expectedFunSV = (x: Decimal) => Option(litValue * x) + checkEvaluateGpuUnaryExpression(GpuMultiply(lit, leftExpr), + schema.head.dataType, DecimalType(DType.DECIMAL64_MAX_PRECISION, outputScale), + expectedFunSV, schema) + } + + private def checkEvaluateGpuMathBinaryExpression[U]( + inputExpr: GpuExpression, + outputType: DataType, + expectedFun: (Decimal, Decimal) => Option[U], + schema: StructType): Unit = { + + val fun = (left: Any, right: Any) => { + if (left == null || right == null) { + null + } else { + expectedFun(left.asInstanceOf[Decimal], right.asInstanceOf[Decimal]) + } + } + + super.checkEvaluateGpuBinaryExpression(inputExpr, + schema.head.dataType, schema(1).dataType, outputType, fun, schema) + } +} diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala index 96fb0141524..00d9a28a2e9 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/FuzzerUtils.scala @@ 
-20,11 +20,9 @@ import java.sql.{Date, Timestamp}
 
 import scala.collection.JavaConverters._
 import scala.util.Random
-
 import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
-
 import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructField, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
@@ -114,6 +112,17 @@ object FuzzerUtils {
             case None => builder.appendNull()
           }
         })
+      case dt: DecimalType =>
+        rows.foreach(_ => {
+          maybeNull(rand, r.nextLong()) match {
+            case Some(value) =>
+              // bound the unscaled value so the result fits within the target precision
+              val invScale = (dt.precision to ai.rapids.cudf.DType.DECIMAL64_MAX_PRECISION)
+                .foldLeft(10L)((x, _) => x * 10)
+              builder.append(BigDecimal(value / invScale, dt.scale).bigDecimal)
+            case None => builder.appendNull()
+          }
+        })
     }
   }
   builders.build(rowCount)
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala
index bac59ce0954..a3eceadb090 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuBatchUtilsSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
-import org.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}
+import org.apache.spark.sql.types.{DataTypes, Decimal, DecimalType, StructField, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 
 class GpuBatchUtilsSuite extends FunSuite {
@@ -44,6 +44,11 @@ class GpuBatchUtilsSuite extends FunSuite {
     StructField("c0", DataTypes.StringType, nullable = false)
   ))
 
+  val decimalSchema = new StructType(Array(
+    StructField("c0", DataTypes.createDecimalType(15, 6), nullable = true),
+    StructField("c0", DataTypes.createDecimalType(15, 6), nullable = false)
+  ))
+
   /** Mix of data types and nullable and not nullable */
   val mixedSchema = new StructType(Array(
     StructField("c0", DataTypes.ByteType, nullable = false),
@@ -61,7 +66,9 @@ class GpuBatchUtilsSuite extends FunSuite {
     StructField("c6", DataTypes.StringType, nullable = false),
     StructField("c6_nullable", DataTypes.StringType, nullable = true),
     StructField("c7", DataTypes.BooleanType, nullable = false),
-    StructField("c7_nullable", DataTypes.BooleanType, nullable = true)
+    StructField("c7_nullable", DataTypes.BooleanType, nullable = true),
+    StructField("c8", DataTypes.createDecimalType(15, 6), nullable = false),
+    StructField("c8_nullable", DataTypes.createDecimalType(15, 6), nullable = true)
   ))
 
   test("Calculate GPU memory for batch of 64 rows with integers") {
@@ -72,6 +79,10 @@ class GpuBatchUtilsSuite extends FunSuite {
     compareEstimateWithActual(stringSchema, 64)
   }
 
+  test("Calculate GPU memory for batch of 64 rows with decimals") {
+    compareEstimateWithActual(decimalSchema, 64)
+  }
+
   test("Calculate GPU memory for batch of 64 rows with mixed types") {
     compareEstimateWithActual(mixedSchema, 64)
   }
@@ -84,6 +95,10 @@ class GpuBatchUtilsSuite extends FunSuite {
     compareEstimateWithActual(stringSchema, 124)
   }
 
+  test("Calculate GPU memory for batch of 124 rows with decimals") {
+    compareEstimateWithActual(decimalSchema, 124)
+  }
+
   test("Calculate GPU memory for batch of 124 rows with mixed types") {
     compareEstimateWithActual(mixedSchema, 124)
  }
@@ -96,6 +111,10 @@ class GpuBatchUtilsSuite
extends FunSuite { compareEstimateWithActual(stringSchema, 1024) } + test("Calculate GPU memory for batch of 1024 rows with decimals") { + compareEstimateWithActual(decimalSchema, 1024) + } + test("Calculate GPU memory for batch of 1024 rows with mixed types") { compareEstimateWithActual(mixedSchema, 1024) } @@ -185,6 +204,10 @@ class GpuBatchUtilsSuite extends FunSuite { case DataTypes.LongType => maybeNull(field, i, r.nextLong()) case DataTypes.FloatType => maybeNull(field, i, r.nextFloat()) case DataTypes.DoubleType => maybeNull(field, i, r.nextDouble()) + case dataType: DecimalType => + val upperBound = (0 until dataType.precision).foldLeft(1L)((x, _) => x * 10) + val unScaledValue = r.nextLong() % upperBound + maybeNull(field, i, Decimal(unScaledValue, dataType.precision, dataType.scale)) case dataType@DataTypes.StringType => if (field.nullable) { // since we want a deterministic test that compares the estimate with actual diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala index b534ba092cf..bde00b20bdc 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala @@ -19,19 +19,19 @@ package com.nvidia.spark.rapids import java.io.File import java.nio.file.Files -import ai.rapids.cudf.{ContiguousTable, Cuda, HostColumnVector, Table} +import ai.rapids.cudf.{ContiguousTable, Cuda, DType, HostColumnVector, Table} import com.nvidia.spark.rapids.format.CodecType import org.apache.spark.sql.execution.metric.SQLMetric import org.apache.spark.sql.rapids.metrics.source.MockTaskContext -import org.apache.spark.sql.types.{DataType, DataTypes, LongType, StructField, StructType} +import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, LongType, StructField, StructType} import org.apache.spark.sql.vectorized.ColumnarBatch class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { test("test with small input batches") { withGpuSparkSession(spark => { - val testData = doubleCsvDf(spark).coalesce(1) + val testData = mixedDf(spark, numSlices = 1) val gpuRowToColumnarExec = GpuRowToColumnarExec(testData.queryExecution.sparkPlan, TargetSize(1)) val gpuCoalesceBatches = GpuCoalesceBatches(gpuRowToColumnarExec, TargetSize(100000)) @@ -43,7 +43,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { // assert final results are correct assert(batches.hasNext) val batch = batches.next() - assert(batch.numCols() == 2) + assert(batch.numCols() == 5) assert(batch.numRows() == 14) assert(!batches.hasNext) batch.close() @@ -137,7 +137,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { withGpuSparkSession(spark => { - val df = longsCsvDf(spark) + val df = mixedDf(spark, numSlices = 7) // currently, GpuSortExec requires a single batch but this is likely to change in the // future, making this test invalid @@ -149,7 +149,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { df2.collect() val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan( ExecutionPlanCaptureCallback.getResultWithTimeout()) - val coalesce = executedPlan .find(_.isInstanceOf[GpuCoalesceBatches]).get .asInstanceOf[GpuCoalesceBatches] @@ -184,13 +183,12 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { try { // convert csv test data to parquet withCpuSparkSession(spark => { - longsCsvDf(spark).write.parquet(path) + 
mixedDf(spark).write.parquet(path) }, conf) withGpuSparkSession(spark => { val df = spark.read.parquet(path) - val df2 = df - .sort(df.col("longs")) + val df2 = df.sort(df.col("longs")) // execute the plan ExecutionPlanCaptureCallback.startCapture() @@ -198,7 +196,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { val executedPlan = ExecutionPlanCaptureCallback.extractExecutedPlan( ExecutionPlanCaptureCallback.getResultWithTimeout()) - // ensure that the plan does include the HostColumnarToGpu step val hostColumnarToGpu = executedPlan .find(_.isInstanceOf[HostColumnarToGpu]).get @@ -212,8 +209,6 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { assert(gpuCoalesceBatches.goal == RequireSingleBatch) assert(gpuCoalesceBatches.goal.targetSizeBytes == Long.MaxValue) - - }, conf) } finally { dir.delete() @@ -229,13 +224,13 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { withGpuSparkSession(spark => { - val df = longsCsvDf(spark) + val df = mixedDf(spark, numSlices = 14) // A coalesce step is added after the filter to help with the case where much of the // data is filtered out. The select is there to prevent the coalesce from being // the last thing in the plan which will cause the coalesce to be optimized out. val df2 = df - .filter(df.col("six").gt(5)).select(df.col("six") * 2) + .filter(df.col("ints").gt(90)).select(df.col("ints") * 2) val coalesce = df2.queryExecution.executedPlan .find(_.isInstanceOf[GpuCoalesceBatches]).get @@ -252,8 +247,8 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { df2.collect() // assert the metrics are correct - assert(coalesce.additionalMetrics("numInputBatches").value == 7) - assert(coalesce.longMetric(GpuMetricNames.NUM_OUTPUT_BATCHES).value == 7) + assert(coalesce.additionalMetrics("numInputBatches").value == 14) + assert(coalesce.longMetric(GpuMetricNames.NUM_OUTPUT_BATCHES).value == 11) }, conf) } @@ -279,7 +274,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { } } - val schema = new StructType().add("i", LongType) + val schema = new StructType() + .add("i", LongType) + .add("j", DecimalType(DType.DECIMAL64_MAX_PRECISION, 3)) val dummyMetric = new SQLMetric("ignored") val coalesceIter = new GpuCoalesceIterator( batchIter, @@ -299,12 +296,17 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite { var expected = 0 while (coalesceIter.hasNext) { withResource(coalesceIter.next()) { batch => - assertResult(1)(batch.numCols) - val col = GpuColumnVector.extractBases(batch).head - withResource(col.copyToHost) { hcv => - (0 until hcv.getRowCount.toInt).foreach { i => - assertResult(expected)(hcv.getLong(i)) - expected += 1 + assertResult(2)(batch.numCols) + val Array(longCol, decCol) = GpuColumnVector.extractBases(batch) + withResource(longCol.copyToHost) { longHcv => + withResource(decCol.copyToHost) { decHcv => + (0 until longHcv.getRowCount.toInt).foreach { i => + assertResult(expected)(longHcv.getLong(i)) + // FIXME: Enable assertions after native support of decimal contiguous split ready. 
+              // assertResult(expected)(decHcv.getLong(i))
+              // assertResult(expected * 0.001)(decHcv.getBigDecimal(i).doubleValue())
+              expected += 1
+            }
           }
         }
       }
@@ -355,7 +357,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
       }
     }
 
-    val schema = new StructType().add("i", LongType)
+    val schema = new StructType()
+      .add("i", LongType)
+      .add("j", DecimalType(DType.DECIMAL64_MAX_PRECISION, 3))
     val dummyMetric = new SQLMetric("ignored")
     val coalesceIter = new GpuCoalesceIterator(
       batchIter,
@@ -375,12 +379,17 @@
     var expected = 0
     while (coalesceIter.hasNext) {
       withResource(coalesceIter.next()) { batch =>
-        assertResult(1)(batch.numCols)
-        val col = GpuColumnVector.extractBases(batch).head
-        withResource(col.copyToHost) { hcv =>
-          (0 until hcv.getRowCount.toInt).foreach { i =>
-            assertResult(expected)(hcv.getLong(i))
-            expected += 1
+        assertResult(2)(batch.numCols)
+        val Array(longCol, decCol) = GpuColumnVector.extractBases(batch)
+        withResource(longCol.copyToHost) { longHcv =>
+          withResource(decCol.copyToHost) { decHcv =>
+            (0 until longHcv.getRowCount.toInt).foreach { i =>
+              assertResult(expected)(longHcv.getLong(i))
+              // FIXME: Enable assertions after native support of decimal contiguous split ready.
+              // assertResult(expected)(decHcv.getLong(i))
+              // assertResult(expected * 0.001)(decHcv.getBigDecimal(i).doubleValue())
+              expected += 1
+            }
          }
        }
      }
@@ -392,8 +401,12 @@
     val vals = (0 until numRows).map(_.toLong + start)
     withResource(HostColumnVector.fromLongs(vals:_*)) { hcv =>
       withResource(hcv.copyToDevice()) { cv =>
-        withResource(new Table(cv)) { table =>
-          table.contiguousSplit()(0)
+        withResource(HostColumnVector.decimalFromLongs(-3, vals:_*)) { decHcv =>
+          withResource(decHcv.copyToDevice()) { decCv =>
+            withResource(new Table(cv, decCv)) { table =>
+              table.contiguousSplit()(0)
+            }
+          }
         }
       }
     }
@@ -401,7 +414,8 @@
 
   private def buildUncompressedBatch(start: Int, numRows: Int): ColumnarBatch = {
     withResource(buildContiguousTable(start, numRows)) { ct =>
-      GpuColumnVector.from(ct.getTable, Array[DataType](LongType))
+      GpuColumnVector.from(ct.getTable,
+        Array[DataType](LongType, DecimalType(DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
 
@@ -409,7 +423,8 @@
     val codec = TableCompressionCodec.getCodec(CodecType.NVCOMP_LZ4)
     withResource(codec.createBatchCompressor(0, Cuda.DEFAULT_STREAM)) { compressor =>
       compressor.addTableToCompress(buildContiguousTable(start, numRows))
-      GpuCompressedColumnVector.from(compressor.finish().head, Array[DataType](LongType))
+      GpuCompressedColumnVector.from(compressor.finish().head,
+        Array[DataType](LongType, DecimalType(DType.DECIMAL64_MAX_PRECISION, 3)))
     }
   }
 }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuExpressionTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuExpressionTestSuite.scala
index 3f26ed9bd63..e67b7c82581 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuExpressionTestSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuExpressionTestSuite.scala
@@ -16,14 +16,14 @@
 
 package com.nvidia.spark.rapids
 
-import org.apache.spark.sql.types.{DataType, DataTypes, StructType}
+import org.apache.spark.sql.types.{DataType, DataTypes, DecimalType, StructType}
 
 abstract class GpuExpressionTestSuite extends SparkQueryCompareTestSuite {
 
   /**
-   * 
Evaluate the GpuExpression and compare the results to the provided function. + * Evaluate the GpuUnaryExpression and compare the results to the provided function. * - * @param inputExpr GpuExpression under test + * @param inputExpr GpuUnaryExpression under test * @param expectedFun Function that produces expected results * @param schema Schema to use for generated data * @param rowCount Number of rows of random to generate @@ -76,6 +76,72 @@ abstract class GpuExpressionTestSuite extends SparkQueryCompareTestSuite { } } + /** + * Evaluate the GpuBinaryExpression and compare the results to the provided function. + * + * @param inputExpr GpuBinaryExpression under test + * @param expectedFun Function that produces expected results + * @param schema Schema to use for generated data + * @param rowCount Number of rows of random to generate + * @param comparisonFunc Optional function to compare results + * @param maxFloatDiff Maximum acceptable difference between expected and actual results + */ + def checkEvaluateGpuBinaryExpression[L, R, U](inputExpr: GpuExpression, + leftType: DataType, + rightType: DataType, + outputType: DataType, + expectedFun: (L, R) => Option[U], + schema: StructType, + rowCount: Int = 50, + seed: Long = 0, + nullable: Boolean = false, + comparisonFunc: Option[(U, U) => Boolean] = None, + maxFloatDiff: Double = 0.00001): Unit = { + + // generate batch + withResource(FuzzerUtils.createColumnarBatch(schema, rowCount, seed = seed)) { batch => + // evaluate expression + withResource(inputExpr.columnarEval(batch).asInstanceOf[GpuColumnVector]) { result => + // bring gpu data onto host + withResource(batch.column(0).asInstanceOf[GpuColumnVector].copyToHost()) { leftInput => + withResource(batch.column(1).asInstanceOf[GpuColumnVector].copyToHost()) { rightInput => + withResource(result.copyToHost()) { hostResult => + // compare results + assert(result.getRowCount == rowCount) + for (i <- 0 until result.getRowCount.toInt) { + val lValue = getAs(leftInput, i, leftType) + val rValue = getAs(rightInput, i, rightType) + val actualOption: Option[U] = + getAs(hostResult, i, outputType).map(_.asInstanceOf[U]) + val expectedOption: Option[U] = if (!nullable) { + lValue.flatMap(l => rValue.flatMap(r => + expectedFun(l.asInstanceOf[L], r.asInstanceOf[R]))) + } else { + expectedFun(lValue.orNull.asInstanceOf[L], rValue.orNull.asInstanceOf[R]) + } + (expectedOption, actualOption) match { + case (Some(expected), Some(actual)) if comparisonFunc.isDefined => + if (!comparisonFunc.get(expected, actual)) { + throw new IllegalStateException(s"Expected: $expected. Actual: $actual. " + + s"Left value: $lValue, Right value: $rValue") + } + case (Some(expected), Some(actual)) => + if (!compare(expected, actual, maxFloatDiff)) { + throw new IllegalStateException(s"Expected: $expected. Actual: $actual. " + + s"Left value: $lValue, Right value: $rValue") + } + case (None, None) => + case _ => throw new IllegalStateException(s"Expected: $expectedOption. " + + s"Actual: $actualOption. 
Left value: $lValue, Right value: $rValue") + } + } + } + } + } + } + } + } + def compareStringifiedFloats(expected: String, actual: String): Boolean = { // handle exact matches first @@ -111,6 +177,7 @@ abstract class GpuExpressionTestSuite extends SparkQueryCompareTestSuite { None } else { Some(dataType match { + case DataTypes.BooleanType => column.getBoolean(index) case DataTypes.ByteType => column.getByte(index) case DataTypes.ShortType => column.getShort(index) case DataTypes.IntegerType => column.getInt(index) @@ -118,6 +185,7 @@ abstract class GpuExpressionTestSuite extends SparkQueryCompareTestSuite { case DataTypes.FloatType => column.getFloat(index) case DataTypes.DoubleType => column.getDouble(index) case DataTypes.StringType => column.getUTF8String(index).toString + case dt: DecimalType => column.getDecimal(index, dt.precision, dt.scale) }) } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala index 449b7b811a6..faecf76e3f6 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ProjectExprSuite.scala @@ -49,6 +49,11 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite { frame => frame.select("time") } + testSparkResultsAreEqual("project decimal", mixedDf(_), + conf = forceHostColumnarToGpu()) { + frame => frame.select("decimals") + } + testSparkResultsAreEqual("getMapValue", frameFromParquet("map_of_strings.snappy.parquet")) { frame => frame.selectExpr("mapField['foo']") } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala index f75f40403ee..8b3ff28a6a2 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/SparkQueryCompareTestSuite.scala @@ -931,24 +931,32 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm { ).toDF("ints", "longs", "doubles", "strings", "bucket_1", "bucket_2") } - def mixedDf(session: SparkSession): DataFrame = { - import session.sqlContext.implicits._ - Seq[(java.lang.Integer, java.lang.Long, java.lang.Double, java.lang.String)]( - (99, 100L, 1.0, "A"), - (98, 200L, 2.0, "B"), - (97,300L, 3.0, "C"), - (99, 400L, 4.0, "D"), - (98, 500L, 5.0, "E"), - (97, -100L, 6.0, "F"), - (96, -500L, 0.0, "G"), - (95, -700L, 8.0, "E\u0480\u0481"), - (Int.MaxValue, Long.MinValue, Double.PositiveInfinity, "\u0000"), - (Int.MinValue, Long.MaxValue, Double.NaN, "\u0000"), - (null, null, null, "actions are judged by intentions"), - (94, -900L, 9.0, "g\nH"), - (92, -1200L, 12.0, "IJ\"\u0100\u0101\u0500\u0501"), - (90, 1500L, 15.0, "\ud720\ud721") - ).toDF("ints", "longs", "doubles", "strings") + def mixedDf(session: SparkSession, numSlices: Int = 2): DataFrame = { + val rows = Seq[Row]( + Row(99, 100L, 1.0, "A", Decimal("1.2")), + Row(98, 200L, 2.0, "B", Decimal("1.3")), + Row(97, 300L, 3.0, "C", Decimal("1.4")), + Row(99, 400L, 4.0, "D", Decimal("1.5")), + Row(98, 500L, 5.0, "E", Decimal("1.6")), + Row(97, -100L, 6.0, "F", Decimal("1.7")), + Row(96, -500L, 0.0, "G", Decimal("1.8")), + Row(95, -700L, 8.0, "E\u0480\u0481", Decimal("1.9")), + Row(Int.MaxValue, Long.MinValue, Double.PositiveInfinity, "\u0000", Decimal("2.0")), + Row(Int.MinValue, Long.MaxValue, Double.NaN, "\u0000", Decimal("100.123")), + Row(null, null, null, "actions are judged by intentions", Decimal("200.246")), + 
Row(94, -900L, 9.0, "g\nH", Decimal("300.369")), + Row(92, -1200L, 12.0, "IJ\"\u0100\u0101\u0500\u0501", Decimal("-1.47e3")), + Row(90, 1500L, 15.0, "\ud720\ud721", Decimal("-22.2345")) + ) + val structType = StructType( + Seq(StructField("ints", IntegerType), + StructField("longs", LongType), + StructField("doubles", DoubleType), + StructField("strings", StringType), + StructField("decimals", DecimalType(15, 5)), + )) + session.createDataFrame( + session.sparkContext.parallelize(rows, numSlices), structType) } def likeDf(session: SparkSession): DataFrame = { diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala new file mode 100644 index 00000000000..14ba1cdf50d --- /dev/null +++ b/tests/src/test/scala/com/nvidia/spark/rapids/unit/DecimalUnitTest.scala @@ -0,0 +1,226 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.unit + +import scala.util.Random + +import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector} +import com.nvidia.spark.rapids.{GpuAlias, GpuColumnVector, GpuIsNotNull, GpuIsNull, GpuLiteral, GpuOverrides, GpuScalar, GpuUnaryExpression, GpuUnitTests, HostColumnarToGpu, RapidsConf, RapidsHostColumnVector, TestUtils} +import org.scalatest.Matchers + +import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal} +import org.apache.spark.sql.types.{Decimal, DecimalType} + +class DecimalUnitTest extends GpuUnitTests with Matchers { + Random.setSeed(1234L) + + private val dec32Data = Array.fill[Decimal](10)( + Decimal.fromDecimal(BigDecimal(Random.nextInt() / 1000, 3 + Random.nextInt(3)))) + private val dec64Data = Array.fill[Decimal](10)( + Decimal.fromDecimal(BigDecimal(Random.nextLong() / 1000, 7 + Random.nextInt(3)))) + + test("test decimal as scalar") { + Array(dec32Data, dec64Data).flatten.foreach { dec => + // test GpuScalar.from(v: Any) + withResource(GpuScalar.from(dec)) { s => + s.getType.getScale shouldEqual -dec.scale + GpuScalar.extract(s).asInstanceOf[Decimal] shouldEqual dec + } + // test GpuScalar.from(v: Any, t: DataType) + val dt = DecimalType(DType.DECIMAL64_MAX_PRECISION, dec.scale) + withResource(GpuScalar.from(dec.toDouble, dt)) { s => + s.getType.getScale shouldEqual -dt.scale + GpuScalar.extract(s).asInstanceOf[Decimal].toDouble shouldEqual dec.toDouble + } + withResource(GpuScalar.from(dec.toString(), dt)) { s => + s.getType.getScale shouldEqual -dt.scale + GpuScalar.extract(s).asInstanceOf[Decimal].toString shouldEqual dec.toString() + } + val long = dec.toLong + withResource(GpuScalar.from(long, DecimalType(dec.precision, 0))) { s => + s.getType.getScale shouldEqual 0 + GpuScalar.extract(s).asInstanceOf[Decimal].toLong shouldEqual long + } + } + // test exception throwing + assertThrows[IllegalStateException] { + withResource(GpuScalar.from(true, DecimalType(10, 1))) { _ => } + } + assertThrows[IllegalArgumentException] { + val 
bigDec = Decimal(BigDecimal(Long.MaxValue / 100, 0)) + withResource(GpuScalar.from(bigDec, DecimalType(15, 1))) { _ => } + } + } + + test("test decimal as column vector") { + val dt32 = DecimalType(DType.DECIMAL32_MAX_PRECISION, 5) + val dt64 = DecimalType(DType.DECIMAL64_MAX_PRECISION, 9) + withResource( + GpuColumnVector.from(ColumnVector.fromDecimals(dec32Data.map(_.toJavaBigDecimal): _*), + dt32)) { cv: GpuColumnVector => + cv.getRowCount shouldEqual dec32Data.length + val (precision, scale) = cv.dataType() match { + case dt: DecimalType => (dt.precision, dt.scale) + } + withResource(cv.copyToHost()) { hostCv: RapidsHostColumnVector => + dec32Data.zipWithIndex.foreach { case (dec, i) => + val rescaled = dec.toJavaBigDecimal.setScale(scale) + hostCv.getInt(i) shouldEqual rescaled.unscaledValue().intValueExact() + hostCv.getDecimal(i, precision, scale) shouldEqual Decimal(rescaled) + } + } + } + val dec64WithNull = Array(null) ++ dec64Data.map(_.toJavaBigDecimal) ++ Array(null, null) + withResource(GpuColumnVector.from(ColumnVector.fromDecimals(dec64WithNull: _*), dt64)) { cv => + cv.getRowCount shouldEqual dec64WithNull.length + cv.hasNull shouldBe true + cv.numNulls() shouldEqual 3 + val (precision, scale) = cv.dataType() match { + case dt: DecimalType => (dt.precision, dt.scale) + } + withResource(cv.copyToHost()) { hostCv: RapidsHostColumnVector => + dec64WithNull.zipWithIndex.foreach { + case (dec, i) if dec == null => + hostCv.getBase.isNull(i) shouldBe true + case (dec, i) => + val rescaled = dec.setScale(scale) + hostCv.getLong(i) shouldEqual rescaled.unscaledValue().longValueExact() + hostCv.getDecimal(i, precision, scale) shouldEqual Decimal(rescaled) + } + } + } + // assertion error throws while running `typeConversionAllowed` check + assertThrows[AssertionError] { + withResource(GpuColumnVector.from(ColumnVector.decimalFromLongs(0, 1L), + DecimalType(DType.DECIMAL64_MAX_PRECISION + 1, 0))) { _ => } + } + assertThrows[AssertionError] { + withResource(GpuColumnVector.from(ColumnVector.decimalFromInts(0, 1), + DecimalType(DType.DECIMAL32_MAX_PRECISION + 1, 0))) { _ => } + } + // FIXME: Enable below test after creating decimal vectors from scalar supported by cuDF. 
+ /* + withResource(GpuScalar.from(dec64Data(0), dt64)) { scalar => + withResource(GpuColumnVector.from(scalar, 10, dt64)) { cv => + cv.getRowCount shouldEqual 10 + withResource(cv.copyToHost()) { hcv => + (0 until 10).foreach { i => + hcv.getLong(i) shouldEqual scalar.getLong + hcv.getDecimal(i, dt64.precision, dt64.scale).toJavaBigDecimal shouldEqual + scalar.getBigDecimal + } + } + } + } + */ + } + + test("test basic expressions with decimal data") { + val rapidsConf = new RapidsConf(Map[String, String]()) + + val cpuLit = Literal(dec32Data(0), DecimalType(dec32Data(0).precision, dec32Data(0).scale)) + val wrapperLit = GpuOverrides.wrapExpr(cpuLit, rapidsConf, None) + wrapperLit.tagForGpu() + wrapperLit.canExprTreeBeReplaced shouldBe true + val gpuLit = wrapperLit.convertToGpu().asInstanceOf[GpuLiteral] + gpuLit.columnarEval(null) shouldEqual cpuLit.eval(null) + gpuLit.sql shouldEqual cpuLit.sql + + val cpuAlias = Alias(cpuLit, "A")() + val wrapperAlias = GpuOverrides.wrapExpr(cpuAlias, rapidsConf, None) + wrapperAlias.tagForGpu() + wrapperAlias.canExprTreeBeReplaced shouldBe true + val gpuAlias = wrapperAlias.convertToGpu().asInstanceOf[GpuAlias] + gpuAlias.dataType shouldEqual cpuAlias.dataType + gpuAlias.sql shouldEqual cpuAlias.sql + gpuAlias.columnarEval(null) shouldEqual cpuAlias.eval(null) + + val cpuAttrRef = AttributeReference("test123", cpuLit.dataType)() + val wrapperAttrRef = GpuOverrides.wrapExpr(cpuAttrRef, rapidsConf, None) + wrapperAttrRef.tagForGpu() + wrapperAttrRef.canExprTreeBeReplaced shouldBe true + val gpuAttrRef = wrapperAttrRef.convertToGpu().asInstanceOf[AttributeReference] + gpuAttrRef.sql shouldEqual cpuAttrRef.sql + gpuAttrRef.sameRef(cpuAttrRef) shouldBe true + + // inconvertible because of precision overflow + val wrp = GpuOverrides.wrapExpr(Literal(Decimal(12345L), DecimalType(38, 10)), rapidsConf, None) + wrp.tagForGpu() + wrp.canExprTreeBeReplaced shouldBe false + } + + test("test gpu null check operators with decimal data") { + val decArray = Array(BigDecimal(0).bigDecimal, null, BigDecimal(1).bigDecimal) + withResource(GpuColumnVector.from(ColumnVector.fromDecimals(decArray: _*), DecimalType(1, 0)) + ) { cv => + withResource(GpuIsNull(null).doColumnar(cv).copyToHost()) { ret => + ret.getBoolean(0) shouldBe false + ret.getBoolean(1) shouldBe true + ret.getBoolean(2) shouldBe false + } + withResource(GpuIsNotNull(null).doColumnar(cv).copyToHost()) { ret => + ret.getBoolean(0) shouldBe true + ret.getBoolean(1) shouldBe false + ret.getBoolean(2) shouldBe true + } + } + } + + test("test HostColumnarToGpu.columnarCopy") { + withResource( + GpuColumnVector.from(ColumnVector.fromDecimals(dec64Data.map(_.toJavaBigDecimal): _*), + DecimalType(DType.DECIMAL64_MAX_PRECISION, 9))) { cv => + val dt = new HostColumnVector.BasicType(false, GpuColumnVector.getRapidsType(cv.dataType())) + val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount) + withResource(cv.copyToHost()) { hostCV => + HostColumnarToGpu.columnarCopy(hostCV, builder, false, cv.getRowCount.toInt) + val actual = builder.build() + val expected = hostCV.getBase + actual.getDataType shouldEqual expected.getDataType + actual.getRowCount shouldEqual expected.getRowCount + (0 until actual.getRowCount.toInt).foreach { i => + actual.getBigDecimal(i) shouldEqual expected.getBigDecimal(i) + } + } + } + val dec32WithNull = dec32Data.splitAt(5) match { + case (left, right) => + Array(null) ++ left.map(_.toJavaBigDecimal) ++ Array(null) ++ + right.map(_.toJavaBigDecimal) ++ Array(null) + } + 
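+    // The builder below is typed via GpuColumnVector.getRapidsType, which maps every
+    // Spark DecimalType to DECIMAL64, so copying the DECIMAL32 host data widens it;
+    // the getDataType assertion at the end of this test relies on that widening.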
withResource( + GpuColumnVector.from(ColumnVector.fromDecimals(dec32WithNull: _*), + DecimalType(DType.DECIMAL32_MAX_PRECISION, 5))) { cv => + val dt = new HostColumnVector.BasicType(true, GpuColumnVector.getRapidsType(cv.dataType())) + val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount) + withResource(cv.copyToHost()) { hostCV => + HostColumnarToGpu.columnarCopy(hostCV, builder, true, cv.getRowCount.toInt) + val actual = builder.build() + val expected = hostCV.getBase + actual.getDataType shouldEqual + DType.create(DType.DTypeEnum.DECIMAL64, expected.getDataType.getScale) + actual.getRowCount shouldEqual expected.getRowCount + (0 until actual.getRowCount.toInt).foreach { i => + actual.isNull(i) shouldEqual expected.isNull(i) + if (!actual.isNull(i)) { + actual.getBigDecimal(i) shouldEqual expected.getBigDecimal(i) + } + } + } + } + } +}
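Taken together, the scalar conventions exercised by DecimalUnitTest reduce to a short round trip. A sketch built only from calls that already appear in this diff (Scalar.fromDecimal, getType, getBigDecimal); the literal values are illustrative:

```scala
import ai.rapids.cudf.Scalar
import org.apache.spark.sql.types.Decimal

object DecimalScalarRoundTrip {
  def main(args: Array[String]): Unit = {
    val dec = Decimal("123.45") // precision 5, scale 2
    // Spark -> cudf: negate the scale and hand over the unscaled long.
    val s: Scalar = Scalar.fromDecimal(-dec.scale, dec.toUnscaledLong)
    assert(s.getType.getScale == -2)
    // cudf -> Spark: rebuild through BigDecimal, as GpuScalar.extract does.
    val back = Decimal(s.getBigDecimal)
    assert(back == dec)
    s.close() // cudf scalars hold native memory and must be closed
  }
}
```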