Skip to content

Commit

Permalink
Address Andrew's review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
tedyu committed May 7, 2015
1 parent 0c8d47e commit 8b50d93
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
9 changes: 6 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,8 @@ abstract class RDD[T: ClassTag](
/**
 * Maps over this RDD's partitions, giving the user function access to the TaskContext.
 *
 * @param f user function from (TaskContext, partition iterator) to an output iterator
 * @param preservesPartitioning whether the parent RDD's partitioner still applies to the result
 */
def mapPartitionsWithContext[U: ClassTag](
    f: (TaskContext, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  // Clean the user closure once, up front, so an unserializable capture fails
  // fast on the driver rather than at task-serialization time.
  // (Was `cleanF`, an undefined name — the val is `cleanedF`.)
  val cleanedF = sc.clean(f)
  val func = (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(context, iter)
  new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
}

Expand Down Expand Up @@ -789,8 +790,10 @@ abstract class RDD[T: ClassTag](
@deprecated("use mapPartitionsWithIndex and filter", "1.0.0")
def filterWith[A](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = withScope {
  // Clean both user closures on the driver, BEFORE they are captured by the
  // closure handed to mapPartitionsWithIndex. Cleaning inside that closure would
  // (a) capture the SparkContext itself, making the task closure unserializable, and
  // (b) redundantly re-run cleaning on every partition on the executors.
  val cleanP = sc.clean(p)
  val cleanA = sc.clean(constructA)
  mapPartitionsWithIndex((index, iter) => {
    // One `A` per partition, built from the partition index.
    val a = cleanA(index)
    iter.filter(t => cleanP(t, a))
  }, preservesPartitioning = true)
}

Expand Down
25 changes: 14 additions & 11 deletions core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,10 @@ class ClosureCleanerSuite extends FunSuite {
expectCorrectException { TestUserClosuresActuallyCleaned.testKeyBy(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitions(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithIndex(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapPartitionsWithContext(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFlatMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testFilterWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testForEachWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testMapWith(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions2(rdd) }
expectCorrectException { TestUserClosuresActuallyCleaned.testZipPartitions3(rdd) }
Expand Down Expand Up @@ -263,19 +266,19 @@ private object TestUserClosuresActuallyCleaned {
rdd.mapPartitionsWithIndex { (_, it) => return; it }.count()
}
/**
 * Exercises RDD.flatMapWith with a user function containing `return`, which the
 * closure cleaner is expected to reject. flatMapWith is curried — the first
 * argument list takes `constructA: Int => A`, the second the user function
 * (shape matches the previous usage of this API in this suite).
 */
def testFlatMapWith(rdd: RDD[Int]): Unit = {
  // The `return` makes the closure unserializable; cleaning must throw.
  rdd.flatMapWith((index: Int) => index) { (t: Int, a: Int) => return; Seq(t) }.count()
}
/**
 * Exercises RDD.mapWith with a user function containing `return`, which the
 * closure cleaner is expected to reject. mapWith is curried — the first
 * argument list takes `constructA: Int => A`, the second the user function
 * (shape matches the previous usage of this API in this suite).
 */
def testMapWith(rdd: RDD[Int]): Unit = {
  // The `return` makes the closure unserializable; cleaning must throw.
  rdd.mapWith((index: Int) => index) { (t: Int, a: Int) => return; t }.count()
}
/**
 * Exercises RDD.filterWith with a predicate containing `return`, which the
 * closure cleaner is expected to reject. filterWith's signature is
 * `(constructA: Int => A)(p: (T, A) => Boolean)` — a single two-argument block
 * does not fit the first (Int => A) parameter list, so both lists are supplied.
 */
def testFilterWith(rdd: RDD[Int]): Unit = {
  // The `return` makes the predicate unserializable; cleaning must throw.
  rdd.filterWith((index: Int) => index) { (t: Int, a: Int) => return; true }.count()
}
/**
 * Exercises RDD.foreachWith with a user function containing `return`, which the
 * closure cleaner is expected to reject.
 *
 * NOTE(review): foreachWith's signature is not visible here; this assumes the
 * curried `(constructA: Int => A)(f: (T, A) => Unit): Unit` shape of its sibling
 * filterWith — confirm against RDD.scala. The previous `.count()` chained on a
 * Unit-returning action could not compile and has been dropped.
 */
def testForEachWith(rdd: RDD[Int]): Unit = {
  // The `return` makes the closure unserializable; cleaning must throw.
  rdd.foreachWith((index: Int) => index) { (t: Int, a: Int) => return }
}
/**
 * Exercises RDD.mapPartitionsWithContext with a user function containing
 * `return`, which the closure cleaner is expected to reject.
 */
def testMapPartitionsWithContext(rdd: RDD[Int]): Unit = {
  // The `return` inside the closure makes it unserializable; cleaning must throw.
  rdd.mapPartitionsWithContext { (context, iter) => return; iter }.count()
}
def testZipPartitions2(rdd: RDD[Int]): Unit = {
rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count()
Expand Down

0 comments on commit 8b50d93

Please sign in to comment.