Skip to content

Commit

Permalink
Update docs and change tests to copy reader confs
Browse files Browse the repository at this point in the history
  • Loading branch information
tgravescs committed Oct 26, 2020
1 parent ef6be79 commit 78f9033
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 33 deletions.
2 changes: 1 addition & 1 deletion docs/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ Name | Description | Default Value
<a name="sql.format.parquet.multiThreadedRead.maxNumFilesParallel"></a>spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type|2147483647
<a name="sql.format.parquet.multiThreadedRead.numThreads"></a>spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|20
<a name="sql.format.parquet.read.enabled"></a>spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true
<a name="sql.format.parquet.reader.type"></a>spark.rapids.sql.format.parquet.reader.type|Sets the parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE. This is where the task reads each file assigned to it serially and copies each one to the GPU after reading that file. That reader is not great at handling many small files, so using either COALESCING OR MULTITHREADED is better for that. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It does copy blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is total separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to gpu separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
<a name="sql.format.parquet.reader.type"></a>spark.rapids.sql.format.parquet.reader.type|Sets the parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO
<a name="sql.format.parquet.write.enabled"></a>spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true
<a name="sql.hasNans"></a>spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false
Expand Down
80 changes: 48 additions & 32 deletions integration_tests/src/main/python/parquet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs,
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(read_func(data_path),
conf=reader_confs)
conf=all_confs)

@allow_non_gpu('FileSourceScanExec')
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
Expand Down Expand Up @@ -85,10 +86,11 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, rea
with_cpu_session(
lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path),
conf={'spark.sql.parquet.compression.codec': compress})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

parquet_pred_push_gens = [
byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen,
Expand All @@ -109,10 +111,11 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled
lambda spark : gen_df(spark, gen_list).orderBy('a').write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
rf = read_func(data_path)
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark: rf(spark).select(f.col('a') >= s0),
conf=reader_confs)
conf=all_confs)

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

Expand All @@ -129,10 +132,11 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

def readParquetCatchException(spark, data_path):
with pytest.raises(Exception) as e_info:
Expand All @@ -152,10 +156,11 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, v1_e
lambda spark : unary_op_df(spark, gen).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase,
'spark.sql.parquet.outputTimestampType': ts_write})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
with_gpu_session(
lambda spark : readParquetCatchException(spark, data_path),
conf=reader_confs)
conf=all_confs)

parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
Expand All @@ -172,10 +177,11 @@ def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, r
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
Expand All @@ -195,10 +201,11 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

# In this we are reading the data, but only reading the key the data was partitioned by
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
Expand All @@ -215,10 +222,11 @@ def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reade
lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path).select("key"),
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
Expand All @@ -237,12 +245,13 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs)
with_cpu_session(
lambda spark : gen_df(spark, second_gen_list, 1).write.parquet(second_data_path))
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.files.maxPartitionBytes': "1g",
'spark.sql.files.minPartitionNum': '1'})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
Expand All @@ -263,10 +272,11 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs):
lambda spark : gen_df(spark, second_gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
Expand All @@ -286,12 +296,13 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con
with_cpu_session(
lambda spark : gen_df(spark, second_gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.parquet.mergeSchema': "true"})
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf=reader_confs)
conf=all_confs)

parquet_write_gens_list = [
[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
Expand All @@ -304,14 +315,15 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con
def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list, ts_type, reader_confs):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS'])
@pytest.mark.parametrize('ts_rebase', ['CORRECTED'])
Expand Down Expand Up @@ -342,28 +354,30 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list, ts_
gen_list = [('a', RepeatSeqGen(parquet_gen, 10)),
('b', parquet_gen)]
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': ts_type})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').parquet(path),
lambda spark, path: spark.read.parquet(path),
data_path,
conf=reader_confs)
conf=all_confs)

parquet_write_compress_options = ['none', 'uncompressed', 'snappy']
@pytest.mark.parametrize('compress', parquet_write_compress_options)
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs):
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.parquet.compression.codec': compress})
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path),
lambda spark, path : spark.read.parquet(path),
data_path,
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
Expand All @@ -375,15 +389,16 @@ def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs):
with_cpu_session(
lambda spark : unary_op_df(spark, long_gen).write.parquet(second_data_path))
data_path = spark_tmp_path + '/PARQUET_DATA'
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path)\
.filter(f.col('a') > 0)\
.selectExpr('a',
'input_file_name()',
'input_file_block_start()',
'input_file_block_length()'),
conf=reader_confs)
conf=all_confs)

def createBucketedTableAndJoin(spark):
spark.range(10e4).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e4")
Expand All @@ -398,10 +413,11 @@ def createBucketedTableAndJoin(spark):
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
# this test would be better if we could ensure exchanges didn't exist - ie used buckets
def test_buckets(spark_tmp_path, v1_enabled_list, reader_confs):
reader_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list,
"spark.sql.autoBroadcastJoinThreshold": '-1'})
assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin,
conf=reader_confs)
conf=all_confs)

@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_small_file_memory(spark_tmp_path, v1_enabled_list):
Expand Down

0 comments on commit 78f9033

Please sign in to comment.