Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make better use of pinned memory pool #4497

Merged
merged 10 commits into from
Jan 14, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ object RapidsBufferCatalog extends Logging with Arm {
gdsStorage = new RapidsGdsStore(diskBlockManager, rapidsConf.gdsSpillBatchWriteBufferSize)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
deviceStorage.setSpillStore(gdsStorage)
} else {
hostStorage = new RapidsHostMemoryStore(rapidsConf.hostSpillStorageSize)
hostStorage = new RapidsHostMemoryStore(
rapidsConf.hostSpillStorageSize, rapidsConf.pinnedPoolSize)
diskStorage = new RapidsDiskStore(diskBlockManager)
deviceStorage.setSpillStore(hostStorage)
hostStorage.setSpillStore(diskStorage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,45 @@ import org.apache.spark.sql.rapids.execution.TrampolineUtil
*/
class RapidsHostMemoryStore(
maxSize: Long,
pinnedSize: Long,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton,
deviceStorage: RapidsDeviceMemoryStore = RapidsBufferCatalog.getDeviceStorage)
extends RapidsBufferStore(StorageTier.HOST, catalog) {
private[this] val pool = HostMemoryBuffer.allocate(maxSize, false)
private[this] val addressAllocator = new AddressSpaceAllocator(maxSize)
private[this] var haveLoggedMaxExceeded = false

// Returns an allocated host buffer and whether the allocation is from the internal pool
private def allocateHostBuffer(size: Long): (HostMemoryBuffer, Boolean) = {
private object AllocationMode extends Enumeration {
type AllocationMode = Value
val Pinned, Pooled, Direct = Value
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}
import AllocationMode._

// Returns an allocated host buffer and its allocation mode
private def allocateHostBuffer(size: Long): (HostMemoryBuffer, AllocationMode) = {
// spill to keep within the targeted size
val amountSpilled = synchronousSpill(math.max(maxSize - size, 0))
val targetSize = math.max(maxSize + pinnedSize - size, 0)
revans2 marked this conversation as resolved.
Show resolved Hide resolved
val amountSpilled = synchronousSpill(targetSize)
if (amountSpilled != 0) {
logInfo(s"Spilled $amountSpilled bytes from the host memory store")
TrampolineUtil.incTaskMetricsDiskBytesSpilled(amountSpilled)
}

var buffer: HostMemoryBuffer = null
while (buffer == null) {
buffer = PinnedMemoryPool.tryAllocate(size)
if (buffer != null) {
return (buffer, false)
}

if (size > maxSize) {
if (size > maxSize && size > pinnedSize) {
jlowe marked this conversation as resolved.
Show resolved Hide resolved
if (!haveLoggedMaxExceeded) {
logWarning(s"Exceeding host spill max of $maxSize bytes to accommodate a buffer of " +
s"$size bytes. Consider increasing host spill store size.")
logWarning(s"Exceeding host spill max of $maxSize bytes and pinned memory pool size of " +
s"$pinnedSize bytes to accommodate a buffer of $size bytes. Consider increasing " +
s"host spill store size or pinned memory pool size.")
haveLoggedMaxExceeded = true
}
return (HostMemoryBuffer.allocate(size), false)
return (HostMemoryBuffer.allocate(size), Direct)
jlowe marked this conversation as resolved.
Show resolved Hide resolved
}

buffer = PinnedMemoryPool.tryAllocate(size)
if (buffer != null) {
return (buffer, Pinned)
}

val allocation = addressAllocator.allocate(size)
Expand All @@ -69,13 +78,13 @@ class RapidsHostMemoryStore(
synchronousSpill(targetSize)
}
}
(buffer, true)
(buffer, Pooled)
}

override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer,
stream: Cuda.Stream): RapidsBufferBase = {
withResource(otherBuffer) { _ =>
val (hostBuffer, isPinned) = allocateHostBuffer(other.size)
val (hostBuffer, allocationMode) = allocateHostBuffer(other.size)
try {
otherBuffer match {
case devBuffer: DeviceMemoryBuffer => hostBuffer.copyFromDeviceBuffer(devBuffer, stream)
Expand All @@ -86,13 +95,18 @@ class RapidsHostMemoryStore(
hostBuffer.close()
throw e
}
val spillPriority = other.getSpillPriority - (allocationMode match {
case Pinned => 200
case Pooled => 100
case Direct => 0
jlowe marked this conversation as resolved.
Show resolved Hide resolved
})
new RapidsHostMemoryBuffer(
other.id,
other.size,
other.meta,
other.getSpillPriority,
spillPriority,
hostBuffer,
isPinned,
allocationMode,
other.spillCallback,
deviceStorage)
}
Expand All @@ -111,7 +125,7 @@ class RapidsHostMemoryStore(
meta: TableMeta,
spillPriority: Long,
buffer: HostMemoryBuffer,
isInternalPoolAllocated: Boolean,
allocationMode: AllocationMode,
spillCallback: SpillCallback,
deviceStorage: RapidsDeviceMemoryStore)
extends RapidsBufferBase(
Expand All @@ -124,7 +138,7 @@ class RapidsHostMemoryStore(
}

override protected def releaseResources(): Unit = {
if (isInternalPoolAllocated) {
if (allocationMode == Pooled) {
assert(buffer.getAddress >= pool.getAddress)
assert(buffer.getAddress < pool.getAddress + pool.getLength)
addressAllocator.free(buffer.getAddress - pool.getAddress)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = spy(new RapidsBufferCatalog)
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore =>
assertResult(0)(diskStore.currentSize)
Expand Down Expand Up @@ -83,7 +83,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog, devStore)) {
diskStore =>
Expand Down Expand Up @@ -116,7 +116,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore =>
hostStore.setSpillStore(diskStore)
Expand Down Expand Up @@ -163,7 +163,7 @@ class RapidsDiskStoreSuite extends FunSuiteWithTempDir with Arm with MockitoSuga
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(new RapidsDiskStore(mock[RapidsDiskBlockManager], catalog)) { diskStore =>
hostStore.setSpillStore(diskStore)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = spy(new RapidsBufferCatalog)
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
assertResult(0)(hostStore.currentSize)
assertResult(hostStoreMaxSize)(hostStore.numBytesFree)
devStore.setSpillStore(hostStore)
Expand Down Expand Up @@ -91,7 +91,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(buildContiguousTable()) { ct =>
withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedBuffer =>
Expand Down Expand Up @@ -120,7 +120,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
val hostStoreMaxSize = 1L * 1024 * 1024
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog, devStore)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog, devStore)) { hostStore =>
devStore.setSpillStore(hostStore)
withResource(buildContiguousTable()) { ct =>
withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) {
Expand Down Expand Up @@ -149,7 +149,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
val mockStore = mock[RapidsBufferStore]
when(mockStore.tier) thenReturn(StorageTier.DISK)
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog, devStore)) { hostStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, 0, catalog, devStore)) { hostStore =>
devStore.setSpillStore(hostStore)
hostStore.setSpillStore(mockStore)
withResource(buildContiguousTable(1024 * 1024)) { bigTable =>
Expand Down