Check schema compatibility when building parquet readers (#5434)
Fixes #5200 #5445

This PR adds a schema compatibility check when building Parquet readers, following Spark's own checking process. Converters that downcast INT32 to Byte/Short/Date are added as well. Note: this PR uses some deprecated parquet-mr APIs in order to accommodate Spark 3.1.
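
As a quick illustration of the behavior being tested below, here is a minimal standalone PySpark sketch (the path, session setup, and one-row dataset are illustrative only, not part of this commit):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, ByteType, ShortType, DateType, LongType

spark = SparkSession.builder.getOrCreate()
data_path = '/tmp/int32_downcast_demo'  # illustrative path

# Write a file whose three columns are all stored as Parquet INT32.
spark.createDataFrame([(1, 2, 3)], 'b INT, s INT, d INT') \
    .write.mode('overwrite').parquet(data_path)

# Reading INT32 back as Byte/Short/Date is an allowed conversion,
# so this succeeds on both CPU and GPU.
ok_schema = StructType([StructField('b', ByteType()),
                        StructField('s', ShortType()),
                        StructField('d', DateType())])
spark.read.schema(ok_schema).parquet(data_path).show()

# Reading INT32 as Long is not an allowed conversion; the schema check
# rejects it with 'Parquet column cannot be converted', matching the CPU.
bad_schema = StructType([StructField('b', LongType())])
spark.read.schema(bad_schema).parquet(data_path).collect()  # raises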

Signed-off-by: sperlingxx <lovedreamf@gmail.com>
sperlingxx authored May 12, 2022
1 parent 15ae542 commit e41a6f3
Showing 2 changed files with 316 additions and 14 deletions.
92 changes: 90 additions & 2 deletions integration_tests/src/main/python/parquet_test.py
@@ -14,7 +14,8 @@

import pytest

from asserts import assert_cpu_and_gpu_are_equal_collect_with_capture, assert_gpu_and_cpu_are_equal_collect, \
    assert_gpu_fallback_collect, assert_gpu_and_cpu_are_equal_sql, assert_gpu_and_cpu_error, assert_py4j_exception
from data_gen import *
from marks import *
from pyspark.sql.types import *
@@ -842,4 +843,91 @@ def test_parquet_read_case_insensitivity(spark_tmp_path):
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(data_path).select('one', 'two', 'three'),
        {'spark.sql.caseSensitive': 'false'}
    )


# test read INT32 as INT8/INT16/Date
@pytest.mark.parametrize('reader_confs', reader_opt_confs)
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
def test_parquet_int32_downcast(spark_tmp_path, reader_confs, v1_enabled_list):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    write_schema = [("d", date_gen), ('s', short_gen), ('b', byte_gen)]
    with_cpu_session(
        lambda spark: gen_df(spark, write_schema).selectExpr(
            "cast(d as Int) as d",
            "cast(s as Int) as s",
            "cast(b as Int) as b").write.parquet(data_path))

    read_schema = StructType([StructField("d", DateType()),
                              StructField("s", ShortType()),
                              StructField("b", ByteType())])
    conf = copy_and_update(reader_confs,
                           {'spark.sql.sources.useV1SourceList': v1_enabled_list})
    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.schema(read_schema).parquet(data_path),
        conf=conf)


def test_parquet_check_schema_compatibility(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    gen_list = [('int', int_gen), ('long', long_gen), ('dec32', decimal_gen_32bit)]
    with_cpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.parquet(data_path))

    read_int_as_long = StructType(
        [StructField('long', LongType()), StructField('int', LongType())])
    assert_gpu_and_cpu_error(
        lambda spark: spark.read.schema(read_int_as_long).parquet(data_path).collect(),
        conf={},
        error_message='Parquet column cannot be converted in')

    read_dec32_as_dec64 = StructType(
        [StructField('int', IntegerType()), StructField('dec32', DecimalType(15, 10))])
    assert_gpu_and_cpu_error(
        lambda spark: spark.read.schema(read_dec32_as_dec64).parquet(data_path).collect(),
        conf={},
        error_message='Parquet column cannot be converted in')


# For nested types, the GPU throws an incompatible-schema exception whose message
# differs from the CPU's, so only the GPU side is checked here.
def test_parquet_check_schema_compatibility_nested_types(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    gen_list = [('array_long', ArrayGen(long_gen)),
                ('array_array_int', ArrayGen(ArrayGen(int_gen))),
                ('struct_float', StructGen([('f', float_gen), ('d', double_gen)])),
                ('struct_array_int', StructGen([('a', ArrayGen(int_gen))])),
                ('map', map_string_string_gen[0])]
    with_cpu_session(lambda spark: gen_df(spark, gen_list).coalesce(1).write.parquet(data_path))

    read_array_long_as_int = StructType([StructField('array_long', ArrayType(IntegerType()))])
    assert_py4j_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.schema(read_array_long_as_int).parquet(data_path).collect()),
        error_message='Parquet column cannot be converted in')

    read_arr_arr_int_as_long = StructType(
        [StructField('array_array_int', ArrayType(ArrayType(LongType())))])
    assert_py4j_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.schema(read_arr_arr_int_as_long).parquet(data_path).collect()),
        error_message='Parquet column cannot be converted in')

    read_struct_flt_as_dbl = StructType([StructField(
        'struct_float', StructType([StructField('f', DoubleType())]))])
    assert_py4j_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.schema(read_struct_flt_as_dbl).parquet(data_path).collect()),
        error_message='Parquet column cannot be converted in')

    read_struct_arr_int_as_long = StructType([StructField(
        'struct_array_int', StructType([StructField('a', ArrayType(LongType()))]))])
    assert_py4j_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.schema(read_struct_arr_int_as_long).parquet(data_path).collect()),
        error_message='Parquet column cannot be converted in')

    read_map_str_str_as_str_int = StructType([StructField(
        'map', MapType(StringType(), IntegerType()))])
    assert_py4j_exception(
        lambda: with_gpu_session(
            lambda spark: spark.read.schema(read_map_str_str_as_str_int).parquet(data_path).collect()),
        error_message='Parquet column cannot be converted in')