Make better use of pinned memory pool (#4497)
* Make better use of pinned memory pool

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* review feedback

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* fix match statement and test

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* fix disk store suite

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* add a separate config for pageable pool size

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* clamp offset spill priorities

* make priority offset method more general
rongou authored Jan 14, 2022
1 parent 910bcc1 commit 004cc39
Showing 7 changed files with 256 additions and 184 deletions.
3 changes: 2 additions & 1 deletion docs/configs.md
@@ -43,7 +43,8 @@ Name | Description | Default Value
<a name="memory.gpu.pooling.enabled"></a>spark.rapids.memory.gpu.pooling.enabled|Should RMM act as a pooling allocator for GPU memory, or should it just pass through to CUDA memory allocation directly. DEPRECATED: please use spark.rapids.memory.gpu.pool instead.|true
<a name="memory.gpu.reserve"></a>spark.rapids.memory.gpu.reserve|The amount of GPU memory that should remain unallocated by RMM and left for system use such as memory needed for kernels and kernel launches.|671088640
<a name="memory.gpu.unspill.enabled"></a>spark.rapids.memory.gpu.unspill.enabled|When a spilled GPU buffer is needed again, should it be unspilled, or only copied back into GPU memory temporarily. Unspilling may be useful for GPU buffers that are needed frequently, for example, broadcast variables; however, it may also increase GPU memory usage|false
-<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk|1073741824
+<a name="memory.host.pageablePool.size"></a>spark.rapids.memory.host.pageablePool.size|The size of the pageable memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|1073741824
+<a name="memory.host.spillStorageSize"></a>spark.rapids.memory.host.spillStorageSize|Amount of off-heap host memory to use for buffering spilled GPU data before spilling to local disk. Use -1 to set the amount to the combined size of pinned and pageable memory pools.|-1
<a name="memory.pinnedPool.size"></a>spark.rapids.memory.pinnedPool.size|The size of the pinned memory pool in bytes unless otherwise specified. Use 0 to disable the pool.|0
<a name="python.concurrentPythonWorkers"></a>spark.rapids.python.concurrentPythonWorkers|Set the number of Python worker processes that can execute concurrently per GPU. Python worker processes may temporarily block when the number of concurrent Python worker processes started by the same executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors. >0 means enabled, while <=0 means unlimited|0
<a name="python.memory.gpu.allocFraction"></a>spark.rapids.python.memory.gpu.allocFraction|The fraction of total GPU memory that should be initially allocated for pooled memory for all the Python workers. It supposes to be less than (1 - $(spark.rapids.memory.gpu.allocFraction)), since the executor will share the GPU with its owning Python workers. Half of the rest will be used if not specified|None
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2020-2021, NVIDIA CORPORATION.
+* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -184,7 +184,12 @@ object RapidsBufferCatalog extends Logging with Arm {
gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize)
deviceStorage.setSpillStore(gdsStorage)
} else {
-hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize)
+val hostSpillStorageSize = if (rapidsConf.hostSpillStorageSize == -1) {
+rapidsConf.pinnedPoolSize + rapidsConf.pageablePoolSize
+} else {
+rapidsConf.hostSpillStorageSize
+}
+hostStorage = new RapidsHostMemoryStore(hostSpillStorageSize, rapidsConf.pageablePoolSize)
diskStorage = new RapidsDiskStore(diskBlockManager)
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)
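
The hunk above treats -1 as a sentinel for the host spill storage size. A minimal standalone sketch of that resolution follows; `HostMemoryConf` and `SpillSizeSketch` are hypothetical names used only for illustration and are not part of the plugin.

```scala
// Hypothetical stand-in for the three RapidsConf values involved;
// this is not the plugin's RapidsConf class.
final case class HostMemoryConf(
    hostSpillStorageSize: Long,
    pinnedPoolSize: Long,
    pageablePoolSize: Long)

object SpillSizeSketch {
  // -1 means "use the combined size of the pinned and pageable pools";
  // any other value is taken as an explicit size in bytes.
  def resolveHostSpillStorageSize(conf: HostMemoryConf): Long =
    if (conf.hostSpillStorageSize == -1) {
      conf.pinnedPoolSize + conf.pageablePoolSize
    } else {
      conf.hostSpillStorageSize
    }
}
```

With the documented defaults (pinned pool 0, pageable pool 1073741824, spill storage size -1) this resolves to 1073741824 bytes, the same value as the previous fixed default.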
15 changes: 12 additions & 3 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -304,6 +304,12 @@ object RapidsConf {
.bytesConf(ByteUnit.BYTE)
.createWithDefault(0)

+val PAGEABLE_POOL_SIZE = conf("spark.rapids.memory.host.pageablePool.size")
+.doc("The size of the pageable memory pool in bytes unless otherwise specified. " +
+"Use 0 to disable the pool.")
+.bytesConf(ByteUnit.BYTE)
+.createWithDefault(ByteUnit.GiB.toBytes(1))
+
val RMM_DEBUG = conf("spark.rapids.memory.gpu.debug")
.doc("Provides a log of GPU memory allocations and frees. If set to " +
"STDOUT or STDERR the logging will go there. Setting it to NONE disables logging. " +
@@ -355,10 +361,11 @@ object RapidsConf {
.createWithDefault(ByteUnit.MiB.toBytes(640))

val HOST_SPILL_STORAGE_SIZE = conf("spark.rapids.memory.host.spillStorageSize")
-.doc("Amount of off-heap host memory to use for buffering spilled GPU data " +
-"before spilling to local disk")
+.doc("Amount of off-heap host memory to use for buffering spilled GPU data before spilling " +
+"to local disk. Use -1 to set the amount to the combined size of pinned and pageable " +
+"memory pools.")
.bytesConf(ByteUnit.BYTE)
-.createWithDefault(ByteUnit.GiB.toBytes(1))
+.createWithDefault(-1)

val UNSPILL = conf("spark.rapids.memory.gpu.unspill.enabled")
.doc("When a spilled GPU buffer is needed again, should it be unspilled, or only copied " +
@@ -1458,6 +1465,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val pinnedPoolSize: Long = get(PINNED_POOL_SIZE)

+lazy val pageablePoolSize: Long = get(PAGEABLE_POOL_SIZE)
+
lazy val concurrentGpuTasks: Int = get(CONCURRENT_GPU_TASKS)

lazy val isTestEnabled: Boolean = get(TEST_CONF)
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2020-2021, NVIDIA CORPORATION.
+* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,27 +17,35 @@
package com.nvidia.spark.rapids

import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer, PinnedMemoryPool}
+import com.nvidia.spark.rapids.SpillPriorities.{applyPriorityOffset, HOST_MEMORY_BUFFER_DIRECT_OFFSET, HOST_MEMORY_BUFFER_PAGEABLE_OFFSET, HOST_MEMORY_BUFFER_PINNED_OFFSET}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.execution.TrampolineUtil

/**
* A buffer store using host memory.
-* @param catalog buffer catalog to use with this store
* @param maxSize maximum size in bytes for all buffers in this store
+* @param pageableMemoryPoolSize maximum size in bytes for the internal pageable memory pool
+* @param catalog buffer catalog to use with this store
*/
class RapidsHostMemoryStore(
maxSize: Long,
+pageableMemoryPoolSize: Long,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton,
deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage)
extends RapidsBufferStore(StorageTier.HOST, catalog) {
-private[this] val pool = HostMemoryBuffer.allocate(maxSize, false)
-private[this] val addressAllocator = new AddressSpaceAllocator(maxSize)
+private[this] val pool = HostMemoryBuffer.allocate(pageableMemoryPoolSize, false)
+private[this] val addressAllocator = new AddressSpaceAllocator(pageableMemoryPoolSize)
private[this] var haveLoggedMaxExceeded = false

-// Returns an allocated host buffer and whether the allocation is from the internal pool
-private def allocateHostBuffer(size: Long): (HostMemoryBuffer, Boolean) = {
+private sealed abstract class AllocationMode(val spillPriorityOffset: Long)
+private case object Pinned extends AllocationMode(HOST_MEMORY_BUFFER_PINNED_OFFSET)
+private case object Pooled extends AllocationMode(HOST_MEMORY_BUFFER_PAGEABLE_OFFSET)
+private case object Direct extends AllocationMode(HOST_MEMORY_BUFFER_DIRECT_OFFSET)
+
+// Returns an allocated host buffer and its allocation mode
+private def allocateHostBuffer(size: Long): (HostMemoryBuffer, AllocationMode) = {
// spill to keep within the targeted size
val amountSpilled = synchronousSpill(math.max(maxSize - size, 0))
if (amountSpilled != 0) {
@@ -49,16 +57,16 @@ class RapidsHostMemoryStore(
while (buffer == null) {
buffer = PinnedMemoryPool.tryAllocate(size)
if (buffer != null) {
-return (buffer, false)
+return (buffer, Pinned)
}

-if (size > maxSize) {
+if (size > pageableMemoryPoolSize) {
if (!haveLoggedMaxExceeded) {
-logWarning(s"Exceeding host spill max of $maxSize bytes to accommodate a buffer of " +
-s"$size bytes. Consider increasing host spill store size.")
+logWarning(s"Exceeding host spill max of $pageableMemoryPoolSize bytes to accommodate " +
+s"a buffer of $size bytes. Consider increasing pageable memory store size.")
haveLoggedMaxExceeded = true
}
-return (HostMemoryBuffer.allocate(size), false)
+return (HostMemoryBuffer.allocate(size, false), Direct)
}

val allocation = addressAllocator.allocate(size)
@@ -69,13 +77,13 @@ class RapidsHostMemoryStore(
synchronousSpill(targetSize)
}
}
-(buffer, true)
+(buffer, Pooled)
}

override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer,
stream: Cuda.Stream): RapidsBufferBase = {
withResource(otherBuffer) { _ =>
-val (hostBuffer, isPinned) = allocateHostBuffer(other.size)
+val (hostBuffer, allocationMode) = allocateHostBuffer(other.size)
try {
otherBuffer match {
case devBuffer: DeviceMemoryBuffer => hostBuffer.copyFromDeviceBuffer(devBuffer, stream)
@@ -90,9 +98,9 @@ class RapidsHostMemoryStore(
other.id,
other.size,
other.meta,
-other.getSpillPriority,
+applyPriorityOffset(other.getSpillPriority, allocationMode.spillPriorityOffset),
hostBuffer,
-isPinned,
+allocationMode,
other.spillCallback,
deviceStorage)
}
@@ -111,7 +119,7 @@ class RapidsHostMemoryStore(
meta: TableMeta,
spillPriority: Long,
buffer: HostMemoryBuffer,
-isInternalPoolAllocated: Boolean,
+allocationMode: AllocationMode,
spillCallback: SpillCallback,
deviceStorage: RapidsDeviceMemoryStore)
extends RapidsBufferBase(
@@ -124,10 +132,12 @@ class RapidsHostMemoryStore(
}

override protected def releaseResources(): Unit = {
-if (isInternalPoolAllocated) {
-assert(buffer.getAddress >= pool.getAddress)
-assert(buffer.getAddress < pool.getAddress + pool.getLength)
-addressAllocator.free(buffer.getAddress - pool.getAddress)
+allocationMode match {
+case Pooled =>
+assert(buffer.getAddress >= pool.getAddress)
+assert(buffer.getAddress < pool.getAddress + pool.getLength)
+addressAllocator.free(buffer.getAddress - pool.getAddress)
+case _ =>
}
buffer.close()
}
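
The allocation order introduced in this file (pinned pool first, then a direct OS allocation for oversized buffers, otherwise the internal pageable pool) can be sketched as a pure decision function. This is a simplified model under stated assumptions, not the plugin's code: `pinnedAvailable` is a hypothetical stand-in for whether `PinnedMemoryPool.tryAllocate` would succeed, the offsets are copied from `SpillPriorities` in this commit, and the spill-and-retry loop around the pooled allocation is omitted.

```scala
object AllocationModeSketch {
  // Mirrors the three modes in the diff; offsets copied from SpillPriorities.
  sealed abstract class AllocationMode(val spillPriorityOffset: Long)
  case object Pinned extends AllocationMode(-100L)
  case object Pooled extends AllocationMode(0L)
  case object Direct extends AllocationMode(100L)

  // Simplified decision only: no memory is allocated and nothing is spilled.
  // pinnedAvailable is a hypothetical parameter modelling free pinned memory.
  def chooseMode(size: Long, pinnedAvailable: Long, pageablePoolSize: Long): AllocationMode =
    if (size <= pinnedAvailable) {
      Pinned // pinned pool is always tried first
    } else if (size > pageablePoolSize) {
      Direct // can never fit in the pageable pool, so go to the OS
    } else {
      Pooled // served from the internal pageable pool
    }
}
```

Because pinned buffers carry the lowest offset, they are spilled to disk first, which is how the change keeps pinned memory available for new spills from the GPU.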
@@ -1,5 +1,5 @@
/*
-* Copyright (c) 2020, NVIDIA CORPORATION.
+* Copyright (c) 2020-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -58,4 +58,41 @@ object SpillPriorities {
* Priority for multiple buffers being buffered within a call to next.
*/
val ACTIVE_BATCHING_PRIORITY: Long = ACTIVE_ON_DECK_PRIORITY + 100
+
+/**
+* Priority offset for host memory buffers allocated from the pinned memory pool. They are at
+* lower priorities, so will be spilled first, making more pinned memory available.
+*/
+val HOST_MEMORY_BUFFER_PINNED_OFFSET: Long = -100
+
+/**
+* Priority offset for host memory buffers allocated from the internal pageable memory pool. They
+* are at higher priorities than pinned memory buffers, thus making more pinned memory available;
+* but at lower priorities than directly allocated buffers, thus freeing up the internal pageable
+* memory pool.
+*/
+val HOST_MEMORY_BUFFER_PAGEABLE_OFFSET: Long = 0
+
+/**
+* Priority offset for host memory buffers directly allocated from the OS. They are at higher
+* priorities, thus freeing up memory pools first.
+*/
+val HOST_MEMORY_BUFFER_DIRECT_OFFSET: Long = 100
+
+/**
+* Calculate a new priority based on an offset, clamping it to avoid wraparound.
+*
+* @param originalPriority the original priority
+* @param offset the desired offset
+* @return the resulting priority, with clamping if needed
+*/
+def applyPriorityOffset(originalPriority: Long, offset: Long): Long = {
+if (offset < 0 && originalPriority < Long.MinValue - offset) {
+Long.MinValue
+} else if (offset > 0 && originalPriority > Long.MaxValue - offset) {
+Long.MaxValue
+} else {
+originalPriority + offset
+}
+}
}
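
To illustrate the clamping in `applyPriorityOffset`, here is a standalone copy of the function with a few worked inputs. The wrapper object `PriorityOffsetSketch` and the sample priorities are assumptions made for this example; only the function body comes from the diff.

```scala
object PriorityOffsetSketch {
  // Same logic as the diff: compare against the bound minus the offset,
  // so the check itself cannot overflow.
  def applyPriorityOffset(originalPriority: Long, offset: Long): Long =
    if (offset < 0 && originalPriority < Long.MinValue - offset) {
      Long.MinValue
    } else if (offset > 0 && originalPriority > Long.MaxValue - offset) {
      Long.MaxValue
    } else {
      originalPriority + offset
    }

  def main(args: Array[String]): Unit = {
    // Ordinary case: the offset is simply added.
    println(applyPriorityOffset(0L, -100L))                 // -100
    // Near the lower bound: clamped instead of wrapping to a large positive value.
    println(applyPriorityOffset(Long.MinValue + 10, -100L)) // -9223372036854775808
    // Near the upper bound: clamped instead of wrapping to a large negative value.
    println(applyPriorityOffset(Long.MaxValue - 10, 100L))  // 9223372036854775807
  }
}
```

Without the clamp, a plain `originalPriority + offset` near either bound would wrap around and invert the buffer's spill order.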