Skip to content

Commit

Permalink
Merge pull request #10 from JoshRosen/metrics-system-cleanup
Browse files Browse the repository at this point in the history
Add defensive checks to MetricsSystem methods; make more methods private
  • Loading branch information
jerryshao committed Dec 17, 2014
2 parents f779fe0 + 87a2292 commit 434d17e
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
private val sources = new mutable.ArrayBuffer[Source]
private val registry = new MetricRegistry()

private var running: Boolean = false

// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
private var metricsServlet: Option[MetricsServlet] = None

/** Get any UI handlers used by this metrics system. */
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
/**
* Get any UI handlers used by this metrics system; can only be called after start().
*/
def getServletHandlers = {
require(running, "Can only call getServletHandlers on a running MetricsSystem")
metricsServlet.map(_.getHandlers).getOrElse(Array())
}

metricsConfig.initialize()

def start() {
require(!running, "Attempting to start a MetricsSystem that is already running")
running = true
registerSources()
registerSinks()
sinks.foreach(_.start)
}

def stop() {
sinks.foreach(_.stop)
if (running) {
sinks.foreach(_.stop)
} else {
logWarning("Stopping a MetricsSystem that is not running")
}
running = false
}

def report() {
Expand All @@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
* @return An unique metric name for each combination of
* application, executor/driver and metric source.
*/
def buildRegistryName(source: Source): String = {
private[spark] def buildRegistryName(source: Source): String = {
val appId = conf.getOption("spark.app.id")
val executorId = conf.getOption("spark.executor.id")
val defaultName = MetricRegistry.name(source.sourceName)
Expand Down Expand Up @@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
})
}

def registerSources() {
private def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)

Expand All @@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
}
}

def registerSinks() {
private def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)

Expand Down

0 comments on commit 434d17e

Please sign in to comment.