diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index 312bcccb1cca1..9fd55c6480f06 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -20,6 +20,7 @@ package org.apache.spark.status import java.util.{List => JList} import scala.collection.JavaConverters._ +import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf} import org.apache.spark.status.api.v1 @@ -386,10 +387,9 @@ private[spark] class AppStatusStore( def taskList(stageId: Int, stageAttemptId: Int, maxTasks: Int): Seq[v1.TaskData] = { val stageKey = Array(stageId, stageAttemptId) - store.view(classOf[TaskDataWrapper]).index("stage").first(stageKey).last(stageKey).reverse() - .max(maxTasks).asScala.map { taskDataWrapper => - constructTaskData(taskDataWrapper) - }.toSeq.reverse + val taskDataWrapperIter = store.view(classOf[TaskDataWrapper]).index("stage") + .first(stageKey).last(stageKey).reverse().max(maxTasks).asScala + constructTaskDataList(taskDataWrapperIter).reverse } def taskList( @@ -428,9 +428,8 @@ private[spark] class AppStatusStore( } val ordered = if (ascending) indexed else indexed.reverse() - ordered.skip(offset).max(length).asScala.map { taskDataWrapper => - constructTaskData(taskDataWrapper) - }.toSeq + val taskDataWrapperIter = ordered.skip(offset).max(length).asScala + constructTaskDataList(taskDataWrapperIter) } def executorSummary(stageId: Int, attemptId: Int): Map[String, v1.ExecutorStageSummary] = { @@ -554,6 +553,29 @@ private[spark] class AppStatusStore( AppStatusUtils.gettingResultTime(taskDataOld)) } + def constructTaskDataList(taskDataWrapperIter: Iterable[TaskDataWrapper]): Seq[v1.TaskData] = { + val executorIdToLogs = new HashMap[String, Map[String, String]]() + taskDataWrapperIter.map { taskDataWrapper => + val taskDataOld: v1.TaskData = taskDataWrapper.toApi + val executorLogs = executorIdToLogs.getOrElseUpdate(taskDataOld.executorId, { + try { + executorSummary(taskDataOld.executorId).executorLogs + } catch { + case e: NoSuchElementException => + Map.empty + } + }) + + new v1.TaskData(taskDataOld.taskId, taskDataOld.index, + taskDataOld.attempt, taskDataOld.launchTime, taskDataOld.resultFetchStart, + taskDataOld.duration, taskDataOld.executorId, taskDataOld.host, taskDataOld.status, + taskDataOld.taskLocality, taskDataOld.speculative, taskDataOld.accumulatorUpdates, + taskDataOld.errorMessage, taskDataOld.taskMetrics, + executorLogs, + AppStatusUtils.schedulerDelay(taskDataOld), + AppStatusUtils.gettingResultTime(taskDataOld)) + }.toSeq + } } private[spark] object AppStatusStore {