Skip to content

Commit

Permalink
SPARK-1279: Fix improper use of SimpleDateFormat
Browse files Browse the repository at this point in the history
`SimpleDateFormat` is not thread-safe. Some places use the same SimpleDateFormat object without safeguard in the multiple threads. It will cause that the Web UI displays improper date.

This PR creates a new `SimpleDateFormat` every time when it's necessary. Another solution is using `ThreadLocal` to store a `SimpleDateFormat` in each thread. If this PR impacts the performance, I can change to the latter one.

Author: zsxwing <zsxwing@gmail.com>

Closes #179 from zsxwing/SPARK-1278 and squashes the following commits:

21fabd3 [zsxwing] SPARK-1278: Fix improper use of SimpleDateFormat
  • Loading branch information
zsxwing authored and pwendell committed Mar 21, 2014
1 parent 7e17fe6 commit 2c0aa22
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private[spark] class Master(

val conf = new SparkConf

val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
Expand Down Expand Up @@ -682,7 +682,7 @@ private[spark] class Master(

/** Generate a new app ID given a app's submission date */
def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
nextAppNumber += 1
appId
}
Expand All @@ -706,7 +706,7 @@ private[spark] class Master(
}

def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
appId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
Expand Down Expand Up @@ -169,10 +169,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{DeployWebUI.formatDuration(app.duration)}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
</tr>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[spark] class Worker(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)

val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs

// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
Expand Down Expand Up @@ -319,7 +319,7 @@ private[spark] class Worker(
}

def generateWorkerId(): String = {
"worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}

override def postStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIdToJobId = new HashMap[Int, Int]
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue() = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]

createLogDir()
Expand Down Expand Up @@ -128,7 +130,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " + info
writeInfo = dateFormat.get.format(date) + ": " + info
}
jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@
* limitations under the License.
*/

package org.apache.spark.deploy
package org.apache.spark.ui

import java.text.SimpleDateFormat
import java.util.Date

/**
* Utilities used throughout the web UI.
*/
private[spark] object DeployWebUI {
val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private[spark] object WebUI {
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

def formatDate(date: Date): String = DATE_FORMAT.format(date)
def formatDate(date: Date): String = dateFormat.get.format(date)

def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))

def formatDuration(milliseconds: Long): String = {
val seconds = milliseconds.toDouble / 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.ui.jobs

import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler
Expand All @@ -32,7 +31,6 @@ import org.apache.spark.util.Utils
private[ui] class JobProgressUI(parent: SparkUI) {
val appName = parent.appName
val basePath = parent.basePath
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
val live = parent.live
val sc = parent.sc

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.{Utils, Distribution}

/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressUI) {
private val appName = parent.appName
private val basePath = parent.basePath
private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
Expand Down Expand Up @@ -253,7 +252,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
<td>{WebUI.formatDate(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import scala.collection.mutable.HashMap
import scala.xml.Node

import org.apache.spark.scheduler.{StageInfo, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private val basePath = parent.basePath
private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler

Expand Down Expand Up @@ -82,7 +81,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class FileLogger(
overwrite: Boolean = true)
extends Logging {

private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
private var fileIndex = 0

Expand Down Expand Up @@ -111,7 +114,7 @@ class FileLogger(
def log(msg: String, withTime: Boolean = false) {
val writeInfo = if (!withTime) msg else {
val date = new Date(System.currentTimeMillis())
DATE_FORMAT.format(date) + ": " + msg
dateFormat.get.format(date) + ": " + msg
}
writer.foreach(_.print(writeInfo))
}
Expand Down

0 comments on commit 2c0aa22

Please sign in to comment.