Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4595][Core] Fix MetricsServlet not work issue #3444

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ class SparkContext(config: SparkConf) extends Logging {
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ private[spark] class Master(
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
// started.
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)

val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
case "ZOOKEEPER" =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ class MasterWebUI(val master: Master, requestedPort: Int)
attachPage(new HistoryNotFoundPage(this))
attachPage(new MasterPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ private[spark] class Worker(

metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

def changeMaster(url: String, uiUrl: String) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ class WorkerWebUI(
attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
attachHandler(createServletHandler("/log",
(request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
worker.metricsSystem.getServletHandlers.foreach(attachHandler)
}
}

Expand Down
26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()

private var running: Boolean = false

// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None

/** Get any UI handlers used by this metrics system. */
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}

metricsConfig.initialize()

def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}

def stop() {
sinks.foreach(_.stop)
if (running) {
sinks.foreach(_.stop)
} else {
logWarning("Stopping a MetricsSystem that is not running")
}
running = false
}

def report() {
Expand All @@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
* @return An unique metric name for each combination of
* application, executor/driver and metric source.
*/
def buildRegistryName(source: Source): String = {
private[spark] def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
Expand Down Expand Up @@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
})
}

def registerSources() {
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

Expand All @@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
}
}

def registerSinks() {
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

Expand Down
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,6 @@ private[spark] class SparkUI private (
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
attachHandler(
createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
// If the UI is live, then serve
sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
}
initialize()

Expand Down