Skip to content

Commit

Permalink
qualification tool: add filtering by app start time (#2940)
Browse files Browse the repository at this point in the history
* Add logic to filter by time period

Signed-off-by: Thomas Graves <tgraves@apache.org>

* change name of param

* fix matching default for days
  • Loading branch information
tgravescs authored Jul 15, 2021
1 parent c8cc36a commit 1b6fbfc
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package com.nvidia.spark.rapids.tool.qualification

import org.rogach.scallop.{ScallopConf, ScallopOption}

import org.apache.spark.sql.rapids.tool.AppFilterImpl

class QualificationArgs(arguments: Seq[String]) extends ScallopConf(arguments) {

banner("""
Expand Down Expand Up @@ -47,6 +49,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
opt[String](required = false,
descr = "Filter event logs whose application name matches exactly with input string" +
"i.e no regular expressions supported.")
// Optional look-back window on application start time, supplied on the command line as
// "--start-app-time". The raw string is checked by the validate(startAppTime) rule further
// down in this class before verify() completes.
val startAppTime: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application start occurred within the past specified " +
"time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," +
"m(months). If a period is not specified it defaults to days.")
// Optional filter on event log file names: per the description, a log is selected when its
// filename contains the supplied string.
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string")
Expand Down Expand Up @@ -100,6 +107,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
case _ => Left("Error, read score percent must be between 0 and 100.")
}

// Validate the start-app-time option. AppFilterImpl.parseAppTimePeriod reports malformed
// input by throwing IllegalArgumentException, so the call is wrapped in Try: any failure is
// turned into the Left message below instead of escaping the validation as an exception.
validate(startAppTime) {
  case time if scala.util.Try(AppFilterImpl.parseAppTimePeriod(time)).toOption.exists(_ > 0L) =>
    Right(())
  case _ => Left("Time period specified, must be greater than 0 and valid periods " +
    "are min(minute),h(hours),d(days),w(weeks),m(months).")
}

verify()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,6 @@ object QualificationMain extends Logging {
}

/**
 * Reports whether the user supplied any application-level filter.
 *
 * @param appArgs parsed command line arguments
 * @return true when either the application name filter or the start-app-time filter was
 *         supplied on the command line
 */
def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
  appArgs.applicationName.isSupplied || appArgs.startAppTime.isSupplied
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,21 +85,25 @@ abstract class AppBase(
logFiles.foreach { file =>
Utils.tryWithResource(openEventLogInternal(file.getPath, fs)) { in =>
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList
totalNumEvents += lines.size
var i = 0
var done = false
val linesSize = lines.size
while (i < linesSize && !done) {
try {
val line = lines(i)
totalNumEvents += 1
val event = JsonProtocol.sparkEventFromJson(parse(line))
i += 1
done = processEvent(event)
}
catch {
case e: ClassNotFoundException =>
logWarning(s"ClassNotFoundException: ${e.getMessage}")
// swallow any messages about this class since likely using spark version
// before 3.1
if (!e.getMessage.contains("SparkListenerResourceProfileAdded")) {
logWarning(s"ClassNotFoundException: ${e.getMessage}")
}
}
i += 1
}
}
}
Expand Down Expand Up @@ -224,4 +228,4 @@ abstract class AppBase(
)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.apache.spark.sql.rapids.tool

import java.util.Calendar
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecutor, TimeUnit}

import scala.collection.JavaConverters._
Expand Down Expand Up @@ -76,7 +77,18 @@ class AppFilterImpl(
if (appNameOpt.isDefined) {
appNameOpt.get.equals(filterAppName)
} else {
// in complete log file
// incomplete log file
false
}
}
filtered.map(_.eventlog).toSeq
} else if (appArgs.startAppTime.isSupplied) {
val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs)
val filtered = apps.filter { app =>
val appStartOpt = app.appInfo.map(_.startTime)
if (appStartOpt.isDefined) {
appStartOpt.get >= msTimeToFilter
} else {
false
}
}
Expand All @@ -100,3 +112,63 @@ class AppFilterImpl(
appsForFiltering.add(appInfo)
}
}

object AppFilterImpl {

  // A run of digits optionally followed by exactly one period suffix. The suffix group is
  // null when the user gives only a number.
  private val TimePeriodPattern = raw"(\d+)(min|[hdwm])?".r

  // Builds the single error used for every kind of invalid time period input.
  private def invalidTimePeriod(appStartStr: String): IllegalArgumentException = {
    new IllegalArgumentException(s"Invalid time period $appStartStr specified, " +
      "time must be greater than 0 and valid periods are min(minute),h(hours)" +
      ",d(days),w(weeks),m(months).")
  }

  /**
   * Resolves the start-app-time argument into a cutoff timestamp.
   *
   * @param appArgs parsed command line arguments
   * @return the cutoff in epoch milliseconds when the option was supplied, otherwise 0
   * @throws IllegalArgumentException if the supplied value cannot be parsed
   */
  def parseAppTimePeriodArgs(appArgs: QualificationArgs): Long = {
    if (appArgs.startAppTime.isSupplied) {
      val appStartStr = appArgs.startAppTime.getOrElse("")
      parseAppTimePeriod(appStartStr)
    } else {
      0L
    }
  }

  /**
   * Parses a user provided time period string and returns the point in time that lies that
   * far before now, in epoch milliseconds.
   *
   * The string is a positive integer optionally followed by one of min (minutes), h (hours),
   * d (days), w (weeks) or m (months). When no suffix is given the period is days.
   *
   * @param appStartStr the time period, for example "10min", "3w" or "6"
   * @return the current time minus the requested period, in milliseconds since the epoch
   * @throws IllegalArgumentException if the string is malformed, the suffix is unknown, the
   *         number does not fit in an Int, or the number is not greater than 0
   */
  def parseAppTimePeriod(appStartStr: String): Long = {
    val (timeStr, periodStr) = appStartStr match {
      // Option(...) turns the null produced by an absent suffix group into the "d" default.
      case TimePeriodPattern(time, period) => (time, Option(period).getOrElse("d"))
      case _ => throw invalidTimePeriod(appStartStr)
    }
    val timeInt = try {
      timeStr.toInt
    } catch {
      // the digits matched the pattern but overflow an Int
      case _: NumberFormatException => throw invalidTimePeriod(appStartStr)
    }
    if (timeInt <= 0) {
      throw invalidTimePeriod(appStartStr)
    }
    val calendarField = periodStr match {
      case "min" => Calendar.MINUTE
      case "h" => Calendar.HOUR
      case "d" => Calendar.DATE
      case "w" => Calendar.WEEK_OF_YEAR
      case "m" => Calendar.MONTH
      case _ => throw invalidTimePeriod(appStartStr)
    }
    val c = Calendar.getInstance
    c.add(calendarField, -timeInt)
    c.getTimeInMillis
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.tool.qualification

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}
import java.util.Calendar

import org.scalatest.FunSuite

import org.apache.spark.sql.TrampolineUtil
import org.apache.spark.sql.rapids.tool.AppFilterImpl

/**
 * Exercises the start-app-time filter: first the argument parser on its own, then the whole
 * qualification run against a generated two-line event log whose application start timestamp
 * lies either inside or outside the requested look-back window.
 */
class AppFilterSuite extends FunSuite {

  // Epoch milliseconds for the moment `amount` units of `calendarField` before now.
  private def timestampBeforeNow(calendarField: Int, amount: Int): Long = {
    val cal = Calendar.getInstance
    cal.add(calendarField, -amount)
    cal.getTimeInMillis
  }

  test("illegal args") {
    Seq("0", "1hd", "1yr", "-1d", "0m").foreach { badPeriod =>
      assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod(badPeriod))
    }
  }

  test("time period minute parsing") {
    testTimePeriod(timestampBeforeNow(Calendar.MINUTE, 6), "10min")
  }

  test("time period hour parsing") {
    testTimePeriod(timestampBeforeNow(Calendar.HOUR, 10), "14h")
  }

  test("time period day parsing") {
    testTimePeriod(timestampBeforeNow(Calendar.DATE, 40), "41d")
  }

  test("time period day parsing default") {
    testTimePeriod(timestampBeforeNow(Calendar.DATE, 5), "6")
  }

  test("time period week parsing") {
    testTimePeriod(timestampBeforeNow(Calendar.WEEK_OF_YEAR, 2), "3w")
  }

  test("time period month parsing") {
    testTimePeriod(timestampBeforeNow(Calendar.MONTH, 8), "10m")
  }

  test("time period minute parsing fail") {
    testTimePeriod(timestampBeforeNow(Calendar.MINUTE, 16), "10min", failFilter=true)
  }

  test("time period hour parsing fail") {
    testTimePeriod(timestampBeforeNow(Calendar.HOUR, 10), "8h", failFilter=true)
  }

  test("time period day parsing fail") {
    testTimePeriod(timestampBeforeNow(Calendar.DATE, 40), "38d", failFilter=true)
  }

  test("time period week parsing fail") {
    testTimePeriod(timestampBeforeNow(Calendar.WEEK_OF_YEAR, 2), "1w", failFilter=true)
  }

  test("time period month parsing fail") {
    testTimePeriod(timestampBeforeNow(Calendar.MONTH, 8), "7m", failFilter=true)
  }

  // Writes an event log whose application start is `eventLogTime`, runs the qualification
  // tool with --start-app-time set to `startTimePeriod`, and checks that the application is
  // reported (failFilter = false) or filtered out (failFilter = true).
  private def testTimePeriod(eventLogTime: Long, startTimePeriod: String,
      failFilter: Boolean = false): Unit = {
    TrampolineUtil.withTempDir { outpath =>
      TrampolineUtil.withTempDir { tmpEventLogDir =>
        val elogFile = Paths.get(tmpEventLogDir.getAbsolutePath, "testTimeEventLog")

        // scalastyle:off line.size.limit
        val supText =
          s"""{"Event":"SparkListenerLogStart","Spark Version":"3.1.1"}
             |{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1626104300434","Timestamp":${eventLogTime},"User":"user1"}""".stripMargin
        // scalastyle:on line.size.limit
        Files.write(elogFile, supText.getBytes(StandardCharsets.UTF_8))

        val cliArgs = Array(
          "--output-directory",
          outpath.getAbsolutePath(),
          "--start-app-time",
          startTimePeriod,
          elogFile.toString())
        val (exit, appSum) = QualificationMain.mainInternal(new QualificationArgs(cliArgs))
        assert(exit == 0)
        val expectedApps = if (failFilter) 0 else 1
        assert(appSum.size == expectedApps)
      }
    }
  }
}

0 comments on commit 1b6fbfc

Please sign in to comment.