Skip to content

Commit

Permalink
Qualification tool: Add negation filter (#2938)
Browse files Browse the repository at this point in the history
* Add negation filter

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* addressed review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* addressed review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* Update QualificationArgs

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* fix description

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* Update description

Signed-off-by: Niranjan Artal <nartal@nvidia.com>

* address review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
  • Loading branch information
nartal1 authored Jul 16, 2021
1 parent 1b6fbfc commit e847165
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
8 changes: 6 additions & 2 deletions tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
<eventlogs | eventlog directories ...>

-a, --application-name <arg> Filter event logs whose application name
matches exactly with input string i.e no
regular expressions supported.
matches exactly or contains the input string
as a substring. Regular expressions not supported.
--application-name ~<arg> Filter event logs based on the complement
of a selection criterion, i.e. select all
event logs except the ones whose application
name contains the input string.
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing. e.g. 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
"eg: 100-oldest (for processing oldest 100 event logs)")
val applicationName: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application name matches exactly with input string" +
"i.e no regular expressions supported.")
descr = "Filter event logs whose application name matches " +
"exactly or contains the input string as a substring. Regular expressions not supported. " +
"For filtering based on complement of application name, use ~APPLICATION_NAME, i.e. " +
"select all event logs except the ones whose application name contains the input string.")
val startAppTime: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application start occurred within the past specified " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class AppFilterImpl(
// default is 24 hours
private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)

private val NEGATE = "~"

private val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
logInfo(s"Threadpool size is $nThreads")
Expand Down Expand Up @@ -72,14 +74,11 @@ class AppFilterImpl(

val filterAppName = appArgs.applicationName.getOrElse("")
if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
val filtered = apps.filter { app =>
val appNameOpt = app.appInfo.map(_.appName)
if (appNameOpt.isDefined) {
appNameOpt.get.equals(filterAppName)
} else {
// incomplete log file
false
}
val filtered = if (filterAppName.startsWith(NEGATE)) {
// remove ~ before passing it into the containsAppName function
apps.filterNot(app => containsAppName(app, filterAppName.substring(1)))
} else {
apps.filter(app => containsAppName(app, filterAppName))
}
filtered.map(_.eventlog).toSeq
} else if (appArgs.startAppTime.isSupplied) {
Expand All @@ -98,6 +97,16 @@ class AppFilterImpl(
}
}

/**
 * Checks whether the application name recorded for `app` contains `filterAppName`.
 *
 * The comparison uses `String.contains`, so it is a case-sensitive substring test; an exact
 * match also qualifies. No regular-expression handling is performed.
 *
 * @param app the filter result whose optional application info is inspected
 * @param filterAppName the text to look for inside the application name
 * @return `true` if application info is present and its name contains `filterAppName`;
 *         `false` otherwise, including when no application info is available
 */
private def containsAppName(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
  // An empty appInfo means the log file is incomplete; such a log never matches.
  // Note that a caller negating this result (filterNot) will therefore keep incomplete logs.
  app.appInfo.exists(_.appName.contains(filterAppName))
}

/**
 * Pairs an event log with its optional application start information.
 *
 * @param appInfo application start information, if available; `containsAppName` treats an
 *                empty value as an incomplete log file
 * @param eventlog the event log this entry refers to
 */
case class AppFilterReturnParameters(
appInfo: Option[ApplicationStartInfo],
eventlog: EventLogInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName.
}

test("test appName filter - Negation") {
  // A leading "~" requests the complement: keep logs whose app name does not contain the rest.
  val negatedName = "~Spark shell"
  val logPaths = Seq("rdd_only_eventlog", "empty_eventlog", "udf_dataset_eventlog")
    .map(fileName => s"$logDir/$fileName")
  val appArgs = new QualificationArgs(Array("--application-name", negatedName) ++ logPaths)

  val hadoopConf = sparkSession.sparkContext.hadoopConfiguration
  val eventLogInfo = EventLogPathProcessor.processAllPaths(
    appArgs.filterCriteria.toOption,
    appArgs.matchEventLogs.toOption,
    appArgs.eventlog(),
    hadoopConf)

  val appFilter = new AppFilterImpl(1000, hadoopConf, Some(84000), 2)
  val result = appFilter.filterEventLogs(eventLogInfo, appArgs)

  assert(eventLogInfo.length == 3)
  assert(result.length == 1) // 1 out of 3 does not have "Spark shell" as appName.
}

test("test udf event logs") {
val logFiles = Array(
s"$logDir/dataset_eventlog",
Expand Down

0 comments on commit e847165

Please sign in to comment.