Skip to content

Commit

Permalink
Avoid empty batches on columnar to row conversion (NVIDIA#1204)
Browse files Browse the repository at this point in the history
Signed-off-by: Robert (Bobby) Evans <bobby@apache.org>
  • Loading branch information
revans2 authored Dec 2, 2020
1 parent 902f109 commit b5a78dc
Showing 1 changed file with 39 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,35 +94,48 @@ class AcceleratedColumnarToRowIterator(
new Table(rearrangedColumns : _*)
}

def loadNextBatch(): Unit = {
/**
 * Updates the per-batch metrics for `cb` and, when it holds at least one row,
 * converts it to rows, queues the host copies in `pendingCvs` and installs the
 * next pending entry as the current batch.
 *
 * @param cb the columnar batch to process
 * @return true if a current batch was set; false if `cb` had no rows and was
 *         therefore skipped
 */
private[this] def setupBatch(cb: ColumnarBatch): Boolean = {
  if (numInputBatches != null) {
    numInputBatches += 1
  }
  // In order to match the numOutputRows metric in the generated code we update
  // numOutputRows for each batch. This is less accurate than doing it at output
  // because it will over count the number of rows output in the case of a limit,
  // but it is more efficient.
  if (numOutputRows != null) {
    numOutputRows += cb.numRows()
  }
  if (cb.numRows() > 0) {
    val nvtxRange = if (totalTime != null) {
      new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, totalTime)
    } else {
      new NvtxRange("ColumnarToRow: batch", NvtxColor.RED)
    }
    // Each withResource block evaluates to the value of its body, so the result
    // propagates outwards as an ordinary expression rather than through a
    // nonlocal `return` thrown from inside the nested lambdas.
    withResource(nvtxRange) { _ =>
      withResource(rearrangeRows(cb)) { table =>
        withResource(table.convertToRows()) { rowsCvList =>
          rowsCvList.foreach { rowsCv =>
            pendingCvs += rowsCv.copyToHost()
          }
          setCurrentBatch(pendingCvs.dequeue())
          true
        }
      }
    }
  } else {
    // Empty batch: metrics were updated above, but there is nothing to convert.
    false
  }
}

private[this] def loadNextBatch(): Unit = {
closeCurrentBatch()
if (!pendingCvs.isEmpty) {
setCurrentBatch(pendingCvs.dequeue())
} else if (batches.hasNext) {
withResource(batches.next()) { cb =>
if (numInputBatches != null) {
numInputBatches += 1
}
// In order to match the numOutputRows metric in the generated code we update
// numOutputRows for each batch. This is less accurate than doing it at output
// because it will over count the number of rows output in the case of a limit,
// but it is more efficient.
if (numOutputRows != null) {
numOutputRows += cb.numRows()
}
val nvtxRange = if (totalTime != null) {
new NvtxWithMetrics("ColumnarToRow: batch", NvtxColor.RED, totalTime)
} else {
new NvtxRange("ColumnarToRow: batch", NvtxColor.RED)
}
withResource(nvtxRange) { _ =>
withResource(rearrangeRows(cb)) { table =>
withResource(table.convertToRows()) { rowsCvList =>
rowsCvList.foreach { rowsCv =>
pendingCvs += rowsCv.copyToHost()
}
setCurrentBatch(pendingCvs.dequeue())
}
} else {
while (batches.hasNext) {
withResource(batches.next()) { cb =>
if (setupBatch(cb)) {
GpuSemaphore.releaseIfNecessary(TaskContext.get())
return
}
}
}
Expand Down

0 comments on commit b5a78dc

Please sign in to comment.