
Commit

Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/spark into kill

Conflicts:
	core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
rxin committed Apr 24, 2014
2 parents 7a7bdd2 + 1d6abe3 commit 401033f
Showing 6 changed files with 62 additions and 16 deletions.
@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel");
* }}}
*
* If interruptOnCancel is set to true for the job group, then job cancellation will result
* in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
* that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
* where HDFS may respond to Thread.interrupt() by marking nodes as dead.
*/
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
sc.setJobGroup(groupId, description, interruptOnCancel)

/**
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
*
* @see `setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean)`.
* This method sets interruptOnCancel to false.
*/
def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
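For context, a minimal sketch of driving the new overload from application code (the group id, description, and sleep-based wait are illustrative and not part of this commit; the test below does the same thing with a semaphore):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future

import org.apache.spark.SparkContext

object InterruptOnCancelExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "interrupt-on-cancel-demo")

    // Run the grouped job from a background thread so the main thread can cancel it.
    val job = future {
      sc.setJobGroup("demo-group", "cancellable demo job", interruptOnCancel = true)
      sc.parallelize(1 to 100000, 2).map { i => Thread.sleep(10); i }.count()
    }

    Thread.sleep(2000) // crude wait for the tasks to start
    // Cancels every active job in the group; because interruptOnCancel is true,
    // Thread.interrupt() is also sent to the threads running the tasks.
    sc.cancelJobGroup("demo-group")
    sc.stop()
  }
}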

34 changes: 32 additions & 2 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.duration._
import scala.concurrent.future

import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -130,13 +130,43 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAf
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)

sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}

test("job group with interruption") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
})

// jobA is the one to be cancelled.
val jobA = future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)

sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
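The interruption path above depends on the task blocking in an interruptible call: Thread.sleep(100000) throws InterruptedException once the executor thread is interrupted, the task fails, and Await.result surfaces that as a SparkException. A hedged sketch of how user code inside a task could handle the interrupt explicitly, assuming an in-scope SparkContext sc as in the tests (the error handling is illustrative, not Spark API):

sc.parallelize(1 to 4, 2).map { i =>
  try {
    Thread.sleep(100000) // interruptible blocking call, as in the test above
    i
  } catch {
    case e: InterruptedException =>
      // Restore the interrupt flag for any later blocking call, then fail the task promptly.
      Thread.currentThread().interrupt()
      throw new RuntimeException("task interrupted by job cancellation", e)
  }
}.count()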
@@ -50,9 +50,9 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, JobSucceeded)) }
assert(counter.count === 0)

// Starting listener bus should flush all buffered events (asynchronously, hence the sleep)
// Starting listener bus should flush all buffered events
bus.start()
Thread.sleep(1000)
assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
assert(counter.count === 5)

// After listener bus has stopped, posting events should not increment counter
@@ -177,6 +177,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
listener.stageInfos.clear()

rdd3.count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD
6 changes: 3 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -46,7 +46,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* @note vertex ids are unique.
* @return an RDD containing the vertices in this graph
*/
val vertices: VertexRDD[VD]
@transient val vertices: VertexRDD[VD]

/**
* An RDD containing the edges and their associated attributes. The entries in the RDD contain
@@ -59,7 +59,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* along with their vertex data.
*
*/
val edges: EdgeRDD[ED]
@transient val edges: EdgeRDD[ED]

/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
@@ -77,7 +77,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}}
*/
val triplets: RDD[EdgeTriplet[VD, ED]]
@transient val triplets: RDD[EdgeTriplet[VD, ED]]

/**
* Caches the vertices and edges associated with this graph at the specified storage level.
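Marking these members @transient keeps the RDD handles out of the serialized form if a Graph reference is captured in a closure. A minimal, self-contained sketch of the underlying Java-serialization behaviour (the Payload class below is hypothetical and not part of Spark):

import java.io._

// Hypothetical class: `label` is serialized, `scratch` is skipped because it is @transient.
class Payload(val label: String) extends Serializable {
  @transient val scratch: Array[Int] = Array(1, 2, 3)
}

object TransientFieldDemo {
  def roundTrip[T <: AnyRef](obj: T): T = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new Payload("graph"))
    println(copy.label)   // "graph"
    println(copy.scratch) // null -- the transient field was not written to the stream
  }
}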
10 changes: 5 additions & 5 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
@@ -34,28 +34,28 @@ import scala.util.Random
class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {

/** The number of edges in the graph. */
lazy val numEdges: Long = graph.edges.count()
@transient lazy val numEdges: Long = graph.edges.count()

/** The number of vertices in the graph. */
lazy val numVertices: Long = graph.vertices.count()
@transient lazy val numVertices: Long = graph.vertices.count()

/**
* The in-degree of each vertex in the graph.
* @note Vertices with no in-edges are not returned in the resulting RDD.
*/
lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
@transient lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)

/**
* The out-degree of each vertex in the graph.
* @note Vertices with no out-edges are not returned in the resulting RDD.
*/
lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
@transient lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)

/**
* The degree of each vertex in the graph.
* @note Vertices with no edges are not returned in the resulting RDD.
*/
lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
@transient lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)

/**
* Computes the neighboring vertex degrees.
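For the cached degree and count fields, @transient lazy val gives a related but slightly different behaviour: the memoized value is dropped during serialization and is typically recomputed lazily on first access in a deserialized copy. An illustrative sketch of the pattern outside of Spark (class and field names are made up):

// Made-up class: `total` is computed at most once per live instance; a deserialized
// copy recomputes it on first access because the cached result is @transient.
class Stats(val data: List[Int]) extends Serializable {
  @transient lazy val total: Long = {
    println("computing total") // runs again in a deserialized copy
    data.map(_.toLong).sum
  }
}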
@@ -34,10 +34,10 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
*/
private[graphx]
class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag](
val srcIds: Array[VertexId],
val dstIds: Array[VertexId],
val data: Array[ED],
val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {
@transient val srcIds: Array[VertexId],
@transient val dstIds: Array[VertexId],
@transient val data: Array[ED],
@transient val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable {

/**
* Reverse all the edges in this partition.
