[SPARK-1516] Throw exception in YARN client instead of running System.exit directly

  All the changes are in the package "org.apache.spark.deploy.yarn":
     1) Throw IllegalArgumentException in ClientArguments instead of exiting directly.
     2) In Client's main method, exit with code 1 if an exception is caught, and with code 0 otherwise.
     3) In YarnClientSchedulerBackend's start method, exit with code 1 if an IllegalArgumentException is caught; any other exception is rethrown.
     4) Fix some message typos in Client.scala.

    After the fix, users who integrate the Spark YARN client into their own applications no longer have the whole application terminated by a bad argument or a finished run; see the illustrative sketch below.
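    A minimal, hypothetical sketch of that embedding pattern (the EmbeddingApp
    wrapper is illustrative and not part of this commit; only SparkConf,
    ClientArguments, and Client come from the changed code):

        import org.apache.spark.SparkConf
        import org.apache.spark.deploy.yarn.{Client, ClientArguments}

        object EmbeddingApp {
          def submit(argStrings: Array[String]): Boolean = {
            System.setProperty("SPARK_YARN_MODE", "true")
            val sparkConf = new SparkConf()
            try {
              // Bad arguments now raise IllegalArgumentException instead of
              // killing the embedding JVM via System.exit.
              val args = new ClientArguments(argStrings, sparkConf)
              new Client(args, sparkConf).run()
              true
            } catch {
              case e: IllegalArgumentException =>
                // The embedding application decides how to recover.
                Console.err.println(e.getMessage)
                false
            }
          }
        }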
codeboyyong committed Jun 17, 2014
1 parent 706e38f commit addcecb
Showing 4 changed files with 63 additions and 36 deletions.
33 changes: 22 additions & 11 deletions yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -93,7 +93,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   def validateArgs() = {
@@ -109,7 +108,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -135,17 +134,19 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 
   def verifyClusterResources(app: GetNewApplicationResponse) = {
     val maxMem = app.getMaximumResourceCapability().getMemory()
-    logInfo("Max mem capabililty of a single resource in this cluster " + maxMem)
+    logInfo("Max mem capability of a single resource in this cluster " + maxMem)
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.workerMemory > maxMem) {
-      logError("the worker size is to large to run on this cluster " + args.workerMemory)
-      System.exit(1)
+      val errorMessage = s"the worker size is too large to run on this cluster ${args.workerMemory}"
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("AM size is to large to run on this cluster " + amMem)
-      System.exit(1)
+      val errorMessage = s"AM size is too large to run on this cluster $amMem"
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -229,8 +230,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new IllegalArgumentException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
@@ -475,9 +477,18 @@ object Client {
     System.setProperty("SPARK_YARN_MODE", "true")
 
     val sparkConf = new SparkConf
-    val args = new ClientArguments(argStrings, sparkConf)
 
-    new Client(args, sparkConf).run
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
+
+    System.exit(0)
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -109,11 +109,11 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
 
         case Nil =>
           if (userJar == null || userClass == null) {
-            printUsageAndExit(1)
+            throw new IllegalArgumentException(getUsageMessage())
           }
 
         case _ =>
-          printUsageAndExit(1, args)
+          throw new IllegalArgumentException(getUsageMessage(args))
       }
     }

@@ -122,11 +122,10 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
   }
 
 
-  def printUsageAndExit(exitCode: Int, unknownParam: Any = null) {
-    if (unknownParam != null) {
-      System.err.println("Unknown/unsupported param " + unknownParam)
-    }
-    System.err.println(
+  def getUsageMessage(unknownParam: Any = null): String = {
+    val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam \n" else ""
+
+    message +
       "Usage: org.apache.spark.deploy.yarn.Client [options] \n" +
       "Options:\n" +
       "  --jar JAR_PATH             Path to your application's JAR file (required)\n" +
@@ -143,8 +142,7 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) {
       "  --addJars jars             Comma separated list of local jars that want SparkContext.addJar to work with.\n" +
       "  --files files              Comma separated list of files to be distributed with the job.\n" +
       "  --archives archives        Comma separated list of archives to be distributed with the job."
-      )
-    System.exit(exitCode)
+
   }
 
 }
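Since getUsageMessage now returns a String instead of printing and exiting, the usage text can be exercised without side effects. A REPL-style spot-check (the argument values here are hypothetical; only the getUsageMessage signature above is assumed):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ClientArguments

    // Build a valid args object, then inspect both forms of the usage message.
    val args = new ClientArguments(
      Array("--jar", "app.jar", "--class", "Main"), new SparkConf())
    assert(args.getUsageMessage().startsWith("Usage:"))
    assert(args.getUsageMessage("--what-is-this")
      .startsWith("Unknown/unsupported param --what-is-this"))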
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -73,10 +73,17 @@ private[spark] class YarnClientSchedulerBackend(
       .foreach { case (optName, optParam) => addArg(optName, optParam, argsArrayBuf) }
 
     logDebug("ClientArguments called with: " + argsArrayBuf)
-    val args = new ClientArguments(argsArrayBuf.toArray, conf)
-    client = new Client(args, conf)
-    appId = client.runApp()
-    waitForApp()
+    try {
+      val args = new ClientArguments(argsArrayBuf.toArray, conf)
+      client = new Client(args, conf)
+      appId = client.runApp()
+      waitForApp()
+    } catch {
+      case e: IllegalArgumentException => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
   }
 
   def waitForApp() {
yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -113,7 +113,6 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
   def run() {
     val appId = runApp()
     monitorApplication(appId)
-    System.exit(0)
   }
 
   // TODO(harvey): This could just go in ClientArguments.
@@ -130,7 +129,7 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     ).foreach { case(cond, errStr) =>
       if (cond) {
         logError(errStr)
-        args.printUsageAndExit(1)
+        throw new IllegalArgumentException(args.getUsageMessage())
       }
     }
   }
@@ -160,15 +159,18 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
 
     // If we have requested more then the clusters max for a single resource then exit.
     if (args.workerMemory > maxMem) {
-      logError("Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster.".
-        format(args.workerMemory, maxMem))
-      System.exit(1)
+      val errorMessage =
+        "Required worker memory (%d MB), is above the max threshold (%d MB) of this cluster."
+          .format(args.workerMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
     val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
     if (amMem > maxMem) {
-      logError("Required AM memory (%d) is above the max threshold (%d) of this cluster".
-        format(args.amMemory, maxMem))
-      System.exit(1)
+      val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster"
+        .format(args.amMemory, maxMem)
+      logError(errorMessage)
+      throw new IllegalArgumentException(errorMessage)
     }
 
     // We could add checks to make sure the entire cluster has enough resources but that involves
@@ -244,8 +246,9 @@ class Client(args: ClientArguments, conf: Configuration, sparkConf: SparkConf)
     val delegTokenRenewer = Master.getMasterPrincipal(conf)
     if (UserGroupInformation.isSecurityEnabled()) {
       if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
-        logError("Can't get Master Kerberos principal for use as renewer")
-        System.exit(1)
+        val errorMessage = "Can't get Master Kerberos principal for use as renewer"
+        logError(errorMessage)
+        throw new IllegalArgumentException(errorMessage)
       }
     }
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
@@ -489,9 +492,17 @@ object Client {
     // see Client#setupLaunchEnv().
     System.setProperty("SPARK_YARN_MODE", "true")
     val sparkConf = new SparkConf()
-    val args = new ClientArguments(argStrings, sparkConf)
+    try {
+      val args = new ClientArguments(argStrings, sparkConf)
+      new Client(args, sparkConf).run()
+    } catch {
+      case e: Exception => {
+        Console.err.println(e.getMessage)
+        System.exit(1)
+      }
+    }
 
-    new Client(args, sparkConf).run()
+    System.exit(0)
   }
 
   // Based on code from org.apache.hadoop.mapreduce.v2.util.MRApps
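To see the new contract end to end without a cluster, a throwaway check of the argument parser alone (ExitBehaviorCheck and its bogus flag are hypothetical, not part of this commit):

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.yarn.ClientArguments

    object ExitBehaviorCheck {
      def main(unused: Array[String]): Unit = {
        try {
          // An unknown flag used to System.exit(1) from inside the parser;
          // now it throws, carrying the usage text as the message.
          new ClientArguments(Array("--bogus-flag"), new SparkConf())
          println("unexpected: parser accepted a bogus flag")
        } catch {
          case e: IllegalArgumentException =>
            println("parser threw as expected:\n" + e.getMessage)
        }
      }
    }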
