Fix #SPARK-1149 Bad partitioners can cause Spark to hang #44
@@ -847,6 +847,8 @@ class SparkContext(
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
+    val rddPartitions = rdd.partitions.map(_.index)
+    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
     val callSite = getCallSite
     val cleanedFunc = clean(func)
     logInfo("Starting job: " + callSite)
@@ -950,6 +952,8 @@ class SparkContext(
       resultHandler: (Int, U) => Unit,
       resultFunc: => R): SimpleFutureAction[R] =
   {
+    val rddPartitions = rdd.partitions.map(_.index)
+    require(partitions.forall(rddPartitions.contains(_)), "partition index out of range")
     val cleanF = clean(processPartition)
     val callSite = getCallSite
     val waiter = dagScheduler.submitJob(

Review comment (on the added require):
This check as written is going to have quadratic complexity. If you have 100 partitions, for example, you're going to create a list of length 100 at the top and then check for all 100 partitions whether they're in that list, getting 10,000 operations. Can't you just check that all the indices in partitions are between 0 and rdd.partitions.size? I don't think RDDs can have non-contiguous partition numbers, though there might have been some stuff in the past with partition pruning that I may be misremembering.

Reply:
Sure, that should be better. Or is this?
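For illustration, a minimal sketch (not part of this PR) of the linear-time check the reviewer suggests, written as it might appear inside the same runJob body as the diff above; it assumes partition indices are contiguous, i.e. exactly 0 until rdd.partitions.size:

    // Sketch of the linear-time variant suggested above (not the code in this
    // diff): bound-check each requested index against the number of partitions
    // instead of building the full index list and scanning it per partition.
    // Assumes partition indices run from 0 to rdd.partitions.size - 1.
    require(
      partitions.forall(p => p >= 0 && p < rdd.partitions.size),
      "partition index out of range")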
Review comment:
Do you mind explaining a bit more the case where these two will not match? I'm just wondering if it makes more sense to check this invariant inside of the getPartitions function of ShuffleRDD.scala - but maybe there are other code paths where this could get messed up that don't go through that.
Reply:
Here, when the error occurs, the job keeps waiting for the results. Although the error is recorded in the log, Spark hangs.
Review comment:
Just scanned the code: this issue (partitions not matching rdd.partitions.map(_.index)) can only happen when you run a computation that relies on the correctness of the partitioner. In the current implementation there are only two such cases: the first is lookup, where the computation depends on the correctness of getPartition(); the other is ShuffleMapTask. I'm not sure which fix option is better - adding a check in SparkContext, or adding a specific check in those two places separately. I just feel that without looking at the code, I cannot see why partitions would fail to match rdd.partitions (if you look at how SparkContext runs the job you get even more confused, because partitions is derived exactly from "0 until rdd.partitions.size").
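For context, a simplified sketch of the lookup case described above (paraphrased, not the exact Spark source, and written against the runJob overload of that era that still takes an allowLocal flag): the partition index handed to runJob comes straight from the user-supplied partitioner, so a buggy getPartition feeds a bad partition id to the scheduler.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    object LookupSketch {
      // Paraphrased sketch of lookup on a partitioned pair RDD; names and
      // details may differ from the real PairRDDFunctions.lookup.
      def lookupSketch[K, V](sc: SparkContext, rdd: RDD[(K, V)], key: K): Seq[V] = {
        val partitioner = rdd.partitioner.get          // assume the RDD has a partitioner
        val index = partitioner.getPartition(key)      // out of range if the partitioner is buggy
        val process = (it: Iterator[(K, V)]) => it.filter(_._1 == key).map(_._2).toVector
        // Before this PR a bad `index` here could leave the job waiting forever;
        // with the added require() in runJob it fails fast instead.
        sc.runJob(rdd, process, Seq(index), allowLocal = false).head
      }
    }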
Reply:
I don't understand what you mean. The out-of-range partition index is caused by a badly designed Partitioner: for example, partitioner.getPartition(-1) returns -1.
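To make this concrete, a hypothetical partitioner of the kind described in the reply above (illustrative only, not code from this PR; the name BadModPartitioner is made up):

    import org.apache.spark.Partitioner

    // A "bad" partitioner: fine for non-negative integer keys, but it forgets
    // that the JVM's % operator can return a negative value, so a negative key
    // yields an out-of-range partition index.
    class BadModPartitioner(parts: Int) extends Partitioner {
      override def numPartitions: Int = parts
      override def getPartition(key: Any): Int = key match {
        case k: Int => k % parts // -1 % 4 == -1, not a valid partition index
        case _      => 0
      }
    }

    // new BadModPartitioner(4).getPartition(-1) returns -1. The shuffled RDD can
    // still be constructed without error; the bad index only appears once a
    // negative key flows through the data, which is why a constructor-time check
    // cannot catch it and why, before this PR, the job could hang instead of failing.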
Review comment:
@witgo so in the case you mentioned, why not put this check in the constructor of ShuffleRDD? It seems more natural to check it there rather than inside of runJob.
Reply:
Whether the partitioner behaves correctly depends on the input key. In the current example there is no problem as long as key >= 0, so this cannot be detected in the constructor.