[SPARK-26363][WEBUI] Avoid duplicated KV store lookups in method `taskList`

## What changes were proposed in this pull request?

In the method `taskList` (since #21688), the executor log value is queried in the KV store for every task (in the method `constructTaskData`).
This PR proposes to use a hashmap to avoid duplicated KV store lookups in the method.

![image](https://user-images.githubusercontent.com/1097932/49946230-841c7680-ff29-11e8-8b83-d8f7553bfe5e.png)
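To illustrate the caching idea the patch applies, here is a minimal, self-contained sketch of memoizing the per-executor lookup with `getOrElseUpdate`. The names `ExecutorLogCacheSketch` and `fetchExecutorLogs` are illustrative stand-ins, not Spark APIs; the real code caches the result of `executorSummary(executorId).executorLogs`.

```scala
import scala.collection.mutable

object ExecutorLogCacheSketch {
  // Hypothetical stand-in for the per-executor KV store lookup.
  def fetchExecutorLogs(executorId: String): Map[String, String] =
    Map("stdout" -> s"http://example.invalid/$executorId/stdout")

  def main(args: Array[String]): Unit = {
    // Many tasks usually run on the same executor, so a naive per-task
    // lookup repeats the same KV store query over and over.
    val taskExecutorIds = Seq("exec-1", "exec-1", "exec-2", "exec-1")

    val executorIdToLogs = mutable.HashMap.empty[String, Map[String, String]]
    val logsPerTask = taskExecutorIds.map { id =>
      // getOrElseUpdate performs the expensive lookup only on the first miss.
      executorIdToLogs.getOrElseUpdate(id, fetchExecutorLogs(id))
    }

    println(s"tasks: ${logsPerTask.size}, distinct lookups: ${executorIdToLogs.size}")
  }
}
```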

## How was this patch tested?

Manual check
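For reference, one way such a manual check could be done is sketched below. This is a hypothetical example, not the author's actual procedure: it assumes an active `SparkContext` named `sc` with a completed stage 0 / attempt 0, and code running in Spark-internal scope, since `SparkContext.statusStore` is `private[spark]`.

```scala
// Hypothetical manual check: list a stage's tasks via AppStatusStore and
// confirm the executor log URLs are still attached to each task.
val tasks = sc.statusStore.taskList(stageId = 0, stageAttemptId = 0, maxTasks = 100)
tasks.foreach { t =>
  println(s"task ${t.taskId} on ${t.executorId}: ${t.executorLogs}")
}
```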

Closes #23310 from gengliangwang/removeExecutorLog.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
gengliangwang authored and srowen committed Dec 30, 2018
1 parent e6d3e7d commit 240817b
Showing 1 changed file with 28 additions and 24 deletions.
core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -20,6 +20,7 @@ package org.apache.spark.status
 import java.util.{List => JList}
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.HashMap
 
 import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.status.api.v1
@@ -386,10 +387,9 @@ private[spark] class AppStatusStore(
 
   def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = {
     val stageKey = Array(stageId, stageAttemptId)
-    store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse()
-      .max(maxTasks).asScala.map { taskDataWrapper =>
-      constructTaskData(taskDataWrapper)
-    }.toSeq.reverse
+    val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage")
+      .first(stageKey).last(stageKey).reverse().max(maxTasks).asScala
+    constructTaskDataList(taskDataWrapperIter).reverse
   }
 
   def taskList(
@@ -428,9 +428,8 @@
     }
 
     val ordered = if (ascending) indexed else indexed.reverse()
-    ordered.skip(offset).max(length).asScala.map { taskDataWrapper =>
-      constructTaskData(taskDataWrapper)
-    }.toSeq
+    val taskDataWrapperIter = ordered.skip(offset).max(length).asScala
+    constructTaskDataList(taskDataWrapperIter)
   }
 
   def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = {
@@ -536,24 +535,29 @@
     store.close()
   }
 
-  def constructTaskData(taskDataWrapper: TaskDataWrapper) : v1.TaskData = {
-    val taskDataOld: v1.TaskData = taskDataWrapper.toApi
-    val executorLogs: Option[Map[String, String]] = try {
-      Some(executorSummary(taskDataOld.executorId).executorLogs)
-    } catch {
-      case e: NoSuchElementException => e.getMessage
-        None
-    }
-    new v1.TaskData(taskDataOld.taskId, taskDataOld.index,
-      taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart,
-      taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status,
-      taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates,
-      taskDataOld.errorMessage, taskDataOld.taskMetrics,
-      executorLogs.getOrElse(Map[String, String]()),
-      AppStatusUtils.schedulerDelay(taskDataOld),
-      AppStatusUtils.gettingResultTime(taskDataOld))
+  def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = {
+    val executorIdToLogs = new HashMap[String, Map[String, String]]()
+    taskDataWrapperIter.map { taskDataWrapper =>
+      val taskDataOld: v1.TaskData = taskDataWrapper.toApi
+      val executorLogs = executorIdToLogs.getOrElseUpdate(taskDataOld.executorId, {
+        try {
+          executorSummary(taskDataOld.executorId).executorLogs
+        } catch {
+          case e: NoSuchElementException =>
+            Map.empty
+        }
+      })
+
+      new v1.TaskData(taskDataOld.taskId, taskDataOld.index,
+        taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart,
+        taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status,
+        taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates,
+        taskDataOld.errorMessage, taskDataOld.taskMetrics,
+        executorLogs,
+        AppStatusUtils.schedulerDelay(taskDataOld),
+        AppStatusUtils.gettingResultTime(taskDataOld))
+    }.toSeq
   }
 
 }
 
 private[spark] object AppStatusStore {
