Skip to content

Commit

Permalink
stream shuffle buffers from GDS to UCX (#2050)
Browse files Browse the repository at this point in the history
* stream shuffle buffers from GDS to UCX

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* review feedback

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* remove CudaUtil

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* fix tests

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* add todo for async cufile api

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* add 2021 to copyright

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* add todo for async memcpy to host

Signed-off-by: Rong Ou <rong.ou@gmail.com>

* review feedback

Signed-off-by: Rong Ou <rong.ou@gmail.com>
  • Loading branch information
rongou authored Apr 7, 2021
1 parent 9eeecad commit 7bdcc15
Show file tree
Hide file tree
Showing 9 changed files with 75 additions and 95 deletions.
51 changes: 0 additions & 51 deletions sql-plugin/src/main/scala/ai/rapids/cudf/CudaUtil.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids

import java.io.File

import ai.rapids.cudf.{DeviceMemoryBuffer, MemoryBuffer, Table}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer, Table}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta

Expand Down Expand Up @@ -115,6 +115,19 @@ trait RapidsBuffer extends AutoCloseable {
*/
def getMemoryBuffer: MemoryBuffer

/**
 * Copy a range of the content of this buffer into the specified memory buffer.
 *
 * NOTE(review): per the store implementation, the copy may be synchronous (host
 * destination) or asynchronous on `stream` (device destination); callers should
 * synchronize on `stream` before reading `dst` — confirm with implementations.
 *
 * @param srcOffset offset within this buffer to start copying from.
 * @param dst the memory buffer to copy into (host or device).
 * @param dstOffset offset within `dst` to start copying into.
 * @param length number of bytes to copy.
 * @param stream CUDA stream to use for the copy.
 */
def copyToMemoryBuffer(
srcOffset: Long, dst: MemoryBuffer, dstOffset: Long, length: Long, stream: Cuda.Stream)

/**
* Get the device memory buffer from the underlying storage. If the buffer currently resides
* outside of device memory, a new DeviceMemoryBuffer is created with the data copied over.
Expand Down Expand Up @@ -193,6 +206,10 @@ sealed class DegenerateRapidsBuffer(
override def getMemoryBuffer: MemoryBuffer =
throw new UnsupportedOperationException("degenerate buffer has no memory buffer")

// A degenerate buffer has no backing memory buffer (its getMemoryBuffer also
// throws), so there is nothing to copy from; any attempt is a programmer error.
override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long, length: Long,
stream: Cuda.Stream): Unit =
throw new UnsupportedOperationException("degenerate buffer cannot copy to memory buffer")

override def getDeviceMemoryBuffer: DeviceMemoryBuffer =
throw new UnsupportedOperationException("degenerate buffer has no device memory buffer")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,21 @@ abstract class RapidsBufferStore(
}
}

/**
 * Copy a range of this buffer into `dst`, dispatching on the destination type:
 * host destinations use the synchronous copy, device destinations the async one.
 * The source memory buffer is acquired and released for the duration of the copy.
 */
override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long,
    length: Long, stream: Cuda.Stream): Unit = {
  withResource(getMemoryBuffer) { src =>
    dst match {
      case hostDst: HostMemoryBuffer =>
        // TODO: consider moving to the async version.
        hostDst.copyFromMemoryBuffer(dstOffset, src, srcOffset, length, stream)
      case devDst: DeviceMemoryBuffer =>
        devDst.copyFromMemoryBufferAsync(dstOffset, src, srcOffset, length, stream)
      case _ =>
        throw new IllegalStateException(s"Infeasible destination buffer type ${dst.getClass}")
    }
  }
}

