Skip to content

Commit

Permalink
Fix collect time metric in CoalesceBatches (#729)
Browse files Browse the repository at this point in the history
* Fix collect time in CoalesceBatches

* Also move totalTime

* switch to use if !isdefined

Signed-off-by: Thomas Graves <tgraves@apache.org>

* remove extra newline

Signed-off-by: Thomas Graves <tgraves@apache.org>
  • Loading branch information
tgravescs authored Sep 11, 2020
1 parent ade7a5f commit be6bb90
Showing 1 changed file with 22 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
private val iter = new RemoveEmptyBatchIterator(origIter, numInputBatches)
private var onDeck: Option[ColumnarBatch] = None
private var batchInitialized: Boolean = false
private var collectMetric: Option[MetricRange] = None
private var totalMetric: Option[MetricRange] = None

/** We need to track the sizes of string columns to make sure we don't exceed 2GB */
private val stringFieldIndices: Array[Int] = schema.fields.zipWithIndex
Expand All @@ -174,7 +176,22 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
Option(TaskContext.get())
.foreach(_.addTaskCompletionListener[Unit](_ => onDeck.foreach(_.close())))

override def hasNext: Boolean = onDeck.isDefined || iter.hasNext
override def hasNext: Boolean = {
if (!collectMetric.isDefined) {
// use one being not set as indicator that neither are intialized to avoid
// 2 checks or extra initialized variable
collectMetric = Some(new MetricRange(collectTime))
totalMetric = Some(new MetricRange(totalTime))
}
val res = onDeck.isDefined || iter.hasNext
if (!res) {
collectMetric.foreach(_.close())
collectMetric = None
totalMetric.foreach(_.close())
totalMetric = None
}
res
}

/**
* Called first to initialize any state needed for a new batch to be created.
Expand Down Expand Up @@ -236,9 +253,6 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
* @return The coalesced batch
*/
override def next(): ColumnarBatch = {

val total = new MetricRange(totalTime)

// reset batch state
batchInitialized = false
batchRowLimit = 0
Expand All @@ -261,7 +275,6 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
numBytes += columnSizes.sum
}

val collect = new MetricRange(collectTime)
try {

// there is a hard limit of 2^31 rows
Expand Down Expand Up @@ -339,7 +352,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
s"and $numBytes bytes")

} finally {
collect.close()
collectMetric.foreach(_.close())
collectMetric = None
}

val concatRange = new NvtxWithMetrics(s"$opName concat", NvtxColor.CYAN, concatTime)
Expand All @@ -351,7 +365,8 @@ abstract class AbstractGpuCoalesceIterator(origIter: Iterator[ColumnarBatch],
ret
} finally {
cleanupConcatIsDone()
total.close()
totalMetric.foreach(_.close())
totalMetric = None
}
}

Expand Down

0 comments on commit be6bb90

Please sign in to comment.