Disable write/read Parquet when Parquet field IDs are used (#4882)
* Temporarily disable Parquet write/read when the schema has a specified Parquet field ID

Signed-off-by: Chong Gao <res_life@163.com>
Chong Gao authored Mar 7, 2022
1 parent 187e584 commit d30adec
Showing 6 changed files with 73 additions and 2 deletions.
22 changes: 21 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -768,4 +768,24 @@ def test_parquet_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enable

    assert_gpu_and_cpu_are_equal_collect(
        lambda spark: spark.read.parquet(first_data_path, second_data_path, third_data_path),
        conf=all_confs)

# should fall back when trying to read with field IDs
@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
@allow_non_gpu("FileSourceScanExec", "ColumnarToRowExec")
def test_parquet_read_field_id(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    data = [(1,), (2,), (3,)]
    # write parquet with field IDs
    with_cpu_session(lambda spark: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(data_path))

    readSchema = StructType([
        StructField("mapped_name_xxx", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    assert_gpu_fallback_collect(
        lambda spark: spark.read.schema(readSchema).parquet(data_path),
        'FileSourceScanExec',
        {"spark.sql.parquet.fieldId.read.enabled": "true"})  # default is false
18 changes: 17 additions & 1 deletion integration_tests/src/main/python/parquet_write_test.py
@@ -19,7 +19,7 @@
from data_gen import *
from marks import *
from pyspark.sql.types import *
from spark_session import with_cpu_session, with_gpu_session
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
import pyspark.sql.functions as f
import pyspark.sql.utils
import random
@@ -402,3 +402,19 @@ def create_empty_df(spark, path):
        lambda spark, path: spark.read.parquet(path),
        data_path,
        conf=writer_confs)

# should fall back when trying to write field ID metadata
@pytest.mark.skipif(is_before_spark_330(), reason='Field ID is not supported before Spark 330')
@allow_non_gpu('DataWritingCommandExec')
def test_parquet_write_field_id(spark_tmp_path):
    data_path = spark_tmp_path + '/PARQUET_DATA'
    schema = StructType([
        StructField("c1", IntegerType(), metadata={'parquet.field.id': 1}),
    ])
    data = [(1,), (2,), (3,)]
    assert_gpu_fallback_write(
        lambda spark, path: spark.createDataFrame(data, schema).coalesce(1).write.mode("overwrite").parquet(path),
        lambda spark, path: spark.read.parquet(path),
        data_path,
        'DataWritingCommandExec',
        conf={"spark.sql.parquet.fieldId.write.enabled": "true"})  # default is true
ParquetFieldIdShims.scala (pre-Spark 3.3 shim)
@@ -16,13 +16,24 @@

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.RapidsMeta
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object ParquetFieldIdShims {
  /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
  def setupParquetFieldIdWriteConfig(conf: Configuration, sqlConf: SQLConf): Unit = {
    // Parquet field ID configs do not exist until Spark 3.3, so this is a no-op
  }

  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
      conf: SQLConf): Unit = {
    // Parquet field ID configs do not exist until Spark 3.3, so this is a no-op
  }

  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
    // Parquet field ID configs do not exist until Spark 3.3, so this is a no-op
  }
}
ParquetFieldIdShims.scala (Spark 3.3 shim)
@@ -16,9 +16,12 @@

package com.nvidia.spark.rapids.shims.v2

import com.nvidia.spark.rapids.RapidsMeta
import org.apache.hadoop.conf.Configuration

import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType

object ParquetFieldIdShims {
  /** Updates the Hadoop configuration with the Parquet field ID write setting from SQLConf */
@@ -27,4 +30,19 @@ object ParquetFieldIdShims {
      SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
      sqlConf.parquetFieldIdWriteEnabled.toString)
  }

  def tagGpuSupportWriteForFieldId(meta: RapidsMeta[_, _, _], schema: StructType,
      conf: SQLConf): Unit = {
    if (conf.parquetFieldIdWriteEnabled && ParquetUtils.hasFieldIds(schema)) {
      meta.willNotWorkOnGpu(
        "field IDs are not supported for Parquet writes, schema is " + schema.json)
    }
  }

  def tagGpuSupportReadForFieldId(meta: RapidsMeta[_, _, _], conf: SQLConf): Unit = {
    if (conf.parquetFieldIdReadEnabled) {
      meta.willNotWorkOnGpu("reading by Parquet field ID is not supported, " +
        s"${SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key} is true")
    }
  }
}
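
To make the write-side gate concrete: ParquetUtils.hasFieldIds reports whether any field in the schema, including nested ones, carries parquet.field.id metadata, and the fallback fires only when that holds and the write config is enabled. A rough Python analogue of that check (the helper name and traversal are my own illustration, not the plugin's code):

from pyspark.sql.types import (ArrayType, IntegerType, MapType, StructField,
                               StructType)

FIELD_ID_KEY = 'parquet.field.id'

def has_field_ids(data_type):
    # Recursively look for parquet.field.id metadata on any struct field.
    if isinstance(data_type, StructType):
        return any(FIELD_ID_KEY in (f.metadata or {}) or has_field_ids(f.dataType)
                   for f in data_type.fields)
    if isinstance(data_type, ArrayType):
        return has_field_ids(data_type.elementType)
    if isinstance(data_type, MapType):
        return has_field_ids(data_type.keyType) or has_field_ids(data_type.valueType)
    return False

schema = StructType([
    StructField("c1", IntegerType(), metadata={FIELD_ID_KEY: 1}),
])
assert has_field_ids(schema)  # schemas like this trigger the write fallback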
GpuParquetFileFormat.scala
@@ -44,6 +44,9 @@ object GpuParquetFileFormat {
      schema: StructType): Option[GpuParquetFileFormat] = {

    val sqlConf = spark.sessionState.conf

    ParquetFieldIdShims.tagGpuSupportWriteForFieldId(meta, schema, sqlConf)

    val parquetOptions = new ParquetOptions(options, sqlConf)

    if (!meta.conf.isParquetEnabled) {
GpuParquetScan.scala
@@ -33,6 +33,7 @@ import com.nvidia.spark.RebaseHelper
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.shims.v2.ParquetFieldIdShims
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}
@@ -143,6 +144,8 @@ object GpuParquetScanBase {
      meta: RapidsMeta[_, _, _]): Unit = {
    val sqlConf = sparkSession.conf

    ParquetFieldIdShims.tagGpuSupportReadForFieldId(meta, sparkSession.sessionState.conf)

    if (!meta.conf.isParquetEnabled) {
      meta.willNotWorkOnGpu("Parquet input and output has been disabled. To enable set " +
        s"${RapidsConf.ENABLE_PARQUET} to true")
