Skip to content

Commit

Permalink
Move cleaning outside of mapPartitionsWithIndex
Browse files Browse the repository at this point in the history
  • Loading branch information
tedyu committed May 8, 2015
1 parent 56d7c92 commit f83d445
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 5 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -789,11 +789,11 @@ abstract class RDD[T: ClassTag](
*/
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
val cleanP = sc.clean(p)
val cleanA = sc.clean(constructA)
mapPartitionsWithIndex((index, iter) => {
/* val cleanP = sc.clean(p)
val cleanA = sc.clean(constructA) */
val a = constructA(index)
iter.filter(t => p(t, a))
val a = cleanA(index)
iter.filter(t => cleanP(t, a))
}, preservesPartitioning = true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class ClosureCleanerSuite extends FunSuite {
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
// expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
Expand Down

0 comments on commit f83d445

Please sign in to comment.