From bd5d96fa7abb8d0120341ba9f8b2e78bbc810c45 Mon Sep 17 00:00:00 2001
From: Firestarman
Date: Tue, 15 Mar 2022 10:43:49 +0800
Subject: [PATCH] Fix failure to read Parquet files after writing the
 hidden file metadata

This follows the update in Spark PR: apache/spark@50520fe3eb.

Signed-off-by: Firestarman
---
 .../shims/RapidsFileSourceMetaUtils.scala     | 24 +++++++++++++++
 .../shims/RapidsFileSourceMetaUtils.scala     | 29 +++++++++++++++++++
 .../sql/rapids/GpuFileFormatWriter.scala      | 16 ++++++----
 3 files changed, 63 insertions(+), 6 deletions(-)
 create mode 100644 sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala
 create mode 100644 sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala

diff --git a/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala
new file mode 100644
index 00000000000..54553c1d464
--- /dev/null
+++ b/sql-plugin/src/main/301until330-all/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala
@@ -0,0 +1,24 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+
+object RapidsFileSourceMetaUtils {
+  /** Do nothing before Spark 3.3.0 */
+  def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute = attr
+}
diff --git a/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala
new file mode 100644
index 00000000000..ede437581c0
--- /dev/null
+++ b/sql-plugin/src/main/330+/scala/com/nvidia/spark/rapids/shims/RapidsFileSourceMetaUtils.scala
@@ -0,0 +1,29 @@
+/*
+ * Copyright (c) 2022, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.shims
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, FileSourceMetadataAttribute}
+
+object RapidsFileSourceMetaUtils {
+  /**
+   * Clean up the internal metadata information of an attribute if it is a
+   * 'FileSourceMetadataAttribute'. This removes both 'METADATA_COL_ATTR_KEY' and
+   * 'FILE_SOURCE_METADATA_COL_ATTR_KEY' from the attribute's 'Metadata'.
+   */
+  def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
+    FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation(attr)
+}
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
index c513d201799..118e43ad40b 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileFormatWriter.scala
@@ -21,7 +21,7 @@ import java.util.{Date, UUID}
 import ai.rapids.cudf.ColumnVector
 import com.nvidia.spark.TimingUtils
 import com.nvidia.spark.rapids._
-import com.nvidia.spark.rapids.shims.SparkShimImpl
+import com.nvidia.spark.rapids.shims.{RapidsFileSourceMetaUtils, SparkShimImpl}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
@@ -117,7 +117,11 @@ object GpuFileFormatWriter extends Logging {
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
+    // Clean up the internal metadata information of the file source
+    // metadata attribute, if any, before writing out.
+    val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
+      .map(RapidsFileSourceMetaUtils.cleanupFileSourceMetadataInformation))
+    val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)
 
     var needConvert = false
     val projectList: List[NamedExpression] = plan.output.map {
@@ -153,12 +157,12 @@
       uuid = UUID.randomUUID.toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = outputSpec.outputColumns,
+      allColumns = finalOutputSpec.outputColumns,
       dataColumns = dataColumns,
       partitionColumns = partitionColumns,
       bucketSpec = writerBucketSpec,
-      path = outputSpec.outputPath,
-      customPartitionLocations = outputSpec.customPartitionLocations,
+      path = finalOutputSpec.outputPath,
+      customPartitionLocations = finalOutputSpec.customPartitionLocations,
       maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
         .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
       timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -199,7 +203,7 @@
       // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
       val orderingExpr = GpuBindReferences.bindReferences(
         requiredOrdering
-          .map(attr => SparkShimImpl.sortOrder(attr, Ascending)), outputSpec.outputColumns)
+          .map(attr => SparkShimImpl.sortOrder(attr, Ascending)), finalOutputSpec.outputColumns)
       val sortType = if (RapidsConf.STABLE_SORT.get(plan.conf)) {
         FullSortSingleBatch
       } else {
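
Note on the approach: the 3.3.0+ shim is a thin pass-through to Spark's own
FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation, while the
pre-3.3.0 shim is an identity function because the metadata marker keys do
not exist on those branches. Below is a minimal Scala sketch of what the
3.3.0+ cleanup conceptually amounts to: rebuilding an attribute's Metadata
without the two internal marker keys. This is an illustration, not the
plugin's or Spark's actual code; the literal key values and the helper name
stripFileSourceMetadata are assumptions mirroring the constants named in the
Scaladoc above.

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.types.MetadataBuilder

    object MetadataCleanupSketch {
      // Assumed values of METADATA_COL_ATTR_KEY and
      // FILE_SOURCE_METADATA_COL_ATTR_KEY in Spark 3.3.0.
      private val MetadataColKey = "__metadata_col"
      private val FileSourceMetadataColKey = "__file_source_metadata_col"

      // Rebuild the attribute's Metadata without the internal marker keys.
      // Attributes that never carried the markers pass through unchanged,
      // so mapping this over every output column is safe.
      def stripFileSourceMetadata(attr: Attribute): Attribute = {
        val cleaned = new MetadataBuilder()
          .withMetadata(attr.metadata)
          .remove(MetadataColKey)
          .remove(FileSourceMetadataColKey)
          .build()
        attr.withMetadata(cleaned)
      }
    }

Stripping the markers before the write matters because they would otherwise
be persisted with the written column's schema metadata, and a later read
could then treat that column as a hidden file source metadata column and
fail, which is the failure this patch addresses.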