Skip to content

Commit

Permalink
Profiling tool: Print UCX and GDS parameters (#2785)
Browse files Browse the repository at this point in the history
* print UCX parameters

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* Profiling tool: Print UCX and GDS parameters

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* addressed review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored Jun 30, 2021
1 parent 5c89f1b commit 95b29b0
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ class CollectInformation(apps: Seq[ApplicationInfo],
val messageHeader = "\nSpark Rapids parameters set explicitly:\n"
for (app <- apps) {
if (app.allDataFrames.contains(s"propertiesDF_${app.index}")) {
app.runQuery(query = app.generateRapidsProperties + " order by propertyName",
app.runQuery(query = app.generateNvidiaProperties + " order by propertyName",
fileWriter = fileWriter, messageHeader = messageHeader)
} else {
fileWriter.foreach(_.write("No Spark Rapids parameters Found!\n"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,14 @@ class CompareApplications(apps: Seq[ApplicationInfo],
if (app.allDataFrames.contains(s"propertiesDF_${app.index}")) {
if (i < apps.size) {
withClauseAllKeys += "select distinct propertyName from (" +
app.generateRapidsProperties + ") union "
query += "(" + app.generateRapidsProperties + s") tmp_$i"
app.generateNvidiaProperties + ") union "
query += "(" + app.generateNvidiaProperties + s") tmp_$i"
query += s" on allKeys.propertyName=tmp_$i.propertyName"
query += "\n LEFT OUTER JOIN \n"
} else { // For the last app
withClauseAllKeys += "select distinct propertyName from (" +
app.generateRapidsProperties + "))\n"
query += "(" + app.generateRapidsProperties + s") tmp_$i"
app.generateNvidiaProperties + "))\n"
query += "(" + app.generateNvidiaProperties + s") tmp_$i"
query += s" on allKeys.propertyName=tmp_$i.propertyName"
}
selectValuePart += s",appIndex_${app.index}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -756,12 +756,14 @@ class ApplicationInfo(
}
}

// Function to generate a query for printing Rapids related Spark properties
def generateRapidsProperties: String = {
// Function to generate a query for printing Rapids/UCX/GDS related Spark properties
def generateNvidiaProperties: String = {
s"""select key as propertyName,value as appIndex_$index
|from propertiesDF_$index
|where source ='spark'
|and key like 'spark.rapids%'
|or key like 'spark.executorEnv.UCX%'
|or key in ('spark.shuffle.manager','spark.shuffle.service.enabled')
|""".stripMargin
}

Expand Down
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -354,4 +354,28 @@ class ApplicationInfoSuite extends FunSuite with Logging {
tempFile4.delete()
}
}

test("test gds-ucx-parameters") {
val apps: ArrayBuffer[ApplicationInfo] = ArrayBuffer[ApplicationInfo]()
val appArgs =
new ProfileArgs(Array(s"$logDir/gds_ucx_eventlog.zstd"))
var index: Int = 1
val eventlogPaths = appArgs.eventlog()
for (path <- eventlogPaths) {
apps += new ApplicationInfo(appArgs.numOutputRows.getOrElse(1000), sparkSession,
EventLogPathProcessor.getEventLogInfo(path, sparkSession).head._1, index)
index += 1
}
assert(apps.size == 1)
for (app <- apps) {
val rows = app.runQuery(query = app.generateNvidiaProperties + " order by propertyName",
fileWriter = None).collect()
assert(rows.length == 5) // 5 properties captured.
// verify ucx parameters are captured.
assert(rows(0)(0).equals("spark.executorEnv.UCX_RNDV_SCHEME"))

//verify gds parameters are captured.
assert(rows(1)(0).equals("spark.rapids.memory.gpu.direct.storage.spill.alignedIO"))
}
}
}

0 comments on commit 95b29b0

Please sign in to comment.