From 27cd945c151dccb5ac863e6bc2c4f5b2c6a6d996 Mon Sep 17 00:00:00 2001
From: Chao Sun
Date: Wed, 18 Nov 2020 12:39:00 -0800
Subject: [PATCH] [SPARK-32381][CORE][SQL][FOLLOWUP] More cleanup on HadoopFSUtils

### What changes were proposed in this pull request?

This PR is a follow-up of #29471 and makes the following improvements to `HadoopFSUtils`:
1. Removes the extra `filterFun` from the listing API and combines it with the `filter`.
2. Removes `SerializableBlockLocation` and `SerializableFileStatus` given that `BlockLocation` and `FileStatus` are already serializable.
3. Hides the `isRootLevel` flag from the top-level API.

### Why are the changes needed?

The main purpose is to simplify the logic within `HadoopFSUtils` as well as to clean up the API.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit tests (e.g., `FileIndexSuite`)

Closes #29959 from sunchao/hadoop-fs-utils-followup.

Authored-by: Chao Sun
Signed-off-by: Holden Karau
---
 .../org/apache/spark/util/HadoopFSUtils.scala | 104 ++++--------------
 .../sql/execution/command/CommandUtils.scala  |   2 +-
 .../datasources/InMemoryFileIndex.scala       |  19 ++--
 3 files changed, 31 insertions(+), 94 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
index c0a135e04bac5..a3a528cddee37 100644
--- a/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystem
 import org.apache.hadoop.hdfs.DistributedFileSystem
 
 import org.apache.spark._
-import org.apache.spark.annotation.Private
 import org.apache.spark.internal.Logging
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 
@@ -45,8 +44,6 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param paths Input paths to list
    * @param hadoopConf Hadoop configuration
    * @param filter Path filter used to exclude leaf files from result
-   * @param isRootLevel Whether the input paths are at the root level, i.e., they are the root
-   *                    paths as opposed to nested paths encountered during recursive calls of this.
    * @param ignoreMissingFiles Ignore missing files that happen during recursive listing
    *                           (e.g., due to race conditions)
    * @param ignoreLocality Whether to fetch data locality info when listing leaf files. If false,
@@ -57,11 +54,22 @@ private[spark] object HadoopFSUtils extends Logging {
    * @param parallelismMax The maximum parallelism for listing. If the number of input paths is
    *                       larger than this value, parallelism will be throttled to this value
    *                       to avoid generating too many tasks.
-   * @param filterFun Optional predicate on the leaf files. Files who failed the check will be
-   *                  excluded from the results
    * @return for each input path, the set of discovered files for the path
    */
   def parallelListLeafFiles(
+      sc: SparkContext,
+      paths: Seq[Path],
+      hadoopConf: Configuration,
+      filter: PathFilter,
+      ignoreMissingFiles: Boolean,
+      ignoreLocality: Boolean,
+      parallelismThreshold: Int,
+      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
+    parallelListLeafFilesInternal(sc, paths, hadoopConf, filter, isRootLevel = true,
+      ignoreMissingFiles, ignoreLocality, parallelismThreshold, parallelismMax)
+  }
+
+  private def parallelListLeafFilesInternal(
       sc: SparkContext,
       paths: Seq[Path],
       hadoopConf: Configuration,
@@ -70,8 +78,7 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       parallelismThreshold: Int,
-      parallelismMax: Int,
-      filterFun: Option[String => Boolean] = None): Seq[(Path, Seq[FileStatus])] = {
+      parallelismMax: Int): Seq[(Path, Seq[FileStatus])] = {
 
     // Short-circuits parallel listing when serial listing is likely to be faster.
     if (paths.size <= parallelismThreshold) {
@@ -85,8 +92,7 @@ private[spark] object HadoopFSUtils extends Logging {
           ignoreLocality = ignoreLocality,
           isRootPath = isRootLevel,
           parallelismThreshold = parallelismThreshold,
-          parallelismMax = parallelismMax,
-          filterFun = filterFun)
+          parallelismMax = parallelismMax)
         (path, leafFiles)
       }
     }
@@ -126,58 +132,16 @@ private[spark] object HadoopFSUtils extends Logging {
               ignoreMissingFiles = ignoreMissingFiles,
               ignoreLocality = ignoreLocality,
               isRootPath = isRootLevel,
-              filterFun = filterFun,
               parallelismThreshold = Int.MaxValue,
               parallelismMax = 0)
             (path, leafFiles)
           }.iterator
-        }.map { case (path, statuses) =>
-          val serializableStatuses = statuses.map { status =>
-            // Turn FileStatus into SerializableFileStatus so we can send it back to the driver
-            val blockLocations = status match {
-              case f: LocatedFileStatus =>
-                f.getBlockLocations.map { loc =>
-                  SerializableBlockLocation(
-                    loc.getNames,
-                    loc.getHosts,
-                    loc.getOffset,
-                    loc.getLength)
-                }
-
-              case _ =>
-                Array.empty[SerializableBlockLocation]
-            }
-
-            SerializableFileStatus(
-              status.getPath.toString,
-              status.getLen,
-              status.isDirectory,
-              status.getReplication,
-              status.getBlockSize,
-              status.getModificationTime,
-              status.getAccessTime,
-              blockLocations)
-          }
-          (path.toString, serializableStatuses)
         }.collect()
     } finally {
       sc.setJobDescription(previousJobDescription)
     }
 
-    // turn SerializableFileStatus back to Status
-    statusMap.map { case (path, serializableStatuses) =>
-      val statuses = serializableStatuses.map { f =>
-        val blockLocations = f.blockLocations.map { loc =>
-          new BlockLocation(loc.names, loc.hosts, loc.offset, loc.length)
-        }
-        new LocatedFileStatus(
-          new FileStatus(
-            f.length, f.isDir, f.blockReplication, f.blockSize, f.modificationTime,
-            new Path(f.path)),
-          blockLocations)
-      }
-      (new Path(path), statuses)
-    }
+    statusMap.toSeq
   }
 
   // scalastyle:off argcount
@@ -197,7 +161,6 @@ private[spark] object HadoopFSUtils extends Logging {
       ignoreMissingFiles: Boolean,
       ignoreLocality: Boolean,
       isRootPath: Boolean,
-      filterFun: Option[String => Boolean],
       parallelismThreshold: Int,
       parallelismMax: Int): Seq[FileStatus] = {
 
@@ -245,19 +208,11 @@ private[spark] object HadoopFSUtils extends Logging {
         Array.empty[FileStatus]
     }
 
-    def doFilter(statuses: Array[FileStatus]) = filterFun match {
-      case Some(shouldFilterOut) =>
-        statuses.filterNot(status => shouldFilterOut(status.getPath.getName))
-      case None =>
-        statuses
-    }
-
-    val filteredStatuses = doFilter(statuses)
     val allLeafStatuses = {
-      val (dirs, topLevelFiles) = filteredStatuses.partition(_.isDirectory)
+      val (dirs, topLevelFiles) = statuses.partition(_.isDirectory)
       val nestedFiles: Seq[FileStatus] = contextOpt match {
         case Some(context) if dirs.size > parallelismThreshold =>
-          parallelListLeafFiles(
+          parallelListLeafFilesInternal(
             context,
             dirs.map(_.getPath),
             hadoopConf = hadoopConf,
@@ -265,7 +220,6 @@ private[spark] object HadoopFSUtils extends Logging {
             isRootLevel = false,
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
-            filterFun = filterFun,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax
           ).flatMap(_._2)
@@ -279,7 +233,6 @@ private[spark] object HadoopFSUtils extends Logging {
             ignoreMissingFiles = ignoreMissingFiles,
             ignoreLocality = ignoreLocality,
             isRootPath = false,
-            filterFun = filterFun,
             parallelismThreshold = parallelismThreshold,
             parallelismMax = parallelismMax)
         }
@@ -289,8 +242,7 @@ private[spark] object HadoopFSUtils extends Logging {
     }
 
     val missingFiles = mutable.ArrayBuffer.empty[String]
-    val filteredLeafStatuses = doFilter(allLeafStatuses)
-    val resolvedLeafStatuses = filteredLeafStatuses.flatMap {
+    val resolvedLeafStatuses = allLeafStatuses.flatMap {
       case f: LocatedFileStatus =>
         Some(f)
 
@@ -339,22 +291,4 @@ private[spark] object HadoopFSUtils extends Logging {
     resolvedLeafStatuses
   }
   // scalastyle:on argcount
-
-  /** A serializable variant of HDFS's BlockLocation. */
-  private case class SerializableBlockLocation(
-      names: Array[String],
-      hosts: Array[String],
-      offset: Long,
-      length: Long)
-
-  /** A serializable variant of HDFS's FileStatus. */
-  private case class SerializableFileStatus(
-      path: String,
-      length: Long,
-      isDir: Boolean,
-      blockReplication: Short,
-      blockSize: Long,
-      modificationTime: Long,
-      accessTime: Long,
-      blockLocations: Array[SerializableBlockLocation])
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 8bf7504716f79..6495463be02c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -163,7 +163,7 @@ object CommandUtils extends Logging {
       .getConfString("hive.exec.stagingdir", ".hive-staging")
     val filter = new PathFilterIgnoreNonData(stagingDir)
     val sizes = InMemoryFileIndex.bulkListLeafFiles(paths.flatten,
-      sparkSession.sessionState.newHadoopConf(), filter, sparkSession, isRootLevel = true).map {
+      sparkSession.sessionState.newHadoopConf(), filter, sparkSession).map {
       case (_, files) => files.map(_.getLen).sum
     }
     // the size is 0 where paths(i) is not defined and sizes(i) where it is defined
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
index 130894e9bc025..21275951b5603 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala
@@ -128,7 +128,7 @@ class InMemoryFileIndex(
     }
     val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass))
     val discovered = InMemoryFileIndex.bulkListLeafFiles(
-      pathsToFetch.toSeq, hadoopConf, filter, sparkSession, isRootLevel = true)
+      pathsToFetch.toSeq, hadoopConf, filter, sparkSession)
     discovered.foreach { case (path, leafFiles) =>
       HiveCatalogMetrics.incrementFilesDiscovered(leafFiles.size)
       fileStatusCache.putLeafFiles(path, leafFiles.toArray)
@@ -146,20 +146,17 @@ object InMemoryFileIndex extends Logging {
       paths: Seq[Path],
       hadoopConf: Configuration,
       filter: PathFilter,
-      sparkSession: SparkSession,
-      isRootLevel: Boolean): Seq[(Path, Seq[FileStatus])] = {
+      sparkSession: SparkSession): Seq[(Path, Seq[FileStatus])] = {
     HadoopFSUtils.parallelListLeafFiles(
       sc = sparkSession.sparkContext,
       paths = paths,
       hadoopConf = hadoopConf,
-      filter = filter,
-      isRootLevel = isRootLevel,
+      filter = new PathFilterWrapper(filter),
       ignoreMissingFiles = sparkSession.sessionState.conf.ignoreMissingFiles,
       ignoreLocality = sparkSession.sessionState.conf.ignoreDataLocality,
       parallelismThreshold = sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold,
-      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism,
-      filterFun = Some(shouldFilterOut))
-  }
+      parallelismMax = sparkSession.sessionState.conf.parallelPartitionDiscoveryParallelism)
+  }
 
   /** Checks if we should filter out this path name. */
   def shouldFilterOut(pathName: String): Boolean = {
@@ -175,3 +172,9 @@ object InMemoryFileIndex extends Logging {
     exclude && !include
   }
 }
+
+private class PathFilterWrapper(val filter: PathFilter) extends PathFilter with Serializable {
+  override def accept(path: Path): Boolean = {
+    (filter == null || filter.accept(path)) && !InMemoryFileIndex.shouldFilterOut(path.getName)
+  }
+}
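
For reference, the sketch below is not part of the patch; it only illustrates how a caller inside Spark might use the simplified listing API after this change: the `isRootLevel` flag is gone, and any name-based exclusion logic travels inside the single serializable `PathFilter` argument, mirroring what `PathFilterWrapper` does for `InMemoryFileIndex`. The `HiddenFileFilter` class, the parameter values, and the `org.apache.spark.util` package placement (assumed because `parallelListLeafFiles` is `private[spark]`) are illustrative assumptions, not code from this PR.

```scala
package org.apache.spark.util

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path, PathFilter}

import org.apache.spark.SparkContext

object ParallelListingExample {

  // Hypothetical filter: with filterFun removed, exclusion rules live inside the
  // single PathFilter, so it must be Serializable to ship to executors
  // (cf. PathFilterWrapper in InMemoryFileIndex).
  private class HiddenFileFilter extends PathFilter with Serializable {
    override def accept(path: Path): Boolean = {
      val name = path.getName
      !name.startsWith("_") && !name.startsWith(".")
    }
  }

  def listLeafFiles(
      sc: SparkContext,
      hadoopConf: Configuration,
      dirs: Seq[Path]): Seq[(Path, Seq[FileStatus])] = {
    // isRootLevel is no longer exposed: the input paths are treated as roots, and
    // the internal recursion handles nested directories on its own.
    HadoopFSUtils.parallelListLeafFiles(
      sc = sc,
      paths = dirs,
      hadoopConf = hadoopConf,
      filter = new HiddenFileFilter,
      ignoreMissingFiles = true,
      ignoreLocality = false,
      parallelismThreshold = 32, // list serially when there are few input paths
      parallelismMax = 10000) // cap the number of parallel listing tasks
  }
}
```

Since `BlockLocation` and `FileStatus` are already serializable (item 2 above), the statuses collected from executors come back directly, without the removed `SerializableFileStatus` round-trip.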