Improve date support in JSON and CSV readers (#4853)
Signed-off-by: Andy Grove <andygrove@nvidia.com>
andygrove authored Mar 10, 2022
1 parent 2926496 commit 2fca312
Showing 27 changed files with 382 additions and 132 deletions.
15 changes: 2 additions & 13 deletions docs/compatibility.md
@@ -283,7 +283,6 @@ will produce a different result compared to the plugin.
Due to inconsistencies in how CSV data is parsed, CSV parsing is off by default.
Each data type can be enabled or disabled independently using the following configs.

* [spark.rapids.sql.csv.read.date.enabled](configs.md#sql.csv.read.date.enabled)
* [spark.rapids.sql.csvTimestamps.enabled](configs.md#sql.csvTimestamps.enabled)

If you know that your particular data type will be parsed correctly enough, you may enable each
@@ -310,8 +309,6 @@ Escaped quote characters `'\"'` are not supported well as described by this
[issue](https://github.com/NVIDIA/spark-rapids/issues/129).

### CSV Dates
Parsing a `timestamp` as a `date` does not work. The details are documented in this
[issue](https://github.com/NVIDIA/spark-rapids/issues/869).

Only a limited set of formats are supported when parsing dates.

@@ -323,16 +320,8 @@ Only a limited set of formats are supported when parsing dates.
* `"MM/yyyy"`
* `"MM-dd-yyyy"`
* `"MM/dd/yyyy"`

The reality is that all of these formats are supported at the same time. The plugin will only
disable itself if you set a format that it does not support.

As a workaround, you can parse the column as a timestamp and then cast it to a date.
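
For illustration, a minimal PySpark sketch of that workaround; the schema, column name, and file
path are hypothetical, and an existing `spark` session is assumed:

```python
# Read the value as a timestamp first, then cast the column to a date.
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, TimestampType

schema = StructType([StructField("event_time", TimestampType())])
df = spark.read.schema(schema).csv("/path/to/data.csv")
df = df.withColumn("event_date", col("event_time").cast("date"))
```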

Invalid dates in Spark, meaning values that have the correct format but whose numbers do not form a
valid date, can result in an exception by default, and how they are parsed can be controlled through
a config. The RAPIDS Accelerator does not support any of this and will produce an incorrect date,
typically one that has overflowed.
* `"dd-MM-yyyy"`
* `"dd/MM/yyyy"`

### CSV Timestamps
The CSV parser does not support time zones. It will ignore any trailing time zone information,
1 change: 0 additions & 1 deletion docs/configs.md
@@ -67,7 +67,6 @@ Name | Description | Default Value
<a name="sql.castStringToFloat.enabled"></a>spark.rapids.sql.castStringToFloat.enabled|When set to true, enables casting from strings to float types (float, double) on the GPU. Currently hex values aren't supported on the GPU. Also note that casting from string to float types on the GPU returns incorrect results when the string represents any number "1.7976931348623158E308" <= x < "1.7976931348623159E308" and "-1.7976931348623158E308" >= x > "-1.7976931348623159E308" in both these cases the GPU returns Double.MaxValue while CPU returns "+Infinity" and "-Infinity" respectively|false
<a name="sql.castStringToTimestamp.enabled"></a>spark.rapids.sql.castStringToTimestamp.enabled|When set to true, casting from string to timestamp is supported on the GPU. The GPU only supports a subset of formats when casting strings to timestamps. Refer to the CAST documentation for more details.|false
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csv.read.date.enabled"></a>spark.rapids.sql.csv.read.date.enabled|Parsing invalid CSV dates produces different results from Spark|false
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.decimalOverflowGuarantees"></a>spark.rapids.sql.decimalOverflowGuarantees|FOR TESTING ONLY. DO NOT USE IN PRODUCTION. Please see the decimal section of the compatibility documents for more information on this config.|true
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
75 changes: 60 additions & 15 deletions integration_tests/src/main/python/csv_test.py
@@ -14,7 +14,8 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_gpu_fallback_write, assert_cpu_and_gpu_are_equal_collect_with_capture
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_write, \
assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_fallback_collect
from conftest import get_non_gpu_allowed
from datetime import datetime, timezone
from data_gen import *
@@ -167,15 +168,7 @@
StructField('ignored_b', StringType())])

_enable_all_types_conf = {'spark.rapids.sql.csvTimestamps.enabled': 'true',
'spark.rapids.sql.csv.read.bool.enabled': 'true',
'spark.rapids.sql.csv.read.date.enabled': 'true',
'spark.rapids.sql.csv.read.byte.enabled': 'true',
'spark.rapids.sql.csv.read.short.enabled': 'true',
'spark.rapids.sql.csv.read.integer.enabled': 'true',
'spark.rapids.sql.csv.read.long.enabled': 'true',
'spark.rapids.sql.csv.read.float.enabled': 'true',
'spark.rapids.sql.csv.read.double.enabled': 'true',
'spark.sql.legacy.timeParserPolicy': 'Corrected'}
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'}

def read_csv_df(data_path, schema, options = {}):
def read_impl(spark):
@@ -200,8 +193,8 @@ def read_impl(spark):
@pytest.mark.parametrize('name,schema,options', [
('Acquisition_2007Q3.txt', _acq_schema, {'sep': '|'}),
('Performance_2007Q3.txt_0', _perf_schema, {'sep': '|'}),
pytest.param('ts.csv', _date_schema, {}),
pytest.param('date.csv', _date_schema, {}, marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1111')),
('ts.csv', _date_schema, {}),
('date.csv', _date_schema, {}),
('ts.csv', _ts_schema, {}),
('str.csv', _bad_str_schema, {'header': 'true'}),
('str.csv', _good_str_schema, {'header': 'true'}),
@@ -316,22 +309,74 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
@pytest.mark.parametrize('date_format', csv_supported_date_formats, ids=idfn)
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list):
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_date_formats_round_trip(spark_tmp_path, date_format, v1_enabled_list, ansi_enabled, time_parser_policy):
gen = StructGen([('a', DateGen())], nullable=False)
data_path = spark_tmp_path + '/CSV_DATA'
schema = gen.data_type
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.sources.useV1SourceList': v1_enabled_list})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
with_cpu_session(
lambda spark : gen_df(spark, gen).write\
.option('dateFormat', date_format)\
.csv(data_path))
assert_gpu_and_cpu_are_equal_collect(
if time_parser_policy == 'LEGACY':
expected_class = 'FileSourceScanExec'
if v1_enabled_list == '':
expected_class = 'BatchScanExec'
assert_gpu_fallback_collect(
lambda spark : spark.read \
.schema(schema) \
.option('dateFormat', date_format) \
.csv(data_path),
expected_class,
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read\
.schema(schema)\
.option('dateFormat', date_format)\
.csv(data_path),
conf=updated_conf)

@pytest.mark.parametrize('filename', ["date.csv"])
@pytest.mark.parametrize('v1_enabled_list', ["", "csv"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('BatchScanExec,FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_read_valid_and_invalid_dates(std_input_path, filename, v1_enabled_list, ansi_enabled, time_parser_policy):
data_path = std_input_path + '/' + filename
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.ansi.enabled': ansi_enabled,
'spark.rapids.sql.incompatibleDateFormats.enabled': True,
'spark.sql.legacy.timeParserPolicy': time_parser_policy})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path)
.collect(),
conf=updated_conf,
error_message='DateTimeException')
else:
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read \
.schema(_date_schema) \
.csv(data_path),
conf=updated_conf)

csv_supported_ts_parts = ['', # Just the date
"'T'HH:mm:ss.SSSXXX",
"'T'HH:mm:ss[.SSS][XXX]",
Expand Down
73 changes: 69 additions & 4 deletions integration_tests/src/main/python/json_test.py
@@ -14,12 +14,12 @@

import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error, assert_gpu_fallback_collect
from data_gen import *
from conftest import is_databricks_runtime
from marks import approximate_float, allow_non_gpu, ignore_order

from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330, is_spark_330_or_later

json_supported_gens = [
# Spark does not escape '\r' or '\n' even though it uses it to mark end of record
@@ -66,6 +66,9 @@
_decimal_10_3_schema = StructType([
StructField('number', DecimalType(10, 3))])

_date_schema = StructType([
StructField('number', DateType())])

_string_schema = StructType([
StructField('a', StringType())])

@@ -204,21 +207,83 @@ def test_json_ts_formats_round_trip(spark_tmp_path, date_format, ts_part, v1_ena
'floats_invalid.json',
pytest.param('floats_edge_cases.json', marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/4647')),
'decimals.json',
'dates.json',
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, _float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema])
@pytest.mark.parametrize('schema', [_bool_schema, _byte_schema, _short_schema, _int_schema, _long_schema, \
_float_schema, _double_schema, _decimal_10_2_schema, _decimal_10_3_schema, \
_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_non_numeric_numbers', ["true", "false"])
@pytest.mark.parametrize('allow_numeric_leading_zeros', ["true"])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
def test_basic_json_read(std_input_path, filename, schema, read_func, allow_non_numeric_numbers, allow_numeric_leading_zeros, ansi_enabled):
updated_conf = copy_and_update(_enable_all_types_conf, {'spark.sql.ansi.enabled': ansi_enabled})
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': 'CORRECTED'})
assert_gpu_and_cpu_are_equal_collect(
read_func(std_input_path + '/' + filename,
schema,
{ "allowNonNumericNumbers": allow_non_numeric_numbers,
"allowNumericLeadingZeros": allow_numeric_leading_zeros}),
conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_valid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy,
'spark.rapids.sql.incompatibleDateFormats.enabled': True})
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@approximate_float
@pytest.mark.parametrize('filename', [
'dates_invalid.json',
])
@pytest.mark.parametrize('schema', [_date_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('ansi_enabled', ["true", "false"])
@pytest.mark.parametrize('time_parser_policy', [
pytest.param('LEGACY', marks=pytest.mark.allow_non_gpu('FileSourceScanExec')),
'CORRECTED',
'EXCEPTION'
])
def test_json_read_invalid_dates(std_input_path, filename, schema, read_func, ansi_enabled, time_parser_policy):
updated_conf = copy_and_update(_enable_all_types_conf,
{'spark.sql.ansi.enabled': ansi_enabled,
'spark.sql.legacy.timeParserPolicy': time_parser_policy })
f = read_func(std_input_path + '/' + filename, schema, {})
if time_parser_policy == 'EXCEPTION':
assert_gpu_and_cpu_error(
df_fun=lambda spark: f(spark).collect(),
conf=updated_conf,
error_message='DateTimeException')
elif time_parser_policy == 'LEGACY' and ansi_enabled == 'true':
assert_gpu_fallback_collect(
f,
'FileSourceScanExec',
conf=updated_conf)
else:
assert_gpu_and_cpu_are_equal_collect(f, conf=updated_conf)

@pytest.mark.parametrize('schema', [_string_schema])
@pytest.mark.parametrize('read_func', [read_json_df, read_json_sql])
@pytest.mark.parametrize('allow_unquoted_chars', ["true"])
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/spark_session.py
@@ -139,6 +139,9 @@ def is_before_spark_320():
def is_before_spark_330():
return spark_version() < "3.3.0"

def is_spark_330_or_later():
return spark_version() >= "3.3.0"

def is_databricks91_or_later():
spark = get_spark_i_know_what_i_am_doing()
return spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "") >= "9.1"
3 changes: 3 additions & 0 deletions integration_tests/src/test/resources/dates.json
@@ -0,0 +1,3 @@
{ "number": "2020-09-16" }
{ "number": "1581-01-01" }
{ "number": "1583-01-01" }
2 changes: 2 additions & 0 deletions integration_tests/src/test/resources/dates_invalid.json
@@ -0,0 +1,2 @@
{ "number": "2020-09-32" }
{ "number": "2020-50-16" }
@@ -47,7 +47,6 @@ class MortgageSparkSuite extends FunSuite {
.config("spark.rapids.sql.test.enabled", false)
.config("spark.rapids.sql.incompatibleOps.enabled", true)
.config("spark.rapids.sql.hasNans", false)
.config("spark.rapids.sql.csv.read.date.enabled", true)
val rapidsShuffle = ShimLoader.getRapidsShuffleManagerClass
val prop = System.getProperty("rapids.shuffle.manager.override", "false")
if (prop.equalsIgnoreCase("true")) {
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String = options.dateFormat
}
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.json

object GpuJsonUtils {
def dateFormatInRead(options: JSONOptions): String = options.dateFormat
}
@@ -23,14 +23,6 @@ import org.apache.spark.sql.catalyst.json.JSONOptions

trait Spark30Xuntil33XFileOptionsShims extends SparkShims {

def dateFormatInRead(fileOptions: Serializable): Option[String] = {
fileOptions match {
case csvOpts: CSVOptions => Option(csvOpts.dateFormat)
case jsonOpts: JSONOptions => Option(jsonOpts.dateFormat)
case _ => throw new RuntimeException("Wrong file options.")
}
}

def timestampFormatInRead(fileOptions: Serializable): Option[String] = {
fileOptions match {
case csvOpts: CSVOptions => Option(csvOpts.timestampFormat)
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.csv

import org.apache.spark.sql.catalyst.util.DateFormatter

object GpuCsvUtils {
def dateFormatInRead(options: CSVOptions): String =
options.dateFormatInRead.getOrElse(DateFormatter.defaultPattern)
}