From b55c206df51e8152f1ddc05934a44fad4fa2774c Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Fri, 15 Jan 2021 15:40:40 +0800 Subject: [PATCH 1/3] support casting from decimal to decimal Signed-off-by: sperlingxx --- docs/supported_ops.md | 4 +- .../com/nvidia/spark/rapids/GpuCast.scala | 202 +++++++++++------- .../com/nvidia/spark/rapids/TypeChecks.scala | 2 +- .../com/nvidia/spark/rapids/CastOpSuite.scala | 84 +++++++- 4 files changed, 207 insertions(+), 85 deletions(-) diff --git a/docs/supported_ops.md b/docs/supported_ops.md index b7db4436811..9c4e3d4d703 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -17017,7 +17017,7 @@ and the accelerator produces the same result. NS NS -NS +S* @@ -17421,7 +17421,7 @@ and the accelerator produces the same result. NS NS -NS +S* diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index c22611ac4eb..51635747167 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -380,84 +380,13 @@ case class GpuCast( input.getBase.asByteList(true) case (ShortType | IntegerType | LongType, dt: DecimalType) => - - // Use INT64 bounds instead of FLOAT64 bounds, which enables precise comparison. - val (lowBound, upBound) = math.pow(10, dt.precision - dt.scale) match { - case bound if bound > Long.MaxValue => (Long.MinValue, Long.MaxValue) - case bound => (-bound.toLong + 1, bound.toLong - 1) - } - val checkedInput = if (ansiMode) { - assertValuesInRange(input.getBase, - minValue = Scalar.fromLong(lowBound), - maxValue = Scalar.fromLong(upBound)) - input.getBase.incRefCount() - } else { - replaceOutOfRangeValues(input.getBase, - minValue = Scalar.fromLong(lowBound), - maxValue = Scalar.fromLong(upBound), - replaceValue = Scalar.fromNull(input.getBase.getType)) - } - - withResource(checkedInput) { checked => - if (dt.scale < 0) { - // Rounding is essential when scale is negative, - // so we apply HALF_UP rounding manually to keep align with CpuCast. - withResource(checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, 0))) { - scaleZero => scaleZero.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) - } - } else if (dt.scale > 0) { - // Integer will be enlarged during casting if scale > 0, so we cast input to INT64 - // before casting it to decimal in case of overflow. - withResource(checked.castTo(DType.INT64)) { long => - long.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) - } - } else { - checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) - } - } + castIntegralsToDecimal(input.getBase, dt) case (FloatType | DoubleType, dt: DecimalType) => - // Approach to minimize difference between CPUCast and GPUCast: - // step 1. cast input to FLOAT64 (if necessary) - // step 2. cast FLOAT64 to container DECIMAL (who keeps one more digit for rounding) - // step 3. perform HALF_UP rounding on container DECIMAL - val checkedInput = withResource(input.getBase.castTo(DType.FLOAT64)) { double => - val roundedDouble = double.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) - withResource(roundedDouble) { rounded => - // We rely on containerDecimal to perform preciser rounding. So, we have to take extra - // space cost of container into consideration when we run bound check. - val containerScaleBound = DType.DECIMAL64_MAX_PRECISION - (dt.scale + 1) - val bound = math.pow(10, (dt.precision - dt.scale) min containerScaleBound) - if (ansiMode) { - assertValuesInRange(rounded, - minValue = Scalar.fromDouble(-bound), - maxValue = Scalar.fromDouble(bound), - inclusiveMin = false, - inclusiveMax = false) - rounded.incRefCount() - } else { - replaceOutOfRangeValues(rounded, - minValue = Scalar.fromDouble(-bound), - maxValue = Scalar.fromDouble(bound), - inclusiveMin = false, - inclusiveMax = false, - replaceValue = Scalar.fromNull(DType.FLOAT64)) - } - } - } + castFloatsToDecimal(input.getBase, dt) - withResource(checkedInput) { checked => - // If target scale reaches DECIMAL64_MAX_PRECISION, container DECIMAL can not - // be created because of precision overflow. In this case, we perform casting op directly. - if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { - checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) - } else { - val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1)) - withResource(checked.castTo(containerType)) { container => - container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) - } - } - } + case (from: DecimalType, to: DecimalType) => + castDecimalToDecimal(input.getBase, from, to) case _ => input.getBase.castTo(GpuColumnVector.getNonNestedRapidsType(dataType)) @@ -917,4 +846,127 @@ case class GpuCast( } } + private def castIntegralsToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = { + + // Use INT64 bounds instead of FLOAT64 bounds, which enables precise comparison. + val (lowBound, upBound) = math.pow(10, dt.precision - dt.scale) match { + case bound if bound > Long.MaxValue => (Long.MinValue, Long.MaxValue) + case bound => (-bound.toLong + 1, bound.toLong - 1) + } + val checkedInput = if (ansiMode) { + assertValuesInRange(input, + minValue = Scalar.fromLong(lowBound), + maxValue = Scalar.fromLong(upBound)) + input.incRefCount() + } else { + replaceOutOfRangeValues(input, + minValue = Scalar.fromLong(lowBound), + maxValue = Scalar.fromLong(upBound), + replaceValue = Scalar.fromNull(input.getType)) + } + + withResource(checkedInput) { checked => + if (dt.scale < 0) { + // Rounding is essential when scale is negative, + // so we apply HALF_UP rounding manually to keep align with CpuCast. + withResource(checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, 0))) { + scaleZero => scaleZero.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) + } + } else if (dt.scale > 0) { + // Integer will be enlarged during casting if scale > 0, so we cast input to INT64 + // before casting it to decimal in case of overflow. + withResource(checked.castTo(DType.INT64)) { long => + long.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + } + } else { + checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + } + } + } + + private def castFloatsToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = { + + // Approach to minimize difference between CPUCast and GPUCast: + // step 1. cast input to FLOAT64 (if necessary) + // step 2. cast FLOAT64 to container DECIMAL (who keeps one more digit for rounding) + // step 3. perform HALF_UP rounding on container DECIMAL + val checkedInput = withResource(input.castTo(DType.FLOAT64)) { double => + val roundedDouble = double.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) + withResource(roundedDouble) { rounded => + // We rely on containerDecimal to perform preciser rounding. So, we have to take extra + // space cost of container into consideration when we run bound check. + val containerScaleBound = DType.DECIMAL64_MAX_PRECISION - (dt.scale + 1) + val bound = math.pow(10, (dt.precision - dt.scale) min containerScaleBound) + if (ansiMode) { + assertValuesInRange(rounded, + minValue = Scalar.fromDouble(-bound), + maxValue = Scalar.fromDouble(bound), + inclusiveMin = false, + inclusiveMax = false) + rounded.incRefCount() + } else { + replaceOutOfRangeValues(rounded, + minValue = Scalar.fromDouble(-bound), + maxValue = Scalar.fromDouble(bound), + inclusiveMin = false, + inclusiveMax = false, + replaceValue = Scalar.fromNull(DType.FLOAT64)) + } + } + } + + withResource(checkedInput) { checked => + // If target scale reaches DECIMAL64_MAX_PRECISION, container DECIMAL can not + // be created because of precision overflow. In this case, we perform casting op directly. + if (DType.DECIMAL64_MAX_PRECISION == dt.scale) { + checked.castTo(DType.create(DType.DTypeEnum.DECIMAL64, -dt.scale)) + } else { + val containerType = DType.create(DType.DTypeEnum.DECIMAL64, -(dt.scale + 1)) + withResource(checked.castTo(containerType)) { container => + container.round(dt.scale, ai.rapids.cudf.RoundMode.HALF_UP) + } + } + } + } + + private def castDecimalToDecimal(input: ColumnVector, + from: DecimalType, + to: DecimalType): ColumnVector = { + + val checkedInput = if (to.scale <= from.scale) { + // No need to promote precision unless target scale is larger than the source one, + // which indicates the cast is always valid when to.scale <= from.scale. + input.incRefCount() + } else { + // Check whether there exists overflow during promoting precision or not. + // We do NOT use `Scalar.fromDecimal(-to.scale, math.pow(10, 18).toLong)` here, because + // cuDF binaryOperation on decimal will rescale right input to fit the left one. + // The rescaling may lead to overflow. + val absBound = math.pow(10, DType.DECIMAL64_MAX_PRECISION + from.scale - to.scale).toLong + if (ansiMode) { + assertValuesInRange(input, + minValue = Scalar.fromDecimal(-from.scale, -absBound), + maxValue = Scalar.fromDecimal(-from.scale, absBound), + inclusiveMin = false, inclusiveMax = false) + input.incRefCount() + } else { + replaceOutOfRangeValues(input, + minValue = Scalar.fromDecimal(-from.scale, -absBound), + maxValue = Scalar.fromDecimal(-from.scale, absBound), + replaceValue = Scalar.fromNull(input.getType), + inclusiveMin = false, inclusiveMax = false) + } + } + + withResource(checkedInput) { checked => + to.scale - from.scale match { + case 0 => + checked.incRefCount() + case diff if diff > 0 => + checked.castTo(GpuColumnVector.getNonNestedRapidsType(to)) + case _ => + checked.round(to.scale, ai.rapids.cudf.RoundMode.HALF_UP) + } + } + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala index 1b40a7a8b3c..5d8425cf5e5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala @@ -754,7 +754,7 @@ class CastChecks extends ExprChecks { val binaryChecks: TypeSig = none val sparkBinarySig: TypeSig = STRING + BINARY - val decimalChecks: TypeSig = none + val decimalChecks: TypeSig = DECIMAL val sparkDecimalSig: TypeSig = numeric + BOOLEAN + TIMESTAMP + STRING val calendarChecks: TypeSig = none diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 023d75ead4e..7855e5e2083 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -22,6 +22,8 @@ import java.sql.Timestamp import java.time.LocalDateTime import java.util.TimeZone +import scala.collection.JavaConverters._ + import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.catalyst.expressions.{AnsiCast, Cast} @@ -476,6 +478,38 @@ class CastOpSuite extends GpuExpressionTestSuite { } } + test("cast decimal to decimal") { + // fromScale == toScale + testCastToDecimal(DataTypes.createDecimalType(18, 0), + scale = 0, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 2), + scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) + + // fromScale > toScale + testCastToDecimal(DataTypes.createDecimalType(18, 1), + scale = -1, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 10), + scale = 2, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 18), + scale = 15, + customRandGenerator = Some(new scala.util.Random(1234L))) + + // fromScale < toScale + testCastToDecimal(DataTypes.createDecimalType(18, 0), + scale = 3, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 5), + scale = 10, + customRandGenerator = Some(new scala.util.Random(1234L))) + testCastToDecimal(DataTypes.createDecimalType(18, 10), + scale = 17, + customRandGenerator = Some(new scala.util.Random(1234L))) + } + test("Detect overflow from numeric types to decimal") { def intGenerator(column: Seq[Int])(ss: SparkSession): DataFrame = { import ss.sqlContext.implicits._ @@ -493,6 +527,11 @@ class CastOpSuite extends GpuExpressionTestSuite { import ss.sqlContext.implicits._ column.toDF("col") } + def decimalGenerator(column: Seq[Decimal], decType: DecimalType + )(ss: SparkSession): DataFrame = { + val field = StructField("col", decType) + ss.createDataFrame(column.map(Row(_)).asJava, StructType(Seq(field))) + } def nonOverflowCase(dataType: DataType, generator: SparkSession => DataFrame, precision: Int, @@ -556,6 +595,15 @@ class CastOpSuite extends GpuExpressionTestSuite { generator = floatGenerator(Seq(12345.678f))) overflowCase(DataTypes.DoubleType, precision = 15, scale = -5, generator = doubleGenerator(Seq(1.23e21))) + + // Test 4: overflow caused by decimal rescaling + val decType = DataTypes.createDecimalType(18, 0) + nonOverflowCase(decType, + precision = 18, scale = 10, + generator = decimalGenerator(Seq(Decimal(99999999L)), decType)) + overflowCase(decType, + precision = 18, scale = 10, + generator = decimalGenerator(Seq(Decimal(100000000L)), decType)) } protected def testCastToDecimal( @@ -595,7 +643,7 @@ class CastOpSuite extends GpuExpressionTestSuite { if (!gpuOnly) { val (fromCpu, fromGpu) = runOnCpuAndGpu(createDF, execFun, conf, repart = 0) val (cpuResult, gpuResult) = dataType match { - case ShortType | IntegerType | LongType => + case ShortType | IntegerType | LongType | _: DecimalType => fromCpu.map(r => Row(r.getDecimal(1))) -> fromGpu.map(r => Row(r.getDecimal(1))) case FloatType | DoubleType => // There may be tiny difference between CPU and GPU result when casting from double @@ -605,6 +653,17 @@ class CastOpSuite extends GpuExpressionTestSuite { } fromCpu.map(r => Row(fetchFromRow(r))) -> fromGpu.map(r => Row(fetchFromRow(r))) } + var cc = 0 + fromCpu.zip(fromGpu).foreach { + case (x, y) if x.isNullAt(1) || y.isNullAt(1) => + println((x.isNullAt(1) && y.isNullAt(1), y.get(0), x.get(1), y.get(1))) + if (!(x.isNullAt(1) && y.isNullAt(1))) cc += 1 + case (x, y) => + val (xx, yy) = x.getDecimal(1).unscaledValue() -> y.getDecimal(1).unscaledValue() + if (xx != yy) cc += 1 + println((xx == yy, y.get(0), xx, yy)) + } + println(s"dataType: $dataType; scale: $scale, number of unEqual: $cc \n") compareResults(sort = false, maxFloatDiff, cpuResult, gpuResult) } else { withGpuSparkSession((ss: SparkSession) => execFun(createDF(ss)).collect(), conf) @@ -631,7 +690,7 @@ class CastOpSuite extends GpuExpressionTestSuite { } } val scaleRnd = new scala.util.Random(enhancedRnd.nextLong()) - val rawColumn: Seq[AnyVal] = (0 until rowCount).map { _ => + val rawColumn: Seq[Any] = (0 until rowCount).map { _ => val scale = 18 - scaleRnd.nextInt(integralSize + 1) dataType match { case ShortType => @@ -642,16 +701,27 @@ class CastOpSuite extends GpuExpressionTestSuite { enhancedRnd.nextLong() / math.pow(10, scale max 0).toLong case FloatType | DoubleType => enhancedRnd.nextLong() / math.pow(10, scale + 2) + case dt: DecimalType => + val unscaledValue = (enhancedRnd.nextLong() * math.pow(10, dt.precision - 18)).toLong + Decimal.createUnsafe(unscaledValue, dt.precision, dt.scale) case _ => throw new IllegalArgumentException(s"unsupported dataType: $dataType") } } dataType match { - case ShortType => rawColumn.map(_.asInstanceOf[Long].toShort).toDF("col") - case IntegerType => rawColumn.map(_.asInstanceOf[Long].toInt).toDF("col") - case LongType => rawColumn.map(_.asInstanceOf[Long]).toDF("col") - case FloatType => rawColumn.map(_.asInstanceOf[Double].toFloat).toDF("col") - case DoubleType => rawColumn.map(_.asInstanceOf[Double]).toDF("col") + case ShortType => + rawColumn.map(_.asInstanceOf[Long].toShort).toDF("col") + case IntegerType => + rawColumn.map(_.asInstanceOf[Long].toInt).toDF("col") + case LongType => + rawColumn.map(_.asInstanceOf[Long]).toDF("col") + case FloatType => + rawColumn.map(_.asInstanceOf[Double].toFloat).toDF("col") + case DoubleType => + rawColumn.map(_.asInstanceOf[Double]).toDF("col") + case dt: DecimalType => + val row = rawColumn.map(e => Row(e.asInstanceOf[Decimal])).asJava + ss.createDataFrame(row, StructType(Seq(StructField("col", dt)))) } } } From 26fd0bea9d551208b232c6e4d6d31a75929f9860 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Fri, 15 Jan 2021 15:48:22 +0800 Subject: [PATCH 2/3] remove debug code pieces --- .../scala/com/nvidia/spark/rapids/CastOpSuite.scala | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala index 7855e5e2083..879382a6901 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/CastOpSuite.scala @@ -653,17 +653,6 @@ class CastOpSuite extends GpuExpressionTestSuite { } fromCpu.map(r => Row(fetchFromRow(r))) -> fromGpu.map(r => Row(fetchFromRow(r))) } - var cc = 0 - fromCpu.zip(fromGpu).foreach { - case (x, y) if x.isNullAt(1) || y.isNullAt(1) => - println((x.isNullAt(1) && y.isNullAt(1), y.get(0), x.get(1), y.get(1))) - if (!(x.isNullAt(1) && y.isNullAt(1))) cc += 1 - case (x, y) => - val (xx, yy) = x.getDecimal(1).unscaledValue() -> y.getDecimal(1).unscaledValue() - if (xx != yy) cc += 1 - println((xx == yy, y.get(0), xx, yy)) - } - println(s"dataType: $dataType; scale: $scale, number of unEqual: $cc \n") compareResults(sort = false, maxFloatDiff, cpuResult, gpuResult) } else { withGpuSparkSession((ss: SparkSession) => execFun(createDF(ss)).collect(), conf) From 6f9af4a131329ea33374301717099a54aeeb9bb3 Mon Sep 17 00:00:00 2001 From: sperlingxx Date: Mon, 18 Jan 2021 12:25:20 +0800 Subject: [PATCH 3/3] add some comments --- .../src/main/scala/com/nvidia/spark/rapids/GpuCast.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala index 51635747167..6cae021bafa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCast.scala @@ -853,6 +853,8 @@ case class GpuCast( case bound if bound > Long.MaxValue => (Long.MinValue, Long.MaxValue) case bound => (-bound.toLong + 1, bound.toLong - 1) } + // At first, we conduct overflow check onto input column. + // Then, we cast checked input into target decimal type. val checkedInput = if (ansiMode) { assertValuesInRange(input, minValue = Scalar.fromLong(lowBound), @@ -933,6 +935,8 @@ case class GpuCast( from: DecimalType, to: DecimalType): ColumnVector = { + // At first, we conduct overflow check onto input column. + // Then, we cast checked input into target decimal type. val checkedInput = if (to.scale <= from.scale) { // No need to promote precision unless target scale is larger than the source one, // which indicates the cast is always valid when to.scale <= from.scale.