Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve date support in JSON and CSV readers #4853

Merged
merged 26 commits into from
Mar 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ec66baa
Initial support for reading decimal types from JSON and CSV
andygrove Feb 18, 2022
2b45083
update docs
andygrove Feb 18, 2022
edc8c71
Document behavior of JSON option allowNonNumericNumbers
andygrove Feb 18, 2022
ec31dc9
Improve JSON support for INF values
andygrove Feb 22, 2022
3d121bb
improve decimal tests to cover different rounding cases
andygrove Feb 22, 2022
55b350a
reduce the amount of regexp
andygrove Feb 22, 2022
addbe40
scalastyle
andygrove Feb 22, 2022
fcddeb1
fix regression and update generated docs
andygrove Feb 22, 2022
84b0bae
merge from branch-22.04
andygrove Feb 22, 2022
1a68c27
update compatibility guide
andygrove Feb 22, 2022
ecff2d2
update compatibility guide
andygrove Feb 22, 2022
bb79bf4
Initial support for reading dates with JSON and CSV readers
andygrove Feb 23, 2022
0c12507
fix issues from self review
andygrove Feb 23, 2022
689364e
more tests for newly supported date formats
andygrove Feb 23, 2022
6487d22
Make CSV date parsing consistent with Spark and improve tests
andygrove Feb 23, 2022
b7fb366
fall back to CPU when reading CSV with timeParserPolicy=LEGACY
andygrove Feb 24, 2022
c6e3af2
update docs
andygrove Feb 24, 2022
1c2077d
upmerge
andygrove Mar 7, 2022
320fb56
Address PR feedback
andygrove Mar 7, 2022
5621356
update docs and add tests for issue 1091
andygrove Mar 7, 2022
28fbe69
fix regression
andygrove Mar 7, 2022
e0b4a44
Add shim code for getting dateFormat from CSV and JSON read options
andygrove Mar 8, 2022
f0599ab
XFAIL some tests with Spark 3.3.0 with LEGACY timeParserPolicy
andygrove Mar 8, 2022
c5e6c58
use is_spark_330_or_later
andygrove Mar 8, 2022
8b6796f
Tests pass with 3.3.0
andygrove Mar 8, 2022
53a4083
merge from branch-22.04
andygrove Mar 8, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 2 additions & 13 deletions docs/compatibility.md
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,6 @@ will produce a different result compared to the plugin.
Due to inconsistencies between how CSV data is parsed CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled)
* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)

If you know that your particular data type will be parsed correctly enough, you may enable each
Expand All @@ -310,8 +309,6 @@ Escaped quote characters `'\"'` are not supported well as described by this
[issue](https://github.com/NVIDIA/spark-rapids/issues/129).

### CSV Dates
Parsing a `timestamp` as a `date` does not work. The details are documented in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/869).

Only a limited set of formats are supported when parsing dates.

Expand All @@ -323,16 +320,8 @@ Only a limited set of formats are supported when parsing dates.
* `"MM/yyyy"`
revans2 marked this conversation as resolved.
Show resolved Hide resolved
* `"MM-dd-yyyy"`
* `"MM/dd/yyyy"`

The reality is that all of these formats are supported at the same time. The plugin will only
disable itself if you set a format that it does not support.

As a workaround you can parse the column as a timestamp and then cast it to a date.

Invalid dates in Spark, values that have the correct format, but the numbers produce invalid dates,
can result in an exception by default, and how they are parsed can be controlled through a config.
The RAPIDS Accelerator does not support any of this and will produce an incorrect date. Typically,
one that overflowed.
* `"dd-MM-yyyy"`
* `"dd/MM/yyyy"`

### CSV Timestamps
The CSV parser does not support time zones. It will ignore any trailing time zone information,
Expand Down
1 change: 0 additions & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
Expand Down
75 changes: 60 additions & 15 deletions integration_tests/src/main/python/csv_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, assert_cpu_and_gpu_are_equal_collect_with_capture
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_write, \
assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_fallback_collect
from conftest import get_non_gpu_allowed
from datetime import datetime, timezone
from data_gen import *
Expand Down Expand Up @@ -167,15 +168,7 @@
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true',
'spark.rapids.sql.csv.read.bool.enabled': 'true',
'spark.rapids.sql.csv.read.date.enabled': 'true',
'spark.rapids.sql.csv.read.byte.enabled': 'true',
'spark.rapids.sql.csv.read.short.enabled': 'true',
'spark.rapids.sql.csv.read.integer.enabled': 'true',
'spark.rapids.sql.csv.read.long.enabled': 'true',
'spark.rapids.sql.csv.read.float.enabled': 'true',
'spark.rapids.sql.csv.read.double.enabled': 'true',
Comment on lines -170 to -177
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of these options were removed from the configs in earlier PRs related to JSON support

