Qualification and Profiling tool handle Read formats and datatypes (#2904)



* Add in printing read schema

Signed-off-by: Thomas Graves <tgraves@apache.org>


* Calculate score with the read format and datatypes included

Signed-off-by: Thomas Graves <tgraves@apache.org>

* fixes

* remove some unneeded rounding

* update test and fixes

Signed-off-by: Thomas Graves <tgraves@apache.org>

* add test files

* more tests and cleanup

* move rounding

* write to stdout as well

* output

* remove ln from println

* shrink output report spacing

* commonize size

* add df back in header

* configure stdout off for tests

* update readme

* update desc

* Fix missing extra info with type checks

Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* don't include supplement text in qualification tool csv file

* Add csv output for just not supported format and types

Signed-off-by: Thomas Graves <tgraves@apache.org>

* update expected results


* fix typo
tgravescs authored Jul 14, 2021
1 parent 18f48f5 commit 9998174
Showing 40 changed files with 974 additions and 168 deletions.
18 changes: 18 additions & 0 deletions dist/pom.xml
@@ -176,6 +176,24 @@
</launchers>
</configuration>
</execution>
<execution>
<id>update_supported_tools</id>
<phase>verify</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<launchers>
<launcher>
<id>update_rapids_support_tools</id>
<mainClass>com.nvidia.spark.rapids.SupportedOpsForTools</mainClass>
<args>
<arg>${project.basedir}/../tools/src/main/resources/supportedDataSource.csv</arg>
</args>
</launcher>
</launchers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
1 change: 1 addition & 0 deletions pom.xml
@@ -627,6 +627,7 @@
<exclude>dependency-reduced-pom.xml</exclude>
<exclude>**/.*/**</exclude>
<exclude>src/main/java/com/nvidia/spark/rapids/format/*</exclude>
<exclude>src/main/resources/supportedDataSource.csv</exclude>
<!-- Apache Rat excludes target folder for projects that are included by
default, but there are some projects that are conditionally included. -->
<exclude>**/target/**/*</exclude>
91 changes: 85 additions & 6 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/TypeChecks.scala
@@ -30,20 +30,23 @@ import org.apache.spark.sql.types._
*/
sealed abstract class SupportLevel {
def htmlTag: String
def text: String
}

/**
* N/A neither spark nor the plugin supports this.
*/
object NotApplicable extends SupportLevel {
override def htmlTag: String = "<td> </td>"
override def text: String = "NA"
}

/**
* Spark supports this but the plugin does not.
*/
object NotSupported extends SupportLevel {
override def htmlTag: String = "<td><b>NS</b></td>"
override def htmlTag: String = s"<td><b>$text</b></td>"
override def text: String = "NS"
}

/**
@@ -52,12 +55,14 @@ object NotSupported extends SupportLevel {
* types because they are not 100% supported.
*/
class Supported(val asterisks: Boolean = false) extends SupportLevel {
override def htmlTag: String =
override def htmlTag: String = s"<td>$text</td>"
override def text: String = {
if (asterisks) {
"<td>S*</td>"
"S*"
} else {
"<td>S</td>"
"S"
}
}
}

/**
@@ -86,10 +91,17 @@ class PartiallySupported(
None
}
val extraInfo = (note.toSeq ++ litOnly.toSeq ++ typeStr.toSeq).mkString("; ")
val allText = s"$text ($extraInfo)"
s"<td><em>$allText</em></td>"
}

// don't include the extra info in the supported text field for now
// as the qualification tool doesn't use it
override def text: String = {
if (asterisks) {
"<td><em>PS* (" + extraInfo + ")</em></td>"
"PS*"
} else {
"<td><em>PS (" + extraInfo + ")</em></td>"
"PS"
}
}
}
@@ -1641,3 +1653,70 @@ object SupportedOpsDocs {
}
}
}

object SupportedOpsForTools {

private def outputSupportIO() {
// Look at what we have for defaults for some configs because if the configs are off
// it likely means something isn't completely compatible.
val conf = new RapidsConf(Map.empty[String, String])
val types = TypeEnum.values.toSeq
val header = Seq("Format", "Direction") ++ types
println(header.mkString(","))
GpuOverrides.fileFormats.toSeq.sortBy(_._1.toString).foreach {
case (format, ioMap) =>
val formatEnabled = format.toString.toLowerCase match {
case "csv" => conf.isCsvEnabled && conf.isCsvReadEnabled
case "parquet" => conf.isParquetEnabled && conf.isParquetReadEnabled
case "orc" => conf.isOrcEnabled && conf.isOrcReadEnabled
case _ =>
throw new IllegalArgumentException("Format is unknown we need to add it here!")
}
val read = ioMap(ReadFileOp)
// we have lots of configs for various operations, just try to get the main ones
val readOps = types.map { t =>
val typeEnabled = if (format.toString.toLowerCase.equals("csv")) {
t.toString() match {
case "BOOLEAN" => conf.isCsvBoolReadEnabled
case "BYTE" => conf.isCsvByteReadEnabled
case "SHORT" => conf.isCsvShortReadEnabled
case "INT" => conf.isCsvIntReadEnabled
case "LONG" => conf.isCsvLongReadEnabled
case "FLOAT" => conf.isCsvFloatReadEnabled
case "DOUBLE" => conf.isCsvDoubleReadEnabled
case "TIMESTAMP" => conf.isCsvTimestampReadEnabled
case "DATE" => conf.isCsvDateReadEnabled
case "DECIMAL" => conf.decimalTypeEnabled
case _ => true
}
} else {
t.toString() match {
case "DECIMAL" => conf.decimalTypeEnabled
case _ => true
}
}
if (!formatEnabled || !typeEnabled) {
// indicate configured off by default
"CO"
} else {
read.support(t).text
}
}
// only support reads for now
println(s"${(Seq(format, "read") ++ readOps).mkString(",")}")
}
}

def help(): Unit = {
outputSupportIO()
}

def main(args: Array[String]): Unit = {
val out = new FileOutputStream(new File(args(0)))
Console.withOut(out) {
Console.withErr(out) {
SupportedOpsForTools.help()
}
}
}
}
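
For reference, the `verify`-phase launcher added to `dist/pom.xml` earlier in this diff runs this object with the
target CSV path as its only program argument, so regenerating the file by hand amounts to the call below (a sketch;
the relative path is the one configured in the pom):

```
// Prints the "Format,Direction,<datatype columns>" support table from outputSupportIO()
// and redirects it into the tools resource file via Console.withOut in main().
SupportedOpsForTools.main(Array("tools/src/main/resources/supportedDataSource.csv"))
```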
107 changes: 57 additions & 50 deletions tools/README.md
@@ -20,7 +20,9 @@ GPU generated event logs.
at the top level when specifying a directory.

Note: Spark event logs can be downloaded from Spark UI using a "Download" button on the right side,
or can be found in the location specified by `spark.eventLog.dir`.
or can be found in the location specified by `spark.eventLog.dir`. See the
[Apache Spark Monitoring](http://spark.apache.org/docs/latest/monitoring.html) documentation for
more information.

Optional:
- maven installed
@@ -99,12 +101,12 @@ rapids-4-spark-tools_2.12-<version>.jar \
The qualification tool is used to look at a set of applications to determine if the RAPIDS Accelerator for Apache Spark
might be a good fit for those applications. The tool works by processing the CPU generated event logs from Spark.

Currently it does this by looking at the amount of time spent doing SQL Dataframe
operations vs the entire application time: `(sum(SQL Dataframe Duration) / (application-duration))`.
The more time spent doing SQL Dataframe operations the higher the score is
and the more likely the plugin will be able to help accelerate that application.
Note that the application time is from application start to application end, so if you are using an interactive
shell where there is nothing running for a while, this time will include that idle time, which might skew the score.
This tool is intended to give the users a starting point and does not guarantee the applications it scores highest
will actually be accelerated the most. Currently it works by looking at the amount of time spent in tasks of SQL
Dataframe operations. The more total task time spent in SQL Dataframe operations, the higher the score is and the more
likely the plugin will be able to help accelerate that application. The tool also looks for read data formats and types
that the plugin doesn't support; any unsupported formats or types it finds reduce the score (weighted against the
total task time in SQL Dataframe operations).
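
Conceptually, the score discounts the portion of the SQL Dataframe task time controlled by `--read-score-percent`
(default 20) by the fraction of read formats and datatypes that are supported. The snippet below is a minimal
sketch of that idea with illustrative names, not the tool's exact code, but the arithmetic is consistent with the
sample CSV output shown further down.

```
// Sketch only: discount the configured slice of the SQL Dataframe task time by the
// read format score (0-100, where 100 means every scanned format/datatype is supported).
def estimateScore(
    sqlDfTaskDuration: Long,    // total task time of SQL Dataframe operations (ms)
    readFormatScore: Double,    // e.g. 55.56 when some formats/types are unsupported
    readScorePercent: Int = 20): Double = {
  val readWeight = readScorePercent / 100.0
  sqlDfTaskDuration * (1.0 - readWeight) +
    sqlDfTaskDuration * readWeight * (readFormatScore / 100.0)
}

// e.g. 0.8 * 21802 + 0.2 * 21802 * 0.5556 ≈ 19864, matching the job1 row in the sample CSV below.
```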

Each application (event log) could have multiple SQL queries. If a SQL query's plan uses the Dataset API, such as the keyword
`$Lambda` or `.apply`, that SQL query is categorized as a DataSet SQL query, otherwise it is a Dataframe SQL query.
@@ -113,7 +115,8 @@ Note: the duration(s) reported are in milli-seconds.

There are 2 output files from running the tool. One is a summary text file that lists the applications in order from
the most likely to be good candidates for the GPU to the least likely. It outputs the application ID, duration,
the SQL Dataframe duration and the SQL duration spent when we found SQL queries with potential problems.
the SQL Dataframe duration and the SQL duration spent when we found SQL queries with potential problems. It also
outputs this same report to STDOUT.
The other file is a CSV file that contains more information and can be used for further post processing.

Note, potential problems are reported in the CSV file in a separate column, which is not included in the score. This
@@ -133,21 +136,20 @@ Note that SQL queries that contain failed jobs are not included.

Sample output in csv:
```
App Name,App ID,Score,Potential Problems,SQL Dataframe Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures
job1,app-20210507174503-2538,98.13,"",952802,970984,63.14,false,0,""
job2,app-20210507180116-2539,97.88,"",903845,923419,64.88,false,0,""
job3,app-20210319151533-1704,97.59,"",737826,756039,33.95,false,0,""
App Name,App ID,Score,Potential Problems,SQL DF Duration,SQL Dataframe Task Duration,App Duration,Executor CPU Time Percent,App Duration Estimated,SQL Duration with Potential Problems,SQL Ids with Failures,Read Score Percent,Read File Format Score,Unsupported Read File Formats and Types
job3,app-20210507174503-1704,4320658.0,"",9569,4320658,26171,35.34,false,0,"",20,100.0,""
job1,app-20210507174503-2538,19864.04,"",6760,21802,83728,71.3,false,0,"",20,55.56,"Parquet[decimal]"
```

Sample output in text:
```
================================================================================================================
| App ID| App Duration| SQL Dataframe Duration|SQL Duration For Problematic|
================================================================================================================
|app-20210507174503-2538| 970984| 952802| 0|
|app-20210507180116-2539| 923419| 903845| 0|
|app-20210319151533-1704| 756039| 737826| 0|
===========================================================================
| App ID|App Duration|SQL DF Duration|Problematic Duration|
===========================================================================
|app-20210507174503-2538| 26171| 9569| 0|
|app-20210507174503-1704| 83738| 6760| 0|
```

## Download the Spark 3.x distribution
The Qualification tool requires the Spark 3.x jars to be able to run. If you do not already have
Spark 3.x installed, you can download the Spark distribution to any machine and include the jars
@@ -186,35 +188,40 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
com.nvidia.spark.rapids.tool.qualification.QualificationMain [options]
<eventlogs | eventlog directories ...>

-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing.eg: 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
processing oldest 100 event logs)
-m, --match-event-logs <arg> Filter event logs whose filenames contain the
input string
-n, --num-output-rows <arg> Number of output rows in the summary report.
Default is 1000.
--num-threads <arg> Number of thread to use for parallel
processing. The default is the number of cores
on host divided by 4.
--order <arg> Specify the sort order of the report. desc or
asc, desc is the default. desc (descending)
would report applications most likely to be
accelerated at the top and asc (ascending)
would show the least likely to be accelerated
at the top.
-o, --output-directory <arg> Base output directory. Default is current
directory for the default filesystem. The
final output will go into a subdirectory
called rapids_4_spark_qualification_output. It
will overwrite any existing directory with the
same name.
-t, --timeout <arg> Maximum time in seconds to wait for the event
logs to be processed. Default is 24 hours
(86400 seconds) and must be greater than 3
seconds. If it times out, it will report what
it was able to process up until the timeout.
-h, --help Show help message
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing.eg: 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
processing oldest 100 event logs)
-m, --match-event-logs <arg> Filter event logs whose filenames contain the
input string
-n, --num-output-rows <arg> Number of output rows in the summary report.
Default is 1000.
--num-threads <arg> Number of thread to use for parallel
processing. The default is the number of cores
on host divided by 4.
--order <arg> Specify the sort order of the report. desc or
asc, desc is the default. desc (descending)
would report applications most likely to be
accelerated at the top and asc (ascending)
would show the least likely to be accelerated
at the top.
-o, --output-directory <arg> Base output directory. Default is current
directory for the default filesystem. The
final output will go into a subdirectory
called rapids_4_spark_qualification_output. It
will overwrite any existing directory with the
same name.
-r, --read-score-percent <arg> The percent the read format and datatypes
apply to the score. Default is 20 percent.
--report-read-schema Whether to output the read formats and
datatypes to the CSV file. This can be very
long. Default is false.
-t, --timeout <arg> Maximum time in seconds to wait for the event
logs to be processed. Default is 24 hours
(86400 seconds) and must be greater than 3
seconds. If it times out, it will report what
it was able to process up until the timeout.
-h, --help Show help message

trailing arguments:
eventlog (required) Event log filenames(space separated) or directories
@@ -223,9 +230,9 @@
```
### Output
By default this outputs 2 files under sub-directory `./rapids_4_spark_qualification_output/` that contain
the processed applications. The output will go into your default filesystem; it supports local filesystem
or HDFS.
The summary report goes to STDOUT and by default it outputs 2 files under sub-directory
`./rapids_4_spark_qualification_output/` that contain the processed applications. The output will
go into your default filesystem; it supports local filesystem or HDFS.
The output location can be changed using the `--output-directory` option. Default is current directory.
4 changes: 4 additions & 0 deletions tools/src/main/resources/supportedDataSource.csv
@@ -0,0 +1,4 @@
Format,Direction,BOOLEAN,BYTE,SHORT,INT,LONG,FLOAT,DOUBLE,DATE,TIMESTAMP,STRING,DECIMAL,NULL,BINARY,CALENDAR,ARRAY,MAP,STRUCT,UDT
CSV,read,CO,CO,CO,CO,CO,CO,CO,CO,CO,S,CO,NA,NS,NA,NA,NA,NA,NA
ORC,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,NS,NS,NS,NS
Parquet,read,S,S,S,S,S,S,S,S,S*,S,CO,NA,NS,NA,PS*,PS*,PS*,NS
@@ -137,9 +137,12 @@ object EventLogPathProcessor extends Logging {
}.toMap
}
} catch {
case e: FileNotFoundException =>
case fe: FileNotFoundException =>
logWarning(s"$pathString not found, skipping!")
Map.empty[EventLogInfo, Long]
case e: Exception =>
logWarning(s"Unexpected exception occurred reading $pathString, skipping!", e)
Map.empty[EventLogInfo, Long]
}
}

@@ -25,7 +25,8 @@ import org.apache.spark.sql.rapids.tool.ToolUtils
/**
* Class for writing local files, allows writing to distributed file systems.
*/
class ToolTextFileWriter(finalOutputDir: String, logFileName: String) extends Logging {
class ToolTextFileWriter(finalOutputDir: String, logFileName: String,
finalLocationText: String) extends Logging {

private val textOutputPath = new Path(s"$finalOutputDir/$logFileName")
private val fs = FileSystem.get(textOutputPath.toUri, new Configuration())
@@ -42,7 +43,7 @@ class ToolTextFileWriter(finalOutputDir: String, logFileName: String) extends Logging {

def close(): Unit = {
outFile.foreach { file =>
logInfo(s"Output location: $textOutputPath")
logInfo(s"$finalLocationText output location: $textOutputPath")
file.flush()
file.close()
outFile = None
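
The extra constructor parameter above only changes the log line emitted when the writer is closed. A hypothetical
call site (not part of this commit; names are illustrative) would look like:

```
// "CSV" is the finalLocationText label, so close() logs "CSV output location: <path>"
// instead of the previous generic message.
val outputDir = "./rapids_4_spark_qualification_output"
val csvWriter = new ToolTextFileWriter(outputDir, "rapids_4_spark_qualification.csv", "CSV")
try {
  // ... write the report contents ...
} finally {
  csvWriter.close()
}
```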
@@ -165,3 +165,19 @@ case class DatasetSQLCase(sqlID: Long)
case class ProblematicSQLCase(sqlID: Long, reason: String)

case class UnsupportedSQLPlan(sqlID: Long, nodeID: Long, nodeName: String, nodeDesc: String)

case class DataSourceCase(
sqlID: Long,
format: String,
location: String,
pushedFilters: String,
schema: String)

case class DataSourceCompareCase(
appIndex: Int,
appId: String,
sqlID: Long,
format: String,
location: String,
pushedFilters: String,
schema: String)