Profiling tool add wholestagecodegen to execs mapping, sql to stage info and job end time (#5868)

* Profiling tool add wholestagecodegen to execs mapping and job end time

Signed-off-by: Thomas Graves <tgraves@apache.org>

* fixes

* Add sorting for whole stage code gen table

* change stages to string

* handle delimiter change in test

* Update test results

* update csv file numbers

* test job start and job end

* add test sql to stage

* add test whole stage code gen

* fix test

* cleanup

* rename stages to stageIds

* change headings

* add in Ids to header for node names

* Add node ids to parentheses in whole stage table

* update test for node ids

* update docs

* split out child node id in whole stage table

* change NodeID

* update test and docs

* update doc
tgravescs authored Jun 21, 2022
1 parent a3701b3 commit 5826fe1
Showing 8 changed files with 352 additions and 120 deletions.
64 changes: 49 additions & 15 deletions docs/spark-profiling-tool.md
@@ -119,6 +119,8 @@ Run `--help` for more information.
- Data Source information
- Executors information
- Job, stage and SQL ID information
- SQL to stage information
- WholeStageCodeGen to node mappings (only applies to CPU plans)
- Rapids related parameters
- Spark Properties
- Rapids Accelerator Jar and cuDF Jar
@@ -263,29 +265,61 @@ Rapids Accelerator Jar and cuDF Jar:
- Job, stage and SQL ID information (not in `compare` mode yet). The new `startTime` and `endTime` columns are Unix epoch timestamps in milliseconds:
```
+--------+-----+---------+-----+
|appIndex|jobID|stageIds |sqlID|
+--------+-----+---------+-----+
|1 |0 |[0] |null |
|1 |1 |[1,2,3,4]|0 |
+--------+-----+---------+-----+
Job Information:
+--------+-----+---------+-----+-------------+-------------+
|appIndex|jobID|stageIds |sqlID|startTime |endTime |
+--------+-----+---------+-----+-------------+-------------+
|1 |0 |[0] |null |1622846402778|1622846410240|
|1 |1 |[1,2,3,4]|0 |1622846431114|1622846441591|
+--------+-----+---------+-----+-------------+-------------+
```
- SQL to Stage Information (sorted by stage duration). Note that not all SQL nodes map to a stage ID, so some nodes may be missing. In the `SQL Nodes(IDs)` column each entry is a node name followed by its node ID in parentheses; for `WholeStageCodegen` nodes the codegen index (for example `(5)`) is part of the node name and the node ID follows it.
```
SQL to Stage Information:
+--------+-----+-----+-------+--------------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|appIndex|sqlID|jobID|stageId|stageAttemptId|Stage Duration|SQL Nodes(IDs) |
+--------+-----+-----+-------+--------------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |0 |1 |1 |0 |8174 |Exchange(9),WholeStageCodegen (1)(10),Scan(13) |
|1 |0 |1 |2 |0 |8154 |Exchange(16),WholeStageCodegen (3)(17),Scan(20) |
|1 |0 |1 |3 |0 |2148 |Exchange(2),HashAggregate(4),SortMergeJoin(6),WholeStageCodegen (5)(3),Sort(8),WholeStageCodegen (2)(7),Exchange(9),Sort(15),WholeStageCodegen (4)(14),Exchange(16)|
|1 |0 |1 |4 |0 |126 |HashAggregate(1),WholeStageCodegen (6)(0),Exchange(2) |
+--------+-----+-----+-------+--------------+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
- SQL Plan Metrics for Application, for each SQL plan node in each SQL query.
These are also called accumulables in Spark.
Note that not all SQL nodes have a mapping to a stage ID, so the new `stageIds` column may be empty:
```
SQL Plan Metrics for Application:
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+
|appIndex|sqlID|nodeID|nodeName |accumulatorId|name |max_value |metricType|
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+
|1 |0 |1 |GpuColumnarExchange |111 |output rows |1111111111 |sum |
|1 |0 |1 |GpuColumnarExchange |112 |output columnar batches|222222 |sum |
|1 |0 |1 |GpuColumnarExchange |113 |data size |333333333333 |size |
|1 |0 |1 |GpuColumnarExchange |114 |shuffle bytes written |444444444444 |size |
|1 |0 |1 |GpuColumnarExchange |115 |shuffle records written|555555 |sum |
|1 |0 |1 |GpuColumnarExchange |116 |shuffle write time |666666666666 |nsTiming |
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+--------+
|appIndex|sqlID|nodeID|nodeName |accumulatorId|name |max_value |metricType|stageIds|
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+--------+
|1 |0 |1 |GpuColumnarExchange |111 |output rows |1111111111 |sum |4,3 |
|1 |0 |1 |GpuColumnarExchange |112 |output columnar batches|222222 |sum |4,3 |
|1 |0 |1 |GpuColumnarExchange |113 |data size |333333333333 |size |4,3 |
|1 |0 |1 |GpuColumnarExchange |114 |shuffle bytes written |444444444444 |size |4,3 |
|1 |0 |1 |GpuColumnarExchange |115 |shuffle records written|555555 |sum |4,3 |
|1 |0 |1 |GpuColumnarExchange |116 |shuffle write time |666666666666 |nsTiming |4,3 |
+--------+-----+------+-----------------------------------------------------------+-------------+-----------------------+-------------+----------+--------+
```
- WholeStageCodeGen to Node Mapping:
```
WholeStageCodeGen Mapping:
+--------+-----+------+---------------------+-------------------+------------+
|appIndex|sqlID|nodeID|SQL Node |Child Node |Child NodeID|
+--------+-----+------+---------------------+-------------------+------------+
|1 |0 |0 |WholeStageCodegen (6)|HashAggregate |1 |
|1 |0 |3 |WholeStageCodegen (5)|HashAggregate |4 |
|1 |0 |3 |WholeStageCodegen (5)|Project |5 |
|1 |0 |3 |WholeStageCodegen (5)|SortMergeJoin |6 |
|1 |0 |7 |WholeStageCodegen (2)|Sort |8 |
+--------+-----+------+---------------------+-------------------+------------+
```
- Print SQL Plans (-p option):
@@ -34,4 +34,6 @@ case class ApplicationSummaryInfo(
val removedBMs: Seq[BlockManagerRemovedProfileResult],
val removedExecutors: Seq[ExecutorsRemovedProfileResult],
val unsupportedOps: Seq[UnsupportedOpsProfileResult],
val sparkProps: Seq[RapidsPropertyProfileResult])
val sparkProps: Seq[RapidsPropertyProfileResult],
val sqlStageInfo: Seq[SQLStageInfoProfileResult],
val wholeStage: Seq[WholeStageCodeGenResults])
@@ -93,11 +93,32 @@ case class JobInfoProfileResult(
appIndex: Int,
jobID: Int,
stageIds: Seq[Int],
sqlID: Option[Long]) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "jobID", "stageIds", "sqlID")
sqlID: Option[Long],
startTime: Long,
endTime: Option[Long]) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "jobID", "stageIds", "sqlID", "startTime", "endTime")
override def convertToSeq: Seq[String] = {
val stageIdStr = s"[${stageIds.mkString(",")}]"
Seq(appIndex.toString, jobID.toString, stageIdStr, sqlID.map(_.toString).getOrElse(null))
Seq(appIndex.toString, jobID.toString, stageIdStr, sqlID.map(_.toString).getOrElse(null),
startTime.toString, endTime.map(_.toString).getOrElse(null))
}
}

case class SQLStageInfoProfileResult(
appIndex: Int,
sqlID: Long,
jobID: Int,
stageId: Int,
stageAttemptId: Int,
duration: Option[Long],
nodeNames: Seq[String]) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "sqlID", "jobID", "stageId",
"stageAttemptId", "Stage Duration", "SQL Nodes(IDs)")

override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, sqlID.toString, jobID.toString, stageId.toString,
stageAttemptId.toString, duration.map(_.toString).getOrElse(null),
nodeNames.mkString(","))
}
}
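
The following sketch (illustrative only, not part of this commit) shows how the two new result types render rows, matching the `Job Information` and `SQL to Stage Information` examples in the docs above:

```scala
// Hypothetical driver snippet; assumes the case classes defined above.
val job = JobInfoProfileResult(appIndex = 1, jobID = 1, stageIds = Seq(1, 2, 3, 4),
  sqlID = Some(0L), startTime = 1622846431114L, endTime = Some(1622846441591L))
// convertToSeq wraps stageIds in brackets and null-fills missing Options:
println(job.convertToSeq.mkString("|"))
// 1|1|[1,2,3,4]|0|1622846431114|1622846441591

val sqlStage = SQLStageInfoProfileResult(appIndex = 1, sqlID = 0L, jobID = 1,
  stageId = 4, stageAttemptId = 0, duration = Some(126L),
  nodeNames = Seq("HashAggregate(1)", "WholeStageCodegen (6)(0)", "Exchange(2)"))
println(sqlStage.convertToSeq.mkString("|"))
// 1|0|1|4|0|126|HashAggregate(1),WholeStageCodegen (6)(0),Exchange(2)
```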

@@ -136,13 +157,13 @@ class SQLExecutionInfoClass(
var sqlCpuTimePercent: Double = -1)

case class SQLAccumProfileResults(appIndex: Int, sqlID: Long, nodeID: Long,
nodeName: String, accumulatorId: Long,
name: String, max_value: Long, metricType: String) extends ProfileResult {
nodeName: String, accumulatorId: Long, name: String, max_value: Long,
metricType: String, stageIds: String) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "sqlID", "nodeID", "nodeName", "accumulatorId",
"name", "max_value", "metricType")
"name", "max_value", "metricType", "stageIds")
override def convertToSeq: Seq[String] = {
Seq(appIndex.toString, sqlID.toString, nodeID.toString, nodeName, accumulatorId.toString,
name, max_value.toString, metricType)
name, max_value.toString, metricType, stageIds)
}
}
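
The `stageIds` field here is already a pre-joined `String`: `generateSQLAccums` (below) passes `metric.stageIds.mkString(",")`, producing the `4,3`-style values in the docs table. A minimal sketch of that conversion:

```scala
// SQLMetricInfoCase carries stageIds as Seq[Int]; the profiler joins them
// into the comma-separated string shown in the "stageIds" column.
val stageIds: Seq[Int] = Seq(4, 3)
val column: String = stageIds.mkString(",") // "4,3"
```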

@@ -238,7 +259,8 @@ case class SQLMetricInfoCase(
metricType: String,
nodeID: Long,
nodeName: String,
nodeDesc: String)
nodeDesc: String,
stageIds: Seq[Int])

case class DriverAccumCase(
sqlID: Long,
@@ -588,3 +610,23 @@ case class CompareProfileResults(outputHeadersIn: Seq[String],
override val outputHeaders: Seq[String] = outputHeadersIn
override def convertToSeq: Seq[String] = rows
}

case class WholeStageCodeGenResults(
appIndex: Int,
sqlID: Long,
nodeID: Long,
parent: String,
child: String,
childNodeID: Long
) extends ProfileResult {
override val outputHeaders = Seq("appIndex", "sqlID", "nodeID", "SQL Node",
"Child Node", "Child NodeID")
override def convertToSeq: Seq[String] = {
Seq(appIndex.toString,
sqlID.toString,
nodeID.toString,
parent,
child,
childNodeID.toString)
}
}
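
As a hedged illustration (sample values, not part of this commit), one row of the docs' `WholeStageCodeGen Mapping` table corresponds to:

```scala
// WholeStageCodegen (5) at nodeID 3 wraps a SortMergeJoin child at node 6,
// as in the docs example above.
val row = WholeStageCodeGenResults(appIndex = 1, sqlID = 0L, nodeID = 3L,
  parent = "WholeStageCodegen (5)", child = "SortMergeJoin", childNodeID = 6L)
println(row.convertToSeq.mkString("|"))
// 1|0|3|WholeStageCodegen (5)|SortMergeJoin|6
```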
@@ -123,7 +123,7 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
def getJobInfo: Seq[JobInfoProfileResult] = {
val allRows = apps.flatMap { app =>
app.jobIdToInfo.map { case (jobId, j) =>
JobInfoProfileResult(app.index, j.jobID, j.stageIds, j.sqlID)
JobInfoProfileResult(app.index, j.jobID, j.stageIds, j.sqlID, j.startTime, j.endTime)
}
}
if (allRows.size > 0) {
@@ -133,6 +133,23 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
}
}

def getSQLToStage: Seq[SQLStageInfoProfileResult] = {
val allRows = apps.flatMap { app =>
app.aggregateSQLStageInfo
}
if (allRows.size > 0) {
case class Reverse[T](t: T)
implicit def ReverseOrdering[T: Ordering]: Ordering[Reverse[T]] =
Ordering[T].reverse.on(_.t)

// intentionally sort this table by the duration to be able to quickly
// see the stage that took the longest
allRows.sortBy(cols => (cols.appIndex, Reverse(cols.duration)))
} else {
Seq.empty
}
}
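
The `Reverse` wrapper above is a small trick for mixed-direction sorting with `sortBy`: `appIndex` stays ascending while the `Option[Long]` durations sort descending (Scala's default `Ordering[Option[T]]` puts `None` first, so the reversed ordering puts `None` last). The combined view in `Profiler.writeOutput` below uses `Ordering[Option[Long]].reverse` directly for the same effect. A self-contained sketch of the pattern:

```scala
// Standalone sketch of the mixed-direction sort used in getSQLToStage.
object ReverseSortExample extends App {
  case class Reverse[T](t: T)
  implicit def reverseOrdering[T: Ordering]: Ordering[Reverse[T]] =
    Ordering[T].reverse.on(_.t)

  val rows = Seq((2, Some(2148L)), (1, Some(126L)), (1, None), (1, Some(8174L)))
  // Ascending appIndex, then longest duration first; None durations sort last.
  val sorted = rows.sortBy { case (app, dur) => (app, Reverse(dur)) }
  println(sorted) // List((1,Some(8174)), (1,Some(126)), (1,None), (2,Some(2148)))
}
```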

// Print RAPIDS related or all Spark Properties
// This table is inverse of the other tables where the row keys are
// property keys and the columns are the application values. So
@@ -164,6 +181,18 @@ class CollectInformation(apps: Seq[ApplicationInfo]) extends Logging {
}
}

// Print SQL whole stage code gen mapping
def getWholeStageCodeGenMapping: Seq[WholeStageCodeGenResults] = {
val allWholeStages = apps.flatMap { app =>
app.wholeStage
}
if (allWholeStages.size > 0) {
allWholeStages.sortBy(cols => (cols.appIndex, cols.sqlID, cols.nodeID))
} else {
Seq.empty
}
}

// Print SQL Plan Metrics
def getSQLPlanMetrics: Seq[SQLAccumProfileResults] = {
val sqlAccums = CollectInformation.generateSQLAccums(apps)
@@ -223,7 +252,7 @@ object CollectInformation extends Logging {
val max = Math.max(driverMax.getOrElse(0L), taskMax.getOrElse(0L))
Some(SQLAccumProfileResults(app.index, metric.sqlID,
metric.nodeID, metric.nodeName, metric.accumulatorId,
metric.name, max, metric.metricType))
metric.name, max, metric.metricType, metric.stageIds.mkString(",")))
} else {
None
}
@@ -279,10 +279,12 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
val dsInfo = collect.getDataSourceInfo
val execInfo = collect.getExecutorInfo
val jobInfo = collect.getJobInfo
val sqlStageInfo = collect.getSQLToStage
val rapidsProps = collect.getProperties(rapidsOnly = true)
val sparkProps = collect.getProperties(rapidsOnly = false)
val rapidsJar = collect.getRapidsJARInfo
val sqlMetrics = collect.getSQLPlanMetrics
val wholeStage = collect.getWholeStageCodeGenMapping
// for compare mode we just add in extra tables for matching across applications
// the rest of the tables simply list all applications specified
val compareRes = if (appArgs.compare()) {
@@ -338,7 +340,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
}
(ApplicationSummaryInfo(appInfo, dsInfo, execInfo, jobInfo, rapidsProps, rapidsJar,
sqlMetrics, jsMetAgg, sqlTaskAggMetrics, durAndCpuMet, skewInfo, failedTasks, failedStages,
failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps), compareRes)
failedJobs, removedBMs, removedExecutors, unsupportedOps, sparkProps, sqlStageInfo,
wholeStage), compareRes)
}

def writeOutput(profileOutputWriter: ProfileOutputWriter,
@@ -393,7 +396,9 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
appsSum.flatMap(_.removedBMs).sortBy(_.appIndex),
appsSum.flatMap(_.removedExecutors).sortBy(_.appIndex),
appsSum.flatMap(_.unsupportedOps).sortBy(_.appIndex),
combineProps(rapidsOnly=false, appsSum).sortBy(_.key)
combineProps(rapidsOnly=false, appsSum).sortBy(_.key),
appsSum.flatMap(_.sqlStageInfo).sortBy(_.duration)(Ordering[Option[Long]].reverse),
appsSum.flatMap(_.wholeStage).sortBy(_.appIndex)
)
Seq(reduced)
} else {
Expand All @@ -405,6 +410,7 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
profileOutputWriter.write("Data Source Information", app.dsInfo)
profileOutputWriter.write("Executor Information", app.execInfo)
profileOutputWriter.write("Job Information", app.jobInfo)
profileOutputWriter.write("SQL to Stage Information", app.sqlStageInfo)
profileOutputWriter.write("Spark Rapids parameters set explicitly", app.rapidsProps,
Some("Spark Rapids parameters"))
profileOutputWriter.write("Spark Properties", app.sparkProps,
@@ -413,7 +419,8 @@ class Profiler(hadoopConf: Configuration, appArgs: ProfileArgs) extends Logging
Some("Rapids 4 Spark Jars"))
profileOutputWriter.write("SQL Plan Metrics for Application", app.sqlMetrics,
Some("SQL Plan Metrics"))

profileOutputWriter.write("WholeStageCodeGen Mapping", app.wholeStage,
Some("WholeStagecodeGen Mapping"))
comparedRes.foreach { compareSum =>
val matchingSqlIds = compareSum.matchingSqlIds
val matchingStageIds = compareSum.matchingStageIds