diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractHostByteBufferIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractHostByteBufferIterator.scala new file mode 100644 index 00000000000..24f3bacf28b --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AbstractHostByteBufferIterator.scala @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import java.nio.ByteBuffer + +import ai.rapids.cudf.{Cuda, HostMemoryBuffer, MemoryBuffer} + +abstract class AbstractHostByteBufferIterator + extends Iterator[ByteBuffer] { + private[this] var nextBufferStart: Long = 0L + + val totalLength: Long + + protected val limit: Long = Integer.MAX_VALUE + + def getByteBuffer(offset: Long, length: Long): ByteBuffer + + override def hasNext: Boolean = nextBufferStart < totalLength + + override def next(): ByteBuffer = { + val offset = nextBufferStart + val length = Math.min(totalLength - nextBufferStart, limit) + nextBufferStart += length + getByteBuffer(offset, length) + } +} + +/** + * Create an iterator that will emit ByteBuffer instances sequentially + * to work around the 2GB ByteBuffer size limitation. This allows + * the entire address range of a >2GB host buffer to be covered + * by a sequence of ByteBuffer instances. + *
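+ * A minimal usage sketch, assuming `hostBuf` is a live `HostMemoryBuffer`
+ * that outlives the iteration and `channel` is a `WritableByteChannel`:
+ * {{{
+ *   new HostByteBufferIterator(hostBuf).foreach { bb =>
+ *     channel.write(bb) // each bb covers at most Integer.MAX_VALUE bytes
+ *   }
+ * }}}
+ *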
NOTE: It is the caller's responsibility to ensure this iterator + * does not outlive the host buffer. The iterator DOES NOT increment + * the reference count of the host buffer to ensure it remains valid. + * + * @param hostBuffer host buffer to iterate + * @return ByteBuffer iterator + */ +class HostByteBufferIterator(hostBuffer: HostMemoryBuffer) + extends AbstractHostByteBufferIterator { + override protected val limit: Long = Integer.MAX_VALUE + + override val totalLength: Long = if (hostBuffer == null) { + 0 + } else { + hostBuffer.getLength + } + + override def getByteBuffer(offset: Long, length: Long): ByteBuffer = { + hostBuffer.asByteBuffer(offset, length.toInt) + } +} + +/** + * Create an iterator that will emit ByteBuffer instances sequentially + * to work around the 2GB ByteBuffer size limitation after copying a `MemoryBuffer` + * (which is likely a `DeviceMemoryBuffer`) to a host-backed bounce buffer + * that is likely smaller than 2GB. + * @note It is the caller's responsibility to ensure this iterator + * does not outlive `memoryBuffer`. The iterator DOES NOT increment + * the reference count of `memoryBuffer` to ensure it remains valid. + * @param memoryBuffer memory buffer to copy. This is likely a DeviceMemoryBuffer + * @param bounceBuffer a host bounce buffer that will be used to stage copies onto the host + * @param stream stream to synchronize on after staging to bounceBuffer + * @return ByteBuffer iterator + */ +class MemoryBufferToHostByteBufferIterator( + memoryBuffer: MemoryBuffer, + bounceBuffer: HostMemoryBuffer, + stream: Cuda.Stream) + extends AbstractHostByteBufferIterator { + override val totalLength: Long = if (memoryBuffer == null) { + 0 + } else { + memoryBuffer.getLength + } + + override protected val limit: Long = + Math.min(bounceBuffer.getLength, Integer.MAX_VALUE) + + override def getByteBuffer(offset: Long, length: Long): ByteBuffer = { + bounceBuffer + .copyFromMemoryBufferAsync(0, memoryBuffer, offset, length, stream) + stream.sync() + bounceBuffer.asByteBuffer(0, length.toInt) + } +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostByteBufferIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostByteBufferIterator.scala deleted file mode 100644 index 5df9f220435..00000000000 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostByteBufferIterator.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids - -import java.nio.ByteBuffer - -import ai.rapids.cudf.HostMemoryBuffer - -/** - * Create an iterator that will emit ByteBuffer instances sequentially - * to work around the 2GB ByteBuffer size limitation. This allows - * the entire address range of a >2GB host buffer to be covered - * by a sequence of ByteBuffer instances. - *
NOTE: It is the caller's responsibility to ensure this iterator - * does not outlive the host buffer. The iterator DOES NOT increment - * the reference count of the host buffer to ensure it remains valid. - * - * @param hostBuffer host buffer to iterate - * @return ByteBuffer iterator - */ -class HostByteBufferIterator(hostBuffer: HostMemoryBuffer) - extends Iterator[ByteBuffer] { - private[this] var nextBufferStart: Long = 0L - - override def hasNext: Boolean = hostBuffer != null && nextBufferStart < hostBuffer.getLength - - override def next(): ByteBuffer = { - val offset = nextBufferStart - val length = Math.min(hostBuffer.getLength - nextBufferStart, Integer.MAX_VALUE) - nextBufferStart += length - hostBuffer.asByteBuffer(offset, length.toInt) - } -} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala index ed0699e92f0..4bdf2493647 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBuffer.scala @@ -230,14 +230,14 @@ trait RapidsBuffer extends AutoCloseable { * * @note Do not use this size to allocate a target buffer to copy, always use `getPackedSize.` */ - def getMemoryUsedBytes: Long + val memoryUsedBytes: Long /** * The size of this buffer if it has already gone through contiguous_split. * * @note Use this function when allocating a target buffer for spill or shuffle purposes. */ - def getPackedSizeBytes: Long = getMemoryUsedBytes + def getPackedSizeBytes: Long = memoryUsedBytes /** * At spill time, obtain an iterator used to copy this buffer to a different tier. @@ -389,7 +389,7 @@ sealed class DegenerateRapidsBuffer( override val id: RapidsBufferId, override val meta: TableMeta) extends RapidsBuffer { - override def getMemoryUsedBytes: Long = 0L + override val memoryUsedBytes: Long = 0L override val storageTier: StorageTier = StorageTier.DEVICE @@ -451,7 +451,7 @@ trait RapidsHostBatchBuffer extends AutoCloseable { */ def getHostColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch - def getMemoryUsedBytes(): Long + val memoryUsedBytes: Long } trait RapidsBufferChannelWritable { @@ -459,7 +459,10 @@ trait RapidsBufferChannelWritable { * At spill time, write this buffer to an nio WritableByteChannel. * @param writableChannel that this buffer can just write itself to, either byte-for-byte * or via serialization if needed. + * @param stream the Cuda.Stream for the spilling thread. If the `RapidsBuffer` that + * implements this method is on the device, synchronization may be needed + * for staged copies. 
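+ * A minimal sketch of a device-side implementation, assuming `contigBuffer`
+ * is this buffer's device memory and `hostSpillBounceBuffer` is a
+ * pre-allocated host staging buffer:
+ * {{{
+ *   var written = 0L
+ *   new MemoryBufferToHostByteBufferIterator(
+ *       contigBuffer, hostSpillBounceBuffer, stream).foreach { bb =>
+ *     while (bb.hasRemaining) {
+ *       written += writableChannel.write(bb)
+ *     }
+ *   }
+ *   written
+ * }}}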
* @return the amount of bytes written to the channel */ - def writeToChannel(writableChannel: WritableByteChannel): Long + def writeToChannel(writableChannel: WritableByteChannel, stream: Cuda.Stream): Long } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 45fa981cff5..5c58e73bf55 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -19,9 +19,7 @@ package com.nvidia.spark.rapids import java.util.concurrent.ConcurrentHashMap import java.util.function.BiFunction -import scala.collection.mutable.ArrayBuffer - -import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange, Rmm, Table} +import ai.rapids.cudf.{ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Rmm, Table} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import com.nvidia.spark.rapids.RapidsBufferCatalog.getExistingRapidsBufferAndAcquire import com.nvidia.spark.rapids.RapidsPluginImplicits._ @@ -32,7 +30,6 @@ import com.nvidia.spark.rapids.jni.RmmSpark import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId} -import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.vectorized.ColumnarBatch /** @@ -378,7 +375,28 @@ class RapidsBufferCatalog( table: Table, initialSpillPriority: Long, needsSync: Boolean = true): RapidsBufferHandle = { - val id = TempSpillBufferId() + addTable(TempSpillBufferId(), table, initialSpillPriority, needsSync) + } + + /** + * Adds a table to the device storage. + * + * This takes ownership of the table. The reason for this is that tables + * don't have a reference count, so we cannot cleanly capture ownership by increasing + * ref count and decreasing from the caller. 
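+ * A usage sketch, assuming `catalog` is a live `RapidsBufferCatalog` and
+ * `table` is a cudf `Table` the caller no longer needs:
+ * {{{
+ *   val handle = catalog.addTable(TempSpillBufferId(), table,
+ *     initialSpillPriority = 0L, needsSync = true)
+ *   // `table` is now owned by the device store; do not close it here
+ * }}}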
+ * + * @param id specific RapidsBufferId to use for this table + * @param table table that will be owned by the store + * @param initialSpillPriority starting spill priority value + * @param needsSync whether the spill framework should stream synchronize while adding + * this table (defaults to true) + * @return RapidsBufferHandle handle for this RapidsBuffer + */ + def addTable( + id: RapidsBufferId, + table: Table, + initialSpillPriority: Long, + needsSync: Boolean): RapidsBufferHandle = { val rapidsBuffer = deviceStorage.addTable( id, table, @@ -442,7 +460,7 @@ class RapidsBufferCatalog( */ def acquireBuffer(handle: RapidsBufferHandle): RapidsBuffer = { val id = handle.id - (0 until RapidsBufferCatalog.MAX_BUFFER_LOOKUP_ATTEMPTS).foreach { _ => + def lookupAndReturn: Option[RapidsBuffer] = { val buffers = bufferMap.get(id) if (buffers == null || buffers.isEmpty) { throw new NoSuchElementException( @@ -450,7 +468,27 @@ class RapidsBufferCatalog( } val buffer = buffers.head if (buffer.addReference()) { - return buffer + Some(buffer) + } else { + None + } + } + + // fast path + (0 until RapidsBufferCatalog.MAX_BUFFER_LOOKUP_ATTEMPTS).foreach { _ => + val mayBuffer = lookupAndReturn + if (mayBuffer.isDefined) { + return mayBuffer.get + } + } + + // try one last time after locking the catalog (slow path) + // if there is a lot of contention here, I would rather lock the world than + // have tasks error out with "Unable to acquire" + synchronized { + val mayBuffer = lookupAndReturn + if (mayBuffer.isDefined) { + return mayBuffer.get } } throw new IllegalStateException(s"Unable to acquire buffer for ID: $id") @@ -552,14 +590,9 @@ class RapidsBufferCatalog( store: RapidsBufferStore, targetTotalSize: Long, stream: Cuda.Stream = Cuda.DEFAULT_STREAM): Option[Long] = { - val spillStore = store.spillStore - if (spillStore == null) { + if (store.spillStore == null) { throw new OutOfMemoryError("Requested to spill without a spill store") } - - // total amount spilled in this invocation - var totalSpilled: Long = 0 - require(targetTotalSize >= 0, s"Negative spill target size: $targetTotalSize") val mySpillCount = spillCount @@ -573,124 +606,21 @@ class RapidsBufferCatalog( // None which lets the calling code know that rmm should retry allocation None } else { - // this thread win the race and should spill + // this thread wins the race and should spill spillCount += 1 - - logWarning(s"Targeting a ${store.name} size of $targetTotalSize. " + - s"Current total ${store.currentSize}. " + - s"Current spillable ${store.currentSpillableSize}") - - if (store.currentSpillableSize > targetTotalSize) { - withResource(new NvtxRange(s"${store.name} sync spill", NvtxColor.ORANGE)) { _ => - logWarning(s"${store.name} store spilling to reduce usage from " + - s"${store.currentSize} total (${store.currentSpillableSize} spillable) " + - s"to $targetTotalSize bytes") - - // If the store has 0 spillable bytes left, it has exhausted. - var exhausted = false - - val buffersToFree = new ArrayBuffer[RapidsBuffer]() - - try { - while (!exhausted && - store.currentSpillableSize > targetTotalSize) { - val nextSpillable = store.nextSpillable() - if (nextSpillable != null) { - // we have a buffer (nextSpillable) to spill - spillBuffer(nextSpillable, spillStore, stream) - .foreach(buffersToFree.append(_)) - totalSpilled += nextSpillable.getMemoryUsedBytes - } - } - if (totalSpilled <= 0) { - // we didn't spill in this iteration, exit loop - exhausted = true - logWarning("Unable to spill enough to meet request. 
" + - s"Total=${store.currentSize} " + - s"Spillable=${store.currentSpillableSize} " + - s"Target=$targetTotalSize") - } - } finally { - if (buffersToFree.nonEmpty) { - // This is a hack in order to completely synchronize with the GPU before we free - // a buffer. It is necessary because of non-synchronous cuDF calls that could fall - // behind where the CPU is. Freeing a rapids buffer in these cases needs to wait for - // all launched GPU work, otherwise crashes or data corruption could occur. - // A more performant implementation would be to synchronize on the thread that read - // the buffer via events. - // https://github.com/NVIDIA/spark-rapids/issues/8610 - Cuda.deviceSynchronize() - buffersToFree.safeFree() - } - } - } - } - Some(totalSpilled) - } - } - } - - /** - * Given a specific `RapidsBuffer` spill it to `spillStore` - * @return the buffer, if successfully spilled, in order for the caller to free it - * @note called with catalog lock held - */ - private def spillBuffer( - buffer: RapidsBuffer, - spillStore: RapidsBufferStore, - stream: Cuda.Stream): Option[RapidsBuffer] = { - if (buffer.addReference()) { - withResource(buffer) { _ => - logDebug(s"Spilling $buffer ${buffer.id} to ${spillStore.name}") - val bufferHasSpilled = isBufferSpilled(buffer.id, buffer.storageTier) - if (!bufferHasSpilled) { - // if the spillStore specifies a maximum size spill taking this ceiling - // into account before trying to create a buffer there - // TODO: we may need to handle what happens if we can't spill anymore - // because all host buffers are being referenced. - trySpillToMaximumSize(buffer, spillStore, stream) - - // copy the buffer to spillStore - val newBuffer = spillStore.copyBuffer(buffer, stream) - - // once spilled, we get back a new RapidsBuffer instance in this new tier - registerNewBuffer(newBuffer) - } else { - logDebug(s"Skipping spilling $buffer ${buffer.id} to ${spillStore.name} as it is " + - s"already stored in multiple tiers") - } + Some(store.synchronousSpill(targetTotalSize, this, stream)) } - // we can now remove the old tier linkage - removeBufferTier(buffer.id, buffer.storageTier) - - // return the buffer - Some(buffer) - } else { - None } } - /** - * If `spillStore` defines a maximum size, spill to make room for `buffer`. - */ - private def trySpillToMaximumSize( - buffer: RapidsBuffer, - spillStore: RapidsBufferStore, - stream: Cuda.Stream): Unit = { - val spillStoreMaxSize = spillStore.getMaxSize - if (spillStoreMaxSize.isDefined) { - // this spillStore has a maximum size requirement (host only). We need to spill from it - // in order to make room for `buffer`. - val targetTotalSize = - math.max(spillStoreMaxSize.get - buffer.getMemoryUsedBytes, 0) - val maybeAmountSpilled = synchronousSpill(spillStore, targetTotalSize, stream) - maybeAmountSpilled.foreach { amountSpilled => - if (amountSpilled != 0) { - logInfo(s"Spilled $amountSpilled bytes from the ${spillStore.name} store") - TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled) - } - } - } + def updateTiers(bufferSpill: BufferSpill): Long = bufferSpill match { + case BufferSpill(spilledBuffer, maybeNewBuffer) => + logDebug(s"Spilled ${spilledBuffer.id} from tier ${spilledBuffer.storageTier}. " + + s"Removing. 
Registering ${maybeNewBuffer.map(_.id).getOrElse ("None")} " + + s"${maybeNewBuffer}") + maybeNewBuffer.foreach(registerNewBuffer) + removeBufferTier(spilledBuffer.id, spilledBuffer.storageTier) + spilledBuffer.memoryUsedBytes } /** @@ -707,10 +637,12 @@ class RapidsBufferCatalog( // do not create a new one, else add a reference acquireBuffer(buffer.id, StorageTier.DEVICE) match { case None => - val newBuffer = deviceStorage.copyBuffer(buffer, stream) - newBuffer.addReference() // add a reference since we are about to use it - registerNewBuffer(newBuffer) - newBuffer + val maybeNewBuffer = deviceStorage.copyBuffer(buffer, this, stream) + maybeNewBuffer.map { newBuffer => + newBuffer.addReference() // add a reference since we are about to use it + registerNewBuffer(newBuffer) + newBuffer + }.get // the GPU store has to return a buffer here for now, or throw OOM case Some(existingBuffer) => existingBuffer } } @@ -764,7 +696,6 @@ class RapidsBufferCatalog( } object RapidsBufferCatalog extends Logging { - private val MAX_BUFFER_LOOKUP_ATTEMPTS = 100 private var deviceStorage: RapidsDeviceMemoryStore = _ @@ -841,7 +772,9 @@ object RapidsBufferCatalog extends Logging { // We are going to re-initialize so make sure all of the old things were closed... closeImpl() assert(memoryEventHandler == null) - deviceStorage = new RapidsDeviceMemoryStore(rapidsConf.chunkedPackBounceBufferSize) + deviceStorage = new RapidsDeviceMemoryStore( + rapidsConf.chunkedPackBounceBufferSize, + rapidsConf.spillToDiskBounceBufferSize) diskBlockManager = new RapidsDiskBlockManager(conf) if (rapidsConf.isGdsSpillEnabled) { gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala index ecbd8e64259..090d1fc5f65 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferStore.scala @@ -21,7 +21,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import scala.collection.mutable -import ai.rapids.cudf.{BaseDeviceMemoryBuffer, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer} +import ai.rapids.cudf.{BaseDeviceMemoryBuffer, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange} import com.nvidia.spark.rapids.Arm._ import com.nvidia.spark.rapids.RapidsPluginImplicits._ import com.nvidia.spark.rapids.StorageTier.{DEVICE, StorageTier} @@ -32,6 +32,15 @@ import org.apache.spark.sql.rapids.GpuTaskMetrics import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch +/** + * A helper case class that contains the buffer we spilled from our current tier + * and likely a new buffer created in a spill store tier, but it can be set to None. + * If the buffer already exists in the target spill store, `newBuffer` will be None. + * @param spilledBuffer a `RapidsBuffer` we spilled from this store + * @param newBuffer an optional `RapidsBuffer` in the target spill store. + */ +case class BufferSpill(spilledBuffer: RapidsBuffer, newBuffer: Option[RapidsBuffer]) + /** * Base class for all buffer store types. 
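 * A sketch of the spill entry point the stores now expose, assuming a
 * `catalog` and a CUDA `stream` are in scope:
 * {{{
 *   // ask the store to bring its spillable size down to 0 bytes;
 *   // each spilled buffer flows back through catalog.updateTiers(...)
 *   val bytesFreed: Long = store.synchronousSpill(0L, catalog, stream)
 * }}}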
* @@ -67,14 +76,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (old != null) { throw new DuplicateBufferException(s"duplicate buffer registered: ${buffer.id}") } - totalBytesStored += buffer.getMemoryUsedBytes + totalBytesStored += buffer.memoryUsedBytes // device buffers "spillability" is handled via DeviceMemoryBuffer ref counting // so spillableOnAdd should be false, all other buffer tiers are spillable at // all times. - if (spillableOnAdd) { + if (spillableOnAdd && buffer.memoryUsedBytes > 0) { if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.getMemoryUsedBytes + totalBytesSpillable += buffer.memoryUsedBytes } } } @@ -84,9 +93,9 @@ abstract class RapidsBufferStore(val tier: StorageTier) spilling.remove(id) val obj = buffers.remove(id) if (obj != null) { - totalBytesStored -= obj.getMemoryUsedBytes + totalBytesStored -= obj.memoryUsedBytes if (spillable.remove(obj)) { - totalBytesSpillable -= obj.getMemoryUsedBytes + totalBytesSpillable -= obj.memoryUsedBytes } } } @@ -115,19 +124,19 @@ abstract class RapidsBufferStore(val tier: StorageTier) * @param isSpillable whether the buffer should now be spillable */ def setSpillable(buffer: RapidsBufferBase, isSpillable: Boolean): Unit = synchronized { - if (isSpillable) { + if (isSpillable && buffer.memoryUsedBytes > 0) { // if this buffer is in the store and isn't currently spilling if (!spilling.contains(buffer.id) && buffers.containsKey(buffer.id)) { // try to add it to the spillable collection if (spillable.offer(buffer)) { - totalBytesSpillable += buffer.getMemoryUsedBytes + totalBytesSpillable += buffer.memoryUsedBytes logDebug(s"Buffer ${buffer.id} is spillable. " + s"total=${totalBytesStored} spillable=${totalBytesSpillable}") } // else it was already there (unlikely) } } else { if (spillable.remove(buffer)) { - totalBytesSpillable -= buffer.getMemoryUsedBytes + totalBytesSpillable -= buffer.memoryUsedBytes logDebug(s"Buffer ${buffer.id} is not spillable. " + s"total=${totalBytesStored}, spillable=${totalBytesSpillable}") } // else it was already removed @@ -139,8 +148,8 @@ abstract class RapidsBufferStore(val tier: StorageTier) if (buffer != null) { // mark the id as "spilling" (this buffer is in the middle of a spill operation) spilling.add(buffer.id) - totalBytesSpillable -= buffer.getMemoryUsedBytes - logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.getMemoryUsedBytes} " + + totalBytesSpillable -= buffer.memoryUsedBytes + logDebug(s"Spilling buffer ${buffer.id}. size=${buffer.memoryUsedBytes} " + s"total=${totalBytesStored}, new spillable=${totalBytesSpillable}") } buffer @@ -196,15 +205,19 @@ abstract class RapidsBufferStore(val tier: StorageTier) * (i.e.: this method will not take ownership of the incoming buffer object). * This does not need to update the catalog, the caller is responsible for that. 
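 * A sketch of the calling pattern given the new `Option` return, with
 * `nextStore` standing in for a candidate spill tier:
 * {{{
 *   val maybeNewBuffer = nextStore.copyBuffer(buffer, catalog, stream)
 *   maybeNewBuffer match {
 *     case Some(_) => catalog.updateTiers(BufferSpill(buffer, maybeNewBuffer))
 *     case None    => () // declined (no room): fall through to a lower tier
 *   }
 * }}}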
* @param buffer data from another store + * @param catalog RapidsBufferCatalog we may need to modify during this copy * @param stream CUDA stream to use for copy or null * @return the new buffer that was created */ def copyBuffer( buffer: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase = { - freeOnExcept(createBuffer(buffer, stream)) { newBuffer => - addBuffer(newBuffer) - newBuffer + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] = { + createBuffer(buffer, catalog, stream).map { newBuffer => + freeOnExcept(newBuffer) { newBuffer => + addBuffer(newBuffer) + newBuffer + } } } @@ -220,12 +233,14 @@ abstract class RapidsBufferStore(val tier: StorageTier) * @note DO NOT close the buffer unless adding a reference! * @note `createBuffer` impls should synchronize against `stream` before returning, if needed. * @param buffer data from another store + * @param catalog RapidsBufferCatalog we may need to modify during this create * @param stream CUDA stream to use or null * @return the new buffer that was created. */ protected def createBuffer( buffer: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] /** Update bookkeeping for a new buffer */ protected def addBuffer(buffer: RapidsBufferBase): Unit = { @@ -255,6 +270,129 @@ abstract class RapidsBufferStore(val tier: StorageTier) buffers.nextSpillableBuffer() } + def synchronousSpill( + targetTotalSize: Long, + catalog: RapidsBufferCatalog, + stream: Cuda.Stream = Cuda.DEFAULT_STREAM): Long = { + if (currentSpillableSize > targetTotalSize) { + logWarning(s"Targeting a ${name} size of $targetTotalSize. " + + s"Current total ${currentSize}. " + + s"Current spillable ${currentSpillableSize}") + val bufferSpills = new mutable.ArrayBuffer[BufferSpill]() + withResource(new NvtxRange(s"${name} sync spill", NvtxColor.ORANGE)) { _ => + logWarning(s"${name} store spilling to reduce usage from " + + s"${currentSize} total (${currentSpillableSize} spillable) " + + s"to $targetTotalSize bytes") + + // If the store has 0 spillable bytes left, it has exhausted. + try { + var exhausted = false + var totalSpilled = 0L + while (!exhausted && + currentSpillableSize > targetTotalSize) { + val nextSpillableBuffer = nextSpillable() + if (nextSpillableBuffer != null) { + if (nextSpillableBuffer.addReference()) { + withResource(nextSpillableBuffer) { _ => + val bufferHasSpilled = + catalog.isBufferSpilled( + nextSpillableBuffer.id, + nextSpillableBuffer.storageTier) + val bufferSpill = if (!bufferHasSpilled) { + spillBuffer( + nextSpillableBuffer, this, catalog, stream) + } else { + // if `nextSpillableBuffer` already spilled, we still need to + // remove it from our tier and call free on it, but set + // `newBuffer` to None because there's nothing to register + // as it has already spilled. + BufferSpill(nextSpillableBuffer, None) + } + totalSpilled += bufferSpill.spilledBuffer.memoryUsedBytes + bufferSpills.append(bufferSpill) + catalog.updateTiers(bufferSpill) + } + } + } + } + if (totalSpilled <= 0) { + // we didn't spill in this iteration, exit loop + exhausted = true + logWarning("Unable to spill enough to meet request. " + + s"Total=${currentSize} " + + s"Spillable=${currentSpillableSize} " + + s"Target=$targetTotalSize") + } + totalSpilled + } finally { + if (bufferSpills.nonEmpty) { + // This is a hack in order to completely synchronize with the GPU before we free + // a buffer. 
It is necessary because of non-synchronous cuDF calls that could fall + // behind where the CPU is. Freeing a rapids buffer in these cases needs to wait for + // all launched GPU work, otherwise crashes or data corruption could occur. + // A more performant implementation would be to synchronize on the thread that read + // the buffer via events. + // https://github.com/NVIDIA/spark-rapids/issues/8610 + Cuda.deviceSynchronize() + bufferSpills.foreach(_.spilledBuffer.safeFree()) + } + } + } + } else { + 0L // nothing spilled + } + } + + /** + * Given a specific `RapidsBuffer` spill it to `spillStore` + * + * @return a `BufferSpill` instance with the target buffer in this store, and an optional + * new `RapidsBuffer` in the target spill store if this rapids buffer hadn't already + * spilled. + * @note called with catalog lock held + */ + private def spillBuffer( + buffer: RapidsBuffer, + store: RapidsBufferStore, + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): BufferSpill = { + // copy the buffer to spillStore + var maybeNewBuffer: Option[RapidsBuffer] = None + var lastTier: Option[StorageTier] = None + var nextSpillStore = store.spillStore + while (maybeNewBuffer.isEmpty && nextSpillStore != null) { + lastTier = Some(nextSpillStore.tier) + // copy buffer if it fits + maybeNewBuffer = nextSpillStore.copyBuffer(buffer, catalog, stream) + + // if it didn't fit, we can try a lower tier that has more space + if (maybeNewBuffer.isEmpty) { + nextSpillStore = nextSpillStore.spillStore + } + } + if (maybeNewBuffer.isEmpty) { + throw new IllegalStateException( + s"Unable to spill buffer ${buffer.id} of size ${buffer.memoryUsedBytes} " + + s"to tier ${lastTier}") + } + // return the buffer to free and the new buffer to register + BufferSpill(buffer, maybeNewBuffer) + } + + /** + * Tries to make room for `buffer` in the host store by spilling. + * + * @param buffer buffer that will be copied to the host store if it fits + * @param stream CUDA stream to synchronize for memory operations + * @return true if the buffer fits after a potential spill + */ + protected def trySpillToMaximumSize( + buffer: RapidsBuffer, + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Boolean = { + true // default to success, HostMemoryStore overrides this + } + /** Base class for all buffers in this store. 
*/ abstract class RapidsBufferBase( override val id: RapidsBufferId, @@ -411,7 +549,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) freeBuffer() } } else { - logWarning(s"Trying to free an invalid buffer => $id, size = ${getMemoryUsedBytes}, $this") + logWarning(s"Trying to free an invalid buffer => $id, size = ${memoryUsedBytes}, $this") } } @@ -453,7 +591,7 @@ abstract class RapidsBufferStore(val tier: StorageTier) releaseResources() } - override def toString: String = s"$name buffer size=${getMemoryUsedBytes}" + override def toString: String = s"$name buffer size=${memoryUsedBytes}" } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 9fe3793cffe..f5c60dedd5b 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -1995,6 +1995,16 @@ object RapidsConf { "The chunked pack bounce buffer must be at least 1MB in size") .createWithDefault(128L * 1024 * 1024) + val SPILL_TO_DISK_BOUNCE_BUFFER_SIZE = + conf("spark.rapids.memory.host.spillToDiskBounceBufferSize") + .doc("Amount of host memory (in bytes) to set aside at startup for the " + + "bounce buffer used for gpu to disk spill that bypasses the host store.") + .internal() + .bytesConf(ByteUnit.BYTE) + .checkValue(v => v >= 1, + "The gpu to disk spill bounce buffer must have a positive size") + .createWithDefault(128L * 1024 * 1024) + val SPLIT_UNTIL_SIZE_OVERRIDE = conf("spark.rapids.sql.test.overrides.splitUntilSize") .doc("Only for tests: override the value of GpuDeviceManager.splitUntilSize") .internal() @@ -2679,6 +2689,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val chunkedPackBounceBufferSize: Long = get(CHUNKED_PACK_BOUNCE_BUFFER_SIZE) + lazy val spillToDiskBounceBufferSize: Long = get(SPILL_TO_DISK_BOUNCE_BUFFER_SIZE) + lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE) private val optimizerDefaults = Map( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala index 7b0f07bf876..bc09752bfdf 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStore.scala @@ -16,16 +16,18 @@ package com.nvidia.spark.rapids +import java.nio.channels.WritableByteChannel import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable import ai.rapids.cudf.{ColumnVector, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, Table} import com.nvidia.spark.rapids.Arm._ +import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableSeq import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta -import org.apache.spark.sql.rapids.TempSpillBufferId +import org.apache.spark.sql.rapids.storage.RapidsStorageUtils import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -35,7 +37,9 @@ import org.apache.spark.sql.vectorized.ColumnarBatch * during spill in chunked_pack. The parameter defaults to 128MB, * with a rule-of-thumb of 1MB per SM. 
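 * The new `hostBounceBufferSize` parameter sizes the host staging buffer
 * used when spilling device memory directly to disk; it is fed from the
 * internal setting `spark.rapids.memory.host.spillToDiskBounceBufferSize`
 * (default 128MB). A construction sketch:
 * {{{
 *   new RapidsDeviceMemoryStore(
 *     chunkedPackBounceBufferSize = 128L * 1024 * 1024,
 *     hostBounceBufferSize = 128L * 1024 * 1024)
 * }}}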
*/ -class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024) +class RapidsDeviceMemoryStore( + chunkedPackBounceBufferSize: Long = 128L*1024*1024, + hostBounceBufferSize: Long = 128L*1024*1024) extends RapidsBufferStore(StorageTier.DEVICE) { // The RapidsDeviceMemoryStore handles spillability via ref counting @@ -45,9 +49,13 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 private var chunkedPackBounceBuffer: DeviceMemoryBuffer = DeviceMemoryBuffer.allocate(chunkedPackBounceBufferSize) + private var hostSpillBounceBuffer: HostMemoryBuffer = + HostMemoryBuffer.allocate(hostBounceBufferSize) + override protected def createBuffer( other: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase = { + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] = { val memoryBuffer = withResource(other.getCopyIterator) { copyIterator => copyIterator.next() } @@ -64,12 +72,12 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 case b => throw new IllegalStateException(s"Unrecognized buffer: $b") } } - new RapidsDeviceMemoryBuffer( + Some(new RapidsDeviceMemoryBuffer( other.id, deviceBuffer.getLength, other.meta, deviceBuffer, - other.getSpillPriority) + other.getSpillPriority)) } } @@ -127,7 +135,7 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 * @return the RapidsBuffer instance that was added. */ def addTable( - id: TempSpillBufferId, + id: RapidsBufferId, table: Table, initialSpillPriority: Long, needsSync: Boolean): RapidsBuffer = { @@ -209,13 +217,14 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 * @param spillPriority a starting spill priority */ class RapidsTable( - id: TempSpillBufferId, + id: RapidsBufferId, table: Table, spillPriority: Long) extends RapidsBufferBase( id, null, - spillPriority) { + spillPriority) + with RapidsBufferChannelWritable { /** The storage tier for this buffer */ override val storageTier: StorageTier = StorageTier.DEVICE @@ -256,7 +265,7 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 chunkedPacker.getMeta } - override def getMemoryUsedBytes: Long = unpackedSizeInBytes + override val memoryUsedBytes: Long = unpackedSizeInBytes override def getPackedSizeBytes: Long = getChunkedPacker.getTotalContiguousSize @@ -371,6 +380,32 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 } } } + + override def writeToChannel(outputChannel: WritableByteChannel, stream: Cuda.Stream): Long = { + var written: Long = 0L + withResource(getCopyIterator) { copyIter => + while(copyIter.hasNext) { + withResource(copyIter.next()) { slice => + val iter = + new MemoryBufferToHostByteBufferIterator( + slice, + hostSpillBounceBuffer, + stream) + iter.foreach { bb => + try { + while (bb.hasRemaining) { + written += outputChannel.write(bb) + } + } finally { + RapidsStorageUtils.dispose(bb) + } + } + } + } + written + } + } + } class RapidsDeviceMemoryBuffer( @@ -380,9 +415,10 @@ class RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 contigBuffer: DeviceMemoryBuffer, spillPriority: Long) extends RapidsBufferBase(id, meta, spillPriority) - with MemoryBuffer.EventHandler { + with MemoryBuffer.EventHandler + with RapidsBufferChannelWritable { - override def getMemoryUsedBytes(): Long = size + override val memoryUsedBytes: Long = size override val storageTier: StorageTier = StorageTier.DEVICE @@ -456,10 +492,32 @@ class 
RapidsDeviceMemoryStore(chunkedPackBounceBufferSize: Long = 128L*1024*1024 } super.free() } + + override def writeToChannel(outputChannel: WritableByteChannel, stream: Cuda.Stream): Long = { + var written: Long = 0L + val iter = new MemoryBufferToHostByteBufferIterator( + contigBuffer, + hostSpillBounceBuffer, + stream) + iter.foreach { bb => + try { + while (bb.hasRemaining) { + written += outputChannel.write(bb) + } + } finally { + RapidsStorageUtils.dispose(bb) + } + } + written + } } override def close(): Unit = { - super.close() - chunkedPackBounceBuffer.close() - chunkedPackBounceBuffer = null + try { + super.close() + } finally { + Seq(chunkedPackBounceBuffer, hostSpillBounceBuffer).safeClose() + chunkedPackBounceBuffer = null + hostSpillBounceBuffer = null + } } } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala index ffb8960ccb1..d61f6061116 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsDiskStore.scala @@ -37,7 +37,8 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) override protected def createBuffer( incoming: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase = { + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] = { // assuming that the disk store gets contiguous buffers val id = incoming.id val path = if (id.canShareDiskPaths) { @@ -49,14 +50,14 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) val (fileOffset, diskLength) = if (id.canShareDiskPaths) { // only one writer at a time for now when using shared files path.synchronized { - writeToFile(incoming, path, append = true) + writeToFile(incoming, path, append = true, stream) } } else { - writeToFile(incoming, path, append = false) + writeToFile(incoming, path, append = false, stream) } logDebug(s"Spilled to $path $fileOffset:$diskLength") - incoming match { + val buff = incoming match { case _: RapidsHostBatchBuffer => new RapidsDiskColumnarBatch( id, @@ -73,19 +74,26 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) incoming.meta, incoming.getSpillPriority) } + Some(buff) } /** Copy a host buffer to a file, returning the file offset at which the data was written. */ private def writeToFile( incoming: RapidsBuffer, path: File, - append: Boolean): (Long, Long) = { + append: Boolean, + stream: Cuda.Stream): (Long, Long) = { incoming match { case fileWritable: RapidsBufferChannelWritable => withResource(new FileOutputStream(path, append)) { fos => withResource(fos.getChannel) { outputChannel => val startOffset = outputChannel.position() - val writtenBytes = fileWritable.writeToChannel(outputChannel) + val writtenBytes = fileWritable.writeToChannel(outputChannel, stream) + if (writtenBytes == 0) { + throw new IllegalStateException( + s"Buffer ${fileWritable} wrote 0 bytes disk on spill. This is not supported!" 
+ ) + } (startOffset, writtenBytes) } } @@ -109,16 +117,17 @@ class RapidsDiskStore(diskBlockManager: RapidsDiskBlockManager) id, meta, spillPriority) { private[this] var hostBuffer: Option[HostMemoryBuffer] = None - override def getMemoryUsedBytes(): Long = size + override val memoryUsedBytes: Long = size override val storageTier: StorageTier = StorageTier.DISK override def getMemoryBuffer: MemoryBuffer = synchronized { if (hostBuffer.isEmpty) { + require(size > 0, + s"$this attempted an invalid 0-byte mmap of a file") val path = id.getDiskPath(diskBlockManager) val mappedBuffer = HostMemoryBuffer.mapFile(path, MapMode.READ_WRITE, fileOffset, size) - logDebug(s"Created mmap buffer for $path $fileOffset:$size") hostBuffer = Some(mappedBuffer) } hostBuffer.foreach(_.incRefCount()) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala index 1030d6e4f75..529f3fece63 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala @@ -39,13 +39,14 @@ class RapidsGdsStore( override protected def createBuffer( other: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase = { + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] = { // assume that we get 1 buffer val otherBuffer = withResource(other.getCopyIterator) { it => it.next() } - withResource(otherBuffer) { _ => + val buff = withResource(otherBuffer) { _ => val deviceBuffer = otherBuffer match { case d: BaseDeviceMemoryBuffer => d case _ => throw new IllegalStateException("copying from buffer without device memory") @@ -56,6 +57,7 @@ class RapidsGdsStore( singleShotSpill(other, deviceBuffer) } } + Some(buff) } override def close(): Unit = { @@ -71,7 +73,7 @@ class RapidsGdsStore( extends RapidsBufferBase(id, meta, spillPriority) { override val storageTier: StorageTier = StorageTier.GDS - override def getMemoryUsedBytes(): Long = size + override val memoryUsedBytes: Long = size override def getMemoryBuffer: MemoryBuffer = getDeviceMemoryBuffer } @@ -231,7 +233,7 @@ class RapidsGdsStore( var isPending: Boolean = true) extends RapidsGdsBuffer(id, size, meta, spillPriority) { - override def getMemoryUsedBytes(): Long = size + override val memoryUsedBytes: Long = size override def materializeMemoryBuffer: MemoryBuffer = this.synchronized { closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer => diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 743711cee7a..dbdbb38f13c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -28,6 +28,7 @@ import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta +import org.apache.spark.sql.rapids.execution.TrampolineUtil import org.apache.spark.sql.rapids.storage.RapidsStorageUtils import org.apache.spark.sql.types.DataType import org.apache.spark.sql.vectorized.ColumnarBatch @@ -41,7 +42,7 @@ class RapidsHostMemoryStore( maxSize: Long) extends RapidsBufferStore(StorageTier.HOST) { - override def spillableOnAdd: Boolean = false + override protected def spillableOnAdd: Boolean = false override def getMaxSize: 
Option[Long] = Some(maxSize) @@ -97,42 +98,75 @@ class RapidsHostMemoryStore( } } + override protected def trySpillToMaximumSize( + buffer: RapidsBuffer, + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Boolean = { + // this spillStore has a maximum size requirement (host only). We need to spill from it + // in order to make room for `buffer`. + val targetTotalSize = maxSize - buffer.memoryUsedBytes + if (targetTotalSize <= 0) { + // lets not spill to host when the buffer we are about + // to spill is larger than our limit + false + } else { + val amountSpilled = synchronousSpill(targetTotalSize, catalog, stream) + if (amountSpilled != 0) { + logDebug(s"Spilled $amountSpilled bytes from ${name} to make room for ${buffer.id}") + TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled) + } + // if after spill we can fit the new buffer, return true + buffer.memoryUsedBytes <= (maxSize - currentSize) + } + } + override protected def createBuffer( other: RapidsBuffer, - stream: Cuda.Stream): RapidsBufferBase = { - withResource(other.getCopyIterator) { otherBufferIterator => - val isChunked = otherBufferIterator.isChunked - val totalCopySize = otherBufferIterator.getTotalCopySize - closeOnExcept(allocateHostBuffer(totalCopySize)) { hostBuffer => - withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => - var hostOffset = 0L - val start = System.nanoTime() - while (otherBufferIterator.hasNext) { - val otherBuffer = otherBufferIterator.next() - withResource(otherBuffer) { _ => - otherBuffer match { - case devBuffer: DeviceMemoryBuffer => - hostBuffer.copyFromMemoryBufferAsync( - hostOffset, devBuffer, 0, otherBuffer.getLength, stream) - hostOffset += otherBuffer.getLength - case _ => - throw new IllegalStateException("copying from buffer without device memory") + catalog: RapidsBufferCatalog, + stream: Cuda.Stream): Option[RapidsBufferBase] = { + val wouldFit = trySpillToMaximumSize(other, catalog, stream) + // TODO: this is disabled for now since subsequent work will tie this into + // our host allocator apis. 
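+      // (sketch of the intended behavior once the host-allocator work lands:
+      // a buffer that still does not fit after trySpillToMaximumSize would
+      // return None here, letting RapidsBufferStore.spillBuffer cascade the
+      // copy to the next tier, e.g. straight from device to disk)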
+ if (false && !wouldFit) { + // skip host + logWarning(s"Buffer ${other} with size ${other.memoryUsedBytes} does not fit " + + s"in the host store, skipping tier.") + None + } else { + withResource(other.getCopyIterator) { otherBufferIterator => + val isChunked = otherBufferIterator.isChunked + val totalCopySize = otherBufferIterator.getTotalCopySize + closeOnExcept(allocateHostBuffer(totalCopySize)) { hostBuffer => + withResource(new NvtxRange("spill to host", NvtxColor.BLUE)) { _ => + var hostOffset = 0L + val start = System.nanoTime() + while (otherBufferIterator.hasNext) { + val otherBuffer = otherBufferIterator.next() + withResource(otherBuffer) { _ => + otherBuffer match { + case devBuffer: DeviceMemoryBuffer => + hostBuffer.copyFromMemoryBufferAsync( + hostOffset, devBuffer, 0, otherBuffer.getLength, stream) + hostOffset += otherBuffer.getLength + case _ => + throw new IllegalStateException("copying from buffer without device memory") + } } } + stream.sync() + val end = System.nanoTime() + val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong + val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong + logDebug(s"Spill to host (chunked=$isChunked) " + + s"size=$szMB MiB bandwidth=$bw MiB/sec") } - stream.sync() - val end = System.nanoTime() - val szMB = (totalCopySize.toDouble / 1024.0 / 1024.0).toLong - val bw = (szMB.toDouble / ((end - start).toDouble / 1000000000.0)).toLong - logDebug(s"Spill to host (chunked=$isChunked) " + - s"size=$szMB MiB bandwidth=$bw MiB/sec") + Some(new RapidsHostMemoryBuffer( + other.id, + totalCopySize, + other.meta, + applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET), + hostBuffer)) } - new RapidsHostMemoryBuffer( - other.id, - totalCopySize, - other.meta, - applyPriorityOffset(other.getSpillPriority, HOST_MEMORY_BUFFER_SPILL_OFFSET), - hostBuffer) } } } @@ -158,7 +192,7 @@ class RapidsHostMemoryStore( } } - override def writeToChannel(outputChannel: WritableByteChannel): Long = { + override def writeToChannel(outputChannel: WritableByteChannel, ignored: Cuda.Stream): Long = { var written: Long = 0L val iter = new HostByteBufferIterator(buffer) iter.foreach { bb => @@ -184,7 +218,7 @@ class RapidsHostMemoryStore( } /** The size of this buffer in bytes. */ - override def getMemoryUsedBytes: Long = size + override val memoryUsedBytes: Long = size // If this require triggers, we are re-adding a `HostMemoryBuffer` outside of // the catalog lock, which should not possible. The event handler is set to null @@ -304,10 +338,6 @@ class RapidsHostMemoryStore( override val storageTier: StorageTier = StorageTier.HOST - // This is the current size in batch form. It is to be used while this - // batch hasn't migrated to another store. - private val hostSizeInByes: Long = RapidsHostColumnVector.getTotalHostMemoryUsed(hostCb) - // By default all columns are NOT spillable since we are not the only owners of // the columns (the caller is holding onto a ColumnarBatch that will be closed // after instantiation, triggering onClosed callbacks) @@ -329,7 +359,9 @@ class RapidsHostMemoryStore( null } - override def getMemoryUsedBytes: Long = hostSizeInByes + // This is the current size in batch form. It is to be used while this + // batch hasn't migrated to another store. 
+ override val memoryUsedBytes: Long = RapidsHostColumnVector.getTotalHostMemoryUsed(hostCb) /** * Mark a column as spillable @@ -376,7 +408,7 @@ class RapidsHostMemoryStore( "RapidsHostColumnarBatch does not support getCopyIterator") } - override def writeToChannel(outputChannel: WritableByteChannel): Long = { + override def writeToChannel(outputChannel: WritableByteChannel, ignored: Cuda.Stream): Long = { withResource(Channels.newOutputStream(outputChannel)) { outputStream => withResource(new DataOutputStream(outputStream)) { dos => val columns = RapidsHostColumnVector.extractBases(hostCb) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index beb5db35cbd..82efa7699ef 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -97,7 +97,7 @@ class SpillableColumnarBatchImpl ( } override lazy val sizeInBytes: Long = - withRapidsBuffer(_.getMemoryUsedBytes) + withRapidsBuffer(_.memoryUsedBytes) /** * Set a new spill priority. @@ -164,7 +164,7 @@ class SpillableHostColumnarBatchImpl ( } override lazy val sizeInBytes: Long = { - withRapidsHostBatchBuffer(_.getMemoryUsedBytes) + withRapidsHostBatchBuffer(_.memoryUsedBytes) } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala index 43399cb4825..e9d9d7f2d65 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIterator.scala @@ -361,7 +361,7 @@ class RapidsShuffleIterator( try { sb = catalog.acquireBuffer(handle) cb = sb.getColumnarBatch(sparkTypes) - metricsUpdater.update(blockedTime, 1, sb.getMemoryUsedBytes, cb.numRows()) + metricsUpdater.update(blockedTime, 1, sb.memoryUsedBytes, cb.numRows()) } finally { nvtxRangeAfterGettingBatch.close() range.close() diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala index 54827e12878..3666b85458e 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsBufferCatalogSuite.scala @@ -243,7 +243,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar { assertResult(unspilled)(unspilledSame) } // verify that we invoked the copy function exactly once - verify(deviceStore, times(1)).copyBuffer(any(), any()) + verify(deviceStore, times(1)).copyBuffer(any(), any(), any()) unspilled } val unspilledSame = catalog.unspillBufferToDeviceStore( @@ -253,7 +253,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar { assertResult(unspilled)(unspilledSame) } // verify that we invoked the copy function exactly once - verify(deviceStore, times(1)).copyBuffer(any(), any()) + verify(deviceStore, times(1)).copyBuffer(any(), any(), any()) } } } @@ -330,7 +330,7 @@ class RapidsBufferCatalogSuite extends AnyFunSuite with MockitoSugar { var _acquireAttempts: Int = acquireAttempts var currentPriority: Long = initialPriority override val id: RapidsBufferId = bufferId - override def getMemoryUsedBytes: Long = 0 + override val memoryUsedBytes: Long = 0 override def meta: TableMeta = tableMeta 
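    // note: memoryUsedBytes is a val in the new API; a buffer's reported
    // size is fixed for its lifetime, so the mock can pin it to 0 here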
override val storageTier: StorageTier = tier override def getColumnarBatch(sparkTypes: Array[DataType]): ColumnarBatch = null diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala index 1e4bde7a65d..45d96be4cb6 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDeviceMemoryStoreSuite.scala @@ -466,9 +466,11 @@ class RapidsDeviceMemoryStoreSuite extends AnyFunSuite with MockitoSugar { override protected def createBuffer( b: RapidsBuffer, - s: Cuda.Stream): RapidsBufferBase = { + c: RapidsBufferCatalog, + s: Cuda.Stream): Option[RapidsBufferBase] = { spilledBuffers += b.id - new MockRapidsBuffer(b.id, b.getPackedSizeBytes, b.meta, b.getSpillPriority) + Some(new MockRapidsBuffer( + b.id, b.getPackedSizeBytes, b.meta, b.getSpillPriority)) } class MockRapidsBuffer(id: RapidsBufferId, size: Long, meta: TableMeta, spillPriority: Long) @@ -481,7 +483,7 @@ class RapidsDeviceMemoryStoreSuite extends AnyFunSuite with MockitoSugar { throw new UnsupportedOperationException /** The size of this buffer in bytes. */ - override def getMemoryUsedBytes: Long = size + override val memoryUsedBytes: Long = size } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala index e8878e2cc6c..6adcbcc1909 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala @@ -19,7 +19,7 @@ package com.nvidia.spark.rapids import java.io.File import java.math.RoundingMode -import ai.rapids.cudf.{ContiguousTable, DeviceMemoryBuffer, HostMemoryBuffer, Table} +import ai.rapids.cudf.{ColumnVector, ContiguousTable, Cuda, DeviceMemoryBuffer, HostMemoryBuffer, Table} import com.nvidia.spark.rapids.Arm.{closeOnExcept, withResource} import org.mockito.ArgumentMatchers import org.mockito.Mockito.{spy, times, verify} @@ -31,16 +31,31 @@ import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType, IntegerTyp class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { private def buildContiguousTable(): ContiguousTable = { - withResource(new Table.TestBuilder() + withResource(buildTable()) { table => + table.contiguousSplit()(0) + } + } + + private def buildTable(): Table = { + new Table.TestBuilder() .column(5, null.asInstanceOf[java.lang.Integer], 3, 1) .column("five", "two", null, null) .column(5.0, 2.0, 3.0, 1.0) .decimal64Column(-5, RoundingMode.UNNECESSARY, 0, null, -1.4, 10.123) - .build()) { table => - table.contiguousSplit()(0) + .build() + } + + private def buildEmptyTable(): Table = { + withResource(buildTable()) { tbl => + withResource(ColumnVector.fromBooleans(false, false, false, false)) { mask => + tbl.filter(mask) // filter all out + } } } + private val mockTableDataTypes: Array[DataType] = + Array(IntegerType, StringType, DoubleType, DecimalType(10, 5)) + test("spill updates catalog") { val bufferId = MockRapidsBufferId(7, canShareDiskPaths = false) val spillPriority = -7 @@ -54,7 +69,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { assertResult(0)(diskStore.currentSize) hostStore.setSpillStore(diskStore) val (bufferSize, handle) = - addTableToCatalog(catalog, bufferId, spillPriority) + addContiguousTableToCatalog(catalog, bufferId, spillPriority) 
val path = handle.id.getDiskPath(null) assert(!path.exists()) catalog.synchronousSpill(devStore, 0) @@ -68,7 +83,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE)) withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.DISK)(buffer.storageTier) - assertResult(bufferSize)(buffer.getMemoryUsedBytes) + assertResult(bufferSize)(buffer.memoryUsedBytes) assertResult(handle.id)(buffer.id) assertResult(spillPriority)(buffer.getSpillPriority) } @@ -93,7 +108,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore => hostStore.setSpillStore(diskStore) - val (_, handle) = addTableToCatalog(catalog, bufferId, spillPriority) + val (_, handle) = addContiguousTableToCatalog(catalog, bufferId, spillPriority) assert(!handle.id.getDiskPath(null).exists()) val expectedTable = withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.DEVICE)(buffer.storageTier) @@ -134,7 +149,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { devStore.setSpillStore(hostStore) withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore => hostStore.setSpillStore(diskStore) - val (_, handle) = addTableToCatalog(catalog, bufferId, spillPriority) + val (_, handle) = addContiguousTableToCatalog(catalog, bufferId, spillPriority) assert(!handle.id.getDiskPath(null).exists()) val expectedBuffer = withResource(catalog.acquireBuffer(handle)) { buffer => assertResult(StorageTier.DEVICE)(buffer.storageTier) @@ -162,6 +177,161 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar { } } + test("skip host: spill device memory buffer to disk") { + val bufferId = MockRapidsBufferId(1, canShareDiskPaths = false) + val bufferPath = bufferId.getDiskPath(null) + assert(!bufferPath.exists) + val spillPriority = -7 + withResource(new RapidsDeviceMemoryStore) { devStore => + val catalog = new RapidsBufferCatalog(devStore) + withResource(new AlwaysFailingRapidsHostMemoryStore) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore => + hostStore.setSpillStore(diskStore) + val (_, handle) = addContiguousTableToCatalog(catalog, bufferId, spillPriority) + assert(!handle.id.getDiskPath(null).exists()) + val expectedBuffer = withResource(catalog.acquireBuffer(handle)) { buffer => + assertResult(StorageTier.DEVICE)(buffer.storageTier) + withResource(buffer.getMemoryBuffer) { devbuf => + closeOnExcept(HostMemoryBuffer.allocate(devbuf.getLength)) { hostbuf => + hostbuf.copyFromDeviceBuffer(devbuf.asInstanceOf[DeviceMemoryBuffer]) + hostbuf + } + } + } + withResource(expectedBuffer) { expectedBuffer => + catalog.synchronousSpill(devStore, 0) + withResource(catalog.acquireBuffer(handle)) { buffer => + assertResult(StorageTier.DISK)(buffer.storageTier) + withResource(buffer.getMemoryBuffer) { actualBuffer => + assert(actualBuffer.isInstanceOf[HostMemoryBuffer]) + val actualHostBuffer = actualBuffer.asInstanceOf[HostMemoryBuffer] + assertResult(expectedBuffer.asByteBuffer)(actualHostBuffer.asByteBuffer) + } + } + } + } + } + } + } + + test("skip host: spill table to disk") { + val bufferId = MockRapidsBufferId(1, canShareDiskPaths = false) + val bufferPath = bufferId.getDiskPath(null) + assert(!bufferPath.exists) + val spillPriority = -7 + withResource(new 
+      val catalog = new RapidsBufferCatalog(devStore)
+      withResource(new AlwaysFailingRapidsHostMemoryStore) {
+        hostStore =>
+          devStore.setSpillStore(hostStore)
+          withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
+            hostStore.setSpillStore(diskStore)
+            val handle = addTableToCatalog(catalog, bufferId, spillPriority)
+            withResource(buildTable()) { expectedTable =>
+              withResource(
+                  GpuColumnVector.from(expectedTable, mockTableDataTypes)) { expectedBatch =>
+                catalog.synchronousSpill(devStore, 0)
+                withResource(catalog.acquireBuffer(handle)) { buffer =>
+                  assert(handle.id.getDiskPath(null).exists())
+                  assertResult(StorageTier.DISK)(buffer.storageTier)
+                  withResource(buffer.getColumnarBatch(mockTableDataTypes)) { fromDiskBatch =>
+                    TestUtils.compareBatches(expectedBatch, fromDiskBatch)
+                  }
+                }
+              }
+            }
+          }
+      }
+    }
+  }
+
+  test("skip host: spill table to disk with small host bounce buffer") {
+    val bufferId = MockRapidsBufferId(1, canShareDiskPaths = false)
+    val bufferPath = bufferId.getDiskPath(null)
+    assert(!bufferPath.exists)
+    val spillPriority = -7
+    withResource(new RapidsDeviceMemoryStore(1L * 1024 * 1024, 10)) { devStore =>
+      val catalog = new RapidsBufferCatalog(devStore)
+      withResource(new AlwaysFailingRapidsHostMemoryStore) {
+        hostStore =>
+          devStore.setSpillStore(hostStore)
+          withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
+            hostStore.setSpillStore(diskStore)
+            val handle = addTableToCatalog(catalog, bufferId, spillPriority)
+            withResource(buildTable()) { expectedTable =>
+              withResource(
+                  GpuColumnVector.from(expectedTable, mockTableDataTypes)) { expectedBatch =>
+                catalog.synchronousSpill(devStore, 0)
+                withResource(catalog.acquireBuffer(handle)) { buffer =>
+                  assert(handle.id.getDiskPath(null).exists())
+                  assertResult(StorageTier.DISK)(buffer.storageTier)
+                  withResource(buffer.getColumnarBatch(mockTableDataTypes)) { fromDiskBatch =>
+                    TestUtils.compareBatches(expectedBatch, fromDiskBatch)
+                  }
+                }
+              }
+            }
+          }
+      }
+    }
+  }
+
+  test("0-byte table is never spillable as we would fail to mmap") {
+    val bufferId = MockRapidsBufferId(1, canShareDiskPaths = false)
+    val bufferPath = bufferId.getDiskPath(null)
+    val bufferId2 = MockRapidsBufferId(2, canShareDiskPaths = false)
+    assert(!bufferPath.exists)
+    val spillPriority = -7
+    val hostStoreMaxSize = 1L * 1024 * 1024
+    withResource(new RapidsDeviceMemoryStore) { devStore =>
+      val catalog = new RapidsBufferCatalog(devStore)
+      withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) { hostStore =>
+        devStore.setSpillStore(hostStore)
+        withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
+          hostStore.setSpillStore(diskStore)
+          val handle = addZeroRowsTableToCatalog(catalog, bufferId, spillPriority - 1)
+          val handle2 = addTableToCatalog(catalog, bufferId2, spillPriority)
+          withResource(handle2) { _ =>
+            assert(!handle.id.getDiskPath(null).exists())
+            withResource(buildTable()) { expectedTable =>
+              withResource(buildEmptyTable()) { expectedEmptyTable =>
+                withResource(
+                    GpuColumnVector.from(
+                      expectedTable, mockTableDataTypes)) { expectedCb =>
+                  withResource(
+                      GpuColumnVector.from(
+                        expectedEmptyTable, mockTableDataTypes)) { expectedEmptyCb =>
+                    catalog.synchronousSpill(devStore, 0)
+                    catalog.synchronousSpill(hostStore, 0)
+                    withResource(catalog.acquireBuffer(handle2)) { buffer =>
+                      withResource(catalog.acquireBuffer(handle)) { emptyBuffer =>
+                        // the 0-byte table never moved from device. It is not spillable
+                        assertResult(StorageTier.DEVICE)(emptyBuffer.storageTier)
+                        withResource(emptyBuffer.getColumnarBatch(mockTableDataTypes)) { cb =>
+                          TestUtils.compareBatches(expectedEmptyCb, cb)
+                        }
+                        // the second table (with rows) did spill
+                        assertResult(StorageTier.DISK)(buffer.storageTier)
+                        withResource(buffer.getColumnarBatch(mockTableDataTypes)) { cb =>
+                          TestUtils.compareBatches(expectedCb, cb)
+                        }
+                      }
+                    }
+                    assertResult(0)(devStore.currentSize)
+                    assertResult(0)(hostStore.currentSize)
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
   test("exclusive spill files are deleted when buffer deleted") {
     testBufferFileDeletion(canShareDiskPaths = false)
   }
@@ -170,6 +340,15 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
     testBufferFileDeletion(canShareDiskPaths = true)
   }

+  class AlwaysFailingRapidsHostMemoryStore extends RapidsHostMemoryStore(0L) {
+    override def createBuffer(
+        other: RapidsBuffer,
+        catalog: RapidsBufferCatalog,
+        stream: Cuda.Stream): Option[RapidsBufferBase] = {
+      None
+    }
+  }
+
   private def testBufferFileDeletion(canShareDiskPaths: Boolean): Unit = {
     val bufferId = MockRapidsBufferId(1, canShareDiskPaths)
     val bufferPath = bufferId.getDiskPath(null)
@@ -183,7 +362,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
       devStore.setSpillStore(hostStore)
       withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
         hostStore.setSpillStore(diskStore)
-        val (_, handle) = addTableToCatalog(catalog, bufferId, spillPriority)
+        val (_, handle) = addContiguousTableToCatalog(catalog, bufferId, spillPriority)
         val bufferPath = handle.id.getDiskPath(null)
         assert(!bufferPath.exists())
         catalog.synchronousSpill(devStore, 0)
@@ -200,7 +379,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
     }
   }

-  private def addTableToCatalog(
+  private def addContiguousTableToCatalog(
       catalog: RapidsBufferCatalog,
       bufferId: RapidsBufferId,
       spillPriority: Long): (Long, RapidsBufferHandle) = {
@@ -216,6 +395,31 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
     }
   }

+  private def addTableToCatalog(
+      catalog: RapidsBufferCatalog,
+      bufferId: RapidsBufferId,
+      spillPriority: Long): RapidsBufferHandle = {
+    // store takes ownership of the table
+    catalog.addTable(
+      bufferId,
+      buildTable(),
+      spillPriority,
+      false)
+  }
+
+  private def addZeroRowsTableToCatalog(
+      catalog: RapidsBufferCatalog,
+      bufferId: RapidsBufferId,
+      spillPriority: Long): RapidsBufferHandle = {
+    val table = buildEmptyTable()
+    // store takes ownership of the table
+    catalog.addTable(
+      bufferId,
+      table,
+      spillPriority,
+      false)
+  }
+
   case class MockRapidsBufferId(
       tableId: Int,
       override val canShareDiskPaths: Boolean) extends RapidsBufferId {
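
The three "skip host" tests added above share one harness shape, condensed here for reference (assertions and cleanup elided; the helpers are the suite's own):

// Condensed from the tests above; bufferId and spillPriority as in the suite.
withResource(new RapidsDeviceMemoryStore) { devStore =>
  val catalog = new RapidsBufferCatalog(devStore)
  withResource(new AlwaysFailingRapidsHostMemoryStore) { hostStore =>
    devStore.setSpillStore(hostStore)
    withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager])) { diskStore =>
      hostStore.setSpillStore(diskStore)
      val handle = addTableToCatalog(catalog, bufferId, spillPriority)
      // The host tier's createBuffer returns None, so the spill goes
      // straight from device to disk.
      catalog.synchronousSpill(devStore, 0)
      withResource(catalog.acquireBuffer(handle)) { buffer =>
        assertResult(StorageTier.DISK)(buffer.storageTier)
      }
    }
  }
}
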
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
index 275d1d99fb2..94fa440fd72 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsGdsStoreSuite.scala
@@ -92,7 +92,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
       withResource(catalog.acquireBuffer(handle)) { buffer =>
         assertResult(StorageTier.GDS)(buffer.storageTier)
         assertResult(id)(buffer.id)
-        assertResult(size)(buffer.getMemoryUsedBytes)
+        assertResult(size)(buffer.memoryUsedBytes)
         assertResult(spillPriority)(buffer.getSpillPriority)
       }
     }
@@ -126,7 +126,7 @@ class RapidsGdsStoreSuite extends FunSuiteWithTempDir with MockitoSugar {
         ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE))
       withResource(catalog.acquireBuffer(handle)) { buffer =>
         assertResult(StorageTier.GDS)(buffer.storageTier)
-        assertResult(bufferSize)(buffer.getMemoryUsedBytes)
+        assertResult(bufferSize)(buffer.memoryUsedBytes)
         assertResult(bufferId)(buffer.id)
         assertResult(spillPriority)(buffer.getSpillPriority)
       }
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
index 1354cc32bba..2d028f0cf7b 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala
@@ -116,7 +116,7 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar {
         ArgumentMatchers.eq(handle.id), ArgumentMatchers.eq(StorageTier.DEVICE))
       withResource(catalog.acquireBuffer(handle)) { buffer =>
         assertResult(StorageTier.HOST)(buffer.storageTier)
-        assertResult(bufferSize)(buffer.getMemoryUsedBytes)
+        assertResult(bufferSize)(buffer.memoryUsedBytes)
         assertResult(handle.id)(buffer.id)
         assertResult(spillPriority)(buffer.getSpillPriority)
       }
@@ -556,7 +556,7 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar {
         override def getDiskPath(diskBlockManager: RapidsDiskBlockManager): File = null
       })
     when(mockStore.getMaxSize).thenAnswer(_ => None)
-    when(mockStore.copyBuffer(any(), any())).thenReturn(mockBuff)
+    when(mockStore.copyBuffer(any(), any(), any())).thenReturn(Some(mockBuff))
     when(mockStore.tier) thenReturn (StorageTier.DISK)
     withResource(new RapidsHostMemoryStore(hostStoreMaxSize)) { hostStore =>
       devStore.setSpillStore(hostStore)
@@ -580,7 +580,9 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar {
       }
       // close the bigTable so it can be spilled
       bigTable = null
       catalog.synchronousSpill(devStore, 0)
-      verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer],
+      verify(mockStore, never()).copyBuffer(
+        ArgumentMatchers.any[RapidsBuffer],
+        ArgumentMatchers.any[RapidsBufferCatalog],
         ArgumentMatchers.any[Cuda.Stream])
       withResource(catalog.acquireBuffer(bigHandle)) { buffer =>
         assertResult(StorageTier.HOST)(buffer.storageTier)
@@ -598,7 +600,9 @@ class RapidsHostMemoryStoreSuite extends AnyFunSuite with MockitoSugar {
       catalog.synchronousSpill(devStore, 0)
       val rapidsBufferCaptor: ArgumentCaptor[RapidsBuffer] =
         ArgumentCaptor.forClass(classOf[RapidsBuffer])
-      verify(mockStore).copyBuffer(rapidsBufferCaptor.capture(),
+      verify(mockStore).copyBuffer(
+        rapidsBufferCaptor.capture(),
+        ArgumentMatchers.any[RapidsBufferCatalog],
         ArgumentMatchers.any[Cuda.Stream])
       assertResult(bigHandle.id)(rapidsBufferCaptor.getValue.id)
     }
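
The Mockito churn in the hunks above is mechanical fallout from copyBuffer gaining a RapidsBufferCatalog parameter and an Option result: every stub and verification needs one matcher per parameter, and stubs now answer Some(...). A self-contained sketch of the same pattern against a hypothetical three-argument method:

// Sketch only: Buf, Catalog, Stream, and Store are hypothetical
// stand-ins, not the plugin's types.
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.{mock, verify, when}

trait Buf
trait Catalog
trait Stream
trait Store { def copyBuffer(b: Buf, c: Catalog, s: Stream): Option[Buf] }

object MockArityExample {
  def main(args: Array[String]): Unit = {
    val store = mock(classOf[Store])
    val buf = mock(classOf[Buf])
    // One matcher per parameter of the widened signature; the stub
    // returns Some(...) to match the new Option result type.
    when(store.copyBuffer(any(), any(), any())).thenReturn(Some(buf))
    store.copyBuffer(null, null, null)
    // Verification must likewise name all three parameters.
    verify(store).copyBuffer(any[Buf], any[Catalog], any[Stream])
  }
}
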
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
index afc17081afb..4e0325f9048 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/shuffle/RapidsShuffleIteratorSuite.scala
@@ -227,7 +227,7 @@ class RapidsShuffleIteratorSuite extends RapidsShuffleTestHelper {
       assert(cl.hasNext)
       assertResult(cb)(cl.next())
       assertResult(1)(testMetricsUpdater.totalRemoteBlocksFetched)
-      assertResult(mockBuffer.getMemoryUsedBytes)(testMetricsUpdater.totalRemoteBytesRead)
+      assertResult(mockBuffer.memoryUsedBytes)(testMetricsUpdater.totalRemoteBytesRead)
       assertResult(10)(testMetricsUpdater.totalRowsFetched)
     } finally {
       RmmSpark.taskDone(taskId)
diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala
index a5209e9bd0e..c4a531a8d7d 100644
--- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala
+++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala
@@ -47,7 +47,7 @@ class SpillableColumnarBatchSuite extends AnyFunSuite {
   }

   class MockBuffer(override val id: RapidsBufferId) extends RapidsBuffer {
-    override def getMemoryUsedBytes: Long = 123
+    override val memoryUsedBytes: Long = 123
     override def meta: TableMeta = null
     override val storageTier: StorageTier = StorageTier.DEVICE
     override def getMemoryBuffer: MemoryBuffer = null
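
A closing note on the rename that runs through every suite in this change: the getMemoryUsedBytes def became a memoryUsedBytes val. Scala allows a stable val to override an abstract def, so call sites are untouched while implementers compute the size once. A tiny illustration with a hypothetical trait:

// Illustration only: Sized is a hypothetical trait, not the plugin's
// RapidsBuffer.
trait Sized {
  // abstract; implementers may satisfy this with either a def or a val
  def memoryUsedBytes: Long
}

class FixedSize extends Sized {
  // A val overriding the def: evaluated once at construction instead
  // of on every call, matching the MockBuffer override above.
  override val memoryUsedBytes: Long = 123
}
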