Avoid readers acquiring GPU on next batch query if not first batch #2870

Merged 3 commits on Jul 7, 2021
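At a glance: each file reader's next() used to call GpuSemaphore.acquireIfNecessary(TaskContext.get()) on every invocation; after this change it does so only on the first call, and only when that first call produced no batch. The sketch below is a minimal illustration of that gating pattern, not the plugin code itself: SketchPartitionReader, readBatch(), and acquireGpu() are hypothetical stand-ins for the reader classes and for the GpuSemaphore call shown in the diffs.

```scala
// Illustrative sketch of the first-batch gating pattern this PR applies
// (simplified, hypothetical stand-ins; not the actual spark-rapids classes).
class SketchPartitionReader(batches: Iterator[String]) {
  private var batch: Option[String] = None
  // Only the first next() call may need to force a GPU acquisition.
  private var isFirstBatch = true

  // Stand-in for the readers' readBatch(): produce the next batch, if any.
  private def readBatch(): Option[String] =
    if (batches.hasNext) Some(batches.next()) else None

  // Stand-in for GpuSemaphore.acquireIfNecessary(TaskContext.get()).
  private def acquireGpu(): Unit = println("acquiring GPU semaphore")

  def next(): Boolean = {
    // (the real readers close the previously returned ColumnarBatch here)
    batch = readBatch()
    if (isFirstBatch) {
      if (batch.isEmpty) {
        // Some operators return data even when there is no input, so the GPU
        // must still be grabbed when the first call yields no batch.
        acquireGpu()
      }
      isFirstBatch = false
    }
    // Subsequent calls no longer re-acquire on every batch query.
    batch.isDefined
  }
}

object SketchPartitionReaderDemo extends App {
  val empty = new SketchPartitionReader(Iterator.empty)
  println(empty.next())  // acquires once (no data), prints false
  println(empty.next())  // no further acquisition attempts

  val twoBatches = new SketchPartitionReader(Iterator("b1", "b2"))
  println(twoBatches.next())  // true, no forced acquisition on later calls
  println(twoBatches.next())  // true
}
```

Gating on isFirstBatch keeps the empty-input guarantee described by the in-code comment while dropping the per-call semaphore acquisition on every subsequent batch query.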
@@ -386,6 +386,7 @@ class CSVPartitionReader(
     conf)
   private var isFirstChunkForIterator: Boolean = true
   private var isExhausted: Boolean = false
+  private var isFirstBatch: Boolean = true
   private var maxDeviceMemory: Long = 0

   metrics = execMetrics
@@ -540,9 +541,14 @@ class CSVPartitionReader(
     } else {
       readBatch()
     }
-    // This is odd, but some operators return data even when there is no input so we need to
-    // be sure that we grab the GPU
-    GpuSemaphore.acquireIfNecessary(TaskContext.get())
+    if (isFirstBatch) {
+      if (batch.isEmpty) {
+        // This is odd, but some operators return data even when there is no input so we need to
+        // be sure that we grab the GPU if there were no batches.
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+      }
+      isFirstBatch = false
+    }
     batch.isDefined
   }

@@ -308,6 +308,7 @@ abstract class MultiFileCloudPartitionReaderBase(
   private var filesToRead = 0
   protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None
   private var isInitted = false
+  private var isFirstBatch = true
   private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaDataBase]]()
   private val tasksToRun = new Queue[Callable[HostMemoryBuffersWithMetaDataBase]]()
   private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
@@ -419,9 +420,15 @@ abstract class MultiFileCloudPartitionReaderBase(
       next()
     }

-    // This is odd, but some operators return data even when there is no input so we need to
-    // be sure that we grab the GPU
-    GpuSemaphore.acquireIfNecessary(TaskContext.get())
+    if (isFirstBatch) {
+      if (batch.isEmpty) {
+        // This is odd, but some operators return data even when there is no input so we need to
+        // be sure that we grab the GPU if there were no batches.
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+      }
+      isFirstBatch = false
+    }
+
     batch.isDefined
   }

@@ -542,6 +549,7 @@ abstract class MultiFileCoalescingPartitionReaderBase(
   private val blockIterator: BufferedIterator[SingleDataBlockInfo] =
     clippedBlocks.iterator.buffered
   private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
+  private[this] var isFirstBatch = true

   private case class CurrentChunkMeta(
       isCorrectRebaseMode: Boolean,
@@ -677,9 +685,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
         batch = readBatch()
       }
     }
-    // This is odd, but some operators return data even when there is no input so we need to
-    // be sure that we grab the GPU
-    GpuSemaphore.acquireIfNecessary(TaskContext.get())
+
+    if (isFirstBatch) {
+      if (batch.isEmpty) {
+        // This is odd, but some operators return data even when there is no input so we need to
+        // be sure that we grab the GPU if there were no batches.
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+      }
+      isFirstBatch = false
+    }
+
     batch.isDefined
   }

18 changes: 10 additions & 8 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -606,12 +606,8 @@ trait OrcPartitionReaderBase extends Logging with Arm with ScanWithMetrics {
  *
  * @param conf Hadoop configuration
  * @param partFile file split to read
- * @param orcFileReaderOpts file reader options
- * @param orcReader ORC reader instance
- * @param readerOpts reader options
- * @param dataReader ORC data reader instance
+ * @param ctx the context to provide some necessary information
  * @param readDataSchema Spark schema of what will be read from the file
- * @param requestedMapping map of read schema field index to data schema index if no column names
  * @param debugDumpPrefix path prefix for dumping the memory file or null
  * @param maxReadBatchSizeRows maximum number of rows to read in a batch
  * @param maxReadBatchSizeBytes maximum number of bytes to read in a batch
@@ -627,6 +623,7 @@ class GpuOrcPartitionReader(
     maxReadBatchSizeBytes: Long,
     execMetrics : Map[String, GpuMetric]) extends FilePartitionReaderBase(conf, execMetrics)
   with OrcPartitionReaderBase {
+  private[this] var isFirstBatch = true

   override def next(): Boolean = {
     batch.foreach(_.close())
@@ -636,9 +633,14 @@ class GpuOrcPartitionReader(
     } else {
       metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory
     }
-    // This is odd, but some operators return data even when there is no input so we need to
-    // be sure that we grab the GPU
-    GpuSemaphore.acquireIfNecessary(TaskContext.get())
+    if (isFirstBatch) {
+      if (batch.isEmpty) {
+        // This is odd, but some operators return data even when there is no input so we need to
+        // be sure that we grab the GPU if there were no batches.
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+      }
+      isFirstBatch = false
+    }
     batch.isDefined
   }

@@ -25,12 +25,11 @@ import java.util.concurrent._
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.immutable.HashSet
-import scala.collection.mutable.{ArrayBuffer, ArrayBuilder}
+import scala.collection.mutable.ArrayBuffer
 import scala.language.implicitConversions
 import scala.math.max

 import ai.rapids.cudf._
-import ai.rapids.cudf.DType.DTypeEnum
 import com.nvidia.spark.RebaseHelper
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange
@@ -1216,6 +1215,7 @@ class ParquetPartitionReader(
   with ParquetPartitionReaderBase {

   private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered
+  private var isFirstBatch = true

   override def next(): Boolean = {
     batch.foreach(_.close())
@@ -1228,9 +1228,14 @@ class ParquetPartitionReader(
         batch = readBatch()
       }
     }
-    // This is odd, but some operators return data even when there is no input so we need to
-    // be sure that we grab the GPU
-    GpuSemaphore.acquireIfNecessary(TaskContext.get())
+    if (isFirstBatch) {
+      if (batch.isEmpty) {
+        // This is odd, but some operators return data even when there is no input so we need to
+        // be sure that we grab the GPU if there were no batches.
+        GpuSemaphore.acquireIfNecessary(TaskContext.get())
+      }
+      isFirstBatch = false
+    }
     batch.isDefined
   }
