diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala
index e390df5a714..419ee253688 100644
--- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala
+++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/profiling/ProfileOutputWriter.scala
@@ -21,8 +21,6 @@ import org.apache.commons.lang3.StringUtils
 class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows: Int,
     outputCSV: Boolean = false) {
 
-  private val CSVDelimiter = ","
-
   private val textFileWriter = new ToolTextFileWriter(outputDir, s"$filePrefix.log",
     "Profile summary")
 
@@ -64,13 +62,14 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
       val csvWriter = new ToolTextFileWriter(outputDir, s"${suffix}.csv",
         s"$header CSV:")
       try {
-        val headerString = outRows.head.outputHeaders.mkString(CSVDelimiter)
+        val headerString = outRows.head.outputHeaders.mkString(ProfileOutputWriter.CSVDelimiter)
         csvWriter.write(headerString + "\n")
         val rows = outRows.map(_.convertToSeq)
         rows.foreach { row =>
-          val delimiterHandledRow = row.map(ProfileUtils.replaceDelimiter(_, CSVDelimiter))
+          val delimiterHandledRow =
+            row.map(ProfileUtils.replaceDelimiter(_, ProfileOutputWriter.CSVDelimiter))
           val formattedRow = delimiterHandledRow.map(stringIfempty(_))
-          val outStr = formattedRow.mkString(CSVDelimiter)
+          val outStr = formattedRow.mkString(ProfileOutputWriter.CSVDelimiter)
           csvWriter.write(outStr + "\n")
         }
       } finally {
@@ -92,6 +91,7 @@ class ProfileOutputWriter(outputDir: String, filePrefix: String, numOutputRows:
 }
 
 object ProfileOutputWriter {
+  val CSVDelimiter = ","
 
   /**
   * Regular expression matching full width characters.
diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
index 1f7e090821f..c3ca9e7f758 100644
--- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
+++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/profiling/EventsProcessor.scala
@@ -23,6 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import com.nvidia.spark.rapids.tool.profiling._
 
+import org.apache.spark.TaskFailedReason
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.execution.ui.{SparkListenerDriverAccumUpdates, SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
@@ -209,12 +210,18 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[Applicati
             + res.name + ",value=" + res.value + ",update=" + res.update)
       }
     }
+    val reason = event.reason match {
+      case failed: TaskFailedReason =>
+        failed.toErrorString
+      case _ =>
+        event.reason.toString
+    }
 
     val thisTask = TaskCase(
       event.stageId,
       event.stageAttemptId,
       event.taskType,
-      event.reason.toString,
+      reason,
       event.taskInfo.taskId,
       event.taskInfo.attemptNumber,
       event.taskInfo.launchTime,
diff --git a/tools/src/test/resources/ProfilingExpectations/tasks_failure_eventlog_expectation.csv b/tools/src/test/resources/ProfilingExpectations/tasks_failure_eventlog_expectation.csv
index c3fa9fccdee..977fa478eba 100644
--- a/tools/src/test/resources/ProfilingExpectations/tasks_failure_eventlog_expectation.csv
+++ b/tools/src/test/resources/ProfilingExpectations/tasks_failure_eventlog_expectation.csv
@@ -1,4 +1,4 @@
 appIndex,stageId,stageAttemptId,taskId,attempt,failureReason
-1,238,0,8519,0,"ExecutorLostFailure(2,true,Some(Executor Process Lost))"
-1,238,0,8560,1,"FetchFailed(BlockManagerId(1, hostname-08.domain.com, 46008, None),54,8347,9,72,org.apache.spark.shu"
-1,238,0,8574,1,"FetchFailed(BlockManagerId(1, hostname-08.domain.com, 46008, None),54,8339,1,138,org.apache.spark.sh"
+1,238,0,8519,0,ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Executor Process
+1,238,0,8560,1,FetchFailed(BlockManagerId(1; hostname-08.domain.com; 46008; None); shuffleId=54; mapIndex=9; mapId=
+1,238,0,8574,1,FetchFailed(BlockManagerId(1; hostname-08.domain.com; 46008; None); shuffleId=54; mapIndex=1; mapId=
diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
index 8bd42b24dcc..ebe43a4758f 100644
--- a/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
+++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/profiling/HealthCheckSuite.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -58,8 +58,14 @@ class HealthCheckSuite extends FunSuite {
     val healthCheck = new HealthCheck(apps)
     for (app <- apps) {
       val failedTasks = healthCheck.getFailedTasks
+      // the end reason gets the delimiter changed when writing to CSV so to compare properly
+      // change it to be the same here
+      val failedWithDelimiter = failedTasks.map { t =>
+        val delimited = ProfileUtils.replaceDelimiter(t.endReason, ProfileOutputWriter.CSVDelimiter)
+        t.copy(endReason = delimited)
+      }
       import sparkSession.implicits._
-      val taskAccums = failedTasks.toDF
+      val taskAccums = failedWithDelimiter.toDF
       val tasksResultExpectation =
         new File(expRoot, "tasks_failure_eventlog_expectation.csv")
       val tasksDfExpect =