Start working on re-implementing the resolution logic to avoid getting locations
holdenk committed Jul 2, 2020
1 parent e00f43c commit bbe6344
Showing 1 changed file with 29 additions and 1 deletion.
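For context (an aside, not part of the commit): Hadoop's FileInputFormat.getSplits asks the filesystem for the block locations of every input file so that each split can carry preferred hosts. On object stores those lookups add per-file work and return placeholder hosts, so they buy no real locality; that is the overhead this change starts to remove. A minimal sketch of the per-file lookup being avoided, using only standard Hadoop APIs (the path is hypothetical):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val conf = new Configuration()
    val path = new Path("hdfs:///example/input/part-00000") // hypothetical file
    val fs = path.getFileSystem(conf)
    val status = fs.getFileStatus(path)
    // One lookup per input file; the ignore-locality path below never makes it.
    val locations = fs.getFileBlockLocations(status, 0, status.getLen)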
core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala (29 additions, 1 deletion)
@@ -120,6 +120,34 @@ class NewHadoopRDD[K, V](
}
}

  protected def getSplits(inputFormat: InputFormat[K, V]): Seq[InputSplit] = {
    // Needs: java.io.FileNotFoundException, scala.util.{Failure, Success, Try},
    // org.apache.hadoop.fs.{FileStatus, Path}, and
    // org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}.
    val jobContext = new JobContextImpl(_conf, jobId)
    if (!ignoreLocality) {
      inputFormat.getSplits(jobContext).asScala.toSeq
    } else {
      inputFormat match {
        case _: FileInputFormat[_, _] =>
          // dirs can be a mixture of dirs and files, but this matches the Hadoop impl
          val dirs = FileInputFormat.getInputPaths(jobContext)
          val filter = Option(FileInputFormat.getInputPathFilter(jobContext))
          // List each input path ourselves so the filesystem is never asked
          // for block locations.
          def processInputPath(p: Path): Try[Seq[FileStatus]] = {
            val fs = p.getFileSystem(_conf)
            Try {
              fs.listStatus(p).toSeq
                .filter(status => filter.forall(_.accept(status.getPath)))
            }
          }
          val fileStatuses = dirs.toSeq.flatMap { p =>
            processInputPath(p) match {
              case Success(statuses) => statuses
              case Failure(_: FileNotFoundException) if ignoreMissingFiles => Nil
              case Failure(e) => throw e
            }
          }
          // One split per file with an empty hosts array, so no preferred
          // locations are reported. Unlike the Hadoop impl, this does not
          // subdivide files into block-sized splits.
          fileStatuses.filter(_.isFile).map { status =>
            new FileSplit(status.getPath, 0, status.getLen, Array.empty[String])
          }
        case _ =>
          throw new SparkException(
            s"Input format ${inputFormat} was not a FileInputFormat but was asked to skip locations")
      }
    }
  }

override def getPartitions: Array[Partition] = {
val inputFormat = inputFormatClass.getConstructor().newInstance()
inputFormat match {
@@ -128,7 +156,7 @@ class NewHadoopRDD[K, V](
case _ =>
}
try {
-      val allRowSplits = inputFormat.getSplits(new JobContextImpl(_conf, jobId)).asScala
+      val allRowSplits = getSplits(inputFormat)
val rawSplits = if (ignoreEmptySplits) {
allRowSplits.filter(_.getLength > 0)
} else {
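As a standalone illustration of what the new ignore-locality branch does, here is a minimal sketch under stated assumptions (locationFreeSplits is a hypothetical helper, not part of the commit, and unlike Hadoop's implementation it does not subdivide large files into block-sized splits):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapreduce.lib.input.FileSplit

    // One split per file with an empty hosts array: the scheduler sees no
    // preferred locations, and no block-location lookups are issued.
    def locationFreeSplits(dir: String, conf: Configuration): Seq[FileSplit] = {
      val p = new Path(dir)
      val fs = p.getFileSystem(conf)
      fs.listStatus(p).toSeq
        .filter(_.isFile)
        .map(s => new FileSplit(s.getPath, 0, s.getLen, Array.empty[String]))
    }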
