Improve some CSV integration tests [databricks] #9146

Merged 8 commits on Sep 28, 2023 · Changes from 6 commits
28 changes: 20 additions & 8 deletions integration_tests/src/main/python/csv_test.py
@@ -463,14 +463,15 @@ def test_input_meta_fallback(spark_tmp_path, v1_enabled_list, disable_conf):
     updated_conf = copy_and_update(_enable_all_types_conf, {
         'spark.sql.sources.useV1SourceList': v1_enabled_list,
         disable_conf: 'false'})
-    assert_gpu_and_cpu_are_equal_collect(
+    assert_gpu_fallback_collect(
         lambda spark : spark.read.schema(gen.data_type)\
                 .csv(data_path)\
                 .filter(f.col('a') > 0)\
                 .selectExpr('a',
                     'input_file_name()',
                     'input_file_block_start()',
                     'input_file_block_length()'),
+        cpu_fallback_class_name = 'FileSourceScanExec' if v1_enabled_list == 'csv' else 'BatchScanExec',
Collaborator:
Not a must-fix, but stylistically, and because in the future the parameter v1_enabled_list might become a comma-separated value or a Python list, we might want to check for 'csv' in v1_enabled_list rather than test for equality.
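For illustration, a minimal sketch of the suggested membership check (hypothetical, not part of this PR); it behaves the same for the current exact string and keeps working if v1_enabled_list later becomes a comma-separated string or a list:

```python
# Hypothetical sketch, not part of this PR: membership check instead of equality.
# 'csv' in v1_enabled_list is True whether v1_enabled_list is the exact string
# 'csv', a comma-separated string such as 'csv,json', or a list like ['csv', 'json'].
def expected_cpu_scan_class(v1_enabled_list):
    return 'FileSourceScanExec' if 'csv' in v1_enabled_list else 'BatchScanExec'

assert expected_cpu_scan_class('csv') == 'FileSourceScanExec'
assert expected_cpu_scan_class('csv,json') == 'FileSourceScanExec'
assert expected_cpu_scan_class(['json']) == 'BatchScanExec'
```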

         conf=updated_conf)

 @allow_non_gpu('DataWritingCommandExec,ExecutedCommandExec,WriteFilesExec')
@@ -529,16 +530,18 @@ def test_round_trip_for_interval(spark_tmp_path, v1_enabled_list):
         lambda spark: spark.read.schema(schema).csv(data_path),
         conf=updated_conf)

-@allow_non_gpu(any = True)
+@allow_non_gpu('FileSourceScanExec', 'CollectLimitExec', 'DeserializeToObjectExec')
 def test_csv_read_case_insensitivity(spark_tmp_path):
     gen_list = [('one', int_gen), ('tWo', byte_gen), ('THREE', boolean_gen)]
     data_path = spark_tmp_path + '/CSV_DATA'

     with_cpu_session(lambda spark: gen_df(spark, gen_list).write.option('header', True).csv(data_path))

-    assert_gpu_and_cpu_are_equal_collect(
+    assert_cpu_and_gpu_are_equal_collect_with_capture(
         lambda spark: spark.read.option('header', True).csv(data_path).select('one', 'two', 'three'),
-        {'spark.sql.caseSensitive': 'false'}
+        exist_classes = 'GpuFileSourceScanExec',
+        non_exist_classes = 'FileSourceScanExec',
+        conf = {'spark.sql.caseSensitive': 'false'}
     )

 @allow_non_gpu('FileSourceScanExec', 'CollectLimitExec', 'DeserializeToObjectExec')
@@ -549,7 +552,10 @@ def test_csv_read_count(spark_tmp_path):

     with_cpu_session(lambda spark: gen_df(spark, gen_list).write.csv(data_path))

-    assert_gpu_and_cpu_row_counts_equal(lambda spark: spark.read.csv(data_path))
+    # TODO this does not confirm that this scan actually runs on GPU, but I am not
+    # sure if we can capture the plan with a COUNT, since it normally requires a COLLECT
Collaborator:
Let us file an issue and maybe reference it in the comment.

Contributor Author:
Thanks @gerashegalov. I have filed #9199 and updated the comment.
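For context, one possible way the physical plan of a COUNT query could be inspected without a full collect is sketched below; this is a hypothetical illustration, not part of this PR, and it relies on PySpark's internal `_jdf` handle and the `QueryExecution` accessors, which are not public API:

```python
# Hypothetical sketch, not part of this PR: render the physical plan of a
# count-style query via Spark's internal QueryExecution handle.
# _jdf is an internal PySpark attribute and may change between Spark versions.
def count_plan_string(spark, data_path):
    df = spark.read.csv(data_path).selectExpr('COUNT(*)')
    # toString() renders the physical plan without running the job
    return df._jdf.queryExecution().executedPlan().toString()

# e.g. assert 'GpuFileSourceScanExec' in count_plan_string(spark, data_path)
```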

+    assert_gpu_and_cpu_row_counts_equal(lambda spark: spark.read.csv(data_path),
+                                        conf = {'spark.rapids.sql.explain': 'ALL'})

 @allow_non_gpu('FileSourceScanExec', 'CollectLimitExec', 'DeserializeToObjectExec')
 @pytest.mark.skipif(is_before_spark_340(), reason='`preferDate` is only supported in Spark 340+')
@@ -561,12 +567,18 @@ def test_csv_prefer_date_with_infer_schema(spark_tmp_path):

     with_cpu_session(lambda spark: gen_df(spark, gen_list).write.csv(data_path))

-    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.option("inferSchema", "true").csv(data_path))
-    assert_gpu_and_cpu_are_equal_collect(lambda spark: spark.read.option("inferSchema", "true").option("preferDate", "false").csv(data_path))
+    assert_cpu_and_gpu_are_equal_collect_with_capture(
+        lambda spark: spark.read.option("inferSchema", "true").csv(data_path),
+        exist_classes = 'GpuFileSourceScanExec',
+        non_exist_classes = 'FileSourceScanExec')
+    assert_cpu_and_gpu_are_equal_collect_with_capture(
+        lambda spark: spark.read.option("inferSchema", "true").option("preferDate", "false").csv(data_path),
+        exist_classes = 'GpuFileSourceScanExec',
+        non_exist_classes = 'FileSourceScanExec')

 @allow_non_gpu('FileSourceScanExec')
 @pytest.mark.skipif(is_before_spark_340(), reason='enableDateTimeParsingFallback is supported from Spark3.4.0')
-@pytest.mark.parametrize('filename,schema',[("date.csv", _date_schema), ("date.csv", _ts_schema,),
+@pytest.mark.parametrize('filename,schema',[("date.csv", _date_schema), ("date.csv", _ts_schema),
                                             ("ts.csv", _ts_schema)])
 def test_csv_datetime_parsing_fallback_cpu_fallback(std_input_path, filename, schema):
     data_path = std_input_path + "/" + filename