Fix issue with Profiling tool taking a long time due to finding stage ids that map to SQL nodes #5951

Merged 7 commits on Jul 6, 2022
@@ -35,7 +35,7 @@ class ExecInfo(
val nodeId: Long,
val isSupported: Boolean,
val children: Option[Seq[ExecInfo]], // only one level deep
- val stages: Seq[Int] = Seq.empty,
+ val stages: Set[Int] = Set.empty,
val shouldRemove: Boolean = false) {
private def childrenToString = {
val str = children.map { c =>
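
(Note: the stages field above changes from Seq[Int] to Set[Int], which deduplicates stage ids and makes membership checks constant-time; the rest of this diff updates callers to match.)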
@@ -79,15 +79,11 @@ object SQLPlanParser extends Logging {
PlanInfo(appID, sqlID, execInfos)
}

- def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Seq[Int] = {
+ def getStagesInSQLNode(node: SparkPlanGraphNode, app: AppBase): Set[Int] = {
val nodeAccums = node.metrics.map(_.accumulatorId)
- app.stageAccumulators.flatMap { case (stageId, stageAccums) =>
-   if (nodeAccums.intersect(stageAccums).nonEmpty) {
-     Some(stageId)
-   } else {
-     None
-   }
- }.toSeq
+ nodeAccums.flatMap { nodeAccumId =>
+   app.accumulatorToStages.get(nodeAccumId)
+ }.flatten.toSet
}

private val skipUDFCheckExecs = Seq("ArrowEvalPython", "AggregateInPandas",
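
The rewrite of getStagesInSQLNode above is the heart of the fix: the old code intersected the node's accumulator ids against every stage's accumulator list, roughly O(stages × accumulators) work per SQL node, while the new code performs one hash lookup per node accumulator against a precomputed accumulator-to-stages index. A minimal, runnable sketch of the two strategies on toy data (the object name, values, and toy maps are illustrative, not the tool's API):

// Sketch only: contrasts the old per-stage intersection with the new
// reverse-map lookup. Data and identifiers are illustrative.
object StageLookupSketch {
  def main(args: Array[String]): Unit = {
    // Old shape: stage id -> accumulator ids registered in that stage.
    val stageAccumulators: Map[Int, Seq[Long]] =
      Map(0 -> Seq(112L, 116L), 1 -> Seq(79L, 80L), 2 -> Seq(76L, 79L, 112L))

    // New shape: the same data inverted once into accumulator id -> stage ids.
    val accumulatorToStages: Map[Long, Set[Int]] =
      stageAccumulators.toSeq
        .flatMap { case (stageId, accums) => accums.map(_ -> stageId) }
        .groupBy { case (accumId, _) => accumId }
        .map { case (accumId, pairs) => accumId -> pairs.map(_._2).toSet }

    val nodeAccums = Seq(79L, 112L) // accumulator ids attached to one SQL node

    // Old: scan every stage and intersect -- this is what got slow on large apps.
    val viaIntersection: Set[Int] = stageAccumulators.collect {
      case (stageId, accums) if nodeAccums.intersect(accums).nonEmpty => stageId
    }.toSet

    // New: one hash lookup per node accumulator.
    val viaReverseMap: Set[Int] =
      nodeAccums.flatMap(accumulatorToStages.get).flatten.toSet

    assert(viaIntersection == viaReverseMap)
    println(s"stages for node: $viaReverseMap")
  }
}

Building the index once per application turns the per-node cost from a scan over all stages into a handful of hash lookups, which is what removes the long profiling times the PR title describes.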
@@ -125,7 +121,7 @@ object SQLPlanParser extends Logging {
case "ColumnarToRow" =>
// ignore ColumnarToRow to row for now as assume everything is columnar
new ExecInfo(sqlID, node.name, expr = "", 1, duration = None, node.id,
- isSupported = false, None, Seq.empty, shouldRemove=true)
+ isSupported = false, None, Set.empty, shouldRemove=true)
case c if (c.contains("CreateDataSourceTableAsSelectCommand")) =>
// create data source table doesn't show the format so we can't determine
// if we support it
@@ -52,7 +52,7 @@ case class WholeStageExecParser(
// for now
val allStagesIncludingChildren = childNodes.flatMap(_.stages).toSet ++ stagesInNode.toSet
val execInfo = new ExecInfo(sqlID, node.name, node.name, avSpeedupFactor, maxDuration,
- node.id, anySupported, Some(childNodes), allStagesIncludingChildren.toSeq)
+ node.id, anySupported, Some(childNodes), allStagesIncludingChildren)
Seq(execInfo)
}
}
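
Since ExecInfo.stages is now a Set[Int], the union of the children's stages can be passed straight through here; the old .toSeq round trip is gone.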
@@ -260,7 +260,7 @@ case class SQLMetricInfoCase(
nodeID: Long,
nodeName: String,
nodeDesc: String,
- stageIds: Seq[Int])
+ stageIds: Set[Int])

case class DriverAccumCase(
sqlID: Long,
@@ -60,7 +60,7 @@ abstract class AppBase(
HashMap[Long, ArrayBuffer[TaskStageAccumCase]]()

val stageIdToInfo: HashMap[(Int, Int), StageInfoClass] = new HashMap[(Int, Int), StageInfoClass]()
- val stageAccumulators: HashMap[Int, Seq[Long]] = new HashMap[Int, Seq[Long]]()
+ val accumulatorToStages: HashMap[Long, Set[Int]] = new HashMap[Long, Set[Int]]()

var driverAccumMap: HashMap[Long, ArrayBuffer[DriverAccumCase]] =
HashMap[Long, ArrayBuffer[DriverAccumCase]]()
@@ -398,7 +398,10 @@ abstract class EventProcessorBase[T <: AppBase](app: T) extends SparkListener wi
stage.duration = ProfileUtils.optionLongMinusOptionLong(stage.completionTime,
stage.info.submissionTime)
val stageAccumulatorIds = event.stageInfo.accumulables.values.map { m => m.id }.toSeq
- app.stageAccumulators.put(event.stageInfo.stageId, stageAccumulatorIds)
+ stageAccumulatorIds.foreach { accumId =>
+   val existingStages = app.accumulatorToStages.getOrElse(accumId, Set.empty)
+   app.accumulatorToStages.put(accumId, existingStages + event.stageInfo.stageId)
+ }
}

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
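
The getOrElse-then-put pair above rebinds an immutable Set on every insertion. For reference, a roughly equivalent sketch using getOrElseUpdate with a mutable Set (not the PR's code; names are illustrative):

import scala.collection.mutable

// Sketch: building the accumulator id -> stage ids index as stages are submitted.
// The PR stores immutable Sets and rebinds them; this variant mutates in place.
val accumulatorToStages = mutable.HashMap.empty[Long, mutable.Set[Int]]

def recordStage(stageId: Int, accumIds: Seq[Long]): Unit =
  accumIds.foreach { accumId =>
    accumulatorToStages.getOrElseUpdate(accumId, mutable.Set.empty) += stageId
  }

Either way, the work per stage submission is linear in that stage's accumulator count, and later lookups are O(1) per accumulator id.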
@@ -219,8 +219,8 @@ class ApplicationInfo(
var taskEnd: ArrayBuffer[TaskCase] = ArrayBuffer[TaskCase]()
var unsupportedSQLplan: ArrayBuffer[UnsupportedSQLPlan] = ArrayBuffer[UnsupportedSQLPlan]()
var wholeStage: ArrayBuffer[WholeStageCodeGenResults] = ArrayBuffer[WholeStageCodeGenResults]()
- val sqlPlanNodeIdToStageIds: mutable.HashMap[(Long, Long), Seq[Int]] =
-   mutable.HashMap.empty[(Long, Long), Seq[Int]]
+ val sqlPlanNodeIdToStageIds: mutable.HashMap[(Long, Long), Set[Int]] =
+   mutable.HashMap.empty[(Long, Long), Set[Int]]

private lazy val eventProcessor = new EventsProcessor(this)

@@ -302,7 +302,7 @@

// Then process SQL plan metric type
for (metric <- node.metrics) {
- val stages = sqlPlanNodeIdToStageIds.get((sqlID, node.id)).getOrElse(Seq.empty)
+ val stages = sqlPlanNodeIdToStageIds.get((sqlID, node.id)).getOrElse(Set.empty)
val allMetric = SQLMetricInfoCase(sqlID, metric.name,
metric.accumulatorId, metric.metricType, node.id,
node.name, node.desc, stages)
@@ -39,18 +39,18 @@ appIndex,sqlID,nodeID,nodeName,accumulatorId,name,max_value,metricType,stageIds
1,0,7,GpuShuffleCoalesce,76,total time,261389422,nsTiming,2
1,0,7,GpuShuffleCoalesce,77,collect batch time,167775821,nsTiming,2
1,0,7,GpuShuffleCoalesce,78,concat batch time,83550919,nsTiming,2
- 1,0,8,GpuColumnarExchange,79,partition data size,42872100,sum,2;1
- 1,0,8,GpuColumnarExchange,80,partitions,200,sum,2;1
- 1,0,8,GpuColumnarExchange,81,output rows,10000000,sum,2;1
- 1,0,8,GpuColumnarExchange,82,output columnar batches,1200,sum,2;1
- 1,0,8,GpuColumnarExchange,83,data size,40076192,size,2;1
- 1,0,8,GpuColumnarExchange,85,local blocks read,1200,sum,2;1
- 1,0,8,GpuColumnarExchange,88,local bytes read,40132258,size,2;1
- 1,0,8,GpuColumnarExchange,89,fetch wait time,0,timing,2;1
- 1,0,8,GpuColumnarExchange,90,records read,1200,sum,2;1
- 1,0,8,GpuColumnarExchange,91,shuffle bytes written,40132258,size,2;1
- 1,0,8,GpuColumnarExchange,92,shuffle records written,1200,sum,2;1
- 1,0,8,GpuColumnarExchange,93,shuffle write time,508750471,nsTiming,2;1
+ 1,0,8,GpuColumnarExchange,79,partition data size,42872100,sum,1;2
+ 1,0,8,GpuColumnarExchange,80,partitions,200,sum,1;2
+ 1,0,8,GpuColumnarExchange,81,output rows,10000000,sum,1;2
+ 1,0,8,GpuColumnarExchange,82,output columnar batches,1200,sum,1;2
+ 1,0,8,GpuColumnarExchange,83,data size,40076192,size,1;2
+ 1,0,8,GpuColumnarExchange,85,local blocks read,1200,sum,1;2
+ 1,0,8,GpuColumnarExchange,88,local bytes read,40132258,size,1;2
+ 1,0,8,GpuColumnarExchange,89,fetch wait time,0,timing,1;2
+ 1,0,8,GpuColumnarExchange,90,records read,1200,sum,1;2
+ 1,0,8,GpuColumnarExchange,91,shuffle bytes written,40132258,size,1;2
+ 1,0,8,GpuColumnarExchange,92,shuffle records written,1200,sum,1;2
+ 1,0,8,GpuColumnarExchange,93,shuffle write time,508750471,nsTiming,1;2
1,0,9,GpuProject,94,total time,6667140,nsTiming,1
1,0,10,GpuRowToColumnar,95,total time,61112304,nsTiming,1
1,0,11,WholeStageCodegen (1),96,duration,5463,timing,1
@@ -66,18 +66,18 @@ appIndex,sqlID,nodeID,nodeName,accumulatorId,name,max_value,metricType,stageIds
1,0,15,GpuShuffleCoalesce,109,total time,3266208420,nsTiming,2
1,0,15,GpuShuffleCoalesce,110,collect batch time,359397047,nsTiming,2
1,0,15,GpuShuffleCoalesce,111,concat batch time,104974316,nsTiming,2
- 1,0,16,GpuColumnarExchange,112,partition data size,42872100,sum,2;0
- 1,0,16,GpuColumnarExchange,113,partitions,200,sum,2;0
- 1,0,16,GpuColumnarExchange,114,output rows,10000000,sum,2;0
- 1,0,16,GpuColumnarExchange,115,output columnar batches,1200,sum,2;0
- 1,0,16,GpuColumnarExchange,116,data size,40076192,size,2;0
- 1,0,16,GpuColumnarExchange,118,local blocks read,1200,sum,2;0
- 1,0,16,GpuColumnarExchange,121,local bytes read,40132250,size,2;0
- 1,0,16,GpuColumnarExchange,122,fetch wait time,0,timing,2;0
- 1,0,16,GpuColumnarExchange,123,records read,1200,sum,2;0
- 1,0,16,GpuColumnarExchange,124,shuffle bytes written,40132250,size,2;0
- 1,0,16,GpuColumnarExchange,125,shuffle records written,1200,sum,2;0
- 1,0,16,GpuColumnarExchange,126,shuffle write time,400284505,nsTiming,2;0
+ 1,0,16,GpuColumnarExchange,112,partition data size,42872100,sum,0;2
+ 1,0,16,GpuColumnarExchange,113,partitions,200,sum,0;2
+ 1,0,16,GpuColumnarExchange,114,output rows,10000000,sum,0;2
+ 1,0,16,GpuColumnarExchange,115,output columnar batches,1200,sum,0;2
+ 1,0,16,GpuColumnarExchange,116,data size,40076192,size,0;2
+ 1,0,16,GpuColumnarExchange,118,local blocks read,1200,sum,0;2
+ 1,0,16,GpuColumnarExchange,121,local bytes read,40132250,size,0;2
+ 1,0,16,GpuColumnarExchange,122,fetch wait time,0,timing,0;2
+ 1,0,16,GpuColumnarExchange,123,records read,1200,sum,0;2
+ 1,0,16,GpuColumnarExchange,124,shuffle bytes written,40132250,size,0;2
+ 1,0,16,GpuColumnarExchange,125,shuffle records written,1200,sum,0;2
+ 1,0,16,GpuColumnarExchange,126,shuffle write time,400284505,nsTiming,0;2
1,0,17,GpuProject,127,total time,207820,nsTiming,0
1,0,18,GpuRowToColumnar,128,total time,58640462,nsTiming,0
1,0,19,WholeStageCodegen (2),129,duration,5920,timing,0
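
The expected-output changes above are ordering only: the stageIds column is a ';'-joined list, and with the collection now a Set[Int] the ids come out ascending (2;1 becomes 1;2, 2;0 becomes 0;2). A one-line sketch of that formatting, assuming the ids are sorted before joining (formatStageIds is a hypothetical helper, not the tool's code):

// Hypothetical formatter showing how Set(2, 1) could render as "1;2".
def formatStageIds(stages: Set[Int]): String = stages.toSeq.sorted.mkString(";")

assert(formatStageIds(Set(2, 1)) == "1;2")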
@@ -103,8 +103,8 @@ object ToolTestUtils extends Logging {
val diffCount = df.except(expectedDf).union(expectedDf.except(df)).count
if (diffCount != 0) {
logWarning("Diff expected vs actual:")
- expectedDf.show(false)
- df.show(false)
+ expectedDf.show(1000, false)
+ df.show(1000, false)
}
assert(diffCount == 0)
}
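
For reference, Dataset.show(false) prints only the first 20 rows, so a failing comparison could hide the offending rows; show(1000, false) raises the cap to 1000 rows while still disabling truncation.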