Use wrapped try/catch in Utils.tryOrExit
markhamstra committed May 12, 2014
1 parent 8fc0439 commit 3573ecd
Showing 6 changed files with 37 additions and 25 deletions.
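
For context on why this wrapper exists (an editorial note and sketch, not part of the commit): a Throwable thrown inside a scheduled task is typically captured by the scheduling machinery itself, so a per-thread uncaught-exception handler installed at the top of each tick may never fire. Wrapping the task body in a try/catch that forwards to the shared handler closes that gap. Below is a minimal, runnable sketch of the pattern using a plain Java ScheduledExecutorService in place of the Akka scheduler; TryOrExitSketch, handle, and the timing values are illustrative, not Spark APIs.

import java.util.concurrent.{Executors, TimeUnit}

object TryOrExitSketch {
  // Stand-in for Spark's UncaughtExceptionHandler: the real one logs the
  // error and, as the name tryOrExit suggests, terminates the process.
  def handle(t: Throwable): Unit =
    System.err.println(s"uncaught exception in ${Thread.currentThread.getName}: $t")

  // Same shape as the commit's Utils.tryOrExit: run the block and forward
  // any Throwable to the handler instead of letting the executor keep it.
  def tryOrExit(block: => Unit): Unit =
    try block catch { case t: Throwable => handle(t) }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Without the wrapper, the first exception would be captured inside the
    // returned ScheduledFuture and the periodic task would silently stop;
    // with it, the handler sees every failure.
    scheduler.scheduleAtFixedRate(
      new Runnable { def run(): Unit = tryOrExit { sys.error("tick failed") } },
      0, 100, TimeUnit.MILLISECONDS)
    Thread.sleep(350)
    scheduler.shutdown()
  }
}
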
19 changes: 10 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils}
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,14 +88,15 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            markDead("All masters are unresponsive! Giving up.")
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              markDead("All masters are unresponsive! Giving up.")
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }

21 changes: 11 additions & 10 deletions core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -166,15 +166,16 @@ private[spark] class Worker(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            System.exit(1)
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              logError("All masters are unresponsive! Giving up.")
+              System.exit(1)
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,7 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.util.UncaughtExceptionHandler
+import org.apache.spark.util.Utils
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -140,8 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
   }

core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -155,8 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
      }
    }
  }

core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.Logging
 
-object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
+private[spark] object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
       logError("Uncaught exception in thread " + thread, exception)

12 changes: 12 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -778,6 +778,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => UncaughtExceptionHandler.uncaughtException(Thread.currentThread, t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
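
A usage note on the new helper (editorial, not part of the commit): tryOrExit catches every Throwable and hands it to UncaughtExceptionHandler, which logs the error and, as the helper's name suggests, is expected to terminate the process. It therefore suits periodic housekeeping tasks whose failure should be fatal, not recoverable work. The call-site shape is the same across the hunks above; this instance is from the BlockManager hunk:

heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
  Utils.tryOrExit { heartBeat() }
}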