override def getDeviceMemoryBuffer: DeviceMemoryBuffer = {
if (RapidsBufferCatalog.shouldUnspill) {
(0 until MAX_UNSPILL_ATTEMPTS).foreach { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,24 @@ class RapidsGdsStore(
}
}

/**
 * Copy a range of this GDS-backed buffer into `dst` by reading directly from the
 * backing file into device memory via cuFile. Only device destinations are
 * supported by GDS.
 *
 * @param srcOffset offset within this buffer to start copying from.
 * @param dst destination buffer; must be a DeviceMemoryBuffer.
 * @param dstOffset offset within `dst` to start copying into.
 * @param length number of bytes to copy.
 * @param stream CUDA stream to use (currently unused; cuFile read is synchronous).
 */
override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long,
    length: Long, stream: Cuda.Stream): Unit = {
  // Resolve the on-disk file backing this buffer; buffers that can share disk
  // paths are looked up in the shared file map.
  val path = if (id.canShareDiskPaths) {
    sharedBufferFiles.get(id)
  } else {
    id.getDiskPath(diskBlockManager)
  }
  dst match {
    case dmOriginal: DeviceMemoryBuffer =>
      // The slice is a new buffer view holding its own reference; close it once
      // the read completes so the underlying device memory is not leaked.
      withResource(dmOriginal.slice(dstOffset, length)) { dm =>
        // TODO: switch to async API when it's released, using the passed in CUDA stream.
        CuFile.readFileToDeviceBuffer(dm, path, fileOffset + srcOffset)
        logDebug(s"Created device buffer for $path $fileOffset:$size via GDS")
      }
    case _ => throw new IllegalStateException(
      s"GDS can only copy to device buffer, not ${dst.getClass}")
  }
}

override protected def releaseResources(): Unit = {
// Buffers that share paths must be cleaned up elsewhere
if (id.canShareDiskPaths) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@ package com.nvidia.spark.rapids.shuffle

import scala.collection.mutable.ArrayBuffer

import ai.rapids.cudf.{Cuda, CudaUtil, DeviceMemoryBuffer, NvtxColor, NvtxRange}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, NvtxColor, NvtxRange, Rmm}
import com.nvidia.spark.rapids.Arm
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.TableMeta
Expand Down Expand Up @@ -165,7 +165,7 @@ class BufferReceiveState(

if (fullSize == b.rangeSize()) {
// we have the full buffer!
contigBuffer = CudaUtil.deviceAllocateOnStream(b.rangeSize(), stream)
contigBuffer = Rmm.alloc(b.rangeSize(), stream)
toClose.append(contigBuffer)

contigBuffer.copyFromDeviceBufferAsync(0, deviceBounceBuffer,
Expand All @@ -183,7 +183,7 @@ class BufferReceiveState(
}
} else {
// need to keep it around
workingOn = CudaUtil.deviceAllocateOnStream(fullSize, stream)
workingOn = Rmm.alloc(fullSize, stream)
toClose.append(workingOn)

workingOn.copyFromDeviceBufferAsync(0, deviceBounceBuffer,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,7 @@

package com.nvidia.spark.rapids.shuffle

import ai.rapids.cudf.{Cuda, CudaUtil, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.{Arm, RapidsBuffer, ShuffleMetadata, StorageTier}
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import com.nvidia.spark.rapids.format.{BufferMeta, BufferTransferRequest, TransferRequest}
Expand Down Expand Up @@ -171,35 +171,8 @@ class BufferSendState(

acquiredBuffs.foreach { case RangeBuffer(blockRange, rapidsBuffer) =>
require(blockRange.rangeSize() <= bounceBuffToUse.getLength - buffOffset)
withResource(rapidsBuffer.getMemoryBuffer) { memBuff =>
bounceBuffToUse match {
case _: HostMemoryBuffer =>
//TODO: HostMemoryBuffer needs the same functionality that
// DeviceMemoryBuffer has to copy from/to device/host buffers
logDebug(s"copying to host memory bounce buffer $memBuff")
CudaUtil.copy(
memBuff,
blockRange.rangeStart,
bounceBuffToUse,
buffOffset,
blockRange.rangeSize())
case d: DeviceMemoryBuffer =>
memBuff match {
case mh: HostMemoryBuffer =>
// host original => device bounce
logDebug(s"copying from host to device memory bounce buffer $memBuff")
d.copyFromHostBufferAsync(buffOffset, mh, blockRange.rangeStart,
blockRange.rangeSize(), serverStream)
case md: DeviceMemoryBuffer =>
// device original => device bounce
logDebug(s"copying from device to device memory bounce buffer $memBuff")
d.copyFromDeviceBufferAsync(buffOffset, md, blockRange.rangeStart,
blockRange.rangeSize(), serverStream)
case _ => throw new IllegalStateException("What buffer is this")
}
case _ => throw new IllegalStateException("What buffer is this")
}
}
rapidsBuffer.copyToMemoryBuffer(blockRange.rangeStart, bounceBuffToUse, buffOffset,
blockRange.rangeSize(), serverStream)
buffOffset += blockRange.rangeSize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

import ai.rapids.cudf.{CudaUtil, DeviceMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange}
import ai.rapids.cudf.{DeviceMemoryBuffer, MemoryBuffer, NvtxColor, NvtxRange}
import com.nvidia.spark.rapids.RapidsConf

import org.apache.spark.internal.Logging
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,10 +16,11 @@

package com.nvidia.spark.rapids.shuffle

import ai.rapids.cudf.{DeviceMemoryBuffer, HostMemoryBuffer}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, HostMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.RapidsBuffer
import com.nvidia.spark.rapids.format.TableMeta
import java.util
import org.mockito.ArgumentMatchers.{any, anyLong}
import org.mockito.Mockito._

import org.apache.spark.storage.ShuffleBlockBatchId
Expand All @@ -36,12 +37,17 @@ class RapidsShuffleServerSuite extends RapidsShuffleTestHelper {
deviceBuffer.copyFromHostBuffer(hostBuff)
val mockBuffer = mock[RapidsBuffer]
val mockMeta = RapidsShuffleTestHelper.mockTableMeta(100000)
when(mockBuffer.getMemoryBuffer).thenAnswer { _ =>
deviceBuffer.incRefCount()
when(mockBuffer.copyToMemoryBuffer(anyLong(), any[MemoryBuffer](), anyLong(), anyLong(),
any[Cuda.Stream]())).thenAnswer { invocation =>
// start at 1 close, since we'll need to close at refcount 0 too
val newNumCloses = numCloses.getOrDefault(mockBuffer, 1) + 1
numCloses.put(mockBuffer, newNumCloses)
deviceBuffer
val srcOffset = invocation.getArgument[Long](0)
val dst = invocation.getArgument[MemoryBuffer](1)
val dstOffset = invocation.getArgument[Long](2)
val length = invocation.getArgument[Long](3)
val stream = invocation.getArgument[Cuda.Stream](4)
dst.copyFromMemoryBuffer(dstOffset, deviceBuffer, srcOffset, length, stream)
}
when(mockBuffer.size).thenReturn(deviceBuffer.getLength)
when(mockBuffer.meta).thenReturn(mockMeta)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
* Copyright (c) 2020-2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,7 +18,7 @@ package org.apache.spark.sql.rapids

import java.util.UUID

import ai.rapids.cudf.{DeviceMemoryBuffer, MemoryBuffer}
import ai.rapids.cudf.{Cuda, DeviceMemoryBuffer, MemoryBuffer}
import com.nvidia.spark.rapids.{Arm, RapidsBuffer, RapidsBufferCatalog, RapidsBufferId, SpillableColumnarBatchImpl, StorageTier}
import com.nvidia.spark.rapids.StorageTier.StorageTier
import com.nvidia.spark.rapids.format.TableMeta
Expand Down Expand Up @@ -46,6 +46,8 @@ class SpillableColumnarBatchSuite extends FunSuite with Arm {
override val meta: TableMeta = null
override val storageTier: StorageTier = StorageTier.DEVICE
override def getMemoryBuffer: MemoryBuffer = null
override def copyToMemoryBuffer(srcOffset: Long, dst: MemoryBuffer, dstOffset: Long,
length: Long, stream: Cuda.Stream): Unit = {}
override def getDeviceMemoryBuffer: DeviceMemoryBuffer = null
override def addReference(): Boolean = true
override def free(): Unit = {}
Expand Down

0 comments on commit 7bdcc15

Please sign in to comment.