cap numPartsToTry
yingjieMiao committed Oct 7, 2014
1 parent c4483dc · commit 692f4e6
Showing 3 changed files with 9 additions and 4 deletions.
7 changes: 4 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala

@@ -78,14 +78,15 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Logging {
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
-        // If we didn't find any rows after the first iteration, just try all partitions next.
+        // If we didn't find any rows after the previous iteration, quadruple and retry.
         // Otherwise, interpolate the number of partitions we need to try, but overestimate it
-        // by 50%.
+        // by 50%. We also cap the estimation in the end.
         if (results.size == 0) {
-          numPartsToTry = totalParts - 1
+          numPartsToTry = totalParts * 4
         } else {
           // the left side of max is >=1 whenever partsScanned >= 2
           numPartsToTry = ((1.5 * num * partsScanned / results.size).toInt - partsScanned) max 1
+          numPartsToTry = numPartsToTry min (totalParts * 4)
         }
       }

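Read on its own, the estimation above can be restated as a small standalone function. The sketch below is illustrative only (the helper name nextPartsToTry and its parameter list are ours, not Spark's), but it follows the hunk line for line and shows what the new cap buys:

    // Illustrative restatement of the takeAsync estimation above; not Spark source.
    // Estimate how many more partitions to scan, given what one scan pass found.
    def nextPartsToTry(num: Int, totalParts: Int, partsScanned: Int, resultsSize: Int): Int = {
      var numPartsToTry = 1
      if (partsScanned > 0) {
        if (resultsSize == 0) {
          numPartsToTry = totalParts * 4  // runJob later caps this at totalParts
        } else {
          // interpolate from the observed row density, overestimate by 50%, then cap
          numPartsToTry = ((1.5 * num * partsScanned / resultsSize).toInt - partsScanned) max 1
          numPartsToTry = numPartsToTry min (totalParts * 4)
        }
      }
      numPartsToTry
    }

For example, with num = 1000, partsScanned = 2, and a single row found so far, the interpolation alone asks for (1.5 * 1000 * 2 / 1).toInt - 2 = 2998 more partitions; with totalParts = 100 the new cap brings that down to 400, which runJob then clamps to 100.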
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1079,13 +1079,15 @@ abstract class RDD[T: ClassTag](
       // greater than totalParts because we actually cap it at totalParts in runJob.
       var numPartsToTry = 1
       if (partsScanned > 0) {
         // If we didn't find any rows after the previous iteration, quadruple and retry. Otherwise,
         // interpolate the number of partitions we need to try, but overestimate it by 50%.
+        // We also cap the estimation in the end.
         if (buf.size == 0) {
           numPartsToTry = partsScanned * 4
         } else {
           // the left side of max is >=1 whenever partsScanned >= 2
           numPartsToTry = ((1.5 * num * partsScanned / buf.size).toInt - partsScanned) max 1
+          numPartsToTry = numPartsToTry min (partsScanned * 4)
         }
       }

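Unlike the takeAsync hunk, RDD.take keys both the empty-result branch and the cap off partsScanned rather than totalParts, so a take() that keeps coming up empty widens its scan geometrically instead of jumping straight to every partition. A quick sketch of that cumulative schedule (illustrative, not Spark code):

    // Cumulative partitions scanned by take() while no rows are found:
    // the first pass scans 1 partition, each later pass scans partsScanned * 4 more.
    var partsScanned = 0
    val schedule = collection.mutable.ListBuffer.empty[Int]
    for (_ <- 1 to 5) {
      val numPartsToTry = if (partsScanned > 0) partsScanned * 4 else 1
      partsScanned += numPartsToTry
      schedule += partsScanned
    }
    println(schedule.mkString(", "))  // 1, 5, 25, 125, 625 (each pass is still capped at totalParts)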
2 changes: 2 additions & 0 deletions python/pyspark/rdd.py
@@ -1070,11 +1070,13 @@ def take(self, num):
             # If we didn't find any rows after the previous iteration,
             # quadruple and retry. Otherwise, interpolate the number of
             # partitions we need to try, but overestimate it by 50%.
+            # We also cap the estimation in the end.
             if len(items) == 0:
                 numPartsToTry = partsScanned * 4
             else:
                 # the first parameter of max is >=1 whenever partsScanned >= 2
                 numPartsToTry = max(int(1.5 * num * partsScanned / len(items)) - partsScanned, 1)
+                numPartsToTry = min(numPartsToTry, partsScanned * 4)

             left = num - len(items)

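None of the three call sites changes its public behavior; the heuristic only decides how many partitions each internal pass of take() submits. A minimal caller-side check (a sketch assuming a local master and the 2014-era SparkContext constructor):

    import org.apache.spark.{SparkConf, SparkContext}

    // take(5) scans one partition first; only if that pass yields fewer than
    // five rows does it widen the scan using the capped estimate above.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("take-demo"))
    val firstFive = sc.parallelize(1 to 1000000, 100).take(5)
    println(firstFive.mkString(", "))  // 1, 2, 3, 4, 5
    sc.stop()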
