Skip to content

Commit

Permalink
Avoid readers acquiring GPU on next batch query if not first batch (#2870)
Browse files Browse the repository at this point in the history

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Jul 7, 2021
1 parent 3410f68 commit 6342a8e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ class CSVPartitionReader(
conf)
private var isFirstChunkForIterator: Boolean = true
private var isExhausted: Boolean = false
private var isFirstBatch: Boolean = true
private var maxDeviceMemory: Long = 0

metrics = execMetrics
Expand Down Expand Up @@ -540,9 +541,14 @@ class CSVPartitionReader(
} else {
readBatch()
}
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
}
isFirstBatch = false
}
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ abstract class MultiFileCloudPartitionReaderBase(
private var filesToRead = 0
protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None
private var isInitted = false
private var isFirstBatch = true
private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaDataBase]]()
private val tasksToRun = new Queue[Callable[HostMemoryBuffersWithMetaDataBase]]()
private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
Expand Down Expand Up @@ -419,9 +420,15 @@ abstract class MultiFileCloudPartitionReaderBase(
next()
}

// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
}
isFirstBatch = false
}

batch.isDefined
}

Expand Down Expand Up @@ -542,6 +549,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
private val blockIterator: BufferedIterator[SingleDataBlockInfo] =
clippedBlocks.iterator.buffered
private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
private[this] var isFirstBatch = true

private case class CurrentChunkMeta(
isCorrectRebaseMode: Boolean,
Expand Down Expand Up @@ -677,9 +685,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
batch = readBatch()
}
}
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())

if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
}
isFirstBatch = false
}

batch.isDefined
}

Expand Down
18 changes: 10 additions & 8 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,8 @@ trait OrcPartitionReaderBase extends Logging with Arm with ScanWithMetrics {
*
* @param conf Hadoop configuration
* @param partFile file split to read
* @param orcFileReaderOpts file reader options
* @param orcReader ORC reader instance
* @param readerOpts reader options
* @param dataReader ORC data reader instance
* @param ctx the context to provide some necessary information
* @param readDataSchema Spark schema of what will be read from the file
* @param requestedMapping map of read schema field index to data schema index if no column names
* @param debugDumpPrefix path prefix for dumping the memory file or null
* @param maxReadBatchSizeRows maximum number of rows to read in a batch
* @param maxReadBatchSizeBytes maximum number of bytes to read in a batch
Expand All @@ -627,6 +623,7 @@ class GpuOrcPartitionReader(
maxReadBatchSizeBytes: Long,
execMetrics : Map[String, GpuMetric]) extends FilePartitionReaderBase(conf, execMetrics)
with OrcPartitionReaderBase {
private[this] var isFirstBatch = true

override def next(): Boolean = {
batch.foreach(_.close())
Expand All @@ -636,9 +633,14 @@ class GpuOrcPartitionReader(
} else {
metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
}
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
}
isFirstBatch = false
}
batch.isDefined
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ import java.util.concurrent._
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.immutable.HashSet
import scala.collection.mutable.{ArrayBuffer, ArrayBuilder}
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.math.max

import ai.rapids.cudf._
import ai.rapids.cudf.DType.DTypeEnum
import com.nvidia.spark.RebaseHelper
import com.nvidia.spark.rapids.GpuMetric._
import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange
Expand Down Expand Up @@ -1216,6 +1215,7 @@ class ParquetPartitionReader(
with ParquetPartitionReaderBase {

private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered
private var isFirstBatch = true

override def next(): Boolean = {
batch.foreach(_.close())
Expand All @@ -1228,9 +1228,14 @@ class ParquetPartitionReader(
batch = readBatch()
}
}
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU
GpuSemaphore.acquireIfNecessary(TaskContext.get())
if (isFirstBatch) {
if (batch.isEmpty) {
// This is odd, but some operators return data even when there is no input so we need to
// be sure that we grab the GPU if there were no batches.
GpuSemaphore.acquireIfNecessary(TaskContext.get())
}
isFirstBatch = false
}
batch.isDefined
}

Expand Down

0 comments on commit 6342a8e

Please sign in to comment.