Commit

Add a config to disable decimal types by default (NVIDIA#1278)
Signed-off-by: Alessandro Bellina <abellina@nvidia.com>
abellina authored Dec 7, 2020
1 parent 8b917ab commit cb47ba8
Showing 10 changed files with 82 additions and 33 deletions.
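Because this commit makes `spark.rapids.sql.decimalType.enabled` default to `false`, jobs that rely on GPU decimal support have to opt in explicitly. A minimal sketch of assembling that setting as `spark-submit --conf` arguments follows; the helper name `to_submit_args` is hypothetical, and only the config key is taken from this commit.

```python
# Hypothetical helper: render a dict of Spark settings as spark-submit arguments.
# Only the config key below comes from this commit; the rest is illustrative.

def to_submit_args(settings):
    """Return a flat list like ['--conf', 'key=value', ...] in sorted key order."""
    args = []
    for key in sorted(settings):
        args.extend(["--conf", f"{key}={settings[key]}"])
    return args


# Opt in to GPU decimal support, which is off by default after this change.
settings = {"spark.rapids.sql.decimalType.enabled": "true"}
submit_args = to_submit_args(settings)
print(" ".join(submit_args))
```

The same key and value could equally be set on a `SparkConf` or session builder; the command-line form is shown only because it needs no Spark installation to illustrate.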
1 change: 1 addition & 0 deletions docs/configs.md
@@ -54,6 +54,7 @@ Name | Description | Default Value
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalType.enabled"></a>spark.rapids.sql.decimalType.enabled|Enable decimal type support on the GPU. Decimal support on the GPU is limited to less than 18 digits and is only supported by a small number of operations currently. This can result in a lot of data movement to and from the GPU, which can slow down processing in some cases.|false
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
10 changes: 7 additions & 3 deletions docs/supported_ops.md
@@ -13,7 +13,9 @@ to document what operations are supported and what data types they support.
## `Decimal`
The `Decimal` type in Spark supports a precision
up to 38 digits (128-bits). The RAPIDS Accelerator stores values up to 64-bits and as such only
supports a precision up to 18 digits.
supports a precision up to 18 digits. Note that decimals are disabled by default in the plugin
because they are supported by a small number of operations presently, which can result in a lot
of data movement to and from the GPU, slowing down processing in some cases.

## `Timestamp`
Timestamps in Spark will all be converted to the local time zone before processing
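The 18-digit limit stated in the `Decimal` section of this hunk follows from holding the unscaled value in 64 bits. The arithmetic can be checked directly; this sketch assumes a signed two's-complement representation, and the function name is illustrative.

```python
# Largest value of a signed 64-bit integer (assumed storage for the unscaled decimal).
INT64_MAX = 2**63 - 1


def max_decimal_precision(max_value):
    """Largest p such that every p-digit number (up to 10**p - 1) fits in max_value."""
    precision = 0
    while 10 ** (precision + 1) - 1 <= max_value:
        precision += 1
    return precision


precision_64 = max_decimal_precision(INT64_MAX)
precision_128 = max_decimal_precision(2**127 - 1)
print(precision_64, precision_128)
```

Under the same assumption, the 128-bit case reproduces the 38-digit precision quoted for Spark's own `Decimal` type.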
@@ -722,7 +724,8 @@ Accelerator supports are described below.
</table>
* as was state previously Decimal is only supported up to a precision of
18 and Timestamp is only supported in the
UTC time zone.
UTC time zone. Decimals are off by default due to performance impact in
some cases.

# `Expression` and SQL Functions
Inside each node in the DAG there can be one or more trees of expressions
@@ -15375,7 +15378,8 @@ Accelerator support is described below.
</table>
* as was state previously Decimal is only supported up to a precision of
18 and Timestamp is only supported in the
UTC time zone.
UTC time zone. Decimals are off by default due to performance impact in
some cases.

## `Cast`
The above table does not show what is and is not supported for cast very well.
2 changes: 2 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -88,6 +88,8 @@ def with_gpu_session(func, conf={}):
else:
copy['spark.rapids.sql.test.enabled'] = 'true'
copy['spark.rapids.sql.test.allowedNonGpu'] = ','.join(get_non_gpu_allowed())
# TODO: remove when decimal types can be enabled by default
copy['spark.rapids.sql.decimalType.enabled'] = 'true'
return with_spark_session(func, conf=copy)
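The effect of the two added lines can be sketched without Spark. This is a simplified stand-in, not the real helper: it assumes the assignment sits at function level (the page has lost indentation), and `gpu_session_conf` is a hypothetical name.

```python
# Simplified stand-in for the integration-test helper; not the real implementation.
DECIMAL_KEY = "spark.rapids.sql.decimalType.enabled"


def gpu_session_conf(conf=None):
    """Copy the caller's settings and force decimal support on for GPU test runs."""
    copy = dict(conf or {})  # work on a copy so the caller's dict is untouched
    copy[DECIMAL_KEY] = "true"  # unconditional: overrides any caller-supplied value
    return copy


caller_conf = {DECIMAL_KEY: "false", "spark.sql.shuffle.partitions": "1"}
effective = gpu_session_conf(caller_conf)
print(effective[DECIMAL_KEY], caller_conf[DECIMAL_KEY])
```

One consequence of this shape is that an integration test cannot turn decimals off through its own `conf`; that appears to be the intent of the TODO, which keeps existing decimal tests running until the default changes.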

def is_spark_300():
@@ -194,7 +194,7 @@ class StringOperatorsDiagnostics extends SparkQueryCompareTestSuite {
def generateResults(gen : org.apache.spark.sql.Column => org.apache.spark.sql.Column):
(Array[Row], Array[Row]) = {
val (testConf, qualifiedTestName) = setupTestConfAndQualifierName("", true, false,
new SparkConf(), Seq.empty, 0.0, false)
new SparkConf(), Seq.empty, 0.0, false, false)
runOnCpuAndGpu(TestCodepoints.validCodepointCharsDF,
frame => frame.select(gen(col("strings"))), testConf)
}
@@ -649,7 +649,7 @@ object GpuOverrides {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true,
allowDecimal = conf.decimalTypeEnabled,
allowCalendarInterval = true)
}),
expr[Signum](
@@ -667,7 +667,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(child: Expression): GpuExpression =
GpuAlias(child, a.name)(a.exprId, a.qualifier, a.explicitMetadata)
@@ -682,7 +682,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

// This is the only NOOP operator. It goes away when things are bound
override def convertToGpu(): Expression = att
@@ -878,7 +878,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(child: Expression): GpuExpression = GpuIsNull(child)
}),
@@ -892,7 +892,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(child: Expression): GpuExpression = GpuIsNotNull(child)
}),
@@ -1295,7 +1295,7 @@ object GpuOverrides {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuEqualTo(lhs, rhs)
@@ -1306,7 +1306,7 @@ object GpuOverrides {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuGreaterThan(lhs, rhs)
@@ -1317,7 +1317,7 @@ object GpuOverrides {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuGreaterThanOrEqual(lhs, rhs)
@@ -1372,7 +1372,7 @@ object GpuOverrides {

GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuLessThan(lhs, rhs)
@@ -1383,7 +1383,7 @@ object GpuOverrides {
override def isSupportedType(t: DataType): Boolean =
GpuOverrides.isSupportedType(t,
allowNull = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression =
GpuLessThanOrEqual(lhs, rhs)
@@ -2010,7 +2010,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(): GpuExec =
GpuProjectExec(childExprs.map(_.convertToGpu()), childPlans(0).convertIfNeeded())
@@ -2112,7 +2112,7 @@ object GpuOverrides {
allowArray = true,
allowStruct = true,
allowNesting = true,
allowDecimal = true)
allowDecimal = conf.decimalTypeEnabled)

override def convertToGpu(): GpuExec =
GpuFilterExec(childExprs(0).convertToGpu(), childPlans(0).convertIfNeeded())
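Every hunk in this file makes the same substitution: the hard-coded `allowDecimal = true` becomes a lookup of the new flag. A toy model of that gating pattern is sketched below; the class, function names, and the set of always-supported types are illustrative and do not mirror the plugin's real code.

```python
# Toy model of the gating pattern in the hunks above; names are illustrative
# and do not mirror the plugin's real classes.
from dataclasses import dataclass


@dataclass(frozen=True)
class Conf:
    decimal_type_enabled: bool = False  # mirrors the new default of false


def is_supported_type(type_name, allow_decimal=False):
    """Accept a few always-supported types; accept 'decimal' only when allowed."""
    always_supported = {"int", "long", "double", "string"}
    if type_name == "decimal":
        return allow_decimal
    return type_name in always_supported


def can_run_on_gpu(type_name, conf):
    # Before this commit the equivalent call sites passed allow_decimal=True.
    return is_supported_type(type_name, allow_decimal=conf.decimal_type_enabled)


default_result = can_run_on_gpu("decimal", Conf())
enabled_result = can_run_on_gpu("decimal", Conf(decimal_type_enabled=True))
print(default_result, enabled_result)
```

Reading the flag at each call site, rather than inside the shared type check, keeps the check itself unchanged and limits the new behavior to operators that previously opted in to decimals.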
10 changes: 10 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -457,6 +457,14 @@ object RapidsConf {
.booleanConf
.createWithDefault(false)

val DECIMAL_TYPE_ENABLED = conf("spark.rapids.sql.decimalType.enabled")
.doc("Enable decimal type support on the GPU. Decimal support on the GPU is limited to " +
"less than 18 digits and is only supported by a small number of operations currently. " +
"This can result in a lot of data movement to and from the GPU, which can slow down " +
"processing in some cases.")
.booleanConf
.createWithDefault(false)

val ENABLE_REPLACE_SORTMERGEJOIN = conf("spark.rapids.sql.replaceSortMergeJoin.enabled")
.doc("Allow replacing sortMergeJoin with HashJoin")
.booleanConf
@@ -965,6 +973,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isFloatAggEnabled: Boolean = get(ENABLE_FLOAT_AGG)

lazy val decimalTypeEnabled: Boolean = get(DECIMAL_TYPE_ENABLED)

lazy val explain: String = get(EXPLAIN)

lazy val isImprovedTimestampOpsEnabled: Boolean = get(IMPROVED_TIMESTAMP_OPS)
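The two hunks above declare a boolean entry with a default of `false` and expose it through an accessor. A rough model of that lookup is sketched here; `BooleanConfEntry` is a hypothetical stand-in rather than the `RapidsConf` builder API, and the strict `true`/`false` parsing is an assumption.

```python
# Illustrative stand-in for a typed config entry; not the RapidsConf API.
from dataclasses import dataclass


@dataclass(frozen=True)
class BooleanConfEntry:
    key: str
    doc: str
    default: bool

    def get(self, settings):
        """Parse 'true'/'false' (case-insensitive) from settings, else use the default."""
        raw = settings.get(self.key)
        if raw is None:
            return self.default
        value = raw.strip().lower()
        if value not in ("true", "false"):
            raise ValueError(f"{self.key} must be true or false, got {raw!r}")
        return value == "true"


DECIMAL_TYPE_ENABLED = BooleanConfEntry(
    key="spark.rapids.sql.decimalType.enabled",
    doc="Enable decimal type support on the GPU.",
    default=False,
)

print(DECIMAL_TYPE_ENABLED.get({}))
print(DECIMAL_TYPE_ENABLED.get({DECIMAL_TYPE_ENABLED.key: "true"}))
```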
@@ -156,6 +156,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
val conf = makeBatchedBytes(1)
.set(RapidsConf.MAX_READER_BATCH_SIZE_ROWS.key, "1")
.set(RapidsConf.MAX_READER_BATCH_SIZE_BYTES.key, "1")
.set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true")
.set("spark.sql.shuffle.partitions", "1")

withGpuSparkSession(spark => {
@@ -72,7 +72,9 @@ class ProjectExprSuite extends SparkQueryCompareTestSuite {

val createDF = (ss: SparkSession) => ss.read.parquet(path)
val fun = (df: DataFrame) => df.withColumn("dec", df("decimals")).select("dec")
val conf = new SparkConf().set("spark.rapids.sql.exec.FileSourceScanExec", "false")
val conf = new SparkConf()
.set("spark.rapids.sql.exec.FileSourceScanExec", "false")
.set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true")
val (fromCpu, fromGpu) = runOnCpuAndGpu(createDF, fun, conf, repart = 0)
compareResults(false, 0.0, fromCpu, fromGpu)
} finally {
@@ -294,11 +294,12 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
maxFloatDiff: Double = 0.0,
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false)
sortBeforeRepart: Boolean = false,
decimalTypeEnabled: Boolean = true)
(fun: DataFrame => DataFrame): Unit = {
val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimalTypeEnabled)
test(qualifiedTestName) {
val (fromCpu, _, fromGpu, gpuPlan) = runOnCpuAndGpuWithCapture(df, fun,
conf = testConf,
@@ -638,7 +639,8 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
conf: SparkConf,
execsAllowedNonGpu: Seq[String],
maxFloatDiff: Double,
sortBeforeRepart: Boolean): (SparkConf, String) = {
sortBeforeRepart: Boolean,
decimalTypeEnabled: Boolean): (SparkConf, String) = {

var qualifiers = Set[String]()
var testConf = conf
@@ -658,6 +660,11 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
qualifiers = qualifiers + s"NOT ON GPU[$execStr]"
}

if (decimalTypeEnabled) {
testConf = testConf.clone().set(RapidsConf.DECIMAL_TYPE_ENABLED.key, "true")
qualifiers = qualifiers + "WITH DECIMALS"
}

testConf.set("spark.sql.execution.sortBeforeRepartition", sortBeforeRepart.toString)
val qualifiedTestName = qualifiers.mkString("", ", ",
(if (qualifiers.nonEmpty) ": " else "") + testName)
@@ -711,12 +718,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false,
assumeCondition: SparkSession => (Boolean, String) = null)
assumeCondition: SparkSession => (Boolean, String) = null,
decimalTypeEnabled: Boolean = true)
(fun: DataFrame => DataFrame): Unit = {

val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimalTypeEnabled)

test(qualifiedTestName) {
if (assumeCondition != null) {
@@ -740,12 +748,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false)
(fun: DataFrame => DataFrame)
(fun: DataFrame => DataFrame,
decimalTypeEnabled: Boolean = true)
(validateCapturedPlans: (SparkPlan, SparkPlan) => Unit): Unit = {

val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimalTypeEnabled)
test(qualifiedTestName) {
val (fromCpu, cpuPlan, fromGpu, gpuPlan) = runOnCpuAndGpuWithCapture(df, fun,
conf = testConf,
@@ -786,10 +795,11 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
maxFloatDiff: Double = 0.0,
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false)(fun: DataFrame => DataFrame): Unit = {
sortBeforeRepart: Boolean = false,
decimalTypeEnabled: Boolean = true)(fun: DataFrame => DataFrame): Unit = {
val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimalTypeEnabled)

test(qualifiedTestName) {
val t = Try({
@@ -821,12 +831,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
maxFloatDiff: Double = 0.0,
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false)
sortBeforeRepart: Boolean = false,
decimaTypeEnabled: Boolean = true)
(fun: DataFrame => DataFrame)(implicit classTag: ClassTag[T]): Unit = {
val clazz = classTag.runtimeClass
val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimaTypeEnabled)

test(qualifiedTestName) {
val t = Try({
@@ -854,12 +865,13 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
maxFloatDiff: Double = 0.0,
incompat: Boolean = false,
execsAllowedNonGpu: Seq[String] = Seq.empty,
sortBeforeRepart: Boolean = false)
sortBeforeRepart: Boolean = false,
decimalTypeEnabled: Boolean = true)
(fun: (DataFrame, DataFrame) => DataFrame): Unit = {

val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, incompat, sort, conf, execsAllowedNonGpu,
maxFloatDiff, sortBeforeRepart)
maxFloatDiff, sortBeforeRepart, decimalTypeEnabled)

testConf.set("spark.sql.execution.sortBeforeRepartition", sortBeforeRepart.toString)
test(qualifiedTestName) {
@@ -915,10 +927,11 @@ trait SparkQueryCompareTestSuite extends FunSuite with Arm {
reader: (SparkSession, String) => DataFrame,
conf: SparkConf = new SparkConf(),
sortBeforeRepart: Boolean = false,
sort: Boolean = false): Unit = {
sort: Boolean = false,
decimalTypeEnabled: Boolean = true): Unit = {
val (testConf, qualifiedTestName) =
setupTestConfAndQualifierName(testName, false, sort, conf, Nil,
0.0, sortBeforeRepart)
0.0, sortBeforeRepart, decimalTypeEnabled)

test(qualifiedTestName) {
val (fromCpu, fromGpu) = writeWithCpuAndGpu(df, writer, reader, testConf)
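With `decimalTypeEnabled` defaulting to `true` in these helpers, most generated test names gain a `WITH DECIMALS` qualifier. The naming logic in `setupTestConfAndQualifierName` can be approximated as below; a list stands in for the Scala `Set`, so the qualifier order and the comma separator inside `NOT ON GPU[...]` are assumptions.

```python
# Sketch of the test-name qualifier logic; a list stands in for the Scala Set,
# so the qualifier order shown here is an assumption.

def qualified_test_name(test_name, execs_allowed_non_gpu=(), decimal_type_enabled=True):
    qualifiers = []
    if execs_allowed_non_gpu:
        exec_str = ",".join(execs_allowed_non_gpu)  # separator is an assumption
        qualifiers.append(f"NOT ON GPU[{exec_str}]")
    if decimal_type_enabled:
        qualifiers.append("WITH DECIMALS")
    # Equivalent of mkString("", ", ", (": " if non-empty else "") + testName)
    prefix = ", ".join(qualifiers)
    separator = ": " if qualifiers else ""
    return prefix + separator + test_name


print(qualified_test_name("sum of decimals"))
print(qualified_test_name("plain test", decimal_type_enabled=False))
```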
@@ -131,9 +131,25 @@ class DecimalUnitTest extends GpuUnitTests {
}
}

private val rapidsConf = new RapidsConf(Map[String, String]())
private val rapidsConf = new RapidsConf(Map[String, String](
RapidsConf.DECIMAL_TYPE_ENABLED.key -> "true"
))

private val lit = Literal(dec32Data(0), DecimalType(dec32Data(0).precision, dec32Data(0).scale))

test("decimals are off by default") {
// decimals should be disabled by default
val rapidsConfDefault = new RapidsConf(Map[String, String]())
val wrapperLit = GpuOverrides.wrapExpr(lit, rapidsConfDefault, None)
wrapperLit.tagForGpu()
assertResult(false)(wrapperLit.canExprTreeBeReplaced)

// use the tests' rapidsConf, which enables decimals
val wrapperLitSupported = GpuOverrides.wrapExpr(lit, rapidsConf, None)
wrapperLitSupported.tagForGpu()
assertResult(true)(wrapperLitSupported.canExprTreeBeReplaced)
}

test("test Literal with decimal") {
val wrapperLit = GpuOverrides.wrapExpr(lit, rapidsConf, None)
wrapperLit.tagForGpu()