[WIP] decimal meta adjustment for GPU runtime #976

Closed
@@ -539,6 +539,12 @@ object GpuOverrides {
expr[AnsiCast](
"Convert a column of one type of data into another type",
(cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, true, conf, p, r)),
expr[PromotePrecision](
"PromotePrecision before arithmetic operations between DecimalType data",
(a, conf, p, r) => new PromotePrecisionExprMeta(a, conf, p, r)),
expr[CheckOverflow](
"CheckOverflow after arithmetic operations between DecimalType data",
(a, conf, p, r) => new CheckOverflowExprMeta(a, conf, p, r)),
expr[ToDegrees](
"Converts radians to degrees",
(a, conf, p, r) => new UnaryExprMeta[ToDegrees](a, conf, p, r) {
@@ -0,0 +1,147 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import scala.math.{max, min}

import org.apache.spark.sql.catalyst.expressions.{CheckOverflow, Expression, PromotePrecision}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.types.{DataType, DecimalType}


/**
* A GPU substitute for CheckOverflow; serves as a placeholder.
*/
case class GpuCheckOverflow(child: Expression) extends GpuUnaryExpression {
override protected def doColumnar(input: GpuColumnVector): GpuColumnVector = input
override def dataType: DataType = child.dataType
}

/**
* A GPU substitute for PromotePrecision; serves as a placeholder.
*/
case class GpuPromotePrecision(child: Expression) extends GpuUnaryExpression {
override protected def doColumnar(input: GpuColumnVector): GpuColumnVector = input
override def dataType: DataType = child.dataType
}

/** Meta-data for CheckOverflow */
class CheckOverflowExprMeta(
expr: CheckOverflow,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: ConfKeysAndIncompat)
extends UnaryExprMeta[CheckOverflow](expr, conf, parent, rule) {
override def convertToGpu(child: Expression): GpuExpression = {
child match {
// For Add | Subtract | Remainder | Pmod | IntegralDivide, the result type has
// precision less than or equal to that of the input types. Since the input types
// are checked in PromotePrecisionExprMeta, the result types are also safe.
case _: GpuAdd =>
case _: GpuSubtract =>
case _: GpuRemainder =>
case _: GpuPmod =>
case _: GpuIntegralDivide =>
// For Multiply, the result's precision must be inferred from the inputs' precision.
case GpuMultiply(GpuPromotePrecision(lhs: GpuCast), _) =>
val dt = lhs.dataType.asInstanceOf[DecimalType]
if (dt.precision * 2 + 1 > DecimalExpressions.GPU_MAX_PRECISION) {
throw new IllegalStateException("DecimalPrecision overflow may occur because " +
s"inferred result precision(${dt.precision * 2 + 1}) exceeds GPU_MAX_PRECISION.")
}
// For Divide, the result's precision must be inferred from the inputs' precision and scale.
case GpuDivide(GpuPromotePrecision(lhs: GpuCast), _) =>
val dt = lhs.dataType.asInstanceOf[DecimalType]
val scale = max(DecimalExpressions.GPU_MINIMUM_ADJUSTED_SCALE, dt.precision + dt.scale + 1)
if (dt.precision + scale > DecimalExpressions.GPU_MAX_PRECISION) {
throw new IllegalStateException("DecimalPrecision overflow may occur because " +
s"inferred result precision(${dt.precision + scale}) exceeds GPU_MAX_PRECISION.")
}
case c =>
throw new IllegalStateException(
s"Unknown child expression of CheckOverflow: ${c.prettyName}.")
}
GpuCheckOverflow(child)
}
}
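
A REPL-style worked sketch of the two overflow checks above (hypothetical helper names, not part of the diff), using the GPU constants defined further down in this file:

import scala.math.max
import org.apache.spark.sql.types.DecimalType

// Multiply: both inputs share the promoted DecimalType(p, s), so the inferred
// result precision is p * 2 + 1.
def multiplyWouldOverflow(dt: DecimalType): Boolean =
  dt.precision * 2 + 1 > DecimalExpressions.GPU_MAX_PRECISION

// Divide: scale = max(GPU_MINIMUM_ADJUSTED_SCALE, p + s + 1), and the inferred
// result precision is p + scale.
def divideWouldOverflow(dt: DecimalType): Boolean = {
  val scale = max(DecimalExpressions.GPU_MINIMUM_ADJUSTED_SCALE,
    dt.precision + dt.scale + 1)
  dt.precision + scale > DecimalExpressions.GPU_MAX_PRECISION
}

// multiplyWouldOverflow(DecimalType(10, 3)) == true  (21 > 19), as the unit test expects
// divideWouldOverflow(DecimalType(10, 3))   == true  (10 + max(6, 14) = 24 > 19)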

/** Meta-data for PromotePrecision */
class PromotePrecisionExprMeta(
expr: PromotePrecision,
conf: RapidsConf,
parent: Option[RapidsMeta[_, _, _]],
rule: ConfKeysAndIncompat)
extends UnaryExprMeta[PromotePrecision](expr, conf, parent, rule) {
override def convertToGpu(child: Expression): GpuExpression = {
child match {
case GpuCast(cc: Expression, dt: DecimalType, a: Boolean, t: Option[String]) =>
// Refine the decimal type with GPU constraints, following the same strategy as the CPU runtime.
val refinedDt = if (SQLConf.get.decimalOperationsAllowPrecisionLoss) {
DecimalExpressions.adjustPrecisionScale(dt)
} else {
DecimalExpressions.bounded(dt.precision, dt.scale)
}
GpuPromotePrecision(GpuCast(cc, refinedDt, a, t))
case c => throw new IllegalStateException(
s"Child expression of PromotePrecision should always be GpuCast with DecimalType, " +
s"but found ${c.prettyName}")
}
}
}
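
The branch above switches between two refinement strategies based on spark.sql.decimalOperations.allowPrecisionLoss. A quick sketch of the difference for a type that exceeds the GPU maximum (both outcomes are exercised by the unit test in this PR):

// allowPrecisionLoss = true: the scale gives way to preserve integer digits.
DecimalExpressions.adjustPrecisionScale(DecimalType(22, 10))  // => DecimalType(19, 7)
// allowPrecisionLoss = false: precision and scale are simply capped at the GPU maxima.
DecimalExpressions.bounded(22, 10)                            // => DecimalType(19, 10)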

object DecimalExpressions {
// The underlying storage type of decimal data in cuDF is int64_t, which can hold
// at most 19 decimal digits.
val GPU_MAX_PRECISION: Int = 19
val GPU_MAX_SCALE: Int = 19
// Kept in sync with Spark's DecimalType.MINIMUM_ADJUSTED_SCALE; it may be worth
// making this configurable.
val GPU_MINIMUM_ADJUSTED_SCALE = 6

/**
* A forked version of [[org.apache.spark.sql.types.DecimalType.adjustPrecisionScale]]
* with the GPU constants above substituted for the CPU ones.
*/
private[rapids] def adjustPrecisionScale(dt: DecimalType): DecimalType = {
if (dt.precision <= GPU_MAX_PRECISION) {
// Adjustment only needed when we exceed max precision
dt
} else if (dt.scale < 0) {
// Decimal can have negative scale (SPARK-24468). In this case, we cannot allow a precision
// loss since we would cause a loss of digits in the integer part.
// In this case, we are likely to meet an overflow.
DecimalType(GPU_MAX_PRECISION, dt.scale)
} else {
// Precision/scale exceed the maximum precision. The result must be adjusted to GPU_MAX_PRECISION.
val intDigits = dt.precision - dt.scale
// If the original scale is less than GPU_MINIMUM_ADJUSTED_SCALE, use the original
// scale; otherwise preserve at least GPU_MINIMUM_ADJUSTED_SCALE fractional digits.
val minScaleValue = Math.min(dt.scale, GPU_MINIMUM_ADJUSTED_SCALE)
// The resulting scale is the maximum between what is available without causing a loss of
// digits for the integer part of the decimal and the minimum guaranteed scale, which is
// computed above
val adjustedScale = Math.max(GPU_MAX_PRECISION - intDigits, minScaleValue)

DecimalType(GPU_MAX_PRECISION, adjustedScale)
}
}

/**
* A forked version of [[org.apache.spark.sql.types.DecimalType.bounded]]
* with the GPU constants above substituted for the CPU ones.
*/
private[rapids] def bounded(precision: Int, scale: Int): DecimalType = {
DecimalType(min(precision, GPU_MAX_PRECISION), min(scale, GPU_MAX_SCALE))
}
}
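
To make the adjustment arithmetic concrete, here is a trace of adjustPrecisionScale on the inputs the unit test below uses (a worked sketch, derived directly from the code above):

// DecimalType(20, 5):  intDigits = 20 - 5 = 15
//   minScaleValue = min(5, 6) = 5
//   adjustedScale = max(19 - 15, 5) = 5        => DecimalType(19, 5)
// DecimalType(30, 15): intDigits = 30 - 15 = 15
//   minScaleValue = min(15, 6) = 6
//   adjustedScale = max(19 - 15, 6) = 6        => DecimalType(19, 6)
// bounded(30, 20): both components capped      => DecimalType(19, 19)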
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.unit

import com.nvidia.spark.rapids._
import org.scalatest.FunSuite
import org.scalatest.Matchers.convertToAnyShouldWrapper

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.{Add, Cast, CheckOverflow, Expression, Literal, Multiply, Pmod, PromotePrecision, Subtract}
import org.apache.spark.sql.rapids._
import org.apache.spark.sql.types.{DecimalType, DoubleType, IntegerType}


class DecimalUnitTest extends FunSuite with Arm {
test("GpuDecimalExpressionMeta") {
val rapidsConf = new RapidsConf(Map[String, String]())
val testWrapper = (input: Expression, expected: Expression) => {
val output = GpuOverrides.wrapExpr(input, rapidsConf, None).convertToGpu()
expected.semanticEquals(output) shouldBe true
}
val sparkConf = new SparkConf().set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
TestUtils.withGpuSparkSession(sparkConf) { _ =>
// No adjustment needed: precision already fits within GPU_MAX_PRECISION.
testWrapper(
PromotePrecision(Cast(Literal(1.0), DecimalType(10, 3))),
GpuPromotePrecision(GpuCast(GpuLiteral(1.0, DoubleType), DecimalType(10, 3))))
// GPU_MAX_PRECISION - intDigits = 19 - (22 - 10) = 7
// minScaleValue = min(10, GPU_MINIMUM_ADJUSTED_SCALE) = 6
// adjustedScale = max(7, 6) = 7
testWrapper(
PromotePrecision(Cast(Literal(1.0), DecimalType(22, 10))),
GpuPromotePrecision(GpuCast(GpuLiteral(1.0, DoubleType), DecimalType(19, 7))))
// GPU_MAX_PRECISION - intDigits = 19 - (20 - 5) = 4
// minScaleValue = min(5, GPU_MINIMUM_ADJUSTED_SCALE) = 5
// adjustedScale = max(4, 5) = 5
testWrapper(
PromotePrecision(Cast(Literal(1.0), DecimalType(20, 5))),
GpuPromotePrecision(GpuCast(GpuLiteral(1.0, DoubleType), DecimalType(19, 5))))

// GPU_MAX_PRECISION - intDigits = 19 - (30 - 15) = 4
// minScaleValue = min(15, GPU_MINIMUM_ADJUSTED_SCALE) = 6
// adjustedScale = max(4, 6) = 6
var cpuPP = PromotePrecision(Cast(Literal(1), DecimalType(30, 15)))
var gpuPP = GpuPromotePrecision(GpuCast(GpuLiteral(1, IntegerType), DecimalType(19, 6)))
testWrapper(
CheckOverflow(Add(cpuPP, cpuPP), DecimalType(20, 3), false),
GpuCheckOverflow(GpuAdd(gpuPP, gpuPP)))
testWrapper(
CheckOverflow(Subtract(cpuPP, cpuPP), DecimalType(20, 3), false),
GpuCheckOverflow(GpuSubtract(gpuPP, gpuPP)))
testWrapper(
CheckOverflow(Pmod(cpuPP, cpuPP), DecimalType(20, 3), false),
GpuCheckOverflow(GpuPmod(gpuPP, gpuPP)))

cpuPP = PromotePrecision(Cast(Literal(1), DecimalType(10, 3)))
gpuPP = GpuPromotePrecision(GpuCast(GpuLiteral(1, IntegerType), DecimalType(10, 3)))
assertThrows[IllegalStateException] {
testWrapper(
CheckOverflow(Multiply(cpuPP, cpuPP), DecimalType(20, 6), false),
GpuCheckOverflow(GpuMultiply(gpuPP, gpuPP)))
}
}

sparkConf.set("spark.sql.decimalOperations.allowPrecisionLoss", "false")
TestUtils.withGpuSparkSession(sparkConf) { _ =>
testWrapper(
PromotePrecision(Cast(Literal(1.0), DecimalType(22, 10))),
GpuPromotePrecision(GpuCast(GpuLiteral(1.0, DoubleType), DecimalType(19, 10))))
testWrapper(
PromotePrecision(Cast(Literal(1.0), DecimalType(30, 20))),
GpuPromotePrecision(GpuCast(GpuLiteral(1.0, DoubleType), DecimalType(19, 19))))
}
}
}