From 28bed7a96d5c4e7e9a1c9f0f85b42f60a6e96d3b Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 29 Sep 2020 14:58:05 -0500 Subject: [PATCH 1/3] Update SpillableColumnarBatch to remove buffer from catalog on close Signed-off-by: Jason Lowe --- .../com/nvidia/spark/rapids/RapidsBufferCatalog.scala | 8 ++++++-- .../com/nvidia/spark/rapids/SpillableColumnarBatch.scala | 4 +--- .../org/apache/spark/sql/rapids/TempSpillBufferId.scala | 2 +- 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala index 77b7f3c4eef..12b7aca9baa 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsBufferCatalog.scala @@ -20,14 +20,12 @@ import java.util.concurrent.ConcurrentHashMap import java.util.function.BiFunction import ai.rapids.cudf.{DeviceMemoryBuffer, Rmm, Table} -import com.nvidia.spark.rapids.RapidsPluginImplicits.AutoCloseableArray import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.internal.Logging import org.apache.spark.sql.rapids.RapidsDiskBlockManager -import org.apache.spark.sql.vectorized.ColumnarBatch /** * Catalog for lookup of buffers by ID. The constructor is only visible for testing, generally @@ -103,6 +101,9 @@ class RapidsBufferCatalog extends Logging { buffer.free() } } + + /** Return the number of buffers currently in the catalog. */ + def numBuffers: Int = bufferMap.size() } object RapidsBufferCatalog extends Logging with Arm { @@ -204,4 +205,7 @@ object RapidsBufferCatalog extends Logging with Arm { * @return buffer that has been acquired */ def acquireBuffer(id: RapidsBufferId): RapidsBuffer = singleton.acquireBuffer(id) + + /** Remove a buffer ID from the catalog and release the resources of the registered buffer. */ + def removeBuffer(id: RapidsBufferId): Unit = singleton.removeBuffer(id) } \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala index 835f3475255..13a740fe98f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/SpillableColumnarBatch.scala @@ -106,9 +106,7 @@ class SpillableColumnarBatchImpl (id: TempSpillBufferId, rowCount: Int) */ override def close(): Unit = { if (!closed) { - withResource(RapidsBufferCatalog.acquireBuffer(id)) { rapidsBuffer => - rapidsBuffer.free() - } + RapidsBufferCatalog.removeBuffer(id) closed = true } } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TempSpillBufferId.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TempSpillBufferId.scala index c193182a67e..0f9510a28ba 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TempSpillBufferId.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/TempSpillBufferId.scala @@ -41,7 +41,7 @@ object TempSpillBufferId { } } -class TempSpillBufferId private( +case class TempSpillBufferId private( override val tableId: Int, bufferId: TempLocalBlockId) extends RapidsBufferId { From 7ee2ddd148e6eaf9978d1efccdeb87507d841bf5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 29 Sep 2020 17:36:45 -0500 Subject: [PATCH 2/3] Add unit test Signed-off-by: Jason Lowe --- .../rapids/SpillableColumnarBatchSuite.scala | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala new file mode 100644 index 00000000000..c8a7fdfd4a5 --- /dev/null +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids + +import java.util.UUID + +import ai.rapids.cudf.MemoryBuffer +import com.nvidia.spark.rapids.{Arm, RapidsBuffer, RapidsBufferCatalog, RapidsBufferId, RapidsConf, SpillableColumnarBatchImpl, StorageTier} +import com.nvidia.spark.rapids.StorageTier.StorageTier +import com.nvidia.spark.rapids.format.TableMeta +import org.scalatest.FunSuite +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.SparkConf +import org.apache.spark.sql.vectorized.ColumnarBatch +import org.apache.spark.storage.TempLocalBlockId + +class SpillableColumnarBatchSuite extends FunSuite with Arm { + test("close updates catalog") { + val id = TempSpillBufferId(0, TempLocalBlockId(new UUID(1, 2))) + val mockBuffer = new MockBuffer(id) + val catalog = RapidsBufferCatalog.singleton + val oldBufferCount = catalog.numBuffers + catalog.registerNewBuffer(mockBuffer) + assertResult(oldBufferCount + 1)(catalog.numBuffers) + val spillableBatch = new SpillableColumnarBatchImpl(id, 5) + spillableBatch.close() + assertResult(oldBufferCount)(catalog.numBuffers) + } + + class MockBuffer(override val id: RapidsBufferId) extends RapidsBuffer { + override val size: Long = 123 + override val meta: TableMeta = null + override val storageTier: StorageTier = StorageTier.DEVICE + override def getColumnarBatch: ColumnarBatch = null + override def getMemoryBuffer: MemoryBuffer = null + override def addReference(): Boolean = true + override def free(): Unit = {} + override def getSpillPriority: Long = 0 + override def setSpillPriority(priority: Long): Unit = {} + override def close(): Unit = {} + } +} From e1d1f672f2b4594895f13b791bd5e1813242fc00 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 29 Sep 2020 17:43:27 -0500 Subject: [PATCH 3/3] Remove unused imports Signed-off-by: Jason Lowe --- .../apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala index c8a7fdfd4a5..2beab315c3e 100644 --- a/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala +++ b/tests/src/test/scala/org/apache/spark/sql/rapids/SpillableColumnarBatchSuite.scala @@ -19,13 +19,11 @@ package org.apache.spark.sql.rapids import java.util.UUID import ai.rapids.cudf.MemoryBuffer -import com.nvidia.spark.rapids.{Arm, RapidsBuffer, RapidsBufferCatalog, RapidsBufferId, RapidsConf, SpillableColumnarBatchImpl, StorageTier} +import com.nvidia.spark.rapids.{Arm, RapidsBuffer, RapidsBufferCatalog, RapidsBufferId, SpillableColumnarBatchImpl, StorageTier} import com.nvidia.spark.rapids.StorageTier.StorageTier import com.nvidia.spark.rapids.format.TableMeta import org.scalatest.FunSuite -import org.scalatest.mockito.MockitoSugar -import org.apache.spark.SparkConf import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.storage.TempLocalBlockId