Skip to content

Commit

Permalink
Partially address code review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
abellina committed May 19, 2022
1 parent d17a587 commit 751dce0
Showing 1 changed file with 14 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -379,14 +379,17 @@ abstract class AbstractGpuCoalesceIterator(

// there is a hard limit of 2^31 rows
while (numRows < Int.MaxValue && !hasOnDeck && iter.hasNext) {
closeOnExcept(iter.next()) { cbFromIter =>
val cb = if (inputFilterExpression.isDefined) {
// If we have reached the cuDF limit once, proactively filter batches
// after that first limit is reached.
GpuFilter.apply(cbFromIter, inputFilterExpression.get)
} else {
cbFromIter
}
val cbFromIter = iter.next()

val cb = if (inputFilterExpression.isDefined) {
// If we have reached the cuDF limit once, proactively filter batches
// after that first limit is reached.
GpuFilter(cbFromIter, inputFilterExpression.get)
} else {
cbFromIter
}

closeOnExcept(cb) { _ =>
val nextRows = cb.numRows()
numInputBatches += 1

Expand All @@ -408,10 +411,10 @@ abstract class AbstractGpuCoalesceIterator(
s" are in this partition. Please try increasing your partition count.")
case RequireSingleBatchWithFilter(filterExpression) =>
// filter what we had already stored
closeOnExcept(concatAllAndPutOnGPU()) { concatBatch =>
val filteredDown = GpuFilter.apply(concatBatch, filterExpression)
val filteredDown = GpuFilter(concatAllAndPutOnGPU(), filterExpression)
closeOnExcept(filteredDown) { _ =>
// filter the incoming batch as well
closeOnExcept(GpuFilter.apply(cb, filterExpression)) { filteredCb =>
closeOnExcept(GpuFilter(cb, filterExpression)) { filteredCb =>
val filteredWouldBeRows = filteredDown.numRows() + filteredCb.numRows()
if (filteredWouldBeRows > Int.MaxValue) {
throw new IllegalStateException(
Expand Down

0 comments on commit 751dce0

Please sign in to comment.