diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala index bcb54a641d9..38d09252897 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationArgs.scala @@ -17,6 +17,8 @@ package com.nvidia.spark.rapids.tool.qualification import org.rogach.scallop.{ScallopConf, ScallopOption} +import org.apache.spark.sql.rapids.tool.AppFilterImpl + class QualificationArgs(arguments: Seq[String]) extends ScallopConf(arguments) { banner(""" @@ -47,6 +49,11 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* opt[String](required = false, descr = "Filter event logs whose application name matches exactly with input string" + "i.e no regular expressions supported.") + val startAppTime: ScallopOption[String] = + opt[String](required = false, + descr = "Filter event logs whose application start occurred within the past specified " + + "time period. Valid time periods are min(minute),h(hours),d(days),w(weeks)," + + "m(months). If a period is not specified it defaults to days.") val matchEventLogs: ScallopOption[String] = opt[String](required = false, descr = "Filter event logs whose filenames contain the input string") @@ -100,6 +107,12 @@ Usage: java -cp rapids-4-spark-tools_2.12-.jar:$SPARK_HOME/jars/* case _ => Left("Error, read score percent must be between 0 and 100.") } + validate(startAppTime) { + case time if (AppFilterImpl.parseAppTimePeriod(time) > 0L) => Right(Unit) + case _ => Left("Time period specified, must be greater than 0 and valid periods " + + "are min(minute),h(hours),d(days),w(weeks),m(months).") + } + verify() } diff --git a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala index 475ca4a3582..61d44c6747a 100644 --- a/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala +++ b/tools/src/main/scala/com/nvidia/spark/rapids/tool/qualification/QualificationMain.scala @@ -94,6 +94,6 @@ object QualificationMain extends Logging { } def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = { - appArgs.applicationName.isDefined + appArgs.applicationName.isSupplied || appArgs.startAppTime.isSupplied } } diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala index 6944d0b5b36..0c0c093620c 100644 --- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala +++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppBase.scala @@ -85,21 +85,25 @@ abstract class AppBase( logFiles.foreach { file => Utils.tryWithResource(openEventLogInternal(file.getPath, fs)) { in => val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList - totalNumEvents += lines.size var i = 0 var done = false val linesSize = lines.size while (i < linesSize && !done) { try { val line = lines(i) + totalNumEvents += 1 val event = JsonProtocol.sparkEventFromJson(parse(line)) - i += 1 done = processEvent(event) } catch { case e: ClassNotFoundException => - logWarning(s"ClassNotFoundException: ${e.getMessage}") + // swallow any messages about this class since likely using spark version + // before 3.1 + if (!e.getMessage.contains("SparkListenerResourceProfileAdded")) { + logWarning(s"ClassNotFoundException: ${e.getMessage}") + } } + i += 1 } } } @@ -224,4 +228,4 @@ abstract class AppBase( ) } } -} \ No newline at end of file +} diff --git a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala index 51a1f633be8..f93106b5534 100644 --- a/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala +++ b/tools/src/main/scala/org/apache/spark/sql/rapids/tool/AppFilterImpl.scala @@ -16,6 +16,7 @@ package org.apache.spark.sql.rapids.tool +import java.util.Calendar import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecutor, TimeUnit} import scala.collection.JavaConverters._ @@ -76,7 +77,18 @@ class AppFilterImpl( if (appNameOpt.isDefined) { appNameOpt.get.equals(filterAppName) } else { - // in complete log file + // incomplete log file + false + } + } + filtered.map(_.eventlog).toSeq + } else if (appArgs.startAppTime.isSupplied) { + val msTimeToFilter = AppFilterImpl.parseAppTimePeriodArgs(appArgs) + val filtered = apps.filter { app => + val appStartOpt = app.appInfo.map(_.startTime) + if (appStartOpt.isDefined) { + appStartOpt.get >= msTimeToFilter + } else { false } } @@ -100,3 +112,63 @@ class AppFilterImpl( appsForFiltering.add(appInfo) } } + +object AppFilterImpl { + + def parseAppTimePeriodArgs(appArgs: QualificationArgs): Long = { + if (appArgs.startAppTime.isSupplied) { + val appStartStr = appArgs.startAppTime.getOrElse("") + parseAppTimePeriod(appStartStr) + } else { + 0L + } + } + + // parse the user provided time period string into ms. + def parseAppTimePeriod(appStartStr: String): Long = { + val timePeriod = raw"(\d+)([h,d,w,m]|min)?".r + val (timeStr, periodStr) = appStartStr match { + case timePeriod(time, null) => + (time, "d") + case timePeriod(time, period) => + (time, period) + case _ => + throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " + + "time must be greater than 0 and valid periods are min(minute),h(hours)" + + ",d(days),w(weeks),m(months).") + } + val timeInt = try { + timeStr.toInt + } catch { + case ne: NumberFormatException => + throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " + + "time must be greater than 0 and valid periods are min(minute),h(hours)" + + ",d(days),w(weeks),m(months).") + } + + if (timeInt <= 0) { + throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " + + "time must be greater than 0 and valid periods are min(minute),h(hours)" + + ",d(days),w(weeks),m(months).") + } + val c = Calendar.getInstance + periodStr match { + case "min" => + c.add(Calendar.MINUTE, -timeInt) + case "h" => + c.add(Calendar.HOUR, -timeInt) + case "d" => + c.add(Calendar.DATE, -timeInt) + case "w" => + c.add(Calendar.WEEK_OF_YEAR, -timeInt) + case "m" => + c.add(Calendar.MONTH, -timeInt) + case _ => + throw new IllegalArgumentException(s"Invalid time period $appStartStr specified, " + + "time must be greater than 0 and valid periods are min(minute),h(hours)" + + ",d(days),w(weeks),m(months).") + } + c.getTimeInMillis + } + +} diff --git a/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/AppFilterSuite.scala b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/AppFilterSuite.scala new file mode 100644 index 00000000000..0cbe1c4651d --- /dev/null +++ b/tools/src/test/scala/com/nvidia/spark/rapids/tool/qualification/AppFilterSuite.scala @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.tool.qualification + +import java.nio.charset.StandardCharsets +import java.nio.file.{Files, Paths} +import java.util.Calendar + +import org.scalatest.FunSuite + +import org.apache.spark.sql.TrampolineUtil +import org.apache.spark.sql.rapids.tool.AppFilterImpl + +class AppFilterSuite extends FunSuite { + + test("illegal args") { + assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod("0")) + assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod("1hd")) + assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod("1yr")) + assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod("-1d")) + assertThrows[IllegalArgumentException](AppFilterImpl.parseAppTimePeriod("0m")) + } + + test("time period minute parsing") { + val c = Calendar.getInstance + c.add(Calendar.MINUTE, -6) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "10min") + } + + test("time period hour parsing") { + val c = Calendar.getInstance + c.add(Calendar.HOUR, -10) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "14h") + } + + test("time period day parsing") { + val c = Calendar.getInstance + c.add(Calendar.DATE, -40) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "41d") + } + + test("time period day parsing default") { + val c = Calendar.getInstance + c.add(Calendar.DATE, -5) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "6") + } + + test("time period week parsing") { + val c = Calendar.getInstance + c.add(Calendar.WEEK_OF_YEAR, -2) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "3w") + } + + test("time period month parsing") { + val c = Calendar.getInstance + c.add(Calendar.MONTH, -8) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "10m") + } + + test("time period minute parsing fail") { + val c = Calendar.getInstance + c.add(Calendar.MINUTE, -16) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "10min", failFilter=true) + } + + test("time period hour parsing fail") { + val c = Calendar.getInstance + c.add(Calendar.HOUR, -10) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "8h", failFilter=true) + } + + test("time period day parsing fail") { + val c = Calendar.getInstance + c.add(Calendar.DATE, -40) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "38d", failFilter=true) + } + + test("time period week parsing fail") { + val c = Calendar.getInstance + c.add(Calendar.WEEK_OF_YEAR, -2) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "1w", failFilter=true) + } + + test("time period month parsing fail") { + val c = Calendar.getInstance + c.add(Calendar.MONTH, -8) + val newTimeStamp = c.getTimeInMillis + testTimePeriod(newTimeStamp, "7m", failFilter=true) + } + + private def testTimePeriod(eventLogTime: Long, startTimePeriod: String, + failFilter: Boolean = false): Unit = { + TrampolineUtil.withTempDir { outpath => + TrampolineUtil.withTempDir { tmpEventLogDir => + + val elogFile = Paths.get(tmpEventLogDir.getAbsolutePath, "testTimeEventLog") + + // scalastyle:off line.size.limit + val supText = + s"""{"Event":"SparkListenerLogStart","Spark Version":"3.1.1"} + |{"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"local-1626104300434","Timestamp":${eventLogTime},"User":"user1"}""".stripMargin + // scalastyle:on line.size.limit + Files.write(elogFile, supText.getBytes(StandardCharsets.UTF_8)) + + val allArgs = Array( + "--output-directory", + outpath.getAbsolutePath(), + "--start-app-time", + startTimePeriod + ) + val appArgs = new QualificationArgs(allArgs ++ Array(elogFile.toString())) + val (exit, appSum) = QualificationMain.mainInternal(appArgs) + assert(exit == 0) + if (failFilter) { + assert(appSum.size == 0) + } else { + assert(appSum.size == 1) + } + } + } + } +}