Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batch small buffers when spilling via GDS #2295

Merged
merged 18 commits into from
May 10, 2021
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 86 additions & 24 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsGdsStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,26 @@ package com.nvidia.spark.rapids
import java.io.File
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf._
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

import org.apache.spark.sql.rapids.RapidsDiskBlockManager
import org.apache.spark.sql.rapids.{RapidsDiskBlockManager, TempSpillBufferId}

/** A buffer store using GPUDirect Storage (GDS). */
class RapidsGdsStore(
diskBlockManager: RapidsDiskBlockManager,
catalog: RapidsBufferCatalog = RapidsBufferCatalog.singleton)
extends RapidsBufferStore(StorageTier.GDS, catalog) with Arm {
// Granularity, in bytes, to which alignUp rounds the length of each batched buffer.
private val BlockSize = 4096
// Capacity, in bytes, of the batch write buffer. isBatched treats any buffer strictly
// smaller than this as a candidate for batching.
private val BatchWriteBufferSize = 1 << 27 // 128 MiB
// Files recorded per buffer ID: populated by singleShotSpill for IDs that can share disk
// paths, and by batchSpill for every batched buffer.
private[this] val sharedBufferFiles = new ConcurrentHashMap[RapidsBufferId, File]
// Staging buffer that small spilled buffers are copied into before being written out.
private[this] val batchWriteBuffer = CuFileBuffer.allocate(BatchWriteBufferSize, true)
// ID whose disk path receives the current batch; replaced after every flush in batchSpill.
private[this] var batchWriteBufferId = TempSpillBufferId()
// Offset in batchWriteBuffer at which the next batched buffer is placed.
private[this] var batchWriteBufferOffset = 0L
// Buffers staged in batchWriteBuffer that have not been written to a file yet.
private[this] val pendingBuffers = ArrayBuffer.empty[RapidsGdsBuffer]

override protected def createBuffer(other: RapidsBuffer, otherBuffer: MemoryBuffer,
stream: Cuda.Stream): RapidsBufferBase = {
Expand All @@ -39,30 +47,75 @@ class RapidsGdsStore(
case d: DeviceMemoryBuffer => d
case _ => throw new IllegalStateException("copying from buffer without device memory")
}
val id = other.id
val path = if (id.canShareDiskPaths) {
sharedBufferFiles.computeIfAbsent(id, _ => id.getDiskPath(diskBlockManager))
if (isBatched(deviceBuffer.getLength)) {
batchSpill(other, deviceBuffer)
} else {
id.getDiskPath(diskBlockManager)
singleShotSpill(other, deviceBuffer)
}
// When sharing files, append to the file; otherwise, write from the beginning.
val fileOffset = if (id.canShareDiskPaths) {
// only one writer at a time for now when using shared files
path.synchronized {
CuFile.appendDeviceBufferToFile(path, deviceBuffer)
}
}

/**
 * Tells whether a buffer of the given length goes through the batched spill path.
 *
 * @param length buffer length in bytes
 * @return true when `length` is strictly smaller than `BatchWriteBufferSize`
 */
private def isBatched(length: Long): Boolean =
  BatchWriteBufferSize > length

/**
 * Spills one device buffer straight to a file, without staging it in the batch write buffer.
 *
 * IDs that can share disk paths have their data appended to the file recorded in
 * `sharedBufferFiles`; all other IDs get their own file, written from offset 0.
 *
 * @param other the buffer being spilled; supplies the ID, size, metadata, spill priority
 *              and spill callback of the resulting buffer
 * @param deviceBuffer the device memory to write out
 * @return a `RapidsGdsBuffer` pointing at the written file range, with no pending
 *         in-memory copy
 */
private def singleShotSpill(
    other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer): RapidsBufferBase = {
  val id = other.id
  val (path, fileOffset) = if (id.canShareDiskPaths) {
    val sharedFile =
      sharedBufferFiles.computeIfAbsent(id, _ => id.getDiskPath(diskBlockManager))
    // only one writer at a time for now when using shared files
    val appendedAt = sharedFile.synchronized {
      CuFile.appendDeviceBufferToFile(sharedFile, deviceBuffer)
    }
    (sharedFile, appendedAt)
  } else {
    // Not sharing: the file belongs to this buffer alone, so write from the beginning.
    val ownFile = id.getDiskPath(diskBlockManager)
    CuFile.writeDeviceBufferToFile(ownFile, 0, deviceBuffer)
    (ownFile, 0L)
  }
  logDebug(s"Spilled to $path $fileOffset:${other.size} via GDS")
  new RapidsGdsBuffer(
    id, None, fileOffset, other.size, other.meta, other.getSpillPriority, other.spillCallback)
}

/**
 * Spills a small device buffer by staging it in the shared batch write buffer.
 *
 * When the incoming buffer does not fit in the space left in `batchWriteBuffer`, the bytes
 * staged so far are first written to the file for the current `batchWriteBufferId`, every
 * pending buffer is detached from the staging buffer (its `cuFileBuffer` becomes `None`),
 * and a new batch is started under a fresh ID at offset 0. The incoming buffer is then
 * copied in at the current offset, and the offset advances by the block-aligned length.
 *
 * The whole method runs under `this.synchronized`.
 *
 * @param other the buffer being spilled; supplies the ID, size, metadata, spill priority
 *              and spill callback of the resulting buffer
 * @param deviceBuffer the device memory to stage
 * @return the new `RapidsGdsBuffer`, which has also been added to `pendingBuffers`
 */
private def batchSpill(
    other: RapidsBuffer, deviceBuffer: DeviceMemoryBuffer): RapidsBufferBase = this.synchronized {
  if (deviceBuffer.getLength > BatchWriteBufferSize - batchWriteBufferOffset) {
    // Not enough room left: flush the current batch to its file before reusing the buffer.
    val path = batchWriteBufferId.getDiskPath(diskBlockManager).getAbsolutePath
    withResource(new CuFileWriteHandle(path)) { handle =>
      handle.write(batchWriteBuffer, batchWriteBufferOffset, 0)
      logDebug(s"Spilled to $path 0:$batchWriteBufferOffset via GDS")
    }
    // The flushed buffers must be read back from the file from now on.
    pendingBuffers.foreach(_.cuFileBuffer = None)
    pendingBuffers.clear()
    batchWriteBufferId = TempSpillBufferId()
    batchWriteBufferOffset = 0
  }

  val currentBufferOffset = batchWriteBufferOffset
  batchWriteBuffer.copyFromMemoryBuffer(
    batchWriteBufferOffset, deviceBuffer, 0, deviceBuffer.getLength, Cuda.DEFAULT_STREAM)
  batchWriteBufferOffset += alignUp(deviceBuffer.getLength)

  val id = other.id
  // Record which batch file will hold this buffer once the batch is flushed; its offset in
  // the staging buffer doubles as its offset in that file.
  sharedBufferFiles.computeIfAbsent(id, _ => batchWriteBufferId.getDiskPath(diskBlockManager))
  val gdsBuffer = new RapidsGdsBuffer(id, Some(batchWriteBuffer), currentBufferOffset,
    other.size, other.meta, other.getSpillPriority, other.spillCallback)
  pendingBuffers += gdsBuffer
  gdsBuffer
}

/**
 * Rounds `length` up to the next multiple of `BlockSize`.
 *
 * Uses a bit mask, so it relies on `BlockSize` being a power of two.
 *
 * @param length a length in bytes
 * @return the smallest multiple of `BlockSize` that is not less than `length`
 */
private def alignUp(length: Long): Long = {
  val lowBits = BlockSize - 1L
  (length + lowBits) & ~lowBits
}

class RapidsGdsBuffer(
id: RapidsBufferId,
var cuFileBuffer: Option[CuFileBuffer],
val fileOffset: Long,
size: Long,
meta: TableMeta,
Expand All @@ -74,39 +127,48 @@ class RapidsGdsStore(
/** Returns this buffer's data by delegating to `getDeviceMemoryBuffer`. */
override def getMemoryBuffer: MemoryBuffer = {
  getDeviceMemoryBuffer
}

/**
 * Allocates a new device buffer of `size` bytes and fills it with this buffer's data.
 *
 * While the data is still pending in the batch write buffer (`cuFileBuffer` is defined) it
 * is copied from there; otherwise it is read back from the file via GDS. The newly
 * allocated buffer is closed if filling it throws.
 *
 * @return the populated device buffer
 */
override def materializeMemoryBuffer: MemoryBuffer = {
  val path = if (id.canShareDiskPaths || isBatched(size)) {
    sharedBufferFiles.get(id)
  } else {
    id.getDiskPath(diskBlockManager)
  }
  closeOnExcept(DeviceMemoryBuffer.allocate(size)) { buffer =>
    // Read the mutable field once so the check and the use see the same value.
    cuFileBuffer match {
      case Some(pending) =>
        // For pending buffers, fileOffset is the offset inside the batch write buffer.
        buffer.copyFromMemoryBuffer(0, pending, fileOffset, size, Cuda.DEFAULT_STREAM)
      case None =>
        CuFile.readFileToDeviceBuffer(buffer, path, fileOffset)
        logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
    }
    buffer
  }
}

/**
 * Copies `length` bytes of this buffer, starting at `srcOffset`, into `dst` at `dstOffset`.
 *
 * While the data is still pending in the batch write buffer (`cuFileBuffer` is defined) it
 * is copied from there; otherwise it is read from the file via GDS.
 *
 * @param srcOffset offset within this buffer's data at which to start copying
 * @param dst destination buffer; must be a `DeviceMemoryBuffer`
 * @param dstOffset offset within `dst` at which to start writing
 * @param length number of bytes to copy
 * @param stream CUDA stream supplied by the caller; currently unused (see TODO below)
 * @throws IllegalStateException if `dst` is not a `DeviceMemoryBuffer`
 */
override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long,
    length: Long, stream: Cuda.Stream): Unit = {
  val path = if (id.canShareDiskPaths || isBatched(size)) {
    sharedBufferFiles.get(id)
  } else {
    id.getDiskPath(diskBlockManager)
  }
  dst match {
    case dmOriginal: DeviceMemoryBuffer =>
      // NOTE(review): the slice is never closed here; confirm whether it needs to be.
      val dm = dmOriginal.slice(dstOffset, length)
      // Read the mutable field once so the check and the use see the same value.
      cuFileBuffer match {
        case Some(pending) =>
          // Copy only the requested `length` bytes: `dm` is a slice of exactly that length,
          // so copying the full `size` would run past the end of the slice.
          dm.copyFromMemoryBuffer(
            0, pending, fileOffset + srcOffset, length, Cuda.DEFAULT_STREAM)
        case None =>
          // TODO: switch to async API when it's released, using the passed in CUDA stream.
          CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
          logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
      }
    case _ => throw new IllegalStateException(
      s"GDS can only copy to device buffer, not ${dst.getClass}")
  }
}

override protected def releaseResources(): Unit = {
// Buffers that share paths must be cleaned up elsewhere
jlowe marked this conversation as resolved.
Show resolved Hide resolved
if (id.canShareDiskPaths) {
if (id.canShareDiskPaths || isBatched(size)) {
sharedBufferFiles.remove(id)
} else {
val path = id.getDiskPath(diskBlockManager)
Expand Down