Skip to content

Commit

Permalink
SPARK-740 Driver metrics (apache#25)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Implements metrics in the coarse-grained driver implementation. The deprecated fine-grained driver is left as-is. Metrics include e.g. frequency of RPCs to/from Mesos, the internal state tracking of tasks/agents, and durations for bringing up the tasks.

## How was this patch tested?

Checked it by hand in a few deployments, and the numbers appear to all add up with nothing missing.
  • Loading branch information
nickbp authored May 11, 2018
1 parent 2682f74 commit f87e8d1
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 8 deletions.
11 changes: 9 additions & 2 deletions core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,20 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging {
var is: InputStream = null
try {
is = path match {
case Some(f) => new FileInputStream(f)
case None => Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
case Some(f) => {
logInfo(s"Loading metrics properties from file $f")
new FileInputStream(f)
}
case None => {
logInfo(s"Loading metrics properties from resource $DEFAULT_METRICS_CONF_FILENAME")
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_METRICS_CONF_FILENAME)
}
}

if (is != null) {
properties.load(is)
}
logInfo(s"Metrics properties: " + properties.toString)
} catch {
case e: Exception =>
val file = path.getOrElse(DEFAULT_METRICS_CONF_FILENAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ private[spark] class MetricsSystem private (
sources += source
try {
val regName = buildRegistryName(source)
logInfo(s"Registering source: $regName")
registry.register(regName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
Expand All @@ -166,6 +167,7 @@ private[spark] class MetricsSystem private (
def removeSource(source: Source) {
sources -= source
val regName = buildRegistryName(source)
logInfo(s"Removing source: $regName")
registry.removeMatching(new MetricFilter {
def matches(name: String, metric: Metric): Boolean = name.startsWith(regName)
})
Expand Down Expand Up @@ -194,6 +196,7 @@ private[spark] class MetricsSystem private (
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
if (null != classPath) {
logInfo(s"Initializing sink: $classPath")
try {
val sink = Utils.classForName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry], classOf[SecurityManager])
Expand All @@ -205,7 +208,7 @@ private[spark] class MetricsSystem private (
}
} catch {
case e: Exception =>
logError("Sink class " + classPath + " cannot be instantiated")
logError(s"Sink class $classPath cannot be instantiated")
throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -745,10 +745,16 @@ private[spark] class MesosClusterScheduler(
override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
val taskId = status.getTaskId.getValue

logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message=${status.getMessage}" +
s" reason=${status.getReason}")
if (status.hasReason) {
logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message='${status.getMessage}'" +
s" reason=${status.getReason}")
} else {
logInfo(s"Received status update: taskId=${taskId}" +
s" state=${status.getState}" +
s" message='${status.getMessage}'")
}

stateLock.synchronized {
val subId = getSubmissionIdFromTaskId(taskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
securityManager.isAuthenticationEnabled())
}

private val metricsSource = new MesosCoarseGrainedSchedulerSource(this)

private var nextMesosTaskId = 0

@volatile var appId: String = _
Expand All @@ -173,6 +175,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def start() {
super.start()

sc.env.metricsSystem.registerSource(metricsSource)

val startedBefore = IdHelper.startedBefore.getAndSet(true)

val suffix = if (startedBefore) {
Expand Down Expand Up @@ -313,10 +317,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
*/
override def resourceOffers(d: org.apache.mesos.SchedulerDriver, offers: JList[Offer]) {
stateLock.synchronized {
metricsSource.recordOffers(offers.size)
if (stopCalled) {
logDebug("Ignoring offers during shutdown")
// Driver should simply return a stopped status on race
// condition between this.stop() and completing here
metricsSource.recordDeclineUnused(offers.size)
offers.asScala.map(_.getId).foreach(d.declineOffer)
return
}
Expand All @@ -335,6 +341,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

private def declineUnmatchedOffers(
driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
metricsSource.recordDeclineUnmet(offers.size)
offers.foreach { offer =>
declineOffer(
driver,
Expand Down Expand Up @@ -376,18 +383,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(

logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
s" ports: $ports")
metricsSource.recordTaskLaunch(taskId.getValue, totalCoresAcquired >= maxCores)
}

driver.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
// Reject an offer for a configurable amount of time to avoid starving other frameworks
metricsSource.recordDeclineFinished
declineOffer(driver,
offer,
Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
} else {
metricsSource.recordDeclineUnused(1)
declineOffer(
driver,
offer)
Expand Down Expand Up @@ -534,6 +544,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
logInfo(s"Mesos task $taskId is now ${status.getState}")

stateLock.synchronized {

metricsSource.recordTaskStatus(taskId, status.getState, state)

val slave = slaves(slaveId)

// If the shuffle service is enabled, have the driver register with each one of the
Expand Down Expand Up @@ -584,7 +597,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
executorTerminated(d, slaveId, taskId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
d.reviveOffers()
metricsSource.recordRevive
d.reviveOffers
}
}
}
Expand Down Expand Up @@ -711,6 +725,28 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
None
}
}

// Calls used for metrics polling, see MesosCoarseGrainedSchedulerSource:

def getCoresUsed(): Double = totalCoresAcquired
def getMaxCores(): Double = maxCores
def getMeanCoresPerTask(): Double = if (coresByTaskId.size == 0)
0 else coresByTaskId.values.sum / coresByTaskId.size.toDouble

def getGpusUsed(): Double = totalGpusAcquired
def getMaxGpus(): Double = maxGpus
def getMeanGpusPerTask(): Double = if (gpusByTaskId.size == 0)
0 else gpusByTaskId.values.sum / gpusByTaskId.size.toDouble

def isExecutorLimitEnabled(): Boolean = !executorLimitOption.isEmpty
def getExecutorLimit(): Int = executorLimit

def getTaskCount(): Int = coresByTaskId.size
def getTaskFailureCount(): Int = slaves.values.map(_.taskFailures).sum
def getKnownAgentsCount(): Int = slaves.size
def getOccupiedAgentsCount(): Int = slaves.values.map(_.taskIDs.size).filter(_ != 0).size
def getBlacklistedAgentCount(): Int =
slaves.values.filter(_.taskFailures >= MAX_SLAVE_FAILURES).size
}

private class Slave(val hostname: String) {
Expand Down
Loading

0 comments on commit f87e8d1

Please sign in to comment.