Improve date support in JSON and CSV readers #4853

Merged: 26 commits, Mar 10, 2022. Changes shown are from 21 commits.
* ec66baa: Initial support for reading decimal types from JSON and CSV (andygrove, Feb 18, 2022)
* 2b45083: update docs (andygrove, Feb 18, 2022)
* edc8c71: Document behavior of JSON option allowNonNumericNumbers (andygrove, Feb 18, 2022)
* ec31dc9: Improve JSON support for INF values (andygrove, Feb 22, 2022)
* 3d121bb: improve decimal tests to cover different rounding cases (andygrove, Feb 22, 2022)
* 55b350a: reduce the amount of regexp (andygrove, Feb 22, 2022)
* addbe40: scalastyle (andygrove, Feb 22, 2022)
* fcddeb1: fix regression and update generated docs (andygrove, Feb 22, 2022)
* 84b0bae: merge from branch-22.04 (andygrove, Feb 22, 2022)
* 1a68c27: update compatibility guide (andygrove, Feb 22, 2022)
* ecff2d2: update compatibility guide (andygrove, Feb 22, 2022)
* bb79bf4: Initial support for reading dates with JSON and CSV readers (andygrove, Feb 23, 2022)
* 0c12507: fix issues from self review (andygrove, Feb 23, 2022)
* 689364e: more tests for newly supported date formats (andygrove, Feb 23, 2022)
* 6487d22: Make CSV date parsing consistent with Spark and improve tests (andygrove, Feb 23, 2022)
* b7fb366: fall back to CPU when reading CSV with timeParserPolicy=LEGACY (andygrove, Feb 24, 2022)
* c6e3af2: update docs (andygrove, Feb 24, 2022)
* 1c2077d: upmerge (andygrove, Mar 7, 2022)
* 320fb56: Address PR feedback (andygrove, Mar 7, 2022)
* 5621356: update docs and add tests for issue 1091 (andygrove, Mar 7, 2022)
* 28fbe69: fix regression (andygrove, Mar 7, 2022)
* e0b4a44: Add shim code for getting dateFormat from CSV and JSON read options (andygrove, Mar 8, 2022)
* f0599ab: XFAIL some tests with Spark 3.3.0 with LEGACY timeParserPolicy (andygrove, Mar 8, 2022)
* c5e6c58: use is_spark_330_or_later (andygrove, Mar 8, 2022)
* 8b6796f: Tests pass with 3.3.0 (andygrove, Mar 8, 2022)
* 53a4083: merge from branch-22.04 (andygrove, Mar 8, 2022)
15 changes: 2 additions & 13 deletions docs/compatibility.md
@@ -283,7 +283,6 @@ will produce a different result compared to the plugin.
Due to inconsistencies between how the CPU and GPU parse CSV data, CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled)
* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)

If you know that your particular data type will be parsed correctly enough, you may enable each
@@ -310,8 +309,6 @@ Escaped quote characters `'\"'` are not supported well as described by this
[issue](https://github.com/NVIDIA/spark-rapids/issues/129).

### CSV Dates
Parsing a `timestamp` as a `date` does not work. The details are documented in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/869).

Only a limited set of formats are supported when parsing dates.

@@ -323,16 +320,8 @@ Only a limited set of formats are supported when parsing dates.
* `"MM/yyyy"`
revans2 marked this conversation as resolved.
* `"MM-dd-yyyy"`
* `"MM/dd/yyyy"`

The reality is that all of these formats are supported at the same time. The plugin will only
disable itself if you set a format that it does not support.

As a workaround you can parse the column as a timestamp and then cast it to a date.

Invalid dates in Spark (values that have the correct format but whose numbers produce invalid dates) can result in an exception by default, and how they are parsed can be controlled through a config.
The RAPIDS Accelerator does not support any of this and will produce an incorrect date, typically
one that overflowed.
* `"dd-MM-yyyy"`
* `"dd/MM/yyyy"`
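The Spark patterns in this list map closely onto strptime directives. As an illustrative sketch only (the plugin converts these patterns to cuDF format strings internally, and the mapping below is not its API), here is roughly what each pattern accepts:

```python
from datetime import datetime

# Illustrative mapping from Spark-style date patterns to Python
# strptime directives; not the plugin's actual conversion table.
SPARK_TO_STRPTIME = {
    "yyyy-MM-dd": "%Y-%m-%d",
    "yyyy/MM/dd": "%Y/%m/%d",
    "yyyy-MM": "%Y-%m",
    "MM-yyyy": "%m-%Y",
    "MM/yyyy": "%m/%Y",
    "MM-dd-yyyy": "%m-%d-%Y",
    "MM/dd/yyyy": "%m/%d/%Y",
    "dd-MM-yyyy": "%d-%m-%Y",
    "dd/MM/yyyy": "%d/%m/%Y",
}

def parse_date(text, spark_format):
    """Parse a date string using a Spark-style pattern via strptime."""
    return datetime.strptime(text, SPARK_TO_STRPTIME[spark_format]).date()

print(parse_date("2022-03-10", "yyyy-MM-dd"))  # 2022-03-10
print(parse_date("10/03/2022", "dd/MM/yyyy"))  # 2022-03-10
```

Note that day-less patterns such as `MM/yyyy` cannot identify a unique day; strptime defaults the day to 1, which is one reason such formats need care.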

### CSV Timestamps
The CSV parser does not support time zones. It will ignore any trailing time zone information,
1 change: 0 additions & 1 deletion docs/configs.md
@@ -67,7 +67,6 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" or "-1.7976931348623158E308" >= x > "-1.7976931348623159E308"; in both these cases the GPU returns Double.MaxValue while the CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
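The 1902/2038 bounds in the `spark.rapids.sql.csvTimestamps.enabled` description above line up with the range of a signed 32-bit count of seconds since the Unix epoch. A quick sanity check of those bounds (an observation about the numbers, not a claim about the plugin's internal representation):

```python
from datetime import datetime, timezone

# Signed 32-bit epoch seconds span roughly late 1901 to early 2038,
# matching the "after 2038 and before 1902" limits quoted above.
lo = datetime.fromtimestamp(-2**31, tz=timezone.utc)
hi = datetime.fromtimestamp(2**31 - 1, tz=timezone.utc)
print(lo.isoformat())  # 1901-12-13T20:45:52+00:00
print(hi.isoformat())  # 2038-01-19T03:14:07+00:00
```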
75 changes: 60 additions & 15 deletions integration_tests/src/main/python/csv_test.py
@@ -14,7 +14,8 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, assert_cpu_and_gpu_are_equal_collect_with_capture
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_write, \
assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_fallback_collect
from conftest import get_non_gpu_allowed
from datetime import datetime, timezone
from data_gen import *
@@ -167,15 +168,7 @@
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true',
'spark.rapids.sql.csv.read.bool.enabled': 'true',
'spark.rapids.sql.csv.read.date.enabled': 'true',
'spark.rapids.sql.csv.read.byte.enabled': 'true',
'spark.rapids.sql.csv.read.short.enabled': 'true',
'spark.rapids.sql.csv.read.integer.enabled': 'true',
'spark.rapids.sql.csv.read.long.enabled': 'true',
'spark.rapids.sql.csv.read.float.enabled': 'true',
'spark.rapids.sql.csv.read.double.enabled': 'true',
Comment from the PR author on lines -170 to -177:
Most of these options were removed from the configs in earlier PRs related to JSON support

'spark.sql.legacy.timeParserPolicy': 'Corrected'}
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, options = {}):
def read_impl(spark):
@@ -200,8 +193,8 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _date_schema, {}),
('date.csv', _date_schema, {}),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
('str.csv', _good_str_schema, {'header': 'true'}),
@@ -316,22 +309,74 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list):
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, ansi_enabled, time_parser_policy):
gen = StructGen([('a', DateGen())], nullable=False)
data_path = spark_tmp_path + '/CSV_DATA'
schema = gen.data_type
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
with_cpu_session(
lambda spark : gen_df(spark, gen).write\
.option('dateFormat', date_format)\
.csv(data_path))
assert_gpu_and_cpu_are_equal_collect(
if time_parser_policy == 'LEGACY':
expected_class = 'FileSourceScanExec'
if v1_enabled_list == '':
expected_class = 'BatchScanExec'
assert_gpu_fallback_collect(
lambda spark : spark.read \
.schema(schema) \
.option('dateFormat', date_format) \
.csv(data_path),
expected_class,
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read\
.schema(schema)\
.option('dateFormat', date_format)\
.csv(data_path),
conf=updated_conf)
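The write-then-read shape of `test_date_formats_round_trip` can be sketched without Spark: format a date with a pattern, parse it back with the same pattern, and expect the original value. The strftime patterns below are illustrative stand-ins for the Spark patterns in `csv_supported_date_formats` (e.g. `'%m/%d/%Y'` plays the role of `'MM/dd/yyyy'`):

```python
from datetime import date, datetime

def round_trip(d, fmt):
    """Format a date with a strftime pattern, then parse it back,
    mimicking the write-then-read shape of the test above."""
    return datetime.strptime(d.strftime(fmt), fmt).date()

# Only day-complete patterns round-trip exactly; a pattern like
# '%m-%Y' drops the day and cannot recover the original value.
d = date(2022, 3, 10)
for fmt in ("%Y-%m-%d", "%m-%d-%Y", "%m/%d/%Y", "%d-%m-%Y", "%d/%m/%Y"):
    assert round_trip(d, fmt) == d
print("round trip ok")
```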

@pytest.mark.parametrize('filename', ["date.csv"])
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list, ansi_enabled, time_parser_policy):
data_path = std_input_path + '/' + filename
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path)
.collect(),
conf=updated_conf,
error_message='DateTimeException')
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path),
conf=updated_conf)
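The EXCEPTION branch above expects a `DateTimeException` for well-formed strings that name impossible dates. Plain Python shows the analogous behavior (`ValueError` plays the role of Spark's `DateTimeException` here; this is an analogy, not the plugin's code path):

```python
from datetime import datetime

def parse_strict(text):
    """Reject values that are shaped like dates but name impossible
    ones, roughly what timeParserPolicy=EXCEPTION does for CSV dates."""
    return datetime.strptime(text, "%Y-%m-%d").date()

print(parse_strict("2020-09-16"))   # a valid date parses
try:
    parse_strict("2020-09-32")      # day 32 does not exist
except ValueError as e:
    print("rejected:", e)
```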

csv_supported_ts_parts = ['', # Just the date
"'T'HH:mm:ss.SSSXXX",
"'T'HH:mm:ss[.SSS][XXX]",
71 changes: 68 additions & 3 deletions integration_tests/src/main/python/json_test.py
@@ -14,7 +14,7 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from data_gen import *
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order
@@ -66,6 +66,9 @@
_decimal_10_3_schema = StructType([
StructField('number', DecimalType(10, 3))])

_date_schema = StructType([
StructField('number', DateType())])

_string_schema = StructType([
StructField('a', StringType())])

@@ -204,21 +207,83 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'decimals.json',
'dates.json',
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \
_float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \
_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, allow_numeric_leading_zeros, ansi_enabled):
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
{ "allowNonNumericNumbers": allow_non_numeric_numbers,
"allowNumericLeadingZeros": allow_numeric_leading_zeros}),
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy,
'spark.rapids.sql.incompatibleDateFormats.enabled': True})
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy })
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
df_fun=lambda spark: f(spark).collect(),
conf=updated_conf,
error_message='DateTimeException')
elif time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/dates.json
@@ -0,0 +1,3 @@
{ "number": "2020-09-16" }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
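Each line of dates.json is a standalone JSON object, matching the JSON Lines layout that Spark's JSON reader consumes; the 1581 and 1583 values presumably bracket the 1582 Gregorian cutover that the LEGACY calendar notes elsewhere in this PR care about. A minimal check that every record parses:

```python
import json
from datetime import datetime

# The three records from dates.json, one JSON object per line.
lines = [
    '{ "number": "2020-09-16" }',
    '{ "number": "1581-01-01" }',
    '{ "number": "1583-01-01" }',
]

# Decode each line independently, then parse the date field.
dates = [datetime.strptime(json.loads(line)["number"], "%Y-%m-%d").date()
         for line in lines]
print(dates[0])  # 2020-09-16
```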
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/dates_invalid.json
@@ -0,0 +1,2 @@
{ "number": "2020-09-32" }
{ "number": "2020-50-16" }
@@ -47,7 +47,6 @@ class MortgageSparkSuite extends FunSuite {
.config("spark.rapids.sql.test.enabled", false)
.config("spark.rapids.sql.incompatibleOps.enabled", true)
.config("spark.rapids.sql.hasNans", false)
.config("spark.rapids.sql.csv.read.date.enabled", true)
val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass
val prop = System.getProperty("rapids.shuffle.manager.override", "false")
if (prop.equalsIgnoreCase("true")) {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,8 @@ import com.nvidia.spark.rapids.VersionUtils.isSpark320OrLater

import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.DateTimeUtils.localDateToDays
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.{GpuToTimestamp, LegacyTimeParserPolicy}

/**
* Class for helper functions for Date
@@ -211,4 +213,55 @@ object DateUtils {
}

case class TimestampFormatConversionException(reason: String) extends Exception

def tagAndGetCudfFormat(
meta: RapidsMeta[_, _, _],
sparkFormat: String,
parseString: Boolean): String = {
var strfFormat: String = null
if (GpuOverrides.getTimeParserPolicy == LegacyTimeParserPolicy) {
try {
// try and convert the format to cuDF format - this will throw an exception if
// the format contains unsupported characters or words
strfFormat = toStrf(sparkFormat, parseString)
// format parsed ok but we have no 100% compatible formats in LEGACY mode
if (GpuToTimestamp.LEGACY_COMPATIBLE_FORMATS.contains(sparkFormat)) {
// LEGACY support has a number of issues that mean we cannot guarantee
// compatibility with CPU
// - we can only support 4 digit years but Spark supports a wider range
// - we use a proleptic Gregorian calendar but Spark uses a hybrid Julian+Gregorian
// calendar in LEGACY mode
if (SQLConf.get.ansiEnabled) {
meta.willNotWorkOnGpu("LEGACY format in ANSI mode is not supported on the GPU")
} else if (!meta.conf.incompatDateFormats) {
meta.willNotWorkOnGpu(s"LEGACY format '$sparkFormat' on the GPU is not guaranteed " +
s"to produce the same results as Spark on CPU. Set " +
s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.")
}
} else {
meta.willNotWorkOnGpu(s"LEGACY format '$sparkFormat' is not supported on the GPU.")
}
} catch {
case e: TimestampFormatConversionException =>
meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}")
}
} else {
try {
// try and convert the format to cuDF format - this will throw an exception if
// the format contains unsupported characters or words
strfFormat = toStrf(sparkFormat, parseString)
// format parsed ok, so it is either compatible (tested/certified) or incompatible
if (!GpuToTimestamp.CORRECTED_COMPATIBLE_FORMATS.contains(sparkFormat) &&
!meta.conf.incompatDateFormats) {
meta.willNotWorkOnGpu(s"CORRECTED format '$sparkFormat' on the GPU is not guaranteed " +
s"to produce the same results as Spark on CPU. Set " +
s"${RapidsConf.INCOMPATIBLE_DATE_FORMATS.key}=true to force onto GPU.")
}
} catch {
case e: TimestampFormatConversionException =>
meta.willNotWorkOnGpu(s"Failed to convert ${e.reason} ${e.getMessage}")
}
}
strfFormat
}
}
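The branching in `tagAndGetCudfFormat` reduces to a small decision table. A hedged Python sketch of those decisions (all names and arguments are illustrative stand-ins, not the plugin's API; it returns the fallback reason, or None when the GPU can run):

```python
def gpu_supported(policy, fmt, ansi, incompat_ok,
                  legacy_compatible, corrected_compatible):
    """Mirror the fallback decisions of tagAndGetCudfFormat.

    policy: 'LEGACY' or anything else (CORRECTED/EXCEPTION behave alike here)
    incompat_ok: stand-in for spark.rapids.sql.incompatibleDateFormats.enabled
    legacy_compatible / corrected_compatible: stand-ins for the
    LEGACY/CORRECTED compatible-format sets in GpuToTimestamp.
    """
    if policy == "LEGACY":
        if fmt not in legacy_compatible:
            return f"LEGACY format '{fmt}' is not supported on the GPU."
        if ansi:
            return "LEGACY format in ANSI mode is not supported on the GPU"
        if not incompat_ok:
            return (f"LEGACY format '{fmt}' on the GPU is not guaranteed "
                    "to produce the same results as Spark on CPU.")
        return None
    if fmt not in corrected_compatible and not incompat_ok:
        return (f"CORRECTED format '{fmt}' on the GPU is not guaranteed "
                "to produce the same results as Spark on CPU.")
    return None

# LEGACY plus ANSI always falls back, matching the Scala above.
print(gpu_supported("LEGACY", "yyyy-MM-dd", True, True,
                    {"yyyy-MM-dd"}, {"yyyy-MM-dd"}))
```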