Profile/qualification tool error handling improvements and support spark < 3.1.1 (NVIDIA#2604)

* Qualification tool

Signed-off-by: Thomas Graves <tgraves@apache.org>

* remove unused func

* Add missing files

* Add checks for format option

* cast columns to string to write to text

* Revert "Add checks for format option"

This reverts commit 6f5271c.

* cleanup

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* update output dir

* formatting

* Update help messages

* update app name

* cleanup

* put test functions back

* fix typo

* add printSQLPlanMetrics and printRapidsJar

* use opt

* Add Analysis

* format output

* more tests

Signed-off-by: Thomas Graves <tgraves@apache.org>

* tests working

* test rearrange utils

* move test file

* move test file right location

* add Analysis Suite

* update test analysis

* add rapids-4-spark-tools/src/test/resources/ProfilingExpectations/rapids_join_eventlog_jobandstagemetrics_expectation.csv

* add more tests

* more tests

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* remove unneeded expectation file

* Add more analysis tests

* comment

* cleanup

* Start handling ResourceProfile with reflection

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* allow spark 3.0 and 3.1.1 to parse logs and fix bug with missing table

* Add more test files

* Add more tests and error handling

* improve error handling to skip bad event logs

* update readme

* remove unneeded temp dir

* Revert "remove unneeded temp dir"

This reverts commit 828b302.

* close file writer on exception

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* move test files

* fix move of test files
tgravescs authored Jun 6, 2021
1 parent 3e42530 commit e477e34
Showing 12 changed files with 1,118 additions and 51 deletions.
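
The thread running through these diffs: a malformed event log becomes a per-file failure instead of a crash. Parsing is wrapped in a try/catch for `com.fasterxml.jackson.core.JsonParseException`; the qualification path logs a warning and skips the bad log, while the profiling path closes its writer and exits non-zero. A minimal sketch of the skip variant, assuming a hypothetical `parseEventLog` helper (not the tool's real API) that reads each event-log line with Jackson:

```scala
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.ObjectMapper

// Hypothetical stand-in for per-file parsing: any Jackson read of a truncated
// or corrupt JSON line throws JsonParseException.
def parseEventLog(path: String): Unit = {
  val mapper = new ObjectMapper()
  scala.io.Source.fromFile(path).getLines().foreach(line => mapper.readTree(line))
}

// Keep only the logs that parse cleanly; skip the rest with a warning,
// mirroring the logWarning in Qualification.qualifyApps below.
def parseAll(paths: Seq[String]): Seq[String] =
  paths.flatMap { path =>
    try {
      parseEventLog(path)
      Some(path)
    } catch {
      case _: JsonParseException =>
        println(s"Error parsing JSON, skipping $path")
        None
    }
  }
```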
2 changes: 1 addition & 1 deletion tools/README.md
@@ -9,7 +9,7 @@ Information such as Spark version, executor information, properties and so on. W
 (The code is based on Apache Spark 3.1.1 source code, and tested using Spark 3.0.x and 3.1.1 event logs)
 
 ## Prerequisites
-- Spark 3.1.1 or newer installed
+- Spark 3.0.1 or newer installed
 - Java 8 or above
 - Complete Spark event log(s) from Spark 3.0 or above version.
@@ -117,11 +117,15 @@ class CollectInformation(apps: ArrayBuffer[ApplicationInfo], fileWriter: FileWri
   def printSQLPlanMetrics(shouldGenDot: Boolean, outputDir: String,
       writeOutput: Boolean = true): Unit ={
     for (app <- apps){
-      val messageHeader = "\nSQL Plan Metrics for Application:\n"
-      val accums = app.runQuery(app.generateSQLAccums, fileWriter = Some(fileWriter),
-        messageHeader=messageHeader)
-      if (shouldGenDot) {
-        generateDot(outputDir, Some(accums))
+      if (app.allDataFrames.contains(s"sqlMetricsDF_${app.index}") &&
+          app.allDataFrames.contains(s"driverAccumDF_${app.index}") &&
+          app.allDataFrames.contains(s"taskStageAccumDF_${app.index}")) {
+        val messageHeader = "\nSQL Plan Metrics for Application:\n"
+        val accums = app.runQuery(app.generateSQLAccums, fileWriter = Some(fileWriter),
+          messageHeader=messageHeader)
+        if (shouldGenDot) {
+          generateDot(outputDir, Some(accums))
+        }
       }
     }
   }
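
The guard added above makes `printSQLPlanMetrics` a no-op for applications that never registered the per-app temp tables (`sqlMetricsDF_N`, `driverAccumDF_N`, `taskStageAccumDF_N`), which is what lets a no-SQL event log (see the `rp_nosql_eventlog` test below) profile cleanly. The committed code consults its own `allDataFrames` map; a sketch of the equivalent existence check against the Spark catalog, assuming an ordinary `SparkSession`:

```scala
import org.apache.spark.sql.SparkSession

// Query a per-app temp view only if it was registered; apps with no SQL
// activity never create it, so we skip rather than fail.
def printIfPresent(spark: SparkSession, appIndex: Int): Unit = {
  val view = s"sqlMetricsDF_$appIndex"
  if (spark.catalog.tableExists(view)) {
    spark.sql(s"select count(*) from $view").show()
  }
}
```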
@@ -69,37 +69,51 @@ object ProfileMain extends Logging {
     if (appArgs.compare()) {
       // Create an Array of Applications(with an index starting from 1)
       val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
-      var index: Int = 1
-      for (path <- allPaths.filter(p => !p.getName.contains("."))) {
-        apps += new ApplicationInfo(numOutputRows, sparkSession, path, index)
-        index += 1
-      }
-
-      //Exit if there are no applications to process.
-      if (apps.isEmpty) {
-        logInfo("No application to process. Exiting")
-        return 0
-      }
-      processApps(apps, generateDot = false)
+      try {
+        var index: Int = 1
+        for (path <- allPaths.filter(p => !p.getName.contains("."))) {
+          apps += new ApplicationInfo(numOutputRows, sparkSession, path, index)
+          index += 1
+        }
+
+        //Exit if there are no applications to process.
+        if (apps.isEmpty) {
+          logInfo("No application to process. Exiting")
+          return 0
+        }
+        processApps(apps, generateDot = false)
+      } catch {
+        case e: com.fasterxml.jackson.core.JsonParseException =>
+          fileWriter.close()
+          logError(s"Error parsing JSON", e)
+          return 1
+      }
       // Show the application Id <-> appIndex mapping.
       for (app <- apps) {
         logApplicationInfo(app)
       }
     } else {
       // This mode is to process one application at one time.
       var index: Int = 1
-      for (path <- allPaths.filter(p => !p.getName.contains("."))) {
-        // This apps only contains 1 app in each loop.
-        val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
-        val app = new ApplicationInfo(numOutputRows, sparkSession, path, index)
-        apps += app
-        logApplicationInfo(app)
-        // This is a bit odd that we process apps individual right now due to
-        // memory concerns. So the aggregation functions only aggregate single
-        // application not across applications.
-        processApps(apps, appArgs.generateDot())
-        app.dropAllTempViews()
-        index += 1
-      }
+      try {
+        for (path <- allPaths.filter(p => !p.getName.contains("."))) {
+          // This apps only contains 1 app in each loop.
+          val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
+          val app = new ApplicationInfo(numOutputRows, sparkSession, path, index)
+          apps += app
+          logApplicationInfo(app)
+          // This is a bit odd that we process apps individual right now due to
+          // memory concerns. So the aggregation functions only aggregate single
+          // application not across applications.
+          processApps(apps, appArgs.generateDot())
+          app.dropAllTempViews()
+          index += 1
+        }
+      } catch {
+        case e: com.fasterxml.jackson.core.JsonParseException =>
+          fileWriter.close()
+          logError(s"Error parsing JSON", e)
+          return 1
+      }
     }
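
Both branches above close `fileWriter` inside the catch before returning 1 (the "close file writer on exception" commit in the message). A sketch of the same guarantee expressed with try/finally, which would also cover exception types other than `JsonParseException` — an alternative shape, not the committed code, assuming jackson-core on the classpath:

```scala
import java.io.FileWriter

def profileWithWriter(outputPath: String)(body: FileWriter => Unit): Int = {
  val fileWriter = new FileWriter(outputPath)
  try {
    body(fileWriter)
    0
  } catch {
    case e: com.fasterxml.jackson.core.JsonParseException =>
      println(s"Error parsing JSON: $e") // stand-in for logError
      1
  } finally {
    fileWriter.close() // runs on success and on any throw
  }
}
```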
@@ -41,17 +41,22 @@ object Qualification extends Logging {
       numRows: Int,
       sparkSession: SparkSession,
       includeCpuPercent: Boolean,
-      dropTempViews: Boolean): DataFrame = {
+      dropTempViews: Boolean): Option[DataFrame] = {
     var index: Int = 1
     val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     for (path <- allPaths.filterNot(_.getName.contains("."))) {
-      // This apps only contains 1 app in each loop.
-      val app = new ApplicationInfo(numRows, sparkSession,
-        path, index, true)
-      apps += app
-      logApplicationInfo(app)
-      index += 1
+      try {
+        // This apps only contains 1 app in each loop.
+        val app = new ApplicationInfo(numRows, sparkSession, path, index, true)
+        apps += app
+        logApplicationInfo(app)
+        index += 1
+      } catch {
+        case e: com.fasterxml.jackson.core.JsonParseException =>
+          logWarning(s"Error parsing JSON, skipping $path")
+      }
     }
+    if (apps.isEmpty) return None
     val analysis = new Analysis(apps, None)
     if (includeCpuPercent) {
       val sqlAggMetricsDF = analysis.sqlMetricsAggregation()
@@ -65,7 +70,7 @@
       sparkSession.catalog.dropTempView("sqlAggMetricsDF")
       apps.foreach( _.dropAllTempViews())
     }
-    df
+    Some(df)
   }
 
   def constructQueryQualifyApps(apps: ArrayBuffer[ApplicationInfo],
@@ -78,8 +83,7 @@
         case false => "(" + app.qualificationDurationNoMetricsSQL + ")"
       }
     }.mkString(" union ")
-    val df = apps.head.runQuery(query + " order by Rank desc, `App Duration` desc")
-    df
+    apps.head.runQuery(query + " order by Score desc, `App Duration` desc")
   }
 
   def writeQualification(df: DataFrame, outputDir: String,
@@ -32,7 +32,7 @@ object QualificationMain extends Logging {
    */
   def main(args: Array[String]) {
     val sparkSession = ProfileUtils.createSparkSession
-    val (exitCode, optDf) = mainInternal(sparkSession, new QualificationArgs(args))
+    val (exitCode, _) = mainInternal(sparkSession, new QualificationArgs(args))
     if (exitCode != 0) {
       System.exit(exitCode)
     }
@@ -55,13 +55,13 @@
 
     val includeCpuPercent = appArgs.includeExecCpuPercent.getOrElse(false)
     val numOutputRows = appArgs.numOutputRows.getOrElse(1000)
-    val df = Qualification.qualifyApps(allPaths,
+    val dfOpt = Qualification.qualifyApps(allPaths,
       numOutputRows, sparkSession, includeCpuPercent, dropTempViews)
-    if (writeOutput) {
-      Qualification.writeQualification(df, outputDirectory,
+    if (writeOutput && dfOpt.isDefined) {
+      Qualification.writeQualification(dfOpt.get, outputDirectory,
         appArgs.outputFormat.getOrElse("csv"), includeCpuPercent, numOutputRows)
     }
-    (0, Some(df))
+    (0, dfOpt)
   }
 
 }
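
With `qualifyApps` returning `Option[DataFrame]`, an input set where every log was skipped yields `None` rather than a failing empty-union query, and the caller above checks `dfOpt.isDefined` before `dfOpt.get`. A self-contained sketch of the same flow using `foreach`, which folds the check and the unwrap into one step (`QualReport` is a stand-in for Spark's `DataFrame`):

```scala
// Stand-in for the tool's result type, to keep the sketch self-contained.
case class QualReport(rows: Int)

def qualifyApps(paths: Seq[String]): Option[QualReport] =
  if (paths.isEmpty) None else Some(QualReport(paths.size)) // None: nothing qualified

val writeOutput = true
val dfOpt = qualifyApps(Seq("eventlog1", "eventlog2"))
if (writeOutput) {
  dfOpt.foreach { report => // replaces isDefined + get
    println(s"writing ${report.rows} rows")
  }
}
```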
@@ -693,7 +693,7 @@ class ApplicationInfo(
     s"""select
        |first(appName) as `App Name`,
        |'$appId' as `App ID`,
-       |ROUND((sum(sqlQualDuration) * 100) / first(app.duration), 2) as Rank,
+       |ROUND((sum(sqlQualDuration) * 100) / first(app.duration), 2) as Score,
        |concat_ws(",", collect_list(problematic)) as `Potential Problems`,
        |sum(sqlQualDuration) as `SQL Dataframe Duration`,
        |first(app.duration) as `App Duration`
@@ -720,7 +720,7 @@
   def qualificationDurationSumSQL: String = {
     s"""select first(appName) as `App Name`,
        |first(appID) as `App ID`,
-       |ROUND((sum(dfDuration) * 100) / first(appDuration), 2) as Rank,
+       |ROUND((sum(dfDuration) * 100) / first(appDuration), 2) as Score,
        |concat_ws(",", collect_list(potentialProblems)) as `Potential Problems`,
        |sum(dfDuration) as `SQL Dataframe Duration`,
        |first(appDuration) as `App Duration`,
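
Both queries compute the same quantity, now labeled `Score` instead of `Rank`: the percentage of the application's wall-clock duration spent in qualifying SQL DataFrame operations, rounded to two decimals. A worked example of the arithmetic (plain Scala mirroring the SQL, with made-up durations):

```scala
// Score = ROUND((sum(sqlQualDuration) * 100) / appDuration, 2)
val sqlQualDurations = Seq(45000L, 75000L) // ms in qualifying SQL ops: 120,000 total
val appDuration = 300000L                  // total app wall-clock time, ms
val score = (BigDecimal(sqlQualDurations.sum) * 100 / appDuration)
  .setScale(2, BigDecimal.RoundingMode.HALF_UP)
println(score) // 40.00 — the app spent 40% of its duration in SQL DataFrame work
```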
@@ -34,9 +34,6 @@ object EventsProcessor extends Logging {
     event match {
       case _: SparkListenerLogStart =>
         doSparkListenerLogStart(app, event.asInstanceOf[SparkListenerLogStart])
-      case _: SparkListenerResourceProfileAdded =>
-        doSparkListenerResourceProfileAdded(app,
-          event.asInstanceOf[SparkListenerResourceProfileAdded])
       case _: SparkListenerBlockManagerAdded =>
         doSparkListenerBlockManagerAdded(app,
           event.asInstanceOf[SparkListenerBlockManagerAdded])
@@ -94,7 +91,33 @@
       case _: SparkListenerSQLAdaptiveSQLMetricUpdates =>
         doSparkListenerSQLAdaptiveSQLMetricUpdates(app,
           event.asInstanceOf[SparkListenerSQLAdaptiveSQLMetricUpdates])
-      case _ => doOtherEvent(app, event)
+      case _ =>
+        val wasResourceProfileAddedEvent = doSparkListenerResourceProfileAddedReflect(app, event)
+        if (!wasResourceProfileAddedEvent) doOtherEvent(app, event)
     }
   }
+
+  def doSparkListenerResourceProfileAddedReflect(
+      app: ApplicationInfo,
+      event: SparkListenerEvent): Boolean = {
+    val rpAddedClass = "org.apache.spark.scheduler.SparkListenerResourceProfileAdded"
+    if (event.getClass.getName.equals(rpAddedClass)) {
+      try {
+        event match {
+          case _: SparkListenerResourceProfileAdded =>
+            doSparkListenerResourceProfileAdded(app,
+              event.asInstanceOf[SparkListenerResourceProfileAdded])
+            true
+          case _ => false
+        }
+      } catch {
+        case _: ClassNotFoundException =>
+          logWarning("Error trying to parse SparkListenerResourceProfileAdded, Spark" +
+            " version likely older than 3.1.X, unable to parse it properly.")
+          false
+      }
+    } else {
+      false
+    }
+  }
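
`doSparkListenerResourceProfileAddedReflect` is what lets the tool, compiled against Spark 3.1.1, replay logs on older runtimes: the event is routed by class name as a string first, and a failure to resolve the 3.1-only class is downgraded to a warning. A sketch of the related probe-up-front idea — testing whether the class is even loadable before matching on it — using `Class.forName`; one possible variant, not the committed approach:

```scala
import scala.util.Try

// The real Spark 3.1+ event class; on older Spark it is absent from the
// classpath and Class.forName fails.
val rpAddedClass = "org.apache.spark.scheduler.SparkListenerResourceProfileAdded"

val resourceProfileEventAvailable: Boolean =
  Try(Class.forName(rpAddedClass)).isSuccess

def routeByName(event: AnyRef): String =
  if (resourceProfileEventAvailable && event.getClass.getName == rpAddedClass)
    "handle as SparkListenerResourceProfileAdded"
  else
    "handle as any other event" // mirrors doOtherEvent
```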

tools/src/test/resources/spark-events-profiling/malformed_json_eventlog

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions tools/src/test/resources/spark-events-profiling/rp_nosql_eventlog

Large diffs are not rendered by default.

477 changes: 477 additions & 0 deletions tools/src/test/resources/spark-events-profiling/rp_sql_eventlog

Large diffs are not rendered by default.

@@ -23,7 +23,7 @@ import org.scalatest.FunSuite
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.{DataFrame, SparkSession, TrampolineUtil}
 import org.apache.spark.sql.rapids.tool.profiling._
 
 class ApplicationInfoSuite extends FunSuite with Logging {
@@ -82,6 +82,42 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     assert(cuDFJar.size == 1, "CUDF jar check")
   }
 
+  test("test sql and resourceprofile eventlog") {
+    val eventLog = s"$logDir/rp_sql_eventlog"
+    TrampolineUtil.withTempDir { tempDir =>
+      val appArgs = new ProfileArgs(Array(
+        "--output-directory",
+        tempDir.getAbsolutePath,
+        eventLog))
+      val exit = ProfileMain.mainInternal(sparkSession, appArgs)
+      assert(exit == 0)
+    }
+  }
+
+  test("malformed json eventlog") {
+    val eventLog = s"$logDir/malformed_json_eventlog"
+    TrampolineUtil.withTempDir { tempDir =>
+      val appArgs = new ProfileArgs(Array(
+        "--output-directory",
+        tempDir.getAbsolutePath,
+        eventLog))
+      val exit = ProfileMain.mainInternal(sparkSession, appArgs)
+      assert(exit == 1)
+    }
+  }
+
+  test("test no sql eventlog") {
+    val eventLog = s"$logDir/rp_nosql_eventlog"
+    TrampolineUtil.withTempDir { tempDir =>
+      val appArgs = new ProfileArgs(Array(
+        "--output-directory",
+        tempDir.getAbsolutePath,
+        eventLog))
+      val exit = ProfileMain.mainInternal(sparkSession, appArgs)
+      assert(exit == 0)
+    }
+  }
+
   test("test printSQLPlanMetrics") {
     var apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
     val appArgs =
@@ -68,6 +68,13 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
     }
   }
 
+  test("skip malformed json eventlog") {
+    val profileLogDir = ToolTestUtils.getTestResourcePath("spark-events-profiling")
+    val badEventLog = s"$profileLogDir/malformed_json_eventlog"
+    val logFiles = Array(s"$logDir/nds_q86_test", badEventLog)
+    runQualificationTest(logFiles, "nds_q86_test_expectation.csv")
+  }
+
   test("test udf event logs") {
     val logFiles = Array(
       s"$logDir/dataset_eventlog",
