Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Profiling tool: Print UCX and GDS parameters #2785

Merged
merged 6 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ class CollectInformation(apps: Seq[ApplicationInfo],
val messageHeader = "\nSpark Rapids parameters set explicitly:\n"
for (app <- apps) {
if (app.allDataFrames.contains(s"propertiesDF_${app.index}")) {
app.runQuery(query = app.generateRapidsProperties + " order by key",
app.runQuery(query = app.generateRapidsUcxGdsProperties + " order by key",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use a more generic name here, such as generateNvidiaProperties ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

fileWriter = fileWriter, messageHeader = messageHeader)
} else {
fileWriter.foreach(_.write("No Spark Rapids parameters Found!\n"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,20 @@ class CompareApplications(apps: Seq[ApplicationInfo],
// For the 1st app
if (i == 1) {
withClauseAllKeys += "select distinct key from (" +
app.generateRapidsProperties + ") union "
query += "(" + app.generateRapidsProperties + s") tmp_$i"
app.generateRapidsUcxGdsProperties + ") union "
query += "(" + app.generateRapidsUcxGdsProperties + s") tmp_$i"
query += s" on allKeys.key=tmp_$i.key"
query += "\n LEFT OUTER JOIN \n"
} else if (i < apps.size) { // For the 2nd to non-last app(s)
withClauseAllKeys += "select distinct key from (" +
app.generateRapidsProperties + ") union "
query += "(" + app.generateRapidsProperties + s") tmp_$i"
app.generateRapidsUcxGdsProperties + ") union "
query += "(" + app.generateRapidsUcxGdsProperties + s") tmp_$i"
query += s" on allKeys.key=tmp_$i.key"
query += "\n LEFT OUTER JOIN \n"
} else { // For the last app
withClauseAllKeys += "select distinct key from (" +
app.generateRapidsProperties + "))\n"
query += "(" + app.generateRapidsProperties + s") tmp_$i"
app.generateRapidsUcxGdsProperties + "))\n"
query += "(" + app.generateRapidsUcxGdsProperties + s") tmp_$i"
query += s" on allKeys.key=tmp_$i.key"
}
selectValuePart += s",value_app$i"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,12 +649,14 @@ class ApplicationInfo(
}
}

// Function to generate a query for printing Rapids related Spark properties
def generateRapidsProperties: String =
// Function to generate a query for printing Rapids/UCX/GDS related Spark properties
def generateRapidsUcxGdsProperties: String =
s"""select key,value as value_app$index
|from propertiesDF_$index
|where source ='spark'
|and key like 'spark.rapids%'
|or key like 'spark.executorEnv.UCX%'
|or key in ('spark.shuffle.manager','spark.shuffle.service.enabled')
|""".stripMargin

// Function to generate the SQL string for aggregating task metrics columns.
Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -322,4 +322,28 @@ class ApplicationInfoSuite extends FunSuite with Logging {
tempFile4.delete()
}
}

test("test gds-ucx-parameters") {
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs =
new ProfileArgs(Array(s"$logDir/gds_ucx_eventlog.zstd"))
var index: Int = 1
val eventlogPaths = appArgs.eventlog()
for (path <- eventlogPaths) {
apps += new ApplicationInfo(appArgs.numOutputRows.getOrElse(1000), sparkSession,
EventLogPathProcessor.getEventLogInfo(path, sparkSession).head._1, index)
index += 1
}
assert(apps.size == 1)
for (app <- apps) {
val rows = app.runQuery(query = app.generateRapidsUcxGdsProperties + " order by key",
fileWriter = None).collect()
assert(rows.length == 22) // 22 properties captured.
// verify ucx parameters are captured.
assert(rows(1)(0).equals("spark.executorEnv.UCX_MAX_RNDV_RAILS"))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is order guaranteed here? Would be better to kit reference specific rows

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

order is guaranteed here. using order by propertyName in the query.


//verify gds parameters are captured.
assert(rows(11)(0).equals("spark.rapids.memory.gpu.direct.storage.spill.useHostMemory"))
}
}
}