From 5f21b71ce75a8b18907a3d37e285d3453f139660 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 17 Sep 2020 10:12:35 -0500 Subject: [PATCH] Improved GpuArrowEvalPythonExec (#783) * Improved GpuArrowEvalPythonExec Signed-off-by: Robert (Bobby) Evans --- docs/configs.md | 2 +- .../src/main/python/udf_cudf_test.py | 1 - integration_tests/src/main/python/udf_test.py | 3 +- .../nvidia/spark/rapids/GpuColumnVector.java | 6 + .../spark/rapids/GpuCoalesceBatches.scala | 5 +- .../nvidia/spark/rapids/GpuOverrides.scala | 8 +- .../nvidia/spark/rapids/SpillPriorities.scala | 11 +- .../spark/rapids/SpillableColumnarBatch.scala | 15 +- .../python/GpuArrowEvalPythonExec.scala | 299 +++++++++++++----- 9 files changed, 248 insertions(+), 102 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index 2e09a5aabf0..534107481b9 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -260,7 +260,7 @@ Name | Description | Default Value | Notes spark.rapids.sql.exec.ShuffledHashJoinExec|Implementation of join using hashed shuffled data|true|None| spark.rapids.sql.exec.SortMergeJoinExec|Sort merge join, replacing with shuffled hash join|true|None| spark.rapids.sql.exec.AggregateInPandasExec|The backend for Grouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now| -spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF, also accelerates the data transfer between the Java process and Python process|false|This is disabled by default because Performance is not ideal for UDFs that take a long time| +spark.rapids.sql.exec.ArrowEvalPythonExec|The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the Java process and Python process. It also supports running the Python UDFs code on the GPU when enabled|true|None| spark.rapids.sql.exec.FlatMapCoGroupsInPandasExec|The backend for CoGrouped Aggregation Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now| spark.rapids.sql.exec.FlatMapGroupsInPandasExec|The backend for Grouped Map Pandas UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now| spark.rapids.sql.exec.MapInPandasExec|The backend for Map Pandas Iterator UDF, it runs on CPU itself now but supports running the Python UDFs code on GPU when calling cuDF APIs in the UDF|false|This is disabled by default because Performance is not ideal now| diff --git a/integration_tests/src/main/python/udf_cudf_test.py b/integration_tests/src/main/python/udf_cudf_test.py index 6385422c437..7d710adb062 100644 --- a/integration_tests/src/main/python/udf_cudf_test.py +++ b/integration_tests/src/main/python/udf_cudf_test.py @@ -23,7 +23,6 @@ _conf = { - 'spark.rapids.sql.exec.ArrowEvalPythonExec':'true', 'spark.rapids.sql.exec.MapInPandasExec':'true', 'spark.rapids.sql.exec.FlatMapGroupsInPandasExec': 'true', 'spark.rapids.sql.exec.AggregateInPandasExec': 'true', diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index cc76a533a4d..22ab47b9847 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -29,8 +29,7 @@ except Exception as e: pytestmark = pytest.mark.skip(reason=str(e)) -arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true', - 'spark.rapids.sql.exec.ArrowEvalPythonExec': 'true'} +arrow_udf_conf = {'spark.sql.execution.arrow.pyspark.enabled': 'true'} #################################################################### # NOTE: pytest does not play well with pyspark udfs, because pyspark diff --git a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java index fef3de23c3c..04030aed7a7 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/rapids/GpuColumnVector.java @@ -344,6 +344,12 @@ public static final GpuColumnVector[] extractColumns(ColumnarBatch batch) { return vectors; } + public static final GpuColumnVector[] extractColumns(Table table) { + try (ColumnarBatch batch = from(table)) { + return extractColumns(batch); + } + } + private final ai.rapids.cudf.ColumnVector cudfCv; /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala index 46203750b86..cdb8f72efe0 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala @@ -20,7 +20,6 @@ import scala.collection.mutable.ArrayBuffer import ai.rapids.cudf.{BufferType, NvtxColor, Table} import com.nvidia.spark.rapids.RapidsPluginImplicits._ -import com.nvidia.spark.rapids.SpillPriorities.COALESCE_BATCH_ON_DECK_PRIORITY import org.apache.spark.TaskContext import org.apache.spark.internal.Logging @@ -406,7 +405,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], } override def addBatchToConcat(batch: ColumnarBatch): Unit = - batches.append(SpillableColumnarBatch(batch, SpillPriorities.COALESCE_BATCH_PRIORITY)) + batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) override def getColumnSizes(cb: ColumnarBatch): Array[Long] = { if (!GpuCompressedColumnVector.isBatchCompressed(cb)) { @@ -474,7 +473,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch], override protected def saveOnDeck(batch: ColumnarBatch): Unit = { assert(onDeck.isEmpty) - onDeck = Some(SpillableColumnarBatch(batch, COALESCE_BATCH_ON_DECK_PRIORITY)) + onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) } override protected def clearOnDeck(): Unit = { diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 7f6cd72968d..274ccf53753 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -1718,9 +1718,9 @@ object GpuOverrides { GpuLocalLimitExec(localLimitExec.limit, childPlans(0).convertIfNeeded()) }), exec[ArrowEvalPythonExec]( - "The backend of the Scalar Pandas UDFs, it supports running the Python UDFs code on GPU" + - " when calling cuDF APIs in the UDF, also accelerates the data transfer between the" + - " Java process and Python process", + "The backend of the Scalar Pandas UDFs. Accelerates the data transfer between the" + + " Java process and Python process. It also supports running the Python UDFs code on" + + " the GPU when enabled", (e, conf, p, r) => new SparkPlanMeta[ArrowEvalPythonExec](e, conf, p, r) { val udfs: Seq[BaseExprMeta[PythonUDF]] = @@ -1738,7 +1738,7 @@ object GpuOverrides { resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], childPlans.head.convertIfNeeded(), e.evalType) - }).disabledByDefault("Performance is not ideal for UDFs that take a long time"), + }), exec[GlobalLimitExec]( "Limiting of results across partitions", (globalLimitExec, conf, p, r) => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala index c1a542d27d7..c0b6142d289 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala @@ -49,14 +49,13 @@ object SpillPriorities { val INPUT_FROM_SHUFFLE_PRIORITY: Long = Long.MaxValue - 1000 /** - * Priority for buffers in coalesce batch that did not fit into the batch we are working on. - * Most of the time this is shuffle input data that we read early so it should be slightly higher - * priority to keep around than other input shuffle buffers. + * Priority for buffers that are waiting for next to be called. i.e. data held between + * calls to `hasNext` and `next` or between different calls to `next`. */ - val COALESCE_BATCH_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1 + val ACTIVE_ON_DECK_PRIORITY: Long = INPUT_FROM_SHUFFLE_PRIORITY + 1 /** - * Priority for buffers in coalesce batch that are being held before the coalesce. + * Priority for multiple buffers being buffered within a call to next. */ - val COALESCE_BATCH_PRIORITY: Long = COALESCE_BATCH_ON_DECK_PRIORITY + 100 + val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100 } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index e16aab1a926..835f3475255 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -138,14 +138,21 @@ object SpillableColumnarBatch extends Arm { initialSpillPriority: Long): Unit = { val numColumns = batch.numCols() if (GpuCompressedColumnVector.isBatchCompressed(batch)) { - val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector] - RapidsBufferCatalog.addBuffer(id, cv.getBuffer, cv.getTableMeta, initialSpillPriority) + withResource(batch) { batch => + val cv = batch.column(0).asInstanceOf[GpuCompressedColumnVector] + val buff = cv.getBuffer + buff.incRefCount() + RapidsBufferCatalog.addBuffer(id, buff, cv.getTableMeta, initialSpillPriority) + } } else if (numColumns > 0 && (0 until numColumns) .forall(i => batch.column(i).isInstanceOf[GpuColumnVectorFromBuffer])) { val cv = batch.column(0).asInstanceOf[GpuColumnVectorFromBuffer] - withResource(GpuColumnVector.from(batch)) { table => - RapidsBufferCatalog.addTable(id, table, cv.getBuffer, initialSpillPriority) + withResource(batch) { batch => + val table = GpuColumnVector.from(batch) + val buff = cv.getBuffer + buff.incRefCount() + RapidsBufferCatalog.addTable(id, table, buff, initialSpillPriority) } } else { withResource(batch) { batch => diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala index 73acdef228e..74dd87160b9 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala @@ -23,15 +23,17 @@ import java.io.{DataInputStream, DataOutputStream} import java.net.Socket import java.util.concurrent.atomic.AtomicBoolean -import ai.rapids.cudf._ -import com.nvidia.spark.rapids._ -import com.nvidia.spark.rapids.GpuMetricNames._ -import com.nvidia.spark.rapids.python.PythonWorkerSemaphore import scala.collection.mutable import scala.collection.mutable.ArrayBuffer +import ai.rapids.cudf.{ArrowIPCOptions, ArrowIPCWriterOptions, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table} +import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuProjectExec, GpuSemaphore, GpuUnevaluable, SpillableColumnarBatch, SpillPriorities} +import com.nvidia.spark.rapids.GpuMetricNames._ +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.python.PythonWorkerSemaphore + import org.apache.spark.{SparkEnv, TaskContext} -import org.apache.spark.api.python._ +import org.apache.spark.api.python.{BasePythonRunner, ChainedPythonFunctions, PythonEvalType, PythonFunction, PythonRDD, SpecialLengths} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -45,96 +47,182 @@ import org.apache.spark.sql.util.ArrowUtils import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.Utils - -class RebatchingIterator( +/** + * This iterator will round incoming batches to multiples of targetRoundoff rows, if possible. + * The last batch might not be a multiple of it. + * @param wrapped the incoming ColumnarBatch iterator. + * @param targetRoundoff the target multiple number of rows + * @param inputRows metric for rows read. + * @param inputBatches metric for batches read + */ +class RebatchingRoundoffIterator( wrapped: Iterator[ColumnarBatch], - targetRowSize: Int, + targetRoundoff: Int, inputRows: SQLMetric, inputBatches: SQLMetric) extends Iterator[ColumnarBatch] with Arm { - var pending: Table = _ + var pending: Option[SpillableColumnarBatch] = None - override def hasNext: Boolean = pending != null || wrapped.hasNext + TaskContext.get().addTaskCompletionListener[Unit]{ _ => + pending.foreach(_.close()) + pending = None + } + + override def hasNext: Boolean = pending.isDefined || wrapped.hasNext + + private[this] def popPending(): ColumnarBatch = { + withResource(pending.get) { scb => + pending = None + scb.getColumnarBatch() + } + } - private[this] def updatePending(): Unit = { - if (pending == null) { - withResource(wrapped.next()) { cb => - inputBatches += 1 - inputRows += cb.numRows() - pending = GpuColumnVector.from(cb) + private[this] def concat(l: ColumnarBatch, r: ColumnarBatch): ColumnarBatch = { + withResource(GpuColumnVector.from(l)) { lTable => + withResource(GpuColumnVector.from(r)) { rTable => + withResource(Table.concatenate(lTable, rTable)) { concatTable => + GpuColumnVector.from(concatTable) + } } } } + private[this] def fillAndConcat(batches: ArrayBuffer[SpillableColumnarBatch]): ColumnarBatch = { + var rowsSoFar = batches.map(_.numRows()).sum + while (wrapped.hasNext && rowsSoFar < targetRoundoff) { + val got = wrapped.next() + rowsSoFar += got.numRows() + batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + } + val toConcat = batches.safeMap(_.getColumnarBatch()).toArray + ConcatAndConsumeAll.buildNonEmptyBatch(toConcat) + } + override def next(): ColumnarBatch = { - updatePending() - - while (pending.getRowCount < targetRowSize) { - if (wrapped.hasNext) { - val combined = withResource(wrapped.next()) { cb => - inputBatches += 1 - inputRows += cb.numRows() - withResource(GpuColumnVector.from(cb)) { nextTable => - Table.concatenate(pending, nextTable) + GpuSemaphore.acquireIfNecessary(TaskContext.get()) + + val combined : ColumnarBatch = if (pending.isDefined) { + if (!wrapped.hasNext) { + // No more data return what is in pending + popPending() + } else { + // Don't read pending yet, because we are going to call next to get enough data. + // The target number of rows is typically small enough that we will be able to do this + // in a single call. + val rowsNeeded = targetRoundoff - pending.get.numRows() + val cb = wrapped.next() + if (cb.numRows() >= rowsNeeded) { + withResource(cb) { cb => + withResource(popPending()) { fromPending => + concat(fromPending, cb) + } + } + } else { + // If that does not work then we will need to fall back to slower special case code + val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty + try { + val localPending = pending.get + localPending.setSpillPriority(SpillPriorities.ACTIVE_BATCHING_PRIORITY) + batches.append(localPending) + pending = None + batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + fillAndConcat(batches) + } finally { + batches.safeClose() } } - pending.close() - pending = combined + } + } else { + val cb = wrapped.next() + if (cb.numRows() >= targetRoundoff) { + cb } else { - // No more to data so return what is left - val ret = withResource(pending) { p => - GpuColumnVector.from(p) + val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty + try { + batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY)) + fillAndConcat(batches) + } finally { + batches.safeClose() } - pending = null - return ret } } - // We got lucky - if (pending.getRowCount == targetRowSize) { - val ret = withResource(pending) { p => - GpuColumnVector.from(p) - } - pending = null - return ret + val rc: Long = combined.numRows() + + if (rc % targetRoundoff == 0 || rc < targetRoundoff) { + return combined } - val split = pending.contiguousSplit(targetRowSize) - split.foreach(_.getBuffer.close()) - pending.close() - pending = split(1).getTable - withResource(split.head.getTable) { ret => - GpuColumnVector.from(ret) + val splitIndex = (targetRoundoff * (rc/targetRoundoff)).toInt + val split = withResource(combined) { combinedCb => + withResource(GpuColumnVector.from(combinedCb)) { combinedTable => + combinedTable.contiguousSplit(splitIndex) + } + } + withResource(split) { split => + assert(pending.isEmpty) + pending = + Some(SpillableColumnarBatch(GpuColumnVectorFromBuffer.from(split.last), + SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + GpuColumnVectorFromBuffer.from(split.head) } } } -// TODO extend this with spilling and other wonderful things -class BatchQueue extends AutoCloseable { - // TODO for now we will use an built in queue - private val queue: mutable.Queue[ColumnarBatch] = mutable.Queue[ColumnarBatch]() +/** + * A simple queue that holds the pending batches that need to line up with + * and combined with batches comming back from python + */ +class BatchQueue extends AutoCloseable with Arm { + private val queue: mutable.Queue[SpillableColumnarBatch] = + mutable.Queue[SpillableColumnarBatch]() + private var isSet = false def add(batch: ColumnarBatch): Unit = synchronized { - // If you cannot add something blow up - queue.enqueue(batch) + queue.enqueue(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)) + if (!isSet) { + // Wake up anyone waiting for the first batch. + isSet = true + notifyAll() + } } def remove(): ColumnarBatch = synchronized { if (queue.isEmpty) { null } else { - queue.dequeue() + withResource(queue.dequeue()) { scp => + scp.getColumnarBatch() + } + } + } + + def hasNext: Boolean = synchronized { + if (!isSet) { + wait() } + queue.nonEmpty + } + + /** + * Get the number of rows in the next batch, without actually getting the batch. + */ + def peekBatchSize: Int = synchronized { + queue.head.numRows() } override def close(): Unit = synchronized { + if (!isSet) { + isSet = true + notifyAll() + } while(queue.nonEmpty) { queue.dequeue().close() } } } -/* +/** * Helper functions for [[GpuPythonUDF]] */ object GpuPythonUDF { @@ -158,7 +246,7 @@ object GpuPythonUDF { def isWindowPandasUDF(e: Expression): Boolean = isGroupedAggPandasUDF(e) } -/* +/** * A serialized version of a Python lambda function. This is a special expression, which needs a * dedicated physical operator to execute it, and thus can't be pushed down to data sources. */ @@ -188,12 +276,14 @@ case class GpuPythonUDF( } } -/* +/** * A trait that can be mixed-in with `BasePythonRunner`. It implements the logic from * Python (Arrow) to GPU/JVM (ColumnarBatch). */ trait GpuPythonArrowOutput extends Arm { self: BasePythonRunner[_, ColumnarBatch] => + def minReadTargetBatchSize: Int = 1 + protected def newReaderIterator( stream: DataInputStream, writerThread: WriterThread, @@ -221,10 +311,17 @@ trait GpuPythonArrowOutput extends Arm { self: BasePythonRunner[_, ColumnarBatch throw writerThread.exception.get } try { + // Because of batching and other things we have to be sure that we release the semaphore + // before any operation that could block. This is because we are using multiple threads + // for a single task and the GpuSemaphore might not wake up both threads associated with + // the task, so a reader can be blocked waiting for data, while a writer is waiting on + // the semaphore + GpuSemaphore.releaseIfNecessary(TaskContext.get()) if (arrowReader != null && batchLoaded) { + // The GpuSemaphore is acquired in a callback val table = withResource(new NvtxRange("read python batch", NvtxColor.DARK_GREEN)) { _ => - arrowReader.getNextIfAvailable + arrowReader.getNextIfAvailable(minReadTargetBatchSize) } if (table == null) { batchLoaded = false @@ -240,7 +337,10 @@ trait GpuPythonArrowOutput extends Arm { self: BasePythonRunner[_, ColumnarBatch } else { stream.readInt() match { case SpecialLengths.START_ARROW_STREAM => - arrowReader = Table.readArrowIPCChunked(new StreamToBufferProvider(stream)) + val builder = ArrowIPCOptions.builder() + builder.withCallback(() => GpuSemaphore.acquireIfNecessary(TaskContext.get())) + arrowReader = Table.readArrowIPCChunked(builder.build(), + new StreamToBufferProvider(stream)) read() case SpecialLengths.TIMING_DATA => handleTimingData() @@ -259,7 +359,7 @@ trait GpuPythonArrowOutput extends Arm { self: BasePythonRunner[_, ColumnarBatch } -/* +/** * Similar to `PythonUDFRunner`, but exchange data with Python worker via Arrow stream. */ class GpuArrowPythonRunner( @@ -268,7 +368,8 @@ class GpuArrowPythonRunner( argOffsets: Array[Array[Int]], schema: StructType, timeZoneId: String, - conf: Map[String, String]) + conf: Map[String, String], + batchSize: Long) extends BasePythonRunner[ColumnarBatch, ColumnarBatch](funcs, evalType, argOffsets) with GpuPythonArrowOutput { @@ -301,6 +402,11 @@ class GpuArrowPythonRunner( protected override def writeIteratorToStream(dataOut: DataOutputStream): Unit = { val writer = { val builder = ArrowIPCWriterOptions.builder() + builder.withMaxChunkSize(batchSize) + builder.withCallback((table: Table) => { + table.close() + GpuSemaphore.releaseIfNecessary(TaskContext.get()) + }) schema.foreach { field => if (field.nullable) { builder.withColumnNames(field.name) @@ -312,14 +418,16 @@ class GpuArrowPythonRunner( } Utils.tryWithSafeFinally { while(inputIterator.hasNext) { - withResource(inputIterator.next()) { nextBatch => - withResource(GpuColumnVector.from(nextBatch)) { table => - withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => - writer.write(table) - } - } + val table = withResource(inputIterator.next()) { nextBatch => + GpuColumnVector.from(nextBatch) + } + withResource(new NvtxRange("write python batch", NvtxColor.DARK_GREEN)) { _ => + // The callback will handle closing table and releasing the semaphore + writer.write(table) } } + // The iterator can grab the semaphore even on an empty batch + GpuSemaphore.releaseIfNecessary(TaskContext.get()) } { writer.close() dataOut.flush() @@ -369,9 +477,9 @@ class StreamToBufferProvider(inputStream: DataInputStream) extends HostBufferPro } } -/* +/** * A physical plan that evaluates a [[GpuPythonUDF]]. The transformation of the data to arrow - * happens on the GPU (practically a noop), But execution of the UDFs are on the CPU or GPU. + * happens on the GPU (practically a noop), But execution of the UDFs are on the CPU. */ case class GpuArrowEvalPythonExec( udfs: Seq[GpuPythonUDF], @@ -455,10 +563,14 @@ case class GpuArrowEvalPythonExec( }) val boundReferences = GpuBindReferences.bindReferences(allInputs, child.output) - val batchedIterator = new RebatchingIterator(iter, batchSize, numInputRows, numInputBatches) + val batchedIterator = new RebatchingRoundoffIterator(iter, batchSize, + numInputRows, numInputBatches) val projectedIterator = batchedIterator.map { batch => + // We have to do the project before we add the batch because the batch might be closed + // when it is added + val ret = GpuProjectExec.project(batch, boundReferences) queue.add(batch) - GpuProjectExec.project(batch, boundReferences) + ret } if (isPythonOnGpuEnabled) { @@ -466,27 +578,52 @@ case class GpuArrowEvalPythonExec( PythonWorkerSemaphore.acquireIfNecessary(context) } + var targetReadBatchSize = 1 + val outputBatchIterator = new GpuArrowPythonRunner( pyFuncs, evalType, argOffsets, schema, sessionLocalTimeZone, - pythonRunnerConf).compute(projectedIterator, + pythonRunnerConf, + batchSize){ + override def minReadTargetBatchSize: Int = targetReadBatchSize + }.compute(projectedIterator, context.partitionId(), context) - outputBatchIterator.map { outputBatch => - withResource(outputBatch) { outBatch => - withResource(queue.remove()) { origBatch => - val rows = origBatch.numRows() - assert(outBatch.numRows() == rows) - val lColumns = GpuColumnVector.extractColumns(origBatch) - val rColumns = GpuColumnVector.extractColumns(outBatch) - numOutputBatches += 1 - numOutputRows += rows - new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()), - rows) + new Iterator[ColumnarBatch] { + // for hasNext we are waiting on the queue to have something inserted into it + // instead of waiting for a result to be ready from python. The reason for this + // is to let us know the target number of rows in the batch that we want when reading. + // It is a bit hacked up but it works. In the future when we support spilling we should + // store the number of rows separate from the batch. That way we can get the target batch + // size out without needing to grab the GpuSemaphore which we cannot do if we might block + // on a read operation. + override def hasNext: Boolean = queue.hasNext + + private [this] def combine(origBatch: ColumnarBatch, table: Table): ColumnarBatch = { + val lColumns = GpuColumnVector.extractColumns(origBatch) + val rColumns = GpuColumnVector.extractColumns(table) + new ColumnarBatch(lColumns.map(_.incRefCount()) ++ rColumns.map(_.incRefCount()), + origBatch.numRows()) + } + + override def next(): ColumnarBatch = { + val numRows = queue.peekBatchSize + // This is a bit of a hack, but it lets us set what we want to read without + // copying/rewriting a lot of the abstraction that spark has in place. + targetReadBatchSize = numRows + withResource(outputBatchIterator.next()) { cb => + withResource(GpuColumnVector.from(cb)) { fromPython => + assert(fromPython.getRowCount == numRows) + withResource(queue.remove()) { origBatch => + numOutputBatches += 1 + numOutputRows += numRows + combine(origBatch, fromPython) + } + } } } }