Skip to content

Commit

Permalink
[FEA] Add a progress bar in Qualification tool when it is running (#5988
Browse files Browse the repository at this point in the history
)

* [FEA] Add a progress bar in Qualification tool when it is running

Signed-off-by: Ahmed Hussein (amahussein) <a@ahussein.me>

* add NA to counters, and dump stats at execution-end
  • Loading branch information
amahussein authored Jul 20, 2022
1 parent a2f0d4b commit fe0236b
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import org.apache.hadoop.conf.Configuration

import org.apache.spark.internal.Logging
import org.apache.spark.sql.rapids.tool.qualification._
import org.apache.spark.sql.rapids.tool.ui.QualificationReportGenerator
import org.apache.spark.sql.rapids.tool.ui.{ConsoleProgressBar, QualificationReportGenerator}

class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
timeout: Option[Long], nThreads: Int, order: String,
pluginTypeChecker: PluginTypeChecker,
reportReadSchema: Boolean, printStdout: Boolean, uiEnabled: Boolean) extends Logging {
reportReadSchema: Boolean, printStdout: Boolean, uiEnabled: Boolean,
enablePB: Boolean) extends Logging {

private val allApps = new ConcurrentLinkedQueue[QualificationSummaryInfo]()

Expand All @@ -44,11 +45,16 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
private val threadPool = Executors.newFixedThreadPool(nThreads, threadFactory)
.asInstanceOf[ThreadPoolExecutor]

private var progressBar: Option[ConsoleProgressBar] = None

private class QualifyThread(path: EventLogInfo) extends Runnable {
def run: Unit = qualifyApp(path, hadoopConf)
}

def qualifyApps(allPaths: Seq[EventLogInfo]): Seq[QualificationSummaryInfo] = {
if (enablePB && allPaths.nonEmpty) { // total count to start the PB cannot be 0
progressBar = Some(new ConsoleProgressBar("Qual Tool", allPaths.length))
}
allPaths.foreach { path =>
try {
threadPool.submit(new QualifyThread(path))
Expand All @@ -64,7 +70,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
" stopping processing any more event logs")
threadPool.shutdownNow()
}

progressBar.foreach(_.finishAll())
val allAppsSum = allApps.asScala.toSeq
val qWriter = new QualOutputWriter(getReportOutputPath, reportReadSchema, printStdout)
// sort order and limit only applies to the report summary text file,
Expand Down Expand Up @@ -108,15 +114,18 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
val startTime = System.currentTimeMillis()
val app = QualificationAppInfo.createApp(path, hadoopConf, pluginTypeChecker)
if (!app.isDefined) {
progressBar.foreach(_.reportUnkownStatusProcess())
logWarning(s"No Application found that contain SQL for ${path.eventLog.toString}!")
None
} else {
val qualSumInfo = app.get.aggregateStats()
if (qualSumInfo.isDefined) {
allApps.add(qualSumInfo.get)
progressBar.foreach(_.reportSuccessfulProcess())
val endTime = System.currentTimeMillis()
logInfo(s"Took ${endTime - startTime}ms to process ${path.eventLog.toString}")
} else {
progressBar.foreach(_.reportUnkownStatusProcess())
logWarning(s"No aggregated stats for event log at: ${path.eventLog.toString}")
}
}
Expand All @@ -129,6 +138,7 @@ class Qualification(outputDir: String, numRows: Int, hadoopConf: Configuration,
logError(s"Error occured while processing file: ${path.eventLog.toString}", o)
System.exit(1)
case e: Exception =>
progressBar.foreach(_.reportFailedProcess())
logWarning(s"Unexpected exception processing log ${path.eventLog.toString}, skipping!", e)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import org.apache.spark.sql.rapids.tool.qualification.QualificationSummaryInfo
object QualificationMain extends Logging {

def main(args: Array[String]) {
val (exitCode, _) = mainInternal(new QualificationArgs(args), printStdout = true)
val (exitCode, _) =
mainInternal(new QualificationArgs(args), printStdout = true, enablePB = true)
if (exitCode != 0) {
System.exit(exitCode)
}
Expand All @@ -40,7 +41,8 @@ object QualificationMain extends Logging {
* Entry point for tests
*/
def mainInternal(appArgs: QualificationArgs,
printStdout:Boolean = false): (Int, Seq[QualificationSummaryInfo]) = {
printStdout: Boolean = false,
enablePB: Boolean = false): (Int, Seq[QualificationSummaryInfo]) = {

val eventlogPaths = appArgs.eventlog()
val filterN = appArgs.filterCriteria
Expand Down Expand Up @@ -86,7 +88,7 @@ object QualificationMain extends Logging {
}

val qual = new Qualification(outputDirectory, numOutputRows, hadoopConf, timeout,
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled)
nThreads, order, pluginTypeChecker, reportReadSchema, printStdout, uiEnabled, enablePB)
val res = qual.qualifyApps(filteredLogs)
(0, res)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.tool.ui

import java.util.{Timer, TimerTask}
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable

import org.apache.spark.internal.Logging

/**
* ConsoleProgressBar shows the progress of the tool (Qualification/Profiler) in the console.
* This class is inspired by org.apache.spark.ui.ConsoleProgressBar.
*
* The implementation defines three counters: succeeded, failed, and N/A.
* The list can be extended to add new counters during runtime which gives flexibility to have
* custom statistics for different contexts.
*
* By default, the progress bar will be shown in the stdout.
* Sample generated line:
* (.)+ Progress (\\d+)% [==> ] ((\\d+) succeeded + (\\d+) failed + (\\d+) N/A) / [(\\d+)]
* toolName Progress 71% [=======> ] (83 succeeded + 2 failed + 0 N/A) / 119
*
* At the end of execution, it dumps all the defined counters.
*
* @param prefix Gives a title to the PB
* @param totalCount The total number of items to be processed by the Tool.
* Caller needs to verify the value is not 0.
* @param inlinePBEnabled When enabled, the progress bar will be displayed and re-drawn in a single
* console line. Note that redirecting other log messages to stdout while the
* flag is enabled would cause the PB to be overridden.
*/
class ConsoleProgressBar(
    prefix: String, totalCount: Long, inlinePBEnabled: Boolean = false) extends Logging {
  import ConsoleProgressBar._

  // Built-in counters. Each report*() call bumps exactly one status counter plus totalCounter.
  private val successCounter = new AtomicLong(0)
  private val failureCounter = new AtomicLong(0)
  private val statusNotAvailableCounter = new AtomicLong(0)
  private val totalCounter = new AtomicLong(0)

  // Name -> counter registry, kept in insertion order so the final dump is stable.
  // The map itself is not thread-safe: every access goes through this instance's lock.
  private val metrics = mutable.LinkedHashMap[String, AtomicLong](
    PROCESS_SUCCESS_COUNT -> successCounter,
    PROCESS_FAILURE_COUNT -> failureCounter,
    PROCESS_NOT_AVAILABLE_COUNT -> statusNotAvailableCounter,
    EXECUTION_TOTAL_COUNTER -> totalCounter)

  // Period between two refresh attempts, in milliseconds
  private val updatePeriodMSec = 500L
  // Delay before the first refresh attempt, in milliseconds
  private val firstDelayMSec = 1000L
  // Maximum time without any redraw, even when the count has not changed
  private val updateTimeOut =
    if (inlinePBEnabled) { // shorter timeout when PB is inlined
      10 * 1000L
    } else {
      60 * 1000L
    }

  // Display state below is only read/written while holding this instance's lock
  // (refresh() and finishAll() are synchronized; show() and clear() are called from them).
  private val launchTime = System.currentTimeMillis()
  private var lastFinishTime = 0L
  private var lastUpdateTime = 0L
  private var lastProgressBar = ""
  private var currentCount = 0L
  private var lastUpdatedCount = 0L

  // Schedule a daemon refresh thread to run periodically
  private val timer = new Timer("refresh tool progress", true)
  timer.schedule(new TimerTask {
    override def run(): Unit = {
      refresh()
    }
  }, firstDelayMSec, updatePeriodMSec)

  /**
   * Add a new counter with a unique key.
   * @param key a unique name to define the counter.
   * @return if key is defined in metrics, return its associated value.
   *         Otherwise, update the metrics with the mapping key -> d and return d.
   */
  def addCounter(key: String): AtomicLong = {
    this.synchronized {
      metrics.getOrElseUpdate(key, new AtomicLong(0))
    }
  }

  /**
   * Get the counter by name. This is used to access newly defined counters.
   * @param key the name the counter was registered with.
   * @return the counter, or None when no counter is registered under key.
   */
  def getCounterByName(key: String): Option[AtomicLong] = synchronized {
    metrics.get(key)
  }

  /**
   * Increment a counter by key. This is used to access newly defined counters.
   * @param key the name the counter was registered with.
   * @return the value after the increment, or 0 when no counter is registered under key.
   */
  def incCounterByName(key: String): Long = synchronized {
    metrics.get(key).map(_.incrementAndGet()).getOrElse(0L)
  }

  /**
   * Decrement a counter by key. This is used to access newly defined counters.
   * @param key the name the counter was registered with.
   * @return the value after the decrement, or 0 when no counter is registered under key.
   */
  def decCounterByName(key: String): Long = synchronized {
    metrics.get(key).map(_.decrementAndGet()).getOrElse(0L)
  }

  /** Record one item that was processed successfully. */
  def reportSuccessfulProcess(): Unit = {
    successCounter.incrementAndGet()
    totalCounter.incrementAndGet()
  }

  /** Record one item whose processing failed. */
  def reportFailedProcess(): Unit = {
    failureCounter.incrementAndGet()
    totalCounter.incrementAndGet()
  }

  /**
   * Record one item that finished without a known status (counted as N/A).
   * The misspelled name is kept as-is because existing callers use it.
   */
  def reportUnkownStatusProcess(): Unit = {
    statusNotAvailableCounter.incrementAndGet()
    totalCounter.incrementAndGet()
  }

  /**
   * Render every registered counter, one per line, as "\tname = value\n" in registration order.
   */
  def metricsToString: String = synchronized {
    metrics.map { case (name, counter) =>
      s"\t$name = ${counter.longValue()}\n"
    }.mkString
  }

  /**
   * Scale the current progress to the range [0, scale].
   * A non-positive totalCount means there is nothing to wait for, so the progress is reported
   * as complete instead of dividing by zero.
   */
  private def scaledProgress(scale: Long): Long = {
    if (totalCount > 0) scale * currentCount / totalCount else scale
  }

  /**
   * Draw the progress line for the current count and remember what was drawn.
   * @param now timestamp (ms) recorded as the last update time.
   */
  private def show(now: Long): Unit = {
    val percent = scaledProgress(100L)
    val header = s"$prefix Progress $percent% ["
    val tailer =
      s"] (${successCounter.longValue()} succeeded + " +
        s"${failureCounter.longValue()} failed + " +
        s"${statusNotAvailableCounter.longValue()} N/A) / $totalCount"
    // Whatever is left of the terminal width after the text is used by the bar itself.
    val w = TerminalWidth - header.length - tailer.length
    val bar = if (w > 0) {
      val filled = scaledProgress(w.toLong)
      (0 until w).map { i =>
        if (i < filled) "=" else if (i == filled) ">" else " "
      }.mkString("")
    } else {
      ""
    }
    val consoleLine = header + bar + tailer
    if (inlinePBEnabled) {
      // Return to the start of the line so the next draw overwrites this one.
      System.out.print(s"$CR$consoleLine$CR")
    } else {
      System.out.println(consoleLine)
    }
    lastUpdateTime = now
    lastUpdatedCount = currentCount
    lastProgressBar = consoleLine
  }

  /**
   * Periodic task body: redraw the bar when the processed count changed, or when nothing has
   * been drawn for updateTimeOut milliseconds.
   */
  private def refresh(): Unit = synchronized {
    val now = System.currentTimeMillis()
    // Skip the refresh if finishAll() recorded a finish time less than firstDelayMSec ago.
    if (now - lastFinishTime >= firstDelayMSec) {
      currentCount = totalCounter.longValue()
      val countChanged = lastUpdatedCount != currentCount
      val timedOut = now - lastUpdateTime >= updateTimeOut
      if (countChanged || timedOut) {
        show(now)
      }
    }
  }

  /**
   * Clear the progress bar if showed.
   */
  private def clear(): Unit = {
    if (inlinePBEnabled && lastProgressBar.nonEmpty) {
      // print (not printf): the text is not a format string.
      System.out.print(s"$CR${" " * TerminalWidth}$CR")
    }
    lastProgressBar = ""
    lastUpdatedCount = 0
  }

  /**
   * Mark all processing as finished: stop the refresh timer, draw the final state, then print
   * the total execution time followed by all the counters.
   */
  def finishAll(): Unit = synchronized {
    stop()
    currentCount = totalCounter.longValue()
    lastFinishTime = System.currentTimeMillis()
    show(lastFinishTime)
    clear()
    // print (not printf): prefix and counter names are arbitrary text and may contain '%',
    // which printf would try to interpret as a format specifier.
    System.out.print(
      s"$CR$prefix execution time: ${lastFinishTime - launchTime}ms\n$metricsToString")
  }

  /**
   * Tear down the timer thread.
   */
  private def stop(): Unit = timer.cancel()
}

/**
 * Constants shared by the progress bar: console control character, terminal width, and the
 * names of the built-in counters.
 */
object ConsoleProgressBar {
  // Carriage return: moves the cursor back to the start of the current console line.
  private val CR = '\r'
  // Width used when COLUMNS is unset or does not hold a positive integer.
  private val DefaultTerminalWidth = 120
  // The width of terminal, read from the COLUMNS environment variable.
  // The value is parsed defensively: an empty or non-numeric COLUMNS would otherwise throw
  // NumberFormatException while this object is being initialized.
  private val TerminalWidth: Int =
    sys.env.get("COLUMNS")
      .flatMap(raw => scala.util.Try(raw.trim.toInt).toOption)
      .filter(_ > 0)
      .getOrElse(DefaultTerminalWidth)

  // Names of the built-in counters
  val PROCESS_SUCCESS_COUNT = "process.success.count"
  val PROCESS_FAILURE_COUNT = "process.failure.count"
  val PROCESS_NOT_AVAILABLE_COUNT = "process.NA.count"
  val EXECUTION_TOTAL_COUNTER = "execution.total.count"
}

0 comments on commit fe0236b

Please sign in to comment.