Skip to content

Commit

Permalink
use hash map to reduce kv store lookup
Browse files Browse the repository at this point in the history
  • Loading branch information
gengliangwang committed Dec 20, 2018
1 parent e89040f commit 67992a5
Showing 1 changed file with 29 additions and 7 deletions.
36 changes: 29 additions & 7 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.spark.status
import java.util.{List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.HashMap

import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.status.api.v1
Expand Down Expand Up @@ -386,10 +387,9 @@ private[spark] class AppStatusStore(

def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = {
val stageKey = Array(stageId, stageAttemptId)
store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse()
.max(maxTasks).asScala.map { taskDataWrapper =>
constructTaskData(taskDataWrapper)
}.toSeq.reverse
val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage")
.first(stageKey).last(stageKey).reverse().max(maxTasks).asScala
constructTaskDataList(taskDataWrapperIter).reverse
}

def taskList(
Expand Down Expand Up @@ -428,9 +428,8 @@ private[spark] class AppStatusStore(
}

val ordered = if (ascending) indexed else indexed.reverse()
ordered.skip(offset).max(length).asScala.map { taskDataWrapper =>
constructTaskData(taskDataWrapper)
}.toSeq
val taskDataWrapperIter = ordered.skip(offset).max(length).asScala
constructTaskDataList(taskDataWrapperIter)
}

def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = {
Expand Down Expand Up @@ -554,6 +553,29 @@ private[spark] class AppStatusStore(
AppStatusUtils.gettingResultTime(taskDataOld))
}

def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = {
val executorIdToLogs = new HashMap[String, Map[String, String]]()
taskDataWrapperIter.map { taskDataWrapper =>
val taskDataOld: v1.TaskData = taskDataWrapper.toApi
val executorLogs = executorIdToLogs.getOrElseUpdate(taskDataOld.executorId, {
try {
executorSummary(taskDataOld.executorId).executorLogs
} catch {
case e: NoSuchElementException =>
Map.empty
}
})

new v1.TaskData(taskDataOld.taskId, taskDataOld.index,
taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart,
taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status,
taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates,
taskDataOld.errorMessage, taskDataOld.taskMetrics,
executorLogs,
AppStatusUtils.schedulerDelay(taskDataOld),
AppStatusUtils.gettingResultTime(taskDataOld))
}.toSeq
}
}

private[spark] object AppStatusStore {
Expand Down

0 comments on commit 67992a5

Please sign in to comment.