diff --git a/docs/configs.md b/docs/configs.md index 0f51747dbac9..c7a52c9c041c 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -29,6 +29,7 @@ scala> spark.conf.set("spark.rapids.sql.incompatibleOps.enabled", true) Name | Description | Default Value -----|-------------|-------------- +spark.rapids.cloudSchemes|Comma-separated list of additional URI schemes that are to be considered cloud-based filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud-based stores are generally totally separate from the executors and likely have a higher I/O read cost. Cloud filesystems also often get better throughput when you have multiple readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type|None spark.rapids.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory. Extra memory will be allocated as needed, but it may result in more fragmentation. This must be less than or equal to the maximum limit configured via spark.rapids.memory.gpu.maxAllocFraction.|0.9 spark.rapids.memory.gpu.debug|Provides a log of GPU memory allocations and frees. If set to STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. All other values are reserved for possible future expansion and in the mean time will disable logging.|NONE spark.rapids.memory.gpu.maxAllocFraction|The fraction of total GPU memory that limits the maximum size of the RMM pool. The value must be greater than or equal to the setting for spark.rapids.memory.gpu.allocFraction. Note that this limit will be reduced by the reserve memory configured in spark.rapids.memory.gpu.reserve.|1.0 @@ -61,10 +62,10 @@ Name | Description | Default Value spark.rapids.sql.format.orc.read.enabled|When set to false disables orc input acceleration|true spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true -spark.rapids.sql.format.parquet.multiThreadedRead.enabled|When set to true, reads multiple small files within a partition more efficiently by reading each file in a separate thread in parallel on the CPU side before sending to the GPU. Limited by spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|true -spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel.|2147483647 -spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor has started.|20 +spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with the MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type|2147483647 +spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This cannot be changed at runtime after the executor has started.
Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|20 spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true +spark.rapids.sql.format.parquet.reader.type|Sets the parquet reader type. We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. 
This can improve output file sizes when saving to columnar formats.|false diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 7b3559ad36f2..194fb985bcf8 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -32,19 +32,28 @@ def read_parquet_sql(data_path): TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))], pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/132'))] +# test with original parquet file reader, the multi-file parallel reader for cloud, and coalesce file reader for +# non-cloud +original_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'} +multithreaded_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'} +coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'} +reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf, + coalesce_parquet_file_reader_conf] + @pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, mt_opt, v1_enabled_list): +def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : gen_df(spark, gen_list).write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect(read_func(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) @allow_non_gpu('FileSourceScanExec') @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @@ -70,17 +79,18 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): # https://github.com/NVIDIA/spark-rapids/issues/143 @pytest.mark.parametrize('compress', parquet_compress_options) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_read_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list): +def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path), conf={'spark.sql.parquet.compression.codec': compress}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) parquet_pred_push_gens = [ byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen, @@ 
-91,9 +101,9 @@ def test_compress_read_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_l @pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, mt_opt, v1_enabled_list): +def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] s0 = gen_scalar(parquet_gen, force_no_nulls=True) @@ -101,18 +111,19 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, mt_opt, v1 lambda spark : gen_df(spark, gen_list).orderBy('a').write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) rf = read_func(data_path) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark: rf(spark).select(f.col('a') >= s0), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'] @pytest.mark.parametrize('ts_write', parquet_ts_write_options) @pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY']) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list): +def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with # timestamp_gen gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)) @@ -121,10 +132,11 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enab lambda spark : unary_op_df(spark, gen).write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, 'spark.sql.parquet.outputTimestampType': ts_write}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) def readParquetCatchException(spark, data_path): with pytest.raises(Exception) as e_info: @@ -133,9 +145,9 @@ def readParquetCatchException(spark, data_path): @pytest.mark.parametrize('ts_write', parquet_ts_write_options) @pytest.mark.parametrize('ts_rebase', ['LEGACY']) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, mt_opt, v1_enabled_list): +def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with # timestamp_gen 
gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc)) @@ -144,10 +156,11 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, mt_o lambda spark : unary_op_df(spark, gen).write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, 'spark.sql.parquet.outputTimestampType': ts_write}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) with_gpu_session( lambda spark : readParquetCatchException(spark, data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), @@ -156,22 +169,23 @@ def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, mt_o pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))] @pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list): +def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, reader_confs): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : gen_df(spark, gen_list).write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_simple_partitioned_read(spark_tmp_path, mt_opt, v1_enabled_list): +def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, @@ -187,15 +201,16 @@ def test_simple_partitioned_read(spark_tmp_path, mt_opt, v1_enabled_list): lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) # In this we are reading the data, but only reading the key the data was partitioned by -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) 
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_partitioned_read_just_partitions(spark_tmp_path, mt_opt, v1_enabled_list): +def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reader_confs): parquet_gens = [byte_gen] gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' @@ -207,14 +222,15 @@ def test_partitioned_read_just_partitions(spark_tmp_path, mt_opt, v1_enabled_lis lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path).select("key"), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) -@pytest.mark.parametrize('mt_opt', ["false", "true"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, mt_opt): +def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen] @@ -229,16 +245,17 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, mt_opt): with_cpu_session( lambda spark : gen_df(spark, second_gen_list, 1).write.parquet(second_data_path)) data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.files.maxPartitionBytes': "1g", + 'spark.sql.files.minPartitionNum': '1'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list, - 'spark.sql.files.maxPartitionBytes': "1g", - 'spark.sql.files.minPartitionNum': '1'}) + conf=all_confs) -@pytest.mark.parametrize('mt_opt', ["false", "true"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema(spark_tmp_path, v1_enabled_list, mt_opt): +def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, @@ -255,14 +272,15 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, mt_opt): lambda spark : gen_df(spark, second_gen_list).write.parquet(second_data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 
'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) -@pytest.mark.parametrize('mt_opt', ["false", "true"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, mt_opt): +def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, @@ -278,32 +296,34 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, mt_opt): with_cpu_session( lambda spark : gen_df(spark, second_gen_list).write.parquet(second_data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.parquet.mergeSchema': "true"}) data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.parquet.mergeSchema': "true", - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) parquet_write_gens_list = [ [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, date_gen, timestamp_gen]] @pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.parametrize('ts_type', ["TIMESTAMP_MICROS", "TIMESTAMP_MILLIS"]) -def test_write_round_trip(spark_tmp_path, parquet_gens, mt_opt, v1_enabled_list, ts_type): +def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list, ts_type, reader_confs): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', + 'spark.sql.parquet.outputTimestampType': ts_type}) assert_gpu_and_cpu_writes_are_equal_collect( lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.parquet(path), lambda spark, path: spark.read.parquet(path), data_path, - conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', - 'spark.sql.parquet.outputTimestampType': ts_type, - 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) @pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS']) @pytest.mark.parametrize('ts_rebase', ['CORRECTED']) @@ -327,39 +347,41 @@ def test_write_ts_millis(spark_tmp_path, ts_type, ts_rebase): # There are race conditions around when individual files are read in for partitioned data @ignore_order @pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.parametrize('ts_type', ['TIMESTAMP_MILLIS', 'TIMESTAMP_MICROS']) -def 
test_part_write_round_trip(spark_tmp_path, parquet_gen, mt_opt, v1_enabled_list, ts_type): +def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list, ts_type, reader_confs): gen_list = [('a', RepeatSeqGen(parquet_gen, 10)), ('b', parquet_gen)] data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', + 'spark.sql.parquet.outputTimestampType': ts_type}) assert_gpu_and_cpu_writes_are_equal_collect( lambda spark, path: gen_df(spark, gen_list).coalesce(1).write.partitionBy('a').parquet(path), lambda spark, path: spark.read.parquet(path), data_path, - conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', - 'spark.sql.parquet.outputTimestampType': ts_type, - 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) parquet_write_compress_options = ['none', 'uncompressed', 'snappy'] @pytest.mark.parametrize('compress', parquet_write_compress_options) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_write_round_trip(spark_tmp_path, compress, mt_opt, v1_enabled_list): +def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.parquet.compression.codec': compress}) assert_gpu_and_cpu_writes_are_equal_collect( lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path), lambda spark, path : spark.read.parquet(path), data_path, - conf={'spark.sql.parquet.compression.codec': compress, - 'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf=all_confs) -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta(spark_tmp_path, mt_opt, v1_enabled_list): +def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -367,6 +389,8 @@ def test_input_meta(spark_tmp_path, mt_opt, v1_enabled_list): with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(second_data_path)) data_path = spark_tmp_path + '/PARQUET_DATA' + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path)\ .filter(f.col('a') > 0)\ @@ -374,9 +398,7 @@ def test_input_meta(spark_tmp_path, mt_opt, v1_enabled_list): 'input_file_name()', 'input_file_block_start()', 'input_file_block_length()'), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) - + conf=all_confs) def createBucketedTableAndJoin(spark): spark.range(10e4).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e4") @@ -387,14 +409,15 @@ def createBucketedTableAndJoin(spark): @ignore_order 
@allow_non_gpu('DataWritingCommandExec') -@pytest.mark.parametrize('mt_opt', ["true", "false"]) +@pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) # this test would be better if we could ensure exchanges didn't exist - ie used buckets -def test_buckets(spark_tmp_path, mt_opt, v1_enabled_list): +def test_buckets(spark_tmp_path, v1_enabled_list, reader_confs): + all_confs = reader_confs.copy() + all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list, + "spark.sql.autoBroadcastJoinThreshold": '-1'}) assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin, - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': mt_opt, - 'spark.sql.sources.useV1SourceList': v1_enabled_list, - "spark.sql.autoBroadcastJoinThreshold": '-1'}) + conf=all_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_small_file_memory(spark_tmp_path, v1_enabled_list): @@ -412,7 +435,7 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.rapids.sql.format.parquet.multiThreadedRead.enabled': 'true', - 'spark.sql.files.maxPartitionBytes': "1g", - 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING', + 'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.files.maxPartitionBytes': "1g"}) diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala index 76c5188b84b1..be7704940ea9 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala @@ -42,10 +42,10 @@ case class GpuParquetScan( partitionFilters: Seq[Expression], dataFilters: Seq[Expression], rapidsConf: RapidsConf, - supportsSmallFileOpt: Boolean = true) + queryUsesInputFile: Boolean = false) extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, - supportsSmallFileOpt) with FileScan { + queryUsesInputFile) with FileScan { override def isSplitable(path: Path): Boolean = super.isSplitableBase(path) @@ -55,7 +55,8 @@ case class GpuParquetScan( case p: GpuParquetScan => super.equals(p) && dataSchema == p.dataSchema && options == p.options && equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf && - supportsSmallFileOpt == p.supportsSmallFileOpt + queryUsesInputFile == p.queryUsesInputFile + case _ => false } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index c7abca6440a1..d032f53e3b3a 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -36,7 +36,6 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile} -import 
org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -157,10 +156,7 @@ class Spark300Shims extends SparkShims { wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) - val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled - case _ => false - } + GpuFileSourceScanExec( newRelation, wrapped.output, @@ -170,7 +166,7 @@ class Spark300Shims extends SparkShims { None, wrapped.dataFilters, wrapped.tableIdentifier, - canUseSmallFileOpt) + conf) } }), GpuOverrides.exec[SortMergeJoinExec]( @@ -272,8 +268,7 @@ class Spark300Shims extends SparkShims { a.options, a.partitionFilters, a.dataFilters, - conf, - conf.isParquetMultiThreadReadEnabled) + conf) } def isSupported(t: DataType) = t match { case MapType(StringType, StringType, _) => true @@ -377,16 +372,18 @@ class Spark300Shims extends SparkShims { FilePartition(index, files) } - override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, - supportsSmallFileOpt: Boolean): GpuBatchScanExec = { + override def copyParquetBatchScanExec( + batchScanExec: GpuBatchScanExec, + queryUsesInputFile: Boolean): GpuBatchScanExec = { val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan] - val scanCopy = scan.copy(supportsSmallFileOpt=supportsSmallFileOpt) + val scanCopy = scan.copy(queryUsesInputFile=queryUsesInputFile) batchScanExec.copy(scan=scanCopy) } - override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, - supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { - scanExec.copy(supportsSmallFileOpt = supportsSmallFileOpt) + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile=queryUsesInputFile) } override def getGpuColumnarToRowTransition(plan: SparkPlan, diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index b5cfc4a23ea7..a02c33da9958 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -109,10 +109,7 @@ class Spark300dbShims extends Spark300Shims { wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) - val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled - case _ => false - } + GpuFileSourceScanExec( newRelation, wrapped.output, @@ -123,7 +120,7 @@ class Spark300dbShims extends Spark300Shims { None, wrapped.dataFilters, wrapped.tableIdentifier, - canUseSmallFileOpt) + conf) } }), GpuOverrides.exec[SortMergeJoinExec]( @@ -199,8 +196,9 @@ class Spark300dbShims extends Spark300Shims { FilePartition(index, files) } - override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, - supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { - scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt) + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + 
queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile=queryUsesInputFile) } } diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala index 71ae25d7baae..2d03c067ef2e 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala @@ -42,10 +42,10 @@ case class GpuParquetScan( partitionFilters: Seq[Expression], dataFilters: Seq[Expression], rapidsConf: RapidsConf, - supportsSmallFileOpt: Boolean = true) + queryUsesInputFile: Boolean = true) extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, - supportsSmallFileOpt) with FileScan { + queryUsesInputFile) with FileScan { override def isSplitable(path: Path): Boolean = super.isSplitableBase(path) @@ -55,7 +55,7 @@ case class GpuParquetScan( case p: GpuParquetScan => super.equals(p) && dataSchema == p.dataSchema && options == p.options && equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf && - supportsSmallFileOpt == p.supportsSmallFileOpt + queryUsesInputFile == p.queryUsesInputFile case _ => false } diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala index 9bfe3f8ecca8..ba464bb6d729 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, ShuffledHashJoinExec, SortMergeJoinExec} @@ -159,10 +158,7 @@ class Spark310Shims extends Spark301Shims { wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), options)(sparkSession) - val canUseSmallFileOpt = newRelation.fileFormat match { - case _: ParquetFileFormat => conf.isParquetMultiThreadReadEnabled - case _ => false - } + GpuFileSourceScanExec( newRelation, wrapped.output, @@ -172,7 +168,7 @@ class Spark310Shims extends Spark301Shims { wrapped.optionalNumCoalescedBuckets, wrapped.dataFilters, wrapped.tableIdentifier, - canUseSmallFileOpt) + conf) } }), GpuOverrides.exec[InMemoryTableScanExec]( @@ -224,8 +220,7 @@ class Spark310Shims extends Spark301Shims { a.options, a.partitionFilters, a.dataFilters, - conf, - conf.isParquetMultiThreadReadEnabled) + conf) } def isSupported(t: DataType) = t match { case MapType(StringType, StringType, _) => true @@ -271,16 +266,18 @@ class Spark310Shims extends Spark301Shims { new ShuffleManagerShim } - override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, - supportsSmallFileOpt: Boolean): GpuBatchScanExec = { + override def copyParquetBatchScanExec( + batchScanExec: 
GpuBatchScanExec, + queryUsesInputFile: Boolean): GpuBatchScanExec = { val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan] - val scanCopy = scan.copy(supportsSmallFileOpt = supportsSmallFileOpt) - batchScanExec.copy(scan = scanCopy) + val scanCopy = scan.copy(queryUsesInputFile=queryUsesInputFile) + batchScanExec.copy(scan=scanCopy) } - override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, - supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { - scanExec.copy(supportsSmallFileOpt = supportsSmallFileOpt) + override def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec = { + scanExec.copy(queryUsesInputFile=queryUsesInputFile) } override def getGpuColumnarToRowTransition(plan: SparkPlan, diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 5f9a83da30a7..b9ad2cecf7f0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -16,15 +16,16 @@ package com.nvidia.spark.rapids -import java.io.OutputStream -import java.net.URI +import java.io.{File, OutputStream} +import java.net.{URI, URISyntaxException} import java.nio.charset.StandardCharsets import java.util.{Collections, Locale} import java.util.concurrent._ import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection.mutable.{ArrayBuffer, Queue} +import scala.collection.immutable.HashSet +import scala.collection.mutable.{ArrayBuffer, LinkedHashMap, Queue} import scala.math.max import ai.rapids.cudf._ @@ -65,6 +66,21 @@ import org.apache.spark.sql.types.{MapType, StringType, StructType, TimestampTyp import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration +/** + * Base GpuParquetScan used for common code across Spark versions. Gpu version of + * Spark's 'ParquetScan'. + * + * @param sparkSession SparkSession. + * @param hadoopConf Hadoop configuration. + * @param dataSchema Schema of the data. + * @param readDataSchema Schema to read. + * @param readPartitionSchema Partition schema. + * @param pushedFilters Filters on non-partition columns. + * @param rapidsConf Rapids configuration. 
+ * @param queryUsesInputFile Whether the query uses InputFileName, InputFileBlockStart, + *                            or InputFileBlockLength; GpuTransitionOverrides uses this to turn + *                            off the coalescing reader when those expressions are present + */ abstract class GpuParquetScanBase( sparkSession: SparkSession, hadoopConf: Configuration, @@ -73,7 +89,7 @@ abstract class GpuParquetScanBase( readPartitionSchema: StructType, pushedFilters: Array[Filter], rapidsConf: RapidsConf, - supportsSmallFileOpt: Boolean) + queryUsesInputFile: Boolean) extends ScanWithMetrics with Logging { def isSplitableBase(path: Path): Boolean = true @@ -82,13 +98,14 @@ abstract class GpuParquetScanBase( val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - logDebug(s"Small file optimization support: $supportsSmallFileOpt") - if (supportsSmallFileOpt) { - GpuParquetMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics) - } else { + if (rapidsConf.isParquetPerFileReadEnabled) { + logInfo("Using the original per file parquet reader") GpuParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics) + } else { + GpuParquetMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics, + queryUsesInputFile) } } } @@ -215,6 +232,10 @@ object GpuParquetPartitionReaderFactoryBase { private case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: Seq[BlockMetaData], partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) +// contains metadata about a single block in a file +private case class ParquetFileInfoWithSingleBlockMeta(filePath: Path, blockMeta: BlockMetaData, + partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) + private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) extends Arm { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown @@ -288,13 +309,24 @@ case class GpuParquetMultiFilePartitionReaderFactory( partitionSchema: StructType, filters: Array[Filter], @transient rapidsConf: RapidsConf, - metrics: Map[String, SQLMetric]) extends PartitionReaderFactory with Arm with Logging { + metrics: Map[String, SQLMetric], + queryUsesInputFile: Boolean) extends PartitionReaderFactory with Arm with Logging { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes private val numThreads = rapidsConf.parquetMultiThreadReadNumThreads private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel + private val canUseMultiThreadReader = rapidsConf.isParquetMultiThreadReadEnabled + // we can't use the coalescing files reader when InputFileName, InputFileBlockStart, + // or InputFileBlockLength is used because we are combining all the files into a single buffer + // and we don't know which file is associated with each row.
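+ // For example (illustrative, not part of this patch): a query like + //   spark.read.parquet(path).select(input_file_name(), col("a")) + // needs the file name for every row, so GpuTransitionOverrides sets queryUsesInputFile and + // the multi-threaded (or per-file) reader is used instead of the coalescing one.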
+ private val canUseCoalesceFilesReader = + rapidsConf.isParquetCoalesceFileReadEnabled && !queryUsesInputFile + + private val configCloudSchemes = rapidsConf.getCloudSchemes + private val CLOUD_SCHEMES = HashSet("dbfs", "s3", "s3a", "s3n", "wasbs", "gs") + private val allCloudSchemes = CLOUD_SCHEMES ++ configCloudSchemes.getOrElse(Seq.empty) private val filterHandler = new GpuParquetFileFilterHandler(sqlConf) @@ -304,22 +336,79 @@ case class GpuParquetMultiFilePartitionReaderFactory( throw new IllegalStateException("GPU column parser called to read rows") } + private def resolveURI(path: String): URI = { + try { + val uri = new URI(path) + if (uri.getScheme() != null) { + return uri + } + } catch { + case e: URISyntaxException => + } + new File(path).getAbsoluteFile().toURI() + } + + // We expect the filePath here to always have a scheme on it, + // if it doesn't we try using the local filesystem. If that + // doesn't work for some reason user would need to configure + // it directly. + private def isCloudFileSystem(filePath: String): Boolean = { + val uri = resolveURI(filePath) + val scheme = uri.getScheme + if (allCloudSchemes.contains(scheme)) { + true + } else { + false + } + } + + private def arePathsInCloud(filePaths: Array[String]): Boolean = { + filePaths.exists(isCloudFileSystem) + } + override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { assert(partition.isInstanceOf[FilePartition]) val filePartition = partition.asInstanceOf[FilePartition] val files = filePartition.files - buildBaseColumnarParquetReader(files) + val filePaths = files.map(_.filePath) + val conf = broadcastedConf.value.value + if (!canUseCoalesceFilesReader || (canUseMultiThreadReader && arePathsInCloud(filePaths))) { + logInfo("Using the multi-threaded multi-file parquet reader, files: " + + s"${filePaths.mkString(",")} task attemptid: ${TaskContext.get.taskAttemptId()}") + buildBaseColumnarParquetReaderForCloud(files, conf) + } else { + logInfo("Using the coalesce multi-file parquet reader, files: " + + s"${filePaths.mkString(",")} task attemptid: ${TaskContext.get.taskAttemptId()}") + buildBaseColumnarParquetReader(files, conf) + } } - private def buildBaseColumnarParquetReader( - files: Array[PartitionedFile]): PartitionReader[ColumnarBatch] = { - val conf = broadcastedConf.value.value - logDebug(s"Number files being read: ${files.size} for task ${TaskContext.get().partitionId()}") - new MultiFileParquetPartitionReader(conf, files, + private def buildBaseColumnarParquetReaderForCloud( + files: Array[PartitionedFile], + conf: Configuration): PartitionReader[ColumnarBatch] = { + new MultiFileCloudParquetPartitionReader(conf, files, isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema, numThreads, maxNumFileProcessed, filterHandler, filters) } + + private def buildBaseColumnarParquetReader( + files: Array[PartitionedFile], + conf: Configuration): PartitionReader[ColumnarBatch] = { + val conf = broadcastedConf.value.value + val clippedBlocks = ArrayBuffer[ParquetFileInfoWithSingleBlockMeta]() + files.map { file => + val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + clippedBlocks ++= singleFileInfo.blocks.map( + ParquetFileInfoWithSingleBlockMeta(singleFileInfo.filePath, _, file.partitionValues, + singleFileInfo.schema, singleFileInfo.isCorrectedRebaseMode)) + } + + new MultiFileParquetPartitionReader(conf, files, clippedBlocks, + isCaseSensitive, readDataSchema, 
debugDumpPrefix, + maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, + partitionSchema, numThreads) + } } case class GpuParquetPartitionReaderFactory( @@ -403,7 +492,8 @@ abstract class FileParquetPartitionReaderBase( protected def calculateParquetOutputSize( currentChunkedBlocks: Seq[BlockMetaData], - schema: MessageType): Long = { + schema: MessageType, + handleCoalesceFiles: Boolean): Long = { // start with the size of Parquet magic (at start+end) and footer length values var size: Long = 4 + 4 + 4 @@ -413,7 +503,20 @@ abstract class FileParquetPartitionReaderBase( size += currentChunkedBlocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum val footerSize = calculateParquetFooterSize(currentChunkedBlocks, schema) - val totalSize = size + footerSize + val extraMemory = if (handleCoalesceFiles) { + // we want to add extra memory because the ColumnChunks saved in the Footer have 2 fields + // file_offset and data_page_offset that get much larger when we are combining files. + // Here we estimate that by taking the number of columns * number of blocks which should be + // the number of column chunks and then saying there are 2 fields that could be larger and + // assume max size of those would be 8 bytes worst case. So we probably allocate too much here + // but it shouldn't be by a huge amount and it's better than having to realloc and copy. + val numCols = currentChunkedBlocks.head.getColumns().size() + val numColumnChunks = numCols * currentChunkedBlocks.size + numColumnChunks * 2 * 8 + } else { + 0 + } + val totalSize = size + footerSize + extraMemory totalSize } @@ -460,7 +563,8 @@ abstract class FileParquetPartitionReaderBase( protected def copyBlocksData( in: FSDataInputStream, out: HostMemoryOutputStream, - blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = { + blocks: Seq[BlockMetaData], + realStartOffset: Long): Seq[BlockMetaData] = { var totalRows: Long = 0 val outputBlocks = new ArrayBuffer[BlockMetaData](blocks.length) val copyRanges = new ArrayBuffer[CopyRange] @@ -473,7 +577,8 @@ abstract class FileParquetPartitionReaderBase( val outputColumns = new ArrayBuffer[ColumnChunkMetaData](columns.length) columns.foreach { column => // update column metadata to reflect new position in the output file - val offsetAdjustment = out.getPos + totalBytesToCopy - column.getStartingPos + val startPosCol = column.getStartingPos + val offsetAdjustment = realStartOffset + totalBytesToCopy - startPosCol + val newDictOffset = if (column.getDictionaryPageOffset > 0) { column.getDictionaryPageOffset + offsetAdjustment } else { @@ -585,11 +690,11 @@ abstract class FileParquetPartitionReaderBase( withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, metrics("bufferTime"))) { _ => withResource(filePath.getFileSystem(conf).open(filePath)) { in => - val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema) + val estTotalSize = calculateParquetOutputSize(blocks, clippedSchema, false) closeOnExcept(HostMemoryBuffer.allocate(estTotalSize)) { hmb => val out = new HostMemoryOutputStream(hmb) out.write(ParquetPartitionReader.PARQUET_MAGIC) - val outputBlocks = copyBlocksData(in, out, blocks) + val outputBlocks = copyBlocksData(in, out, blocks, out.getPos) val footerPos = out.getPos writeFooter(out, outputBlocks, clippedSchema) @@ -641,6 +746,23 @@ abstract class FileParquetPartitionReaderBase( currentChunk } + protected def addPartitionValues( + batch: Option[ColumnarBatch], + inPartitionValues: InternalRow, + partitionSchema: StructType): Option[ColumnarBatch] = { + if
(partitionSchema.nonEmpty) { + batch.map { cb => + val partitionValues = inPartitionValues.toSeq(partitionSchema) + val partitionScalars = ColumnarPartitionReaderWithPartitionValues + .createPartitionValues(partitionValues, partitionSchema) + withResource(partitionScalars) { scalars => + ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars) + } + } + } else { + batch + } + } } // Singleton threadpool that is used across all the tasks. @@ -677,7 +799,328 @@ object MultiFileThreadPoolFactory { } /** - * A PartitionReader that can read multiple Parquet files in parallel. + * A PartitionReader that can read multiple Parquet files up to the certain size. It will + * coalesce small files together and copy the block data in a separate thread pool to speed + * up processing the small files before sending down to the GPU. + * + * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file + * in memory that contains just the column chunks that are needed. This avoids sending + * unnecessary data to the GPU and saves GPU memory. + * + * @param conf the Hadoop configuration + * @param splits the partitioned files to read + * @param clippedBlocks the block metadata from the original Parquet file that has been clipped + * to only contain the column chunks to be read + * @param isSchemaCaseSensitive whether schema is case sensitive + * @param readDataSchema the Spark schema describing what will be read + * @param debugDumpPrefix a path prefix to use for dumping the fabricated Parquet data or null + * @param maxReadBatchSizeRows soft limit on the maximum number of rows the reader reads per batch + * @param maxReadBatchSizeBytes soft limit on the maximum number of bytes the reader reads per batch + * @param execMetrics metrics + * @param partitionSchema Schema of partitions. 
+ * @param numThreads the size of the threadpool + */ +class MultiFileParquetPartitionReader( + conf: Configuration, + splits: Array[PartitionedFile], + clippedBlocks: Seq[ParquetFileInfoWithSingleBlockMeta], + isSchemaCaseSensitive: Boolean, + readDataSchema: StructType, + debugDumpPrefix: String, + maxReadBatchSizeRows: Integer, + maxReadBatchSizeBytes: Long, + execMetrics: Map[String, SQLMetric], + partitionSchema: StructType, + numThreads: Int) + extends FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema, + debugDumpPrefix, execMetrics) { + + private val blockIterator: BufferedIterator[ParquetFileInfoWithSingleBlockMeta] = + clippedBlocks.iterator.buffered + + class ParquetCopyBlocksRunner( + file: Path, + outhmb: HostMemoryBuffer, + blocks: ArrayBuffer[BlockMetaData], + offset: Long, + startTs: Long) + extends Callable[Seq[BlockMetaData]] { + + override def call(): Seq[BlockMetaData] = { + var out = new HostMemoryOutputStream(outhmb) + val res = withResource(file.getFileSystem(conf).open(file)) { in => + copyBlocksData(in, out, blocks, offset) + } + outhmb.close() + res + } + } + + override def next(): Boolean = { + batch.foreach(_.close()) + batch = None + if (!isDone) { + if (!blockIterator.hasNext) { + isDone = true + metrics("peakDevMemory") += maxDeviceMemory + } else { + batch = readBatch() + } + } + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + batch.isDefined + } + + private def reallocHostBufferAndCopy( + in: HostMemoryInputStream, + newSizeEstimate: Long): HostMemoryBuffer = { + // realloc memory and copy + closeOnExcept(HostMemoryBuffer.allocate(newSizeEstimate)) { newhmb => + val newout = new HostMemoryOutputStream(newhmb) + IOUtils.copy(in, newout) + newout.close() + newhmb + } + } + + private def readPartFiles( + blocks: Seq[(Path, BlockMetaData)], + clippedSchema: MessageType): (HostMemoryBuffer, Long) = { + withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, + metrics("bufferTime"))) { _ => + // ugly but we want to keep the order + val filesAndBlocks = LinkedHashMap[Path, ArrayBuffer[BlockMetaData]]() + blocks.foreach { case (path, block) => + filesAndBlocks.getOrElseUpdate(path, new ArrayBuffer[BlockMetaData]) += block + } + val tasks = new java.util.ArrayList[Future[Seq[BlockMetaData]]]() + + val allBlocks = blocks.map(_._2) + val initTotalSize = calculateParquetOutputSize(allBlocks, clippedSchema, true) + closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { allocBuf => + var hmb = allocBuf + val out = new HostMemoryOutputStream(hmb) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + var offset = out.getPos + val allOutputBlocks = scala.collection.mutable.ArrayBuffer[BlockMetaData]() + filesAndBlocks.foreach { case (file, blocks) => + val fileBlockSize = blocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum + // use a single buffer and slice it up for different files if we need + val outLocal = hmb.slice(offset, fileBlockSize) + // copy the blocks for each file in parallel using background threads + tasks.add(MultiFileThreadPoolFactory.submitToThreadPool( + new ParquetCopyBlocksRunner(file, outLocal, blocks, offset, System.nanoTime()), + numThreads)) + offset += fileBlockSize + } + + for (future <- tasks.asScala) { + val result = future.get() + allOutputBlocks ++= result + } + + // The footer size can change vs the initial estimated because we are combining more blocks + // and 
offsets are larger, check to make sure we allocated enough memory before writing. + // Not sure how expensive this is; we could throw an exception instead if the written + // size comes out greater than the estimated size. + val actualFooterSize = calculateParquetFooterSize(allOutputBlocks, clippedSchema) + // 4 + 4 is for writing size and the ending PARQUET_MAGIC. + val bufferSizeReq = offset + actualFooterSize + 4 + 4 + out.close() + val totalBufferSize = if (bufferSizeReq > initTotalSize) { + logWarning(s"The original estimated size $initTotalSize is too small, " + + s"reallocating and copying data to a bigger buffer size: $bufferSizeReq") + val prevhmb = hmb + val in = new HostMemoryInputStream(prevhmb, offset) + hmb = reallocHostBufferAndCopy(in, bufferSizeReq) + prevhmb.close() + bufferSizeReq + } else { + initTotalSize + } + val lenLeft = totalBufferSize - offset + val finalizehmb = hmb.slice(offset, lenLeft) + val footerOut = new HostMemoryOutputStream(finalizehmb) + writeFooter(footerOut, allOutputBlocks, clippedSchema) + BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt) + footerOut.write(ParquetPartitionReader.PARQUET_MAGIC) + val amountWritten = offset + footerOut.getPos + // triple check we didn't go over memory + if (amountWritten > totalBufferSize) { + throw new QueryExecutionException(s"Calculated buffer size $totalBufferSize is too " + + s"small, actual written: ${amountWritten}") + } + if (finalizehmb != null) { + finalizehmb.close() + } + (hmb, amountWritten) + } + } + } + + private def readBatch(): Option[ColumnarBatch] = { + withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, + metrics(TOTAL_TIME))) { _ => + val (isCorrectRebaseMode, clippedSchema, partValues, seqPathsAndBlocks) = + populateCurrentBlockChunk() + if (readDataSchema.isEmpty) { + // not reading any data, so return a degenerate ColumnarBatch with the row count + val numRows = seqPathsAndBlocks.map(_._2.getRowCount).sum.toInt + if (numRows == 0) { + None + } else { + // Someone is going to process this data, even if it is just a row count + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + val emptyBatch = new ColumnarBatch(Array.empty, numRows.toInt) + addPartitionValues(Some(emptyBatch), partValues, partitionSchema) + } + } else { + val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode) + try { + val colTypes = readDataSchema.fields.map(f => f.dataType).toList + val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes.asJava)) + maybeBatch.foreach { batch => + logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") + } + // we have to add partition values here for this batch, we already verified that + // it's not different for all the blocks in this batch + addPartitionValues(maybeBatch, partValues, partitionSchema) + } finally { + table.foreach(_.close()) + } + } + } + } + + private def readToTable( + currentChunkedBlocks: Seq[(Path, BlockMetaData)], + clippedSchema: MessageType, + isCorrectRebaseMode: Boolean): Option[Table] = { + if (currentChunkedBlocks.isEmpty) { + return None + } + val (dataBuffer, dataSize) = readPartFiles(currentChunkedBlocks, clippedSchema) + try { + if (dataSize == 0) { + None + } else { + if (debugDumpPrefix != null) { + dumpParquetData(dataBuffer, dataSize, splits) + } + val parseOpts = ParquetOptions.builder() + .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) + .includeColumn(readDataSchema.fieldNames:_*).build() + + // about to start using the GPU +
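// the GpuSemaphore limits how many tasks can be on the GPU at the same time; it is acquired + // only now, after the host-side buffering above, so a task that is still doing CPU I/O does + // not hold a GPU slot while it reads +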
GpuSemaphore.acquireIfNecessary(TaskContext.get()) + + val table = withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, + metrics(GPU_DECODE_TIME))) { _ => + Table.readParquet(parseOpts, dataBuffer, 0, dataSize) + } + closeOnExcept(table) { _ => + if (!isCorrectRebaseMode) { + (0 until table.getNumberOfColumns).foreach { i => + if (RebaseHelper.isDateTimeRebaseNeededRead(table.getColumn(i))) { + throw RebaseHelper.newRebaseExceptionInRead("Parquet") + } + } + } + maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) + if (readDataSchema.length < table.getNumberOfColumns) { + throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " + + s"but read ${table.getNumberOfColumns} from $currentChunkedBlocks") + } + } + metrics(NUM_OUTPUT_BATCHES) += 1 + Some(evolveSchemaIfNeededAndClose(table, splits.mkString(","), clippedSchema)) + } + } finally { + dataBuffer.close() + } + } + + private def populateCurrentBlockChunk(): + (Boolean, MessageType, InternalRow, Seq[(Path, BlockMetaData)]) = { + + val currentChunk = new ArrayBuffer[(Path, BlockMetaData)] + var numRows: Long = 0 + var numBytes: Long = 0 + var numParquetBytes: Long = 0 + var currentFile: Path = null + var currentPartitionValues: InternalRow = null + var currentClippedSchema: MessageType = null + var currentIsCorrectRebaseMode: Boolean = false + + @tailrec + def readNextBatch(): Unit = { + if (blockIterator.hasNext) { + if (currentFile == null) { + currentFile = blockIterator.head.filePath + currentPartitionValues = blockIterator.head.partValues + currentClippedSchema = blockIterator.head.schema + currentIsCorrectRebaseMode = blockIterator.head.isCorrectedRebaseMode + } + if (currentFile != blockIterator.head.filePath) { + // We need to ensure all files we are going to combine have the same datetime rebase mode. 
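+ // All the row groups gathered into one chunk get stitched into a single host buffer and
+ // decoded as one table, so the rebase mode, partition values and schema have to be the
+ // same for every file in the chunk; when any of them differ we stop here and let the
+ // next file start a new batch.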
+ if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) { + logInfo("datetime rebase mode for the next file " + + s"${blockIterator.head.filePath} is different than the current file $currentFile, " + + s"splitting into another batch.") + return + } + + // check to see if the partitionValues are different, if so we have to split it + if (blockIterator.head.partValues != currentPartitionValues) { + logInfo(s"Partition values for the next file ${blockIterator.head.filePath}" + + s" don't match the current $currentFile, splitting it into another batch!") + return + } + val schemaNextfile = + blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName) + val schemaCurrentfile = + currentClippedSchema.asGroupType().getFields.asScala.map(_.getName) + if (!schemaNextfile.sameElements(schemaCurrentfile)) { + logInfo(s"File schema for the next file ${blockIterator.head.filePath}" + + s" doesn't match the current $currentFile, splitting it into another batch!") + return + } + currentFile = blockIterator.head.filePath + currentPartitionValues = blockIterator.head.partValues + currentClippedSchema = blockIterator.head.schema + } + val peekedRowGroup = blockIterator.head.blockMeta + if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Too many rows in split") + } + + if (numRows == 0 || numRows + peekedRowGroup.getRowCount <= maxReadBatchSizeRows) { + val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, + peekedRowGroup.getRowCount) + if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { + val nextBlock = blockIterator.next() + val nextTuple = (nextBlock.filePath, nextBlock.blockMeta) + currentChunk += nextTuple + numRows += currentChunk.last._2.getRowCount + numParquetBytes += currentChunk.last._2.getTotalByteSize + numBytes += estimatedBytes + readNextBatch() + } + } + } + } + readNextBatch() + logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " + + s"Estimated GPU bytes: $numBytes") + (currentIsCorrectRebaseMode, currentClippedSchema, currentPartitionValues, currentChunk) + } +} + +/** + * A PartitionReader that can read multiple Parquet files in parallel. This is most efficient when + * running in a cloud environment where the I/O of reading is slow. * * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file * in memory that contains just the column chunks that are needed. 
This avoids sending @@ -698,7 +1141,7 @@ object MultiFileThreadPoolFactory { * @param filterHandler GpuParquetFileFilterHandler used to filter the parquet blocks * @param filters filters passed into the filterHandler */ -class MultiFileParquetPartitionReader( +class MultiFileCloudParquetPartitionReader( conf: Configuration, files: Array[PartitionedFile], isSchemaCaseSensitive: Boolean, @@ -922,23 +1365,6 @@ class MultiFileParquetPartitionReader( } } - private def addPartitionValues( - batch: Option[ColumnarBatch], - inPartitionValues: InternalRow): Option[ColumnarBatch] = { - if (partitionSchema.nonEmpty) { - batch.map { cb => - val partitionValues = inPartitionValues.toSeq(partitionSchema) - val partitionScalars = ColumnarPartitionReaderWithPartitionValues - .createPartitionValues(partitionValues, partitionSchema) - withResource(partitionScalars) { scalars => - ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars) - } - } - } else { - batch - } - } - private def readBufferToTable( isCorrectRebaseMode: Boolean, clippedSchema: MessageType, @@ -955,7 +1381,7 @@ class MultiFileParquetPartitionReader( // Someone is going to process this data, even if it is just a row count GpuSemaphore.acquireIfNecessary(TaskContext.get()) val emptyBatch = new ColumnarBatch(Array.empty, dataSize.toInt) - return addPartitionValues(Some(emptyBatch), partValues) + return addPartitionValues(Some(emptyBatch), partValues, partitionSchema) } val table = withResource(hostBuffer) { _ => if (debugDumpPrefix != null) { @@ -997,7 +1423,7 @@ class MultiFileParquetPartitionReader( } // we have to add partition values here for this batch, we already verified that // its not different for all the blocks in this batch - addPartitionValues(maybeBatch, partValues) + addPartitionValues(maybeBatch, partValues, partitionSchema) } finally { table.foreach(_.close()) } @@ -1072,7 +1498,8 @@ class ParquetPartitionReader( } else { val table = readToTable(currentChunkedBlocks) try { - val maybeBatch = table.map(GpuColumnVector.from) + val colTypes = readDataSchema.fields.map(f => f.dataType).toList + val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes.asJava)) maybeBatch.foreach { batch => logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") } @@ -1179,4 +1606,3 @@ object ParquetPartitionReader { }) } } - diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index 5c68aab306b8..ed40fffb510b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.execution.command.ExecutedCommandExec import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} -import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec} +import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuShuffleExchangeExecBase} /** @@ -178,6 +178,47 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { 
plan.expressions.exists(disableCoalesceUntilInput) } + private def disableScanUntilInput(exec: Expression): Boolean = { + exec match { + case _: InputFileName => true + case _: InputFileBlockStart => true + case _: InputFileBlockLength => true + case _: GpuInputFileName => true + case _: GpuInputFileBlockStart => true + case _: GpuInputFileBlockLength => true + case e => e.children.exists(disableScanUntilInput) + } + } + + private def disableScanUntilInput(plan: SparkPlan): Boolean = { + plan.expressions.exists(disableScanUntilInput) + } + + // This walks from the output to the input to look for any uses of InputFileName, + // InputFileBlockStart, or InputFileBlockLength when we use a Parquet read because + // we can't support the coalesce file reader optimization when this is used. + private def updateScansForInput(plan: SparkPlan, + disableUntilInput: Boolean = false): SparkPlan = plan match { + case batchScan: GpuBatchScanExec => + if (batchScan.scan.isInstanceOf[GpuParquetScanBase] && + (disableUntilInput || disableScanUntilInput(batchScan))) { + ShimLoader.getSparkShims.copyParquetBatchScanExec(batchScan, true) + } else { + batchScan + } + case fileSourceScan: GpuFileSourceScanExec => + if ((disableUntilInput || disableScanUntilInput(fileSourceScan))) { + ShimLoader.getSparkShims.copyFileSourceScanExec(fileSourceScan, true) + } else { + fileSourceScan + } + case p => + val planDisableUntilInput = disableScanUntilInput(p) && hasDirectLineToInput(p) + p.withNewChildren(p.children.map(c => { + updateScansForInput(c, planDisableUntilInput || disableUntilInput) + })) + } + // This walks from the output to the input so disableUntilInput can walk its way from when // we hit something that cannot allow for coalesce up until the input private def insertCoalesce(plan: SparkPlan, @@ -336,6 +377,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { this.conf = new RapidsConf(plan.conf) if (conf.isSqlEnabled) { var updatedPlan = insertHashOptimizeSorts(plan) + updatedPlan = updateScansForInput(updatedPlan) updatedPlan = insertCoalesce(insertColumnarFromGpu(updatedPlan)) updatedPlan = optimizeCoalesce(if (plan.conf.adaptiveExecutionEnabled) { optimizeAdaptiveTransitions(updatedPlan, None) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 29a2c3a8d3b4..3dc49bca1e17 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -192,6 +192,17 @@ class TypedConfBuilder[T]( } } + /** Check that user-provided values for the config match a pre-defined set. */ + def checkValues(validValues: Set[T]): TypedConfBuilder[T] = { + transform { v => + if (!validValues.contains(v)) { + throw new IllegalArgumentException( + s"The value of ${parent.key} should be one of ${validValues.mkString(", ")}, but was $v") + } + v + } + } + def createWithDefault(value: T): ConfEntry[T] = { val ret = new ConfEntryWithDefault[T](parent.key, converter, parent.doc, parent.isInternal, value) @@ -492,21 +503,53 @@ object RapidsConf { .booleanConf .createWithDefault(true) - val ENABLE_MULTITHREAD_PARQUET_READS = conf( - "spark.rapids.sql.format.parquet.multiThreadedRead.enabled") - .doc("When set to true, reads multiple small files within a partition more efficiently " + - "by reading each file in a separate thread in parallel on the CPU side before " + - "sending to the GPU. 
Limited by " + - "spark.rapids.sql.format.parquet.multiThreadedRead.numThreads " + - "and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel") - .booleanConf - .createWithDefault(true) + object ParquetReaderType extends Enumeration { + val AUTO, COALESCING, MULTITHREADED, PERFILE = Value + } + + val PARQUET_READER_TYPE = conf("spark.rapids.sql.format.parquet.reader.type") + .doc("Sets the parquet reader type. We support different types that are optimized for " + + "different environments. The original Spark style reader can be selected by setting this " + + "to PERFILE which individually reads and copies files to the GPU. Loading many small files " + + "individually has high overhead, and using either COALESCING or MULTITHREADED is " + + "recommended instead. The COALESCING reader is good when using a local file system where " + + "the executors are on the same nodes or close to the nodes the data is being read on. " + + "This reader coalesces all the files assigned to a task into a single host buffer before " + + "sending it down to the GPU. It copies blocks from a single file into a host buffer in " + + "separate threads in parallel, see " + + "spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. " + + "MULTITHREADED is good for cloud environments where you are reading from a blobstore " + + "that is totally separate and likely has a higher I/O read cost. Many times the cloud " + + "environments also get better throughput when you have multiple readers in parallel. " + + "This reader uses multiple threads to read each file in parallel and each file is sent " + + "to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. " + + "See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and " + + "spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control " + + "the number of threads and amount of memory used. " + + "By default this is set to AUTO so we select the reader we think is best. This will " + + "either be the COALESCING or the MULTITHREADED based on whether we think the file is " + + "in the cloud. See spark.rapids.cloudSchemes.") + .stringConf + .transform(_.toUpperCase(java.util.Locale.ROOT)) + .checkValues(ParquetReaderType.values.map(_.toString)) + .createWithDefault(ParquetReaderType.AUTO.toString) + + val CLOUD_SCHEMES = conf("spark.rapids.cloudSchemes") + .doc("Comma separated list of additional URI schemes that are to be considered cloud based " + + "filesystems. Schemes already included: dbfs, s3, s3a, s3n, wasbs, gs. Cloud based stores " + + "generally would be total separate from the executors and likely have a higher I/O read " + + "cost. Many times the cloud filesystems also get better throughput when you have multiple " + + "readers in parallel. This is used with spark.rapids.sql.format.parquet.reader.type") + .stringConf + .toSequence + .createOptional val PARQUET_MULTITHREAD_READ_NUM_THREADS = conf("spark.rapids.sql.format.parquet.multiThreadedRead.numThreads") .doc("The maximum number of threads, on the executor, to use for reading small " + "parquet files in parallel. This can not be changed at runtime after the executor has " + - "started.") + "started. 
Used with COALESCING and MULTITHREADED reader, see " + + "spark.rapids.sql.format.parquet.reader.type.") .integerConf .createWithDefault(20) @@ -514,7 +557,8 @@ object RapidsConf { conf("spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel") .doc("A limit on the maximum number of files per task processed in parallel on the CPU " + "side before the file is sent to the GPU. This affects the amount of host memory used " + - "when reading the files in parallel.") + "when reading the files in parallel. Used with MULTITHREADED reader, see " + + "spark.rapids.sql.format.parquet.reader.type") .integerConf .checkValue(v => v > 0, "The maximum number of files must be greater than 0.") .createWithDefault(Integer.MAX_VALUE) @@ -933,7 +977,17 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET) - lazy val isParquetMultiThreadReadEnabled: Boolean = get(ENABLE_MULTITHREAD_PARQUET_READS) + lazy val isParquetPerFileReadEnabled: Boolean = + ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.PERFILE + + lazy val isParquetAutoReaderEnabled: Boolean = + ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.AUTO + + lazy val isParquetCoalesceFileReadEnabled: Boolean = isParquetAutoReaderEnabled || + ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.COALESCING + + lazy val isParquetMultiThreadReadEnabled: Boolean = isParquetAutoReaderEnabled || + ParquetReaderType.withName(get(PARQUET_READER_TYPE)) == ParquetReaderType.MULTITHREADED lazy val parquetMultiThreadReadNumThreads: Int = get(PARQUET_MULTITHREAD_READ_NUM_THREADS) @@ -986,6 +1040,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val shimsProviderOverride: Option[String] = get(SHIMS_PROVIDER_OVERRIDE) + lazy val getCloudSchemes: Option[Seq[String]] = get(CLOUD_SCHEMES) + def isOperatorEnabled(key: String, incompat: Boolean, isDisabledByDefault: Boolean): Boolean = { val default = !(isDisabledByDefault || incompat) || (incompat && isIncompatEnabled) conf.get(key).map(toBoolean(_, key)).getOrElse(default) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index 1dac9f87ca8e..846b4fde362d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -132,10 +132,12 @@ trait SparkShims { readFunction: (PartitionedFile) => Iterator[InternalRow], filePartitions: Seq[FilePartition]): RDD[InternalRow] - def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, - supportsSmallFileOpt: Boolean): GpuBatchScanExec + def copyParquetBatchScanExec( + batchScanExec: GpuBatchScanExec, + queryUsesInputFile: Boolean): GpuBatchScanExec - def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, - supportsSmallFileOpt: Boolean): GpuFileSourceScanExec + def copyFileSourceScanExec( + scanExec: GpuFileSourceScanExec, + queryUsesInputFile: Boolean): GpuFileSourceScanExec } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index cd5418d8577f..39bde29c3af5 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -53,7 +53,10 @@ import 
org.apache.spark.util.collection.BitSet * @param optionalNumCoalescedBuckets Number of coalesced buckets. * @param dataFilters Filters on non-partition columns. * @param tableIdentifier identifier for the table in the metastore. - * @param supportsSmallFileOpt indicates if we should use the small file optimization. + * @param rapidsConf Rapids conf + * @param queryUsesInputFile This is a parameter to easily allow turning it + * off in GpuTransitionOverrides if InputFileName, + * InputFileBlockStart, or InputFileBlockLength are used */ case class GpuFileSourceScanExec( @transient relation: HadoopFsRelation, @@ -64,9 +67,13 @@ case class GpuFileSourceScanExec( optionalNumCoalescedBuckets: Option[Int], dataFilters: Seq[Expression], tableIdentifier: Option[TableIdentifier], - supportsSmallFileOpt: Boolean = true) + @transient rapidsConf: RapidsConf, + queryUsesInputFile: Boolean = false) extends GpuDataSourceScanExec with GpuExec { + private val isParquetFileFormat: Boolean = relation.fileFormat.isInstanceOf[ParquetFileFormat] + private val isPerFileReadEnabled = rapidsConf.isParquetPerFileReadEnabled || !isParquetFileFormat + override val nodeName: String = { s"GpuScan $relation ${tableIdentifier.map(_.unquotedString).getOrElse("")}" } @@ -280,29 +287,30 @@ case class GpuFileSourceScanExec( * at a time. */ lazy val inputRDD: RDD[InternalRow] = { - val readFile: Option[(PartitionedFile) => Iterator[InternalRow]] = if (!supportsSmallFileOpt) { - val fileFormat = relation.fileFormat.asInstanceOf[GpuReadFileFormatWithMetrics] - val reader = fileFormat.buildReaderWithPartitionValuesAndMetrics( - sparkSession = relation.sparkSession, - dataSchema = relation.dataSchema, - partitionSchema = relation.partitionSchema, - requiredSchema = requiredSchema, - filters = pushedDownFilters, - options = relation.options, - hadoopConf = - relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), - metrics = metrics) - Some(reader) - } else { - None - } + val readFile: Option[(PartitionedFile) => Iterator[InternalRow]] = + if (isPerFileReadEnabled) { + val fileFormat = relation.fileFormat.asInstanceOf[GpuReadFileFormatWithMetrics] + val reader = fileFormat.buildReaderWithPartitionValuesAndMetrics( + sparkSession = relation.sparkSession, + dataSchema = relation.dataSchema, + partitionSchema = relation.partitionSchema, + requiredSchema = requiredSchema, + filters = pushedDownFilters, + options = relation.options, + hadoopConf = + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + metrics = metrics) + Some(reader) + } else { + None + } val readRDD = if (bucketedScan) { createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions, - relation, supportsSmallFileOpt) + relation) } else { createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, - relation, supportsSmallFileOpt) + relation) } sendDriverMetrics() readRDD @@ -398,14 +406,12 @@ case class GpuFileSourceScanExec( * when not using the small file optimization. * @param selectedPartitions Hive-style partition that are part of the read. * @param fsRelation [[HadoopFsRelation]] associated with the read. - * @param useSmallFileOpt whether to create an RDD using small file optimization. 
*/ private def createBucketedReadRDD( bucketSpec: BucketSpec, readFile: Option[(PartitionedFile) => Iterator[InternalRow]], selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation, - useSmallFileOpt: Boolean): RDD[InternalRow] = { + fsRelation: HadoopFsRelation): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val partitionedFiles = @@ -441,7 +447,8 @@ case class GpuFileSourceScanExec( prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } } - if (!useSmallFileOpt) { + if (isPerFileReadEnabled) { + logInfo("Using the original per file parquet reader") ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile.get, filePartitions) } else { // here we are making an optimization to read more then 1 file at a time on the CPU side @@ -457,8 +464,9 @@ case class GpuFileSourceScanExec( requiredSchema, relation.partitionSchema, pushedDownFilters.toArray, - new RapidsConf(sqlConf), - metrics) + rapidsConf, + metrics, + queryUsesInputFile) // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code new DataSourceRDD(relation.sparkSession.sparkContext, filePartitions, @@ -474,13 +482,11 @@ case class GpuFileSourceScanExec( * not using the small file optimization. * @param selectedPartitions Hive-style partition that are part of the read. * @param fsRelation [[HadoopFsRelation]] associated with the read. - * @param useSmallFileOpt whether to create an RDD using small file optimization. */ private def createNonBucketedReadRDD( readFile: Option[(PartitionedFile) => Iterator[InternalRow]], selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation, - useSmallFileOpt: Boolean): RDD[InternalRow] = { + fsRelation: HadoopFsRelation): RDD[InternalRow] = { val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val maxSplitBytes = FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) @@ -494,7 +500,8 @@ case class GpuFileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - if (!useSmallFileOpt) { + if (isPerFileReadEnabled) { + logInfo("Using the original per file parquet reader") ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile.get, partitions) } else { // here we are making an optimization to read more then 1 file at a time on the CPU side @@ -510,8 +517,9 @@ case class GpuFileSourceScanExec( requiredSchema, relation.partitionSchema, pushedDownFilters.toArray, - new RapidsConf(sqlConf), - metrics) + rapidsConf, + metrics, + queryUsesInputFile) // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code new DataSourceRDD(relation.sparkSession.sparkContext, partitions, factory, supportsColumnar) @@ -535,7 +543,9 @@ case class GpuFileSourceScanExec( optionalBucketSet, optionalNumCoalescedBuckets, QueryPlan.normalizePredicates(dataFilters, output), - None) + None, + rapidsConf, + queryUsesInputFile) } }
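
The AUTO setting for spark.rapids.sql.format.parquet.reader.type, as documented above, picks between the COALESCING and MULTITHREADED readers based on whether the file's URI scheme looks like a cloud filesystem (the built-in list plus anything added via spark.rapids.cloudSchemes). The following is a minimal illustrative Scala sketch of that selection rule; the names ReaderTypeSketch and chooseForAuto are made up for the example and this is not the plugin's actual implementation.

import java.net.URI

object ReaderTypeSketch {
  // Schemes listed in the spark.rapids.cloudSchemes documentation above.
  private val builtInCloudSchemes = Set("dbfs", "s3", "s3a", "s3n", "wasbs", "gs")

  // Resolve the AUTO reader type for one file path, given any extra cloud schemes the user
  // configured. Cloud-like schemes get the MULTITHREADED reader, everything else COALESCING.
  def chooseForAuto(path: String, extraCloudSchemes: Seq[String]): String = {
    val cloudSchemes = builtInCloudSchemes ++ extraCloudSchemes.map(_.toLowerCase)
    val scheme = Option(new URI(path).getScheme).getOrElse("file").toLowerCase
    if (cloudSchemes.contains(scheme)) "MULTITHREADED" else "COALESCING"
  }
}

// Hypothetical usage:
//   ReaderTypeSketch.chooseForAuto("s3a://bucket/part-0.parquet", Nil)   // "MULTITHREADED"
//   ReaderTypeSketch.chooseForAuto("hdfs://nn/data/part-0.parquet", Nil) // "COALESCING"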