diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index dfe8e7e13b1..844040d4ffd 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -34,16 +34,16 @@ def read_parquet_sql(data_path):
 
 @pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn)
 @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, small_file_opt, v1_enabled_list):
+def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, mt_opt, v1_enabled_list):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
     data_path = spark_tmp_path + '/PARQUET_DATA'
     with_cpu_session(
             lambda spark : gen_df(spark, gen_list).write.parquet(data_path),
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
     assert_gpu_and_cpu_are_equal_collect(read_func(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 @allow_non_gpu('FileSourceScanExec')
@@ -69,16 +69,16 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
 # https://github.com/NVIDIA/spark-rapids/issues/143
 
 @pytest.mark.parametrize('compress', parquet_compress_options)
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_compress_read_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list):
+def test_compress_read_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     with_cpu_session(
             lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path),
             conf={'spark.sql.parquet.compression.codec': compress})
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 parquet_pred_push_gens = [
@@ -90,9 +90,9 @@ def test_compress_read_round_trip(spark_tmp_path, compress, small_file_opt, v1_e
 
 @pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn)
 @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql])
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, small_file_opt, v1_enabled_list):
+def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, mt_opt, v1_enabled_list):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)]
     s0 = gen_scalar(parquet_gen, force_no_nulls=True)
@@ -102,16 +102,16 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, small_file
     rf = read_func(data_path)
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark: rf(spark).select(f.col('a') >= s0),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS']
 
 @pytest.mark.parametrize('ts_write', parquet_ts_write_options)
 @pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY'])
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list):
+def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list):
     # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
     # timestamp_gen
     gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))
@@ -122,7 +122,7 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt,
             'spark.sql.parquet.outputTimestampType': ts_write})
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 def readParquetCatchException(spark, data_path):
@@ -132,9 +132,9 @@ def readParquetCatchException(spark, data_path):
 
 @pytest.mark.parametrize('ts_write', parquet_ts_write_options)
 @pytest.mark.parametrize('ts_rebase', ['LEGACY'])
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list):
+def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list):
     # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with
     # timestamp_gen
     gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc))
@@ -145,7 +145,7 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, smal
             'spark.sql.parquet.outputTimestampType': ts_write})
     with_gpu_session(
             lambda spark : readParquetCatchException(spark, data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -155,9 +155,9 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, smal
     pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))]
 
 @pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn)
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list):
+def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
     data_path = spark_tmp_path + '/PARQUET_DATA'
     with_cpu_session(
@@ -165,12 +165,12 @@ def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, small_file_opt, v1
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_simple_partitioned_read(spark_tmp_path, small_file_opt, v1_enabled_list):
+def test_simple_partitioned_read(spark_tmp_path, mt_opt, v1_enabled_list):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
     # we should go with a more standard set of generators
     parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -188,12 +188,32 @@ def test_simple_partitioned_read(spark_tmp_path, small_file_opt, v1_enabled_list
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
-@pytest.mark.parametrize('small_file_opt', ["false", "true"])
+# In this we are reading the data, but only reading the key the data was partitioned by
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, small_file_opt):
+def test_partitioned_read_just_partitions(spark_tmp_path, mt_opt, v1_enabled_list):
+    parquet_gens = [byte_gen]
+    gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
+    first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path),
+            conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
+    second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
+    with_cpu_session(
+            lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
+            conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    assert_gpu_and_cpu_are_equal_collect(
+            lambda spark : spark.read.parquet(data_path).select("key"),
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
+                  'spark.sql.sources.useV1SourceList': v1_enabled_list})
+
+@pytest.mark.parametrize('mt_opt', ["false", "true"])
+@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
+def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, mt_opt):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
     # we should go with a more standard set of generators
     parquet_gens = [byte_gen, short_gen, int_gen, long_gen]
@@ -210,14 +230,14 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, small_file_op
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list,
                   'spark.sql.files.maxPartitionBytes': "1g",
                   'spark.sql.files.minPartitionNum': '1'})
 
