Qualification tool: Add negation filter #2938

Merged (8 commits) on Jul 16, 2021
Changes from 2 commits
8 changes: 6 additions & 2 deletions tools/README.md
@@ -189,8 +189,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
<eventlogs | eventlog directories ...>

   -a, --application-name  <arg>     Filter event logs whose application name
-                                    matches exactly with input string i.e no
-                                    regular expressions supported.
+                                    matches exactly or is a substring of input
+                                    string. Regular expressions not supported.
+      --application-name ~<arg>     Filter event logs based on the complement
+                                    of a selection criterion. i.e. Select all
+                                    event logs except the ones which have
+                                    application name as the input string.
   -f, --filter-criteria  <arg>      Filter newest or oldest N eventlogs for
                                     processing. eg: 100-newest (for processing
                                     newest 100 event logs). eg: 100-oldest (for
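As a usage sketch (not part of this diff), the new negation form would be passed like any other filter argument, quoting the value so the shell does not interpret the leading ~. The main class name below is an assumption based on the tool's package naming and is not shown in this excerpt:

    java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
      com.nvidia.spark.rapids.tool.qualification.QualificationMain \
      --application-name '~Spark shell' \
      /path/to/eventlog-directory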
@@ -37,6 +37,8 @@ class AppFilterImpl(
   // default is 24 hours
   private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)

+  private val negate = '~'

   private val threadFactory = new ThreadFactoryBuilder()
     .setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
   logInfo(s"Threadpool size is $nThreads")
@@ -71,21 +73,37 @@

     val filterAppName = appArgs.applicationName.getOrElse("")
     if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
-      val filtered = apps.filter { app =>
-        val appNameOpt = app.appInfo.map(_.appName)
-        if (appNameOpt.isDefined) {
-          appNameOpt.get.equals(filterAppName)
-        } else {
-          // in complete log file
-          false
-        }
+      val filtered = if (filterAppName(0).equals(negate)) {
+        apps.filterNot(app => filterAppsNegate(app, filterAppName))
+      } else {
+        apps.filter(app => filterApps(app, filterAppName))
       }
       filtered.map(_.eventlog).toSeq
     } else {
       apps.map(x => x.eventlog).toSeq
     }
   }

+  def filterApps(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
+    val appNameOpt = app.appInfo.map(_.appName)
+    if (appNameOpt.isDefined) {
+      appNameOpt.get.contains(filterAppName)
+    } else {
+      // incomplete log file
+      false
+    }
+  }
+
+  def filterAppsNegate(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
+    val appNameOpt = app.appInfo.map(_.appName)
+    if (appNameOpt.isDefined) {
+      appNameOpt.get.contains(filterAppName.substring(1))
+    } else {
+      // incomplete log file
+      false
+    }
+  }
+
   case class AppFilterReturnParameters(
       appInfo: Option[ApplicationStartInfo],
       eventlog: EventLogInfo)
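For reference, a standalone sketch of the prefix-negation dispatch introduced above; the object and method names here are illustrative, not taken from the PR:

    object NegationFilterSketch {
      private val Negate = '~'

      // Keep names that satisfy the filter string: a leading '~' selects the
      // complement of a substring match, otherwise a plain substring match is used.
      def select(names: Seq[String], filter: String): Seq[String] = {
        if (filter.nonEmpty && filter(0) == Negate) {
          names.filterNot(_.contains(filter.substring(1)))
        } else {
          names.filter(_.contains(filter))
        }
      }

      def main(args: Array[String]): Unit = {
        val apps = Seq("Spark shell", "Rapids profiling tool", "Spark shell")
        println(select(apps, "Spark shell"))   // List(Spark shell, Spark shell)
        println(select(apps, "~Spark shell"))  // List(Rapids profiling tool)
      }
    }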
@@ -263,6 +263,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
     assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName.
   }

test("test appName filter - Negation") {
val appName = "~Spark shell"
val appArgs = new QualificationArgs(Array(
"--application-name",
appName,
s"$logDir/rdd_only_eventlog",
s"$logDir/empty_eventlog",
s"$logDir/udf_dataset_eventlog"
))

val eventLogInfo = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption,
appArgs.matchEventLogs.toOption, appArgs.eventlog(),
sparkSession.sparkContext.hadoopConfiguration)

val appFilter = new AppFilterImpl(1000, sparkSession.sparkContext.hadoopConfiguration,
Some(84000), 2)
val result = appFilter.filterEventLogs(eventLogInfo, appArgs)
assert(eventLogInfo.length == 3)
assert(result.length == 1) // 1 out of 3 does not has "Spark shell" as appName.
}

test("test udf event logs") {
val logFiles = Array(
s"$logDir/dataset_eventlog",