Initial support for reading decimal types from JSON and CSV #4825

Merged: 11 commits, Mar 7, 2022
43 changes: 13 additions & 30 deletions docs/compatibility.md
@@ -283,14 +283,7 @@ will produce a different result compared to the plugin.
Due to inconsistencies between how CSV data is parsed, CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csv.read.bool.enabled](configs.md#sql.csv.read.bool.enabled)
* [spark.rapids.sql.csv.read.byte.enabled](configs.md#sql.csv.read.byte.enabled)
* [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled)
* [spark.rapids.sql.csv.read.double.enabled](configs.md#sql.csv.read.double.enabled)
* [spark.rapids.sql.csv.read.float.enabled](configs.md#sql.csv.read.float.enabled)
* [spark.rapids.sql.csv.read.integer.enabled](configs.md#sql.csv.read.integer.enabled)
* [spark.rapids.sql.csv.read.long.enabled](configs.md#sql.csv.read.long.enabled)
* [spark.rapids.sql.csv.read.short.enabled](configs.md#sql.csv.read.short.enabled)
* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)
Review comment from the PR author: These changes are not specific to this PR and could be PR'd separately.


If you know that your particular data type will be parsed correctly enough, you may enable each
@@ -307,14 +300,6 @@ default. The plugin will strip leading and trailing space for all values except

There are also discrepancies/issues with specific types that are detailed below.
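For example, one of the configs listed above can be enabled on the active session before the read. The sketch below uses the standard Spark configuration and CSV reader APIs; the config key is taken from the list above, while the file path and schema are hypothetical and only illustrate the shape of such a read.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder().appName("csv-double-read").getOrCreate()

// Opt in to GPU-accelerated parsing of doubles from CSV (off by default).
spark.conf.set("spark.rapids.sql.csv.read.double.enabled", "true")

// Provide an explicit schema so the target type is known up front.
val schema = StructType(Seq(StructField("number", DoubleType)))
val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/path/to/simple_float_values.csv") // hypothetical path

df.show()
```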

### CSV Boolean

Invalid values like `BAD` show up as `true` as described by this
[issue](https://github.com/NVIDIA/spark-rapids/issues/2071)

This is the same for all other types, but because that is the only issue with boolean parsing
we have called it out specifically here.

### CSV Strings
Writing strings to a CSV file in general for Spark can be problematic unless you can ensure that
your data does not have any line delimiters in it. The GPU accelerated CSV parser handles quoted
@@ -383,10 +368,6 @@ Also parsing of some values will not produce bit for bit identical results to what the CPU does.
They are within round-off errors except when they are close enough to overflow to Inf or -Inf, which
then results in a number being returned when the CPU would have returned null.

### CSV Integer

Any number that overflows will not be turned into a null value.
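A minimal sketch of the kind of read where that matters, assuming a hypothetical file whose `number` column holds a value too large for a 32-bit integer: the CPU produces `null` for that row, while the GPU reader, per the note above, does not turn the overflow into `null`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

val spark = SparkSession.builder().appName("csv-int-overflow").getOrCreate()

// Hypothetical file contents:
//   number
//   99999999999   <- does not fit in a 32-bit integer
val schema = StructType(Seq(StructField("number", IntegerType)))
val df = spark.read
  .option("header", "true")
  .schema(schema)
  .csv("/path/to/overflowing_ints.csv") // hypothetical path

df.show() // CPU: null for the overflowing row; GPU: a non-null number instead
```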

## ORC

The ORC format has fairly complete support for both reads and writes. There are only a few known
@@ -475,18 +456,13 @@ The nested types (array, map and struct) are not supported yet in the current version

Parsing floating-point values has the same limitations as [casting from string to float](#String-to-Float).

The GPU JSON reader does not support `NaN` and `Inf` values with full compatibility with Spark.
Prior to Spark 3.3.0, reading JSON strings such as `"+Infinity"` when specifying that the data type is `FloatType`
or `DoubleType` caused these values to be parsed even when `allowNonNumericNumbers` is set to false. Also, Spark
versions prior to 3.3.0 only supported the `"Infinity"` and `"-Infinity"` representations of infinity and did not
support `"+INF"`, `"-INF"`, or `"+Infinity"`, which Spark considers valid when unquoted. The GPU JSON reader is
consistent with the behavior in Spark 3.3.0 and later.

The following are the only formats that are parsed consistently between CPU and GPU. Any other variation, including
these formats when unquoted, will produce `null` on the CPU and may produce valid `NaN` and `Inf` results on the GPU.

```json
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
```

Another limitation of the GPU JSON reader is that it will parse strings containing floating-point values where
Another limitation of the GPU JSON reader is that it will parse strings containing boolean or numeric values where
Spark will treat them as invalid inputs and will just return `null`.
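To make the quoted-value discrepancy concrete, here is a hedged sketch; the file path and record are hypothetical, but the behavior is the one described above: the CPU treats the quoted string as invalid and returns `null`, while the GPU JSON reader may parse it.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder().appName("json-quoted-number").getOrCreate()

// Hypothetical input line:
//   { "number": "12.34" }
val schema = StructType(Seq(StructField("number", DoubleType)))
val df = spark.read
  .schema(schema)
  .json("/path/to/quoted_number.json") // hypothetical path

df.show() // CPU Spark: null; GPU JSON reader: may return 12.34
```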

### JSON Schema discovery
@@ -513,6 +489,13 @@ unquoted control characters but Spark reads these entries incorrectly as null. H
and when the option is false, the RAPIDS Accelerator's behavior is the same as Spark's, where an exception is thrown
as discussed in the `JSON Schema discovery` section.

- `allowNonNumericNumbers` - Allows `NaN` and `Infinity` values to be parsed (note that these are not valid numeric
values in the [JSON specification](https://json.org)). Spark has inconsistent behavior and will
parse some variants of `NaN` and `Infinity` even when this option is disabled
([SPARK-38060](https://issues.apache.org/jira/browse/SPARK-38060)). The RAPIDS Accelerator supports a wider range of
representations than Spark when this option is enabled and does not support any form of `NaN` or `Infinity` when the
option is disabled.
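As a usage sketch for this option, the setting is passed like any other Spark JSON read option; the resource name below matches a test file added in this PR, while the path prefix is hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

val spark = SparkSession.builder().appName("allow-non-numeric").getOrCreate()

val schema = StructType(Seq(StructField("number", DoubleType)))
val df = spark.read
  .schema(schema)
  .option("allowNonNumericNumbers", "true") // set to "false" to reject NaN/Infinity
  .json("/path/to/nan_and_inf.json")        // hypothetical path to the PR's test resource

df.show()
```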

## Regular Expressions

The following Apache Spark regular expression functions and expressions are supported on the GPU:
11 changes: 11 additions & 0 deletions integration_tests/src/main/python/csv_test.py
@@ -127,6 +127,12 @@
_double_schema = StructType([
StructField('number', DoubleType())])

_decimal_10_2_schema = StructType([
StructField('number', DecimalType(10, 2))])

_decimal_10_3_schema = StructType([
StructField('number', DecimalType(10, 3))])

_number_as_string_schema = StructType([
StructField('number', StringType())])

@@ -220,6 +226,9 @@ def read_impl(spark):
pytest.param('simple_int_values.csv', _long_schema, {'header': 'true'}),
('simple_int_values.csv', _float_schema, {'header': 'true'}),
('simple_int_values.csv', _double_schema, {'header': 'true'}),
('simple_int_values.csv', _decimal_10_2_schema, {'header': 'true'}),
('decimals.csv', _decimal_10_2_schema, {'header': 'true'}),
('decimals.csv', _decimal_10_3_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_byte_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_short_schema, {'header': 'true'}),
pytest.param('empty_int_values.csv', _empty_int_schema, {'header': 'true'}),
@@ -235,6 +244,8 @@ def read_impl(spark):
pytest.param('simple_float_values.csv', _long_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _float_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _double_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _decimal_10_2_schema, {'header': 'true'}),
pytest.param('simple_float_values.csv', _decimal_10_3_schema, {'header': 'true'}),
pytest.param('simple_boolean_values.csv', _bool_schema, {'header': 'true'}),
pytest.param('ints_with_whitespace.csv', _number_as_string_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/2069')),
pytest.param('ints_with_whitespace.csv', _byte_schema, {'header': 'true'}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/130'))
14 changes: 11 additions & 3 deletions integration_tests/src/main/python/json_test.py
@@ -18,7 +18,7 @@
from data_gen import *
from src.main.python.marks import approximate_float, allow_non_gpu

from src.main.python.spark_session import with_cpu_session
from src.main.python.spark_session import with_cpu_session, is_before_spark_330

json_supported_gens = [
# Spark does not escape '\r' or '\n' even though it uses it to mark end of record
@@ -59,6 +59,12 @@
_double_schema = StructType([
StructField('number', DoubleType())])

_decimal_10_2_schema = StructType([
StructField('number', DecimalType(10, 2))])

_decimal_10_3_schema = StructType([
StructField('number', DecimalType(10, 3))])

_string_schema = StructType([
StructField('a', StringType())])

@@ -190,13 +196,15 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
'ints.json',
pytest.param('ints_invalid.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4793')),
'nan_and_inf.json',
pytest.param('nan_and_inf_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4646')),
pytest.param('nan_and_inf_strings.json', marks=pytest.mark.skipif(is_before_spark_330(), reason='https://issues.apache.org/jira/browse/SPARK-38060 fixed in Spark 3.3.0')),
'nan_and_inf_invalid.json',
'floats.json',
'floats_leading_zeros.json',
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'decimals.json',
])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
16 changes: 16 additions & 0 deletions integration_tests/src/test/resources/decimals.csv
@@ -0,0 +1,16 @@
"number"
-1
0
1.
0.12
.12
+.12
-.12
1
1.01
12.34
12.3456
12345678.12
33.545454
33.454545

14 changes: 14 additions & 0 deletions integration_tests/src/test/resources/decimals.json
@@ -0,0 +1,14 @@
{ "number": 0 }
{ "number": 12 }
{ "number": 12.0 }
{ "number": 12. }
{ "number": .34 }
{ "number": +.34 }
{ "number": -.34 }
{ "number": 0.34 }
{ "number": 12.34 }
{ "number": 12.3456 }
{ "number": 12.345678 }
{ "number": 123456.78 }
{ "number": 33.454545 }
{ "number": 33.545454 }
9 changes: 6 additions & 3 deletions integration_tests/src/test/resources/nan_and_inf.json
@@ -1,3 +1,6 @@
{ "number": "NaN" }
{ "number": "Infinity" }
{ "number": "-Infinity" }
{ "number": NaN }
{ "number": +INF }
{ "number": -INF }
{ "number": Infinity }
{ "number": +Infinity }
{ "number": -Infinity }
12 changes: 0 additions & 12 deletions integration_tests/src/test/resources/nan_and_inf_edge_cases.json

This file was deleted.

12 changes: 12 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf_invalid.json
@@ -0,0 +1,12 @@
{ "number": NAN }
{ "number": nan }
{ "number": INF }
{ "number": Inf }
{ "number": +Inf }
{ "number": -Inf }
{ "number": "NAN" }
{ "number": "nan" }
{ "number": "INF" }
{ "number": "Inf" }
{ "number": "+Inf" }
{ "number": "-Inf" }
6 changes: 6 additions & 0 deletions integration_tests/src/test/resources/nan_and_inf_strings.json
@@ -0,0 +1,6 @@
{ "number": "NaN" }
{ "number": "+INF" }
{ "number": "-INF" }
{ "number": "Infinity" }
{ "number": "+Infinity" }
{ "number": "-Infinity" }
@@ -804,7 +804,7 @@ object GpuOverrides extends Logging {

lazy val fileFormats: Map[FileFormatType, Map[FileFormatOp, FileFormatChecks]] = Map(
(CsvFormatType, FileFormatChecks(
cudfRead = TypeSig.commonCudfTypes,
cudfRead = TypeSig.commonCudfTypes + TypeSig.DECIMAL_128,
cudfWrite = TypeSig.none,
sparkSig = TypeSig.cpuAtomics)),
(ParquetFormatType, FileFormatChecks(
@@ -823,7 +823,7 @@
sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())),
(JsonFormatType, FileFormatChecks(
cudfRead = TypeSig.commonCudfTypes,
cudfRead = TypeSig.commonCudfTypes + TypeSig.DECIMAL_128,
cudfWrite = TypeSig.none,
sparkSig = (TypeSig.cpuAtomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP +
TypeSig.UDT).nested())))
@@ -18,17 +18,15 @@ package com.nvidia.spark.rapids

import scala.collection.mutable.ListBuffer
import scala.math.max

import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, NvtxRange, Scalar, Schema, Table}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.compress.CompressionCodecFactory

import org.apache.spark.TaskContext
import org.apache.spark.sql.connector.read.PartitionReader
import org.apache.spark.sql.execution.QueryExecutionException
import org.apache.spark.sql.execution.datasources.{HadoopFileLinesReader, PartitionedFile}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.types.{DataTypes, DecimalType, StructField, StructType}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
@@ -172,7 +170,7 @@ abstract class GpuTextBasedPartitionReader(
f.dataType match {
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType =>
DataTypes.DoubleType | _: DecimalType =>
f.copy(dataType = DataTypes.StringType)
case _ =>
f
@@ -196,8 +194,6 @@
// Table increases the ref counts on the columns so we have
// to close them after creating the table
withResource(columns) { _ =>
// ansi mode does not apply to text inputs
val ansiEnabled = false
for (i <- 0 until table.getNumberOfColumns) {
val castColumn = newReadDataSchema.fields(i).dataType match {
case DataTypes.BooleanType =>
@@ -211,9 +207,11 @@
case DataTypes.LongType =>
castStringToInt(table.getColumn(i), DType.INT64)
case DataTypes.FloatType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT32)
castStringToFloat(table.getColumn(i), DType.FLOAT32)
case DataTypes.DoubleType =>
GpuCast.castStringToFloats(table.getColumn(i), ansiEnabled, DType.FLOAT64)
castStringToFloat(table.getColumn(i), DType.FLOAT64)
case dt: DecimalType =>
castStringToDecimal(table.getColumn(i), dt)
case _ =>
table.getColumn(i).incRefCount()
}
@@ -232,6 +230,14 @@

def castStringToBool(input: ColumnVector): ColumnVector

def castStringToFloat(input: ColumnVector, dt: DType): ColumnVector = {
GpuCast.castStringToFloats(input, ansiEnabled = false, dt)
}

def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {
GpuCast.castStringToDecimal(input, ansiEnabled = false, dt)
}

def castStringToInt(input: ColumnVector, intType: DType): ColumnVector = {
withResource(input.isInteger(intType)) { isInt =>
withResource(input.castTo(intType)) { asInt =>
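The reader above follows a read-as-strings-then-cast pattern: fields with numeric, boolean, or (now) decimal target types are first read back as strings, and each column is cast afterwards. The snippet below is a standalone sketch of just the schema-rewrite step, mirroring the `dataType match` in the diff with plain Spark types; it is not the plugin code itself.

```scala
import org.apache.spark.sql.types.{DataTypes, DecimalType, StructType}

// Sketch: rewrite a read schema so that types parsed via string casting
// come back from the text reader as strings first.
def dataSchemaWithStrings(readDataSchema: StructType): StructType = {
  val fields = readDataSchema.fields.map { f =>
    f.dataType match {
      case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
           DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
           DataTypes.DoubleType | _: DecimalType =>
        f.copy(dataType = DataTypes.StringType)
      case _ =>
        f // other types are left as-is
    }
  }
  StructType(fields)
}
```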
@@ -20,12 +20,10 @@ import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import ai.rapids.cudf
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, Scalar, Schema, Table}
import com.nvidia.spark.rapids._
import org.apache.hadoop.conf.Configuration

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
@@ -38,7 +36,7 @@ import org.apache.spark.sql.execution.datasources.{PartitionedFile, Partitioning
import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan, TextBasedFileScan}
import org.apache.spark.sql.execution.datasources.v2.json.JsonScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DateType, StringType, StructType, TimestampType}
import org.apache.spark.sql.types.{DateType, DecimalType, StringType, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration
@@ -370,4 +368,44 @@ class JsonPartitionReader(
}
}

/**
* JSON has strict rules about valid numeric formats. See https://www.json.org/ for specification.
*
* Spark then has its own rules for supporting NaN and Infinity, which are not
* valid numbers in JSON.
*/
private def sanitizeNumbers(input: ColumnVector): ColumnVector = {
// Note that this is not 100% consistent with Spark versions prior to Spark 3.3.0
// due to https://issues.apache.org/jira/browse/SPARK-38060
val regex = if (parsedOptions.allowNonNumericNumbers) {
"^" +
"(?:" +
"(?:-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?)" +
"|NaN" +
"|(?:[\\+\\-]INF)" +
"|(?:[\\-\\+]?Infinity)" +
")" +
"$"
} else {
"^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"
}
withResource(input.matchesRe(regex)) { validJsonDecimal =>
withResource(Scalar.fromNull(DType.STRING)) { nullString =>
validJsonDecimal.ifElse(input, nullString)
}
}
}

override def castStringToFloat(input: ColumnVector, dt: DType): ColumnVector = {
withResource(sanitizeNumbers(input)) { sanitizedInput =>
super.castStringToFloat(sanitizedInput, dt)
}
}

override def castStringToDecimal(input: ColumnVector, dt: DecimalType): ColumnVector = {
withResource(sanitizeNumbers(input)) { sanitizedInput =>
super.castStringToDecimal(sanitizedInput, dt)
}
}

}
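The two regular expressions in `sanitizeNumbers` can be exercised on their own. This self-contained sketch copies them verbatim and checks a few sample strings (the samples are chosen here, not taken from the PR); strings that do not match are the ones the reader nulls out before casting.

```scala
object SanitizeNumbersRegexDemo {
  // Regexes copied from sanitizeNumbers above.
  private val allowNonNumeric =
    "^(?:(?:-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?)|NaN|(?:[\\+\\-]INF)|(?:[\\-\\+]?Infinity))$"
  private val strictJsonNumber =
    "^-?[0-9]+(?:\\.[0-9]+)?(?:[eE][\\-\\+]?[0-9]+)?$"

  def main(args: Array[String]): Unit = {
    val samples = Seq("1", "-12.34", "1.2e-3", "NaN", "+INF", "-Infinity", "Inf", "nan")
    samples.foreach { s =>
      println(s"$s -> allowNonNumericNumbers=true: ${s.matches(allowNonNumeric)}, " +
        s"allowNonNumericNumbers=false: ${s.matches(strictJsonNumber)}")
    }
  }
}
```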