From 004cc390f1777313ab7c3d01abf89b787523e5cc Mon Sep 17 00:00:00 2001 From: Rong Ou Date: Fri, 14 Jan 2022 10:35:25 -0800 Subject: [PATCH] Make better use of pinned memory pool (#4497) * Make better use of pinned memory pool Signed-off-by: Rong Ou * review feedback Signed-off-by: Rong Ou * fix match statement and test Signed-off-by: Rong Ou * fix disk store suite Signed-off-by: Rong Ou * add a separate config for pageable pool size Signed-off-by: Rong Ou * clamp offset spill priorities * make priority offset method more general --- docs/configs.md | 3 +- .../spark/rapids/RapidsBufferCatalog.scala | 9 +- .../com/nvidia/spark/rapids/RapidsConf.scala | 15 +- .../spark/rapids/RapidsHostMemoryStore.scala | 50 +++--- .../nvidia/spark/rapids/SpillPriorities.scala | 39 ++++- .../spark/rapids/RapidsDiskStoreSuite.scala | 160 ++++++++--------- .../rapids/RapidsHostMemoryStoreSuite.scala | 164 +++++++++--------- 7 files changed, 256 insertions(+), 184 deletions(-) diff --git a/docs/configs.md b/docs/configs.md index ed8c17c1e1c..68b067a38f6 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -43,7 +43,8 @@ Name | Description | Default Value spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels and kernel launches.|671088640 spark.rapids.memory.gpu.unspill.enabled|When a spilled GPU buffer is needed again, should it be unspilled, or only copied back into GPU memory temporarily. Unspilling may be useful for GPU buffers that are needed frequently, for example, broadcast variables; however, it may also increase GPU memory usage|false -spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824 +spark.rapids.memory.host.pageablePool.size|The size of the pageable memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|1073741824 +spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools.|-1 spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0 spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0 spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers. Half of the rest will be used if not specified|None diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 8c5f459fc55..222207c1151 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -184,7 +184,12 @@ object RapidsBufferCatalog extends Logging with Arm { gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize) deviceStorage.setSpillStore(gdsStorage) } else { - hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize) + val hostSpillStorageSize = if (rapidsConf.hostSpillStorageSize == -1) { + rapidsConf.pinnedPoolSize + rapidsConf.pageablePoolSize + } else { + rapidsConf.hostSpillStorageSize + } + hostStorage = new RapidsHostMemoryStore(hostSpillStorageSize, rapidsConf.pageablePoolSize) diskStorage = new RapidsDiskStore(diskBlockManager) deviceStorage.setSpillStore(hostStorage) hostStorage.setSpillStore(diskStorage) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index fa374c7afaa..bd5856dc634 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -304,6 +304,12 @@ object RapidsConf { .bytesConf(ByteUnit.BYTE) .createWithDefault(0) + val PAGEABLE_POOL_SIZE = conf("spark.rapids.memory.host.pageablePool.size") + .doc("The size of the pageable memory pool in bytes unless otherwise specified. " + + "Use 0 to disable the pool.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(ByteUnit.GiB.toBytes(1)) + val RMM_DEBUG = conf("spark.rapids.memory.gpu.debug") .doc("Provides a log of GPU memory allocations and frees. If set to " + "STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. " + @@ -355,10 +361,11 @@ object RapidsConf { .createWithDefault(ByteUnit.MiB.toBytes(640)) val HOST_SPILL_STORAGE_SIZE = conf("spark.rapids.memory.host.spillStorageSize") - .doc("Amount of off-heap host memory to use for buffering spilled GPU data " + - "before spilling to local disk") + .doc("Amount of off-heap host memory to use for buffering spilled GPU data before spilling " + + "to local disk. Use -1 to set the amount to the combined size of pinned and pageable " + + "memory pools.") .bytesConf(ByteUnit.BYTE) - .createWithDefault(ByteUnit.GiB.toBytes(1)) + .createWithDefault(-1) val UNSPILL = conf("spark.rapids.memory.gpu.unspill.enabled") .doc("When a spilled GPU buffer is needed again, should it be unspilled, or only copied " + @@ -1458,6 +1465,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE) + lazy val pageablePoolSize: Long = get(PAGEABLE_POOL_SIZE) + lazy val concurrentGpuTasks: Int = get(CONCURRENT_GPU_TASKS) lazy val isTestEnabled: Boolean = get(TEST_CONF) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala index 8a9537e5dbf..15483fed249 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsHostMemoryStore.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package com.nvidia.spark.rapids import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool} +import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_DIRECT_OFFSET, HOST_MEMORY_BUFFER_PAGEABLE_OFFSET, HOST_MEMORY_BUFFER_PINNED_OFFSET} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta @@ -24,20 +25,27 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil /** * A buffer store using host memory. - * @param catalog buffer catalog to use with this store * @param maxSize maximum size in bytes for all buffers in this store + * @param pageableMemoryPoolSize maximum size in bytes for the internal pageable memory pool + * @param catalog buffer catalog to use with this store */ class RapidsHostMemoryStore( maxSize: Long, + pageableMemoryPoolSize: Long, catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton, deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage) extends RapidsBufferStore(StorageTier.HOST, catalog) { - private[this] val pool = HostMemoryBuffer.allocate(maxSize, false) - private[this] val addressAllocator = new AddressSpaceAllocator(maxSize) + private[this] val pool = HostMemoryBuffer.allocate(pageableMemoryPoolSize, false) + private[this] val addressAllocator = new AddressSpaceAllocator(pageableMemoryPoolSize) private[this] var haveLoggedMaxExceeded = false - // Returns an allocated host buffer and whether the allocation is from the internal pool - private def allocateHostBuffer(size: Long): (HostMemoryBuffer, Boolean) = { + private sealed abstract class AllocationMode(val spillPriorityOffset: Long) + private case object Pinned extends AllocationMode(HOST_MEMORY_BUFFER_PINNED_OFFSET) + private case object Pooled extends AllocationMode(HOST_MEMORY_BUFFER_PAGEABLE_OFFSET) + private case object Direct extends AllocationMode(HOST_MEMORY_BUFFER_DIRECT_OFFSET) + + // Returns an allocated host buffer and its allocation mode + private def allocateHostBuffer(size: Long): (HostMemoryBuffer, AllocationMode) = { // spill to keep within the targeted size val amountSpilled = synchronousSpill(math.max(maxSize - size, 0)) if (amountSpilled != 0) { @@ -49,16 +57,16 @@ class RapidsHostMemoryStore( while (buffer == null) { buffer = PinnedMemoryPool.tryAllocate(size) if (buffer != null) { - return (buffer, false) + return (buffer, Pinned) } - if (size > maxSize) { + if (size > pageableMemoryPoolSize) { if (!haveLoggedMaxExceeded) { - logWarning(s"Exceeding host spill max of $maxSize bytes to accommodate a buffer of " + - s"$size bytes. Consider increasing host spill store size.") + logWarning(s"Exceeding host spill max of $pageableMemoryPoolSize bytes to accommodate " + + s"a buffer of $size bytes. Consider increasing pageable memory store size.") haveLoggedMaxExceeded = true } - return (HostMemoryBuffer.allocate(size), false) + return (HostMemoryBuffer.allocate(size, false), Direct) } val allocation = addressAllocator.allocate(size) @@ -69,13 +77,13 @@ class RapidsHostMemoryStore( synchronousSpill(targetSize) } } - (buffer, true) + (buffer, Pooled) } override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer, stream: Cuda.Stream): RapidsBufferBase = { withResource(otherBuffer) { _ => - val (hostBuffer, isPinned) = allocateHostBuffer(other.size) + val (hostBuffer, allocationMode) = allocateHostBuffer(other.size) try { otherBuffer match { case devBuffer: DeviceMemoryBuffer => hostBuffer.copyFromDeviceBuffer(devBuffer, stream) @@ -90,9 +98,9 @@ class RapidsHostMemoryStore( other.id, other.size, other.meta, - other.getSpillPriority, + applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset), hostBuffer, - isPinned, + allocationMode, other.spillCallback, deviceStorage) } @@ -111,7 +119,7 @@ class RapidsHostMemoryStore( meta: TableMeta, spillPriority: Long, buffer: HostMemoryBuffer, - isInternalPoolAllocated: Boolean, + allocationMode: AllocationMode, spillCallback: SpillCallback, deviceStorage: RapidsDeviceMemoryStore) extends RapidsBufferBase( @@ -124,10 +132,12 @@ class RapidsHostMemoryStore( } override protected def releaseResources(): Unit = { - if (isInternalPoolAllocated) { - assert(buffer.getAddress >= pool.getAddress) - assert(buffer.getAddress < pool.getAddress + pool.getLength) - addressAllocator.free(buffer.getAddress - pool.getAddress) + allocationMode match { + case Pooled => + assert(buffer.getAddress >= pool.getAddress) + assert(buffer.getAddress < pool.getAddress + pool.getLength) + addressAllocator.free(buffer.getAddress - pool.getAddress) + case _ => } buffer.close() } diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala index c0b6142d289..4d59add6dfb 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillPriorities.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -58,4 +58,41 @@ object SpillPriorities { * Priority for multiple buffers being buffered within a call to next. */ val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100 + + /** + * Priority offset for host memory buffers allocated from the pinned memory pool. They are at + * lower priorities, so will be spilled first, making more pinned memory available. + */ + val HOST_MEMORY_BUFFER_PINNED_OFFSET: Long = -100 + + /** + * Priority offset for host memory buffers allocated from the internal pageable memory pool. They + * are at higher priorities than pinned memory buffers, thus making more pinned memory available; + * but at lower priorities than directly allocated buffers, thus freeing up the internal pageable + * memory pool. + */ + val HOST_MEMORY_BUFFER_PAGEABLE_OFFSET: Long = 0 + + /** + * Priority offset for host memory buffers directly allocated from the OS. They are at higher + * priorities, thus freeing up memory pools first. + */ + val HOST_MEMORY_BUFFER_DIRECT_OFFSET: Long = 100 + + /** + * Calculate a new priority based on an offset, clamping it to avoid wraparound. + * + * @param originalPriority the original priority + * @param offset the desired offset + * @return the resulting priority, with clamping if needed + */ + def applyPriorityOffset(originalPriority: Long, offset: Long): Long = { + if (offset < 0 && originalPriority < Long.MinValue - offset) { + Long.MinValue + } else if (offset > 0 && originalPriority > Long.MaxValue - offset) { + Long.MaxValue + } else { + originalPriority + offset + } + } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala index 0d34b8a1925..9a593155101 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsDiskStoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -46,29 +46,30 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = spy(new RapidsBufferCatalog) withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => - assertResult(0)(diskStore.currentSize) - hostStore.setSpillStore(diskStore) - val bufferSize = addTableToStore(devStore, bufferId, spillPriority) - devStore.synchronousSpill(0) - hostStore.synchronousSpill(0) - assertResult(0)(hostStore.currentSize) - assertResult(bufferSize)(diskStore.currentSize) - val path = bufferId.getDiskPath(null) - assert(path.exists) - assertResult(bufferSize)(path.length) - verify(catalog, times(3)).registerNewBuffer(ArgumentMatchers.any[RapidsBuffer]) - verify(catalog).removeBufferTier( - ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE)) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.DISK)(buffer.storageTier) - assertResult(bufferSize)(buffer.size) - assertResult(bufferId)(buffer.id) - assertResult(spillPriority)(buffer.getSpillPriority) + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => + assertResult(0)(diskStore.currentSize) + hostStore.setSpillStore(diskStore) + val bufferSize = addTableToStore(devStore, bufferId, spillPriority) + devStore.synchronousSpill(0) + hostStore.synchronousSpill(0) + assertResult(0)(hostStore.currentSize) + assertResult(bufferSize)(diskStore.currentSize) + val path = bufferId.getDiskPath(null) + assert(path.exists) + assertResult(bufferSize)(path.length) + verify(catalog, times(3)).registerNewBuffer(ArgumentMatchers.any[RapidsBuffer]) + verify(catalog).removeBufferTier( + ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE)) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.DISK)(buffer.storageTier) + assertResult(bufferSize)(buffer.size) + assertResult(bufferId)(buffer.id) + assertResult(spillPriority)(buffer.getSpillPriority) + } } - } } } } @@ -83,27 +84,28 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog, devStore)) { - diskStore => - hostStore.setSpillStore(diskStore) - addTableToStore(devStore, bufferId, spillPriority) - val expectedBatch = withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.DEVICE)(buffer.storageTier) - buffer.getColumnarBatch(sparkTypes) - } - withResource(expectedBatch) { expectedBatch => - devStore.synchronousSpill(0) - hostStore.synchronousSpill(0) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.DISK)(buffer.storageTier) - withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => - TestUtils.compareBatches(expectedBatch, actualBatch) + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog, devStore)) { + diskStore => + hostStore.setSpillStore(diskStore) + addTableToStore(devStore, bufferId, spillPriority) + val expectedBatch = withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.DEVICE)(buffer.storageTier) + buffer.getColumnarBatch(sparkTypes) + } + withResource(expectedBatch) { expectedBatch => + devStore.synchronousSpill(0) + hostStore.synchronousSpill(0) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.DISK)(buffer.storageTier) + withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => + TestUtils.compareBatches(expectedBatch, actualBatch) + } } } - } - } + } } } } @@ -116,33 +118,34 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => - hostStore.setSpillStore(diskStore) - addTableToStore(devStore, bufferId, spillPriority) - val expectedBuffer = withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.DEVICE)(buffer.storageTier) - withResource(buffer.getMemoryBuffer) { devbuf => - closeOnExcept(HostMemoryBuffer.allocate(devbuf.getLength)) { hostbuf => - hostbuf.copyFromDeviceBuffer(devbuf.asInstanceOf[DeviceMemoryBuffer]) - hostbuf + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => + hostStore.setSpillStore(diskStore) + addTableToStore(devStore, bufferId, spillPriority) + val expectedBuffer = withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.DEVICE)(buffer.storageTier) + withResource(buffer.getMemoryBuffer) { devbuf => + closeOnExcept(HostMemoryBuffer.allocate(devbuf.getLength)) { hostbuf => + hostbuf.copyFromDeviceBuffer(devbuf.asInstanceOf[DeviceMemoryBuffer]) + hostbuf + } } } - } - withResource(expectedBuffer) { expectedBuffer => - devStore.synchronousSpill(0) - hostStore.synchronousSpill(0) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.DISK)(buffer.storageTier) - withResource(buffer.getMemoryBuffer) { actualBuffer => - assert(actualBuffer.isInstanceOf[HostMemoryBuffer]) - val actualHostBuffer = actualBuffer.asInstanceOf[HostMemoryBuffer] - assertResult(expectedBuffer.asByteBuffer)(actualHostBuffer.asByteBuffer) + withResource(expectedBuffer) { expectedBuffer => + devStore.synchronousSpill(0) + hostStore.synchronousSpill(0) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.DISK)(buffer.storageTier) + withResource(buffer.getMemoryBuffer) { actualBuffer => + assert(actualBuffer.isInstanceOf[HostMemoryBuffer]) + val actualHostBuffer = actualBuffer.asInstanceOf[HostMemoryBuffer] + assertResult(expectedBuffer.asByteBuffer)(actualHostBuffer.asByteBuffer) + } } } } - } } } } @@ -163,21 +166,22 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => - hostStore.setSpillStore(diskStore) - addTableToStore(devStore, bufferId, spillPriority) - devStore.synchronousSpill(0) - hostStore.synchronousSpill(0) - assert(bufferPath.exists) - catalog.removeBuffer(bufferId) - if (canShareDiskPaths) { - assert(bufferPath.exists()) - } else { - assert(!bufferPath.exists) + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore => + hostStore.setSpillStore(diskStore) + addTableToStore(devStore, bufferId, spillPriority) + devStore.synchronousSpill(0) + hostStore.synchronousSpill(0) + assert(bufferPath.exists) + catalog.removeBuffer(bufferId) + if (canShareDiskPaths) { + assert(bufferPath.exists()) + } else { + assert(!bufferPath.exists) + } } - } } } } diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala index 48e71d8674a..f5455d62733 100644 --- a/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala +++ b/tests/src/test/scala/com/nvidia/spark/rapids/RapidsHostMemoryStoreSuite.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2021, NVIDIA CORPORATION. + * Copyright (c) 2020-2022, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { private def buildContiguousTable(numRows: Int): ContiguousTable = { val vals = (0 until numRows).map(_.toLong) - withResource(HostColumnVector.fromLongs(vals:_*)) { hcv => + withResource(HostColumnVector.fromLongs(vals: _*)) { hcv => withResource(hcv.copyToDevice()) { cv => withResource(new Table(cv)) { table => table.contiguousSplit()(0) @@ -57,30 +57,31 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = spy(new RapidsBufferCatalog) withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - assertResult(0)(hostStore.currentSize) - assertResult(hostStoreMaxSize)(hostStore.numBytesFree) - devStore.setSpillStore(hostStore) + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + assertResult(0)(hostStore.currentSize) + assertResult(hostStoreMaxSize)(hostStore.numBytesFree) + devStore.setSpillStore(hostStore) - val bufferSize = withResource(buildContiguousTable()) { ct => - val len = ct.getBuffer.getLength - // store takes ownership of the table - devStore.addContiguousTable(bufferId, ct, spillPriority) - len - } + val bufferSize = withResource(buildContiguousTable()) { ct => + val len = ct.getBuffer.getLength + // store takes ownership of the table + devStore.addContiguousTable(bufferId, ct, spillPriority) + len + } - devStore.synchronousSpill(0) - assertResult(bufferSize)(hostStore.currentSize) - assertResult(hostStoreMaxSize - bufferSize)(hostStore.numBytesFree) - verify(catalog, times(2)).registerNewBuffer(ArgumentMatchers.any[RapidsBuffer]) - verify(catalog).removeBufferTier( - ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE)) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.HOST)(buffer.storageTier) - assertResult(bufferSize)(buffer.size) - assertResult(bufferId)(buffer.id) - assertResult(spillPriority)(buffer.getSpillPriority) - } + devStore.synchronousSpill(0) + assertResult(bufferSize)(hostStore.currentSize) + assertResult(hostStoreMaxSize - bufferSize)(hostStore.numBytesFree) + verify(catalog, times(2)).registerNewBuffer(ArgumentMatchers.any[RapidsBuffer]) + verify(catalog).removeBufferTier( + ArgumentMatchers.eq(bufferId), ArgumentMatchers.eq(StorageTier.DEVICE)) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.HOST)(buffer.storageTier) + assertResult(bufferSize)(buffer.size) + assertResult(bufferId)(buffer.id) + assertResult(spillPriority)(buffer.getSpillPriority) + } } } } @@ -91,23 +92,24 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(buildContiguousTable()) { ct => - withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedBuffer => - expectedBuffer.copyFromDeviceBuffer(ct.getBuffer) - devStore.addContiguousTable(bufferId, ct, spillPriority) - devStore.synchronousSpill(0) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - withResource(buffer.getMemoryBuffer) { actualBuffer => - assert(actualBuffer.isInstanceOf[HostMemoryBuffer]) - assertResult(expectedBuffer.asByteBuffer) { - actualBuffer.asInstanceOf[HostMemoryBuffer].asByteBuffer + withResource(new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(buildContiguousTable()) { ct => + withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedBuffer => + expectedBuffer.copyFromDeviceBuffer(ct.getBuffer) + devStore.addContiguousTable(bufferId, ct, spillPriority) + devStore.synchronousSpill(0) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + withResource(buffer.getMemoryBuffer) { actualBuffer => + assert(actualBuffer.isInstanceOf[HostMemoryBuffer]) + assertResult(expectedBuffer.asByteBuffer) { + actualBuffer.asInstanceOf[HostMemoryBuffer].asByteBuffer + } } } } } - } } } } @@ -120,21 +122,23 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val hostStoreMaxSize = 1L * 1024 * 1024 val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog, devStore)) { hostStore => - devStore.setSpillStore(hostStore) - withResource(buildContiguousTable()) { ct => - withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) { - expectedBatch => - devStore.addContiguousTable(bufferId, ct, spillPriority) - devStore.synchronousSpill(0) - withResource(catalog.acquireBuffer(bufferId)) { buffer => - assertResult(StorageTier.HOST)(buffer.storageTier) - withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => - TestUtils.compareBatches(expectedBatch, actualBatch) + withResource( + new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog, devStore)) { + hostStore => + devStore.setSpillStore(hostStore) + withResource(buildContiguousTable()) { ct => + withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) { + expectedBatch => + devStore.addContiguousTable(bufferId, ct, spillPriority) + devStore.synchronousSpill(0) + withResource(catalog.acquireBuffer(bufferId)) { buffer => + assertResult(StorageTier.HOST)(buffer.storageTier) + withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => + TestUtils.compareBatches(expectedBatch, actualBatch) + } } - } + } } - } } } } @@ -148,40 +152,42 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar { val catalog = new RapidsBufferCatalog withResource(new RapidsDeviceMemoryStore(catalog)) { devStore => val mockStore = mock[RapidsBufferStore] - when(mockStore.tier) thenReturn(StorageTier.DISK) - withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog, devStore)) { hostStore => - devStore.setSpillStore(hostStore) - hostStore.setSpillStore(mockStore) - withResource(buildContiguousTable(1024 * 1024)) { bigTable => - withResource(buildContiguousTable(1)) { smallTable => - withResource(GpuColumnVector.from(bigTable.getTable, sparkTypes)) { expectedBatch => - // store takes ownership of the table - devStore.addContiguousTable(bigBufferId, bigTable, spillPriority) - devStore.synchronousSpill(0) - verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer], - ArgumentMatchers.any[MemoryBuffer], - ArgumentMatchers.any[Cuda.Stream]) - withResource(catalog.acquireBuffer(bigBufferId)) { buffer => - assertResult(StorageTier.HOST)(buffer.storageTier) - withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => - TestUtils.compareBatches(expectedBatch, actualBatch) + when(mockStore.tier) thenReturn (StorageTier.DISK) + withResource( + new RapidsHostMemoryStore(hostStoreMaxSize, hostStoreMaxSize, catalog, devStore)) { + hostStore => + devStore.setSpillStore(hostStore) + hostStore.setSpillStore(mockStore) + withResource(buildContiguousTable(1024 * 1024)) { bigTable => + withResource(buildContiguousTable(1)) { smallTable => + withResource(GpuColumnVector.from(bigTable.getTable, sparkTypes)) { expectedBatch => + // store takes ownership of the table + devStore.addContiguousTable(bigBufferId, bigTable, spillPriority) + devStore.synchronousSpill(0) + verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer], + ArgumentMatchers.any[MemoryBuffer], + ArgumentMatchers.any[Cuda.Stream]) + withResource(catalog.acquireBuffer(bigBufferId)) { buffer => + assertResult(StorageTier.HOST)(buffer.storageTier) + withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch => + TestUtils.compareBatches(expectedBatch, actualBatch) + } } - } - devStore.addContiguousTable(smallBufferId, smallTable, spillPriority) - devStore.synchronousSpill(0) - val rapidsBufferCaptor: ArgumentCaptor[RapidsBuffer] = - ArgumentCaptor.forClass(classOf[RapidsBuffer]) - val memoryBufferCaptor: ArgumentCaptor[MemoryBuffer] = - ArgumentCaptor.forClass(classOf[MemoryBuffer]) - verify(mockStore).copyBuffer(rapidsBufferCaptor.capture(), - memoryBufferCaptor.capture(), ArgumentMatchers.any[Cuda.Stream]) - withResource(memoryBufferCaptor.getValue) { _ => - assertResult(bigBufferId)(rapidsBufferCaptor.getValue.id) + devStore.addContiguousTable(smallBufferId, smallTable, spillPriority) + devStore.synchronousSpill(0) + val rapidsBufferCaptor: ArgumentCaptor[RapidsBuffer] = + ArgumentCaptor.forClass(classOf[RapidsBuffer]) + val memoryBufferCaptor: ArgumentCaptor[MemoryBuffer] = + ArgumentCaptor.forClass(classOf[MemoryBuffer]) + verify(mockStore).copyBuffer(rapidsBufferCaptor.capture(), + memoryBufferCaptor.capture(), ArgumentMatchers.any[Cuda.Stream]) + withResource(memoryBufferCaptor.getValue) { _ => + assertResult(bigBufferId)(rapidsBufferCaptor.getValue.id) + } } } } - } } } }