-@pytest.mark.parametrize('small_file_opt', ["false", "true"])
+@pytest.mark.parametrize('mt_opt', ["false", "true"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_read_merge_schema(spark_tmp_path, v1_enabled_list, small_file_opt):
+def test_read_merge_schema(spark_tmp_path, v1_enabled_list, mt_opt):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
     # we should go with a more standard set of generators
     parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -236,12 +256,12 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, small_file_opt):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
-@pytest.mark.parametrize('small_file_opt', ["false", "true"])
+@pytest.mark.parametrize('mt_opt', ["false", "true"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file_opt):
+def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, mt_opt):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
     # we should go with a more standard set of generators
     parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen,
@@ -260,7 +280,7 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.parquet.mergeSchema': "true",
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
@@ -269,9 +289,9 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file
     string_gen, boolean_gen, date_gen, timestamp_gen]]
 
 @pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn)
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list):
+def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list):
     gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_writes_are_equal_collect(
@@ -280,7 +300,7 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabl
             data_path,
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
                   'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS',
-                  'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+                  'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 parquet_part_write_gens = [
@@ -292,9 +312,9 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabl
 # There are race conditions around when individual files are read in for partitioned data
 @ignore_order
 @pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn)
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_part_write_round_trip(spark_tmp_path, parquet_gen, small_file_opt, v1_enabled_list):
+def test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_list):
     gen_list = [('a', RepeatSeqGen(parquet_gen, 10)),
             ('b', parquet_gen)]
     data_path = spark_tmp_path + '/PARQUET_DATA'
@@ -304,26 +324,26 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen, small_file_opt, v1_e
             data_path,
             conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED',
                   'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS',
-                  'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+                  'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 parquet_write_compress_options = ['none', 'uncompressed', 'snappy']
 @pytest.mark.parametrize('compress', parquet_write_compress_options)
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_compress_write_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list):
+def test_compress_write_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_writes_are_equal_collect(
             lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path),
             lambda spark, path : spark.read.parquet(path),
             data_path,
             conf={'spark.sql.parquet.compression.codec': compress,
-                  'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+                  'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
-def test_input_meta(spark_tmp_path, small_file_opt, v1_enabled_list):
+def test_input_meta(spark_tmp_path, mt_opt, v1_enabled_list):
     first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
     with_cpu_session(
             lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path))
@@ -338,7 +358,7 @@ def test_input_meta(spark_tmp_path, small_file_opt, v1_enabled_list):
                     'input_file_name()',
                     'input_file_block_start()',
                     'input_file_block_length()'),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
 
@@ -351,12 +371,12 @@ def createBucketedTableAndJoin(spark):
 
 @ignore_order
 @allow_non_gpu('DataWritingCommandExec')
-@pytest.mark.parametrize('small_file_opt', ["true", "false"])
+@pytest.mark.parametrize('mt_opt', ["true", "false"])
 @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
 # this test would be better if we could ensure exchanges didn't exist - ie used buckets
-def test_buckets(spark_tmp_path, small_file_opt, v1_enabled_list):
+def test_buckets(spark_tmp_path, mt_opt, v1_enabled_list):
     assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin,
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt,
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt,
                   'spark.sql.sources.useV1SourceList': v1_enabled_list,
                   "spark.sql.autoBroadcastJoinThreshold": '-1'})
 
@@ -376,7 +396,7 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list):
     data_path = spark_tmp_path + '/PARQUET_DATA'
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path),
-            conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': 'true',
+            conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': 'true',
                   'spark.sql.files.maxPartitionBytes': "1g",
                   'spark.sql.sources.useV1SourceList': v1_enabled_list})
 
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index ee61325c728..306defbe4ef 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -924,13 +924,17 @@ class MultiFileParquetPartitionReader(
   private def addPartitionValues(
       batch: Option[ColumnarBatch],
       inPartitionValues: InternalRow): Option[ColumnarBatch] = {
-    batch.map { cb =>
-      val partitionValues = inPartitionValues.toSeq(partitionSchema)
-      val partitionScalars = ColumnarPartitionReaderWithPartitionValues
-        .createPartitionValues(partitionValues, partitionSchema)
-      withResource(partitionScalars) { scalars =>
-        ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars)
+    if (partitionSchema.nonEmpty) {
+      batch.map { cb =>
+        val partitionValues = inPartitionValues.toSeq(partitionSchema)
+        val partitionScalars = ColumnarPartitionReaderWithPartitionValues
+          .createPartitionValues(partitionValues, partitionSchema)
+        withResource(partitionScalars) { scalars =>
+          ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars)
+        }
       }
+    } else {
+      batch
     }
   }
 
@@ -945,9 +949,12 @@ class MultiFileParquetPartitionReader(
           // shouldn't ever get here
           None
       }
-      // not reading any data, so return a degenerate ColumnarBatch with the row count
+      // not reading any data, but add in partition data if needed
       if (hostBuffer == null) {
-        return Some(new ColumnarBatch(Array.empty, dataSize.toInt))
+        // Someone is going to process this data, even if it is just a row count
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+        val emptyBatch = new ColumnarBatch(Array.empty, dataSize.toInt)
+        return addPartitionValues(Some(emptyBatch), partValues)
       }
       val table = withResource(hostBuffer) { _ =>
         if (debugDumpPrefix != null) {