Fix #SPARK-1149 Bad partitioners can cause Spark to hang #44

Closed · wants to merge 18 commits · Changes from 5 commits
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -847,6 +847,8 @@ class SparkContext(
partitions: Seq[Int],
allowLocal: Boolean,
resultHandler: (Int, U) => Unit) {
val partitionRange = (0 until rdd.partitions.size)
require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
Contributor:

nit: You could just do partitions.forall(partitionRange.contains)
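
For illustration, a standalone sketch (hypothetical values, not from the PR) showing that the explicit lambda in the diff and the suggested method value are equivalent:

val partitionRange = 0 until 4
val partitions = Seq(0, 1, 3)
partitions.forall(partitionRange.contains(_))  // explicit lambda, as in the diff
partitions.forall(partitionRange.contains)     // eta-expanded method value, as in the nit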

Contributor Author:
I think my code is more readable.

Contributor:
Two questions:

  1. Does Spark guarantee that an RDD has a continuous partition index? (I think so, as https://spark-project.atlassian.net/browse/SPARK-911 is still open.)
  2. Should we put the check here, or check inside the specific APIs? The current issue looks more like a bug in lookup() - it forgets to check the return value of getPartition before using it. (A rough sketch of that alternative follows below.)
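
A rough sketch (not the actual Spark source; lookupSketch and its shape are illustrative, using the runJob overload with an allowLocal flag as in this diff) of the second option - validating what a user-supplied Partitioner returns inside an API such as lookup(), before submitting the job:

import org.apache.spark.Partitioner
import org.apache.spark.rdd.RDD

def lookupSketch[K, V](rdd: RDD[(K, V)], p: Partitioner, key: K): Seq[V] = {
  val index = p.getPartition(key)
  // Fail fast if the user-supplied Partitioner returned an out-of-range index.
  require(index >= 0 && index < rdd.partitions.size,
    s"getPartition returned out-of-range partition $index for key $key")
  val process = (it: Iterator[(K, V)]) => it.filter(_._1 == key).map(_._2).toSeq
  rdd.context.runJob(rdd, process, Seq(index), allowLocal = false).head
}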

Contributor Author:

  1. It can be changed to:

require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")

  2. A custom Partitioner can cause a lot of problems:

val partitioner = new Partitioner {
  override def numPartitions: Int = 2
  // key.hashCode() % 2 is -1 for the key -1, i.e. an out-of-range partition index
  override def getPartition(key: Any): Int = key.hashCode() % 2
}

val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (-1, 7)))
val shuffled = pairs.partitionBy(partitioner)
shuffled.count
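
For contrast, a sketch of a partitioner that stays in range (illustrative name; the non-negative modulo is essentially what HashPartitioner does):

val safePartitioner = new Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode() % numPartitions
    // Map negative hash codes (e.g. (-1).hashCode() == -1) back into [0, numPartitions)
    if (mod < 0) mod + numPartitions else mod
  }
}

With this partitioner, the shuffled.count above completes instead of failing on the key -1.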

Contributor:

Will this code throw an IndexOutOfRange exception?

Contributor:

Take a look at the use of PartitionPruningRDD.
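
For reference, a minimal usage sketch (illustrative, not from this PR; it reuses the pairs RDD from the earlier example and the PartitionPruningRDD.create helper):

import org.apache.spark.rdd.PartitionPruningRDD

// Keep only the even-numbered partitions; the pruned RDD re-indexes the kept
// partitions from 0, so the resulting index space stays continuous.
val pruned = PartitionPruningRDD.create(pairs, partitionIndex => partitionIndex % 2 == 0)
pruned.count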

Contributor Author:

It throws java.lang.ArrayIndexOutOfBoundsException.

Contributor:

Even PartitionPruningRDD ensures a continuous index space... I think so:


class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int) = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}

Contributor Author:

Yes, you're right. The code has been modified to:

require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")

val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite)
@@ -950,6 +952,8 @@ class SparkContext(
resultHandler: (Int, U) => Unit,
resultFunc: => R): SimpleFutureAction[R] =
{
val partitionRange = (0 until rdd.partitions.size)
require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
val cleanF = clean(processPartition)
val callSite = getCallSite
val waiter = dagScheduler.submitJob(