Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix resource leaks in unit tests #1747

Merged
merged 2 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
val codec = TableCompressionCodec.getCodec(CodecType.NVCOMP_LZ4)
withResource(codec.createBatchCompressor(0, Cuda.DEFAULT_STREAM)) { compressor =>
compressor.addTableToCompress(buildContiguousTable(start, numRows))
GpuCompressedColumnVector.from(compressor.finish().head)
withResource(compressor.finish()) { compressed =>
GpuCompressedColumnVector.from(compressed.head)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
val spillPriority = 3
val bufferId = MockRapidsBufferId(7)
closeOnExcept(buildContiguousTable()) { ct =>
// store takes ownership of the table
withResource(buildContiguousTable()) { ct =>
store.addContiguousTable(bufferId, ct, spillPriority)
}
val captor: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer])
Expand Down Expand Up @@ -142,7 +141,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
assertResult(0)(store.currentSize)
val bufferSizes = new Array[Long](2)
bufferSizes.indices.foreach { i =>
closeOnExcept(buildContiguousTable()) { ct =>
withResource(buildContiguousTable()) { ct =>
bufferSizes(i) = ct.getBuffer.getLength
// store takes ownership of the table
store.addContiguousTable(MockRapidsBufferId(i), ct, initialSpillPriority = 0)
Expand All @@ -164,7 +163,7 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
store.setSpillStore(spillStore)
spillPriorities.indices.foreach { i =>
closeOnExcept(buildContiguousTable()) { ct =>
withResource(buildContiguousTable()) { ct =>
bufferSizes(i) = ct.getBuffer.getLength
// store takes ownership of the table
store.addContiguousTable(MockRapidsBufferId(i), ct, spillPriorities(i))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit
devStore: RapidsDeviceMemoryStore,
bufferId: RapidsBufferId,
spillPriority: Long): Long = {
closeOnExcept(buildContiguousTable()) { ct =>
withResource(buildContiguousTable()) { ct =>
val bufferSize = ct.getBuffer.getLength
// store takes ownership of the table
devStore.addContiguousTable(bufferId, ct, spillPriority)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
assertResult(hostStoreMaxSize)(hostStore.numBytesFree)
devStore.setSpillStore(hostStore)

val bufferSize = closeOnExcept(buildContiguousTable()) { ct =>
val bufferSize = withResource(buildContiguousTable()) { ct =>
val len = ct.getBuffer.getLength
// store takes ownership of the table
devStore.addContiguousTable(bufferId, ct, spillPriority)
Expand Down Expand Up @@ -93,14 +93,10 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
var ct = buildContiguousTable()
try {
withResource(buildContiguousTable()) { ct =>
withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedBuffer =>
expectedBuffer.copyFromDeviceBuffer(ct.getBuffer)
// store takes ownership of the table
devStore.addContiguousTable(bufferId, ct, spillPriority)
ct = null

devStore.synchronousSpill(0)
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
withResource(buffer.getMemoryBuffer) { actualBuffer =>
Expand All @@ -111,10 +107,6 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
}
}
}
} finally {
if (ct != null) {
ct.close()
}
}
}
}
Expand All @@ -130,14 +122,10 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { devStore =>
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
var ct = buildContiguousTable()
try {
withResource(buildContiguousTable()) { ct =>
withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) {
expectedBatch =>
// store takes ownership of the table
devStore.addContiguousTable(bufferId, ct, spillPriority)
ct = null

devStore.synchronousSpill(0)
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
assertResult(StorageTier.HOST)(buffer.storageTier)
Expand All @@ -146,10 +134,6 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
}
}
}
} finally {
if (ct != null) {
ct.close()
}
}
}
}
Expand All @@ -167,35 +151,28 @@ class RapidsHostMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsHostMemoryStore(hostStoreMaxSize, catalog)) { hostStore =>
devStore.setSpillStore(hostStore)
hostStore.setSpillStore(mockStore)
var bigTable: ContiguousTable = null
var smallTable: ContiguousTable = null
try {
bigTable = buildContiguousTable(1024 * 1024)
smallTable = buildContiguousTable(1)
withResource(GpuColumnVector.from(bigTable.getTable, sparkTypes)) { expectedBatch =>
// store takes ownership of the table
devStore.addContiguousTable(bigBufferId, bigTable, spillPriority)
bigTable = null

devStore.synchronousSpill(0)
verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer],
ArgumentMatchers.any[Cuda.Stream])
withResource(catalog.acquireBuffer(bigBufferId)) { buffer =>
assertResult(StorageTier.HOST)(buffer.storageTier)
withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch =>
TestUtils.compareBatches(expectedBatch, actualBatch)
withResource(buildContiguousTable(1024 * 1024)) { bigTable =>
withResource(buildContiguousTable(1)) { smallTable =>
withResource(GpuColumnVector.from(bigTable.getTable, sparkTypes)) { expectedBatch =>
// store takes ownership of the table
devStore.addContiguousTable(bigBufferId, bigTable, spillPriority)
devStore.synchronousSpill(0)
verify(mockStore, never()).copyBuffer(ArgumentMatchers.any[RapidsBuffer],
ArgumentMatchers.any[Cuda.Stream])
withResource(catalog.acquireBuffer(bigBufferId)) { buffer =>
assertResult(StorageTier.HOST)(buffer.storageTier)
withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch =>
TestUtils.compareBatches(expectedBatch, actualBatch)
}
}
}

devStore.addContiguousTable(smallBufferId, smallTable, spillPriority)
smallTable = null
devStore.synchronousSpill(0)
val ac: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer])
verify(mockStore).copyBuffer(ac.capture(), ArgumentMatchers.any[Cuda.Stream])
assertResult(bigBufferId)(ac.getValue.id)
devStore.addContiguousTable(smallBufferId, smallTable, spillPriority)
devStore.synchronousSpill(0)
val ac: ArgumentCaptor[RapidsBuffer] = ArgumentCaptor.forClass(classOf[RapidsBuffer])
verify(mockStore).copyBuffer(ac.capture(), ArgumentMatchers.any[Cuda.Stream])
assertResult(bigBufferId)(ac.getValue.id)
}
}
} finally {
Seq(bigTable, smallTable).safeClose()
}
}
}
Expand Down