Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Qualification tool: Add negation filter #2938

Merged
merged 8 commits into from
Jul 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions tools/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
<eventlogs | eventlog directories ...>

-a, --application-name <arg> Filter event logs whose application name
matches exactly with input string i.e no
regular expressions supported.
matches exactly or is a substring of input
string. Regular expressions not supported.
--application-name ~<arg> Filter event logs based on the complement
tgravescs marked this conversation as resolved.
Show resolved Hide resolved
of a selection criterion. i.e Select all
event logs except the ones which have
application name as the input string.
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing.eg: 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
"eg: 100-oldest (for processing oldest 100 event logs)")
val applicationName: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application name matches exactly with input string" +
"i.e no regular expressions supported.")
descr = "Filter event logs whose application name matches " +
"exactly or is a substring of input string. Regular expressions not supported." +
"For filtering based on complement of application name, use ~APPLICATION_NAME. i.e " +
"Select all event logs except the ones which have application name as the input string.")
val startAppTime: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application start occurred within the past specified " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class AppFilterImpl(
// default is 24 hours
private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)

private val NEGATE = "~"

private val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("qualAppFilter" + "-%d").build()
logInfo(s"Threadpool size is $nThreads")
Expand Down Expand Up @@ -72,14 +74,11 @@ class AppFilterImpl(

val filterAppName = appArgs.applicationName.getOrElse("")
if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
val filtered = apps.filter { app =>
val appNameOpt = app.appInfo.map(_.appName)
if (appNameOpt.isDefined) {
appNameOpt.get.equals(filterAppName)
} else {
// incomplete log file
false
}
val filtered = if (filterAppName.startsWith(NEGATE)) {
// remove ~ before passing it into the containsAppName function
apps.filterNot(app => containsAppName(app, filterAppName.substring(1)))
} else {
apps.filter(app => containsAppName(app, filterAppName))
}
filtered.map(_.eventlog).toSeq
} else if (appArgs.startAppTime.isSupplied) {
Expand All @@ -98,6 +97,16 @@ class AppFilterImpl(
}
}

private def containsAppName(app: AppFilterReturnParameters, filterAppName: String): Boolean = {
val appNameOpt = app.appInfo.map(_.appName)
if (appNameOpt.isDefined) {
appNameOpt.get.contains(filterAppName)
} else {
// in complete log file
false
}
}

case class AppFilterReturnParameters(
appInfo: Option[ApplicationStartInfo],
eventlog: EventLogInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName.
}

test("test appName filter - Negation") {
val appName = "~Spark shell"
val appArgs = new QualificationArgs(Array(
"--application-name",
appName,
s"$logDir/rdd_only_eventlog",
s"$logDir/empty_eventlog",
s"$logDir/udf_dataset_eventlog"
))

val eventLogInfo = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption,
appArgs.matchEventLogs.toOption, appArgs.eventlog(),
sparkSession.sparkContext.hadoopConfiguration)

val appFilter = new AppFilterImpl(1000, sparkSession.sparkContext.hadoopConfiguration,
Some(84000), 2)
val result = appFilter.filterEventLogs(eventLogInfo, appArgs)
assert(eventLogInfo.length == 3)
assert(result.length == 1) // 1 out of 3 does not has "Spark shell" as appName.
}

test("test udf event logs") {
val logFiles = Array(
s"$logDir/dataset_eventlog",
Expand Down