From d5ae953c3045ef8e8e69f0f79eab2a063b8c2868 Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 6 Nov 2013 23:22:47 -0800
Subject: [PATCH] Merge pull request #23 from jerryshao/multi-user

Add Spark multi-user support for standalone mode and Mesos

This PR adds multi-user support for Spark in both standalone mode and Mesos
(coarse- and fine-grained) mode: the user who submits the app can be specified
through the environment variable `SPARK_USER`, or a default is used. Executors
will communicate with Hadoop using the specified user name. I also fixed a bug
in JobLogger where different users wrote job logs to a shared folder for which
they lacked file permissions.

I split the previous [PR750](https://github.com/mesos/spark/pull/750) into two
PRs; this one only addresses multi-user support. I will tackle the security
auth problem in a subsequent PR, because security auth is complicated,
especially for long-running apps like Shark Server (both the Kerberos TGT and
the HDFS delegation token must be renewed or re-created over the app's run
time).

(cherry picked from commit be7e8da98ad04d66b61cd7fc8af7ae61a649d71c)
Signed-off-by: Reynold Xin
---
 .../scala/org/apache/spark/SparkContext.scala | 10 +
 .../apache/spark/deploy/SparkHadoopUtil.scala | 18 +-
 .../org/apache/spark/executor/Executor.scala | 7 +-
 .../apache/spark/scheduler/JobLogger.scala | 757 +++++++++---------
 .../spark/scheduler/JobLoggerSuite.scala | 4 +-
 5 files changed, 417 insertions(+), 379 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1e706286f7e2e..512daf30cc3c5 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -145,6 +145,14 @@ class SparkContext(
     executorEnvs ++= environment
   }
 
+  // Set SPARK_USER for user who is running SparkContext.
+ val sparkUser = Option { + Option(System.getProperty("user.name")).getOrElse(System.getenv("SPARK_USER")) + }.getOrElse { + SparkContext.SPARK_UNKNOWN_USER + } + executorEnvs("SPARK_USER") = sparkUser + // Create and start the scheduler private[spark] var taskScheduler: TaskScheduler = { // Regular expression used for local[N] master format @@ -984,6 +992,8 @@ object SparkContext { private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id" + private[spark] val SPARK_UNKNOWN_USER = "" + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 6bc846aa92eb4..c29a30184af13 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -17,8 +17,11 @@ package org.apache.spark.deploy +import java.security.PrivilegedExceptionAction + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.SparkException @@ -27,6 +30,15 @@ import org.apache.spark.SparkException */ private[spark] class SparkHadoopUtil { + val conf = newConfiguration() + UserGroupInformation.setConfiguration(conf) + + def runAsUser(user: String)(func: () => Unit) { + val ugi = UserGroupInformation.createRemoteUser(user) + ugi.doAs(new PrivilegedExceptionAction[Unit] { + def run: Unit = func() + }) + } /** * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop @@ -42,9 +54,9 @@ class SparkHadoopUtil { def isYarnMode(): Boolean = { false } } - + object SparkHadoopUtil { - private val hadoop = { + private val hadoop = { val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE"))) if (yarnMode) { try { @@ -56,7 +68,7 @@ object SparkHadoopUtil { new SparkHadoopUtil } } - + def get: SparkHadoopUtil = { hadoop } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index b773346df305d..5c9bb9db1ce9e 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -25,8 +25,9 @@ import java.util.concurrent._ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap -import org.apache.spark.scheduler._ import org.apache.spark._ +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageLevel, TaskResultBlockId} import org.apache.spark.util.Utils @@ -129,6 +130,8 @@ private[spark] class Executor( // Maintains the list of running tasks. 
private val runningTasks = new ConcurrentHashMap[Long, TaskRunner] + val sparkUser = Option(System.getenv("SPARK_USER")).getOrElse(SparkContext.SPARK_UNKNOWN_USER) + def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) @@ -176,7 +179,7 @@ private[spark] class Executor( } } - override def run() { + override def run(): Unit = SparkHadoopUtil.get.runAsUser(sparkUser) { () => val startTime = System.currentTimeMillis() SparkEnv.set(env) Thread.currentThread.setContextClassLoader(replClassLoader) diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala index 94f8b0128502c..60927831a159a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala @@ -1,373 +1,384 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.scheduler - -import java.io.PrintWriter -import java.io.File -import java.io.FileNotFoundException -import java.text.SimpleDateFormat -import java.util.{Date, Properties} -import java.util.concurrent.LinkedBlockingQueue - -import scala.collection.mutable.{HashMap, HashSet, ListBuffer} - -import org.apache.spark._ -import org.apache.spark.rdd.RDD -import org.apache.spark.executor.TaskMetrics -import org.apache.spark.storage.StorageLevel - -/** - * A logger class to record runtime information for jobs in Spark. This class outputs one log file - * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. - * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext - * after the SparkContext is created. - * Note that each JobLogger only works for one SparkContext - * @param logDirName The base directory for the log files. - */ -class JobLogger(val logDirName: String) extends SparkListener with Logging { - - private val logDir = Option(System.getenv("SPARK_LOG_DIR")).getOrElse("/tmp/spark") - - private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] - private val stageIDToJobID = new HashMap[Int, Int] - private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]] - private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents] - - createLogDir() - def this() = this(String.valueOf(System.currentTimeMillis())) - - // The following 5 functions are used only in testing. 
- private[scheduler] def getLogDir = logDir - private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter - private[scheduler] def getStageIDToJobID = stageIDToJobID - private[scheduler] def getJobIDToStages = jobIDToStages - private[scheduler] def getEventQueue = eventQueue - - /** Create a folder for log files, the folder's name is the creation time of jobLogger */ - protected def createLogDir() { - val dir = new File(logDir + "/" + logDirName + "/") - if (!dir.exists() && !dir.mkdirs()) { - logError("Error creating log directory: " + logDir + "/" + logDirName + "/") - } - } - - /** - * Create a log file for one job - * @param jobID ID of the job - * @exception FileNotFoundException Fail to create log file - */ - protected def createLogWriter(jobID: Int) { - try { - val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID) - jobIDToPrintWriter += (jobID -> fileWriter) - } catch { - case e: FileNotFoundException => e.printStackTrace() - } - } - - /** - * Close log file, and clean the stage relationship in stageIDToJobID - * @param jobID ID of the job - */ - protected def closeLogWriter(jobID: Int) { - jobIDToPrintWriter.get(jobID).foreach { fileWriter => - fileWriter.close() - jobIDToStages.get(jobID).foreach(_.foreach{ stage => - stageIDToJobID -= stage.id - }) - jobIDToPrintWriter -= jobID - jobIDToStages -= jobID - } - } - - /** - * Write info into log file - * @param jobID ID of the job - * @param info Info to be recorded - * @param withTime Controls whether to record time stamp before the info, default is true - */ - protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) { - var writeInfo = info - if (withTime) { - val date = new Date(System.currentTimeMillis()) - writeInfo = DATE_FORMAT.format(date) + ": " +info - } - jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo)) - } - - /** - * Write info into log file - * @param stageID ID of the stage - * @param info Info to be recorded - * @param withTime Controls whether to record time stamp before the info, default is true - */ - protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) { - stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) - } - - /** - * Build stage dependency for a job - * @param jobID ID of the job - * @param stage Root stage of the job - */ - protected def buildJobDep(jobID: Int, stage: Stage) { - if (stage.jobId == jobID) { - jobIDToStages.get(jobID) match { - case Some(stageList) => stageList += stage - case None => val stageList = new ListBuffer[Stage] - stageList += stage - jobIDToStages += (jobID -> stageList) - } - stageIDToJobID += (stage.id -> jobID) - stage.parents.foreach(buildJobDep(jobID, _)) - } - } - - /** - * Record stage dependency and RDD dependency for a stage - * @param jobID Job ID of the stage - */ - protected def recordStageDep(jobID: Int) { - def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = { - var rddList = new ListBuffer[RDD[_]] - rddList += rdd - rdd.dependencies.foreach { - case shufDep: ShuffleDependency[_, _] => - case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd) - } - rddList - } - jobIDToStages.get(jobID).foreach {_.foreach { stage => - var depRddDesc: String = "" - getRddsInStage(stage.rdd).foreach { rdd => - depRddDesc += rdd.id + "," - } - var depStageDesc: String = "" - stage.parents.foreach { stage => - depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")" - } - jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + - 
depRddDesc.substring(0, depRddDesc.length - 1) + ")" + - " STAGE_DEP=" + depStageDesc, false) - } - } - } - - /** - * Generate indents and convert to String - * @param indent Number of indents - * @return string of indents - */ - protected def indentString(indent: Int): String = { - val sb = new StringBuilder() - for (i <- 1 to indent) { - sb.append(" ") - } - sb.toString() - } - - /** - * Get RDD's name - * @param rdd Input RDD - * @return String of RDD's name - */ - protected def getRddName(rdd: RDD[_]): String = { - var rddName = rdd.getClass.getSimpleName - if (rdd.name != null) { - rddName = rdd.name - } - rddName - } - - /** - * Record RDD dependency graph in a stage - * @param jobID Job ID of the stage - * @param rdd Root RDD of the stage - * @param indent Indent number before info - */ - protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { - val rddInfo = - if (rdd.getStorageLevel != StorageLevel.NONE) { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " + - rdd.origin + " " + rdd.generator - } else { - "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " + - rdd.origin + " " + rdd.generator - } - jobLogInfo(jobID, indentString(indent) + rddInfo, false) - rdd.dependencies.foreach { - case shufDep: ShuffleDependency[_, _] => - val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId - jobLogInfo(jobID, indentString(indent + 1) + depInfo, false) - case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1) - } - } - - /** - * Record stage dependency graph of a job - * @param jobID Job ID of the stage - * @param stage Root stage of the job - * @param indent Indent number before info, default is 0 - */ - protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) { - val stageInfo = if (stage.isShuffleMap) { - "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId - } else { - "STAGE_ID=" + stage.id + " RESULT_STAGE" - } - if (stage.jobId == jobID) { - jobLogInfo(jobID, indentString(indent) + stageInfo, false) - if (!idSet.contains(stage.id)) { - idSet += stage.id - recordRddInStageGraph(jobID, stage.rdd, indent) - stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2)) - } - } else { - jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false) - } - } - - /** - * Record task metrics into job log files, including execution info and shuffle metrics - * @param stageID Stage ID of the task - * @param status Status info of the task - * @param taskInfo Task description info - * @param taskMetrics Task running metrics - */ - protected def recordTaskMetrics(stageID: Int, status: String, - taskInfo: TaskInfo, taskMetrics: TaskMetrics) { - val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + - " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + - " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname - val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime - val readMetrics = taskMetrics.shuffleReadMetrics match { - case Some(metrics) => - " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + - " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + - " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + - " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + - " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + - " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + - " REMOTE_BYTES_READ=" + metrics.remoteBytesRead - case None => "" - } - val writeMetrics = 
taskMetrics.shuffleWriteMetrics match { - case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten - case None => "" - } - stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics) - } - - /** - * When stage is submitted, record stage submit info - * @param stageSubmitted Stage submitted event - */ - override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { - stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED TASK_SIZE=%d".format( - stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks)) - } - - /** - * When stage is completed, record stage completion status - * @param stageCompleted Stage completed event - */ - override def onStageCompleted(stageCompleted: StageCompleted) { - stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format( - stageCompleted.stage.stageId)) - } - - override def onTaskStart(taskStart: SparkListenerTaskStart) { } - - /** - * When task ends, record task completion status and metrics - * @param taskEnd Task end event - */ - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - val task = taskEnd.task - val taskInfo = taskEnd.taskInfo - var taskStatus = "" - task match { - case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK" - case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK" - } - taskEnd.reason match { - case Success => taskStatus += " STATUS=SUCCESS" - recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics) - case Resubmitted => - taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + - " STAGE_ID=" + task.stageId - stageLogInfo(task.stageId, taskStatus) - case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => - taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + - task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + - mapId + " REDUCE_ID=" + reduceId - stageLogInfo(task.stageId, taskStatus) - case OtherFailure(message) => - taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + - " STAGE_ID=" + task.stageId + " INFO=" + message - stageLogInfo(task.stageId, taskStatus) - case _ => - } - } - - /** - * When job ends, recording job completion status and close log file - * @param jobEnd Job end event - */ - override def onJobEnd(jobEnd: SparkListenerJobEnd) { - val job = jobEnd.job - var info = "JOB_ID=" + job.jobId - jobEnd.jobResult match { - case JobSucceeded => info += " STATUS=SUCCESS" - case JobFailed(exception, _) => - info += " STATUS=FAILED REASON=" - exception.getMessage.split("\\s+").foreach(info += _ + "_") - case _ => - } - jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase) - closeLogWriter(job.jobId) - } - - /** - * Record job properties into job log file - * @param jobID ID of the job - * @param properties Properties of the job - */ - protected def recordJobProperties(jobID: Int, properties: Properties) { - if(properties != null) { - val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "") - jobLogInfo(jobID, description, false) - } - } - - /** - * When job starts, record job property and stage graph - * @param jobStart Job start event - */ - override def onJobStart(jobStart: SparkListenerJobStart) { - val job = jobStart.job - val properties = jobStart.properties - createLogWriter(job.jobId) - recordJobProperties(job.jobId, properties) - buildJobDep(job.jobId, job.finalStage) - recordStageDep(job.jobId) - recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int]) - 
jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED") - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import java.io.{IOException, File, FileNotFoundException, PrintWriter} +import java.text.SimpleDateFormat +import java.util.{Date, Properties} +import java.util.concurrent.LinkedBlockingQueue + +import scala.collection.mutable.{HashMap, HashSet, ListBuffer} + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.executor.TaskMetrics +import org.apache.spark.storage.StorageLevel + +/** + * A logger class to record runtime information for jobs in Spark. This class outputs one log file + * for each Spark job, containing RDD graph, tasks start/stop, shuffle information. + * JobLogger is a subclass of SparkListener, use addSparkListener to add JobLogger to a SparkContext + * after the SparkContext is created. + * Note that each JobLogger only works for one SparkContext + * @param logDirName The base directory for the log files. + */ + +class JobLogger(val user: String, val logDirName: String) + extends SparkListener with Logging { + + def this() = this(System.getProperty("user.name", ""), + String.valueOf(System.currentTimeMillis())) + + private val logDir = + if (System.getenv("SPARK_LOG_DIR") != null) + System.getenv("SPARK_LOG_DIR") + else + "/tmp/spark-%s".format(user) + + private val jobIDToPrintWriter = new HashMap[Int, PrintWriter] + private val stageIDToJobID = new HashMap[Int, Int] + private val jobIDToStages = new HashMap[Int, ListBuffer[Stage]] + private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents] + + createLogDir() + + // The following 5 functions are used only in testing. + private[scheduler] def getLogDir = logDir + private[scheduler] def getJobIDtoPrintWriter = jobIDToPrintWriter + private[scheduler] def getStageIDToJobID = stageIDToJobID + private[scheduler] def getJobIDToStages = jobIDToStages + private[scheduler] def getEventQueue = eventQueue + + /** Create a folder for log files, the folder's name is the creation time of jobLogger */ + protected def createLogDir() { + val dir = new File(logDir + "/" + logDirName + "/") + if (dir.exists()) { + return + } + if (dir.mkdirs() == false) { + // JobLogger should throw a exception rather than continue to construct this object. 
+ throw new IOException("create log directory error:" + logDir + "/" + logDirName + "/") + } + } + + /** + * Create a log file for one job + * @param jobID ID of the job + * @exception FileNotFoundException Fail to create log file + */ + protected def createLogWriter(jobID: Int) { + try { + val fileWriter = new PrintWriter(logDir + "/" + logDirName + "/" + jobID) + jobIDToPrintWriter += (jobID -> fileWriter) + } catch { + case e: FileNotFoundException => e.printStackTrace() + } + } + + /** + * Close log file, and clean the stage relationship in stageIDToJobID + * @param jobID ID of the job + */ + protected def closeLogWriter(jobID: Int) { + jobIDToPrintWriter.get(jobID).foreach { fileWriter => + fileWriter.close() + jobIDToStages.get(jobID).foreach(_.foreach{ stage => + stageIDToJobID -= stage.id + }) + jobIDToPrintWriter -= jobID + jobIDToStages -= jobID + } + } + + /** + * Write info into log file + * @param jobID ID of the job + * @param info Info to be recorded + * @param withTime Controls whether to record time stamp before the info, default is true + */ + protected def jobLogInfo(jobID: Int, info: String, withTime: Boolean = true) { + var writeInfo = info + if (withTime) { + val date = new Date(System.currentTimeMillis()) + writeInfo = DATE_FORMAT.format(date) + ": " +info + } + jobIDToPrintWriter.get(jobID).foreach(_.println(writeInfo)) + } + + /** + * Write info into log file + * @param stageID ID of the stage + * @param info Info to be recorded + * @param withTime Controls whether to record time stamp before the info, default is true + */ + protected def stageLogInfo(stageID: Int, info: String, withTime: Boolean = true) { + stageIDToJobID.get(stageID).foreach(jobID => jobLogInfo(jobID, info, withTime)) + } + + /** + * Build stage dependency for a job + * @param jobID ID of the job + * @param stage Root stage of the job + */ + protected def buildJobDep(jobID: Int, stage: Stage) { + if (stage.jobId == jobID) { + jobIDToStages.get(jobID) match { + case Some(stageList) => stageList += stage + case None => val stageList = new ListBuffer[Stage] + stageList += stage + jobIDToStages += (jobID -> stageList) + } + stageIDToJobID += (stage.id -> jobID) + stage.parents.foreach(buildJobDep(jobID, _)) + } + } + + /** + * Record stage dependency and RDD dependency for a stage + * @param jobID Job ID of the stage + */ + protected def recordStageDep(jobID: Int) { + def getRddsInStage(rdd: RDD[_]): ListBuffer[RDD[_]] = { + var rddList = new ListBuffer[RDD[_]] + rddList += rdd + rdd.dependencies.foreach { + case shufDep: ShuffleDependency[_, _] => + case dep: Dependency[_] => rddList ++= getRddsInStage(dep.rdd) + } + rddList + } + jobIDToStages.get(jobID).foreach {_.foreach { stage => + var depRddDesc: String = "" + getRddsInStage(stage.rdd).foreach { rdd => + depRddDesc += rdd.id + "," + } + var depStageDesc: String = "" + stage.parents.foreach { stage => + depStageDesc += "(" + stage.id + "," + stage.shuffleDep.get.shuffleId + ")" + } + jobLogInfo(jobID, "STAGE_ID=" + stage.id + " RDD_DEP=(" + + depRddDesc.substring(0, depRddDesc.length - 1) + ")" + + " STAGE_DEP=" + depStageDesc, false) + } + } + } + + /** + * Generate indents and convert to String + * @param indent Number of indents + * @return string of indents + */ + protected def indentString(indent: Int): String = { + val sb = new StringBuilder() + for (i <- 1 to indent) { + sb.append(" ") + } + sb.toString() + } + + /** + * Get RDD's name + * @param rdd Input RDD + * @return String of RDD's name + */ + protected def getRddName(rdd: RDD[_]): 
String = { + var rddName = rdd.getClass.getSimpleName + if (rdd.name != null) { + rddName = rdd.name + } + rddName + } + + /** + * Record RDD dependency graph in a stage + * @param jobID Job ID of the stage + * @param rdd Root RDD of the stage + * @param indent Indent number before info + */ + protected def recordRddInStageGraph(jobID: Int, rdd: RDD[_], indent: Int) { + val rddInfo = + if (rdd.getStorageLevel != StorageLevel.NONE) { + "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " CACHED" + " " + + rdd.origin + " " + rdd.generator + } else { + "RDD_ID=" + rdd.id + " " + getRddName(rdd) + " NONE" + " " + + rdd.origin + " " + rdd.generator + } + jobLogInfo(jobID, indentString(indent) + rddInfo, false) + rdd.dependencies.foreach { + case shufDep: ShuffleDependency[_, _] => + val depInfo = "SHUFFLE_ID=" + shufDep.shuffleId + jobLogInfo(jobID, indentString(indent + 1) + depInfo, false) + case dep: Dependency[_] => recordRddInStageGraph(jobID, dep.rdd, indent + 1) + } + } + + /** + * Record stage dependency graph of a job + * @param jobID Job ID of the stage + * @param stage Root stage of the job + * @param indent Indent number before info, default is 0 + */ + protected def recordStageDepGraph(jobID: Int, stage: Stage, idSet: HashSet[Int], indent: Int = 0) { + val stageInfo = if (stage.isShuffleMap) { + "STAGE_ID=" + stage.id + " MAP_STAGE SHUFFLE_ID=" + stage.shuffleDep.get.shuffleId + } else { + "STAGE_ID=" + stage.id + " RESULT_STAGE" + } + if (stage.jobId == jobID) { + jobLogInfo(jobID, indentString(indent) + stageInfo, false) + if (!idSet.contains(stage.id)) { + idSet += stage.id + recordRddInStageGraph(jobID, stage.rdd, indent) + stage.parents.foreach(recordStageDepGraph(jobID, _, idSet, indent + 2)) + } + } else { + jobLogInfo(jobID, indentString(indent) + stageInfo + " JOB_ID=" + stage.jobId, false) + } + } + + /** + * Record task metrics into job log files, including execution info and shuffle metrics + * @param stageID Stage ID of the task + * @param status Status info of the task + * @param taskInfo Task description info + * @param taskMetrics Task running metrics + */ + protected def recordTaskMetrics(stageID: Int, status: String, + taskInfo: TaskInfo, taskMetrics: TaskMetrics) { + val info = " TID=" + taskInfo.taskId + " STAGE_ID=" + stageID + + " START_TIME=" + taskInfo.launchTime + " FINISH_TIME=" + taskInfo.finishTime + + " EXECUTOR_ID=" + taskInfo.executorId + " HOST=" + taskMetrics.hostname + val executorRunTime = " EXECUTOR_RUN_TIME=" + taskMetrics.executorRunTime + val readMetrics = taskMetrics.shuffleReadMetrics match { + case Some(metrics) => + " SHUFFLE_FINISH_TIME=" + metrics.shuffleFinishTime + + " BLOCK_FETCHED_TOTAL=" + metrics.totalBlocksFetched + + " BLOCK_FETCHED_LOCAL=" + metrics.localBlocksFetched + + " BLOCK_FETCHED_REMOTE=" + metrics.remoteBlocksFetched + + " REMOTE_FETCH_WAIT_TIME=" + metrics.fetchWaitTime + + " REMOTE_FETCH_TIME=" + metrics.remoteFetchTime + + " REMOTE_BYTES_READ=" + metrics.remoteBytesRead + case None => "" + } + val writeMetrics = taskMetrics.shuffleWriteMetrics match { + case Some(metrics) => " SHUFFLE_BYTES_WRITTEN=" + metrics.shuffleBytesWritten + case None => "" + } + stageLogInfo(stageID, status + info + executorRunTime + readMetrics + writeMetrics) + } + + /** + * When stage is submitted, record stage submit info + * @param stageSubmitted Stage submitted event + */ + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) { + stageLogInfo(stageSubmitted.stage.stageId,"STAGE_ID=%d STATUS=SUBMITTED 
TASK_SIZE=%d".format( + stageSubmitted.stage.stageId, stageSubmitted.stage.numTasks)) + } + + /** + * When stage is completed, record stage completion status + * @param stageCompleted Stage completed event + */ + override def onStageCompleted(stageCompleted: StageCompleted) { + stageLogInfo(stageCompleted.stage.stageId, "STAGE_ID=%d STATUS=COMPLETED".format( + stageCompleted.stage.stageId)) + } + + override def onTaskStart(taskStart: SparkListenerTaskStart) { } + + /** + * When task ends, record task completion status and metrics + * @param taskEnd Task end event + */ + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val task = taskEnd.task + val taskInfo = taskEnd.taskInfo + var taskStatus = "" + task match { + case resultTask: ResultTask[_, _] => taskStatus = "TASK_TYPE=RESULT_TASK" + case shuffleMapTask: ShuffleMapTask => taskStatus = "TASK_TYPE=SHUFFLE_MAP_TASK" + } + taskEnd.reason match { + case Success => taskStatus += " STATUS=SUCCESS" + recordTaskMetrics(task.stageId, taskStatus, taskInfo, taskEnd.taskMetrics) + case Resubmitted => + taskStatus += " STATUS=RESUBMITTED TID=" + taskInfo.taskId + + " STAGE_ID=" + task.stageId + stageLogInfo(task.stageId, taskStatus) + case FetchFailed(bmAddress, shuffleId, mapId, reduceId) => + taskStatus += " STATUS=FETCHFAILED TID=" + taskInfo.taskId + " STAGE_ID=" + + task.stageId + " SHUFFLE_ID=" + shuffleId + " MAP_ID=" + + mapId + " REDUCE_ID=" + reduceId + stageLogInfo(task.stageId, taskStatus) + case OtherFailure(message) => + taskStatus += " STATUS=FAILURE TID=" + taskInfo.taskId + + " STAGE_ID=" + task.stageId + " INFO=" + message + stageLogInfo(task.stageId, taskStatus) + case _ => + } + } + + /** + * When job ends, recording job completion status and close log file + * @param jobEnd Job end event + */ + override def onJobEnd(jobEnd: SparkListenerJobEnd) { + val job = jobEnd.job + var info = "JOB_ID=" + job.jobId + jobEnd.jobResult match { + case JobSucceeded => info += " STATUS=SUCCESS" + case JobFailed(exception, _) => + info += " STATUS=FAILED REASON=" + exception.getMessage.split("\\s+").foreach(info += _ + "_") + case _ => + } + jobLogInfo(job.jobId, info.substring(0, info.length - 1).toUpperCase) + closeLogWriter(job.jobId) + } + + /** + * Record job properties into job log file + * @param jobID ID of the job + * @param properties Properties of the job + */ + protected def recordJobProperties(jobID: Int, properties: Properties) { + if(properties != null) { + val description = properties.getProperty(SparkContext.SPARK_JOB_DESCRIPTION, "") + jobLogInfo(jobID, description, false) + } + } + + /** + * When job starts, record job property and stage graph + * @param jobStart Job start event + */ + override def onJobStart(jobStart: SparkListenerJobStart) { + val job = jobStart.job + val properties = jobStart.properties + createLogWriter(job.jobId) + recordJobProperties(job.jobId, properties) + buildJobDep(job.jobId, job.finalStage) + recordStageDep(job.jobId) + recordStageDepGraph(job.jobId, job.finalStage, new HashSet[Int]) + jobLogInfo(job.jobId, "JOB_ID=" + job.jobId + " STATUS=STARTED") + } +} + diff --git a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala index 7d7ca9ba8cc1f..984881861c9a9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/JobLoggerSuite.scala @@ -91,8 +91,10 @@ class JobLoggerSuite extends FunSuite with LocalSparkContext with ShouldMatchers 
sc.addSparkListener(joblogger) val rdd = sc.parallelize(1 to 1e2.toInt, 4).map{ i => (i % 12, 2 * i) } rdd.reduceByKey(_+_).collect() + + val user = System.getProperty("user.name", SparkContext.SPARK_UNKNOWN_USER) - joblogger.getLogDir should be ("/tmp/spark") + joblogger.getLogDir should be ("/tmp/spark-%s".format(user)) joblogger.getJobIDtoPrintWriter.size should be (1) joblogger.getStageIDToJobID.size should be (2) joblogger.getStageIDToJobID.get(0) should be (Some(0))
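
For reviewers who want the end-to-end picture without tracing the whole diff, here is a minimal stand-alone sketch (not part of the patch) of how the driver-side `SPARK_USER` resolution and the executor-side Hadoop impersonation are meant to fit together. It assumes only Hadoop's `UserGroupInformation` API; the object name `SparkUserSketch`, the `"<unknown>"` placeholder, and the task body are illustrative. Note that in the patch as written, the driver prefers the JVM's `user.name` over `SPARK_USER`, forwards the resolved value to executors through `executorEnvs`, and executors fall back to `SPARK_UNKNOWN_USER` when the variable is absent.

```scala
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation

object SparkUserSketch {
  def main(args: Array[String]): Unit = {
    // Driver side (mirrors the SparkContext change): resolve the submitting user once,
    // preferring the JVM's user.name, then SPARK_USER, then a placeholder default.
    val sparkUser = Option(System.getProperty("user.name"))
      .orElse(Option(System.getenv("SPARK_USER")))
      .getOrElse("<unknown>")

    // Executor side (mirrors SparkHadoopUtil.runAsUser): run the task body under a
    // remote UGI for that user so Hadoop/HDFS calls are made as the submitting user.
    val ugi = UserGroupInformation.createRemoteUser(sparkUser)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = {
        println("running as " + UserGroupInformation.getCurrentUser.getShortUserName)
      }
    })
  }
}
```

Kerberos TGT and delegation-token handling are deliberately out of scope here, matching the follow-up PR mentioned in the commit message.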