Qualification tool: Add filter based on appName (#2934)
* Additional filtering features for qualification tool

* refactor, read event logs multithreaded

* addressed review comments and added appName filter

* Add tests and update filter appName feature

* addressed review comments

Signed-off-by: Niranjan Artal <nartal@nvidia.com>
nartal1 authored Jul 15, 2021
1 parent 9998174 commit bb8baab
Showing 10 changed files with 214 additions and 9 deletions.
3 changes: 3 additions & 0 deletions tools/README.md
@@ -188,6 +188,9 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
com.nvidia.spark.rapids.tool.qualification.QualificationMain [options]
<eventlogs | eventlog directories ...>

-a, --application-name <arg> Filter event logs whose application name
matches exactly with the input string,
i.e. no regular expressions supported.
-f, --filter-criteria <arg> Filter newest or oldest N eventlogs for
processing. eg: 100-newest (for processing
newest 100 event logs). eg: 100-oldest (for
processing oldest 100 event logs)
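
For example, to qualify only applications named exactly "Spark shell" (a hypothetical invocation; the tool version and event-log path are placeholders):

java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/* \
  com.nvidia.spark.rapids.tool.qualification.QualificationMain \
  --application-name "Spark shell" /path/to/eventlog-dir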
@@ -27,7 +27,6 @@ import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

import org.apache.spark.deploy.history.{EventLogFileReader, EventLogFileWriter}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.rapids.tool.profiling.ApplicationInfo

sealed trait EventLogInfo {
@@ -43,6 +43,10 @@ Usage: java -cp rapids-4-spark-tools_2.12-<version>.jar:$SPARK_HOME/jars/*
descr = "Filter newest or oldest N eventlogs for processing." +
"eg: 100-newest (for processing newest 100 event logs). " +
"eg: 100-oldest (for processing oldest 100 event logs)")
val applicationName: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose application name matches exactly with input string" +
"i.e no regular expressions supported.")
val matchEventLogs: ScallopOption[String] =
opt[String](required = false,
descr = "Filter event logs whose filenames contain the input string")
@@ -20,6 +20,7 @@ import com.nvidia.spark.rapids.tool.EventLogPathProcessor
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.AppFilterImpl
import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo

/**
@@ -72,14 +73,27 @@ object QualificationMain extends Logging {

val eventLogInfos = EventLogPathProcessor.processAllPaths(filterN.toOption,
matchEventLogs.toOption, eventlogPaths, hadoopConf)
if (eventLogInfos.isEmpty) {

val filteredLogs = if (argsContainsAppFilters(appArgs)) {
val appFilter = new AppFilterImpl(numOutputRows, hadoopConf, timeout, nThreads)
appFilter.filterEventLogs(eventLogInfos, appArgs)
} else {
eventLogInfos
}

if (filteredLogs.isEmpty) {
logWarning("No event logs to process after checking paths, exiting!")
return (0, Seq[QualificationSummaryInfo]())
}

val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, readScorePercent, reportReadSchema, printStdout)
val res = qual.qualifyApps(eventLogInfos)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}

def argsContainsAppFilters(appArgs: QualificationArgs): Boolean = {
appArgs.applicationName.isDefined
}
}
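
To exercise the new flag end to end, a driver could call the tool's entry point directly; a sketch, assuming the conventional main(args: Array[String]) wrapper exists on QualificationMain, with a placeholder log path:

// Sketch: run qualification with the appName filter applied.
com.nvidia.spark.rapids.tool.qualification.QualificationMain.main(
  Array("--application-name", "Spark shell", "/tmp/eventlogs"))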
@@ -45,7 +45,7 @@ abstract class AppBase(
// The data source information
val dataSourceInfo: ArrayBuffer[DataSourceCase] = ArrayBuffer[DataSourceCase]()

def processEvent(event: SparkListenerEvent): Unit
def processEvent(event: SparkListenerEvent): Boolean

private def openEventLogInternal(log: Path, fs: FileSystem): InputStream = {
EventLogFileWriter.codecName(log) match {
@@ -86,10 +86,15 @@ abstract class AppBase(
Utils.tryWithResource(openEventLogInternal(file.getPath, fs)) { in =>
val lines = Source.fromInputStream(in)(Codec.UTF8).getLines().toList
totalNumEvents += lines.size
lines.foreach { line =>
var i = 0
var done = false
val linesSize = lines.size
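// processEvent returns true once the subclass has seen everything it needs
// (e.g. the application-start event), letting the scan stop early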
while (i < linesSize && !done) {
try {
val line = lines(i)
// advance the index before parsing so a malformed event cannot repeat forever
i += 1
val event = JsonProtocol.sparkEventFromJson(parse(line))
done = processEvent(event)
}
catch {
case e: ClassNotFoundException =>
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool

import java.util.concurrent.{ConcurrentLinkedQueue, Executors, ThreadPoolExecutor, TimeUnit}

import scala.collection.JavaConverters._

import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.nvidia.spark.rapids.tool.EventLogInfo
import com.nvidia.spark.rapids.tool.qualification.QualificationArgs
import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging

class AppFilterImpl(
numRows: Int,
hadoopConf: Configuration,
timeout: Option[Long],
nThreads: Int) extends Logging {

private val appsForFiltering = new ConcurrentLinkedQueue[AppFilterReturnParameters]()
// default is 24 hours
private val waitTimeInSec = timeout.getOrElse(60 * 60 * 24L)

private val threadFactory = new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat("qualAppFilter-%d").build()
logInfo(s"Threadpool size is $nThreads")
private val qualFilterthreadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]

private class FilterThread(path: EventLogInfo) extends Runnable {
def run(): Unit = filterEventLog(path, numRows, hadoopConf)
}

def filterEventLogs(
allPaths: Seq[EventLogInfo],
appArgs: QualificationArgs): Seq[EventLogInfo] = {
allPaths.foreach { path =>
try {
qualFilterthreadPool.submit(new FilterThread(path))
} catch {
case e: Exception =>
logError(s"Unexpected exception submitting log ${path.eventLog.toString}, skipping!", e)
}
}
// wait for the threads to finish processing the files
qualFilterthreadPool.shutdown()
if (!qualFilterthreadPool.awaitTermination(waitTimeInSec, TimeUnit.SECONDS)) {
logError(s"Processing log files took longer then $waitTimeInSec seconds," +
" stopping processing any more event logs")
qualFilterthreadPool.shutdownNow()
}

// collect the application-start info gathered by the worker threads for filtering
val apps = appsForFiltering.asScala

val filterAppName = appArgs.applicationName.getOrElse("")
if (appArgs.applicationName.isSupplied && filterAppName.nonEmpty) {
val filtered = apps.filter { app =>
// an incomplete event log may lack an application-start event, so there is no name to match
app.appInfo.map(_.appName).contains(filterAppName)
}
filtered.map(_.eventlog).toSeq
} else {
apps.map(x => x.eventlog).toSeq
}
}

case class AppFilterReturnParameters(
appInfo: Option[ApplicationStartInfo],
eventlog: EventLogInfo)

private def filterEventLog(
path: EventLogInfo,
numRows: Int,
hadoopConf: Configuration): Unit = {

val startAppInfo = new FilterAppInfo(numRows, path, hadoopConf)
val appInfo = AppFilterReturnParameters(startAppInfo.appInfo, path)
appsForFiltering.add(appInfo)
}
}
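
The submit/shutdown/awaitTermination sequence above bounds the total filtering time rather than each task. The same pattern in isolation, with illustrative names only:

import java.util.concurrent.{Executors, TimeUnit}

// Illustrative: submit all work, then wait a bounded time for completion.
val pool = Executors.newFixedThreadPool(2)
Seq("log1", "log2").foreach { p =>
  pool.submit(new Runnable { override def run(): Unit = println(s"filtering $p") })
}
pool.shutdown()
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
  pool.shutdownNow() // abandon stragglers, mirroring the timeout handling above
}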
@@ -0,0 +1,55 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool

import com.nvidia.spark.rapids.tool.EventLogInfo
import org.apache.hadoop.conf.Configuration

import org.apache.spark.scheduler.{SparkListenerApplicationStart, SparkListenerEvent}

case class ApplicationStartInfo(
appName: String,
startTime: Long)

class FilterAppInfo(
numOutputRows: Int,
eventLogInfo: EventLogInfo,
hadoopConf: Configuration) extends AppBase(numOutputRows, eventLogInfo, hadoopConf) {

def doSparkListenerApplicationStart(
event: SparkListenerApplicationStart): Unit = {
logDebug("Processing event: " + event.getClass)
val thisAppInfo = ApplicationStartInfo(
event.appName,
event.time
)
appInfo = Some(thisAppInfo)
}

var appInfo: Option[ApplicationStartInfo] = None

override def processEvent(event: SparkListenerEvent): Boolean = {
event match {
case e: SparkListenerApplicationStart =>
doSparkListenerApplicationStart(e)
true
case _ => false
}
}

processEvents()
}
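
Because processEvent answers true at the first SparkListenerApplicationStart, constructing a FilterAppInfo reads only the head of each log. A hypothetical caller, where someEventLog and conf are placeholders:

// Sketch: peek at an application's name without parsing the whole event log.
val startInfo = new FilterAppInfo(1000, someEventLog, conf)
val appName: Option[String] = startInfo.appInfo.map(_.appName)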
@@ -426,6 +426,7 @@ class ApplicationInfo(

override def processEvent(event: SparkListenerEvent): Boolean = {
eventProcessor.processAnyEvent(this, event)
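// full profiling wants every event, so never request an early stop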
false
}

/**
@@ -66,8 +66,9 @@ class QualAppInfo(

processEvents()

override def processEvent(event: SparkListenerEvent): Unit = {
override def processEvent(event: SparkListenerEvent): Boolean = {
eventProcessor.processAnyEvent(this, event)
false
}

// time in ms
@@ -22,13 +22,13 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
import scala.collection.mutable.ListBuffer
import scala.io.Source

import com.nvidia.spark.rapids.tool.ToolTestUtils
import com.nvidia.spark.rapids.tool.{EventLogPathProcessor, ToolTestUtils}
import org.scalatest.{BeforeAndAfterEach, FunSuite}

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}
import org.apache.spark.sql.{DataFrame, SparkSession, TrampolineUtil}
import org.apache.spark.sql.rapids.tool.ToolUtils
import org.apache.spark.sql.rapids.tool.{AppFilterImpl, ToolUtils}
import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo
import org.apache.spark.sql.types._

@@ -242,6 +242,27 @@ class QualificationSuite extends FunSuite with BeforeAndAfterEach with Logging {
runQualificationTest(Array(log), "spark2_expectation.csv")
}

test("test appName filter") {
val appName = "Spark shell"
val appArgs = new QualificationArgs(Array(
"--application-name",
appName,
s"$logDir/rdd_only_eventlog",
s"$logDir/empty_eventlog",
s"$logDir/udf_dataset_eventlog"
))

val eventLogInfo = EventLogPathProcessor.processAllPaths(appArgs.filterCriteria.toOption,
appArgs.matchEventLogs.toOption, appArgs.eventlog(),
sparkSession.sparkContext.hadoopConfiguration)

val appFilter = new AppFilterImpl(1000, sparkSession.sparkContext.hadoopConfiguration,
Some(84000), 2)
val result = appFilter.filterEventLogs(eventLogInfo, appArgs)
assert(eventLogInfo.length == 3)
assert(result.length == 2) // 2 out of 3 have "Spark shell" as appName.
}

test("test udf event logs") {
val logFiles = Array(
s"$logDir/dataset_eventlog",