'spark.sql.legacy.timeParserPolicy': 'Corrected'}
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, options = {}):
def read_impl(spark):
Expand All @@ -200,8 +193,8 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _date_schema, {}),
('date.csv', _date_schema, {}),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
('str.csv', _good_str_schema, {'header': 'true'}),
Expand Down Expand Up @@ -316,22 +309,74 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list):
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, ansi_enabled, time_parser_policy):
gen = StructGen([('a', DateGen())], nullable=False)
data_path = spark_tmp_path + '/CSV_DATA'
schema = gen.data_type
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
with_cpu_session(
lambda spark : gen_df(spark, gen).write\
.option('dateFormat', date_format)\
.csv(data_path))
assert_gpu_and_cpu_are_equal_collect(
if time_parser_policy == 'LEGACY':
expected_class = 'FileSourceScanExec'
if v1_enabled_list == '':
expected_class = 'BatchScanExec'
assert_gpu_fallback_collect(
lambda spark : spark.read \
.schema(schema) \
.option('dateFormat', date_format) \
.csv(data_path),
expected_class,
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read\
.schema(schema)\
.option('dateFormat', date_format)\
.csv(data_path),
conf=updated_conf)

@pytest.mark.parametrize('filename', ["date.csv"])
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list, ansi_enabled, time_parser_policy):
data_path = std_input_path + '/' + filename
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path)
.collect(),
conf=updated_conf,
error_message='DateTimeException')
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path),
conf=updated_conf)

csv_supported_ts_parts = ['', # Just the date
"'T'HH:mm:ss.SSSXXX",
"'T'HH:mm:ss[.SSS][XXX]",
Expand Down
73 changes: 69 additions & 4 deletions integration_tests/src/main/python/json_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from data_gen import *
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order

from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_spark_330_or_later

json_supported_gens = [
# Spark does not escape '\r' or '\n' even though it uses it to mark end of record
Expand Down Expand Up @@ -66,6 +66,9 @@
_decimal_10_3_schema = StructType([
StructField('number', DecimalType(10, 3))])

_date_schema = StructType([
StructField('number', DateType())])

_string_schema = StructType([
StructField('a', StringType())])

Expand Down Expand Up @@ -204,21 +207,83 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'decimals.json',
'dates.json',
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \
_float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \
_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, allow_numeric_leading_zeros, ansi_enabled):
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
{ "allowNonNumericNumbers": allow_non_numeric_numbers,
"allowNumericLeadingZeros": allow_numeric_leading_zeros}),
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy,
'spark.rapids.sql.incompatibleDateFormats.enabled': True})
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy })
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
df_fun=lambda spark: f(spark).collect(),
conf=updated_conf,
error_message='DateTimeException')
elif time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ def is_before_spark_320():
def is_before_spark_330():
return spark_version() < "3.3.0"

def is_spark_330_or_later():
return spark_version() >= "3.3.0"

def is_databricks91_or_later():
spark = get_spark_i_know_what_i_am_doing()
return spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "") >= "9.1"
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/dates.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{ "number": "2020-09-16" }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/dates_invalid.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{ "number": "2020-09-32" }
{ "number": "2020-50-16" }
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class MortgageSparkSuite extends FunSuite {
.config("spark.rapids.sql.test.enabled", false)
.config("spark.rapids.sql.incompatibleOps.enabled", true)
.config("spark.rapids.sql.hasNans", false)
.config("spark.rapids.sql.csv.read.date.enabled", true)
val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass
val prop = System.getProperty("rapids.shuffle.manager.override", "false")
if (prop.equalsIgnoreCase("true")) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String = options.dateFormat
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.json

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String = options.dateFormat
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,6 @@ import org.apache.spark.sql.catalyst.json.JSONOptions

trait Spark30Xuntil33XFileOptionsShims extends SparkShims {

def dateFormatInRead(fileOptions: Serializable): Option[String] = {
fileOptions match {
case csvOpts: CSVOptions => Option(csvOpts.dateFormat)
case jsonOpts: JSONOptions => Option(jsonOpts.dateFormat)
case _ => throw new RuntimeException("Wrong file options.")
}
}

def timestampFormatInRead(fileOptions: Serializable): Option[String] = {
fileOptions match {
case csvOpts: CSVOptions => Option(csvOpts.timestampFormat)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

import org.apache.spark.sql.catalyst.util.DateFormatter

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)
}
Loading