diff --git a/docs/additional-functionality/rapids-shuffle.md b/docs/additional-functionality/rapids-shuffle.md
index 850abc1aa7e..ca636bc22b9 100644
--- a/docs/additional-functionality/rapids-shuffle.md
+++ b/docs/additional-functionality/rapids-shuffle.md
@@ -60,7 +60,7 @@ The minimum UCX requirement for the RAPIDS Shuffle Manager is
 Other considerations:
 
 - Please refer to [Mellanox documentation](
-  https://community.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
+  https://support.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
   on how to configure RoCE networks (lossless/lossy, QoS, and more)
 
 - We recommend that the `--without-ucx` option is passed when installing MLNX_OFED
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index 6587661fd61..3b9306d459d 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -43,6 +43,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
 import org.apache.parquet.hadoop.metadata._
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, PrimitiveType, Type, Types}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 
@@ -369,8 +370,8 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte
 
     val filePath = new Path(new URI(file.filePath))
     //noinspection ScalaDeprecation
-    val footer = ParquetFileReader.readFooter(conf, filePath,
-      ParquetMetadataConverter.range(file.start, file.start + file.length))
+    val inputFile = HadoopInputFile.fromPath(filePath, conf)
+    val footer = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
     val fileSchema = footer.getFileMetaData.getSchema
     val pushedFilters = if (enableParquetFilterPushDown) {
       val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
diff --git a/tests/pom.xml b/tests/pom.xml
index 466578d412c..e180bd14c00 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -224,6 +224,12 @@
             <version>${spark.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-common</artifactId>
+            <version>${spark.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-column</artifactId>
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
index 4057e367cf1..f68cc911a4f 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
@@ -16,13 +16,15 @@
 
 package com.nvidia.spark.rapids
 
-import java.io.File
+import java.io.{File, FilenameFilter}
 import java.nio.charset.StandardCharsets
 
 import com.nvidia.spark.rapids.shims.SparkShimImpl
+import org.apache.commons.io.filefilter.WildcardFileFilter
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.io.FileCommitProtocol
@@ -32,9 +34,6 @@ import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker
 /**
  * Tests for writing Parquet files with the GPU.
  */
-@scala.annotation.nowarn(
-  "msg=method readFooters in class ParquetFileReader is deprecated"
-)
 class ParquetWriterSuite extends SparkQueryCompareTestSuite {
   test("file metadata") {
     val tempFile = File.createTempFile("stats", ".parquet")
@@ -42,12 +41,13 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
     withGpuSparkSession(spark => {
       val df = mixedDfWithNulls(spark)
       df.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
+      val filter: FilenameFilter = new WildcardFileFilter("*.parquet")
+      val inputFile = HadoopInputFile.fromPath(
+        new Path(tempFile.listFiles(filter)(0).getAbsolutePath),
+        spark.sparkContext.hadoopConfiguration)
+      val parquetMeta = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
 
-      val footer = ParquetFileReader.readFooters(spark.sparkContext.hadoopConfiguration,
-        new Path(tempFile.getAbsolutePath)).get(0)
-
-      val parquetMeta = footer.getParquetMetadata
-      val fileMeta = footer.getParquetMetadata.getFileMetaData
+      val fileMeta = parquetMeta.getFileMetaData
       val extra = fileMeta.getKeyValueMetaData
       assert(extra.containsKey("org.apache.spark.version"))
       assert(extra.containsKey("org.apache.spark.sql.parquet.row.metadata"))
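
Note: the changes above move footer reading from the deprecated `ParquetFileReader.readFooter`/`readFooters` calls to the `InputFile`-based API. The sketch below is only an illustration of that pattern, not code from this PR: the object and helper names (`FooterReadSketch`, `readFooterViaInputFile`) are hypothetical, and a plain try/finally stands in for the plugin's `withResource` utility used in the diff.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.metadata.ParquetMetadata
import org.apache.parquet.hadoop.util.HadoopInputFile

object FooterReadSketch {
  // Hypothetical helper, not part of the plugin: opens the file through the
  // non-deprecated InputFile API, reads the footer, and closes the reader.
  def readFooterViaInputFile(path: String, conf: Configuration): ParquetMetadata = {
    val inputFile = HadoopInputFile.fromPath(new Path(path), conf)
    val reader = ParquetFileReader.open(inputFile)
    try {
      reader.getFooter
    } finally {
      reader.close()
    }
  }
}
```

The plugin code gets the same close-after-use guarantee from `withResource`; the try/finally here only keeps the sketch self-contained.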