From ff1b6174fe943c758bcd878f80706d164912bbc1 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 6 Jul 2021 14:06:50 -0500 Subject: [PATCH] Avoid readers acquiring GPU on next batch query if not first batch Signed-off-by: Jason Lowe --- .../spark/rapids/GpuBatchScanExec.scala | 12 ++++++--- .../spark/rapids/GpuMultiFileReader.scala | 27 ++++++++++++++----- .../com/nvidia/spark/rapids/GpuOrcScan.scala | 12 ++++++--- .../nvidia/spark/rapids/GpuParquetScan.scala | 15 +++++++---- 4 files changed, 49 insertions(+), 17 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala index 7c2149e7c0e..31d3c3b83ee 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala @@ -386,6 +386,7 @@ class CSVPartitionReader( conf) private var isFirstChunkForIterator: Boolean = true private var isExhausted: Boolean = false + private var isFirstBatch: Boolean = true private var maxDeviceMemory: Long = 0 metrics = execMetrics @@ -540,9 +541,14 @@ class CSVPartitionReader( } else { readBatch() } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + if (isFirstBatch) { + if (batch.isEmpty) { + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU if there were no batches. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + } + isFirstBatch = false + } batch.isDefined } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala index 88ab44df0af..5cd684e0a32 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala @@ -188,6 +188,7 @@ abstract class MultiFileCloudPartitionReaderBase( private var filesToRead = 0 protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None private var isInitted = false + private var isFirstBatch = true private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaDataBase]]() private val tasksToRun = new Queue[Callable[HostMemoryBuffersWithMetaDataBase]]() private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics @@ -299,9 +300,15 @@ abstract class MultiFileCloudPartitionReaderBase( next() } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + if (isFirstBatch) { + if (batch.isEmpty) { + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU if there were no batches. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + } + isFirstBatch = false + } + batch.isDefined } @@ -422,6 +429,7 @@ abstract class MultiFileCoalescingPartitionReaderBase( private val blockIterator: BufferedIterator[SingleDataBlockInfo] = clippedBlocks.iterator.buffered private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics + private[this] var isFirstBatch = true private case class CurrentChunkMeta( isCorrectRebaseMode: Boolean, @@ -557,9 +565,16 @@ abstract class MultiFileCoalescingPartitionReaderBase( batch = readBatch() } } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + + if (isFirstBatch) { + if (batch.isEmpty) { + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU if there were no batches. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + } + isFirstBatch = false + } + batch.isDefined } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala index 30820ab4569..2c968ec9901 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala @@ -388,6 +388,7 @@ class GpuOrcPartitionReader( with ScanWithMetrics with Arm { private var batch: Option[ColumnarBatch] = None private var maxDeviceMemory: Long = 0 + private var isFirstBatch = true metrics = execMetrics @@ -413,9 +414,14 @@ class GpuOrcPartitionReader( } else { metrics(PEAK_DEVICE_MEMORY) += maxDeviceMemory } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + if (isFirstBatch) { + if (batch.isEmpty) { + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU if there were no batches. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + } + isFirstBatch = false + } batch.isDefined } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index aba00cc3a87..fcd9b9c3459 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -25,12 +25,11 @@ import java.util.concurrent._ import scala.annotation.tailrec import scala.collection.JavaConverters._ import scala.collection.immutable.HashSet -import scala.collection.mutable.{ArrayBuffer, ArrayBuilder} +import scala.collection.mutable.ArrayBuffer import scala.language.implicitConversions import scala.math.max import ai.rapids.cudf._ -import ai.rapids.cudf.DType.DTypeEnum import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange @@ -1248,6 +1247,7 @@ class ParquetPartitionReader( with ParquetPartitionReaderBase { private val blockIterator: BufferedIterator[BlockMetaData] = clippedBlocks.iterator.buffered + private var isFirstBatch = true override def next(): Boolean = { batch.foreach(_.close()) @@ -1260,9 +1260,14 @@ class ParquetPartitionReader( batch = readBatch() } } - // This is odd, but some operators return data even when there is no input so we need to - // be sure that we grab the GPU - GpuSemaphore.acquireIfNecessary(TaskContext.get()) + if (isFirstBatch) { + if (batch.isEmpty) { + // This is odd, but some operators return data even when there is no input so we need to + // be sure that we grab the GPU if there were no batches. + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + } + isFirstBatch = false + } batch.isDefined }