Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SPARK-1278: Fix improper use of SimpleDateFormat #179

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private[spark] class Master(

val conf = new SparkConf

val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200)
val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15)
Expand Down Expand Up @@ -689,7 +689,7 @@ private[spark] class Master(

/** Generate a new app ID given a app's submission date */
def newApplicationId(submitDate: Date): String = {
val appId = "app-%s-%04d".format(DATE_FORMAT.format(submitDate), nextAppNumber)
val appId = "app-%s-%04d".format(createDateFormat.format(submitDate), nextAppNumber)
nextAppNumber += 1
appId
}
Expand All @@ -713,7 +713,7 @@ private[spark] class Master(
}

def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
val appId = "driver-%s-%04d".format(createDateFormat.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
appId
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{DeployWebUI, JsonProtocol}
import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
Expand Down Expand Up @@ -169,10 +169,10 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{DeployWebUI.formatDate(app.submitDate)}</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{DeployWebUI.formatDuration(app.duration)}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
</tr>
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[spark] class Worker(
Utils.checkHost(host, "Expected hostname")
assert (port > 0)

val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
def createDateFormat = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs

// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
Expand Down Expand Up @@ -319,7 +319,7 @@ private[spark] class Worker(
}

def generateWorkerId(): String = {
"worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), host, port)
"worker-%s-%s-%d".format(createDateFormat.format(new Date), host, port)
}

override def postStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
private val jobIdToPrintWriter = new HashMap[Int, PrintWriter]
private val stageIdToJobId = new HashMap[Int, Int]
private val jobIdToStageIds = new HashMap[Int, Seq[Int]]
private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

think you can drop the first type declaration here. Not a big deal -I'll just clean it up when merging.

}
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]

createLogDir()
Expand Down Expand Up @@ -128,7 +130,7 @@ class JobLogger(val user: String, val logDirName: String) extends SparkListener
var writeInfo = info
if (withTime) {
val date = new Date(System.currentTimeMillis())
writeInfo = DATE_FORMAT.format(date) + ": " + info
writeInfo = dateFormat.get.format(date) + ": " + info
}
jobIdToPrintWriter.get(jobId).foreach(_.println(writeInfo))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,23 @@
* limitations under the License.
*/

package org.apache.spark.deploy
package org.apache.spark.ui

import java.text.SimpleDateFormat
import java.util.Date

/**
* Utilities used throughout the web UI.
*/
private[spark] object DeployWebUI {
val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private[spark] object WebUI {
// SimpleDateFormat is not thread-safe. Don't explore it to avoid improper use.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"expose" it? I can change this too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for correcting the typo.

private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

def formatDate(date: Date): String = DATE_FORMAT.format(date)
def formatDate(date: Date): String = dateFormat.get.format(date)

def formatDate(timestamp: Long): String = DATE_FORMAT.format(new Date(timestamp))
def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))

def formatDuration(milliseconds: Long): String = {
val seconds = milliseconds.toDouble / 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.ui.jobs

import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler
Expand All @@ -32,7 +31,6 @@ import org.apache.spark.util.Utils
private[ui] class JobProgressUI(parent: SparkUI) {
val appName = parent.appName
val basePath = parent.basePath
val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
val live = parent.live
val sc = parent.sc

Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.Page._
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.{Utils, Distribution}

/** Page showing statistics and task list for a given stage */
private[ui] class StagePage(parent: JobProgressUI) {
private val appName = parent.appName
private val basePath = parent.basePath
private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
Expand Down Expand Up @@ -253,7 +252,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
<td>{info.status}</td>
<td>{info.taskLocality}</td>
<td>{info.host}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
<td>{WebUI.formatDate(new Date(info.launchTime))}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
Expand Down
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,12 @@ import scala.collection.mutable.HashMap
import scala.xml.Node

import org.apache.spark.scheduler.{StageInfo, TaskInfo}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.util.Utils

/** Page showing list of all ongoing and recently finished stages */
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
private val basePath = parent.basePath
private val dateFmt = parent.dateFmt
private lazy val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler

Expand Down Expand Up @@ -82,7 +81,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
val description = listener.stageIdToDescription.get(s.stageId)
.map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)
val submissionTime = s.submissionTime match {
case Some(t) => dateFmt.format(new Date(t))
case Some(t) => WebUI.formatDate(new Date(t))
case None => "Unknown"
}
val finishTime = s.completionTime.getOrElse(System.currentTimeMillis)
Expand Down
7 changes: 5 additions & 2 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ class FileLogger(
overwrite: Boolean = true)
extends Logging {

private val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
}

private val fileSystem = Utils.getHadoopFileSystem(new URI(logDir))
private var fileIndex = 0

Expand Down Expand Up @@ -111,7 +114,7 @@ class FileLogger(
def log(msg: String, withTime: Boolean = false) {
val writeInfo = if (!withTime) msg else {
val date = new Date(System.currentTimeMillis())
DATE_FORMAT.format(date) + ": " + msg
dateFormat.get.format(date) + ": " + msg
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one I don't think can be used from multiple threads. But I agree why not make it thread safe in case we ever do.

}
writer.foreach(_.print(writeInfo))
}
Expand Down