diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 01ec9bd44f4..eb52ac34568 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -768,4 +768,24 @@ def test_parquet_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_list):
 
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(first_data_path, second_data_path, third_data_path),
-            conf=all_confs)
\ No newline at end of file
+            conf=all_confs)
+
+# should fall back to the CPU when trying to read by Parquet field ID
+@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
+@allow_non_gpu("FileSourceScanExec", "ColumnarToRowExec")
+def test_parquet_read_field_id(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    schema = StructType([
+        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
+    ])
+    data = [(1,), (2,), (3,)]
+    # write parquet with field IDs
+    with_cpu_session(lambda spark: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(data_path))
+
+    readSchema = StructType([
+        StructField("mapped_name_xxx", IntegerType(), metadata={'parquet.field.id': 1}),
+    ])
+    assert_gpu_fallback_collect(
+        lambda spark: spark.read.schema(readSchema).parquet(data_path),
+        'FileSourceScanExec',
+        conf={"spark.sql.parquet.fieldId.read.enabled": "true"})  # default is false
diff --git a/integration_tests/src/main/python/parquet_write_test.py b/integration_tests/src/main/python/parquet_write_test.py
index 993c0575d13..fd556830135 100644
--- a/integration_tests/src/main/python/parquet_write_test.py
+++ b/integration_tests/src/main/python/parquet_write_test.py
@@ -19,7 +19,7 @@
 from data_gen import *
 from marks import *
 from pyspark.sql.types import *
-from spark_session import with_cpu_session, with_gpu_session
+from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
 import pyspark.sql.functions as f
 import pyspark.sql.utils
 import random
@@ -402,3 +402,19 @@ def create_empty_df(spark, path):
         lambda spark, path: spark.read.parquet(path),
         data_path,
         conf=writer_confs)
+
+# should fall back to the CPU when trying to write field ID metadata
+@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
+@allow_non_gpu('DataWritingCommandExec')
+def test_parquet_write_field_id(spark_tmp_path):
+    data_path = spark_tmp_path + '/PARQUET_DATA'
+    schema = StructType([
+        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
+    ])
+    data = [(1,), (2,), (3,)]
+    assert_gpu_fallback_write(
+        lambda spark, path: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(path),
+        lambda spark, path: spark.read.parquet(path),
+        data_path,
+        'DataWritingCommandExec',
+        conf={"spark.sql.parquet.fieldId.write.enabled": "true"})  # default is true
diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index 9a87dce3785..3830b291961 100644
--- a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -16,13 +16,24 @@
 
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 
 object ParquetFieldIdShims {
   /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
   def setupParquetFieldIdWriteConfig(conf: Configuration, sqlConf: SQLConf): Unit = {
     // Parquet field ID support configs are not supported until Spark 3.3
   }
+
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
+      conf: SQLConf): Unit = {
+    // Parquet field ID support configs are not supported until Spark 3.3
+  }
+
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
+    // Parquet field ID support configs are not supported until Spark 3.3
+  }
 }
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
index db7b25dc255..8b1e1f2297e 100644
--- a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/v2/ParquetFieldIdShims.scala
@@ -16,9 +16,12 @@
 
 package com.nvidia.spark.rapids.shims.v2
 
+import com.nvidia.spark.rapids.RapidsMeta
 import org.apache.hadoop.conf.Configuration
 
+import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructType
 
 object ParquetFieldIdShims {
   /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
@@ -27,4 +30,19 @@ object ParquetFieldIdShims {
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       sqlConf.parquetFieldIdWriteEnabled.toString)
   }
+
+  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
+      conf: SQLConf): Unit = {
+    if (conf.parquetFieldIdWriteEnabled && ParquetUtils.hasFieldIds(schema)) {
+      meta.willNotWorkOnGpu(
+        "field IDs are not supported for Parquet writes, schema is " + schema.json)
+    }
+  }
+
+  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
+    if (conf.parquetFieldIdReadEnabled) {
+      meta.willNotWorkOnGpu(s"reading by Parquet field ID is not supported, " +
+        s"${SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key} is true")
+    }
+  }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
index 9031da88f25..3885917edfe 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetFileFormat.scala
@@ -44,6 +44,9 @@ object GpuParquetFileFormat {
       schema: StructType): Option[GpuParquetFileFormat] = {
 
     val sqlConf = spark.sessionState.conf
+
+    ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema, sqlConf)
+
     val parquetOptions = new ParquetOptions(options, sqlConf)
 
     if (!meta.conf.isParquetEnabled) {
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
index 2c8842267de..e629d4b0c95 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
@@ -33,6 +33,7 @@ import com.nvidia.spark.RebaseHelper
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.shims.v2.ParquetFieldIdShims
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FSDataInputStream, Path}
@@ -143,6 +144,8 @@
       meta: RapidsMeta[_, _, _]): Unit = {
     val sqlConf = sparkSession.conf
 
+    ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sparkSession.sessionState.conf)
+
     if (!meta.conf.isParquetEnabled) {
       meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set" +
         s"${RapidsConf.ENABLE_PARQUET} to true")
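
For reference, the read-side fallback exercised by test_parquet_read_field_id can also be reproduced in a plain PySpark session. This is a minimal sketch, assuming Spark 3.3.0+ with the RAPIDS plugin loaded; the output path and column names are illustrative only, not taken from this change:

    # Hypothetical repro: with field ID reading enabled, the Parquet scan should
    # fall back to the CPU, i.e. explain() shows FileSourceScanExec rather than
    # GpuFileSourceScanExec.
    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, StructField, StructType

    spark = SparkSession.builder.getOrCreate()

    # Write a Parquet file whose schema carries field IDs
    # (spark.sql.parquet.fieldId.write.enabled defaults to true on Spark 3.3.0+).
    write_schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    spark.createDataFrame([(1,), (2,), (3,)], write_schema) \
        .write.mode("overwrite").parquet("/tmp/field_id_demo")  # illustrative path

    # Read the column back under a different name, matching by field ID instead
    # of by name; this is the case the plugin now tags as unsupported on the GPU.
    spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")  # default is false
    read_schema = StructType([
        StructField("renamed_c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    spark.read.schema(read_schema).parquet("/tmp/field_id_demo").explain()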