From e106cb106a5973b1039c606471388c9e6335e226 Mon Sep 17 00:00:00 2001
From: Tathagata Das
Date: Fri, 11 Apr 2014 23:33:49 -0700
Subject: [PATCH] [SPARK-1386] Web UI for Spark Streaming

When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example: What is the average processing time of batches? What is the scheduling delay? Is the system able to process data as fast as it receives it? How many records am I receiving through my receivers?

While the StreamingListener interface introduced in 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to streaming applications is necessary for easier debugging. This PR introduces such a UI, which shows various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine.

http://i.imgur.com/1ooDGhm.png

This UI is integrated into the Spark UI running at port 4040.

Author: Tathagata Das
Author: Andrew Or

Closes #290 from tdas/streaming-web-ui and squashes the following commits:

fc73ca5 [Tathagata Das] Merge pull request #9 from andrewor14/ui-refactor
642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala
eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor
f4f4cbe [Tathagata Das] More minor fixes.
34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
252c566 [Tathagata Das] Merge pull request #8 from andrewor14/ui-refactor
e038b4b [Tathagata Das] Addressed Patrick's comments.
125a054 [Andrew Or] Disable serving static resources with gzip
90feb8d [Andrew Or] Address Patrick's comments
89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
72fe256 [Tathagata Das] Merge pull request #6 from andrewor14/ui-refactor
2fc09c8 [Tathagata Das] Added binary check exclusions
aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala)
f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests.
caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
585cd65 [Tathagata Das] Merge pull request #5 from andrewor14/ui-refactor
914b8ff [Tathagata Das] Moved utils functions to UIUtils.
548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message)
6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
ee6543f [Tathagata Das] Minor changes based on Andrew's comments.
fa760fe [Tathagata Das] Fixed long line.
1c0bcef [Tathagata Das] Refactored streaming UI into two files.
1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI.
827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui
168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor
3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui
c78c92d [Andrew Or] Remove outdated comment
8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor)
0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor
9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example
61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui
53be2c5 [Tathagata Das] Minor style updates.
ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically a37ad4f [Andrew Or] Comments, imports and formatting (minor) cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor 7d57444 [Andrew Or] Refactoring the UI interface to add flexibility aef4dd5 [Tathagata Das] Added Apache licenses. db27bad [Tathagata Das] Added last batch processing time to StreamingUI. 4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later. 93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI. 56cc7fb [Tathagata Das] First cut implementation of Streaming UI. --- .../scala/org/apache/spark/SparkContext.scala | 1 - .../spark/deploy/SparkUIContainer.scala | 50 ----- .../{IndexPage.scala => HistoryPage.scala} | 12 +- .../spark/deploy/history/HistoryServer.scala | 61 +++--- .../apache/spark/deploy/master/Master.scala | 8 +- .../deploy/master/ui/ApplicationPage.scala | 13 +- .../ui/{IndexPage.scala => MasterPage.scala} | 23 ++- .../spark/deploy/master/ui/MasterWebUI.scala | 54 ++---- .../apache/spark/deploy/worker/Worker.scala | 2 +- .../spark/deploy/worker/ui/LogPage.scala | 147 ++++++++++++++ .../ui/{IndexPage.scala => WorkerPage.scala} | 6 +- .../spark/deploy/worker/ui/WorkerWebUI.scala | 180 +++--------------- .../scheduler/ApplicationEventListener.scala | 4 +- .../apache/spark/storage/StorageUtils.scala | 16 +- .../org/apache/spark/ui/JettyUtils.scala | 1 + .../main/scala/org/apache/spark/ui/Page.scala | 22 --- .../scala/org/apache/spark/ui/SparkUI.scala | 108 ++++------- .../scala/org/apache/spark/ui/UIUtils.scala | 172 +++++++++++++---- .../scala/org/apache/spark/ui/WebUI.scala | 141 +++++++++++--- ...ironmentUI.scala => EnvironmentPage.scala} | 47 +---- .../apache/spark/ui/env/EnvironmentTab.scala | 50 +++++ ...{ExecutorsUI.scala => ExecutorsPage.scala} | 84 +------- .../apache/spark/ui/exec/ExecutorsTab.scala | 86 +++++++++ .../apache/spark/ui/jobs/ExecutorTable.scala | 7 +- .../spark/ui/jobs/JobProgressListener.scala | 10 +- ...{IndexPage.scala => JobProgressPage.scala} | 16 +- ...bProgressUI.scala => JobProgressTab.scala} | 45 ++--- .../org/apache/spark/ui/jobs/PoolPage.scala | 14 +- .../org/apache/spark/ui/jobs/PoolTable.scala | 7 +- .../org/apache/spark/ui/jobs/StagePage.scala | 45 ++--- .../org/apache/spark/ui/jobs/StageTable.scala | 18 +- .../org/apache/spark/ui/storage/RDDPage.scala | 17 +- .../{IndexPage.scala => StoragePage.scala} | 13 +- ...{BlockManagerUI.scala => StorageTab.scala} | 32 +--- .../org/apache/spark/util/JsonProtocol.scala | 12 +- .../scala/org/apache/spark/ui/UISuite.scala | 81 +++++++- .../apache/spark/util/JsonProtocolSuite.scala | 4 +- project/MimaBuild.scala | 8 +- .../spark/streaming/StreamingContext.scala | 23 +-- .../spark/streaming/dstream/DStream.scala | 9 - .../dstream/NetworkInputDStream.scala | 79 +++++--- .../spark/streaming/scheduler/BatchInfo.scala | 1 + .../streaming/scheduler/JobGenerator.scala | 9 +- .../streaming/scheduler/JobScheduler.scala | 11 +- .../spark/streaming/scheduler/JobSet.scala | 7 +- .../scheduler/NetworkInputTracker.scala | 86 ++++++--- .../scheduler/StreamingListener.scala | 18 +- .../scheduler/StreamingListenerBus.scala | 4 + .../ui/StreamingJobProgressListener.scala | 148 ++++++++++++++ .../spark/streaming/ui/StreamingPage.scala | 180 ++++++++++++++++++ .../spark/streaming/ui/StreamingTab.scala | 27 +-- .../spark/streaming/InputStreamsSuite.scala | 6 +- .../streaming/StreamingContextSuite.scala | 1 - 
.../org/apache/spark/streaming/UISuite.scala | 46 +++++ 54 files changed, 1426 insertions(+), 846 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala rename core/src/main/scala/org/apache/spark/deploy/history/{IndexPage.scala => HistoryPage.scala} (85%) rename core/src/main/scala/org/apache/spark/deploy/master/ui/{IndexPage.scala => MasterPage.scala} (91%) create mode 100644 core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala rename core/src/main/scala/org/apache/spark/deploy/worker/ui/{IndexPage.scala => WorkerPage.scala} (97%) delete mode 100644 core/src/main/scala/org/apache/spark/ui/Page.scala rename core/src/main/scala/org/apache/spark/ui/env/{EnvironmentUI.scala => EnvironmentPage.scala} (61%) create mode 100644 core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala rename core/src/main/scala/org/apache/spark/ui/exec/{ExecutorsUI.scala => ExecutorsPage.scala} (61%) create mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala rename core/src/main/scala/org/apache/spark/ui/jobs/{IndexPage.scala => JobProgressPage.scala} (90%) rename core/src/main/scala/org/apache/spark/ui/jobs/{JobProgressUI.scala => JobProgressTab.scala} (53%) rename core/src/main/scala/org/apache/spark/ui/storage/{IndexPage.scala => StoragePage.scala} (90%) rename core/src/main/scala/org/apache/spark/ui/storage/{BlockManagerUI.scala => StorageTab.scala} (75%) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala rename core/src/test/scala/org/apache/spark/SparkUISuite.scala => streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala (58%) create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 3bcc8ce2b25a6..a764c174d562c 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -213,7 +213,6 @@ class SparkContext(config: SparkConf) extends Logging { // Initialize the Spark UI, registering all associated listeners private[spark] val ui = new SparkUI(this) ui.bind() - ui.start() // Optionally log Spark events private[spark] val eventLogger: Option[EventLoggingListener] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala b/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala deleted file mode 100644 index 33fceae4ff489..0000000000000 --- a/core/src/main/scala/org/apache/spark/deploy/SparkUIContainer.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy - -import org.apache.spark.ui.{SparkUI, WebUI} - -private[spark] abstract class SparkUIContainer(name: String) extends WebUI(name) { - - /** Attach a SparkUI to this container. Only valid after bind(). */ - def attachUI(ui: SparkUI) { - assert(serverInfo.isDefined, - "%s must be bound to a server before attaching SparkUIs".format(name)) - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - rootHandler.addHandler(handler) - if (!handler.isStarted) { - handler.start() - } - } - } - - /** Detach a SparkUI from this container. Only valid after bind(). */ - def detachUI(ui: SparkUI) { - assert(serverInfo.isDefined, - "%s must be bound to a server before detaching SparkUIs".format(name)) - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { - if (handler.isStarted) { - handler.stop() - } - rootHandler.removeHandler(handler) - } - } - -} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala similarity index 85% rename from core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala rename to core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 54dffffec71c5..180c853ce3096 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.{UIUtils, WebUI} +import org.apache.spark.ui.{WebUIPage, UIUtils} -private[spark] class IndexPage(parent: HistoryServer) { +private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { def render(request: HttpServletRequest): Seq[Node] = { val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated } @@ -62,13 +62,13 @@ private[spark] class IndexPage(parent: HistoryServer) { private def appRow(info: ApplicationHistoryInfo): Seq[Node] = { val appName = if (info.started) info.name else info.logDirPath.getName val uiAddress = parent.getAddress + info.ui.basePath - val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started" - val endTime = if (info.completed) WebUI.formatDate(info.endTime) else "Not completed" + val startTime = if (info.started) UIUtils.formatDate(info.startTime) else "Not started" + val endTime = if (info.completed) UIUtils.formatDate(info.endTime) else "Not completed" val difference = if (info.started && info.completed) info.endTime - info.startTime else -1L - val duration = if (difference > 0) WebUI.formatDuration(difference) else "---" + val duration = if (difference > 0) UIUtils.formatDuration(difference) else "---" val sparkUser = if (info.started) info.sparkUser else "Unknown user" val logDirectory = info.logDirPath.getName - val lastUpdated = WebUI.formatDate(info.lastUpdated) + val lastUpdated = UIUtils.formatDate(info.lastUpdated) {appName} {startTime} diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 97d2ba9deed33..cf64700f9098c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -17,17 +17,13 @@ package 
org.apache.spark.deploy.history -import javax.servlet.http.HttpServletRequest - import scala.collection.mutable import org.apache.hadoop.fs.{FileStatus, Path} -import org.eclipse.jetty.servlet.ServletContextHandler import org.apache.spark.{Logging, SecurityManager, SparkConf} -import org.apache.spark.deploy.SparkUIContainer import org.apache.spark.scheduler._ -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{WebUI, SparkUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.Utils @@ -46,17 +42,15 @@ import org.apache.spark.util.Utils */ class HistoryServer( val baseLogDir: String, + securityManager: SecurityManager, conf: SparkConf) - extends SparkUIContainer("History Server") with Logging { + extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging { import HistoryServer._ private val fileSystem = Utils.getHadoopFileSystem(baseLogDir) private val localHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - private val port = WEB_UI_PORT - private val securityManager = new SecurityManager(conf) - private val indexPage = new IndexPage(this) // A timestamp of when the disk was last accessed to check for log updates private var lastLogCheckTime = -1L @@ -90,37 +84,23 @@ class HistoryServer( } } - private val handlers = Seq[ServletContextHandler]( - createStaticHandler(STATIC_RESOURCE_DIR, "/static"), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager) - ) - // A mapping of application ID to its history information, which includes the rendered UI val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]() + initialize() + /** - * Start the history server. + * Initialize the history server. * * This starts a background thread that periodically synchronizes information displayed on * this UI with the event logs in the provided base directory. */ - def start() { + def initialize() { + attachPage(new HistoryPage(this)) + attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static")) logCheckingThread.start() } - /** Bind to the HTTP server behind this web interface. */ - override def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) - logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to bind HistoryServer", e) - System.exit(1) - } - } - /** * Check for any updates to event logs in the base directory. This is only effective once * the server has been bound. 
@@ -151,7 +131,7 @@ class HistoryServer( // Remove any applications that should no longer be retained appIdToInfo.foreach { case (appId, info) => if (!retainedAppIds.contains(appId)) { - detachUI(info.ui) + detachSparkUI(info.ui) appIdToInfo.remove(appId) } } @@ -186,15 +166,14 @@ class HistoryServer( val path = logDir.getPath val appId = path.getName val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec) - val ui = new SparkUI(replayBus, appId, "/history/" + appId) val appListener = new ApplicationEventListener replayBus.addListener(appListener) + val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId) // Do not call ui.bind() to avoid creating a new server for each application - ui.start() replayBus.replay() if (appListener.applicationStarted) { - attachUI(ui) + attachSparkUI(ui) val appName = appListener.appName val sparkUser = appListener.sparkUser val startTime = appListener.startTime @@ -213,6 +192,18 @@ class HistoryServer( fileSystem.close() } + /** Attach a reconstructed UI to this server. Only valid after bind(). */ + private def attachSparkUI(ui: SparkUI) { + assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs") + ui.getHandlers.foreach(attachHandler) + } + + /** Detach a reconstructed UI from this server. Only valid after bind(). */ + private def detachSparkUI(ui: SparkUI) { + assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs") + ui.getHandlers.foreach(detachHandler) + } + /** Return the address of this server. */ def getAddress: String = "http://" + publicHost + ":" + boundPort @@ -262,9 +253,9 @@ object HistoryServer { def main(argStrings: Array[String]) { val args = new HistoryServerArguments(argStrings) - val server = new HistoryServer(args.logDir, conf) + val securityManager = new SecurityManager(conf) + val server = new HistoryServer(args.logDir, securityManager, conf) server.bind() - server.start() // Wait until the end of the world... 
or if the HistoryServer process is manually stopped while(true) { Thread.sleep(Int.MaxValue) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 2446e86cb6672..6c58e741df001 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -625,7 +625,7 @@ private[spark] class Master( if (completedApps.size >= RETAINED_APPLICATIONS) { val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1) completedApps.take(toRemove).foreach( a => { - appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) } + appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) } applicationMetricsSystem.removeSource(a.appSource) }) completedApps.trimStart(toRemove) @@ -667,12 +667,12 @@ private[spark] class Master( if (!eventLogPaths.isEmpty) { try { val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec) - val ui = new SparkUI(replayBus, appName + " (completed)", "/history/" + app.id) - ui.start() + val ui = new SparkUI( + new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id) replayBus.replay() app.desc.appUiUrl = ui.basePath appIdToUI(app.id) = ui - webUi.attachUI(ui) + webUi.attachSparkUI(ui) return true } catch { case t: Throwable => diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index cb092cb5d576b..b5cd4d2ea963f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -28,15 +28,16 @@ import org.json4s.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.ExecutorInfo -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class ApplicationPage(parent: MasterWebUI) { - val master = parent.masterActorRef - val timeout = parent.timeout +private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") { + + private val master = parent.masterActorRef + private val timeout = parent.timeout /** Executor details for a particular application */ - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val appId = request.getParameter("appId") val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) @@ -96,7 +97,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { UIUtils.basicSparkPage(content, "Application: " + app.desc.name) } - def executorRow(executor: ExecutorInfo): Seq[Node] = { + private def executorRow(executor: ExecutorInfo): Seq[Node] = { {executor.id} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala similarity index 91% rename from core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala rename to core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 8c1d6c7cce450..7ca3b08a28728 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -25,17 +25,17 @@ import scala.xml.Node import akka.pattern.ask import org.json4s.JValue -import org.apache.spark.deploy.{JsonProtocol} +import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: MasterWebUI) { - val master = parent.masterActorRef - val timeout = parent.timeout +private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { + private val master = parent.masterActorRef + private val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, timeout) JsonProtocol.writeMasterState(state) @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } - def workerRow(worker: WorkerInfo): Seq[Node] = { + private def workerRow(worker: WorkerInfo): Seq[Node] = { {worker.id} @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { } - - def appRow(app: ApplicationInfo): Seq[Node] = { + private def appRow(app: ApplicationInfo): Seq[Node] = { {app.id} @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) { {Utils.megabytesToString(app.desc.memoryPerSlave)} - {WebUI.formatDate(app.submitDate)} + {UIUtils.formatDate(app.submitDate)} {app.desc.user} {app.state.toString} - {WebUI.formatDuration(app.duration)} + {UIUtils.formatDuration(app.duration)} } - def driverRow(driver: DriverInfo): Seq[Node] = { + private def driverRow(driver: DriverInfo): Seq[Node] = { {driver.id} {driver.submitDate} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index 30c8ade408a5a..a18b39fc95d64 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -17,14 +17,9 @@ package org.apache.spark.deploy.master.ui -import javax.servlet.http.HttpServletRequest - -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.Logging -import org.apache.spark.deploy.SparkUIContainer import org.apache.spark.deploy.master.Master -import org.apache.spark.ui.SparkUI +import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ import org.apache.spark.util.{AkkaUtils, Utils} @@ -33,44 +28,33 @@ import org.apache.spark.util.{AkkaUtils, Utils} */ private[spark] class MasterWebUI(val master: Master, requestedPort: Int) - extends SparkUIContainer("MasterWebUI") with Logging { + extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging { val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) - private val host = Utils.localHostName() - private val port = requestedPort - private val applicationPage = new ApplicationPage(this) - private val indexPage = new IndexPage(this) + initialize() - private val handlers: Seq[ServletContextHandler] = { - master.masterMetricsSystem.getServletHandlers ++ - master.applicationMetricsSystem.getServletHandlers ++ - Seq[ServletContextHandler]( - createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"), - createServletHandler("/app/json", - (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr), - createServletHandler("/app", - (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), master.securityMgr) - ) + /** Initialize all components of the server. 
*/ + def initialize() { + attachPage(new ApplicationPage(this)) + attachPage(new MasterPage(this)) + attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) + master.masterMetricsSystem.getServletHandlers.foreach(attachHandler) + master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } - /** Bind to the HTTP server behind this web interface. */ - override def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf)) - logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Master web UI", e) - System.exit(1) - } + /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ + def attachSparkUI(ui: SparkUI) { + assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") + ui.getHandlers.foreach(attachHandler) } + /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ + def detachSparkUI(ui: SparkUI) { + assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") + ui.getHandlers.foreach(detachHandler) + } } private[spark] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bf5a8d09dd2df..52c164ca3c574 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -128,8 +128,8 @@ private[spark] class Worker( host, port, cores, Utils.megabytesToString(memory))) logInfo("Spark home: " + sparkHome) createWorkDir() - webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.bind() registerWithMaster() diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala new file mode 100644 index 0000000000000..fec1207948628 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.worker.ui + +import java.io.File +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{WebUIPage, UIUtils} +import org.apache.spark.util.Utils + +private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") { + private val worker = parent.worker + private val workDir = parent.workDir + + def renderLog(request: HttpServletRequest): String = { + val defaultBytes = 100 * 1024 + + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + + val path = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + s"${workDir.getPath}/$appId/$executorId/$logType" + case (None, None, Some(d)) => + s"${workDir.getPath}/$driverId/$logType" + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } + + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + + val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" + pre + Utils.offsetBytes(path, startByte, endByte) + } + + def render(request: HttpServletRequest): Seq[Node] = { + val defaultBytes = 100 * 1024 + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + + val (path, params) = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e") + case (None, None, Some(d)) => + (s"${workDir.getPath}/$d/$logType", s"driverId=$d") + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } + + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + val logText = {Utils.offsetBytes(path, startByte, endByte)} + val linkToMaster =

Back to Master

+ val range = Bytes {startByte.toString} - {endByte.toString} of {logLength} + + val backButton = + if (startByte > 0) { + + + + } + else { + + } + + val nextButton = + if (endByte < logLength) { + + + + } + else { + + } + + val content = + + + {linkToMaster} +
+
{backButton}
+
{range}
+
{nextButton}
+
+
+
+
{logText}
+
+ + + UIUtils.basicSparkPage(content, logType + " log page for " + appId) + } + + /** Determine the byte range for a log or log page. */ + private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { + val defaultBytes = 100 * 1024 + val maxBytes = 1024 * 1024 + val file = new File(path) + val logLength = file.length() + val getOffset = offset.getOrElse(logLength - defaultBytes) + val startByte = + if (getOffset < 0) 0L + else if (getOffset > logLength) logLength + else getOffset + val logPageLength = math.min(byteLength, maxBytes) + val endByte = math.min(startByte + logPageLength, logLength) + (startByte, endByte) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala similarity index 97% rename from core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala rename to core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala index 49c1009cac2bf..d4513118ced05 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala @@ -28,15 +28,15 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: WorkerWebUI) { +private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) JsonProtocol.writeWorkerState(workerState) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 5625a44549aaa..0ad2edba2227f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -20,174 +20,44 @@ package org.apache.spark.deploy.worker.ui import java.io.File import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.Logging +import org.apache.spark.{Logging, SparkConf} import org.apache.spark.deploy.worker.Worker -import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} +import org.apache.spark.ui.{SparkUI, WebUI} import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.util.AkkaUtils /** * Web UI server for the standalone worker. 
*/ private[spark] -class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) - extends WebUI("WorkerWebUI") with Logging { +class WorkerWebUI( + val worker: Worker, + val workDir: File, + port: Option[Int] = None) + extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf) + with Logging { val timeout = AkkaUtils.askTimeout(worker.conf) - private val host = Utils.localHostName() - private val port = requestedPort.getOrElse( - worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) - private val indexPage = new IndexPage(this) - - private val handlers: Seq[ServletContextHandler] = { - worker.metricsSystem.getServletHandlers ++ - Seq[ServletContextHandler]( - createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"), - createServletHandler("/log", - (request: HttpServletRequest) => log(request), worker.securityMgr), - createServletHandler("/logPage", - (request: HttpServletRequest) => logPage(request), worker.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr) - ) - } - - /** Bind to the HTTP server behind this web interface. */ - override def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf)) - logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Worker web UI", e) - System.exit(1) - } - } - - private def log(request: HttpServletRequest): String = { - val defaultBytes = 100 * 1024 - - val appId = Option(request.getParameter("appId")) - val executorId = Option(request.getParameter("executorId")) - val driverId = Option(request.getParameter("driverId")) - val logType = request.getParameter("logType") - val offset = Option(request.getParameter("offset")).map(_.toLong) - val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - - val path = (appId, executorId, driverId) match { - case (Some(a), Some(e), None) => - s"${workDir.getPath}/$appId/$executorId/$logType" - case (None, None, Some(d)) => - s"${workDir.getPath}/$driverId/$logType" - case _ => - throw new Exception("Request must specify either application or driver identifiers") - } - - val (startByte, endByte) = getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - - val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" - pre + Utils.offsetBytes(path, startByte, endByte) - } - - private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { - val defaultBytes = 100 * 1024 - val appId = Option(request.getParameter("appId")) - val executorId = Option(request.getParameter("executorId")) - val driverId = Option(request.getParameter("driverId")) - val logType = request.getParameter("logType") - val offset = Option(request.getParameter("offset")).map(_.toLong) - val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - - val (path, params) = (appId, executorId, driverId) match { - case (Some(a), Some(e), None) => - (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e") - case (None, None, Some(d)) => - (s"${workDir.getPath}/$d/$logType", s"driverId=$d") - case _ => - throw new Exception("Request must specify either application or driver identifiers") - } - - val (startByte, endByte) = 
getByteRange(path, offset, byteLength) - val file = new File(path) - val logLength = file.length - val logText = {Utils.offsetBytes(path, startByte, endByte)} - val linkToMaster =

Back to Master

- val range = Bytes {startByte.toString} - {endByte.toString} of {logLength} - - val backButton = - if (startByte > 0) { - - - - } - else { - - } - - val nextButton = - if (endByte < logLength) { - - - - } - else { - - } - - val content = - - - {linkToMaster} -
-
{backButton}
-
{range}
-
{nextButton}
-
-
-
-
{logText}
-
- - - UIUtils.basicSparkPage(content, logType + " log page for " + appId) + initialize() + + /** Initialize all components of the server. */ + def initialize() { + val logPage = new LogPage(this) + attachPage(logPage) + attachPage(new WorkerPage(this)) + attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static")) + attachHandler(createServletHandler("/log", + (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr)) + worker.metricsSystem.getServletHandlers.foreach(attachHandler) } - - /** Determine the byte range for a log or log page. */ - private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = { - val defaultBytes = 100 * 1024 - val maxBytes = 1024 * 1024 - val file = new File(path) - val logLength = file.length() - val getOffset = offset.getOrElse(logLength - defaultBytes) - val startByte = - if (getOffset < 0) 0L - else if (getOffset > logLength) logLength - else getOffset - val logPageLength = math.min(byteLength, maxBytes) - val endByte = math.min(startByte + logPageLength, logLength) - (startByte, endByte) - } - } private[spark] object WorkerWebUI { - val DEFAULT_PORT=8081 + val DEFAULT_PORT = 8081 val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR + + def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = { + requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT)) + } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala index affda13df6531..c1001227151a5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ApplicationEventListener.scala @@ -31,11 +31,11 @@ private[spark] class ApplicationEventListener extends SparkListener { def applicationStarted = startTime != -1 - def applicationFinished = endTime != -1 + def applicationCompleted = endTime != -1 def applicationDuration: Long = { val difference = endTime - startTime - if (applicationStarted && applicationFinished && difference > 0) difference else -1L + if (applicationStarted && applicationCompleted && difference > 0) difference else -1L } override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala index 07255aa366a6d..7ed371326855d 100644 --- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala +++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala @@ -42,24 +42,22 @@ class StorageStatus( def memRemaining : Long = maxMem - memUsed() - def rddBlocks = blocks.flatMap { - case (rdd: RDDBlockId, status) => Some(rdd, status) - case _ => None - } + def rddBlocks = blocks.collect { case (rdd: RDDBlockId, status) => (rdd, status) } } @DeveloperApi private[spark] class RDDInfo( - val id: Int, - val name: String, - val numPartitions: Int, - val storageLevel: StorageLevel) extends Ordered[RDDInfo] { + val id: Int, + val name: String, + val numPartitions: Int, + val storageLevel: StorageLevel) + extends Ordered[RDDInfo] { var numCachedPartitions = 0 var memSize = 0L var diskSize = 0L - var tachyonSize= 0L + var tachyonSize = 0L override def toString = { import Utils.bytesToString diff --git a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala index 
dd0818e8ab01c..62a4e3d0f6a42 100644 --- a/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/JettyUtils.scala @@ -121,6 +121,7 @@ private[spark] object JettyUtils extends Logging { /** Create a handler for serving files from a static directory */ def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = { val contextHandler = new ServletContextHandler + contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false") val staticHandler = new DefaultServlet val holder = new ServletHolder(staticHandler) Option(getClass.getClassLoader.getResource(resourceBase)) match { diff --git a/core/src/main/scala/org/apache/spark/ui/Page.scala b/core/src/main/scala/org/apache/spark/ui/Page.scala deleted file mode 100644 index b2a069a37552d..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/Page.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui - -private[spark] object Page extends Enumeration { - val Stages, Storage, Environment, Executors = Value -} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 7fa4fd3149eb6..2fef1a635427c 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -17,112 +17,86 @@ package org.apache.spark.ui -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext} import org.apache.spark.scheduler._ import org.apache.spark.storage.StorageStatusListener import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.env.EnvironmentUI -import org.apache.spark.ui.exec.ExecutorsUI -import org.apache.spark.ui.jobs.JobProgressUI -import org.apache.spark.ui.storage.BlockManagerUI -import org.apache.spark.util.Utils +import org.apache.spark.ui.env.EnvironmentTab +import org.apache.spark.ui.exec.ExecutorsTab +import org.apache.spark.ui.jobs.JobProgressTab +import org.apache.spark.ui.storage.StorageTab -/** Top level user interface for Spark */ +/** + * Top level user interface for a Spark application. 
+ */ private[spark] class SparkUI( val sc: SparkContext, val conf: SparkConf, + val securityManager: SecurityManager, val listenerBus: SparkListenerBus, var appName: String, val basePath: String = "") - extends WebUI("SparkUI") with Logging { + extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath) + with Logging { - def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) - def this(listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, new SparkConf, listenerBus, appName, basePath) + def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) + def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = + this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath) // If SparkContext is not provided, assume the associated application is not live val live = sc != null - val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf) - - private val localHost = Utils.localHostName() - private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost) - private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + // Maintain executor storage status through Spark events + val storageStatusListener = new StorageStatusListener - private val storage = new BlockManagerUI(this) - private val jobs = new JobProgressUI(this) - private val env = new EnvironmentUI(this) - private val exec = new ExecutorsUI(this) + initialize() - val handlers: Seq[ServletContextHandler] = { - val metricsServletHandlers = if (live) { - SparkEnv.get.metricsSystem.getServletHandlers - } else { - Array[ServletContextHandler]() + /** Initialize all components of the server. */ + def initialize() { + listenerBus.addListener(storageStatusListener) + val jobProgressTab = new JobProgressTab(this) + attachTab(jobProgressTab) + attachTab(new StorageTab(this)) + attachTab(new EnvironmentTab(this)) + attachTab(new ExecutorsTab(this)) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + attachHandler(createRedirectHandler("/", "/stages", basePath = basePath)) + attachHandler( + createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest)) + if (live) { + sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } - storage.getHandlers ++ - jobs.getHandlers ++ - env.getHandlers ++ - exec.getHandlers ++ - metricsServletHandlers ++ - Seq[ServletContextHandler] ( - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"), - createRedirectHandler("/", "/stages", basePath = basePath) - ) } - // Maintain executor storage status through Spark events - val storageStatusListener = new StorageStatusListener - + /** Set the app name for this UI. */ def setAppName(name: String) { appName = name } - /** Initialize all components of the server */ - def start() { - storage.start() - jobs.start() - env.start() - exec.start() - - // Storage status listener must receive events first, as other listeners depend on its state - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(storage.listener) - listenerBus.addListener(jobs.listener) - listenerBus.addListener(env.listener) - listenerBus.addListener(exec.listener) - } - - /** Bind to the HTTP server behind this web interface. 
*/ - override def bind() { - try { - serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf)) - logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) - } catch { - case e: Exception => - logError("Failed to create Spark web UI", e) - System.exit(1) - } + /** Register the given listener with the listener bus. */ + def registerListener(listener: SparkListener) { + listenerBus.addListener(listener) } /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() - logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) + logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } /** * Return the application UI host:port. This does not include the scheme (http://). */ - private[spark] def appUIHostPort = publicHost + ":" + boundPort + private[spark] def appUIHostPort = publicHostName + ":" + boundPort private[spark] def appUIAddress = s"http://$appUIHostPort" - } private[spark] object SparkUI { val DEFAULT_PORT = 4040 val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static" + + def getUIPort(conf: SparkConf): Int = { + conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT) + } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a7cf04b3cbb86..6a2d652528d8a 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -17,16 +17,115 @@ package org.apache.spark.ui +import java.text.SimpleDateFormat +import java.util.{Locale, Date} + import scala.xml.Node +import org.apache.spark.Logging /** Utility functions for generating XML pages with spark content. */ -private[spark] object UIUtils { +private[spark] object UIUtils extends Logging { + + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. 
+ private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + def formatDate(date: Date): String = dateFormat.get.format(date) + + def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + "%.1f h".format(hours) + } + + /** Generate a verbose human-readable string representing a duration such as "5 second 35 ms" */ + def formatDurationVerbose(ms: Long): String = { + try { + val second = 1000L + val minute = 60 * second + val hour = 60 * minute + val day = 24 * hour + val week = 7 * day + val year = 365 * day + + def toString(num: Long, unit: String): String = { + if (num == 0) { + "" + } else if (num == 1) { + s"$num $unit" + } else { + s"$num ${unit}s" + } + } + + val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms" + val secondString = toString((ms % minute) / second, "second") + val minuteString = toString((ms % hour) / minute, "minute") + val hourString = toString((ms % day) / hour, "hour") + val dayString = toString((ms % week) / day, "day") + val weekString = toString((ms % year) / week, "week") + val yearString = toString(ms / year, "year") - import Page._ + Seq( + second -> millisecondsString, + minute -> s"$secondString $millisecondsString", + hour -> s"$minuteString $secondString", + day -> s"$hourString $minuteString $secondString", + week -> s"$dayString $hourString $minuteString", + year -> s"$weekString $dayString $hourString" + ).foreach { case (durationLimit, durationString) => + if (ms < durationLimit) { + // if time is less than the limit (upto year) + return durationString + } + } + // if time is more than a year + return s"$yearString $weekString $dayString" + } catch { + case e: Exception => + logError("Error converting time to string", e) + // if there is some error, return blank string + return "" + } + } + + /** Generate a human-readable string representing a number (e.g. 100 K) */ + def formatNumber(records: Double): String = { + val trillion = 1e12 + val billion = 1e9 + val million = 1e6 + val thousand = 1e3 + + val (value, unit) = { + if (records >= 2*trillion) { + (records / trillion, " T") + } else if (records >= 2*billion) { + (records / billion, " B") + } else if (records >= 2*million) { + (records / million, " M") + } else if (records >= 2*thousand) { + (records / thousand, " K") + } else { + (records, "") + } + } + "%.1f%s".formatLocal(Locale.US, value, unit) + } // Yarn has to go through a proxy so the base uri is provided and has to be on all links - private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")). - getOrElse("") + val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("") def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource @@ -36,26 +135,14 @@ private[spark] object UIUtils { basePath: String, appName: String, title: String, - page: Page.Value) : Seq[Node] = { - val jobs = page match { - case Stages => -
  • Stages
  • - case _ =>
  • Stages
  • - } - val storage = page match { - case Storage => -
  • Storage
  • - case _ =>
  • Storage
  • - } - val environment = page match { - case Environment => -
  • Environment
  • - case _ =>
  • Environment
  • - } - val executors = page match { - case Executors => -
  • Executors
  • - case _ =>
  • Executors
  • + tabs: Seq[WebUITab], + activeTab: WebUITab, + refreshInterval: Option[Int] = None): Seq[Node] = { + + val header = tabs.map { tab => +
  • + {tab.name} +
  • } @@ -74,16 +161,10 @@ private[spark] object UIUtils { - + -
    @@ -129,21 +210,36 @@ private[spark] object UIUtils { /** Returns an HTML table constructed by generating a row for each object in a sequence. */ def listingTable[T]( headers: Seq[String], - makeRow: T => Seq[Node], - rows: Seq[T], + generateDataRow: T => Seq[Node], + data: Seq[T], fixedWidth: Boolean = false): Seq[Node] = { - val colWidth = 100.toDouble / headers.size - val colWidthAttr = if (fixedWidth) colWidth + "%" else "" var tableClass = "table table-bordered table-striped table-condensed sortable" if (fixedWidth) { tableClass += " table-fixed" } - + val colWidth = 100.toDouble / headers.size + val colWidthAttr = if (fixedWidth) colWidth + "%" else "" + val headerRow: Seq[Node] = { + // if none of the headers have "\n" in them + if (headers.forall(!_.contains("\n"))) { + // represent header as simple text + headers.map(h => {h}) + } else { + // represent header text as list while respecting "\n" + headers.map { case h => + +
      + { h.split("\n").map { case t =>
    • {t}
    • } } +
    + + } + } + } - {headers.map(h => )} + {headerRow} - {rows.map(r => makeRow(r))} + {data.map(r => generateDataRow(r))}
    {h}
    } diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 2cc7582eca8a3..b08f308fda1dd 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -17,53 +17,134 @@ package org.apache.spark.ui -import java.text.SimpleDateFormat -import java.util.Date +import javax.servlet.http.HttpServletRequest -private[spark] abstract class WebUI(name: String) { +import scala.collection.mutable.ArrayBuffer +import scala.xml.Node + +import org.eclipse.jetty.servlet.ServletContextHandler +import org.json4s.JsonAST.{JNothing, JValue} + +import org.apache.spark.{Logging, SecurityManager, SparkConf} +import org.apache.spark.ui.JettyUtils._ +import org.apache.spark.util.Utils + +/** + * The top level component of the UI hierarchy that contains the server. + * + * Each WebUI represents a collection of tabs, each of which in turn represents a collection of + * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. + */ +private[spark] abstract class WebUI( + securityManager: SecurityManager, + port: Int, + conf: SparkConf, + basePath: String = "") + extends Logging { + + protected val tabs = ArrayBuffer[WebUITab]() + protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None + protected val localHostName = Utils.localHostName() + protected val publicHostName = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName) + private val className = Utils.getFormattedClassName(this) + + def getTabs: Seq[WebUITab] = tabs.toSeq + def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + + /** Attach a tab to this UI, along with all of its attached pages. */ + def attachTab(tab: WebUITab) { + tab.pages.foreach(attachPage) + tabs += tab + } + + /** Attach a page to this UI. */ + def attachPage(page: WebUIPage) { + val pagePath = "/" + page.prefix + attachHandler(createServletHandler(pagePath, + (request: HttpServletRequest) => page.render(request), securityManager, basePath)) + attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json", + (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)) + } + + /** Attach a handler to this UI. */ + def attachHandler(handler: ServletContextHandler) { + handlers += handler + serverInfo.foreach { info => + info.rootHandler.addHandler(handler) + if (!handler.isStarted) { + handler.start() + } + } + } - /** - * Bind to the HTTP server behind this web interface. - * Overridden implementation should set serverInfo. - */ - def bind() { } + /** Detach a handler from this UI. */ + def detachHandler(handler: ServletContextHandler) { + handlers -= handler + serverInfo.foreach { info => + info.rootHandler.removeHandler(handler) + if (handler.isStarted) { + handler.stop() + } + } + } + + /** Initialize all components of the server. */ + def initialize() + + /** Bind to the HTTP server behind this web interface. */ + def bind() { + assert(!serverInfo.isDefined, "Attempted to bind %s more than once!".format(className)) + try { + serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf)) + logInfo("Started %s at http://%s:%d".format(className, publicHostName, boundPort)) + } catch { + case e: Exception => + logError("Failed to bind %s".format(className), e) + System.exit(1) + } + } /** Return the actual port to which this server is bound. Only valid after bind(). 
*/ def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) /** Stop the server behind this web interface. Only valid after bind(). */ def stop() { - assert(serverInfo.isDefined, "Attempted to stop %s before binding to a server!".format(name)) + assert(serverInfo.isDefined, + "Attempted to stop %s before binding to a server!".format(className)) serverInfo.get.server.stop() } } + /** - * Utilities used throughout the web UI. + * A tab that represents a collection of pages. + * The prefix is appended to the parent address to form a full path, and must not contain slashes. */ -private[spark] object WebUI { - // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. - private val dateFormat = new ThreadLocal[SimpleDateFormat]() { - override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") +private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) { + val pages = ArrayBuffer[WebUIPage]() + val name = prefix.capitalize + + /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */ + def attachPage(page: WebUIPage) { + page.prefix = (prefix + "/" + page.prefix).stripSuffix("/") + pages += page } - def formatDate(date: Date): String = dateFormat.get.format(date) + /** Get a list of header tabs from the parent UI. */ + def headerTabs: Seq[WebUITab] = parent.getTabs +} - def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) - def formatDuration(milliseconds: Long): String = { - val seconds = milliseconds.toDouble / 1000 - if (seconds < 60) { - return "%.0f s".format(seconds) - } - val minutes = seconds / 60 - if (minutes < 10) { - return "%.1f min".format(minutes) - } else if (minutes < 60) { - return "%.0f min".format(minutes) - } - val hours = minutes / 60 - "%.1f h".format(hours) - } +/** + * A page that represents the leaf node in the UI hierarchy. + * + * The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab. + * If the parent is a WebUI, the prefix is appended to the parent's address to form a full path. + * Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent + * to form a relative path. The prefix must not contain slashes. 
+ */ +private[spark] abstract class WebUIPage(var prefix: String) { + def render(request: HttpServletRequest): Seq[Node] + def renderJson(request: HttpServletRequest): JValue = JNothing } diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala similarity index 61% rename from core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala rename to core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala index 33df97187ea78..b347eb1b83c1f 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentPage.scala @@ -21,29 +21,12 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.eclipse.jetty.servlet.ServletContextHandler +import org.apache.spark.ui.{UIUtils, WebUIPage} -import org.apache.spark.scheduler._ -import org.apache.spark.ui._ -import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.Page.Environment - -private[ui] class EnvironmentUI(parent: SparkUI) { +private[ui] class EnvironmentPage(parent: EnvironmentTab) extends WebUIPage("") { + private val appName = parent.appName private val basePath = parent.basePath - private var _listener: Option[EnvironmentListener] = None - - private def appName = parent.appName - - lazy val listener = _listener.get - - def start() { - _listener = Some(new EnvironmentListener) - } - - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/environment", - (request: HttpServletRequest) => render(request), parent.securityManager, basePath) - ) + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { val runtimeInformationTable = UIUtils.listingTable( @@ -62,7 +45,7 @@ private[ui] class EnvironmentUI(parent: SparkUI) {
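To make the refactored hierarchy (WebUI -> WebUITab -> WebUIPage) concrete, here is a minimal, hypothetical tab built on the API above. FooTab, FooPage, and the "foo" prefix are invented for illustration and are not part of this patch; the UISuite test added later in the diff exercises the same pattern against a live SparkUI.

package org.apache.spark.ui.foo

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{SparkUI, UIUtils, WebUIPage, WebUITab}

// A tab is a named group of pages; attaching a page registers a servlet handler for
// its path plus a "/json" twin that serves renderJson (JNothing by default).
private[ui] class FooTab(parent: SparkUI) extends WebUITab(parent, "foo") {
  val appName = parent.appName
  val basePath = parent.basePath
  attachPage(new FooPage(this))    // served under <ui address>/foo
}

private[ui] class FooPage(parent: FooTab) extends WebUIPage("") {
  private val appName = parent.appName
  private val basePath = parent.basePath

  def render(request: HttpServletRequest): Seq[Node] = {
    val content = <p>foo contents</p>
    // Passing the parent's headerTabs (and the active tab) is what lets the shared
    // header render whatever tabs happen to be attached, instead of a fixed enum value.
    UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent)
  }
}

// With a running SparkContext, sc.ui.attachTab(new FooTab(sc.ui)) makes the page reachable
// immediately, since attachHandler starts handlers on an already-bound server.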

    Classpath Entries

    {classpathEntriesTable} - UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment) + UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent) } private def propertyHeader = Seq("Name", "Value") @@ -71,23 +54,3 @@ private[ui] class EnvironmentUI(parent: SparkUI) { private def propertyRow(kv: (String, String)) = {kv._1}{kv._2} private def classPathRow(data: (String, String)) = {data._1}{data._2} } - -/** - * A SparkListener that prepares information to be displayed on the EnvironmentUI - */ -private[ui] class EnvironmentListener extends SparkListener { - var jvmInformation = Seq[(String, String)]() - var sparkProperties = Seq[(String, String)]() - var systemProperties = Seq[(String, String)]() - var classpathEntries = Seq[(String, String)]() - - override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { - synchronized { - val environmentDetails = environmentUpdate.environmentDetails - jvmInformation = environmentDetails("JVM Information") - sparkProperties = environmentDetails("Spark Properties") - systemProperties = environmentDetails("System Properties") - classpathEntries = environmentDetails("Classpath Entries") - } - } -} diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala new file mode 100644 index 0000000000000..03b46e1bd59af --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ui.env + +import org.apache.spark.scheduler._ +import org.apache.spark.ui._ + +private[ui] class EnvironmentTab(parent: SparkUI) extends WebUITab(parent, "environment") { + val appName = parent.appName + val basePath = parent.basePath + val listener = new EnvironmentListener + + attachPage(new EnvironmentPage(this)) + parent.registerListener(listener) +} + +/** + * A SparkListener that prepares information to be displayed on the EnvironmentTab + */ +private[ui] class EnvironmentListener extends SparkListener { + var jvmInformation = Seq[(String, String)]() + var sparkProperties = Seq[(String, String)]() + var systemProperties = Seq[(String, String)]() + var classpathEntries = Seq[(String, String)]() + + override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { + synchronized { + val environmentDetails = environmentUpdate.environmentDetails + jvmInformation = environmentDetails("JVM Information") + sparkProperties = environmentDetails("Spark Properties") + systemProperties = environmentDetails("System Properties") + classpathEntries = environmentDetails("Classpath Entries") + } + } +} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala similarity index 61% rename from core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala rename to core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala index 77a38a1d3aa7c..c1e69f6cdaffb 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala @@ -19,35 +19,15 @@ package org.apache.spark.ui.exec import javax.servlet.http.HttpServletRequest -import scala.collection.mutable.HashMap import scala.xml.Node -import org.eclipse.jetty.servlet.ServletContextHandler - -import org.apache.spark.ExceptionFailure -import org.apache.spark.scheduler._ -import org.apache.spark.storage.StorageStatusListener -import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.Page.Executors -import org.apache.spark.ui.{SparkUI, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils -private[ui] class ExecutorsUI(parent: SparkUI) { +private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") { + private val appName = parent.appName private val basePath = parent.basePath - private var _listener: Option[ExecutorsListener] = None - - private def appName = parent.appName - - lazy val listener = _listener.get - - def start() { - _listener = Some(new ExecutorsListener(parent.storageStatusListener)) - } - - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/executors", - (request: HttpServletRequest) => render(request), parent.securityManager, basePath) - ) + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { val storageStatusList = listener.storageStatusList @@ -75,8 +55,8 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
    ; - UIUtils.headerSparkPage( - content, basePath, appName, "Executors (" + execInfo.size + ")", Executors) + UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")", + parent.headerTabs, parent) } /** Header fields for the executors table */ @@ -159,55 +139,3 @@ private[ui] class ExecutorsUI(parent: SparkUI) { execFields.zip(execValues).toMap } } - -/** - * A SparkListener that prepares information to be displayed on the ExecutorsUI - */ -private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener) - extends SparkListener { - - val executorToTasksActive = HashMap[String, Int]() - val executorToTasksComplete = HashMap[String, Int]() - val executorToTasksFailed = HashMap[String, Int]() - val executorToDuration = HashMap[String, Long]() - val executorToShuffleRead = HashMap[String, Long]() - val executorToShuffleWrite = HashMap[String, Long]() - - def storageStatusList = storageStatusListener.storageStatusList - - override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { - val eid = formatExecutorId(taskStart.taskInfo.executorId) - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 - } - - override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { - val info = taskEnd.taskInfo - if (info != null) { - val eid = formatExecutorId(info.executorId) - executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1 - executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration - taskEnd.reason match { - case e: ExceptionFailure => - executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 - case _ => - executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 - } - - // Update shuffle read/write - val metrics = taskEnd.taskMetrics - if (metrics != null) { - metrics.shuffleReadMetrics.foreach { shuffleRead => - executorToShuffleRead(eid) = - executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead - } - metrics.shuffleWriteMetrics.foreach { shuffleWrite => - executorToShuffleWrite(eid) = - executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten - } - } - } - } - - // This addresses executor ID inconsistencies in the local mode - private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId) -} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala new file mode 100644 index 0000000000000..5678bf34ac730 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and + * limitations under the License. 
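The ExecutorsTab defined by this new file (its body continues just below) wires up its listener the same way EnvironmentTab does above. A hedged, generic sketch of that lifecycle, with invented Foo* names standing in for the real classes:

package org.apache.spark.ui.foo

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.ui.{SparkUI, WebUITab}

// The tab owns the SparkListener that feeds its pages and hands it to the parent SparkUI,
// replacing the old Option[_listener] / start() / getHandlers boilerplate removed above.
private[ui] class FooListener extends SparkListener {
  var tasksEnded = 0
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { tasksEnded += 1 }
}

private[ui] class FooStatsTab(parent: SparkUI) extends WebUITab(parent, "foostats") {
  val listener = new FooListener
  // attachPage(...) calls would go here, exactly as in EnvironmentTab above.
  parent.registerListener(listener)   // events now reach the listener via the UI's bus
}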
+ */ + +package org.apache.spark.ui.exec + +import scala.collection.mutable.HashMap + +import org.apache.spark.ExceptionFailure +import org.apache.spark.scheduler._ +import org.apache.spark.storage.StorageStatusListener +import org.apache.spark.ui.{SparkUI, WebUITab} + +private[ui] class ExecutorsTab(parent: SparkUI) extends WebUITab(parent, "executors") { + val appName = parent.appName + val basePath = parent.basePath + val listener = new ExecutorsListener(parent.storageStatusListener) + + attachPage(new ExecutorsPage(this)) + parent.registerListener(listener) +} + +/** + * A SparkListener that prepares information to be displayed on the ExecutorsTab + */ +private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener) + extends SparkListener { + + val executorToTasksActive = HashMap[String, Int]() + val executorToTasksComplete = HashMap[String, Int]() + val executorToTasksFailed = HashMap[String, Int]() + val executorToDuration = HashMap[String, Long]() + val executorToShuffleRead = HashMap[String, Long]() + val executorToShuffleWrite = HashMap[String, Long]() + + def storageStatusList = storageStatusListener.storageStatusList + + override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized { + val eid = formatExecutorId(taskStart.taskInfo.executorId) + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1 + } + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized { + val info = taskEnd.taskInfo + if (info != null) { + val eid = formatExecutorId(info.executorId) + executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1 + executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration + taskEnd.reason match { + case e: ExceptionFailure => + executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1 + case _ => + executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1 + } + + // Update shuffle read/write + val metrics = taskEnd.taskMetrics + if (metrics != null) { + metrics.shuffleReadMetrics.foreach { shuffleRead => + executorToShuffleRead(eid) = + executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead + } + metrics.shuffleWriteMetrics.foreach { shuffleWrite => + executorToShuffleWrite(eid) = + executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten + } + } + } + } + + // This addresses executor ID inconsistencies in the local mode + private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId) +} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 73861ae6746da..c83e196c9c156 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs import scala.collection.mutable import scala.xml.Node +import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing executor summary */ -private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { - private lazy val listener = parent.listener +private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) { + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { @@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) { {k} {executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")} - 
{parent.formatDuration(v.taskTime)} + {UIUtils.formatDuration(v.taskTime)} {v.failedTasks + v.succeededTasks} {v.failedTasks} {v.succeededTasks} diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 5167e20ea3d7d..0db4afa701b41 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -222,12 +222,10 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener { override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) { synchronized { - val schedulingModeName = - environmentUpdate.environmentDetails("Spark Properties").toMap.get("spark.scheduler.mode") - schedulingMode = schedulingModeName match { - case Some(name) => Some(SchedulingMode.withName(name)) - case None => None - } + schedulingMode = environmentUpdate + .environmentDetails("Spark Properties").toMap + .get("spark.scheduler.mode") + .map(SchedulingMode.withName) } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala similarity index 90% rename from core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala rename to core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala index 8619a31380f1e..34ff2ac34a7ca 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressPage.scala @@ -22,25 +22,23 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable -import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing list of all ongoing and recently finished stages and pools */ -private[ui] class IndexPage(parent: JobProgressUI) { +private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") { + private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.listener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler - private def appName = parent.appName - def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { val activeStages = listener.activeStages.values.toSeq val completedStages = listener.completedStages.reverse.toSeq val failedStages = listener.failedStages.reverse.toSeq - val now = System.currentTimeMillis() + val now = System.currentTimeMillis val activeStagesTable = new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled) @@ -59,7 +57,7 @@ private[ui] class IndexPage(parent: JobProgressUI) { // Total duration is not meaningful unless the UI is live
  • Total Duration:
- {parent.formatDuration(now - sc.startTime)}
+ {UIUtils.formatDuration(now - sc.startTime)}
  • }}
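The switch from parent.formatDuration to UIUtils.formatDuration seen here (and repeated across the stage pages below) reflects the date and duration helpers moving out of the removed WebUI object into UIUtils. A small hedged illustration; the example values are based on the thresholds in the implementation deleted above and are indicative only:

package org.apache.spark.ui

import java.util.Date

private[spark] object FormatExample {
  // Pages now call the shared helpers directly instead of going through their parent UI.
  val duration = UIUtils.formatDuration(93 * 1000L)   // expected "1.6 min" (the < 10 min branch)
  val launched = UIUtils.formatDate(new Date)          // "yyyy/MM/dd HH:mm:ss"-style timestamp
}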
  • @@ -94,7 +92,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {

    Failed Stages ({failedStages.size})

    ++ failedStagesTable.toNodeSeq - UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala similarity index 53% rename from core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala rename to core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 30e3f35f2182b..3308c8c8a3d37 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -19,39 +19,28 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.SparkConf import org.apache.spark.scheduler.SchedulingMode -import org.apache.spark.ui.JettyUtils._ -import org.apache.spark.ui.SparkUI -import org.apache.spark.util.Utils +import org.apache.spark.ui.{SparkUI, WebUITab} /** Web UI showing progress status of all jobs in the given SparkContext. */ -private[ui] class JobProgressUI(parent: SparkUI) { +private[ui] class JobProgressTab(parent: SparkUI) extends WebUITab(parent, "stages") { + val appName = parent.appName val basePath = parent.basePath val live = parent.live val sc = parent.sc - val killEnabled = parent.conf.getBoolean("spark.ui.killEnabled", true) - - lazy val listener = _listener.get - lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - - private val indexPage = new IndexPage(this) - private val stagePage = new StagePage(this) - private val poolPage = new PoolPage(this) - private var _listener: Option[JobProgressListener] = None + val conf = if (live) sc.conf else new SparkConf + val killEnabled = conf.getBoolean("spark.ui.killEnabled", true) + val listener = new JobProgressListener(conf) - def appName = parent.appName + attachPage(new JobProgressPage(this)) + attachPage(new StagePage(this)) + attachPage(new PoolPage(this)) + parent.registerListener(listener) - def start() { - val conf = if (live) sc.conf else new SparkConf - _listener = Some(new JobProgressListener(conf)) - } - - def formatDuration(ms: Long) = Utils.msDurationToString(ms) + def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR) - private def handleKillRequest(request: HttpServletRequest) = { + def handleKillRequest(request: HttpServletRequest) = { if (killEnabled) { val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt @@ -64,14 +53,4 @@ private[ui] class JobProgressUI(parent: SparkUI) { Thread.sleep(100) } } - - def getHandlers = Seq[ServletContextHandler]( - createRedirectHandler("/stages/stage/kill", "/stages", handleKillRequest), - createServletHandler("/stages/stage", - (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath), - createServletHandler("/stages/pool", - (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath), - createServletHandler("/stages", - (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath) - ) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 3638e6035ba81..fd83d37583967 100644 --- 
a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -22,17 +22,15 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} -import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} /** Page showing specific pool details */ -private[ui] class PoolPage(parent: JobProgressUI) { +private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") { + private val appName = parent.appName private val basePath = parent.basePath private val live = parent.live private val sc = parent.sc - private lazy val listener = parent.listener - - private def appName = parent.appName + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { @@ -52,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressUI) {

    Summary

    ++ poolTable.toNodeSeq ++

    {activeStages.size} Active Stages

    ++ activeStagesTable.toNodeSeq - UIUtils.headerSparkPage( - content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName, + parent.headerTabs, parent) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala index c5c8d8668740b..f4b68f241966d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala @@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.UIUtils /** Table showing list of pools */ -private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { +private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) { private val basePath = parent.basePath - private val poolToActiveStages = listener.poolToActiveStages - private lazy val listener = parent.listener + private val listener = parent.listener def toNodeSeq: Seq[Node] = { listener.synchronized { @@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) { SchedulingMode - {rows.map(r => makeRow(r, poolToActiveStages))} + {rows.map(r => makeRow(r, listener.poolToActiveStages))} } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index b6c3e3cf45163..4bce472036f7d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -22,17 +22,14 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.Page._ -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ -private[ui] class StagePage(parent: JobProgressUI) { +private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") { + private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.listener - private lazy val sc = parent.sc - - private def appName = parent.appName + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { listener.synchronized { @@ -44,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressUI) {

    Summary Metrics

    No tasks have started yet

    Tasks

    No tasks have started yet
  • - return UIUtils.headerSparkPage( - content, basePath, appName, "Details for Stage %s".format(stageId), Stages) + return UIUtils.headerSparkPage(content, basePath, appName, + "Details for Stage %s".format(stageId), parent.headerTabs, parent) } val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) @@ -60,7 +57,7 @@ private[ui] class StagePage(parent: JobProgressUI) { val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 var activeTime = 0L - val now = System.currentTimeMillis() + val now = System.currentTimeMillis val tasksActive = listener.stageIdToTasksActive(stageId).values tasksActive.foreach(activeTime += _.timeRunning(now)) @@ -70,7 +67,7 @@ private[ui] class StagePage(parent: JobProgressUI) {
    • Total task time across all tasks:
- {parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
+ {UIUtils.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
    • {if (hasShuffleRead)
    • @@ -121,13 +118,13 @@ private[ui] class StagePage(parent: JobProgressUI) { } val serializationQuantiles = "Result serialization time" +: Distribution(serializationTimes). - get.getQuantiles().map(ms => parent.formatDuration(ms.toLong)) + get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong)) val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.executorRunTime.toDouble } val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles() - .map(ms => parent.formatDuration(ms.toLong)) + .map(ms => UIUtils.formatDuration(ms.toLong)) val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) => if (info.gettingResultTime > 0) { @@ -138,7 +135,7 @@ private[ui] class StagePage(parent: JobProgressUI) { } val gettingResultQuantiles = "Time spent fetching task results" +: Distribution(gettingResultTimes).get.getQuantiles().map { millis => - parent.formatDuration(millis.toLong) + UIUtils.formatDuration(millis.toLong) } // The scheduler delay includes the network delay to send the task to the worker // machine and to send back the result (but not the time to fetch the task result, @@ -155,7 +152,7 @@ private[ui] class StagePage(parent: JobProgressUI) { } val schedulerDelayQuantiles = "Scheduler delay" +: Distribution(schedulerDelays).get.getQuantiles().map { millis => - parent.formatDuration(millis.toLong) + UIUtils.formatDuration(millis.toLong) } def getQuantileCols(data: Seq[Double]) = @@ -206,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressUI) {

      Aggregated Metrics by Executor

      ++ executorTable.toNodeSeq ++

      Tasks

      ++ taskTable - UIUtils.headerSparkPage( - content, basePath, appName, "Details for Stage %d".format(stageId), Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId), + parent.headerTabs, parent) } } @@ -219,8 +216,8 @@ private[ui] class StagePage(parent: JobProgressUI) { taskData match { case TaskUIData(info, metrics, exception) => val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis()) else metrics.map(_.executorRunTime).getOrElse(1L) - val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration) - else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("") + val formatDuration = if (info.status == "RUNNING") UIUtils.formatDuration(duration) + else metrics.map(m => UIUtils.formatDuration(m.executorRunTime)).getOrElse("") val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val serializationTime = metrics.map(_.resultSerializationTime).getOrElse(0L) @@ -235,8 +232,8 @@ private[ui] class StagePage(parent: JobProgressUI) { val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") - val writeTimeReadable = maybeWriteTime.map( t => t / (1000 * 1000)).map { ms => - if (ms == 0) "" else parent.formatDuration(ms) + val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms => + if (ms == 0) "" else UIUtils.formatDuration(ms) }.getOrElse("") val maybeMemoryBytesSpilled = metrics.map(_.memoryBytesSpilled) @@ -254,15 +251,15 @@ private[ui] class StagePage(parent: JobProgressUI) { {info.status} {info.taskLocality} {info.host} - {WebUI.formatDate(new Date(info.launchTime))} + {UIUtils.formatDate(new Date(info.launchTime))} {formatDuration} - {if (gcTime > 0) parent.formatDuration(gcTime) else ""} + {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""} - {if (serializationTime > 0) parent.formatDuration(serializationTime) else ""} + {if (serializationTime > 0) UIUtils.formatDuration(serializationTime) else ""} {if (shuffleRead) { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index e419fae5a6589..8c5b1f55fd2dc 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -23,17 +23,17 @@ import scala.collection.mutable.HashMap import scala.xml.Node import org.apache.spark.scheduler.{StageInfo, TaskInfo} -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ private[ui] class StageTable( - stages: Seq[StageInfo], - parent: JobProgressUI, - killEnabled: Boolean = false) { + stages: Seq[StageInfo], + parent: JobProgressTab, + killEnabled: Boolean = false) { private val basePath = parent.basePath - private lazy val listener = parent.listener + private val listener = parent.listener private lazy val isFairScheduler = parent.isFairScheduler def toNodeSeq: Seq[Node] = { @@ -89,25 +89,23 @@ private[ui] class StageTable( {s.name} - val description = listener.stageIdToDescription.get(s.stageId) + listener.stageIdToDescription.get(s.stageId) .map(d =>
      {d}
      {nameLink} {killLink}
      ) .getOrElse(
      {killLink}{nameLink}
      ) - - return description } /** Render an HTML row that represents a stage */ private def stageRow(s: StageInfo): Seq[Node] = { val poolName = listener.stageIdToPool.get(s.stageId) val submissionTime = s.submissionTime match { - case Some(t) => WebUI.formatDate(new Date(t)) + case Some(t) => UIUtils.formatDate(new Date(t)) case None => "Unknown" } val finishTime = s.completionTime.getOrElse(System.currentTimeMillis) val duration = s.submissionTime.map { t => if (finishTime > t) finishTime - t else System.currentTimeMillis - t } - val formattedDuration = duration.map(d => parent.formatDuration(d)).getOrElse("Unknown") + val formattedDuration = duration.map(d => UIUtils.formatDuration(d)).getOrElse("Unknown") val startedTasks = listener.stageIdToTasksActive.getOrElse(s.stageId, HashMap[Long, TaskInfo]()).size val completedTasks = listener.stageIdToTasksComplete.getOrElse(s.stageId, 0) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 75ee9976d7b5f..d07f1c9b20fcf 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -22,23 +22,22 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ -private[ui] class RDDPage(parent: BlockManagerUI) { +private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") { + private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.listener - - private def appName = parent.appName + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { val rddId = request.getParameter("id").toInt val storageStatusList = listener.storageStatusList val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage) + return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", + parent.headerTabs, parent) } // Worker table @@ -96,8 +95,8 @@ private[ui] class RDDPage(parent: BlockManagerUI) { ; - UIUtils.headerSparkPage( - content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage) + UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name, + parent.headerTabs, parent) } /** Header fields for the worker table */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala similarity index 90% rename from core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala rename to core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala index 4f6acc30a88c4..b66edd91f56c0 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StoragePage.scala @@ -22,22 +22,19 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.Page._ -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{WebUIPage, UIUtils} import org.apache.spark.util.Utils 
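The StoragePage below builds its entire body from UIUtils.listingTable, the same header-plus-row-function helper used by the environment and executors pages. A hedged, self-contained illustration of its shape; the two-column data here is invented:

package org.apache.spark.ui

import scala.xml.Node

private[spark] object ListingTableExample {
  private val header = Seq("Name", "Value")
  // One datum in, one table row's worth of nodes out -- mirroring propertyRow above.
  private def row(kv: (String, String)): Seq[Node] = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>

  // listingTable renders the headers as the table's header row and maps row() over the data.
  val table: Seq[Node] = UIUtils.listingTable(header, row, Seq("a" -> "1", "b" -> "2"))
}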
/** Page showing list of RDD's currently stored in the cluster */ -private[ui] class IndexPage(parent: BlockManagerUI) { +private[ui] class StoragePage(parent: StorageTab) extends WebUIPage("") { + private val appName = parent.appName private val basePath = parent.basePath - private lazy val listener = parent.listener - - private def appName = parent.appName + private val listener = parent.listener def render(request: HttpServletRequest): Seq[Node] = { - val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) - UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) + UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent) } /** Header fields for the RDD table */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala similarity index 75% rename from core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala rename to core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala index 16996a2da1e72..56429f6c07fcd 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/StorageTab.scala @@ -17,45 +17,27 @@ package org.apache.spark.ui.storage -import javax.servlet.http.HttpServletRequest - import scala.collection.mutable -import org.eclipse.jetty.servlet.ServletContextHandler - import org.apache.spark.ui._ -import org.apache.spark.ui.JettyUtils._ import org.apache.spark.scheduler._ import org.apache.spark.storage.{RDDInfo, StorageStatusListener, StorageUtils} /** Web UI showing storage status of all RDD's in the given SparkContext. */ -private[ui] class BlockManagerUI(parent: SparkUI) { +private[ui] class StorageTab(parent: SparkUI) extends WebUITab(parent, "storage") { + val appName = parent.appName val basePath = parent.basePath + val listener = new StorageListener(parent.storageStatusListener) - private val indexPage = new IndexPage(this) - private val rddPage = new RDDPage(this) - private var _listener: Option[BlockManagerListener] = None - - lazy val listener = _listener.get - - def appName = parent.appName - - def start() { - _listener = Some(new BlockManagerListener(parent.storageStatusListener)) - } - - def getHandlers = Seq[ServletContextHandler]( - createServletHandler("/storage/rdd", - (request: HttpServletRequest) => rddPage.render(request), parent.securityManager, basePath), - createServletHandler("/storage", - (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath) - ) + attachPage(new StoragePage(this)) + attachPage(new RddPage(this)) + parent.registerListener(listener) } /** * A SparkListener that prepares information to be displayed on the BlockManagerUI */ -private[ui] class BlockManagerListener(storageStatusListener: StorageStatusListener) +private[ui] class StorageListener(storageStatusListener: StorageStatusListener) extends SparkListener { private val _rddInfoMap = mutable.Map[Int, RDDInfo]() diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index f2396f7c80a35..465835ea7fe29 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -88,30 +88,27 @@ private[spark] object JsonProtocol { def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = { val taskInfo = taskStart.taskInfo - val 
taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing ("Event" -> Utils.getFormattedClassName(taskStart)) ~ ("Stage ID" -> taskStart.stageId) ~ - ("Task Info" -> taskInfoJson) + ("Task Info" -> taskInfoToJson(taskInfo)) } def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = { val taskInfo = taskGettingResult.taskInfo - val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing ("Event" -> Utils.getFormattedClassName(taskGettingResult)) ~ - ("Task Info" -> taskInfoJson) + ("Task Info" -> taskInfoToJson(taskInfo)) } def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = { val taskEndReason = taskEndReasonToJson(taskEnd.reason) val taskInfo = taskEnd.taskInfo - val taskInfoJson = if (taskInfo != null) taskInfoToJson(taskInfo) else JNothing val taskMetrics = taskEnd.taskMetrics val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing ("Event" -> Utils.getFormattedClassName(taskEnd)) ~ ("Stage ID" -> taskEnd.stageId) ~ ("Task Type" -> taskEnd.taskType) ~ ("Task End Reason" -> taskEndReason) ~ - ("Task Info" -> taskInfoJson) ~ + ("Task Info" -> taskInfoToJson(taskInfo)) ~ ("Task Metrics" -> taskMetricsJson) } @@ -505,6 +502,9 @@ private[spark] object JsonProtocol { } def taskMetricsFromJson(json: JValue): TaskMetrics = { + if (json == JNothing) { + return TaskMetrics.empty + } val metrics = new TaskMetrics metrics.hostname = (json \ "Host Name").extract[String] metrics.executorDeserializeTime = (json \ "Executor Deserialize Time").extract[Long] diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala index 2f9739f940dc6..b85c483ca2a08 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala @@ -18,16 +18,81 @@ package org.apache.spark.ui import java.net.ServerSocket +import javax.servlet.http.HttpServletRequest +import scala.io.Source import scala.util.{Failure, Success, Try} import org.eclipse.jetty.server.Server import org.eclipse.jetty.servlet.ServletContextHandler import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ -import org.apache.spark.SparkConf +import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.LocalSparkContext._ +import scala.xml.Node class UISuite extends FunSuite { + + test("basic ui visibility") { + withSpark(new SparkContext("local", "test")) { sc => + // test if the ui is visible, and all the expected tabs are visible + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + assert(html.toLowerCase.contains("stages")) + assert(html.toLowerCase.contains("storage")) + assert(html.toLowerCase.contains("environment")) + assert(html.toLowerCase.contains("executors")) + } + } + } + + test("visibility at localhost:4040") { + withSpark(new SparkContext("local", "test")) { sc => + // test if visible from http://localhost:4040 + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL("http://localhost:4040").mkString + assert(html.toLowerCase.contains("stages")) + } + } + } + + test("attaching a new tab") { + withSpark(new SparkContext("local", "test")) { sc => + val sparkUI = sc.ui + + val newTab = new WebUITab(sparkUI, "foo") { + attachPage(new WebUIPage("") { + def render(request: 
HttpServletRequest): Seq[Node] = { + "html magic" + } + }) + } + sparkUI.attachTab(newTab) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + + // check whether new page exists + assert(html.toLowerCase.contains("foo")) + + // check whether other pages still exist + assert(html.toLowerCase.contains("stages")) + assert(html.toLowerCase.contains("storage")) + assert(html.toLowerCase.contains("environment")) + assert(html.toLowerCase.contains("executors")) + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(sc.ui.appUIAddress.stripSuffix("/") + "/foo").mkString + // check whether new page exists + assert(html.contains("magic")) + } + } + } + test("jetty port increases under contention") { val startPort = 4040 val server = new Server(startPort) @@ -60,4 +125,18 @@ class UISuite extends FunSuite { case Failure(e) => } } + + test("verify appUIAddress contains the scheme") { + withSpark(new SparkContext("local", "test")) { sc => + val uiAddress = sc.ui.appUIAddress + assert(uiAddress.equals("http://" + sc.ui.appUIHostPort)) + } + } + + test("verify appUIAddress contains the port") { + withSpark(new SparkContext("local", "test")) { sc => + val splitUIAddress = sc.ui.appUIAddress.split(':') + assert(splitUIAddress(2).toInt == sc.ui.boundPort) + } + } } diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index f75297a02dc8b..16470bb7bf60d 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -523,8 +523,8 @@ class JsonProtocolSuite extends FunSuite { 700,"Fetch Wait Time":900,"Remote Bytes Read":1000},"Shuffle Write Metrics": {"Shuffle Bytes Written":1200,"Shuffle Write Time":1500},"Updated Blocks": [{"Block ID":{"Type":"RDDBlockId","RDD ID":0,"Split Index":0},"Status": - {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":false, - "Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} + {"Storage Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false, + "Deserialized":false,"Replication":2},"Memory Size":0,"Disk Size":0,"Tachyon Size":0}}]}} """ private val jobStartJsonString = diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala index 5ea4817bfde18..9cb31d70444ff 100644 --- a/project/MimaBuild.scala +++ b/project/MimaBuild.scala @@ -60,6 +60,7 @@ object MimaBuild { Seq( excludePackage("org.apache.spark.api.java"), excludePackage("org.apache.spark.streaming.api.java"), + excludePackage("org.apache.spark.streaming.scheduler"), excludePackage("org.apache.spark.mllib") ) ++ excludeSparkClass("rdd.ClassTags") ++ @@ -70,7 +71,12 @@ object MimaBuild { excludeSparkClass("mllib.regression.LassoWithSGD") ++ excludeSparkClass("mllib.regression.LinearRegressionWithSGD") ++ excludeSparkClass("streaming.dstream.NetworkReceiver") ++ - excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") + excludeSparkClass("streaming.dstream.NetworkReceiver#NetworkReceiverActor") ++ + excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++ + excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++ + excludeSparkClass("streaming.dstream.ReportError") ++ + excludeSparkClass("streaming.dstream.ReportBlock") ++ + 
excludeSparkClass("streaming.dstream.DStream") case _ => Seq() } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index a4e236c65ff86..ff5d0aaa3d0bd 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -17,29 +17,28 @@ package org.apache.spark.streaming -import scala.collection.mutable.Queue -import scala.collection.Map -import scala.reflect.ClassTag - import java.io.InputStream import java.util.concurrent.atomic.AtomicInteger -import akka.actor.Props -import akka.actor.SupervisorStrategy -import org.apache.hadoop.io.LongWritable -import org.apache.hadoop.io.Text +import scala.collection.Map +import scala.collection.mutable.Queue +import scala.reflect.ClassTag + +import akka.actor.{Props, SupervisorStrategy} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.{LongWritable, Text} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} import org.apache.hadoop.mapreduce.lib.input.TextInputFormat -import org.apache.hadoop.fs.Path import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.MetadataCleaner import org.apache.spark.streaming.dstream._ import org.apache.spark.streaming.receivers._ import org.apache.spark.streaming.scheduler._ -import org.apache.hadoop.conf.Configuration +import org.apache.spark.streaming.ui.StreamingTab +import org.apache.spark.util.MetadataCleaner /** * Main entry point for Spark Streaming functionality. It provides methods used to create @@ -158,6 +157,8 @@ class StreamingContext private[streaming] ( private[streaming] val waiter = new ContextWaiter + private[streaming] val uiTab = new StreamingTab(this) + /** Enumeration to identify current state of the StreamingContext */ private[streaming] object StreamingContextState extends Enumeration { type CheckpointState = Value diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index d043200f71a0b..a7e5215437e54 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -353,15 +353,6 @@ abstract class DStream[T: ClassTag] ( dependencies.foreach(_.clearMetadata(time)) } - /* Adds metadata to the Stream while it is running. - * This method should be overwritten by sublcasses of InputDStream. - */ - private[streaming] def addMetadata(metadata: Any) { - if (metadata != null) { - logInfo("Dropping Metadata: " + metadata.toString) - } - } - /** * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. 
This is diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala index d19a635fe8eca..5a249706b4d2f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala @@ -17,24 +17,23 @@ package org.apache.spark.streaming.dstream -import java.util.concurrent.{TimeUnit, ArrayBlockingQueue} import java.nio.ByteBuffer +import java.util.concurrent.{ArrayBlockingQueue, TimeUnit} -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.concurrent.Await -import scala.concurrent.duration._ import scala.reflect.ClassTag -import akka.actor.{Props, Actor} +import akka.actor.{Actor, Props} import akka.pattern.ask -import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} -import org.apache.spark.streaming._ import org.apache.spark.{Logging, SparkEnv} -import org.apache.spark.rdd.{RDD, BlockRDD} +import org.apache.spark.rdd.{BlockRDD, RDD} import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId} -import org.apache.spark.streaming.scheduler.{DeregisterReceiver, AddBlocks, RegisterReceiver} -import org.apache.spark.util.AkkaUtils +import org.apache.spark.streaming._ +import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver} +import org.apache.spark.streaming.util.{RecurringTimer, SystemClock} +import org.apache.spark.util.{AkkaUtils, Utils} /** * Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] @@ -49,8 +48,10 @@ import org.apache.spark.util.AkkaUtils abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { - // This is an unique identifier that is used to match the network receiver with the - // corresponding network input stream. + /** Keeps all received blocks information */ + private lazy val receivedBlockInfo = new HashMap[Time, Array[ReceivedBlockInfo]] + + /** This is an unique identifier for the network input stream. */ val id = ssc.getNewNetworkStreamId() /** @@ -65,25 +66,44 @@ abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingConte def stop() {} + /** Ask NetworkInputTracker for received data blocks and generates RDDs with them. */ override def compute(validTime: Time): Option[RDD[T]] = { // If this is called for any time before the start time of the context, // then this returns an empty RDD. This may happen when recovering from a // master failure if (validTime >= graph.startTime) { - val blockIds = ssc.scheduler.networkInputTracker.getBlocks(id, validTime) + val blockInfo = ssc.scheduler.networkInputTracker.getReceivedBlockInfo(id) + receivedBlockInfo(validTime) = blockInfo + val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId]) Some(new BlockRDD[T](ssc.sc, blockIds)) } else { Some(new BlockRDD[T](ssc.sc, Array[BlockId]())) } } + + /** Get information on received blocks. */ + private[streaming] def getReceivedBlockInfo(time: Time) = { + receivedBlockInfo(time) + } + + /** + * Clear metadata that are older than `rememberDuration` of this DStream. + * This is an internal method that should not be called directly. This + * implementation overrides the default implementation to clear received + * block information. 
+ */ + private[streaming] override def clearMetadata(time: Time) { + super.clearMetadata(time) + val oldReceivedBlocks = receivedBlockInfo.filter(_._1 <= (time - rememberDuration)) + receivedBlockInfo --= oldReceivedBlocks.keys + logDebug("Cleared " + oldReceivedBlocks.size + " RDDs that were older than " + + (time - rememberDuration) + ": " + oldReceivedBlocks.keys.mkString(", ")) + } } private[streaming] sealed trait NetworkReceiverMessage -private[streaming] case class StopReceiver() extends NetworkReceiverMessage -private[streaming] case class ReportBlock(blockId: BlockId, metadata: Any) - extends NetworkReceiverMessage -private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage +private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverMessage /** * Abstract class of a receiver that can be run on worker nodes to receive external data. See @@ -177,6 +197,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging case (e, i) => "Exception " + i + ": " + e.getMessage + "\n" + e.getStackTraceString }.mkString("\n") } + logInfo("Deregistering receiver " + streamId) val future = trackerActor.ask(DeregisterReceiver(streamId, message))(askTimeout) Await.result(future, askTimeout) @@ -209,18 +230,28 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging /** * Push a block (as an ArrayBuffer filled with data) into the block manager. */ - def pushBlock(blockId: BlockId, arrayBuffer: ArrayBuffer[T], metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + arrayBuffer: ArrayBuffer[T], + metadata: Any, + level: StorageLevel + ) { env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level) - trackerActor ! AddBlocks(streamId, Array(blockId), metadata) + trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata)) logDebug("Pushed block " + blockId) } /** * Push a block (as bytes) into the block manager. */ - def pushBlock(blockId: BlockId, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { + def pushBlock( + blockId: StreamBlockId, + bytes: ByteBuffer, + metadata: Any, + level: StorageLevel + ) { env.blockManager.putBytes(blockId, bytes, level) - trackerActor ! AddBlocks(streamId, Array(blockId), metadata) + trackerActor ! 
AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata)) } /** Set the ID of the DStream that this receiver is associated with */ @@ -232,9 +263,11 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging private class NetworkReceiverActor extends Actor { override def preStart() { - logInfo("Registered receiver " + streamId) - val future = trackerActor.ask(RegisterReceiver(streamId, self))(askTimeout) + val msg = RegisterReceiver( + streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self) + val future = trackerActor.ask(msg)(askTimeout) Await.result(future, askTimeout) + logInfo("Registered receiver " + streamId) } override def receive() = { @@ -253,7 +286,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging class BlockGenerator(storageLevel: StorageLevel) extends Serializable with Logging { - case class Block(id: BlockId, buffer: ArrayBuffer[T], metadata: Any = null) + case class Block(id: StreamBlockId, buffer: ArrayBuffer[T], metadata: Any = null) val clock = new SystemClock() val blockInterval = env.conf.getLong("spark.streaming.blockInterval", 200) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala index 7f3cd2f8eb1fd..9c69a2a4e21f5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Time */ case class BatchInfo( batchTime: Time, + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]], submissionTime: Long, processingStartTime: Option[Long], processingEndTime: Option[Long] diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala index 92d885c4bc5a5..e564eccba2df5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala @@ -201,7 +201,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { logInfo("Batches to reschedule (" + timesToReschedule.size + " batches): " + timesToReschedule.mkString(", ")) timesToReschedule.foreach(time => - jobScheduler.runJobs(time, graph.generateJobs(time)) + jobScheduler.submitJobSet(JobSet(time, graph.generateJobs(time))) ) // Restart the timer @@ -214,7 +214,12 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging { SparkEnv.set(ssc.env) Try(graph.generateJobs(time)) match { case Success(jobs) => - jobScheduler.runJobs(time, jobs) + val receivedBlockInfo = graph.getNetworkInputStreams.map { stream => + val streamId = stream.id + val receivedBlockInfo = stream.getReceivedBlockInfo(time) + (streamId, receivedBlockInfo) + }.toMap + jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala index 04e0a6a283cfb..d9ada99b472ac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala @@ -100,14 +100,13 @@ class JobScheduler(val 
ssc: StreamingContext) extends Logging { logInfo("Stopped JobScheduler") } - def runJobs(time: Time, jobs: Seq[Job]) { - if (jobs.isEmpty) { - logInfo("No jobs added for time " + time) + def submitJobSet(jobSet: JobSet) { + if (jobSet.jobs.isEmpty) { + logInfo("No jobs added for time " + jobSet.time) } else { - val jobSet = new JobSet(time, jobs) - jobSets.put(time, jobSet) + jobSets.put(jobSet.time, jobSet) jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) - logInfo("Added jobs for time " + time) + logInfo("Added jobs for time " + jobSet.time) } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala index fcf303aee6cd7..a69d74362173e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala @@ -24,7 +24,11 @@ import org.apache.spark.streaming.Time * belong to the same batch. */ private[streaming] -case class JobSet(time: Time, jobs: Seq[Job]) { +case class JobSet( + time: Time, + jobs: Seq[Job], + receivedBlockInfo: Map[Int, Array[ReceivedBlockInfo]] = Map.empty + ) { private val incompleteJobs = new HashSet[Job]() private val submissionTime = System.currentTimeMillis() // when this jobset was submitted @@ -60,6 +64,7 @@ case class JobSet(time: Time, jobs: Seq[Job]) { def toBatchInfo: BatchInfo = { new BatchInfo( time, + receivedBlockInfo, submissionTime, if (processingStartTime >= 0 ) Some(processingStartTime) else None, if (processingEndTime >= 0 ) Some(processingEndTime) else None diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala index 067e804202236..a1e6f5176825a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala @@ -17,20 +17,42 @@ package org.apache.spark.streaming.scheduler -import scala.collection.mutable.{HashMap, Queue, SynchronizedMap} +import scala.collection.mutable.{HashMap, SynchronizedMap, SynchronizedQueue} import akka.actor._ + import org.apache.spark.{Logging, SparkEnv, SparkException} import org.apache.spark.SparkContext._ -import org.apache.spark.storage.BlockId +import org.apache.spark.storage.StreamBlockId import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver} import org.apache.spark.util.AkkaUtils +/** Information about receiver */ +case class ReceiverInfo(streamId: Int, typ: String, location: String) { + override def toString = s"$typ-$streamId" +} + +/** Information about blocks received by the network receiver */ +case class ReceivedBlockInfo( + streamId: Int, + blockId: StreamBlockId, + numRecords: Long, + metadata: Any + ) + +/** + * Messages used by the NetworkReceiver and the NetworkInputTracker to communicate + * with each other. 
+ */ private[streaming] sealed trait NetworkInputTrackerMessage -private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef) - extends NetworkInputTrackerMessage -private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) +private[streaming] case class RegisterReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef + ) extends NetworkInputTrackerMessage +private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage @@ -47,9 +69,10 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { val networkInputStreamMap = Map(networkInputStreams.map(x => (x.id, x)): _*) val receiverExecutor = new ReceiverExecutor() val receiverInfo = new HashMap[Int, ActorRef] with SynchronizedMap[Int, ActorRef] - val receivedBlockIds = new HashMap[Int, Queue[BlockId]] with SynchronizedMap[Int, Queue[BlockId]] + val receivedBlockInfo = new HashMap[Int, SynchronizedQueue[ReceivedBlockInfo]] + with SynchronizedMap[Int, SynchronizedQueue[ReceivedBlockInfo]] val timeout = AkkaUtils.askTimeout(ssc.conf) - + val listenerBus = ssc.scheduler.listenerBus // actor is created when generator starts. // This not being null means the tracker has been started and not stopped @@ -83,12 +106,32 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { } } + /** Return all the blocks received from a receiver. */ + def getReceivedBlockInfo(streamId: Int): Array[ReceivedBlockInfo] = { + val receivedBlockInfo = getReceivedBlockInfoQueue(streamId).dequeueAll(x => true) + logInfo("Stream " + streamId + " received " + receivedBlockInfo.size + " blocks") + receivedBlockInfo.toArray + } + + private def getReceivedBlockInfoQueue(streamId: Int) = { + receivedBlockInfo.getOrElseUpdate(streamId, new SynchronizedQueue[ReceivedBlockInfo]) + } + /** Register a receiver */ - def registerReceiver(streamId: Int, receiverActor: ActorRef, sender: ActorRef) { + def registerReceiver( + streamId: Int, + typ: String, + host: String, + receiverActor: ActorRef, + sender: ActorRef + ) { if (!networkInputStreamMap.contains(streamId)) { throw new Exception("Register received for unexpected id " + streamId) } receiverInfo += ((streamId, receiverActor)) + ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted( + ReceiverInfo(streamId, typ, host) + )) logInfo("Registered receiver for network stream " + streamId + " from " + sender.path.address) } @@ -98,35 +141,26 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging { logError("Deregistered receiver for network stream " + streamId + " with message:\n" + message) } - /** Get all the received blocks for the given stream. 
*/ - def getBlocks(streamId: Int, time: Time): Array[BlockId] = { - val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]()) - val result = queue.dequeueAll(x => true).toArray - logInfo("Stream " + streamId + " received " + result.size + " blocks") - result - } - /** Add new blocks for the given stream */ - def addBlocks(streamId: Int, blockIds: Seq[BlockId], metadata: Any) = { - val queue = receivedBlockIds.getOrElseUpdate(streamId, new Queue[BlockId]) - queue ++= blockIds - networkInputStreamMap(streamId).addMetadata(metadata) - logDebug("Stream " + streamId + " received new blocks: " + blockIds.mkString("[", ", ", "]")) + def addBlocks(receivedBlockInfo: ReceivedBlockInfo) { + getReceivedBlockInfoQueue(receivedBlockInfo.streamId) += receivedBlockInfo + logDebug("Stream " + receivedBlockInfo.streamId + " received new blocks: " + + receivedBlockInfo.blockId) } /** Check if any blocks are left to be processed */ def hasMoreReceivedBlockIds: Boolean = { - !receivedBlockIds.forall(_._2.isEmpty) + !receivedBlockInfo.values.forall(_.isEmpty) } /** Actor to receive messages from the receivers. */ private class NetworkInputTrackerActor extends Actor { def receive = { - case RegisterReceiver(streamId, receiverActor) => - registerReceiver(streamId, receiverActor, sender) + case RegisterReceiver(streamId, typ, host, receiverActor) => + registerReceiver(streamId, typ, host, receiverActor, sender) sender ! true - case AddBlocks(streamId, blockIds, metadata) => - addBlocks(streamId, blockIds, metadata) + case AddBlock(receivedBlockInfo) => + addBlocks(receivedBlockInfo) case DeregisterReceiver(streamId, message) => deregisterReceiver(streamId, message) sender ! true diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala index 461ea3506477f..5db40ebbeb1de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala @@ -23,8 +23,11 @@ import org.apache.spark.util.Distribution /** Base trait for events related to StreamingListener */ sealed trait StreamingListenerEvent +case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent +case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo) + extends StreamingListenerEvent /** An event used in the listener to shutdown the listener daemon thread. */ private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent @@ -34,14 +37,17 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen * computation. */ trait StreamingListener { - /** - * Called when processing of a batch has completed - */ + + /** Called when a receiver has been started */ + def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { } + + /** Called when a batch of jobs has been submitted for processing. */ + def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { } + + /** Called when processing of a batch of jobs has completed. 
*/ def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { } - /** - * Called when processing of a batch has started - */ + /** Called when processing of a batch of jobs has started. */ def onBatchStarted(batchStarted: StreamingListenerBatchStarted) { } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 18811fc2b01d8..ea03dfc7bfeea 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging { while (true) { val event = eventQueue.take event match { + case receiverStarted: StreamingListenerReceiverStarted => + listeners.foreach(_.onReceiverStarted(receiverStarted)) + case batchSubmitted: StreamingListenerBatchSubmitted => + listeners.foreach(_.onBatchSubmitted(batchSubmitted)) case batchStarted: StreamingListenerBatchStarted => listeners.foreach(_.onBatchStarted(batchStarted)) case batchCompleted: StreamingListenerBatchCompleted => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala new file mode 100644 index 0000000000000..8b025b09ed34d --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.ui + +import org.apache.spark.streaming.{Time, StreamingContext} +import org.apache.spark.streaming.scheduler._ +import scala.collection.mutable.{Queue, HashMap} +import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted +import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted +import org.apache.spark.streaming.scheduler.BatchInfo +import org.apache.spark.streaming.scheduler.ReceiverInfo +import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted +import org.apache.spark.util.Distribution + + +private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener { + + private val waitingBatchInfos = new HashMap[Time, BatchInfo] + private val runningBatchInfos = new HashMap[Time, BatchInfo] + private val completedaBatchInfos = new Queue[BatchInfo] + private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100) + private var totalCompletedBatches = 0L + private val receiverInfos = new HashMap[Int, ReceiverInfo] + + val batchDuration = ssc.graph.batchDuration.milliseconds + + override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = { + synchronized { + receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo) + } + } + + override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized { + runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo + } + + override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized { + runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo + waitingBatchInfos.remove(batchStarted.batchInfo.batchTime) + } + + override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized { + waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime) + runningBatchInfos.remove(batchCompleted.batchInfo.batchTime) + completedaBatchInfos.enqueue(batchCompleted.batchInfo) + if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue() + totalCompletedBatches += 1L + } + + def numNetworkReceivers = synchronized { + ssc.graph.getNetworkInputStreams().size + } + + def numTotalCompletedBatches: Long = synchronized { + totalCompletedBatches + } + + def numUnprocessedBatches: Long = synchronized { + waitingBatchInfos.size + runningBatchInfos.size + } + + def waitingBatches: Seq[BatchInfo] = synchronized { + waitingBatchInfos.values.toSeq + } + + def runningBatches: Seq[BatchInfo] = synchronized { + runningBatchInfos.values.toSeq + } + + def retainedCompletedBatches: Seq[BatchInfo] = synchronized { + completedaBatchInfos.toSeq + } + + def processingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.processingDelay) + } + + def schedulingDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.schedulingDelay) + } + + def totalDelayDistribution: Option[Distribution] = synchronized { + extractDistribution(_.totalDelay) + } + + def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized { + val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit) + val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo) + (0 until numNetworkReceivers).map { receiverId => + val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo => + batchInfo.get(receiverId).getOrElse(Array.empty) + } + val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { 
blockInfo => + // calculate records per second for each batch + blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration + } + val distributionOption = Distribution(recordsOfParticularReceiver) + (receiverId, distributionOption) + }.toMap + } + + def lastReceivedBatchRecords: Map[Int, Long] = { + val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo) + lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => + (0 until numNetworkReceivers).map { receiverId => + (receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum) + }.toMap + }.getOrElse { + (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap + } + } + + def receiverInfo(receiverId: Int): Option[ReceiverInfo] = { + receiverInfos.get(receiverId) + } + + def lastCompletedBatch: Option[BatchInfo] = { + completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption + } + + def lastReceivedBatch: Option[BatchInfo] = { + retainedBatches.lastOption + } + + private def retainedBatches: Seq[BatchInfo] = synchronized { + (waitingBatchInfos.values.toSeq ++ + runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering) + } + + private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = { + Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble)) + } +} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala new file mode 100644 index 0000000000000..6607437db560a --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.ui + +import java.util.Calendar +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.Logging +import org.apache.spark.ui._ +import org.apache.spark.ui.UIUtils._ +import org.apache.spark.util.Distribution + +/** Page for Spark Web UI that shows statistics of a streaming job */ +private[ui] class StreamingPage(parent: StreamingTab) + extends WebUIPage("") with Logging { + + private val listener = parent.listener + private val startTime = Calendar.getInstance().getTime() + private val emptyCell = "-" + + /** Render the page */ + def render(request: HttpServletRequest): Seq[Node] = { + val content = + generateBasicStats() ++

      <br></br> ++
+      <h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4>
      ++ + generateNetworkStatsTable() ++ + generateBatchStatsTable() + UIUtils.headerSparkPage( + content, parent.basePath, parent.appName, "Streaming", parent.headerTabs, parent, Some(5000)) + } + + /** Generate basic stats of the streaming program */ + private def generateBasicStats(): Seq[Node] = { + val timeSinceStart = System.currentTimeMillis() - startTime.getTime +
    <ul class="unstyled">
+      <li>
+        <strong>Started at: </strong> {startTime.toString}
+      </li>
+      <li>
+        <strong>Time since start: </strong> {formatDurationVerbose(timeSinceStart)}
+      </li>
+      <li>
+        <strong>Network receivers: </strong> {listener.numNetworkReceivers}
+      </li>
+      <li>
+        <strong>Batch interval: </strong> {formatDurationVerbose(listener.batchDuration)}
+      </li>
+      <li>
+        <strong>Processed batches: </strong> {listener.numTotalCompletedBatches}
+      </li>
+      <li>
+        <strong>Waiting batches: </strong> {listener.numUnprocessedBatches}
+      </li>
+    </ul>
      + } + + /** Generate stats of data received over the network the streaming program */ + private def generateNetworkStatsTable(): Seq[Node] = { + val receivedRecordDistributions = listener.receivedRecordsDistributions + val lastBatchReceivedRecord = listener.lastReceivedBatchRecords + val table = if (receivedRecordDistributions.size > 0) { + val headerRow = Seq( + "Receiver", + "Location", + "Records in last batch\n[" + formatDate(Calendar.getInstance().getTime()) + "]", + "Minimum rate\n[records/sec]", + "25th percentile rate\n[records/sec]", + "Median rate\n[records/sec]", + "75th percentile rate\n[records/sec]", + "Maximum rate\n[records/sec]" + ) + val dataRows = (0 until listener.numNetworkReceivers).map { receiverId => + val receiverInfo = listener.receiverInfo(receiverId) + val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId") + val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell) + val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId)) + val receivedRecordStats = receivedRecordDistributions(receiverId).map { d => + d.getQuantiles().map(r => formatDurationVerbose(r.toLong)) + }.getOrElse { + Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell) + } + Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats + } + Some(listingTable(headerRow, dataRows)) + } else { + None + } + + val content = +
      <h5>Network Input Statistics</h5> ++
+      <div>{table.getOrElse("No network receivers")}</div>
      + + content + } + + /** Generate stats of batch jobs of the streaming program */ + private def generateBatchStatsTable(): Seq[Node] = { + val numBatches = listener.retainedCompletedBatches.size + val lastCompletedBatch = listener.lastCompletedBatch + val table = if (numBatches > 0) { + val processingDelayQuantilesRow = { + Seq( + "Processing Time", + formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay)) + ) ++ getQuantiles(listener.processingDelayDistribution) + } + val schedulingDelayQuantilesRow = { + Seq( + "Scheduling Delay", + formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay)) + ) ++ getQuantiles(listener.schedulingDelayDistribution) + } + val totalDelayQuantilesRow = { + Seq( + "Total Delay", + formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay)) + ) ++ getQuantiles(listener.totalDelayDistribution) + } + val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile", + "Median", "75th percentile", "Maximum") + val dataRows: Seq[Seq[String]] = Seq( + processingDelayQuantilesRow, + schedulingDelayQuantilesRow, + totalDelayQuantilesRow + ) + Some(listingTable(headerRow, dataRows)) + } else { + None + } + + val content = +
      <h5>Batch Processing Statistics</h5> ++
+      <div>
+        <ul class="unstyled">
+          {table.getOrElse("No statistics have been generated yet.")}
+        </ul>
+      </div>
      + + content + } + + + /** + * Returns a human-readable string representing a duration such as "5 second 35 ms" + */ + private def formatDurationOption(msOption: Option[Long]): String = { + msOption.map(formatDurationVerbose).getOrElse(emptyCell) + } + + /** Get quantiles for any time distribution */ + private def getQuantiles(timeDistributionOption: Option[Distribution]) = { + timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) } + } + + /** Generate HTML table from string data */ + private def listingTable(headers: Seq[String], data: Seq[Seq[String]]) = { + def generateDataRow(data: Seq[String]): Seq[Node] = { + {data.map(d => {d})} + } + UIUtils.listingTable(headers, generateDataRow, data, fixedWidth = true) + } +} + diff --git a/core/src/test/scala/org/apache/spark/SparkUISuite.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala similarity index 58% rename from core/src/test/scala/org/apache/spark/SparkUISuite.scala rename to streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala index d0d119c15081d..51448d15c6516 100644 --- a/core/src/test/scala/org/apache/spark/SparkUISuite.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala @@ -15,21 +15,22 @@ * limitations under the License. */ -package org.apache.spark +package org.apache.spark.streaming.ui -import java.net.URI +import org.apache.spark.Logging +import org.apache.spark.streaming.StreamingContext +import org.apache.spark.ui.WebUITab -import org.scalatest.FunSuite +/** Spark Web UI tab that shows statistics of a streaming job */ +private[spark] class StreamingTab(ssc: StreamingContext) + extends WebUITab(ssc.sc.ui, "streaming") with Logging { -class SparkUISuite extends FunSuite with SharedSparkContext { + val parent = ssc.sc.ui + val appName = parent.appName + val basePath = parent.basePath + val listener = new StreamingJobProgressListener(ssc) - test("verify appUIAddress contains the scheme") { - val uiAddress = sc.ui.appUIAddress - assert(uiAddress.equals("http://" + sc.ui.appUIHostPort)) - } - - test("verify appUIAddress contains the port") { - val splitUIAddress = sc.ui.appUIAddress.split(':') - assert(splitUIAddress(2).toInt == sc.ui.boundPort) - } + ssc.addStreamingListener(listener) + attachPage(new StreamingPage(this)) + parent.attachTab(this) } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala index 389b23d4d5e4b..952511d411a8e 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala @@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { /** This is a server to test the network input stream */ -class TestServer() extends Logging { +class TestServer(portToBind: Int = 0) extends Logging { val queue = new ArrayBlockingQueue[String](100) - val serverSocket = new ServerSocket(0) + val serverSocket = new ServerSocket(portToBind) val servingThread = new Thread() { override def run() { @@ -282,7 +282,7 @@ class TestServer() extends Logging { def start() { servingThread.start() } - def send(msg: String) { queue.add(msg) } + def send(msg: String) { queue.put(msg) } def stop() { servingThread.interrupt() } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 9cc27ef7f03b5..efd0d22ecb57a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w } } - test("stop only streaming context") { ssc = new StreamingContext(master, appName, batchDuration) sc = ssc.sparkContext diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala new file mode 100644 index 0000000000000..35538ec188f67 --- /dev/null +++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming + +import scala.io.Source + +import org.scalatest.FunSuite +import org.scalatest.concurrent.Eventually._ +import org.scalatest.time.SpanSugar._ + +class UISuite extends FunSuite { + + test("streaming tab in spark UI") { + val ssc = new StreamingContext("local", "test", Seconds(1)) + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString + assert(!html.contains("random data that should not be present")) + // test if streaming tab exist + assert(html.toLowerCase.contains("streaming")) + // test if other Spark tabs still exist + assert(html.toLowerCase.contains("stages")) + } + + eventually(timeout(10 seconds), interval(50 milliseconds)) { + val html = Source.fromURL( + ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString + assert(html.toLowerCase.contains("batch")) + assert(html.toLowerCase.contains("network")) + } + } +}
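
Note: the new StreamingListener callbacks (onReceiverStarted, onBatchSubmitted, onBatchStarted, onBatchCompleted) are the same hooks the Streaming tab consumes, and they remain available to user code. As a rough illustration only, the sketch below registers a hypothetical listener (LoggingStreamingListener is not part of this patch) that prints the per-batch metrics the page renders, using only fields introduced or already present on BatchInfo and ReceiverInfo.

import org.apache.spark.streaming.scheduler._

// Hypothetical listener, shown only to illustrate the new callback surface.
class LoggingStreamingListener extends StreamingListener {

  override def onReceiverStarted(started: StreamingListenerReceiverStarted) {
    val info = started.receiverInfo
    println(s"Receiver $info started on ${info.location}")
  }

  override def onBatchSubmitted(submitted: StreamingListenerBatchSubmitted) {
    println(s"Batch ${submitted.batchInfo.batchTime} submitted")
  }

  override def onBatchCompleted(completed: StreamingListenerBatchCompleted) {
    val info = completed.batchInfo
    // receivedBlockInfo maps stream id -> blocks received in this batch
    val records = info.receivedBlockInfo.values.flatten.map(_.numRecords).sum
    println(s"Batch ${info.batchTime} done: $records records, " +
      s"scheduling delay ${info.schedulingDelay.getOrElse(-1L)} ms, " +
      s"processing time ${info.processingDelay.getOrElse(-1L)} ms")
  }
}

// Registered the same way StreamingTab registers its own listener:
//   ssc.addStreamingListener(new LoggingStreamingListener)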
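Note: receivedRecordsDistributions converts the per-batch ReceivedBlockInfo counts into a records-per-second figure before computing quantiles: for each retained batch it sums numRecords for a receiver and scales by 1000 / batch duration (ms). A minimal, self-contained sketch of that arithmetic, with made-up numbers and a naive percentile in place of Spark's internal Distribution helper:

object ReceiverRateSketch extends App {
  val batchDurationMs = 2000L                            // assumed batch interval
  val recordsPerBatch = Seq(1000L, 1200L, 800L, 1500L)   // made-up per-batch totals for one receiver

  // Same conversion as the listener: records/sec = records in batch * 1000 / batch duration
  val ratesPerSecond = recordsPerBatch.map(_.toDouble * 1000 / batchDurationMs).sorted

  // The receiver table reports min, 25th percentile, median, 75th percentile and max of these rates.
  def quantile(p: Double): Double = ratesPerSecond((p * (ratesPerSecond.size - 1)).round.toInt)
  println(Seq(0.0, 0.25, 0.5, 0.75, 1.0).map(quantile).mkString(", "))
}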