From 39d8adc1d4b1d0bf117b2e2a0040bb0a635dc781 Mon Sep 17 00:00:00 2001 From: rwlee Date: Thu, 21 Jan 2021 14:37:22 -0800 Subject: [PATCH] Decimal support for add and subtract (#1561) Signed-off-by: Ryan Lee Co-authored-by: Robert (Bobby) Evans --- docs/configs.md | 2 + docs/supported_ops.md | 192 +++++++++++++++++- .../src/main/python/arithmetic_ops_test.py | 13 +- .../nvidia/spark/rapids/GpuOverrides.scala | 21 +- .../spark/rapids/decimalExpressions.scala | 47 ++++- 5 files changed, 256 insertions(+), 19 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index b54b95a997a..b499a4d96f8 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -124,6 +124,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Cast|`timestamp`, `tinyint`, `binary`, `float`, `smallint`, `string`, `decimal`, `double`, `boolean`, `cast`, `date`, `int`, `bigint`|Convert a column of one type of data into another type|true|None| spark.rapids.sql.expression.Cbrt|`cbrt`|Cube root|true|None| spark.rapids.sql.expression.Ceil|`ceiling`, `ceil`|Ceiling of a number|true|None| +spark.rapids.sql.expression.CheckOverflow| |CheckOverflow after arithmetic operations between DecimalType data|true|None| spark.rapids.sql.expression.Coalesce|`coalesce`|Returns the first non-null argument if exists. Otherwise, null|true|None| spark.rapids.sql.expression.Concat|`concat`|String concatenate NO separator|true|None| spark.rapids.sql.expression.Contains| |Contains|true|None| @@ -193,6 +194,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Or|`or`|Logical OR|true|None| spark.rapids.sql.expression.Pmod|`pmod`|Pmod|true|None| spark.rapids.sql.expression.Pow|`pow`, `power`|lhs ^ rhs|true|None| +spark.rapids.sql.expression.PromotePrecision| |PromotePrecision before arithmetic operations between DecimalType data|true|None| spark.rapids.sql.expression.PythonUDF| |UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated.|true|None| spark.rapids.sql.expression.Quarter|`quarter`|Returns the quarter of the year for date, in the range 1 to 4|true|None| spark.rapids.sql.expression.Rand|`random`, `rand`|Generate a random column with i.i.d. uniformly distributed values in [0, 1)|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 28ee94e9720..5b352cf91dd 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -1068,7 +1068,7 @@ Accelerator support is described below. -NS +S* NS @@ -1089,7 +1089,7 @@ Accelerator support is described below. -NS +S* NS @@ -1110,7 +1110,7 @@ Accelerator support is described below. -NS +S* NS @@ -2834,6 +2834,96 @@ Accelerator support is described below. +CheckOverflow + +CheckOverflow after arithmetic operations between DecimalType data +None +project +input + + + + + + + + + + +S* + + + + + + + + + +result + + + + + + + + + + +S* + + + + + + + + + +lambda +input + + + + + + + + + + +NS + + + + + + + + + +result + + + + + + + + + + +NS + + + + + + + + + Coalesce `coalesce` Returns the first non-null argument if exists. Otherwise, null @@ -9859,6 +9949,96 @@ Accelerator support is described below. +PromotePrecision + +PromotePrecision before arithmetic operations between DecimalType data +None +project +input + + + + + + + + + + +S* + + + + + + + + + +result + + + + + + + + + + +S* + + + + + + + + + +lambda +input + + + + + + + + + + +NS + + + + + + + + + +result + + + + + + + + + + +NS + + + + + + + + + PythonUDF UDF run in an external python process. Does not actually run on the GPU, but the transfer of data to/from it can be accelerated. @@ -13540,7 +13720,7 @@ Accelerator support is described below. -NS +S* NS @@ -13561,7 +13741,7 @@ Accelerator support is described below. -NS +S* NS @@ -13582,7 +13762,7 @@ Accelerator support is described below. -NS +S* NS diff --git a/integration_tests/src/main/python/arithmetic_ops_test.py b/integration_tests/src/main/python/arithmetic_ops_test.py index 3ce14d5bee9..feb75b7082b 100644 --- a/integration_tests/src/main/python/arithmetic_ops_test.py +++ b/integration_tests/src/main/python/arithmetic_ops_test.py @@ -21,7 +21,10 @@ from spark_session import with_spark_session, is_before_spark_310 import pyspark.sql.functions as f -@pytest.mark.parametrize('data_gen', numeric_gens, ids=idfn) +decimal_gens_not_max_prec = [decimal_gen_neg_scale, decimal_gen_scale_precision, + decimal_gen_same_scale_precision, decimal_gen_64bit] + +@pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens_not_max_prec, ids=idfn) def test_addition(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( @@ -30,9 +33,10 @@ def test_addition(data_gen): f.lit(-12).cast(data_type) + f.col('b'), f.lit(None).cast(data_type) + f.col('a'), f.col('b') + f.lit(None).cast(data_type), - f.col('a') + f.col('b'))) + f.col('a') + f.col('b')), + conf=allow_negative_scale_of_decimal_conf) -@pytest.mark.parametrize('data_gen', numeric_gens, ids=idfn) +@pytest.mark.parametrize('data_gen', numeric_gens + decimal_gens_not_max_prec, ids=idfn) def test_subtraction(data_gen): data_type = data_gen.data_type assert_gpu_and_cpu_are_equal_collect( @@ -41,7 +45,8 @@ def test_subtraction(data_gen): f.lit(-12).cast(data_type) - f.col('b'), f.lit(None).cast(data_type) - f.col('a'), f.col('b') - f.lit(None).cast(data_type), - f.col('a') - f.col('b'))) + f.col('a') - f.col('b')), + conf=allow_negative_scale_of_decimal_conf) @pytest.mark.parametrize('data_gen', numeric_gens, ids=idfn) def test_multiplication(data_gen): diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index de1c103ed04..75c80e1599a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -41,7 +41,6 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.text.TextFileFormat import org.apache.spark.sql.execution.datasources.v2.{AlterNamespaceSetPropertiesExec, AlterTableExec, AtomicReplaceTableExec, BatchScanExec, CreateNamespaceExec, CreateTableExec, DeleteFromTableExec, DescribeNamespaceExec, DescribeTableExec, DropNamespaceExec, DropTableExec, RefreshTableExec, RenameTableExec, ReplaceTableExec, SetCatalogAndNamespaceExec, ShowCurrentNamespaceExec, ShowNamespacesExec, ShowTablePropertiesExec, ShowTablesExec} import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan -import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.execution.python._ @@ -706,6 +705,14 @@ object GpuOverrides { } } }), + expr[PromotePrecision]( + "PromotePrecision before arithmetic operations between DecimalType data", + ExprChecks.unaryProjectNotLambdaInputMatchesOutput(TypeSig.DECIMAL, TypeSig.DECIMAL), + (a, conf, p, r) => new PromotePrecisionExprMeta(a, conf, p, r)), + expr[CheckOverflow]( + "CheckOverflow after arithmetic operations between DecimalType data", + ExprChecks.unaryProjectNotLambdaInputMatchesOutput(TypeSig.DECIMAL, TypeSig.DECIMAL), + (a, conf, p, r) => new CheckOverflowExprMeta(a, conf, p, r)), expr[ToDegrees]( "Converts radians to degrees", ExprChecks.mathUnary, @@ -1377,9 +1384,9 @@ object GpuOverrides { expr[Add]( "Addition", ExprChecks.binaryProjectNotLambda( - TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval, - ("lhs", TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval), - ("rhs", TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval)), + TypeSig.numeric, TypeSig.numericAndInterval, + ("lhs", TypeSig.numeric, TypeSig.numericAndInterval), + ("rhs", TypeSig.numeric, TypeSig.numericAndInterval)), (a, conf, p, r) => new BinaryExprMeta[Add](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuAdd(lhs, rhs) @@ -1387,9 +1394,9 @@ object GpuOverrides { expr[Subtract]( "Subtraction", ExprChecks.binaryProjectNotLambda( - TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval, - ("lhs", TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval), - ("rhs", TypeSig.integral + TypeSig.fp, TypeSig.numericAndInterval)), + TypeSig.numeric, TypeSig.numericAndInterval, + ("lhs", TypeSig.numeric, TypeSig.numericAndInterval), + ("rhs", TypeSig.numeric, TypeSig.numericAndInterval)), (a, conf, p, r) => new BinaryExprMeta[Subtract](a, conf, p, r) { override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = GpuSubtract(lhs, rhs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala index d9623fd9f6c..6361d4a7761 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/decimalExpressions.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,10 +16,53 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{ColumnVector, DType, Scalar} +import scala.math.{max, min} -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{CheckOverflow, Expression, PromotePrecision} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.rapids._ import org.apache.spark.sql.types.{DataType, DecimalType, LongType} +/** + * A GPU substitution of CheckOverflow, serves as a placeholder. + */ +case class GpuCheckOverflow(child: Expression) extends GpuUnaryExpression { + override protected def doColumnar(input: GpuColumnVector): ColumnVector = + input.getBase.incRefCount() + override def dataType: DataType = child.dataType +} + +/** + * A GPU substitution of PromotePrecision, serves as a placeholder. + */ +case class GpuPromotePrecision(child: Expression) extends GpuUnaryExpression { + override protected def doColumnar(input: GpuColumnVector): ColumnVector = + input.getBase.incRefCount() + override def dataType: DataType = child.dataType +} + +/** Meta-data for checkOverflow */ +class CheckOverflowExprMeta( + expr: CheckOverflow, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends UnaryExprMeta[CheckOverflow](expr, conf, parent, rule) { + override def convertToGpu(child: Expression): GpuExpression = + GpuCheckOverflow(child) +} + +/** Meta-data for promotePrecision */ +class PromotePrecisionExprMeta( + expr: PromotePrecision, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends UnaryExprMeta[PromotePrecision](expr, conf, parent, rule) { + override def convertToGpu(child: Expression): GpuExpression = + GpuPromotePrecision(child) +} + case class GpuUnscaledValue(child: Expression) extends GpuUnaryExpression { override def dataType: DataType = LongType override def toString: String = s"UnscaledValue($child)"