From 12979160d68e1dfd6b80a3fc0443e36e72a60a14 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Wed, 23 Mar 2022 11:33:14 -0700
Subject: [PATCH 1/3] Replace ParquetFileReader.readFooter with open() and
 getFooter [databricks] (#4976)

* replace readFooter with open() and getFooter

Signed-off-by: Raza Jafri

* add dependency for db failure

Signed-off-by: Raza Jafri

* close the file

Signed-off-by: Raza Jafri

* addressed review comments

Signed-off-by: Raza Jafri

Co-authored-by: Raza Jafri
---
 .../spark/rapids/GpuParquetScanBase.scala     |  5 +++--
 tests/pom.xml                                 |  6 ++++++
 .../spark/rapids/ParquetWriterSuite.scala     | 18 +++++++++---------
 3 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
index a43ac6e19d2..b9e5ae5c1ee 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScanBase.scala
@@ -43,6 +43,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
 import org.apache.parquet.hadoop.metadata._
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, PrimitiveType, Type, Types}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
@@ -340,8 +341,8 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte
 
     val filePath = new Path(new URI(file.filePath))
     //noinspection ScalaDeprecation
-    val footer = ParquetFileReader.readFooter(conf, filePath,
-      ParquetMetadataConverter.range(file.start, file.start + file.length))
+    val inputFile = HadoopInputFile.fromPath(filePath, conf)
+    val footer = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
     val fileSchema = footer.getFileMetaData.getSchema
     val pushedFilters = if (enableParquetFilterPushDown) {
       val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
diff --git a/tests/pom.xml b/tests/pom.xml
index 466578d412c..e180bd14c00 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -224,6 +224,12 @@
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-common</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-column</artifactId>
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
index 4057e367cf1..f68cc911a4f 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
@@ -16,13 +16,15 @@
 
 package com.nvidia.spark.rapids
 
-import java.io.File
+import java.io.{File, FilenameFilter}
 import java.nio.charset.StandardCharsets
 
 import com.nvidia.spark.rapids.shims.SparkShimImpl
+import org.apache.commons.io.filefilter.WildcardFileFilter
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.io.FileCommitProtocol
@@ -32,9 +34,6 @@ import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker
 /**
  * Tests for writing Parquet files with the GPU.
  */
-@scala.annotation.nowarn(
-  "msg=method readFooters in class ParquetFileReader is deprecated"
-)
 class ParquetWriterSuite extends SparkQueryCompareTestSuite {
   test("file metadata") {
     val tempFile = File.createTempFile("stats", ".parquet")
@@ -42,12 +41,13 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
     withGpuSparkSession(spark => {
       val df = mixedDfWithNulls(spark)
       df.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
+      val filter: FilenameFilter = new WildcardFileFilter("*.parquet")
+      val inputFile = HadoopInputFile.fromPath(
+        new Path(tempFile.listFiles(filter)(0).getAbsolutePath),
+        spark.sparkContext.hadoopConfiguration)
+      val parquetMeta = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
 
-      val footer = ParquetFileReader.readFooters(spark.sparkContext.hadoopConfiguration,
-        new Path(tempFile.getAbsolutePath)).get(0)
-
-      val parquetMeta = footer.getParquetMetadata
-      val fileMeta = footer.getParquetMetadata.getFileMetaData
+      val fileMeta = parquetMeta.getFileMetaData
       val extra = fileMeta.getKeyValueMetaData
       assert(extra.containsKey("org.apache.spark.version"))
       assert(extra.containsKey("org.apache.spark.sql.parquet.row.metadata"))

From fa8d16156137a410234e688b85e250e3a3953590 Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Wed, 23 Mar 2022 13:41:38 -0500
Subject: [PATCH 2/3] Fix unused imports

Signed-off-by: Jason Lowe
---
 .../scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
index 0725ad32917..b07a544033a 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuTransitionOverrides.scala
@@ -18,7 +18,7 @@ package com.nvidia.spark.rapids
 
 import scala.annotation.tailrec
 
-import com.nvidia.spark.rapids.shims.{GpuOrcScan, GpuParquetScan, SparkShimImpl}
+import com.nvidia.spark.rapids.shims.SparkShimImpl
 
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, AttributeReference, Expression, InputFileBlockLength, InputFileBlockStart, InputFileName, SortOrder}
 import org.apache.spark.sql.catalyst.rules.Rule

From 1d3b8362f298046dcae0001c1bf3a54f37329739 Mon Sep 17 00:00:00 2001
From: Alessandro Bellina
Date: Wed, 23 Mar 2022 13:57:02 -0500
Subject: [PATCH 3/3] Update RoCE doc URL (#5026)

Signed-off-by: Alessandro Bellina
---
 docs/additional-functionality/rapids-shuffle.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/additional-functionality/rapids-shuffle.md b/docs/additional-functionality/rapids-shuffle.md
index 850abc1aa7e..ca636bc22b9 100644
--- a/docs/additional-functionality/rapids-shuffle.md
+++ b/docs/additional-functionality/rapids-shuffle.md
@@ -60,7 +60,7 @@ The minimum UCX requirement for the RAPIDS Shuffle Manager is
 
 Other considerations:
 - Please refer to [Mellanox documentation](
-  https://community.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
+  https://support.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
   on how to configure RoCE networks (lossless/lossy, QoS, and more)
 - We recommend that the `--without-ucx` option is passed when installing MLNX_OFED
 
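A note on the core API migration in PATCH 1/3: `ParquetFileReader.readFooter` is deprecated in parquet-hadoop, and both call sites above move to wrapping the `Path` in a `HadoopInputFile`, opening it with `ParquetFileReader.open`, and reading the metadata with `getFooter`, closing the reader once the footer has been extracted. The sketch below shows that pattern in isolation; it is a minimal illustration, not code from the patches: `FooterReadExample`, `readFooterViaOpen`, and the local `withResource` helper are stand-in names (the plugin supplies its own `withResource`).

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile

object FooterReadExample {
  // Stand-in for the plugin's withResource helper (not from the patches):
  // run the block, closing the resource even if the block throws.
  def withResource[T <: AutoCloseable, V](resource: T)(block: T => V): V = {
    try {
      block(resource)
    } finally {
      resource.close()
    }
  }

  // Deprecated pattern removed by PATCH 1/3:
  //   val footer = ParquetFileReader.readFooter(conf, path,
  //     ParquetMetadataConverter.range(start, start + length))
  //
  // Replacement pattern: open() + getFooter, with the reader closed
  // deterministically after the footer is read.
  def readFooterViaOpen(conf: Configuration, path: Path): ParquetMetadata = {
    val inputFile = HadoopInputFile.fromPath(path, conf)
    withResource(ParquetFileReader.open(inputFile)) { reader =>
      reader.getFooter
    }
  }
}
```

One behavioral difference worth noting: the deprecated call accepted a `ParquetMetadataConverter.range(...)` filter to restrict footer metadata to a split's row groups, whereas `ParquetFileReader.open(inputFile)` with default options reads the full footer, so any row-group filtering must happen on the returned `ParquetMetadata`.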