Disable ORC writes until bug can be fixed #1740

Merged · 2 commits · Feb 18, 2021
docs/configs.md (1 addition, 1 deletion)

@@ -64,7 +64,7 @@ Name | Description | Default Value
 <a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
 <a name="sql.format.orc.enabled"></a>spark.rapids.sql.format.orc.enabled|When set to false disables all orc input and output acceleration|true
 <a name="sql.format.orc.read.enabled"></a>spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true
-<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true
+<a name="sql.format.orc.write.enabled"></a>spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration. This has been disabled by default because of https://github.com/NVIDIA/spark-rapids/issues/1550|false
 <a name="sql.format.parquet.enabled"></a>spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true
 <a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type|2147483647
 <a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|20
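The doc change above flips the default for spark.rapids.sql.format.orc.write.enabled to false, so GPU-accelerated ORC writes now require an explicit opt-in. A minimal PySpark sketch of that opt-in, assuming the RAPIDS Accelerator plugin is already configured on the cluster (the app name, data, and output path are illustrative):

```python
from pyspark.sql import SparkSession

# Hypothetical opt-in: re-enable GPU ORC writes despite the new default of
# false. Only advisable if the corruption tracked in issue #1550 does not
# affect the data being written.
spark = (SparkSession.builder
    .appName("orc-write-opt-in")  # illustrative app name
    .config("spark.rapids.sql.format.orc.write.enabled", "true")
    .getOrCreate())

# This write may now be planned on the GPU again (illustrative data and path).
spark.range(1000).write.mode("overwrite").orc("/tmp/orc_opt_in_demo")
```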
integration_tests/src/main/python/orc_write_test.py (10 additions, 7 deletions)

@@ -1,4 +1,4 @@
-# Copyright (c) 2020, NVIDIA CORPORATION.
+# Copyright (c) 2020-2021, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -36,7 +36,7 @@ def test_write_round_trip(spark_tmp_path, orc_gens, orc_impl):
             lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.orc(path),
             lambda spark, path: spark.read.orc(path),
             data_path,
-            conf={'spark.sql.orc.impl': orc_impl})
+            conf={'spark.sql.orc.impl': orc_impl, 'spark.rapids.sql.format.orc.write.enabled': True})
 
 orc_part_write_gens = [
         byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen,
@@ -59,7 +59,8 @@ def test_part_write_round_trip(spark_tmp_path, orc_gen):
     assert_gpu_and_cpu_writes_are_equal_collect(
             lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').orc(path),
             lambda spark, path: spark.read.orc(path),
-            data_path)
+            data_path,
+            conf = {'spark.rapids.sql.format.orc.write.enabled': True})
 
 orc_write_compress_options = ['none', 'uncompressed', 'snappy']
 @pytest.mark.parametrize('compress', orc_write_compress_options)
@@ -69,14 +70,15 @@ def test_compress_write_round_trip(spark_tmp_path, compress):
             lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.orc(path),
             lambda spark, path : spark.read.orc(path),
             data_path,
-            conf={'spark.sql.orc.compression.codec': compress})
+            conf={'spark.sql.orc.compression.codec': compress, 'spark.rapids.sql.format.orc.write.enabled': True})
 
 @pytest.mark.parametrize('orc_gens', orc_write_gens_list, ids=idfn)
 @pytest.mark.parametrize('orc_impl', ["native", "hive"])
 def test_write_save_table(spark_tmp_path, orc_gens, orc_impl, spark_tmp_table_factory):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(orc_gens)]
     data_path = spark_tmp_path + '/ORC_DATA'
     all_confs={'spark.sql.sources.useV1SourceList': "orc",
+               'spark.rapids.sql.format.orc.write.enabled': True,
                "spark.sql.orc.impl": orc_impl}
     assert_gpu_and_cpu_writes_are_equal_collect(
             lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.format("orc").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
@@ -100,14 +102,14 @@ def test_write_sql_save_table(spark_tmp_path, orc_gens, ts_type, orc_impl, spark_tmp_table_factory):
             lambda spark, path: write_orc_sql_from(spark, gen_df(spark, gen_list).coalesce(1), path, spark_tmp_table_factory.get()),
             lambda spark, path: spark.read.orc(path),
             data_path,
-            conf={'spark.sql.orc.impl': orc_impl})
+            conf={'spark.sql.orc.impl': orc_impl, 'spark.rapids.sql.format.orc.write.enabled': True})
 
 @allow_non_gpu('DataWritingCommandExec')
 @pytest.mark.parametrize('codec', ['zlib', 'lzo'])
 def test_orc_write_compression_fallback(spark_tmp_path, codec, spark_tmp_table_factory):
     gen = TimestampGen()
     data_path = spark_tmp_path + '/PARQUET_DATA'
-    all_confs={'spark.sql.orc.compression.codec': codec}
+    all_confs={'spark.sql.orc.compression.codec': codec, 'spark.rapids.sql.format.orc.write.enabled': True}
     assert_gpu_fallback_write(
             lambda spark, path: unary_op_df(spark, gen).coalesce(1).write.format("orc").mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
             lambda spark, path: spark.read.orc(path),
@@ -123,4 +125,5 @@ def test_buckets_write_fallback(spark_tmp_path, spark_tmp_table_factory):
             lambda spark, path: spark.range(10e4).write.bucketBy(4, "id").sortBy("id").format('orc').mode('overwrite').option("path", path).saveAsTable(spark_tmp_table_factory.get()),
             lambda spark, path: spark.read.orc(path),
             data_path,
-            'DataWritingCommandExec')
+            'DataWritingCommandExec',
+            conf = {'spark.rapids.sql.format.orc.write.enabled': True})
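Because the flag now defaults to false, each test above opts in only for the write under test. A rough standalone sketch of that scoped opt-in pattern outside the integration-test harness (output path and row count are illustrative, and an active RAPIDS plugin is assumed):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Scoped opt-in, mirroring how each test passes the flag per call via conf=...:
# enable GPU ORC writes for one write, then restore the session default.
spark.conf.set("spark.rapids.sql.format.orc.write.enabled", "true")
try:
    spark.range(10000).coalesce(1).write.mode("overwrite").orc("/tmp/orc_demo")  # illustrative path
finally:
    spark.conf.unset("spark.rapids.sql.format.orc.write.enabled")  # back to the default (false)
```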
sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala

@@ -662,9 +662,10 @@ object RapidsConf {
     .createWithDefault(true)
 
   val ENABLE_ORC_WRITE = conf("spark.rapids.sql.format.orc.write.enabled")
-    .doc("When set to false disables orc output acceleration")
+    .doc("When set to false disables orc output acceleration. This has been disabled by " +
+      "default because of https://github.com/NVIDIA/spark-rapids/issues/1550")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val ENABLE_CSV = conf("spark.rapids.sql.format.csv.enabled")
     .doc("When set to false disables all csv input and output acceleration. " +