
Fix issue with Profiling tool taking a long time due to finding stage ids that map to SQL nodes (#5951)

* Profiling tool taking too long to look up stages based on metrics

* fixes

* use Set for stages

* update tests

* update name

* sign off

* sign off

Signed-off-by: Thomas Graves <tgraves@apache.org>
tgravescs authored Jul 6, 2022
1 parent 071c1de commit 0b511cb
Showing 8 changed files with 42 additions and 43 deletions.
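
Why it was slow: for every SQL plan node, getStagesInSQLNode scanned every stage's accumulator list and intersected it with the node's accumulator ids, i.e. one intersect per stage per node. The fix builds the inverse mapping (accumulator id -> stage ids) once, as stages are submitted, so each node lookup is just a map probe per accumulator id. A minimal, self-contained Scala sketch of the before/after shape (the object, method names, and sample data below are made up for illustration; only stageAccumulators and accumulatorToStages mirror names from the diff):

  object StageLookupSketch {
    // Hypothetical data: stageId -> accumulator ids seen on that stage.
    val stageAccumulators: Map[Int, Seq[Long]] =
      Map(0 -> Seq(112L, 113L), 1 -> Seq(79L, 80L), 2 -> Seq(76L, 79L, 112L))

    // Old shape: scan every stage and intersect with the node's accumulator ids.
    def stagesForNodeOld(nodeAccums: Seq[Long]): Seq[Int] =
      stageAccumulators.flatMap { case (stageId, stageAccums) =>
        if (nodeAccums.intersect(stageAccums).nonEmpty) Some(stageId) else None
      }.toSeq

    // New shape: invert once to accumulator id -> Set(stage ids) ...
    val accumulatorToStages: Map[Long, Set[Int]] =
      stageAccumulators.toSeq
        .flatMap { case (stageId, accums) => accums.map(a => (a, stageId)) }
        .groupBy(_._1)
        .map { case (accumId, pairs) => accumId -> pairs.map(_._2).toSet }

    // ... so each node lookup only touches its own accumulator ids.
    def stagesForNodeNew(nodeAccums: Seq[Long]): Set[Int] =
      nodeAccums.flatMap(accumulatorToStages.get).flatten.toSet

    def main(args: Array[String]): Unit = {
      println(stagesForNodeOld(Seq(79L)).sorted) // List(1, 2)
      println(stagesForNodeNew(Seq(79L)))        // Set(1, 2)
    }
  }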
@@ -35,7 +35,7 @@ class ExecInfo(
val nodeId: Long,
val isSupported: Boolean,
val children: Option[Seq[ExecInfo]], // only one level deep
-    val stages: Seq[Int] = Seq.empty,
+    val stages: Set[Int] = Set.empty,
val shouldRemove: Boolean = false) {
private def childrenToString = {
val str = children.map { c =>
@@ -79,15 +79,11 @@ object SQLPlanParser extends Logging {
PlanInfo(appID, sqlID, execInfos)
}

-  def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Seq[Int] = {
+  def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
-    app.stageAccumulators.flatMap { case (stageId, stageAccums) =>
-      if (nodeAccums.intersect(stageAccums).nonEmpty) {
-        Some(stageId)
-      } else {
-        None
-      }
-    }.toSeq
+    nodeAccums.flatMap { nodeAccumId =>
+      app.accumulatorToStages.get(nodeAccumId)
+    }.flatten.toSet
}

private val skipUDFCheckExecs = Seq("ArrowEvalPython", "AggregateInPandas",
@@ -125,7 +121,7 @@ object SQLPlanParser extends Logging {
case "ColumnarToRow" =>
// ignore ColumnarToRow to row for now as assume everything is columnar
new ExecInfo(sqlID, node.name, expr = "", 1, duration = None, node.id,
-        isSupported = false, None, Seq.empty, shouldRemove=true)
+        isSupported = false, None, Set.empty, shouldRemove=true)
case c if (c.contains("CreateDataSourceTableAsSelectCommand")) =>
// create data source table doesn't show the format so we can't determine
// if we support it
@@ -52,7 +52,7 @@ case class WholeStageExecParser(
// for now
val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
val execInfo = new ExecInfo(sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
-      node.id, anySupported, Some(childNodes), allStagesIncludingChildren.toSeq)
+      node.id, anySupported, Some(childNodes), allStagesIncludingChildren)
Seq(execInfo)
}
}
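
Since getStagesInSQLNode now returns a Set and each child ExecInfo carries a Set, allStagesIncludingChildren above is already a deduplicated Set[Int] and can be handed to ExecInfo as-is; that is why the trailing .toSeq is dropped.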
@@ -260,7 +260,7 @@ case class SQLMetricInfoCase(
nodeID: Long,
nodeName: String,
nodeDesc: String,
-    stageIds: Seq[Int])
+    stageIds: Set[Int])

case class DriverAccumCase(
sqlID: Long,
@@ -60,7 +60,7 @@ abstract class AppBase(
HashMap[Long, ArrayBuffer[TaskStageAccumCase]]()

val stageIdToInfo: HashMap[(Int, Int), StageInfoClass] = new HashMap[(Int, Int), StageInfoClass]()
-  val stageAccumulators: HashMap[Int, Seq[Long]] = new HashMap[Int, Seq[Long]]()
+  val accumulatorToStages: HashMap[Long, Set[Int]] = new HashMap[Long, Set[Int]]()

var driverAccumMap: HashMap[Long, ArrayBuffer[DriverAccumCase]] =
HashMap[Long, ArrayBuffer[DriverAccumCase]]()
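
Note the direction flip: the old stageAccumulators mapped stageId -> accumulator ids, while the new accumulatorToStages maps accumulator id -> stage ids, which is the direction getStagesInSQLNode actually queries.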
@@ -398,7 +398,10 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
stage.duration = ProfileUtils.optionLongMinusOptionLong(stage.completionTime,
stage.info.submissionTime)
val stageAccumulatorIds = event.stageInfo.accumulables.values.map { m => m.id }.toSeq
-    app.stageAccumulators.put(event.stageInfo.stageId, stageAccumulatorIds)
+    stageAccumulatorIds.foreach { accumId =>
+      val existingStages = app.accumulatorToStages.getOrElse(accumId, Set.empty)
+      app.accumulatorToStages.put(accumId, existingStages + event.stageInfo.stageId)
+    }
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
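
A stand-alone sketch of the update above (recordStage and the sample ids are hypothetical stand-ins for the stage-submitted listener callback; the map update itself mirrors the diff):

  import scala.collection.mutable

  object InvertedIndexSketch {
    val accumulatorToStages: mutable.HashMap[Long, Set[Int]] =
      new mutable.HashMap[Long, Set[Int]]()

    // Stand-in for the listener: fold this stage's accumulator ids into the index.
    def recordStage(stageId: Int, stageAccumulatorIds: Seq[Long]): Unit = {
      stageAccumulatorIds.foreach { accumId =>
        val existingStages = accumulatorToStages.getOrElse(accumId, Set.empty)
        accumulatorToStages.put(accumId, existingStages + stageId)
      }
    }

    def main(args: Array[String]): Unit = {
      recordStage(1, Seq(79L, 80L))
      recordStage(2, Seq(79L))
      println(accumulatorToStages.get(79L)) // Some(Set(1, 2))
    }
  }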
@@ -219,8 +219,8 @@ class ApplicationInfo(
var taskEnd: ArrayBuffer[TaskCase] = ArrayBuffer[TaskCase]()
var unsupportedSQLplan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var wholeStage: ArrayBuffer[WholeStageCodeGenResults] = ArrayBuffer[WholeStageCodeGenResults]()
-  val sqlPlanNodeIdToStageIds: mutable.HashMap[(Long, Long), Seq[Int]] =
-    mutable.HashMap.empty[(Long, Long), Seq[Int]]
+  val sqlPlanNodeIdToStageIds: mutable.HashMap[(Long, Long), Set[Int]] =
+    mutable.HashMap.empty[(Long, Long), Set[Int]]

private lazy val eventProcessor = new EventsProcessor(this)

@@ -302,7 +302,7 @@ class ApplicationInfo(

// Then process SQL plan metric type
for (metric <- node.metrics) {
-      val stages = sqlPlanNodeIdToStageIds.get((sqlID, node.id)).getOrElse(Seq.empty)
+      val stages = sqlPlanNodeIdToStageIds.get((sqlID, node.id)).getOrElse(Set.empty)
val allMetric = SQLMetricInfoCase(sqlID, metric.name,
metric.accumulatorId, metric.metricType, node.id,
node.name, node.desc, stages)
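The expected-results file below changes only in the ordering of the stageIds column (e.g. 2;1 becomes 1;2). The likely reason, stated as an assumption since the rendering code is not part of this diff: stage ids are now collected into a Set and sorted before being joined with ';'. Roughly:

  val stageIds: Set[Int] = Set(2, 1)
  val rendered = stageIds.toSeq.sorted.mkString(";") // "1;2" (hypothetical rendering)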
@@ -39,18 +39,18 @@ appIndex,sqlID,nodeID,nodeName,accumulatorId,name,max_value,metricType,stageIds
1,0,7,GpuShuffleCoalesce,76,total time,261389422,nsTiming,2
1,0,7,GpuShuffleCoalesce,77,collect batch time,167775821,nsTiming,2
1,0,7,GpuShuffleCoalesce,78,concat batch time,83550919,nsTiming,2
-1,0,8,GpuColumnarExchange,79,partition data size,42872100,sum,2;1
-1,0,8,GpuColumnarExchange,80,partitions,200,sum,2;1
-1,0,8,GpuColumnarExchange,81,output rows,10000000,sum,2;1
-1,0,8,GpuColumnarExchange,82,output columnar batches,1200,sum,2;1
-1,0,8,GpuColumnarExchange,83,data size,40076192,size,2;1
-1,0,8,GpuColumnarExchange,85,local blocks read,1200,sum,2;1
-1,0,8,GpuColumnarExchange,88,local bytes read,40132258,size,2;1
-1,0,8,GpuColumnarExchange,89,fetch wait time,0,timing,2;1
-1,0,8,GpuColumnarExchange,90,records read,1200,sum,2;1
-1,0,8,GpuColumnarExchange,91,shuffle bytes written,40132258,size,2;1
-1,0,8,GpuColumnarExchange,92,shuffle records written,1200,sum,2;1
-1,0,8,GpuColumnarExchange,93,shuffle write time,508750471,nsTiming,2;1
+1,0,8,GpuColumnarExchange,79,partition data size,42872100,sum,1;2
+1,0,8,GpuColumnarExchange,80,partitions,200,sum,1;2
+1,0,8,GpuColumnarExchange,81,output rows,10000000,sum,1;2
+1,0,8,GpuColumnarExchange,82,output columnar batches,1200,sum,1;2
+1,0,8,GpuColumnarExchange,83,data size,40076192,size,1;2
+1,0,8,GpuColumnarExchange,85,local blocks read,1200,sum,1;2
+1,0,8,GpuColumnarExchange,88,local bytes read,40132258,size,1;2
+1,0,8,GpuColumnarExchange,89,fetch wait time,0,timing,1;2
+1,0,8,GpuColumnarExchange,90,records read,1200,sum,1;2
+1,0,8,GpuColumnarExchange,91,shuffle bytes written,40132258,size,1;2
+1,0,8,GpuColumnarExchange,92,shuffle records written,1200,sum,1;2
+1,0,8,GpuColumnarExchange,93,shuffle write time,508750471,nsTiming,1;2
1,0,9,GpuProject,94,total time,6667140,nsTiming,1
1,0,10,GpuRowToColumnar,95,total time,61112304,nsTiming,1
1,0,11,WholeStageCodegen (1),96,duration,5463,timing,1
@@ -66,18 +66,18 @@ appIndex,sqlID,nodeID,nodeName,accumulatorId,name,max_value,metricType,stageIds
1,0,15,GpuShuffleCoalesce,109,total time,3266208420,nsTiming,2
1,0,15,GpuShuffleCoalesce,110,collect batch time,359397047,nsTiming,2
1,0,15,GpuShuffleCoalesce,111,concat batch time,104974316,nsTiming,2
-1,0,16,GpuColumnarExchange,112,partition data size,42872100,sum,2;0
-1,0,16,GpuColumnarExchange,113,partitions,200,sum,2;0
-1,0,16,GpuColumnarExchange,114,output rows,10000000,sum,2;0
-1,0,16,GpuColumnarExchange,115,output columnar batches,1200,sum,2;0
-1,0,16,GpuColumnarExchange,116,data size,40076192,size,2;0
-1,0,16,GpuColumnarExchange,118,local blocks read,1200,sum,2;0
-1,0,16,GpuColumnarExchange,121,local bytes read,40132250,size,2;0
-1,0,16,GpuColumnarExchange,122,fetch wait time,0,timing,2;0
-1,0,16,GpuColumnarExchange,123,records read,1200,sum,2;0
-1,0,16,GpuColumnarExchange,124,shuffle bytes written,40132250,size,2;0
-1,0,16,GpuColumnarExchange,125,shuffle records written,1200,sum,2;0
-1,0,16,GpuColumnarExchange,126,shuffle write time,400284505,nsTiming,2;0
+1,0,16,GpuColumnarExchange,112,partition data size,42872100,sum,0;2
+1,0,16,GpuColumnarExchange,113,partitions,200,sum,0;2
+1,0,16,GpuColumnarExchange,114,output rows,10000000,sum,0;2
+1,0,16,GpuColumnarExchange,115,output columnar batches,1200,sum,0;2
+1,0,16,GpuColumnarExchange,116,data size,40076192,size,0;2
+1,0,16,GpuColumnarExchange,118,local blocks read,1200,sum,0;2
+1,0,16,GpuColumnarExchange,121,local bytes read,40132250,size,0;2
+1,0,16,GpuColumnarExchange,122,fetch wait time,0,timing,0;2
+1,0,16,GpuColumnarExchange,123,records read,1200,sum,0;2
+1,0,16,GpuColumnarExchange,124,shuffle bytes written,40132250,size,0;2
+1,0,16,GpuColumnarExchange,125,shuffle records written,1200,sum,0;2
+1,0,16,GpuColumnarExchange,126,shuffle write time,400284505,nsTiming,0;2
1,0,17,GpuProject,127,total time,207820,nsTiming,0
1,0,18,GpuRowToColumnar,128,total time,58640462,nsTiming,0
1,0,19,WholeStageCodegen (2),129,duration,5920,timing,0
@@ -103,8 +103,8 @@ object ToolTestUtils extends Logging {
val diffCount = df.except(expectedDf).union(expectedDf.except(df)).count
if (diffCount != 0) {
logWarning("Diff expected vs actual:")
-      expectedDf.show(false)
-      df.show(false)
+      expectedDf.show(1000, false)
+      df.show(1000, false)
}
assert(diffCount == 0)
}
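
For context: Spark's Dataset.show(false) prints only the first 20 rows (untruncated), while show(1000, false) raises the row cap to 1000, so a failing comparison logs the full expected-vs-actual diff instead of just the first 20 rows.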
