Add Spark properties table to profiling tool output #4781

Merged · 8 commits · Feb 15, 2022
docs/spark-profiling-tool.md (1 change: 1 addition & 0 deletions)

@@ -120,6 +120,7 @@ Run `--help` for more information.
 - Executors information
 - Job, stage and SQL ID information
 - Rapids related parameters
+- Spark Properties
 - Rapids Accelerator Jar and cuDF Jar
 - SQL Plan Metrics
 - Compare Mode: Matching SQL IDs Across Applications
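For illustration only (application count and values below are invented, not from this PR): like the existing Rapids-parameters table, the new Spark Properties table is inverted relative to the other report tables, with one row per property key and one appIndex column per profiled application, so a two-application run might render roughly as:

Spark Properties:
propertyName,appIndex_1,appIndex_2
spark.eventLog.dir,/tmp/logs,/tmp/logs
spark.executor.memory,4g,8g
spark.plugins,com.nvidia.spark.SQLPlugin,com.nvidia.spark.SQLPlugin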
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -33,4 +33,5 @@ case class ApplicationSummaryInfo(
     val failedJobs: Seq[FailedJobsProfileResults],
     val removedBMs: Seq[BlockManagerRemovedProfileResult],
     val removedExecutors: Seq[ExecutorsRemovedProfileResult],
-    val unsupportedOps: Seq[UnsupportedOpsProfileResult])
+    val unsupportedOps: Seq[UnsupportedOpsProfileResult],
+    val sparkProps: Seq[RapidsPropertyProfileResult])
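Aside: the actual definition of RapidsPropertyProfileResult is not part of this diff. The sketch below is inferred purely from how the diff consumes the type (p.rows(0) -> p.rows(1), sortBy(_.key), outputHeaders(1)) and may differ from the real class:

// Inferred stand-in, for orientation only; not the tool's definition.
case class PropertyProfileResultSketch(
    key: String,                // property name; combineProps sorts on this
    outputHeaders: Seq[String], // e.g. Seq("propertyName", "appIndex_1")
    rows: Seq[String])          // e.g. Seq("spark.rapids.sql.enabled", "true")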
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -133,22 +133,27 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
       }
     }

-  // Print Rapids related Spark Properties
+  // Print RAPIDS related or all Spark Properties
   // This table is inverse of the other tables where the row keys are
   // property keys and the columns are the application values. So
   // column1 would be all the key values for app index 1.
-  def getRapidsProperties: Seq[RapidsPropertyProfileResult] = {
+  def getProperties(rapidsOnly: Boolean): Seq[RapidsPropertyProfileResult] = {
     val outputHeaders = ArrayBuffer("propertyName")
     val props = HashMap[String, ArrayBuffer[String]]()
     var numApps = 0
     apps.foreach { app =>
       numApps += 1
       outputHeaders += s"appIndex_${app.index}"
-      val rapidsRelated = app.sparkProperties.filterKeys { key =>
-        key.startsWith("spark.rapids") || key.startsWith("spark.executorEnv.UCX") ||
-        key.startsWith("spark.shuffle.manager") || key.equals("spark.shuffle.service.enabled")
+      val propsToKeep = if (rapidsOnly) {
+        app.sparkProperties.filterKeys { key =>
+          key.startsWith("spark.rapids") || key.startsWith("spark.executorEnv.UCX") ||
+            key.startsWith("spark.shuffle.manager") || key.equals("spark.shuffle.service.enabled")
+        }
+      } else {
+        // remove the rapids related ones
+        app.sparkProperties.filterKeys(key => !(key.contains("spark.rapids")))
       }
-      CollectInformation.addNewProps(rapidsRelated, props, numApps)
+      CollectInformation.addNewProps(propsToKeep, props, numApps)
     }
     val allRows = props.map { case (k, v) => Seq(k) ++ v }.toSeq
     if (allRows.size > 0) {
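To make the inverted layout concrete, here is a self-contained Scala sketch of the same folding step (the per-app property maps are hypothetical, and using "null" as the placeholder for apps that did not set a key is an assumption, not taken from this diff):

import scala.collection.mutable.{ArrayBuffer, HashMap}

object InvertedPropsSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-application Spark properties (app index 1 and 2).
    val appProps = Seq(
      Map("spark.rapids.sql.enabled" -> "true",
        "spark.shuffle.service.enabled" -> "false"),
      Map("spark.rapids.sql.enabled" -> "false"))
    // Rows keyed by property name; each app contributes one value column.
    val props = HashMap[String, ArrayBuffer[String]]()
    var numApps = 0
    appProps.foreach { p =>
      numApps += 1
      p.foreach { case (k, v) =>
        // Keys seen for the first time get placeholders for earlier apps.
        props.getOrElseUpdate(k, ArrayBuffer.fill(numApps - 1)("null")) += v
      }
      // Keys this app did not set also get a placeholder column entry.
      props.values.foreach(col => if (col.size < numApps) col += "null")
    }
    val header = "propertyName" +: (1 to numApps).map(i => s"appIndex_$i")
    (header +: props.toSeq.sortBy(_._1).map { case (k, vs) => k +: vs.toSeq })
      .foreach(row => println(row.mkString(",")))
  }
}

Running it prints one row per property key, padded where an application never set the key, which is the shape getProperties/addNewProps produce above.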
@@ -279,7 +279,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
     val dsInfo = collect.getDataSourceInfo
     val execInfo = collect.getExecutorInfo
     val jobInfo = collect.getJobInfo
-    val rapidsProps = collect.getRapidsProperties
+    val rapidsProps = collect.getProperties(rapidsOnly = true)
+    val sparkProps = collect.getProperties(rapidsOnly = false)
     val rapidsJar = collect.getRapidsJARInfo
     val sqlMetrics = collect.getSQLPlanMetrics
     // for compare mode we just add in extra tables for matching across applications
@@ -337,7 +338,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
     }
     (ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps, rapidsJar,
       sqlMetrics, jsMetAgg, sqlTaskAggMetrics, durAndCpuMet, skewInfo, failedTasks, failedStages,
-      failedJobs, removedBMs, removedExecutors, unsupportedOps), compareRes)
+      failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps), compareRes)
   }

   def writeOutput(profileOutputWriter: ProfileOutputWriter,
@@ -347,19 +348,25 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
     val sums = if (outputCombined) {
       // the properties table here has the column names as the app indexes so we have to
       // handle special
-      def combineProps(sums: Seq[ApplicationSummaryInfo]): Seq[RapidsPropertyProfileResult] = {
+      def combineProps(rapidsOnly: Boolean,
+          sums: Seq[ApplicationSummaryInfo]): Seq[RapidsPropertyProfileResult] = {
         var numApps = 0
         val props = HashMap[String, ArrayBuffer[String]]()
         val outputHeaders = ArrayBuffer("propertyName")
         sums.foreach { app =>
-          if (app.rapidsProps.nonEmpty) {
+          val inputProps = if (rapidsOnly) {
+            app.rapidsProps
+          } else {
+            app.sparkProps
+          }
+          if (inputProps.nonEmpty) {
             numApps += 1
-            val rapidsRelated = app.rapidsProps.map { rp =>
-              rp.rows(0) -> rp.rows(1)
+            val appMappedProps = inputProps.map { p =>
+              p.rows(0) -> p.rows(1)
             }.toMap

-            outputHeaders += app.rapidsProps.head.outputHeaders(1)
-            CollectInformation.addNewProps(rapidsRelated, props, numApps)
+            outputHeaders += inputProps.head.outputHeaders(1)
+            CollectInformation.addNewProps(appMappedProps, props, numApps)
           }
         }
         val allRows = props.map { case (k, v) => Seq(k) ++ v }.toSeq
@@ -373,7 +380,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
         appsSum.flatMap(_.dsInfo).sortBy(_.appIndex),
         appsSum.flatMap(_.execInfo).sortBy(_.appIndex),
         appsSum.flatMap(_.jobInfo).sortBy(_.appIndex),
-        combineProps(appsSum).sortBy(_.key),
+        combineProps(rapidsOnly=true, appsSum).sortBy(_.key),
         appsSum.flatMap(_.rapidsJar).sortBy(_.appIndex),
         appsSum.flatMap(_.sqlMetrics).sortBy(_.appIndex),
         appsSum.flatMap(_.jsMetAgg).sortBy(_.appIndex),
@@ -385,7 +392,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
         appsSum.flatMap(_.failedJobs).sortBy(_.appIndex),
         appsSum.flatMap(_.removedBMs).sortBy(_.appIndex),
         appsSum.flatMap(_.removedExecutors).sortBy(_.appIndex),
-        appsSum.flatMap(_.unsupportedOps).sortBy(_.appIndex)
+        appsSum.flatMap(_.unsupportedOps).sortBy(_.appIndex),
+        combineProps(rapidsOnly=false, appsSum).sortBy(_.key)
       )
       Seq(reduced)
     } else {
@@ -399,6 +407,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
       profileOutputWriter.write("Job Information", app.jobInfo)
       profileOutputWriter.write("Spark Rapids parameters set explicitly", app.rapidsProps,
         Some("Spark Rapids parameters"))
+      profileOutputWriter.write("Spark Properties", app.sparkProps,
+        Some("Spark Properties"))
       profileOutputWriter.write("Rapids Accelerator Jar and cuDF Jar", app.rapidsJar,
         Some("Rapids 4 Spark Jars"))
       profileOutputWriter.write("SQL Plan Metrics for Application", app.sqlMetrics,
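Usage note (a sketch: the jar version and paths below are placeholders, and the documented invocation lives in docs/spark-profiling-tool.md): after this change the new table shows up when running the profiling tool against an event log, e.g.

java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.profiling.ProfileMain /path/to/eventlog

In the per-application output, "Spark Properties" is written immediately after "Spark Rapids parameters set explicitly", matching the writeOutput order above.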
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -436,7 +436,7 @@ class EventsProcessor(app: ApplicationInfo) extends EventProcessorBase[ApplicationInfo]

   // To process all other unknown events
   override def doOtherEvent(app: ApplicationInfo, event: SparkListenerEvent): Unit = {
-    logInfo("Processing other event: " + event.getClass)
+    logDebug("Skipping unhandled event: " + event.getClass)
     // not used
   }
 }
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2021, NVIDIA CORPORATION.
+ * Copyright (c) 2021-2022, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -508,14 +508,19 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     assert(apps.size == 1)
     val collect = new CollectInformation(apps)
     for (app <- apps) {
-      val props = collect.getRapidsProperties
-      val rows = props.map(_.rows.head)
+      val rapidsProps = collect.getProperties(rapidsOnly = true)
+      val rows = rapidsProps.map(_.rows.head)
       assert(rows.length == 5) // 5 properties captured.
       // verify ucx parameters are captured.
       assert(rows.contains("spark.executorEnv.UCX_RNDV_SCHEME"))

       // verify gds parameters are captured.
       assert(rows.contains("spark.rapids.memory.gpu.direct.storage.spill.alignedIO"))
+
+      val sparkProps = collect.getProperties(rapidsOnly = false)
+      val sparkPropsRows = sparkProps.map(_.rows.head)
+      assert(sparkPropsRows.contains("spark.eventLog.dir"))
+      assert(sparkPropsRows.contains("spark.plugins"))
     }
   }

@@ -575,7 +580,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
       f.endsWith(".csv")
     })
-    assert(dotDirs.length === 12)
+    assert(dotDirs.length === 13)
     for (file <- dotDirs) {
       assert(file.getAbsolutePath.endsWith(".csv"))
       // just load each one to make sure formatted properly
@@ -602,7 +607,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
       f.endsWith(".csv")
     })
-    assert(dotDirs.length === 10)
+    assert(dotDirs.length === 11)
     for (file <- dotDirs) {
       assert(file.getAbsolutePath.endsWith(".csv"))
       // just load each one to make sure formatted properly
@@ -632,7 +637,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
       f.endsWith(".csv")
     })
-    assert(dotDirs.length === 12)
+    assert(dotDirs.length === 13)
     for (file <- dotDirs) {
       assert(file.getAbsolutePath.endsWith(".csv"))
       // just load each one to make sure formatted properly
@@ -662,7 +667,7 @@ class ApplicationInfoSuite extends FunSuite with Logging {
     val dotDirs = ToolTestUtils.listFilesMatching(tempSubDir, { f =>
       f.endsWith(".csv")
     })
-    assert(dotDirs.length === 10)
+    assert(dotDirs.length === 11)
    for (file <- dotDirs) {
      assert(file.getAbsolutePath.endsWith(".csv"))
      // just load each one to make sure formatted properly