From 048038ca40a20c9395ada9727d86f02f147badd0 Mon Sep 17 00:00:00 2001 From: Kuhu Shukla Date: Wed, 5 Jan 2022 08:30:16 -0600 Subject: [PATCH] Decimal128 support for Parquet (#4362) * Decimal128 support for Parquet Signed-off-by: Kuhu Shukla Co-authored-by: Kuhu Shukla --- docs/supported_ops.md | 18 +++--- integration_tests/src/main/python/data_gen.py | 1 + .../src/main/python/parquet_test.py | 60 ++++++++++-------- .../src/main/python/parquet_write_test.py | 5 +- .../nvidia/spark/rapids/GpuOverrides.scala | 8 +-- .../spark/rapids/GpuParquetScanBase.scala | 10 ++- .../test/resources/test_unsigned64.parquet | Bin 0 -> 1089 bytes .../spark/rapids/ParquetScanSuite.scala | 14 ++++ .../main/resources/supportedDataSource.csv | 2 +- 9 files changed, 72 insertions(+), 46 deletions(-) create mode 100644 tests/src/test/resources/test_unsigned64.parquet diff --git a/docs/supported_ops.md b/docs/supported_ops.md index f4685bc3aef..376eb5c0ace 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -626,7 +626,7 @@ Accelerator supports are described below. S PS
UTC is only supported TZ for TIMESTAMP
S -PS
128bit decimal only supported for Orc
+PS
128bit decimal only supported for Orc and Parquet
NS NS NS @@ -17509,13 +17509,13 @@ dates or timestamps, or for a lack of type coercion support. S PS
UTC is only supported TZ for TIMESTAMP
S -PS
max DECIMAL precision of 18
+S NS -PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
NS @@ -17530,13 +17530,13 @@ dates or timestamps, or for a lack of type coercion support. S PS
UTC is only supported TZ for TIMESTAMP
S -PS
max DECIMAL precision of 18
+S NS -PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
-PS
max child DECIMAL precision of 18;
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
+PS
UTC is only supported TZ for child TIMESTAMP;
unsupported child types BINARY, UDT
NS diff --git a/integration_tests/src/main/python/data_gen.py b/integration_tests/src/main/python/data_gen.py index 2d5807efafe..5c3a8e33649 100644 --- a/integration_tests/src/main/python/data_gen.py +++ b/integration_tests/src/main/python/data_gen.py @@ -949,6 +949,7 @@ def gen_scalars_for_sql(data_gen, count, seed=0, force_no_nulls=False): decimal_64_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(7, 3, nullable=False), DecimalGen(12, 2, nullable=False), DecimalGen(18, -3, nullable=False)]] decimal_128_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False), DecimalGen(36, -5, nullable=False)]] +decimal_128_no_neg_map_gens = [MapGen(key_gen=gen, value_gen=gen, nullable=False) for gen in [DecimalGen(20, 2, nullable=False), DecimalGen(36, 5, nullable=False), DecimalGen(38, 38, nullable=False)]] # Some map gens, but not all because of nesting map_gens_sample = all_basic_map_gens + [MapGen(StringGen(pattern='key_[0-9]', nullable=False), ArrayGen(string_gen), max_length=10), diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index 858ec932bef..b7b36c4885e 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -74,7 +74,7 @@ def read_parquet_sql(data_path): @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): +def test_parquet_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -90,12 +90,13 @@ def test_read_round_trip(spark_tmp_path, parquet_gens, read_func, reader_confs, assert_gpu_and_cpu_are_equal_collect(read_func(data_path), conf=all_confs) + @allow_non_gpu('FileSourceScanExec') @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.parquet.read.enabled']) def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): data_gens = [string_gen, - byte_gen, short_gen, int_gen, long_gen, boolean_gen] + decimal_gens + byte_gen, short_gen, int_gen, long_gen, boolean_gen] + decimal_gens + decimal_128_gens_no_neg gen_list = [('_c' + str(i), gen) for i, gen in enumerate(data_gens)] gen = StructGen(gen_list, nullable=False) @@ -116,7 +117,7 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf): @pytest.mark.parametrize('compress', parquet_compress_options) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): +def test_parquet_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : binary_op_df(spark, long_gen).write.parquet(data_path), @@ -131,13 +132,13 @@ def test_compress_read_round_trip(spark_tmp_path, compress, v1_enabled_list, rea string_gen, date_gen, # Once 
https://github.com/NVIDIA/spark-rapids/issues/132 is fixed replace this with # timestamp_gen - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg @pytest.mark.parametrize('parquet_gen', parquet_pred_push_gens, ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): +def test_parquet_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' gen_list = [('a', RepeatSeqGen(parquet_gen, 100)), ('b', parquet_gen)] s0 = gen_scalar(parquet_gen, force_no_nulls=True) @@ -162,7 +163,7 @@ def test_pred_push_round_trip(spark_tmp_path, parquet_gen, read_func, v1_enabled @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/1126') -def test_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): +def test_parquet_ts_read_round_trip_nested(gen, spark_tmp_path, ts_write, ts_rebase, v1_enabled_list, reader_confs): data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( lambda spark : unary_op_df(spark, gen).write.parquet(data_path), @@ -220,11 +221,11 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, @pytest.mark.parametrize('parquet_gens', [[byte_gen, short_gen, DecimalGen(precision=7, scale=3)], decimal_gens, [ArrayGen(DecimalGen(7,2), max_length=10)], - [StructGen([['child0', DecimalGen(7, 2)]])]], ids=idfn) + [StructGen([['child0', DecimalGen(7, 2)]])], decimal_128_gens_no_neg], ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): +def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -236,14 +237,14 @@ def test_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_con parquet_gens_legacy_list = [[byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens, + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg, pytest.param([timestamp_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133')), pytest.param([date_gen], marks=pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/133'))] @pytest.mark.parametrize('parquet_gens', parquet_gens_legacy_list, ids=idfn) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, reader_confs): +def test_parquet_read_round_trip_legacy(spark_tmp_path, 
parquet_gens, v1_enabled_list, reader_confs): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] data_path = spark_tmp_path + '/PARQUET_DATA' with_cpu_session( @@ -256,12 +257,12 @@ def test_read_round_trip_legacy(spark_tmp_path, parquet_gens, v1_enabled_list, r @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20' with_cpu_session( @@ -281,10 +282,11 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs): lambda spark : spark.read.parquet(data_path), conf=all_confs) + # In this we are reading the data, but only reading the key the data was partitioned by @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reader_confs): parquet_gens = [byte_gen] gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' @@ -303,7 +305,7 @@ def test_partitioned_read_just_partitions(spark_tmp_path, v1_enabled_list, reade @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen] @@ -328,12 +330,12 @@ def test_read_schema_missing_cols(spark_tmp_path, v1_enabled_list, reader_confs) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] 
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( @@ -352,12 +354,12 @@ def test_read_merge_schema(spark_tmp_path, v1_enabled_list, reader_confs): @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_confs): # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed # we should go with a more standard set of generators parquet_gens = [byte_gen, short_gen, int_gen, long_gen, float_gen, double_gen, string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)), - TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))] + decimal_gens + decimal_128_gens_no_neg first_gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( @@ -378,7 +380,7 @@ def test_read_merge_schema_from_conf(spark_tmp_path, v1_enabled_list, reader_con @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): +def test_parquet_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -403,7 +405,7 @@ def test_input_meta(spark_tmp_path, v1_enabled_list, reader_confs): @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('disable_conf', ['spark.rapids.sql.format.parquet.enabled', 'spark.rapids.sql.format.orc.parquet.enabled']) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_input_meta_fallback(spark_tmp_path, v1_enabled_list, reader_confs, disable_conf): +def test_parquet_input_meta_fallback(spark_tmp_path, v1_enabled_list, reader_confs, disable_conf): first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0' with_cpu_session( lambda spark : unary_op_df(spark, long_gen).write.parquet(first_data_path)) @@ -485,7 +487,7 @@ def test_small_file_memory(spark_tmp_path, v1_enabled_list): ([["struct", StructGen([["c_1", StringGen()], ["case_insensitive", LongGen()], ["c_3", ShortGen()]])]], [["stRUct", StructGen([["CASE_INSENSITIVE", LongGen()]])]]), ] - +# TODO CHECK FOR DECIMAL?? 
@pytest.mark.parametrize('data_gen,read_schema', _nested_pruning_schemas, ids=idfn) @pytest.mark.parametrize('reader_confs', reader_opt_confs) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) @@ -593,7 +595,7 @@ def test_disorder_read_schema(spark_tmp_table_factory, reader_confs, v1_enabled_ @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -611,7 +613,7 @@ def test_reading_from_unaligned_pages_basic_filters(spark_tmp_path, reader_confs @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -622,6 +624,7 @@ def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, en "cast(id as double) as _6", # DECIMAL128 IS NOT SUPPORTED YET "cast(id as decimal(20,0)) as _7", "cast(id as decimal(10,0)) as _7", + "cast(id as decimal(30,0)) as _8", "cast(cast(1618161925 + (id * 60 * 60 * 24) as timestamp) as date) as _9", "cast(1618161925 + id as timestamp) as _10")\ .coalesce(1)\ @@ -637,7 +640,7 @@ def test_reading_from_unaligned_pages_all_types(spark_tmp_path, reader_confs, en @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) data_path = spark_tmp_path + '/PARQUET_UNALIGNED_DATA' with_cpu_session(lambda spark : spark.range(0, 2000)\ @@ -649,9 +652,10 @@ def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, r "cast(id % 10 as double) as _6", # DECIMAL128 IS NOT SUPPORTED YET "cast(id % 10 as decimal(20,0)) as _7", "cast(id % 10 as decimal(10,0)) as _7", - "cast(id % 2 as boolean) as _8", - "cast(cast(1618161925 + ((id % 10) * 60 * 60 * 24) as timestamp) as date) as _9", - "cast(1618161925 + (id % 10) as timestamp) as _10")\ + "cast(id % 10 as decimal(20,0)) as _8", + "cast(id % 2 as boolean) as _9", + "cast(cast(1618161925 + ((id % 10) * 60 * 60 * 24) as timestamp) as date) as _10", + "cast(1618161925 + (id % 10) as timestamp) as _11")\ 
.coalesce(1)\ .write\ .option("parquet.page.size", "4096") @@ -665,7 +669,7 @@ def test_reading_from_unaligned_pages_all_types_dict_optimized(spark_tmp_path, r @pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn) @pytest.mark.parametrize('enable_dictionary', ["true", "false"], ids=idfn) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) -def test_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): +def test_parquet_reading_from_unaligned_pages_basic_filters_with_nulls(spark_tmp_path, reader_confs, enable_dictionary, v1_enabled_list): # insert 50 null values in [400, 450) to verify that they are skipped during processing row # range [500, 1000) against the second page of col_2 [400, 800) all_confs = copy_and_update(reader_confs, {'spark.sql.sources.useV1SourceList': v1_enabled_list}) diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py index 9e2b68f07f6..250c22ab1e6 100644 --- a/integration_tests/src/main/python/parquet_write_test.py +++ b/integration_tests/src/main/python/parquet_write_test.py @@ -34,7 +34,8 @@ coalesce_parquet_file_reader_conf={'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'} reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf, coalesce_parquet_file_reader_conf] -parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit] +parquet_decimal_gens=[decimal_gen_default, decimal_gen_scale_precision, decimal_gen_same_scale_precision, decimal_gen_64bit, + decimal_gen_20_2, decimal_gen_36_5, decimal_gen_38_0, decimal_gen_38_10] parquet_decimal_struct_gen= StructGen([['child'+str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_decimal_gens)]) writer_confs={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED', 'spark.sql.legacy.parquet.int96RebaseModeInWrite': 'CORRECTED'} @@ -57,7 +58,7 @@ def limited_int96(): parquet_basic_map_gens = [MapGen(f(nullable=False), f()) for f in [BooleanGen, ByteGen, ShortGen, IntegerGen, LongGen, FloatGen, DoubleGen, DateGen, - limited_timestamp]] + [simple_string_to_string_map_gen] + limited_timestamp]] + [simple_string_to_string_map_gen] + decimal_128_no_neg_map_gens parquet_struct_gen = [StructGen([['child' + str(ind), sub_gen] for ind, sub_gen in enumerate(parquet_basic_gen)]), StructGen([['child0', StructGen([['child1', byte_gen]])]]), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7723c369fd4..88d7b685b2d 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -824,9 +824,9 @@ object GpuOverrides extends Logging { cudfWrite = TypeSig.none, sparkSig = TypeSig.atomics)), (ParquetFormatType, FileFormatChecks( - cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + TypeSig.ARRAY + - TypeSig.MAP).nested(), - cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_64 + TypeSig.STRUCT + + cudfRead = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + + TypeSig.ARRAY + TypeSig.MAP).nested(), + cudfWrite = (TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP).nested(), sparkSig = (TypeSig.atomics + TypeSig.STRUCT + TypeSig.ARRAY + TypeSig.MAP + 
TypeSig.UDT).nested())), @@ -3480,7 +3480,7 @@ object GpuOverrides extends Logging { exec[DataWritingCommandExec]( "Writing data", ExecChecks((TypeSig.commonCudfTypes + TypeSig.DECIMAL_128_FULL.withPsNote( - TypeEnum.DECIMAL, "128bit decimal only supported for Orc") + + TypeEnum.DECIMAL, "128bit decimal only supported for Orc and Parquet") + TypeSig.STRUCT.withPsNote(TypeEnum.STRUCT, "Only supported for Parquet") + TypeSig.MAP.withPsNote(TypeEnum.MAP, "Only supported for Parquet") + TypeSig.ARRAY.withPsNote(TypeEnum.ARRAY, "Only supported for Parquet")).nested(), diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala index 0f736e946cd..47e0d794133 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala @@ -795,7 +795,8 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics field => { if (field.isPrimitive) { val t = field.getOriginalType - (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || (t == OriginalType.UINT_32) + (t == OriginalType.UINT_8) || (t == OriginalType.UINT_16) || + (t == OriginalType.UINT_32) || (t == OriginalType.UINT_64) } else { existsUnsignedType(field.asGroupType) } @@ -804,7 +805,12 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics } def needDecimalCast(cv: ColumnView, dt: DataType): Boolean = { - cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) + // UINT64 is casted to Decimal(20,0) by Spark to accommodate + // the largest possible values this type can take. Other Unsigned data types are converted to + // basic types like LongType, this is analogous to that except we spill over to large + // decimal/ints. + cv.getType.isDecimalType && !GpuColumnVector.getNonNestedRapidsType(dt).equals(cv.getType()) || + cv.getType.equals(DType.UINT64) } def needUnsignedToSignedCast(cv: ColumnView, dt: DataType): Boolean = { diff --git a/tests/src/test/resources/test_unsigned64.parquet b/tests/src/test/resources/test_unsigned64.parquet new file mode 100644 index 0000000000000000000000000000000000000000..5d828e3a2399be0bdffa40cc1f958de9a995a286 GIT binary patch literal 1089 zcmcIk&1(};5P$pjCEact(3-d1l@)vG5@;H2OiGa!I-7o=rkVynQV=AKOA<`dn52!~ zJc>t8UIfqLAK=NeM-l%PPlC?8-K|yHV+Y=wnfGRX^PAbcRG_*fg{;W+vNWZbVJssw zHmNavazd~~%2=IXq5O_SGHJ=nS&EiPnRJ&~#)$81LyMwOO=URlrl#w!x0v@$bER;K zf+Zb0HEehLo#4q~yEj@cNN$yu)nK4`L4mTASi67uH6 zdn#qHX6|+7VL`{iKdpHv9QP9Qq($*Jlz2V+3-TTQ^gQpZZr9tjIE;2hgY_x)IT?*`Y zp8q$zdAI!O(mH-NR^;}H`jX3Jd6kZZt+>$SC|z;-n`N1f>txU2>zwCcthd54k+bz( zPc!DY_B2$h_a5C02W5Ze*GLwCWLNC~cL2uVH(?8)38khw^?iUHvPuBANTVNX^vLuJ z(K#Y)Am|FZn>z@CNeX(COjX4nlZABv_G-6Z)^0rP@Ar1|chN^&jHCFC2SiOBwQ$E! 
z+VDOWg;%dtJNpX0v_SF7#2~DTr1k(yDc%t%<@ylfJqw)}4yZgd@|{s*`$gvjr)`=8 zLZXg`w6@0VtNr@m`C%|xZ1nq!_;WrO&m9GWVf&yrSIAw@Ejd|yL-dT^<4dUEpT{3V Cmf+_A literal 0 HcmV?d00001 diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala index 4ba3808175a..38515b90f55 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetScanSuite.scala @@ -138,4 +138,18 @@ class ParquetScanSuite extends SparkQueryCompareTestSuite { assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) { frame => frame.select(col("*")) } + + /** + * Parquet file with 2 columns + * + * + */ + + testSparkResultsAreEqual("Test Parquet unsigned int: uint64", + frameFromParquet("test_unsigned64.parquet"), + // CPU version throws an exception when Spark < 3.2, so skip when Spark < 3.2. + // The exception is like "Parquet type not supported: INT32 (UINT_8)" + assumeCondition = (_ => (VersionUtils.isSpark320OrLater, "Spark version not 3.2.0+"))) { + frame => frame.select(col("*")) + } } diff --git a/tools/src/main/resources/supportedDataSource.csv b/tools/src/main/resources/supportedDataSource.csv index e003ee02d9a..8e9b8ea1186 100644 --- a/tools/src/main/resources/supportedDataSource.csv +++ b/tools/src/main/resources/supportedDataSource.csv @@ -2,5 +2,5 @@ Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING, CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,NS,NA,NS,NA,NA,NA,NA,NA ORC,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS ORC,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA -Parquet,read,S,S,S,S,S,S,S,S,PS,S,PS,NA,NS,NA,PS,PS,PS,NS +Parquet,read,S,S,S,S,S,S,S,S,PS,S,S,NA,NS,NA,PS,PS,PS,NS Parquet,write,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA
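
A quick way to see what this change enables, outside the test suite: the sketch below round-trips a 128-bit decimal column through Parquet with PySpark. It is illustrative only and not part of the patch; the plugin config line and the /tmp path are assumptions, and it presumes a Spark install with the RAPIDS Accelerator jar already on the classpath.

    from decimal import Decimal
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StructField, DecimalType

    # Assumed setup: the RAPIDS Accelerator jar is on the classpath and the
    # plugin is enabled; both settings here are illustrative, not prescriptive.
    spark = (SparkSession.builder
             .appName("decimal128-parquet-roundtrip")
             .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
             .getOrCreate())

    # Precision 38 / scale 10 is beyond the 64-bit decimal limit (precision 18),
    # so writing and reading this column exercises the new DECIMAL128 path.
    schema = StructType([StructField("amount", DecimalType(38, 10), True)])
    rows = [(Decimal("12345678901234567890123456.7890123456"),), (None,)]

    path = "/tmp/decimal128_demo"  # hypothetical scratch location
    spark.createDataFrame(rows, schema).write.mode("overwrite").parquet(path)
    spark.read.parquet(path).show(truncate=False)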