From d637490d36fcf9ff46d896abb8b17e730b50b99c Mon Sep 17 00:00:00 2001
From: "Robert (Bobby) Evans"
Date: Mon, 22 Feb 2021 14:08:37 -0600
Subject: [PATCH] Spill metrics everywhere (#1788)

Signed-off-by: Robert (Bobby) Evans
---
 .../spark/rapids/GpuCoalesceBatches.scala     | 143 ++----------------
 .../spark/rapids/SpillableColumnarBatch.scala |   5 +-
 .../scala/com/nvidia/spark/rapids/limit.scala |  11 +-
 .../python/GpuArrowEvalPythonExec.scala       |  29 ++--
 .../python/GpuWindowInPandasExecBase.scala    |  13 +-
 .../rapids/GpuCoalesceBatchesSuite.scala      |   4 +-
 6 files changed, 46 insertions(+), 159 deletions(-)

diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
index 22ce1e30c59..6435f1d4a96 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuCoalesceBatches.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}
 import org.apache.spark.sql.vectorized.ColumnarBatch

 /**
@@ -337,125 +337,6 @@ abstract class AbstractGpuCoalesceIterator(
   }
 }

-// Remove this iterator when contiguous_split supports nested types
-class GpuCoalesceIteratorNoSpill(iter: Iterator[ColumnarBatch],
-    schema: StructType,
-    goal: CoalesceGoal,
-    maxDecompressBatchMemory: Long,
-    numInputRows: GpuMetric,
-    numInputBatches: GpuMetric,
-    numOutputRows: GpuMetric,
-    numOutputBatches: GpuMetric,
-    collectTime: GpuMetric,
-    concatTime: GpuMetric,
-    totalTime: GpuMetric,
-    peakDevMemory: GpuMetric,
-    opName: String)
-  extends AbstractGpuCoalesceIterator(iter,
-    goal,
-    numInputRows,
-    numInputBatches,
-    numOutputRows,
-    numOutputBatches,
-    collectTime,
-    concatTime,
-    totalTime,
-    opName) with Arm {
-
-  private val sparkTypes: Array[DataType] = GpuColumnVector.extractTypes(schema)
-  private var batches: ArrayBuffer[ColumnarBatch] = ArrayBuffer.empty
-  private var maxDeviceMemory: Long = 0
-
-  // batch indices that are compressed batches
-  private[this] var compressedBatchIndices: ArrayBuffer[Int] = ArrayBuffer.empty
-
-  private[this] var codec: TableCompressionCodec = _
-
-  override def initNewBatch(batch: ColumnarBatch): Unit = {
-    batches.clear()
-    compressedBatchIndices.clear()
-  }
-
-  override def addBatchToConcat(batch: ColumnarBatch): Unit = {
-    if (isBatchCompressed(batch)) {
-      compressedBatchIndices += batches.size
-    }
-    batches += batch
-  }
-
-  private def isBatchCompressed(batch: ColumnarBatch): Boolean = {
-    if (batch.numCols == 0) {
-      false
-    } else {
-      batch.column(0) match {
-        case _: GpuCompressedColumnVector => true
-        case _ => false
-      }
-    }
-  }
-
-  override def concatAllAndPutOnGPU(): ColumnarBatch = {
-    decompressBatches()
-    val tmp = batches.toArray
-    // Clear the buffer so we don't close it again (buildNonEmptyBatch closed it for us).
-    batches = ArrayBuffer.empty
-    val ret = ConcatAndConsumeAll.buildNonEmptyBatch(tmp, schema)
-    // sum of current batches and concatenating batches. Approximately sizeof(ret * 2).
-    maxDeviceMemory = GpuColumnVector.getTotalDeviceMemoryUsed(ret) * 2
-    ret
-  }
-
-  private def decompressBatches(): Unit = {
-    if (compressedBatchIndices.nonEmpty) {
-      val compressedVecs = compressedBatchIndices.map { batchIndex =>
-        batches(batchIndex).column(0).asInstanceOf[GpuCompressedColumnVector]
-      }
-      if (codec == null) {
-        val descr = compressedVecs.head.getTableMeta.bufferMeta.codecBufferDescrs(0)
-        codec = TableCompressionCodec.getCodec(descr.codec)
-      }
-      withResource(codec.createBatchDecompressor(maxDecompressBatchMemory,
-          Cuda.DEFAULT_STREAM)) { decompressor =>
-        compressedVecs.foreach { cv =>
-          val bufferMeta = cv.getTableMeta.bufferMeta
-          // don't currently support switching codecs when partitioning
-          val buffer = cv.getTableBuffer.slice(0, cv.getTableBuffer.getLength)
-          decompressor.addBufferToDecompress(buffer, bufferMeta)
-        }
-        withResource(decompressor.finishAsync()) { outputBuffers =>
-          outputBuffers.zipWithIndex.foreach { case (outputBuffer, outputIndex) =>
-            val cv = compressedVecs(outputIndex)
-            val batchIndex = compressedBatchIndices(outputIndex)
-            val compressedBatch = batches(batchIndex)
-            batches(batchIndex) =
-              MetaUtils.getBatchFromMeta(outputBuffer, cv.getTableMeta, sparkTypes)
-            compressedBatch.close()
-          }
-        }
-      }
-    }
-  }
-
-  override def cleanupConcatIsDone(): Unit = {
-    peakDevMemory.set(maxDeviceMemory)
-    batches.foreach(_.close())
-  }
-
-  private var onDeck: Option[ColumnarBatch] = None
-
-  override protected def hasOnDeck: Boolean = onDeck.isDefined
-  override protected def saveOnDeck(batch: ColumnarBatch): Unit = onDeck = Some(batch)
-  override protected def clearOnDeck(): Unit = {
-    onDeck.foreach(_.close())
-    onDeck = None
-  }
-  override protected def popOnDeck(): ColumnarBatch = {
-    val ret = onDeck.get
-    onDeck = None
-    ret
-  }
-}
-
 class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     schema: StructType,
     goal: CoalesceGoal,
@@ -468,6 +349,7 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
     concatTime: GpuMetric,
     totalTime: GpuMetric,
     peakDevMemory: GpuMetric,
+    spillCallback: RapidsBuffer.SpillCallback,
     opName: String)
   extends AbstractGpuCoalesceIterator(iter,
     goal,
@@ -490,7 +372,8 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],
   }

   override def addBatchToConcat(batch: ColumnarBatch): Unit =
-    batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+    batches.append(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+      spillCallback))

   private[this] var codec: TableCompressionCodec = _

@@ -553,7 +436,8 @@ class GpuCoalesceIterator(iter: Iterator[ColumnarBatch],

   override protected def saveOnDeck(batch: ColumnarBatch): Unit = {
     assert(onDeck.isEmpty)
-    onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+    onDeck = Some(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+      spillCallback))
   }

   override protected def clearOnDeck(): Unit = {
@@ -583,7 +467,7 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
     COLLECT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_COLLECT_TIME),
     CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME),
     PEAK_DEVICE_MEMORY -> createSizeMetric(MODERATE_LEVEL, DESCRIPTION_PEAK_DEVICE_MEMORY)
-  )
+  ) ++ spillMetrics

   override protected def doExecute(): RDD[InternalRow] = {
     throw new IllegalStateException("ROW BASED PROCESSING IS NOT SUPPORTED")
@@ -608,12 +492,6 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
     // cache in local vars to avoid serializing the plan
     val outputSchema = schema
     val decompressMemoryTarget = maxDecompressBatchMemory
-    val cannotSpill = child.schema.fields.exists { f =>
-      f.dataType match {
-        case MapType(_, _, _) | ArrayType(_, _) | StructType(_) => true
-        case _ => false
-      }
-    }

     val batches = child.executeColumnar()
     batches.mapPartitions { iter =>
@@ -621,14 +499,11 @@ case class GpuCoalesceBatches(child: SparkPlan, goal: CoalesceGoal)
         val numRows = iter.map(_.numRows).sum
         val combinedCb = new ColumnarBatch(Array.empty, numRows)
         Iterator.single(combinedCb)
-      } else if (cannotSpill) {
-        new GpuCoalesceIteratorNoSpill(iter, outputSchema, goal, decompressMemoryTarget,
-          numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
-          concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
       } else {
+        val callback = GpuMetric.makeSpillCallback(allMetrics)
         new GpuCoalesceIterator(iter, outputSchema, goal, decompressMemoryTarget,
           numInputRows, numInputBatches, numOutputRows, numOutputBatches, collectTime,
-          concatTime, totalTime, peakDevMemory, "GpuCoalesceBatches")
+          concatTime, totalTime, peakDevMemory, callback, "GpuCoalesceBatches")
       }
     }
   }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
index f02e9212f89..7f514fab491 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala
@@ -16,8 +16,6 @@

 package com.nvidia.spark.rapids

-import com.nvidia.spark.rapids.StorageTier.StorageTier
-
 import org.apache.spark.sql.rapids.TempSpillBufferId
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.vectorized.ColumnarBatch
@@ -134,8 +132,7 @@ object SpillableColumnarBatch extends Arm {
    */
   def apply(batch: ColumnarBatch,
       priority: Long,
-      spillCallback: RapidsBuffer.SpillCallback = RapidsBuffer.defaultSpillCallback)
-    : SpillableColumnarBatch = {
+      spillCallback: RapidsBuffer.SpillCallback): SpillableColumnarBatch = {
     val numRows = batch.numRows()
     if (batch.numCols() <= 0) {
       // We consumed it
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
index f9a26b4b143..93ae20c8ab0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/limit.scala
@@ -194,7 +194,8 @@ object GpuTopN extends Arm {
       inputBatches: GpuMetric,
       inputRows: GpuMetric,
       outputBatches: GpuMetric,
-      outputRows: GpuMetric): Iterator[ColumnarBatch] =
+      outputRows: GpuMetric,
+      spillCallback: RapidsBuffer.SpillCallback): Iterator[ColumnarBatch] =
     new Iterator[ColumnarBatch]() {
       override def hasNext: Boolean = iter.hasNext

@@ -230,7 +231,8 @@ object GpuTopN extends Arm {
             }
           }
           pending =
-            Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+            Some(SpillableColumnarBatch(runningResult, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+              spillCallback))
         }
       }
       val ret = pending.get.getColumnarBatch()
@@ -267,7 +269,7 @@ case class GpuTopN(
     NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES),
     SORT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_SORT_TIME),
     CONCAT_TIME -> createNanoTimingMetric(MODERATE_LEVEL, DESCRIPTION_CONCAT_TIME)
-  )
+  ) ++ spillMetrics

   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val sorter = new GpuSorter(sortOrder, child.output)
@@ -279,9 +281,10 @@ case class GpuTopN(
     val outputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val sortTime = gpuLongMetric(SORT_TIME)
     val concatTime = gpuLongMetric(CONCAT_TIME)
+    val callback = GpuMetric.makeSpillCallback(allMetrics)
     child.executeColumnar().mapPartitions { iter =>
       val topN = GpuTopN(limit, sorter, iter, totalTime, sortTime, concatTime,
-        inputBatches, inputRows, outputBatches, outputRows)
+        inputBatches, inputRows, outputBatches, outputRows, callback)
       if (projectList != child.output) {
         topN.map { batch =>
           GpuProjectExec.projectAndClose(batch, boundProjectExprs, totalTime)
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
index c1175617ac0..c9872babbc0 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuArrowEvalPythonExec.scala
@@ -27,7 +27,7 @@ import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer

 import ai.rapids.cudf.{AggregationOnColumn, ArrowIPCOptions, ArrowIPCWriterOptions, ColumnVector, HostBufferConsumer, HostBufferProvider, HostMemoryBuffer, NvtxColor, NvtxRange, StreamedTableReader, Table}
-import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, SpillableColumnarBatch, SpillPriorities}
+import com.nvidia.spark.rapids.{Arm, ConcatAndConsumeAll, GpuAggregateWindowFunction, GpuBindReferences, GpuColumnVector, GpuColumnVectorFromBuffer, GpuExec, GpuMetric, GpuProjectExec, GpuSemaphore, GpuUnevaluable, RapidsBuffer, SpillableColumnarBatch, SpillPriorities}
 import com.nvidia.spark.rapids.GpuMetric._
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
@@ -59,7 +59,8 @@ class RebatchingRoundoffIterator(
     schema: StructType,
     targetRoundoff: Int,
     inputRows: GpuMetric,
-    inputBatches: GpuMetric)
+    inputBatches: GpuMetric,
+    spillCallback: RapidsBuffer.SpillCallback)
   extends Iterator[ColumnarBatch] with Arm {
   var pending: Option[SpillableColumnarBatch] = None

@@ -94,7 +95,8 @@ class RebatchingRoundoffIterator(
         inputBatches += 1
         inputRows += got.numRows()
         rowsSoFar += got.numRows()
-        batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+        batches.append(SpillableColumnarBatch(got, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+          spillCallback))
       }
       val toConcat = batches.safeMap(_.getColumnarBatch()).toArray
       ConcatAndConsumeAll.buildNonEmptyBatch(toConcat, schema)
@@ -129,7 +131,8 @@ class RebatchingRoundoffIterator(
             localPending.setSpillPriority(SpillPriorities.ACTIVE_BATCHING_PRIORITY)
             batches.append(localPending)
             pending = None
-            batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+            batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+              spillCallback))
             fillAndConcat(batches)
           } finally {
             batches.safeClose()
@@ -145,7 +148,8 @@ class RebatchingRoundoffIterator(
       } else {
         val batches: ArrayBuffer[SpillableColumnarBatch] = ArrayBuffer.empty
         try {
-          batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY))
+          batches.append(SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY,
+            spillCallback))
           fillAndConcat(batches)
         } finally {
           batches.safeClose()
@@ -170,7 +174,8 @@ class RebatchingRoundoffIterator(
         pending =
           Some(SpillableColumnarBatch(GpuColumnVectorFromBuffer.from(split.last,
             GpuColumnVector.extractTypes(schema)),
-            SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+            SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+            spillCallback))
         GpuColumnVectorFromBuffer.from(split.head, GpuColumnVector.extractTypes(schema))
       }
     }
@@ -185,8 +190,9 @@ class BatchQueue extends AutoCloseable with Arm {
     mutable.Queue[SpillableColumnarBatch]()
   private var isSet = false

-  def add(batch: ColumnarBatch): Unit = synchronized {
-    queue.enqueue(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY))
+  def add(batch: ColumnarBatch, spillCallback: RapidsBuffer.SpillCallback): Unit = synchronized {
+    queue.enqueue(SpillableColumnarBatch(batch, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+      spillCallback))
     if (!isSet) {
       // Wake up anyone waiting for the first batch.
       isSet = true
@@ -543,13 +549,14 @@ case class GpuArrowEvalPythonExec(
     NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES),
     NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
     NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES)
-  )
+  ) ++ spillMetrics

   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
     val numInputRows = gpuLongMetric(NUM_INPUT_ROWS)
     val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
+    val spillCallback = GpuMetric.makeSpillCallback(allMetrics)

     lazy val isPythonOnGpuEnabled = GpuPythonHelper.isPythonOnGpuEnabled(conf)

@@ -598,12 +605,12 @@ case class GpuArrowEvalPythonExec(
         val boundReferences = GpuBindReferences.bindReferences(allInputs, childOutput)
         val batchedIterator = new RebatchingRoundoffIterator(iter, inputSchema, targetBatchSize,
-          numInputRows, numInputBatches)
+          numInputRows, numInputBatches, spillCallback)
         val pyInputIterator = batchedIterator.map { batch =>
           // We have to do the project before we add the batch because the batch might be closed
           // when it is added
           val ret = GpuProjectExec.project(batch, boundReferences)
-          queue.add(batch)
+          queue.add(batch, spillCallback)
           ret
         }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
index b63c3009c2a..2ea45b9fc1c 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuWindowInPandasExecBase.scala
@@ -88,7 +88,8 @@ class GroupingIterator(
     wrapped: Iterator[ColumnarBatch],
     partitionSpec: Seq[Expression],
     inputRows: GpuMetric,
-    inputBatches: GpuMetric) extends Iterator[ColumnarBatch] with Arm {
+    inputBatches: GpuMetric,
+    spillCallback: RapidsBuffer.SpillCallback) extends Iterator[ColumnarBatch] with Arm {

   // Currently do it in a somewhat ugly way. In the future cuDF will provide a dedicated API.
   // Current solution assumes one group data exists in only one batch, so just split the
@@ -147,7 +148,8 @@ class GroupingIterator(
           val splitBatches = tables.safeMap(table =>
             GpuColumnVectorFromBuffer.from(table, GpuColumnVector.extractTypes(batch)))
           groupBatches.enqueue(splitBatches.tail.map(sb =>
-            SpillableColumnarBatch(sb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY)): _*)
+            SpillableColumnarBatch(sb, SpillPriorities.ACTIVE_ON_DECK_PRIORITY,
+              spillCallback)): _*)
           splitBatches.head
         }
       }
@@ -392,7 +394,7 @@ trait GpuWindowInPandasExecBase extends UnaryExecNode with GpuExec {
     NUM_OUTPUT_BATCHES -> createMetric(outputBatchesLevel, DESCRIPTION_NUM_OUTPUT_BATCHES),
     NUM_INPUT_ROWS -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_ROWS),
     NUM_INPUT_BATCHES -> createMetric(DEBUG_LEVEL, DESCRIPTION_NUM_INPUT_BATCHES)
-  )
+  ) ++ spillMetrics

   override protected def doExecute(): RDD[InternalRow] =
     throw new IllegalStateException(s"Row-based execution should not occur for $this")
@@ -402,6 +404,7 @@ trait GpuWindowInPandasExecBase extends UnaryExecNode with GpuExec {
     val numInputBatches = gpuLongMetric(NUM_INPUT_BATCHES)
     val numOutputRows = gpuLongMetric(NUM_OUTPUT_ROWS)
     val numOutputBatches = gpuLongMetric(NUM_OUTPUT_BATCHES)
+    val spillCallback = GpuMetric.makeSpillCallback(allMetrics)
     val sessionLocalTimeZone = conf.sessionLocalTimeZone

     // 1) Unwrap the expressions and build some info data:
@@ -504,7 +507,7 @@ trait GpuWindowInPandasExecBase extends UnaryExecNode with GpuExec {
       // Re-batching the input data by GroupingIterator
       val boundPartitionRefs = GpuBindReferences.bindGpuReferences(partitionSpec, childOutput)
       val groupedIterator = new GroupingIterator(inputIter, boundPartitionRefs,
-        numInputRows, numInputBatches)
+        numInputRows, numInputBatches, spillCallback)
       val pyInputIterator = groupedIterator.map { batch =>
         // We have to do the project before we add the batch because the batch might be closed
         // when it is added
@@ -513,7 +516,7 @@ trait GpuWindowInPandasExecBase extends UnaryExecNode with GpuExec {
         val inputBatch = withResource(projectedBatch) { projectedCb =>
           insertWindowBounds(projectedCb)
         }
-        queue.add(batch)
+        queue.add(batch, spillCallback)

         inputBatch
       }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
index 338d36f6e06..a45fd32e8c9 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/GpuCoalesceBatchesSuite.scala
@@ -165,7 +165,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
     val vector3 = toArrowField("array", ArrayType(IntegerType), nullable = true, null)
       .createVector(allocator).asInstanceOf[ListVector]
     vector3.allocateNew()
-    val elementVector = vector3.getDataVector().asInstanceOf[IntVector]
+    val elementVector = vector3.getDataVector.asInstanceOf[IntVector]

     (0 until 10).foreach { i =>
       vector1.setSafe(i, i)
@@ -404,6 +404,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
       dummyMetric,
       dummyMetric,
       dummyMetric,
+      RapidsBuffer.defaultSpillCallback,
       "test concat")

     var expected = 0
@@ -485,6 +486,7 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
       dummyMetric,
       dummyMetric,
       dummyMetric,
+      RapidsBuffer.defaultSpillCallback,
       "test concat")

     var expected = 0
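
The same three-step pattern repeats in every file touched above: publish the shared
spill metrics through the operator's metric map (the `) ++ spillMetrics` hunks),
build a callback from those metrics on the driver before the closure is serialized
(`GpuMetric.makeSpillCallback(allMetrics)`), and thread that callback into every
SpillableColumnarBatch the tasks create. The condensed sketch below is illustrative
only and is not part of the patch; `SpillCallbackPatternSketch` and `makeSpillable`
are hypothetical names, while the GpuMetric, SpillableColumnarBatch, SpillPriorities,
and RapidsBuffer calls are the ones the patch itself uses.

import com.nvidia.spark.rapids.{GpuMetric, RapidsBuffer, SpillableColumnarBatch, SpillPriorities}
import org.apache.spark.sql.vectorized.ColumnarBatch

object SpillCallbackPatternSketch {
  // `allMetrics` stands in for an operator's metric map, which after this patch
  // includes the entries contributed by `++ spillMetrics`.
  def makeSpillable(
      iter: Iterator[ColumnarBatch],
      allMetrics: Map[String, GpuMetric]): Iterator[SpillableColumnarBatch] = {
    // One callback per operator: it records the size of any spill back into the
    // operator's own metrics, so the SQL UI shows who caused the spilling.
    val spillCallback: RapidsBuffer.SpillCallback = GpuMetric.makeSpillCallback(allMetrics)
    iter.map { cb =>
      // Every buffered batch is made spillable and tagged with the callback.
      SpillableColumnarBatch(cb, SpillPriorities.ACTIVE_BATCHING_PRIORITY, spillCallback)
    }
  }
}

Code paths with no operator metrics available, such as the updated
GpuCoalesceBatchesSuite tests, can fall back to RapidsBuffer.defaultSpillCallback,
which is exactly what the test hunks above pass now that the default argument on
SpillableColumnarBatch.apply has been removed.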