Merge pull request #5027 from jlowe/fix-merge
Fix merge of branch-22.04 to branch-22.06
jlowe authored Mar 23, 2022
2 parents 29399d6 + 8d135b1 commit 384f0d5
Showing 4 changed files with 19 additions and 12 deletions.
2 changes: 1 addition & 1 deletion docs/additional-functionality/rapids-shuffle.md
@@ -60,7 +60,7 @@ The minimum UCX requirement for the RAPIDS Shuffle Manager is
 Other considerations:
 
 - Please refer to [Mellanox documentation](
-https://community.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
+https://support.mellanox.com/s/article/recommended-network-configuration-examples-for-roce-deployment)
   on how to configure RoCE networks (lossless/lossy, QoS, and more)
 
 - We recommend that the `--without-ucx` option is passed when installing MLNX_OFED
@@ -43,6 +43,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
 import org.apache.parquet.hadoop.metadata._
+import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, PrimitiveType, Type, Types}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName

@@ -369,8 +370,8 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte
 
       val filePath = new Path(new URI(file.filePath))
-      //noinspection ScalaDeprecation
-      val footer = ParquetFileReader.readFooter(conf, filePath,
-        ParquetMetadataConverter.range(file.start, file.start + file.length))
+      val inputFile = HadoopInputFile.fromPath(filePath, conf)
+      val footer = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
       val fileSchema = footer.getFileMetaData.getSchema
       val pushedFilters = if (enableParquetFilterPushDown) {
         val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate,
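For context on the hunk above: the deprecated static `ParquetFileReader.readFooter(Configuration, Path, MetadataFilter)` call is replaced by opening the file through `HadoopInputFile` and asking the reader instance for its footer. A minimal standalone sketch of that pattern follows; `withResource` here is a simplified stand-in for the plugin's own resource-management helper, and the object and method names are illustrative, not code from this repository.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.hadoop.ParquetFileReader
    import org.apache.parquet.hadoop.metadata.ParquetMetadata
    import org.apache.parquet.hadoop.util.HadoopInputFile

    object FooterReadSketch {
      // Simplified stand-in for the plugin's withResource helper:
      // run the block, then close the resource even if the block throws.
      def withResource[T <: AutoCloseable, V](r: T)(block: T => V): V =
        try block(r) finally r.close()

      // Replacement pattern: open the file via HadoopInputFile and pull
      // the footer from the reader instead of calling the deprecated
      // static ParquetFileReader.readFooter.
      def readFooter(conf: Configuration, path: Path): ParquetMetadata = {
        val inputFile = HadoopInputFile.fromPath(path, conf)
        withResource(ParquetFileReader.open(inputFile))(_.getFooter)
      }
    }

One difference visible in the diff: the old call passed `ParquetMetadataConverter.range(file.start, file.start + file.length)` to limit footer parsing, while the new `open(inputFile)` call reads the footer with default options and relies on the wrapper to close the reader promptly.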
6 changes: 6 additions & 0 deletions tests/pom.xml
@@ -224,6 +224,12 @@
       <version>${spark.version}</version>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-common</artifactId>
+      <version>${spark.version}</version>
+      <scope>provided</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-column</artifactId>
@@ -16,13 +16,15 @@
 
 package com.nvidia.spark.rapids
 
-import java.io.File
+import java.io.{File, FilenameFilter}
 import java.nio.charset.StandardCharsets
 
 import com.nvidia.spark.rapids.shims.SparkShimImpl
+import org.apache.commons.io.filefilter.WildcardFileFilter
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.hadoop.util.HadoopInputFile
 
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.internal.io.FileCommitProtocol
@@ -32,22 +34,20 @@ import org.apache.spark.sql.rapids.BasicColumnarWriteJobStatsTracker
 /**
  * Tests for writing Parquet files with the GPU.
  */
-@scala.annotation.nowarn(
-  "msg=method readFooters in class ParquetFileReader is deprecated"
-)
 class ParquetWriterSuite extends SparkQueryCompareTestSuite {
   test("file metadata") {
     val tempFile = File.createTempFile("stats", ".parquet")
     try {
      withGpuSparkSession(spark => {
        val df = mixedDfWithNulls(spark)
        df.write.mode("overwrite").parquet(tempFile.getAbsolutePath)
+        val filter: FilenameFilter = new WildcardFileFilter("*.parquet")
+        val inputFile = HadoopInputFile.fromPath(
+          new Path(tempFile.listFiles(filter)(0).getAbsolutePath),
+          spark.sparkContext.hadoopConfiguration)
+        val parquetMeta = withResource(ParquetFileReader.open(inputFile))(_.getFooter)
 
-        val footer = ParquetFileReader.readFooters(spark.sparkContext.hadoopConfiguration,
-          new Path(tempFile.getAbsolutePath)).get(0)
-
-        val parquetMeta = footer.getParquetMetadata
-        val fileMeta = footer.getParquetMetadata.getFileMetaData
+        val fileMeta = parquetMeta.getFileMetaData
        val extra = fileMeta.getKeyValueMetaData
        assert(extra.containsKey("org.apache.spark.version"))
        assert(extra.containsKey("org.apache.spark.sql.parquet.row.metadata"))
