From 7afbfea99eb764f5bd1e634b7cf4c15388c1e5b4 Mon Sep 17 00:00:00 2001
From: Thomas Graves
Date: Wed, 26 Aug 2020 09:34:05 -0500
Subject: [PATCH] Parquet small file reading optimization (#595)

* Initial prototype small files parquet
* Change datasource v1 to use small files
* Working but has 72 bytes off in size
* Copy filesourcescan to databricks and fix merge error
* Fix databricks package name
* Try to debug size calculation - adds lots of warnings
* Cleanup and have file source scan small files only work for parquet
* Switch to use ArrayBuffer so order correct
* debug
* Fix order issue
* add more to calculated size
* cleanup
* Try to handle partition values
* fix passing partitionValues
* refactor
* disable mergeschema
* add check for mergeSchema
* Add tests for both small file optimization on and off
* handle input file - but doesn't totally work
* remove extra values reader
* Fixes
* Debug
* Check to see if Inputfile execs used
* Finding InputFileName works
* finding input file working
* cleanup and add tests for V2 datasource
* Add check for input file to GpuParquetScan
* Add more tests
* Add GPU metrics to GpuFileSourceScanExec Signed-off-by: Jason Lowe
* remove log messages
* Docs
* cleanup
* Update 300db and 310 FileSourceScanExecs passing unit tests
* Add test for bucketing
* Add in logic for datetime corrected rebase mode
* Commonize some code
* Cleanup
* fixes
* Extract GpuFileSourceScanExec from shims Signed-off-by: Jason Lowe
* Add more tests
* comments
* update test
* Pass metrics via GPU file format rather than custom options map Signed-off-by: Jason Lowe
* working
* pass schema around properly
* fix value from tuple
* Rename case class
* Update tests
* Update code checking for DataSourceScanExec Signed-off-by: Jason Lowe
* Fix scaladoc warning and unused imports Signed-off-by: Jason Lowe
* Add realloc if over memory size
* refactor memory checks
* Fix copyright Signed-off-by: Jason Lowe
* Upmerge to latest FileSourceScanExec changes for metrics
* Add missing check Filesource scan mergeSchema and cleanup
* Cleanup
* remove bucket test for now
* formatting
* Fixes
* Add more tests
* Merge conflict Signed-off-by: Thomas Graves
* Fix merge conflict Signed-off-by: Thomas Graves
* enable parquet bucket tests and change warning Signed-off-by: Thomas Graves
* cleanup Signed-off-by: Thomas Graves
* remove debug logs Signed-off-by: Thomas Graves
* Move FilePartition creation to shim Signed-off-by: Thomas Graves
* Add better message for mergeSchema Signed-off-by: Thomas Graves
* Address review comments. Add in withResources and closeOnExcept and minor things.
Signed-off-by: Thomas Graves * Fix spacing Signed-off-by: Thomas Graves * Fix databricks support and passing arguments Signed-off-by: Thomas Graves * fix typo in db Signed-off-by: Thomas Graves * Update config description Signed-off-by: Thomas Graves * Rework Signed-off-by: Thomas Graves Co-authored-by: Thomas Graves Co-authored-by: Jason Lowe --- docs/configs.md | 1 + .../src/main/python/parquet_test.py | 171 +++- .../src/main/python/spark_init_internal.py | 2 + .../shims/spark300/GpuParquetScan.scala | 9 +- .../spark300/GpuShuffleExchangeExec.scala | 3 +- .../rapids/shims/spark300/Spark300Shims.scala | 41 +- .../shims/spark300db/Spark300dbShims.scala | 22 +- .../shims/spark310/GpuParquetScan.scala | 9 +- .../rapids/shims/spark310/Spark310Shims.scala | 36 +- .../scala/com/nvidia/spark/rapids/Arm.scala | 9 + ...arPartitionReaderWithPartitionValues.scala | 78 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 826 ++++++++++++++---- .../spark/rapids/GpuTransitionOverrides.scala | 46 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 9 + .../com/nvidia/spark/rapids/RapidsMeta.scala | 2 +- .../com/nvidia/spark/rapids/SparkShims.scala | 10 +- .../sql/rapids/GpuFileSourceScanExec.scala | 98 ++- .../sql/rapids/datetimeExpressions.scala | 2 +- 18 files changed, 1094 insertions(+), 280 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 724e3fe4baf..b6a2babac48 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -56,6 +56,7 @@ Name | Description | Default Value spark.rapids.sql.format.orc.write.enabled|When set to false disables orc output acceleration|true spark.rapids.sql.format.parquet.enabled|When set to false disables all parquet input and output acceleration|true spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true +spark.rapids.sql.format.parquet.smallFiles.enabled|When set to true, handles reading multiple small files within a partition more efficiently by combining multiple files on the CPU side before sending to the GPU. Recommended unless user needs mergeSchema option or schema evolution.|true spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true spark.rapids.sql.hasNans|Config to indicate if your data has NaN's. Cudf doesn't currently support NaN's properly so you can get corrupt data if you have NaN's in your data and it runs on the GPU.|true spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. 
This can improve output file sizes when saving to columnar formats.|false diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 6633d4ad95e..dfe8e7e13b1 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -34,15 +34,17 @@ def read_parquet_sql(data_path): @pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, v1_enabled_list): +def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, small_file_opt, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : gen_df(spark, gen_list).write.parquet(data_path), conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) assert_gpu_and_cpu_are_equal_collect(read_func(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) @allow_non_gpu('FileSourceScanExec') @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @@ -67,15 +69,17 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): # https://github.com/NVIDIA/spark-rapids/issues/143 @pytest.mark.parametrize('compress', parquet_compress_options) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list): +def test_compress_read_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path), conf={'spark.sql.parquet.compression.codec': compress}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_pred_push_gens = [ byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, boolean_gen, @@ -86,8 +90,9 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list): @pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list): +def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, small_file_opt, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] s0 = gen_scalar(parquet_gen, force_no_nulls=True) @@ -97,14 +102,16 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled rf = read_func(data_path) assert_gpu_and_cpu_are_equal_collect( lambda spark: rf(spark).select(f.col('a') >= s0), - 
conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_ts_write_options = ['INT96', 'TIMESTAMP_MICROS', 'TIMESTAMP_MILLIS'] @pytest.mark.parametrize('ts_write', parquet_ts_write_options) @pytest.mark.parametrize('ts_rebase', ['CORRECTED', 'LEGACY']) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list): +def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list): # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with # timestamp_gen gen = TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc)) @@ -115,7 +122,31 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list 'spark.sql.parquet.outputTimestampType': ts_write}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + +def readParquetCatchException(spark, data_path): + with pytest.raises(Exception) as e_info: + df = spark.read.parquet(data_path).collect() + assert e_info.match(r".*SparkUpgradeException.*") + +@pytest.mark.parametrize('ts_write', parquet_ts_write_options) +@pytest.mark.parametrize('ts_rebase', ['LEGACY']) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_ts_read_fails_datetime_legacy(spark_tmp_path, ts_write, ts_rebase, small_file_opt, v1_enabled_list): + # Once https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with + # timestamp_gen + gen = TimestampGen(start=datetime(1590, 1, 1, tzinfo=timezone.utc)) + data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark : unary_op_df(spark, gen).write.parquet(data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': ts_rebase, + 'spark.sql.parquet.outputTimestampType': ts_write}) + with_gpu_session( + lambda spark : readParquetCatchException(spark, data_path), + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), @@ -124,8 +155,9 @@ def test_ts_read_round_trip(spark_tmp_path, ts_write, ts_rebase, v1_enabled_list pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))] @pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list): +def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -133,10 +165,12 @@ def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list): conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 
'LEGACY'}) assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list): +def test_simple_partitioned_read(spark_tmp_path, small_file_opt, v1_enabled_list): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, @@ -154,10 +188,36 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.parquet(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + +@pytest.mark.parametrize('small_file_opt', ["false", "true"]) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, small_file_opt): + # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed + # we should go with a more standard set of generators + parquet_gens = [byte_gen, short_gen, int_gen, long_gen] + first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] + first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' + with_cpu_session( + lambda spark : gen_df(spark, first_gen_list, 1).write.parquet(first_data_path)) + # generate with 1 column less + second_parquet_gens = [byte_gen, short_gen, int_gen] + second_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(second_parquet_gens)] + second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' + with_cpu_session( + lambda spark : gen_df(spark, second_gen_list, 1).write.parquet(second_data_path)) + data_path = spark_tmp_path + '/PARQUET_DATA' + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path), + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list, + 'spark.sql.files.maxPartitionBytes': "1g", + 'spark.sql.files.minPartitionNum': '1'}) +@pytest.mark.parametrize('small_file_opt', ["false", "true"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema(spark_tmp_path, v1_enabled_list): +def test_read_merge_schema(spark_tmp_path, v1_enabled_list, small_file_opt): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, @@ -176,15 +236,42 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_are_equal_collect( lambda spark : spark.read.option('mergeSchema', 'true').parquet(data_path), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 
'spark.sql.sources.useV1SourceList': v1_enabled_list}) + +@pytest.mark.parametrize('small_file_opt', ["false", "true"]) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, small_file_opt): + # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed + # we should go with a more standard set of generators + parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, + string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] + first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' + with_cpu_session( + lambda spark : gen_df(spark, first_gen_list).write.parquet(first_data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'}) + second_gen_list = [(('_c' if i % 2 == 0 else '_b') + str(i), gen) for i, gen in enumerate(parquet_gens)] + second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1' + with_cpu_session( + lambda spark : gen_df(spark, second_gen_list).write.parquet(second_data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) + data_path = spark_tmp_path + '/PARQUET_DATA' + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path), + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.parquet.mergeSchema': "true", + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_write_gens_list = [ [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, date_gen, timestamp_gen]] @pytest.mark.parametrize('parquet_gens', parquet_write_gens_list, ids=idfn) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list): +def test_write_round_trip(spark_tmp_path, parquet_gens, small_file_opt, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( @@ -193,6 +280,7 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list): data_path, conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', 'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS', + 'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_part_write_gens = [ @@ -204,8 +292,9 @@ def test_write_round_trip(spark_tmp_path, parquet_gens, v1_enabled_list): # There are race conditions around when individual files are read in for partitioned data @ignore_order @pytest.mark.parametrize('parquet_gen', parquet_part_write_gens, ids=idfn) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list): +def test_part_write_round_trip(spark_tmp_path, parquet_gen, small_file_opt, v1_enabled_list): gen_list = [('a', RepeatSeqGen(parquet_gen, 10)), ('b', parquet_gen)] data_path = spark_tmp_path + '/PARQUET_DATA' @@ -215,22 +304,26 @@ def test_part_write_round_trip(spark_tmp_path, parquet_gen, v1_enabled_list): data_path, conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 
'CORRECTED', 'spark.sql.parquet.outputTimestampType': 'TIMESTAMP_MICROS', + 'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) parquet_write_compress_options = ['none', 'uncompressed', 'snappy'] @pytest.mark.parametrize('compress', parquet_write_compress_options) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_write_round_trip(spark_tmp_path, compress, v1_enabled_list): +def test_compress_write_round_trip(spark_tmp_path, compress, small_file_opt, v1_enabled_list): data_path = spark_tmp_path + '/PARQUET_DATA' assert_gpu_and_cpu_writes_are_equal_collect( lambda spark, path : binary_op_df(spark, long_gen).coalesce(1).write.parquet(path), lambda spark, path : spark.read.parquet(path), data_path, conf={'spark.sql.parquet.compression.codec': compress, + 'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, 'spark.sql.sources.useV1SourceList': v1_enabled_list}) +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta(spark_tmp_path, v1_enabled_list): +def test_input_meta(spark_tmp_path, small_file_opt, v1_enabled_list): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -245,5 +338,45 @@ def test_input_meta(spark_tmp_path, v1_enabled_list): 'input_file_name()', 'input_file_block_start()', 'input_file_block_length()'), - conf={'spark.sql.sources.useV1SourceList': v1_enabled_list}) + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) + + +def createBucketedTableAndJoin(spark): + spark.range(10e4).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e4") + spark.range(10e6).write.bucketBy(4, "id").sortBy("id").mode('overwrite').saveAsTable("bucketed_4_10e6") + bucketed_4_10e4 = spark.table("bucketed_4_10e4") + bucketed_4_10e6 = spark.table("bucketed_4_10e6") + return bucketed_4_10e4.join(bucketed_4_10e6, "id") + +@ignore_order +@allow_non_gpu('DataWritingCommandExec') +@pytest.mark.parametrize('small_file_opt', ["true", "false"]) +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +# this test would be better if we could ensure exchanges didn't exist - ie used buckets +def test_buckets(spark_tmp_path, small_file_opt, v1_enabled_list): + assert_gpu_and_cpu_are_equal_collect(createBucketedTableAndJoin, + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': small_file_opt, + 'spark.sql.sources.useV1SourceList': v1_enabled_list, + "spark.sql.autoBroadcastJoinThreshold": '-1'}) + +@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) +def test_small_file_memory(spark_tmp_path, v1_enabled_list): + # stress the memory usage by creating a lot of small files. + # The more files we combine the more the offsets will be different which will cause + # footer size to change. + # Without the addition of extraMemory in GpuParquetScan this would cause reallocations + # of the host memory buffers. 
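The new reader path is driven purely by configuration. As a minimal spark-shell style sketch of turning the combined read on (the application name, session setup, and paths are illustrative; the rapids and parquet keys are the ones documented and exercised by this patch):

import org.apache.spark.sql.SparkSession

// Illustrative session setup; only the config keys below come from this patch and its docs.
val spark = SparkSession.builder()
  .appName("parquet-small-files-example")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.format.parquet.smallFiles.enabled", "true")
  .getOrCreate()

// Small files within a partition are combined on the CPU before being sent to the GPU.
val combined = spark.read.parquet("/data/many_small_files")

// Requesting mergeSchema disables the combined read for that scan and the plugin falls
// back to the per-file reader (see canUseSmallFileParquetOpt later in this patch).
val merged = spark.read.option("mergeSchema", "true").parquet("/data/many_small_files")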
+ cols = [string_gen] * 4 + gen_list = [('_c' + str(i), gen ) for i, gen in enumerate(cols)] + first_data_path = spark_tmp_path + '/PARQUET_DATA' + with_cpu_session( + lambda spark : gen_df(spark, gen_list).repartition(2000).write.parquet(first_data_path), + conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'}) + data_path = spark_tmp_path + '/PARQUET_DATA' + assert_gpu_and_cpu_are_equal_collect( + lambda spark : spark.read.parquet(data_path), + conf={'spark.rapids.sql.format.parquet.smallFiles.enabled': 'true', + 'spark.sql.files.maxPartitionBytes': "1g", + 'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/integration_tests/src/main/python/spark_init_internal.py b/integration_tests/src/main/python/spark_init_internal.py index 61fba592f11..9aaabd1cde3 100644 --- a/integration_tests/src/main/python/spark_init_internal.py +++ b/integration_tests/src/main/python/spark_init_internal.py @@ -19,9 +19,11 @@ def _spark__init(): # DO NOT SET ANY OTHER CONFIGS HERE!!! # due to bugs in pyspark/pytest it looks like any configs set here # can be reset in the middle of a test if specific operations are done (some types of cast etc) + # enableHiveSupport() is needed for parquet bucket tests _s = SparkSession.builder \ .config('spark.plugins', 'com.nvidia.spark.SQLPlugin') \ .config('spark.sql.queryExecutionListeners', 'com.nvidia.spark.rapids.ExecutionPlanCaptureCallback')\ + .enableHiveSupport() \ .appName('rapids spark plugin integration tests (python)').getOrCreate() #TODO catch the ClassNotFound error that happens if the classpath is not set up properly and # make it a better error message diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala index fe0b0dbb114..76c5188b84b 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuParquetScan.scala @@ -41,9 +41,11 @@ case class GpuParquetScan( options: CaseInsensitiveStringMap, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], - rapidsConf: RapidsConf) + rapidsConf: RapidsConf, + supportsSmallFileOpt: Boolean = true) extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema, - readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan { + readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, + supportsSmallFileOpt) with FileScan { override def isSplitable(path: Path): Boolean = super.isSplitableBase(path) @@ -52,7 +54,8 @@ case class GpuParquetScan( override def equals(obj: Any): Boolean = obj match { case p: GpuParquetScan => super.equals(p) && dataSchema == p.dataSchema && options == p.options && - equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf + equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf && + supportsSmallFileOpt == p.supportsSmallFileOpt case _ => false } diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffleExchangeExec.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffleExchangeExec.scala index 43bd58e1e00..92d62472f0a 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffleExchangeExec.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/GpuShuffleExchangeExec.scala @@ -21,4 +21,5 @@ import 
org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase case class GpuShuffleExchangeExec( override val outputPartitioning: Partitioning, - child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child) \ No newline at end of file + child: SparkPlan) extends GpuShuffleExchangeExecBase(outputPartitioning, child) + diff --git a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala index 3cc0fbe24ae..bf9473cb45b 100644 --- a/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala +++ b/shims/spark300/src/main/scala/com/nvidia/spark/rapids/shims/spark300/Spark300Shims.scala @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark300 import java.time.ZoneId +import scala.collection.JavaConverters._ + import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.spark300.RapidsShuffleManager @@ -35,7 +37,7 @@ import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, HadoopFsRelation, PartitionDirectory, PartitionedFile} -import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -141,13 +143,20 @@ class Spark300Shims extends SparkShims { override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options val newRelation = HadoopFsRelation( wrapped.relation.location, wrapped.relation.partitionSchema, wrapped.relation.dataSchema, wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), - wrapped.relation.options)(wrapped.relation.sparkSession) + options)(sparkSession) + val canUseSmallFileOpt = newRelation.fileFormat match { + case _: ParquetFileFormat => + GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _ => false + } GpuFileSourceScanExec( newRelation, wrapped.output, @@ -156,7 +165,8 @@ class Spark300Shims extends SparkShims { wrapped.optionalBucketSet, None, wrapped.dataFilters, - wrapped.tableIdentifier) + wrapped.tableIdentifier, + canUseSmallFileOpt) } }), GpuOverrides.exec[SortMergeJoinExec]( @@ -226,7 +236,10 @@ class Spark300Shims extends SparkShims { (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) { override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) - override def convertToGpu(): Scan = + override def convertToGpu(): Scan = { + val canUseSmallFileOpt = + GpuParquetScanBase.canUseSmallFileParquetOpt(conf, + a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession) GpuParquetScan(a.sparkSession, a.hadoopConf, a.fileIndex, @@ -237,7 +250,9 @@ class Spark300Shims extends SparkShims { a.options, a.partitionFilters, a.dataFilters, - conf) + conf, + canUseSmallFileOpt) + } }), GpuOverrides.scan[OrcScan]( "ORC parsing", @@ -330,4 +345,20 @@ class Spark300Shims extends SparkShims { filePartitions: Seq[FilePartition]): RDD[InternalRow] = { new FileScanRDD(sparkSession, 
readFunction, filePartitions) } + + override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { + FilePartition(index, files) + } + + override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, + supportsSmallFileOpt: Boolean): GpuBatchScanExec = { + val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan] + val scanCopy = scan.copy(supportsSmallFileOpt=supportsSmallFileOpt) + batchScanExec.copy(scan=scanCopy) + } + + override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, + supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { + scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt) + } } diff --git a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala index 8b171fbb93b..0f064121b20 100644 --- a/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala +++ b/shims/spark300db/src/main/scala/com/nvidia/spark/rapids/shims/spark300db/Spark300dbShims.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.{BucketingUtils, FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec @@ -93,13 +94,20 @@ class Spark300dbShims extends Spark300Shims { override def tagPlanForGpu(): Unit = GpuFileSourceScanExec.tagSupport(this) override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options val newRelation = HadoopFsRelation( wrapped.relation.location, wrapped.relation.partitionSchema, wrapped.relation.dataSchema, wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), - wrapped.relation.options)(wrapped.relation.sparkSession) + options)(sparkSession) + val canUseSmallFileOpt = newRelation.fileFormat match { + case _: ParquetFileFormat => + GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _ => false + } GpuFileSourceScanExec( newRelation, wrapped.output, @@ -109,7 +117,8 @@ class Spark300dbShims extends Spark300Shims { // TODO: Does Databricks have coalesced bucketing implemented? 
None, wrapped.dataFilters, - wrapped.tableIdentifier) + wrapped.tableIdentifier, + canUseSmallFileOpt) } }), GpuOverrides.exec[SortMergeJoinExec]( @@ -180,4 +189,13 @@ class Spark300dbShims extends Spark300Shims { filePartitions: Seq[FilePartition]): RDD[InternalRow] = { new GpuFileScanRDD(sparkSession, readFunction, filePartitions) } + + override def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition = { + FilePartition(index, files) + } + + override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, + supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { + scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt) + } } diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala index aac3b13098a..71ae25d7baa 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/GpuParquetScan.scala @@ -41,9 +41,11 @@ case class GpuParquetScan( options: CaseInsensitiveStringMap, partitionFilters: Seq[Expression], dataFilters: Seq[Expression], - rapidsConf: RapidsConf) + rapidsConf: RapidsConf, + supportsSmallFileOpt: Boolean = true) extends GpuParquetScanBase(sparkSession, hadoopConf, dataSchema, - readDataSchema, readPartitionSchema, pushedFilters, rapidsConf) with FileScan { + readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, + supportsSmallFileOpt) with FileScan { override def isSplitable(path: Path): Boolean = super.isSplitableBase(path) @@ -52,7 +54,8 @@ case class GpuParquetScan( override def equals(obj: Any): Boolean = obj match { case p: GpuParquetScan => super.equals(p) && dataSchema == p.dataSchema && options == p.options && - equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf + equivalentFilters(pushedFilters, p.pushedFilters) && rapidsConf == p.rapidsConf && + supportsSmallFileOpt == p.supportsSmallFileOpt case _ => false } diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala index 02d3e175eb6..a14b4fded4e 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/Spark310Shims.scala @@ -18,6 +18,8 @@ package com.nvidia.spark.rapids.shims.spark310 import java.time.ZoneId +import scala.collection.JavaConverters._ + import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.shims.spark301.Spark301Shims import com.nvidia.spark.rapids.spark310.RapidsShuffleManager @@ -29,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.connector.read.Scan import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources.HadoopFsRelation -import org.apache.spark.sql.execution.datasources.v2.csv.CSVScan +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec} @@ -137,13 +139,20 @@ class Spark310Shims extends Spark301Shims { override def tagPlanForGpu(): Unit = 
GpuFileSourceScanExec.tagSupport(this) override def convertToGpu(): GpuExec = { + val sparkSession = wrapped.relation.sparkSession + val options = wrapped.relation.options val newRelation = HadoopFsRelation( wrapped.relation.location, wrapped.relation.partitionSchema, wrapped.relation.dataSchema, wrapped.relation.bucketSpec, GpuFileSourceScanExec.convertFileFormat(wrapped.relation.fileFormat), - wrapped.relation.options)(wrapped.relation.sparkSession) + options)(sparkSession) + val canUseSmallFileOpt = newRelation.fileFormat match { + case _: ParquetFileFormat => + GpuParquetScanBase.canUseSmallFileParquetOpt(conf, options, sparkSession) + case _ => false + } GpuFileSourceScanExec( newRelation, wrapped.output, @@ -152,7 +161,8 @@ class Spark310Shims extends Spark301Shims { wrapped.optionalBucketSet, wrapped.optionalNumCoalescedBuckets, wrapped.dataFilters, - wrapped.tableIdentifier) + wrapped.tableIdentifier, + canUseSmallFileOpt) } }), GpuOverrides.exec[SortMergeJoinExec]( @@ -173,7 +183,9 @@ class Spark310Shims extends Spark301Shims { (a, conf, p, r) => new ScanMeta[ParquetScan](a, conf, p, r) { override def tagSelfForGpu(): Unit = GpuParquetScanBase.tagSupport(this) - override def convertToGpu(): Scan = + override def convertToGpu(): Scan = { + val canUseSmallFileOpt = GpuParquetScanBase.canUseSmallFileParquetOpt(conf, + a.options.asCaseSensitiveMap().asScala.toMap, a.sparkSession) GpuParquetScan(a.sparkSession, a.hadoopConf, a.fileIndex, @@ -184,7 +196,9 @@ class Spark310Shims extends Spark301Shims { a.options, a.partitionFilters, a.dataFilters, - conf) + conf, + canUseSmallFileOpt) + } }), GpuOverrides.scan[OrcScan]( "ORC parsing", @@ -223,4 +237,16 @@ class Spark310Shims extends Spark301Shims { override def getShuffleManagerShims(): ShuffleManagerShimBase = { new ShuffleManagerShim } + + override def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, + supportsSmallFileOpt: Boolean): GpuBatchScanExec = { + val scan = batchScanExec.scan.asInstanceOf[GpuParquetScan] + val scanCopy = scan.copy(supportsSmallFileOpt = supportsSmallFileOpt) + batchScanExec.copy(scan = scanCopy) + } + + override def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, + supportsSmallFileOpt: Boolean): GpuFileSourceScanExec = { + scanExec.copy(supportsSmallFileOpt=supportsSmallFileOpt) + } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala index 676b8f37f8e..1464292ee97 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala @@ -40,6 +40,15 @@ trait Arm { } } + /** Executes the provided code block and then closes the array of resources */ + def withResource[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = { + try { + block(r) + } finally { + r.safeClose() + } + } + /** Executes the provided code block, closing the resource only if an exception occurs */ def closeOnExcept[T <: AutoCloseable, V](r: T)(block: T => V): V = { try { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala index 4bb2c3ca017..fd4caac4139 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/ColumnarPartitionReaderWithPartitionValues.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 
2019, NVIDIA CORPORATION. + * Copyright (c) 2019-2020, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -39,23 +39,7 @@ class ColumnarPartitionReaderWithPartitionValues( fileReader.get() } else { val fileBatch: ColumnarBatch = fileReader.get() - var partitionColumns: Array[GpuColumnVector] = null - try { - partitionColumns = buildPartitionColumns(fileBatch.numRows) - val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column) - val resultCols = fileBatchCols ++ partitionColumns - val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows) - fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount()) - partitionColumns = null - result - } finally { - if (fileBatch != null) { - fileBatch.close() - } - if (partitionColumns != null) { - partitionColumns.safeClose() - } - } + ColumnarPartitionReaderWithPartitionValues.addPartitionValues(fileBatch, partitionValues) } } @@ -63,8 +47,51 @@ class ColumnarPartitionReaderWithPartitionValues( fileReader.close() partitionValues.foreach(_.close()) } +} + +object ColumnarPartitionReaderWithPartitionValues { + def newReader(partFile: PartitionedFile, + baseReader: PartitionReader[ColumnarBatch], + partitionSchema: StructType): PartitionReader[ColumnarBatch] = { + val partitionValues = partFile.partitionValues.toSeq(partitionSchema) + val partitionScalars = createPartitionValues(partitionValues, partitionSchema) + new ColumnarPartitionReaderWithPartitionValues(baseReader, partitionScalars) + } - private def buildPartitionColumns(numRows: Int): Array[GpuColumnVector] = { + def createPartitionValues( + partitionValues: Seq[Any], + partitionSchema: StructType): Array[Scalar] = { + val partitionScalarTypes = partitionSchema.fields.map(_.dataType) + partitionValues.zip(partitionScalarTypes).safeMap { + case (v, t) => GpuScalar.from(v, t) + }.toArray + } + + def addPartitionValues( + fileBatch: ColumnarBatch, + partitionValues: Array[Scalar]): ColumnarBatch = { + var partitionColumns: Array[GpuColumnVector] = null + try { + partitionColumns = buildPartitionColumns(fileBatch.numRows, partitionValues) + val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column) + val resultCols = fileBatchCols ++ partitionColumns + val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows) + fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount()) + partitionColumns = null + result + } finally { + if (fileBatch != null) { + fileBatch.close() + } + if (partitionColumns != null) { + partitionColumns.safeClose() + } + } + } + + private def buildPartitionColumns( + numRows: Int, + partitionValues: Array[Scalar]): Array[GpuColumnVector] = { var succeeded = false val result = new Array[GpuColumnVector](partitionValues.length) try { @@ -81,16 +108,3 @@ class ColumnarPartitionReaderWithPartitionValues( } } } - -object ColumnarPartitionReaderWithPartitionValues { - def newReader(partFile: PartitionedFile, - baseReader: PartitionReader[ColumnarBatch], - partitionSchema: StructType): PartitionReader[ColumnarBatch] = { - val partitionValues = partFile.partitionValues.toSeq(partitionSchema) - val partitionScalarTypes = partitionSchema.fields.map(_.dataType) - val partitionScalars = partitionValues.zip(partitionScalarTypes).map { - case (v, t) => GpuScalar.from(v, t) - }.toArray - new ColumnarPartitionReaderWithPartitionValues(baseReader, partitionScalars) - } -} diff --git 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index 8d5b2fccf04..3166c499fdf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -23,13 +23,14 @@ import java.util.{Collections, Locale} import scala.annotation.tailrec import scala.collection.JavaConverters._ -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, LinkedHashMap} import scala.math.max import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table} import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetricNames._ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange +import com.nvidia.spark.rapids.RapidsConf.ENABLE_SMALL_FILES_PARQUET import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.commons.io.IOUtils import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} @@ -50,7 +51,7 @@ import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory} import org.apache.spark.sql.execution.QueryExecutionException -import org.apache.spark.sql.execution.datasources.{PartitionedFile, PartitioningAwareFileIndex} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile, PartitioningAwareFileIndex} import org.apache.spark.sql.execution.datasources.parquet.{ParquetFilters, ParquetReadSupport} import org.apache.spark.sql.execution.datasources.v2.{FilePartitionReaderFactory, FileScan} import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan @@ -69,16 +70,24 @@ abstract class GpuParquetScanBase( readDataSchema: StructType, readPartitionSchema: StructType, pushedFilters: Array[Filter], - rapidsConf: RapidsConf) - extends ScanWithMetrics { + rapidsConf: RapidsConf, + supportsSmallFileOpt: Boolean) + extends ScanWithMetrics with Logging { def isSplitableBase(path: Path): Boolean = true def createReaderFactoryBase(): PartitionReaderFactory = { val broadcastedConf = sparkSession.sparkContext.broadcast( new SerializableConfiguration(hadoopConf)) - GpuParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, - dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics) + + logDebug(s"Small file optimization support: $supportsSmallFileOpt") + if (supportsSmallFileOpt) { + GpuParquetMultiFilePartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics) + } else { + GpuParquetPartitionReaderFactory(sparkSession.sessionState.conf, broadcastedConf, + dataSchema, readDataSchema, readPartitionSchema, pushedFilters, rapidsConf, metrics) + } } } @@ -146,43 +155,24 @@ object GpuParquetScanBase { meta.willNotWorkOnGpu(s"$other is not a supported read rebase mode") } } -} -case class GpuParquetPartitionReaderFactory( - @transient sqlConf: SQLConf, - broadcastedConf: Broadcast[SerializableConfiguration], - dataSchema: StructType, - readDataSchema: StructType, - partitionSchema: StructType, - filters: Array[Filter], - @transient rapidsConf: RapidsConf, - metrics: Map[String, SQLMetric]) extends FilePartitionReaderFactory with Arm { - private val isCaseSensitive = sqlConf.caseSensitiveAnalysis - private val 
enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown - private val pushDownDate = sqlConf.parquetFilterPushDownDate - private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp - private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal - private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith - private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold - private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix - private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows - private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes - private val isCorrectedRebase = - "CORRECTED" == sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ) - - override def supportColumnarReads(partition: InputPartition): Boolean = true - - override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { - throw new IllegalStateException("GPU column parser called to read rows") + def canUseSmallFileParquetOpt( + conf: RapidsConf, + options: Map[String, String], + sparkSession: SparkSession): Boolean = { + (conf.isParquetSmallFilesEnabled && + !(options.getOrElse("mergeSchema", "false").toBoolean || + sparkSession.conf.getOption("spark.sql.parquet.mergeSchema").exists(_.toBoolean))) } +} - override def buildColumnarReader( - partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = { - val reader = buildBaseColumnarParquetReader(partitionedFile) - ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema) - } +/** + * Base object that has common functions for both GpuParquetPartitionReaderFactory + * and GpuParquetPartitionReaderFactory + */ +object GpuParquetPartitionReaderFactoryBase { - private def filterClippedSchema( + def filterClippedSchema( clippedSchema: MessageType, fileSchema: MessageType, isCaseSensitive: Boolean): MessageType = { @@ -225,31 +215,55 @@ case class GpuParquetPartitionReaderFactory( version >= "3.0.0" && lookupFileMeta(SPARK_LEGACY_DATETIME) == null }.getOrElse(isCorrectedModeConfig) } +} + +// contains meta about all the blocks in a file +private case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: Seq[BlockMetaData], + partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) + +// contains meta about a single block in a file +private case class ParquetFileInfoWithSingleBlockMeta(filePath: Path, blockMeta: BlockMetaData, + partValues: InternalRow, schema: MessageType, isCorrectedRebaseMode: Boolean) + +private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) extends Arm { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown + private val pushDownDate = sqlConf.parquetFilterPushDownDate + private val pushDownTimestamp = sqlConf.parquetFilterPushDownTimestamp + private val pushDownDecimal = sqlConf.parquetFilterPushDownDecimal + private val pushDownStringStartWith = sqlConf.parquetFilterPushDownStringStartWith + private val pushDownInFilterThreshold = sqlConf.parquetFilterPushDownInFilterThreshold + private val isCorrectedRebase = + "CORRECTED" == sqlConf.getConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ) + + def filterBlocks( + file: PartitionedFile, + conf : Configuration, + filters: Array[Filter], + readDataSchema: StructType): ParquetFileInfoWithBlockMeta = { - private def buildBaseColumnarParquetReader( - file: PartitionedFile): 
PartitionReader[ColumnarBatch] = { - val conf = broadcastedConf.value.value val filePath = new Path(new URI(file.filePath)) //noinspection ScalaDeprecation val footer = ParquetFileReader.readFooter(conf, filePath, - ParquetMetadataConverter.range(file.start, file.start + file.length)) + ParquetMetadataConverter.range(file.start, file.start + file.length)) val fileSchema = footer.getFileMetaData.getSchema val pushedFilters = if (enableParquetFilterPushDown) { val parquetFilters = new ParquetFilters(fileSchema, pushDownDate, pushDownTimestamp, - pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) + pushDownDecimal, pushDownStringStartWith, pushDownInFilterThreshold, isCaseSensitive) filters.flatMap(parquetFilters.createFilter).reduceOption(FilterApi.and) } else { None } - val isCorrectedRebaseForThis = - isCorrectedRebaseMode(footer.getFileMetaData.getKeyValueMetaData.get, isCorrectedRebase) + val isCorrectedRebaseForThisFile = + GpuParquetPartitionReaderFactoryBase.isCorrectedRebaseMode( + footer.getFileMetaData.getKeyValueMetaData.get, isCorrectedRebase) val blocks = if (pushedFilters.isDefined) { // Use the ParquetFileReader to perform dictionary-level filtering ParquetInputFormat.setFilterPredicate(conf, pushedFilters.get) //noinspection ScalaDeprecation - withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath, + withResource(new ParquetFileReader(conf, footer.getFileMetaData, filePath, footer.getBlocks, Collections.emptyList[ColumnDescriptor])) { parquetReader => parquetReader.getRowGroups } @@ -258,75 +272,127 @@ case class GpuParquetPartitionReaderFactory( } val clippedSchemaTmp = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema, - isCaseSensitive) + isCaseSensitive) // ParquetReadSupport.clipParquetSchema does most of what we want, but it includes // everything in readDataSchema, even if it is not in fileSchema we want to remove those // for our own purposes - val clippedSchema = filterClippedSchema(clippedSchemaTmp, fileSchema, isCaseSensitive) - val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x:_*)) - val clippedBlocks = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala) - new ParquetPartitionReader(conf, file, filePath, clippedBlocks, clippedSchema, - isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, - maxReadBatchSizeBytes, metrics, isCorrectedRebaseForThis) + val clippedSchema = GpuParquetPartitionReaderFactoryBase.filterClippedSchema(clippedSchemaTmp, + fileSchema, isCaseSensitive) + val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x: _*)) + val clipped = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala) + ParquetFileInfoWithBlockMeta(filePath, clipped, file.partitionValues, + clippedSchema, isCorrectedRebaseForThisFile) } } /** - * A PartitionReader that reads a Parquet file split on the GPU. - * - * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file - * in memory that contains just the column chunks that are needed. This avoids sending - * unnecessary data to the GPU and saves GPU memory. 
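That re-constructed file keeps the standard Parquet layout: leading magic bytes, the selected column chunk data, a rewritten footer, the 4-byte footer length, and trailing magic bytes. A condensed sketch of the host-buffer size estimate this implies, mirroring calculateParquetOutputSize later in this file (the function name here is illustrative, and the multi-file path adds an extra cushion on top of this):

import scala.collection.JavaConverters._
import org.apache.parquet.hadoop.metadata.BlockMetaData

// Sketch of sizing the host buffer for the fabricated in-memory Parquet file:
// "PAR1" + selected column chunks + footer + 4-byte footer length + "PAR1".
def estimateOutputSize(blocks: Seq[BlockMetaData], footerSize: Long): Long = {
  val magicAndLength = 4L + 4L + 4L
  // getTotalSize is the on-disk (compressed) size, matching the bytes actually copied
  val chunkData = blocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum
  magicAndLength + chunkData + footerSize
}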
- * - * @param conf the Hadoop configuration - * @param split the file split to read - * @param filePath the path to the Parquet file - * @param clippedBlocks the block metadata from the original Parquet file that has been clipped - * to only contain the column chunks to be read - * @param clippedParquetSchema the Parquet schema from the original Parquet file that has been - * clipped to contain only the columns to be read - * @param readDataSchema the Spark schema describing what will be read - * @param debugDumpPrefix a path prefix to use for dumping the fabricated Parquet data or null + * Similar to GpuParquetPartitionReaderFactory but extended for reading multiple files + * in an iteration. This will allow us to read multiple small files and combine them + * on the CPU side before sending them down to the GPU. */ -class ParquetPartitionReader( +case class GpuParquetMultiFilePartitionReaderFactory( + @transient sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + filters: Array[Filter], + @transient rapidsConf: RapidsConf, + metrics: Map[String, SQLMetric]) extends PartitionReaderFactory with Arm with Logging { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix + private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows + private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes + + private val filterHandler = new GpuParquetFileFilterHandler(sqlConf) + + override def supportColumnarReads(partition: InputPartition): Boolean = true + + override def createReader(partition: InputPartition): PartitionReader[InternalRow] = { + throw new IllegalStateException("GPU column parser called to read rows") + } + + override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = { + assert(partition.isInstanceOf[FilePartition]) + val filePartition = partition.asInstanceOf[FilePartition] + val files = filePartition.files + buildBaseColumnarParquetReader(files) + } + + private def buildBaseColumnarParquetReader( + files: Array[PartitionedFile]): PartitionReader[ColumnarBatch] = { + val conf = broadcastedConf.value.value + logDebug(s"Number files being read: ${files.size} for task ${TaskContext.get().partitionId()}") + val clippedBlocks = ArrayBuffer[ParquetFileInfoWithSingleBlockMeta]() + files.map { file => + val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + clippedBlocks ++= singleFileInfo.blocks.map( + ParquetFileInfoWithSingleBlockMeta(singleFileInfo.filePath, _, file.partitionValues, + singleFileInfo.schema, singleFileInfo.isCorrectedRebaseMode)) + } + + new MultiFileParquetPartitionReader(conf, files, clippedBlocks, + isCaseSensitive, readDataSchema, debugDumpPrefix, + maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema) + } +} + +case class GpuParquetPartitionReaderFactory( + @transient sqlConf: SQLConf, + broadcastedConf: Broadcast[SerializableConfiguration], + dataSchema: StructType, + readDataSchema: StructType, + partitionSchema: StructType, + filters: Array[Filter], + @transient rapidsConf: RapidsConf, + metrics: Map[String, SQLMetric]) extends FilePartitionReaderFactory with Arm with Logging { + private val isCaseSensitive = sqlConf.caseSensitiveAnalysis + private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix + private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows + private val 
maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes + + private val filterHandler = new GpuParquetFileFilterHandler(sqlConf) + + override def supportColumnarReads(partition: InputPartition): Boolean = true + + override def buildReader(partitionedFile: PartitionedFile): PartitionReader[InternalRow] = { + throw new IllegalStateException("GPU column parser called to read rows") + } + + override def buildColumnarReader( + partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = { + val reader = buildBaseColumnarParquetReader(partitionedFile) + ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema) + } + + private def buildBaseColumnarParquetReader( + file: PartitionedFile): PartitionReader[ColumnarBatch] = { + val conf = broadcastedConf.value.value + val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + new ParquetPartitionReader(conf, file, singleFileInfo.filePath, singleFileInfo.blocks, + singleFileInfo.schema, isCaseSensitive, readDataSchema, + debugDumpPrefix, maxReadBatchSizeRows, + maxReadBatchSizeBytes, metrics, singleFileInfo.isCorrectedRebaseMode) + } +} + +/** + * Base classes with common functions for MultiFileParquetPartitionReader and ParquetPartitionReader + */ +abstract class FileParquetPartitionReaderBase( conf: Configuration, - split: PartitionedFile, - filePath: Path, - clippedBlocks: Seq[BlockMetaData], - clippedParquetSchema: MessageType, isSchemaCaseSensitive: Boolean, readDataSchema: StructType, debugDumpPrefix: String, - maxReadBatchSizeRows: Integer, - maxReadBatchSizeBytes: Long, - execMetrics: Map[String, SQLMetric], - isCorrectedRebaseMode: Boolean) extends PartitionReader[ColumnarBatch] with Logging + execMetrics: Map[String, SQLMetric]) extends PartitionReader[ColumnarBatch] with Logging with ScanWithMetrics with Arm { - private var isExhausted: Boolean = false - private var maxDeviceMemory: Long = 0 - private var batch: Option[ColumnarBatch] = None - private val blockIterator : BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered - private val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024) + protected var isExhausted: Boolean = false + protected var maxDeviceMemory: Long = 0 + protected var batch: Option[ColumnarBatch] = None + protected val copyBufferSize = conf.getInt("parquet.read.allocation.size", 8 * 1024 * 1024) metrics = execMetrics - override def next(): Boolean = { - batch.foreach(_.close()) - batch = None - if (!isExhausted) { - if (!blockIterator.hasNext) { - isExhausted = true - metrics("peakDevMemory") += maxDeviceMemory - } else { - batch = readBatch() - } - } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) - batch.isDefined - } - override def get(): ColumnarBatch = { val ret = batch.getOrElse(throw new NoSuchElementException) batch = None @@ -339,52 +405,52 @@ class ParquetPartitionReader( isExhausted = true } - private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long) = { - withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, - metrics("bufferTime"))) { _ => - withResource(filePath.getFileSystem(conf).open(filePath)) { in => - var succeeded = false - val hmb = HostMemoryBuffer.allocate(calculateParquetOutputSize(blocks)) - try { - val out = new HostMemoryOutputStream(hmb) - out.write(ParquetPartitionReader.PARQUET_MAGIC) - val outputBlocks = 
copyBlocksData(in, out, blocks) - val footerPos = out.getPos - writeFooter(out, outputBlocks) - BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) - out.write(ParquetPartitionReader.PARQUET_MAGIC) - succeeded = true - (hmb, out.getPos) - } finally { - if (!succeeded) { - hmb.close() - } - } - } - } + protected def calculateParquetFooterSize( + currentChunkedBlocks: Seq[BlockMetaData], + schema: MessageType): Long = { + // Calculate size of the footer metadata. + // This uses the column metadata from the original file, but that should + // always be at least as big as the updated metadata in the output. + val out = new CountingOutputStream(new NullOutputStream) + writeFooter(out, currentChunkedBlocks, schema) + out.getByteCount } - private def calculateParquetOutputSize(currentChunkedBlocks: Seq[BlockMetaData]): Long = { + protected def calculateParquetOutputSize( + currentChunkedBlocks: Seq[BlockMetaData], + schema: MessageType, + handleMultiFiles: Boolean): Long = { // start with the size of Parquet magic (at start+end) and footer length values var size: Long = 4 + 4 + 4 - // add in the size of the row group data - // Calculate the total amount of column data that will be copied // NOTE: Avoid using block.getTotalByteSize here as that is the // uncompressed size rather than the size in the file. size += currentChunkedBlocks.flatMap(_.getColumns.asScala.map(_.getTotalSize)).sum - // Calculate size of the footer metadata. - // This uses the column metadata from the original file, but that should - // always be at least as big as the updated metadata in the output. - val out = new CountingOutputStream(new NullOutputStream) - writeFooter(out, currentChunkedBlocks) - size + out.getByteCount + val footerSize = calculateParquetFooterSize(currentChunkedBlocks, schema) + val extraMemory = if (handleMultiFiles) { + // we want to add extra memory because the ColumnChunks saved in the Footer have 2 fields + // file_offset and data_page_offset that get much larger when we are combining files. + // Here we estimate that by taking the number of columns * the number of blocks, which should + // be the number of column chunks, and then assuming a worst case of 8 extra bytes for each of + // those 2 fields. So we probably allocate too much here, but it shouldn't be by a huge amount + // and it's better than having to realloc and copy. 
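+ // For example (illustrative numbers only): combining 20 blocks of 10 columns each gives + // 200 column chunks, so this reserves an extra 200 * 2 * 8 = 3200 bytes on top of the footer estimate.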
+ val numCols = currentChunkedBlocks.head.getColumns().size() + val numColumnChunks = numCols * currentChunkedBlocks.size + numColumnChunks * 2 * 8 + } else { + 0 + } + val totalSize = size + footerSize + extraMemory + totalSize } - private def writeFooter(out: OutputStream, blocks: Seq[BlockMetaData]): Unit = { - val fileMeta = new FileMetaData(clippedParquetSchema, Collections.emptyMap[String, String], + protected def writeFooter( + out: OutputStream, + blocks: Seq[BlockMetaData], + schema: MessageType): Unit = { + val fileMeta = new FileMetaData(schema, Collections.emptyMap[String, String], ParquetPartitionReader.PARQUET_CREATOR) val metadataConverter = new ParquetMetadataConverter val footer = new ParquetMetadata(fileMeta, blocks.asJava) @@ -392,7 +458,7 @@ class ParquetPartitionReader( org.apache.parquet.format.Util.writeFileMetaData(meta, out) } - private def copyDataRange( + protected def copyDataRange( range: CopyRange, in: FSDataInputStream, out: OutputStream, @@ -416,11 +482,11 @@ class ParquetPartitionReader( * metadata but with the file offsets updated to reflect the new position of the column data * as written to the output. * - * @param in the input stream for the original Parquet file + * @param in the input stream for the original Parquet file * @param out the output stream to receive the data * @return updated block metadata corresponding to the output */ - private def copyBlocksData( + protected def copyBlocksData( in: FSDataInputStream, out: HostMemoryOutputStream, blocks: Seq[BlockMetaData]): Seq[BlockMetaData] = { @@ -477,34 +543,7 @@ class ParquetPartitionReader( outputBlocks } - private def readBatch(): Option[ColumnarBatch] = { - withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, - metrics(TOTAL_TIME))) { _ => - val currentChunkedBlocks = populateCurrentBlockChunk() - if (readDataSchema.isEmpty) { - // not reading any data, so return a degenerate ColumnarBatch with the row count - val numRows = currentChunkedBlocks.map(_.getRowCount).sum.toInt - if (numRows == 0) { - None - } else { - Some(new ColumnarBatch(Array.empty, numRows.toInt)) - } - } else { - val table = readToTable(currentChunkedBlocks) - try { - val maybeBatch = table.map(GpuColumnVector.from) - maybeBatch.foreach { batch => - logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") - } - maybeBatch - } finally { - table.foreach(_.close()) - } - } - } - } - - private def areNamesEquiv(groups: GroupType, index: Int, otherName: String, + protected def areNamesEquiv(groups: GroupType, index: Int, otherName: String, isCaseSensitive: Boolean): Boolean = { if (groups.getFieldCount > index) { if (isCaseSensitive) { @@ -517,11 +556,14 @@ class ParquetPartitionReader( } } - private def evolveSchemaIfNeededAndClose(inputTable: Table): Table = { + protected def evolveSchemaIfNeededAndClose( + inputTable: Table, + filePath: String, + clippedSchema: MessageType): Table = { if (readDataSchema.length > inputTable.getNumberOfColumns) { // Spark+Parquet schema evolution is relatively simple with only adding/removing columns // To type casting or anyting like that - val clippedGroups = clippedParquetSchema.asGroupType() + val clippedGroups = clippedSchema.asGroupType() val newColumns = new Array[ColumnVector](readDataSchema.length) try { withResource(inputTable) { table => @@ -551,18 +593,438 @@ class ParquetPartitionReader( } } - private def readToTable(currentChunkedBlocks: Seq[BlockMetaData]): Option[Table] = { + protected def dumpParquetData( + hmb: HostMemoryBuffer, + 
dataLength: Long, + splits: Array[PartitionedFile]): Unit = { + val (out, path) = FileUtils.createTempFile(conf, debugDumpPrefix, ".parquet") + try { + logInfo(s"Writing Parquet split data for $splits to $path") + val in = new HostMemoryInputStream(hmb, dataLength) + IOUtils.copy(in, out) + } finally { + out.close() + } + } +} + +/** + * A PartitionReader that can read multiple Parquet files up to a certain size. + * + * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file + * in memory that contains just the column chunks that are needed. This avoids sending + * unnecessary data to the GPU and saves GPU memory. + * + * @param conf the Hadoop configuration + * @param splits the file splits to read + * @param clippedBlocks the block metadata from the original Parquet files that has been clipped + * to only contain the column chunks to be read + * @param readDataSchema the Spark schema describing what will be read + * @param debugDumpPrefix a path prefix to use for dumping the fabricated Parquet data or null + */ +class MultiFileParquetPartitionReader( + conf: Configuration, + splits: Array[PartitionedFile], + clippedBlocks: Seq[ParquetFileInfoWithSingleBlockMeta], + isSchemaCaseSensitive: Boolean, + readDataSchema: StructType, + debugDumpPrefix: String, + maxReadBatchSizeRows: Integer, + maxReadBatchSizeBytes: Long, + execMetrics: Map[String, SQLMetric], + partitionSchema: StructType) + extends FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema, + debugDumpPrefix, execMetrics) { + + private val blockIterator: BufferedIterator[ParquetFileInfoWithSingleBlockMeta] = + clippedBlocks.iterator.buffered + + private def addPartitionValues( + batch: Option[ColumnarBatch], + inPartitionValues: InternalRow): Option[ColumnarBatch] = { + batch.map { cb => + val partitionValues = inPartitionValues.toSeq(partitionSchema) + val partitionScalars = ColumnarPartitionReaderWithPartitionValues + .createPartitionValues(partitionValues, partitionSchema) + withResource(partitionScalars) { scalars => + ColumnarPartitionReaderWithPartitionValues.addPartitionValues(cb, scalars) + } + } + } + + override def next(): Boolean = { + batch.foreach(_.close()) + batch = None + if (!isExhausted) { + if (!blockIterator.hasNext) { + isExhausted = true + metrics("peakDevMemory") += maxDeviceMemory + } else { + batch = readBatch() + } + } + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + batch.isDefined + } + + private def reallocHostBufferAndCopy( + in: HostMemoryInputStream, + newSizeEstimate: Long): (HostMemoryBuffer, HostMemoryOutputStream) = { + // realloc memory and copy + closeOnExcept(HostMemoryBuffer.allocate(newSizeEstimate)) { newhmb => + val newout = new HostMemoryOutputStream(newhmb) + IOUtils.copy(in, newout) + (newhmb, newout) + } + } + + private def readPartFiles( + blocks: Seq[(Path, BlockMetaData)], + clippedSchema: MessageType): (HostMemoryBuffer, Long) = { + withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, + metrics("bufferTime"))) { _ => + // ugly but we want to keep the order + val filesAndBlocks = LinkedHashMap[Path, ArrayBuffer[BlockMetaData]]() + blocks.foreach { info => + if (filesAndBlocks.contains(info._1)) { + filesAndBlocks(info._1) += info._2 + } else { + filesAndBlocks(info._1) = ArrayBuffer(info._2) + } + } + + var succeeded = false + val allBlocks = blocks.map(_._2) + val initTotalSize = 
calculateParquetOutputSize(allBlocks, clippedSchema, true) + var hmb = HostMemoryBuffer.allocate(initTotalSize) + var out = new HostMemoryOutputStream(hmb) + try { + out.write(ParquetPartitionReader.PARQUET_MAGIC) + val allOutputBlocks = scala.collection.mutable.ArrayBuffer[BlockMetaData]() + filesAndBlocks.foreach { case (file, blocks) => + withResource(file.getFileSystem(conf).open(file)) { in => + val retBlocks = copyBlocksData(in, out, blocks) + allOutputBlocks ++= retBlocks + } + } + // The footer size can change vs the initial estimate because we are combining more blocks + // and the offsets are larger, so check to make sure we allocated enough memory before writing. + // Not sure how expensive this is; we could throw an exception instead if the written + // size comes out larger than the estimated size. + val actualFooterSize = calculateParquetFooterSize(allOutputBlocks, clippedSchema) + val footerPos = out.getPos + // 4 + 4 is for writing size and the ending PARQUET_MAGIC. + val bufferSizeReq = footerPos + actualFooterSize + 4 + 4 + val bufferSize = if (bufferSizeReq > initTotalSize) { + logWarning(s"The original estimated size $initTotalSize is too small, " + + s"reallocating and copying data to a bigger buffer of size: $bufferSizeReq") + val prevhmb = hmb + val in = new HostMemoryInputStream(prevhmb, footerPos) + val (newhmb, newout) = reallocHostBufferAndCopy(in, bufferSizeReq) + out = newout + hmb = newhmb + prevhmb.close() + bufferSizeReq + } else { + // we didn't change the buffer size so return the initial size which is the actual + // size of the buffer + initTotalSize + } + writeFooter(out, allOutputBlocks, clippedSchema) + BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + succeeded = true + // triple check we didn't go over memory + if (out.getPos > bufferSize) { + throw new QueryExecutionException(s"Calculated buffer size $bufferSize is too " + + s"small, actual written: ${out.getPos}") + } + (hmb, out.getPos) + } finally { + if (!succeeded) { + hmb.close() + } + } + } + } + + private def readBatch(): Option[ColumnarBatch] = { + withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, + metrics(TOTAL_TIME))) { _ => + val (isCorrectRebaseMode, clippedSchema, partValues, seqPathsAndBlocks) = + populateCurrentBlockChunk() + if (readDataSchema.isEmpty) { + // not reading any data, so return a degenerate ColumnarBatch with the row count + val numRows = seqPathsAndBlocks.map(_._2.getRowCount).sum.toInt + if (numRows == 0) { + None + } else { + Some(new ColumnarBatch(Array.empty, numRows.toInt)) + } + } else { + val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode) + try { + val maybeBatch = table.map(GpuColumnVector.from) + maybeBatch.foreach { batch => + logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") + } + // we have to add partition values here for this batch; we already verified that + // they are the same for all the blocks in this batch + addPartitionValues(maybeBatch, partValues) + } finally { + table.foreach(_.close()) + } + } + } + } + + private def readToTable( + currentChunkedBlocks: Seq[(Path, BlockMetaData)], + clippedSchema: MessageType, + isCorrectRebaseMode: Boolean): Option[Table] = { if (currentChunkedBlocks.isEmpty) { return None } + val (dataBuffer, dataSize) = readPartFiles(currentChunkedBlocks, clippedSchema) + try { + if (dataSize == 0) { + None + } else { + if (debugDumpPrefix != null) { + dumpParquetData(dataBuffer, dataSize, 
splits) + } + val parseOpts = ParquetOptions.builder() + .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) + .includeColumn(readDataSchema.fieldNames:_*).build() + + // about to start using the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + + val table = withResource(new NvtxWithMetrics("Parquet decode", NvtxColor.DARK_GREEN, + metrics(GPU_DECODE_TIME))) { _ => + Table.readParquet(parseOpts, dataBuffer, 0, dataSize) + } + if (!isCorrectRebaseMode) { + (0 until table.getNumberOfColumns).foreach { i => + if (RebaseHelper.isDateTimeRebaseNeededRead(table.getColumn(i))) { + throw RebaseHelper.newRebaseExceptionInRead("Parquet") + } + } + } + maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory) + if (readDataSchema.length < table.getNumberOfColumns) { + table.close() + throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " + + s"but read ${table.getNumberOfColumns} from $currentChunkedBlocks") + } + metrics(NUM_OUTPUT_BATCHES) += 1 + Some(evolveSchemaIfNeededAndClose(table, splits.mkString(","), clippedSchema)) + } + } finally { + dataBuffer.close() + } + } + + private def populateCurrentBlockChunk(): + (Boolean, MessageType, InternalRow, Seq[(Path, BlockMetaData)]) = { + + val currentChunk = new ArrayBuffer[(Path, BlockMetaData)] + var numRows: Long = 0 + var numBytes: Long = 0 + var numParquetBytes: Long = 0 + var currentFile: Path = null + var currentPartitionValues: InternalRow = null + var currentClippedSchema: MessageType = null + var currentIsCorrectRebaseMode: Boolean = false + + @tailrec + def readNextBatch(): Unit = { + if (blockIterator.hasNext) { + if (currentFile == null) { + currentFile = blockIterator.head.filePath + currentPartitionValues = blockIterator.head.partValues + currentClippedSchema = blockIterator.head.schema + currentIsCorrectRebaseMode = blockIterator.head.isCorrectedRebaseMode + } + if (currentFile != blockIterator.head.filePath) { + // We need to ensure all files we are going to combine have the same datetime rebase mode. 
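+ // (splitting here because the combined buffer is decoded as a single table and the rebase + // check is applied to that table as a whole, so files with different rebase modes cannot be + // mixed in the same batch)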
+ if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) { + logInfo("datetime rebase mode for the next file " + + s"${blockIterator.head.filePath} is different than the current file $currentFile, " + + s"splitting into another batch.") + return + } + + // check to see if the partitionValues are different; if so we have to split it + if (blockIterator.head.partValues != currentPartitionValues) { + logInfo(s"Partition values for the next file ${blockIterator.head.filePath}" + + s" don't match current $currentFile, splitting it into another batch!") + return + } + val schemaNextfile = + blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName) + val schemaCurrentfile = + currentClippedSchema.asGroupType().getFields.asScala.map(_.getName) + if (!schemaNextfile.sameElements(schemaCurrentfile)) { + logInfo(s"File schema for the next file ${blockIterator.head.filePath}" + + s" doesn't match current $currentFile, splitting it into another batch!") + return + } + currentFile = blockIterator.head.filePath + currentPartitionValues = blockIterator.head.partValues + currentClippedSchema = blockIterator.head.schema + } + val peekedRowGroup = blockIterator.head.blockMeta + if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) { + throw new UnsupportedOperationException("Too many rows in split") + } + + if (numRows == 0 || numRows + peekedRowGroup.getRowCount <= maxReadBatchSizeRows) { + val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema, + peekedRowGroup.getRowCount) + if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) { + val nextBlock = blockIterator.next() + val nextTuple = (nextBlock.filePath, nextBlock.blockMeta) + currentChunk += nextTuple + numRows += currentChunk.last._2.getRowCount + numParquetBytes += currentChunk.last._2.getTotalByteSize + numBytes += estimatedBytes + readNextBatch() + } + } + } + } + readNextBatch() + logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " + + s"Estimated GPU bytes: $numBytes") + (currentIsCorrectRebaseMode, currentClippedSchema, currentPartitionValues, currentChunk) + } +} + +/** + * A PartitionReader that reads a Parquet file split on the GPU. + * + * Efficiently reading a Parquet split on the GPU requires re-constructing the Parquet file + * in memory that contains just the column chunks that are needed. This avoids sending + * unnecessary data to the GPU and saves GPU memory. 
+ * + * @param conf the Hadoop configuration + * @param split the file split to read + * @param filePath the path to the Parquet file + * @param clippedBlocks the block metadata from the original Parquet file that has been clipped + * to only contain the column chunks to be read + * @param clippedParquetSchema the Parquet schema from the original Parquet file that has been + * clipped to contain only the columns to be read + * @param readDataSchema the Spark schema describing what will be read + * @param debugDumpPrefix a path prefix to use for dumping the fabricated Parquet data or null + */ +class ParquetPartitionReader( + conf: Configuration, + split: PartitionedFile, + filePath: Path, + clippedBlocks: Seq[BlockMetaData], + clippedParquetSchema: MessageType, + isSchemaCaseSensitive: Boolean, + readDataSchema: StructType, + debugDumpPrefix: String, + maxReadBatchSizeRows: Integer, + maxReadBatchSizeBytes: Long, + execMetrics: Map[String, SQLMetric], + isCorrectedRebaseMode: Boolean) extends + FileParquetPartitionReaderBase(conf, + isSchemaCaseSensitive, readDataSchema, debugDumpPrefix, execMetrics) { + + private val blockIterator : BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered + + override def next(): Boolean = { + batch.foreach(_.close()) + batch = None + if (!isExhausted) { + if (!blockIterator.hasNext) { + isExhausted = true + metrics("peakDevMemory") += maxDeviceMemory + } else { + batch = readBatch() + } + } + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + batch.isDefined + } + + private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long) = { + withResource(new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW, + metrics("bufferTime"))) { _ => + withResource(filePath.getFileSystem(conf).open(filePath)) { in => + var succeeded = false + val estTotalSize = calculateParquetOutputSize(blocks, clippedParquetSchema, false) + val hmb = + HostMemoryBuffer.allocate(estTotalSize) + try { + val out = new HostMemoryOutputStream(hmb) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + val outputBlocks = copyBlocksData(in, out, blocks) + val footerPos = out.getPos + writeFooter(out, outputBlocks, clippedParquetSchema) + BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt) + out.write(ParquetPartitionReader.PARQUET_MAGIC) + succeeded = true + // check we didn't go over memory + if (out.getPos > estTotalSize) { + throw new QueryExecutionException(s"Calculated buffer size $estTotalSize is to " + + s"small, actual written: ${out.getPos}") + } + (hmb, out.getPos) + } finally { + if (!succeeded) { + hmb.close() + } + } + } + } + } + + private def readBatch(): Option[ColumnarBatch] = { + withResource(new NvtxWithMetrics("Parquet readBatch", NvtxColor.GREEN, + metrics(TOTAL_TIME))) { _ => + val currentChunkedBlocks = populateCurrentBlockChunk() + if (readDataSchema.isEmpty) { + // not reading any data, so return a degenerate ColumnarBatch with the row count + val numRows = currentChunkedBlocks.map(_.getRowCount).sum.toInt + if (numRows == 0) { + None + } else { + Some(new ColumnarBatch(Array.empty, numRows.toInt)) + } + } else { + val table = readToTable(currentChunkedBlocks) + try { + val maybeBatch = table.map(GpuColumnVector.from) + maybeBatch.foreach { batch => + logDebug(s"GPU batch size: ${GpuColumnVector.getTotalDeviceMemoryUsed(batch)} bytes") + } + maybeBatch + } finally { + table.foreach(_.close()) + } + } + 
} + } + private def readToTable(currentChunkedBlocks: Seq[BlockMetaData]): Option[Table] = { + if (currentChunkedBlocks.isEmpty) { + return None + } val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks) try { if (dataSize == 0) { None } else { if (debugDumpPrefix != null) { - dumpParquetData(dataBuffer, dataSize) + dumpParquetData(dataBuffer, dataSize, Array(split)) } val parseOpts = ParquetOptions.builder() .withTimeUnit(DType.TIMESTAMP_MICROSECONDS) @@ -589,7 +1051,7 @@ class ParquetPartitionReader( s"but read ${table.getNumberOfColumns} from $filePath") } metrics(NUM_OUTPUT_BATCHES) += 1 - Some(evolveSchemaIfNeededAndClose(table)) + Some(evolveSchemaIfNeededAndClose(table, filePath.toString, clippedParquetSchema)) } } finally { dataBuffer.close() @@ -631,27 +1093,14 @@ class ParquetPartitionReader( currentChunk } - - private def dumpParquetData( - hmb: HostMemoryBuffer, - dataLength: Long): Unit = { - val (out, path) = FileUtils.createTempFile(conf, debugDumpPrefix, ".parquet") - try { - logInfo(s"Writing Parquet split data for $split to $path") - val in = new HostMemoryInputStream(hmb, dataLength) - IOUtils.copy(in, out) - } finally { - out.close() - } - } } object ParquetPartitionReader { - private val PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII) - private val PARQUET_CREATOR = "RAPIDS Spark Plugin" - private val PARQUET_VERSION = 1 + private[rapids] val PARQUET_MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII) + private[rapids] val PARQUET_CREATOR = "RAPIDS Spark Plugin" + private[rapids] val PARQUET_VERSION = 1 - private case class CopyRange(offset: Long, length: Long) + private[rapids] case class CopyRange(offset: Long, length: Long) /** * Build a new BlockMetaData @@ -660,7 +1109,7 @@ object ParquetPartitionReader { * @param columns the new column chunks to reference in the new BlockMetaData * @return the new BlockMetaData */ - private def newParquetBlock( + private[rapids] def newParquetBlock( rowCount: Long, columns: Seq[ColumnChunkMetaData]): BlockMetaData = { val block = new BlockMetaData @@ -696,4 +1145,3 @@ object ParquetPartitionReader { } } - diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala index b21a3d5fdfe..c339a8e0683 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala @@ -22,10 +22,11 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec} import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.ExecutedCommandExec +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanExecBase import org.apache.spark.sql.execution.exchange.{Exchange, ShuffleExchangeExec} import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec} -import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec} +import org.apache.spark.sql.rapids.{GpuDataSourceScanExec, GpuFileSourceScanExec, GpuInputFileBlockLength, GpuInputFileBlockStart, GpuInputFileName} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuCustomShuffleReaderExec, GpuShuffleExchangeExecBase} 
/** @@ -161,6 +162,48 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { plan.expressions.exists(disableCoalesceUntilInput) } + private def disableScanUntilInput(exec: Expression): Boolean = { + exec match { + case _: InputFileName => true + case _: InputFileBlockStart => true + case _: InputFileBlockLength => true + case _: GpuInputFileName => true + case _: GpuInputFileBlockStart => true + case _: GpuInputFileBlockLength => true + case e => e.children.exists(disableScanUntilInput) + } + } + + private def disableScanUntilInput(plan: SparkPlan): Boolean = { + plan.expressions.exists(disableScanUntilInput) + } + + // This walks from the output to the input to look for any uses of InputFileName, + // InputFileBlockStart, or InputFileBlockLength when we use a Parquet read because + // we can't support the small file optimization when this is used. + private def updateScansForInput(plan: SparkPlan, + disableUntilInput: Boolean = false): SparkPlan = plan match { + case batchScan: GpuBatchScanExec => + if (batchScan.scan.isInstanceOf[GpuParquetScanBase] && + (disableUntilInput || disableScanUntilInput(batchScan))) { + ShimLoader.getSparkShims.copyParquetBatchScanExec(batchScan, false) + } else { + batchScan + } + case fileSourceScan: GpuFileSourceScanExec => + if (fileSourceScan.supportsSmallFileOpt == true && + (disableUntilInput || disableScanUntilInput(fileSourceScan))) { + ShimLoader.getSparkShims.copyFileSourceScanExec(fileSourceScan, false) + } else { + fileSourceScan + } + case p => + val planDisableUntilInput = disableScanUntilInput(p) && hasDirectLineToInput(p) + p.withNewChildren(p.children.map(c => { + updateScansForInput(c, planDisableUntilInput || disableUntilInput) + })) + } + // This walks from the output to the input so disableUntilInput can walk its way from when // we hit something that cannot allow for coalesce up until the input private def insertCoalesce(plan: SparkPlan, @@ -319,6 +362,7 @@ class GpuTransitionOverrides extends Rule[SparkPlan] { this.conf = new RapidsConf(plan.conf) if (conf.isSqlEnabled) { var updatedPlan = insertHashOptimizeSorts(plan) + updatedPlan = updateScansForInput(updatedPlan) updatedPlan = insertCoalesce(insertColumnarFromGpu(updatedPlan)) updatedPlan = optimizeCoalesce(if (plan.conf.adaptiveExecutionEnabled) { optimizeAdaptiveTransitions(updatedPlan) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index a0dc79d4009..c59fa8fd3fa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -437,6 +437,13 @@ object RapidsConf { .booleanConf .createWithDefault(true) + val ENABLE_SMALL_FILES_PARQUET = conf("spark.rapids.sql.format.parquet.smallFiles.enabled") + .doc("When set to true, handles reading multiple small files within a partition more " + + "efficiently by combining multiple files on the CPU side before sending to the GPU. 
" + + "Recommended unless user needs mergeSchema option or schema evolution.") + .booleanConf + .createWithDefault(true) + val ENABLE_PARQUET_READ = conf("spark.rapids.sql.format.parquet.read.enabled") .doc("When set to false disables parquet input acceleration") .booleanConf @@ -833,6 +840,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetEnabled: Boolean = get(ENABLE_PARQUET) + lazy val isParquetSmallFilesEnabled: Boolean = get(ENABLE_SMALL_FILES_PARQUET) + lazy val isParquetReadEnabled: Boolean = get(ENABLE_PARQUET_READ) lazy val isParquetWriteEnabled: Boolean = get(ENABLE_PARQUET_WRITE) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala index 259cdd2d1af..13e28be1244 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsMeta.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.command.DataWritingCommand import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, ShuffledHashJoinExec, SortMergeJoinExec} -import org.apache.spark.sql.types.{CalendarIntervalType, DataType, DataTypes, StringType} +import org.apache.spark.sql.types.DataType trait ConfKeysAndIncompat { val operationName: String diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala index c95dc764f53..ad183978e36 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SparkShims.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec import org.apache.spark.sql.execution.datasources.{FilePartition, HadoopFsRelation, PartitionDirectory, PartitionedFile} import org.apache.spark.sql.execution.joins._ -import org.apache.spark.sql.rapids.ShuffleManagerShimBase +import org.apache.spark.sql.rapids.{GpuFileSourceScanExec, ShuffleManagerShimBase} import org.apache.spark.sql.rapids.execution.{GpuBroadcastExchangeExecBase, GpuBroadcastNestedLoopJoinExecBase, GpuShuffleExchangeExecBase} import org.apache.spark.sql.types._ import org.apache.spark.storage.{BlockId, BlockManagerId} @@ -112,6 +112,8 @@ trait SparkShims { def getShuffleManagerShims(): ShuffleManagerShimBase + def createFilePartition(index: Int, files: Array[PartitionedFile]): FilePartition + def getPartitionFileNames(partitions: Seq[PartitionDirectory]): Seq[String] def getPartitionFileStatusSize(partitions: Seq[PartitionDirectory]): Long def getPartitionedFiles(partitions: Array[PartitionDirectory]): Array[PartitionedFile] @@ -123,5 +125,11 @@ trait SparkShims { sparkSession: SparkSession, readFunction: (PartitionedFile) => Iterator[InternalRow], filePartitions: Seq[FilePartition]): RDD[InternalRow] + + def copyParquetBatchScanExec(batchScanExec: GpuBatchScanExec, + supportsSmallFileOpt: Boolean): GpuBatchScanExec + + def copyFileSourceScanExec(scanExec: GpuFileSourceScanExec, + supportsSmallFileOpt: Boolean): GpuFileSourceScanExec } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala index 69c818aa3a3..cd5418d8577 100644 --- 
a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS import scala.collection.mutable.HashMap -import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, ShimLoader, SparkPlanMeta} +import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, ShimLoader, SparkPlanMeta} import org.apache.hadoop.fs.Path import org.apache.spark.rdd.RDD @@ -34,10 +34,12 @@ import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStr import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.util.SerializableConfiguration import org.apache.spark.util.collection.BitSet /** @@ -51,6 +53,7 @@ import org.apache.spark.util.collection.BitSet * @param optionalNumCoalescedBuckets Number of coalesced buckets. * @param dataFilters Filters on non-partition columns. * @param tableIdentifier identifier for the table in the metastore. + * @param supportsSmallFileOpt indicates if we should use the small file optimization. */ case class GpuFileSourceScanExec( @transient relation: HadoopFsRelation, @@ -60,7 +63,8 @@ case class GpuFileSourceScanExec( optionalBucketSet: Option[BitSet], optionalNumCoalescedBuckets: Option[Int], dataFilters: Seq[Expression], - tableIdentifier: Option[TableIdentifier]) + tableIdentifier: Option[TableIdentifier], + supportsSmallFileOpt: Boolean = true) extends GpuDataSourceScanExec with GpuExec { override val nodeName: String = { @@ -270,25 +274,35 @@ case class GpuFileSourceScanExec( |""".stripMargin } + /** + * If the small file optimization is enabled then we read all the files before sending down + * to the GPU. If it is disabled then we use the standard Spark logic of reading one file + * at a time. 
+ */ lazy val inputRDD: RDD[InternalRow] = { - val readFile: (PartitionedFile) => Iterator[InternalRow] = { + val readFile: Option[(PartitionedFile) => Iterator[InternalRow]] = if (!supportsSmallFileOpt) { val fileFormat = relation.fileFormat.asInstanceOf[GpuReadFileFormatWithMetrics] - fileFormat.buildReaderWithPartitionValuesAndMetrics( + val reader = fileFormat.buildReaderWithPartitionValuesAndMetrics( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = requiredSchema, filters = pushedDownFilters, options = relation.options, - hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), + hadoopConf = + relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options), metrics = metrics) + Some(reader) + } else { + None } val readRDD = if (bucketedScan) { createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions, - relation) + relation, supportsSmallFileOpt) } else { - createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, relation) + createNonBucketedReadRDD(readFile, dynamicallySelectedPartitions, + relation, supportsSmallFileOpt) } sendDriverMetrics() readRDD @@ -380,15 +394,18 @@ case class GpuFileSourceScanExec( * with the same bucket id from all the given Hive partitions. * * @param bucketSpec the bucketing spec. - * @param readFile a function to read each (part of a) file. + * @param readFile an optional function to read each (part of a) file. Used + * when not using the small file optimization. * @param selectedPartitions Hive-style partition that are part of the read. * @param fsRelation [[HadoopFsRelation]] associated with the read. + * @param useSmallFileOpt whether to create an RDD using small file optimization. 
*/ private def createBucketedReadRDD( bucketSpec: BucketSpec, - readFile: (PartitionedFile) => Iterator[InternalRow], + readFile: Option[(PartitionedFile) => Iterator[InternalRow]], selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { + fsRelation: HadoopFsRelation, + useSmallFileOpt: Boolean): RDD[InternalRow] = { logInfo(s"Planning with ${bucketSpec.numBuckets} buckets") val partitionedFiles = @@ -416,28 +433,54 @@ case class GpuFileSourceScanExec( val partitionedFiles = coalescedBuckets.get(bucketId).map { _.values.flatten.toArray }.getOrElse(Array.empty) - FilePartition(bucketId, partitionedFiles) + ShimLoader.getSparkShims.createFilePartition(bucketId, partitionedFiles) } }.getOrElse { Seq.tabulate(bucketSpec.numBuckets) { bucketId => - FilePartition(bucketId, prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) + ShimLoader.getSparkShims.createFilePartition(bucketId, + prunedFilesGroupedToBuckets.getOrElse(bucketId, Array.empty)) } } - ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, filePartitions) + if (!useSmallFileOpt) { + ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile.get, filePartitions) + } else { + // here we are making an optimization to read more then 1 file at a time on the CPU side + // if they are small files before sending it down to the GPU + val sqlConf = relation.sparkSession.sessionState.conf + val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options) + val broadcastedHadoopConf = + relation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val factory = GpuParquetMultiFilePartitionReaderFactory( + sqlConf, + broadcastedHadoopConf, + relation.dataSchema, + requiredSchema, + relation.partitionSchema, + pushedDownFilters.toArray, + new RapidsConf(sqlConf), + metrics) + + // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code + new DataSourceRDD(relation.sparkSession.sparkContext, filePartitions, + factory, supportsColumnar) + } } /** * Create an RDD for non-bucketed reads. * The bucketed variant of this function is [[createBucketedReadRDD]]. * - * @param readFile a function to read each (part of a) file. + * @param readFile an optional function to read each (part of a) file. Used when + * not using the small file optimization. * @param selectedPartitions Hive-style partition that are part of the read. * @param fsRelation [[HadoopFsRelation]] associated with the read. + * @param useSmallFileOpt whether to create an RDD using small file optimization. 
*/ private def createNonBucketedReadRDD( - readFile: (PartitionedFile) => Iterator[InternalRow], + readFile: Option[(PartitionedFile) => Iterator[InternalRow]], selectedPartitions: Array[PartitionDirectory], - fsRelation: HadoopFsRelation): RDD[InternalRow] = { + fsRelation: HadoopFsRelation, + useSmallFileOpt: Boolean): RDD[InternalRow] = { val openCostInBytes = fsRelation.sparkSession.sessionState.conf.filesOpenCostInBytes val maxSplitBytes = FilePartition.maxSplitBytes(fsRelation.sparkSession, selectedPartitions) @@ -451,7 +494,28 @@ case class GpuFileSourceScanExec( val partitions = FilePartition.getFilePartitions(relation.sparkSession, splitFiles, maxSplitBytes) - ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile, partitions) + if (!useSmallFileOpt) { + ShimLoader.getSparkShims.getFileScanRDD(fsRelation.sparkSession, readFile.get, partitions) + } else { + // here we are making an optimization to read more then 1 file at a time on the CPU side + // if they are small files before sending it down to the GPU + val sqlConf = relation.sparkSession.sessionState.conf + val hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options) + val broadcastedHadoopConf = + relation.sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + val factory = GpuParquetMultiFilePartitionReaderFactory( + sqlConf, + broadcastedHadoopConf, + relation.dataSchema, + requiredSchema, + relation.partitionSchema, + pushedDownFilters.toArray, + new RapidsConf(sqlConf), + metrics) + + // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code + new DataSourceRDD(relation.sparkSession.sparkContext, partitions, factory, supportsColumnar) + } } // Filters unused DynamicPruningExpression expressions - one which has been replaced diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala index 3edc4570d35..22e82953c44 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/datetimeExpressions.scala @@ -271,7 +271,7 @@ abstract class UnixTimeExprMeta[A <: BinaryExpression with TimeZoneAwareExpressi } } catch { case x: TimestampFormatConversionException => - willNotWorkOnGpu(x.getMessage) + willNotWorkOnGpu(s"Failed to convert ${x.reason} ${x.getMessage()}") } } }
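A minimal usage sketch, not part of the patch itself (it assumes a standard SparkSession named spark and an illustrative input path): the behavior added above is controlled by spark.rapids.sql.format.parquet.smallFiles.enabled and is on by default; the GpuTransitionOverrides rewrite turns it off automatically when input_file_name(), InputFileBlockStart, or InputFileBlockLength are used, and per the config description it should be disabled explicitly when the mergeSchema option or schema evolution is needed:

    spark.conf.set("spark.rapids.sql.format.parquet.smallFiles.enabled", "false")
    val df = spark.read.option("mergeSchema", "true").parquet("/data/small-files")
    df.count()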