Fix issue with parquet partitioned reads #741

Merged 1 commit on Sep 11, 2020
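
This PR renames the test parameter small_file_opt to mt_opt and switches the exercised config from spark.rapids.sql.format.parquet.smallFiles.enabled to spark.rapids.sql.format.parquet.multiThreadedRead.enabled, while adding coverage for reading back only the partition key of a partitioned Parquet dataset. For orientation, here is a minimal sketch (not part of the PR) of toggling the renamed config in plain PySpark; the app name and data path are placeholders:

# Minimal sketch, not from this PR: enable the multi-threaded Parquet reader
# and read back only the partition column, the case the new test exercises.
# The app name and the /tmp path below are placeholders.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("parquet-partitioned-read-check")
    .config("spark.rapids.sql.format.parquet.multiThreadedRead.enabled", "true")
    .config("spark.sql.sources.useV1SourceList", "parquet")
    .getOrCreate())

# Dataset laid out as .../PARQUET_DATA/key=0 and .../PARQUET_DATA/key=1;
# selecting just the partition key mirrors test_partitioned_read_just_partitions.
spark.read.parquet("/tmp/PARQUET_DATA").select("key").collect()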
112 changes: 66 additions & 46 deletions integration_tests/src/main/python/parquet_test.py
@@ -34,16 +34,16 @@ def read_parquet_sql(data_path):

@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, small_file_opt, v1_enabled_list):
def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, mt_opt, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
assert_gpu_and_cpu_are_equal_collect(read_func(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@allow_non_gpu('FileSourceScanExec')
@@ -69,16 +69,16 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
# https://github.com/NVIDIA/spark-rapids/issues/143

@pytest.mark.parametrize('compress', parquet_compress_options)
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_compress_read_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list):
def test_compress_read_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list):
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path),
conf={'spark.sql.parquet.compression.codec': compress})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

parquet_pred_push_gens = [
@@ -90,9 +90,9 @@ def test_compress_read_round_trip(spark_tmp_path, compress, small_file_opt, v1_e

@pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn)
@pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, small_file_opt, v1_enabled_list):
def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, mt_opt, v1_enabled_list):
data_path = spark_tmp_path + '/PARQUET_DATA'
gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)]
s0 = gen_scalar(parquet_gen, force_no_nulls=True)
@@ -102,16 +102,16 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, small_file
rf = read_func(data_path)
assert_gpu_and_cpu_are_equal_collect(
lambda spark: rf(spark).select(f.col('a') >= s0),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']

@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list):
def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list):
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))
@@ -122,7 +122,7 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt,
'spark.sql.parquet.outputTimestampType': ts_write})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

def readParquetCatchException(spark, data_path):
@@ -132,9 +132,9 @@ def readParquetCatchException(spark, data_path):

@pytest.mark.parametrize('ts_write', parquet_ts_write_options)
@pytest.mark.parametrize('ts_rebase', ['LEGACY'])
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list):
def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list):
# Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
# timestamp_gen
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
@@ -145,7 +145,7 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, smal
'spark.sql.parquet.outputTimestampType': ts_write})
with_gpu_session(
lambda spark : readParquetCatchException(spark, data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -155,22 +155,22 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, smal
pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]

@pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn)
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list):
def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_simple_partitioned_read(spark_tmp_path, small_file_opt, v1_enabled_list):
def test_simple_partitioned_read(spark_tmp_path, mt_opt, v1_enabled_list):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -188,12 +188,32 @@ def test_simple_partitioned_read(spark_tmp_path, small_file_opt, v1_enabled_list
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@pytest.mark.parametrize('small_file_opt', ["false", "true"])
# In this test we read the data but select only the key column the data was partitioned by
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, small_file_opt):
def test_partitioned_read_just_partitions(spark_tmp_path, mt_opt, v1_enabled_list):
parquet_gens = [byte_gen]
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path).select("key"),
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@pytest.mark.parametrize('mt_opt', ["false", "true"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, mt_opt):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen]
@@ -210,14 +230,14 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, small_file_op
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list,
'spark.sql.files.maxPartitionBytes': "1g",
'spark.sql.files.minPartitionNum': '1'})

@pytest.mark.parametrize('small_file_opt', ["false", "true"])
@pytest.mark.parametrize('mt_opt', ["false", "true"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_merge_schema(spark_tmp_path, v1_enabled_list, small_file_opt):
def test_read_merge_schema(spark_tmp_path, v1_enabled_list, mt_opt):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -236,12 +256,12 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, small_file_opt):
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@pytest.mark.parametrize('small_file_opt', ["false", "true"])
@pytest.mark.parametrize('mt_opt', ["false", "true"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file_opt):
def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, mt_opt):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -260,7 +280,7 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.parquet.mergeSchema': "true",
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@@ -269,9 +289,9 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file
string_gen, boolean_gen, date_gen, timestamp_gen]]

@pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list):
def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list):
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
@@ -280,7 +300,7 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabl
data_path,
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS',
'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

parquet_part_write_gens = [
@@ -292,9 +312,9 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabl
# There are race conditions around when individual files are read in for partitioned data
@ignore_order
@pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn)
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_part_write_round_trip(spark_tmp_path, parquet_gen, small_file_opt, v1_enabled_list):
def test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_list):
gen_list = [('a', RepeatSeqGen(parquet_gen, 10)),
('b', parquet_gen)]
data_path = spark_tmp_path + '/PARQUET_DATA'
@@ -304,26 +324,26 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen, small_file_opt, v1_e
data_path,
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS',
'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

parquet_write_compress_options = ['none', 'uncompressed', 'snappy']
@pytest.mark.parametrize('compress', parquet_write_compress_options)
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_compress_write_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list):
def test_compress_write_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list):
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_writes_are_equal_collect(
lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path),
lambda spark, path : spark.read.parquet(path),
data_path,
conf={'spark.sql.parquet.compression.codec': compress,
'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})

@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_input_meta(spark_tmp_path, small_file_opt, v1_enabled_list):
def test_input_meta(spark_tmp_path, mt_opt, v1_enabled_list):
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
with_cpu_session(
lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path))
@@ -338,7 +358,7 @@ def test_input_meta(spark_tmp_path, small_file_opt, v1_enabled_list):
'input_file_name()',
'input_file_block_start()',
'input_file_block_length()'),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list})


@@ -351,12 +371,12 @@ def createBucketedTableAndJoin(spark):

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.parametrize('small_file_opt', ["true", "false"])
@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
# this test would be better if we could ensure exchanges didn't exist - i.e. used buckets
def test_buckets(spark_tmp_path, small_file_opt, v1_enabled_list):
def test_buckets(spark_tmp_path, mt_opt, v1_enabled_list):
assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin,
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
'spark.sql.sources.useV1SourceList': v1_enabled_list,
"spark.sql.autoBroadcastJoinThreshold": '-1'})

@@ -376,7 +396,7 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list):
data_path = spark_tmp_path + '/PARQUET_DATA'
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path),
conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': 'true',
conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': 'true',
'spark.sql.files.maxPartitionBytes': "1g",
'spark.sql.sources.useV1SourceList': v1_enabled_list})
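
All of the tests above share one shape: write data with a CPU session, then assert that CPU and GPU runs return the same rows under each swept config combination. Below is a sketch of a new case in that style, reusing the harness helpers visible in the diff (with_cpu_session, unary_op_df, long_gen, assert_gpu_and_cpu_are_equal_collect); the test name itself is hypothetical:

# Hypothetical additional case following the file's pattern. Only the test
# name is new; the helpers are the ones parquet_test.py already imports
# from the integration-test harness.
import pytest

@pytest.mark.parametrize('mt_opt', ["true", "false"])
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_two_key_partitioned_read(spark_tmp_path, mt_opt, v1_enabled_list):
    # Write one Parquet directory per partition value on the CPU.
    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
    with_cpu_session(
        lambda spark: unary_op_df(spark, long_gen).write.parquet(first_data_path))
    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
    with_cpu_session(
        lambda spark: unary_op_df(spark, long_gen).write.parquet(second_data_path))
    # Read the whole partitioned dataset back under the swept configs and
    # check that CPU and GPU results agree.
    data_path = spark_tmp_path + '/PARQUET_DATA'
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path),
        conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
              'spark.sql.sources.useV1SourceList': v1_enabled_list})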

Loading