Skip to content

Commit

Permalink
Fix resource leaks in unit tests (NVIDIA#1181)
Browse files Browse the repository at this point in the history
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
  • Loading branch information
jlowe authored Nov 23, 2020
1 parent 22f87dd commit ecc1017
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,9 @@ class GpuCoalesceBatchesSuite extends SparkQueryCompareTestSuite {
val codec = TableCompressionCodec.getCodec(CodecType.NVCOMP_LZ4)
withResource(codec.createBatchCompressor(0, Cuda.DEFAULT_STREAM)) { compressor =>
compressor.addTableToCompress(buildContiguousTable(start, numRows))
GpuCompressedColumnVector.from(compressor.finish().head, Array[DataType](LongType))
withResource(compressor.finish()) { compressedResults =>
GpuCompressedColumnVector.from(compressedResults.head, Array[DataType](LongType))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,11 +143,14 @@ class GpuPartitioningSuite extends FunSuite with Arm {
val gccv = columns.head.asInstanceOf[GpuCompressedColumnVector]
val bufferId = MockRapidsBufferId(partIndex)
val devBuffer = gccv.getBuffer
// device store takes ownership of the buffer
devBuffer.incRefCount()
deviceStore.addBuffer(bufferId, devBuffer, gccv.getTableMeta, spillPriority)
withResource(buildSubBatch(batch, startRow, endRow)) { expectedBatch =>
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
compareBatches(expectedBatch, buffer.getColumnarBatch(sparkTypes))
withResource(buffer.getColumnarBatch(sparkTypes)) { batch =>
compareBatches(expectedBatch, batch)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,10 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
val spillPriority = 3
val bufferId = MockRapidsBufferId(7)
val meta = closeOnExcept(buildContiguousTable()) { ct =>
val meta = withResource(buildContiguousTable()) { ct =>
val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer)
// store takes ownership of the buffer
ct.getBuffer.incRefCount()
store.addBuffer(bufferId, ct.getBuffer, meta, spillPriority)
meta
}
Expand All @@ -83,11 +84,12 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
val catalog = new RapidsBufferCatalog
withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
val bufferId = MockRapidsBufferId(7)
closeOnExcept(buildContiguousTable()) { ct =>
withResource(buildContiguousTable()) { ct =>
withResource(HostMemoryBuffer.allocate(ct.getBuffer.getLength)) { expectedHostBuffer =>
expectedHostBuffer.copyFromDeviceBuffer(ct.getBuffer)
val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer)
// store takes ownership of the buffer
ct.getBuffer.incRefCount()
store.addBuffer(bufferId, ct.getBuffer, meta, initialSpillPriority = 3)
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
withResource(buffer.getMemoryBuffer.asInstanceOf[DeviceMemoryBuffer]) { devbuf =>
Expand All @@ -107,11 +109,12 @@ class RapidsDeviceMemoryStoreSuite extends FunSuite with Arm with MockitoSugar {
val sparkTypes = Array[DataType](IntegerType, StringType, DoubleType)
withResource(new RapidsDeviceMemoryStore(catalog)) { store =>
val bufferId = MockRapidsBufferId(7)
closeOnExcept(buildContiguousTable()) { ct =>
withResource(buildContiguousTable()) { ct =>
withResource(GpuColumnVector.from(ct.getTable, sparkTypes)) {
expectedBatch =>
val meta = MetaUtils.buildTableMeta(bufferId.tableId, ct.getTable, ct.getBuffer)
// store takes ownership of the buffer
ct.getBuffer.incRefCount()
store.addBuffer(bufferId, ct.getBuffer, meta, initialSpillPriority = 3)
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,9 @@ class RapidsDiskStoreSuite extends FunSuite with BeforeAndAfterEach with Arm wit
hostStore.synchronousSpill(0)
withResource(catalog.acquireBuffer(bufferId)) { buffer =>
assertResult(StorageTier.DISK)(buffer.storageTier)
TestUtils.compareBatches(expectedBatch, buffer.getColumnarBatch(sparkTypes))
withResource(buffer.getColumnarBatch(sparkTypes)) { actualBatch =>
TestUtils.compareBatches(expectedBatch, actualBatch)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.math.RoundingMode
import scala.util.Random

import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector}
import com.nvidia.spark.rapids.{GpuAlias, GpuColumnVector, GpuIsNotNull, GpuIsNull, GpuLiteral, GpuOverrides, GpuScalar, GpuUnitTests, HostColumnarToGpu, RapidsConf, RapidsHostColumnVector}
import com.nvidia.spark.rapids.{GpuAlias, GpuColumnVector, GpuIsNotNull, GpuIsNull, GpuLiteral, GpuOverrides, GpuScalar, GpuUnitTests, HostColumnarToGpu, RapidsConf}

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Literal}
import org.apache.spark.sql.types.{Decimal, DecimalType}
Expand Down Expand Up @@ -78,7 +78,7 @@ class DecimalUnitTest extends GpuUnitTests {
val (precision, scale) = cv.dataType() match {
case dt: DecimalType => (dt.precision, dt.scale)
}
withResource(cv.copyToHost()) { hostCv: RapidsHostColumnVector =>
withResource(cv.copyToHost()) { hostCv =>
dec32Data.zipWithIndex.foreach { case (dec, i) =>
val rescaled = dec.toJavaBigDecimal.setScale(scale)
assertResult(rescaled.unscaledValue().longValueExact())(hostCv.getLong(i))
Expand All @@ -94,7 +94,7 @@ class DecimalUnitTest extends GpuUnitTests {
val (precision, scale) = cv.dataType() match {
case dt: DecimalType => (dt.precision, dt.scale)
}
withResource(cv.copyToHost()) { hostCv: RapidsHostColumnVector =>
withResource(cv.copyToHost()) { hostCv =>
dec64WithNull.zipWithIndex.foreach {
case (dec, i) if dec == null =>
assertResult(true)(hostCv.getBase.isNull(i))
Expand All @@ -107,13 +107,15 @@ class DecimalUnitTest extends GpuUnitTests {
}
// assertion error throws because of precision overflow
assertThrows[AssertionError] {
withResource(GpuColumnVector.from(ColumnVector.decimalFromLongs(0, 1L),
DecimalType(DType.DECIMAL64_MAX_PRECISION + 1, 0))) { _ => }
withResource(ColumnVector.decimalFromLongs(0, 1L)) { dcv =>
GpuColumnVector.from(dcv, DecimalType(DType.DECIMAL64_MAX_PRECISION + 1, 0))
}
}
// assertion error throws because of unsupported DType DECIMAL32
assertThrows[AssertionError] {
withResource(GpuColumnVector.from(ColumnVector.decimalFromInts(0, 1),
DecimalType(1, 0))) { _ => }
withResource(ColumnVector.decimalFromInts(0, 1)) { dcv =>
GpuColumnVector.from(dcv, DecimalType(1, 0))
}
}
withResource(GpuScalar.from(dec64Data(0), dt64)) { scalar =>
withResource(GpuColumnVector.from(scalar, 10, dt64)) { cv =>
Expand Down Expand Up @@ -172,19 +174,23 @@ class DecimalUnitTest extends GpuUnitTests {
BigDecimal(-123L).bigDecimal, null, null)
withResource(GpuColumnVector.from(ColumnVector.fromDecimals(decArray: _*), DecimalType(10, 0))
) { cv =>
withResource(GpuIsNull(null).doColumnar(cv).copyToHost()) { ret =>
assertResult(false)(ret.getBoolean(0))
assertResult(true)(ret.getBoolean(1))
assertResult(false)(ret.getBoolean(2))
assertResult(true)(ret.getBoolean(3))
assertResult(true)(ret.getBoolean(4))
withResource(GpuIsNull(null).doColumnar(cv)) { isNullResult =>
withResource(isNullResult.copyToHost()) { ret =>
assertResult(false)(ret.getBoolean(0))
assertResult(true)(ret.getBoolean(1))
assertResult(false)(ret.getBoolean(2))
assertResult(true)(ret.getBoolean(3))
assertResult(true)(ret.getBoolean(4))
}
}
withResource(GpuIsNotNull(null).doColumnar(cv).copyToHost()) { ret =>
assertResult(true)(ret.getBoolean(0))
assertResult(false)(ret.getBoolean(1))
assertResult(true)(ret.getBoolean(2))
assertResult(false)(ret.getBoolean(3))
assertResult(false)(ret.getBoolean(4))
withResource(GpuIsNotNull(null).doColumnar(cv)) { isNotNullResult =>
withResource(isNotNullResult.copyToHost()) { ret =>
assertResult(true)(ret.getBoolean(0))
assertResult(false)(ret.getBoolean(1))
assertResult(true)(ret.getBoolean(2))
assertResult(false)(ret.getBoolean(3))
assertResult(false)(ret.getBoolean(4))
}
}
}
}
Expand All @@ -195,15 +201,17 @@ class DecimalUnitTest extends GpuUnitTests {
DecimalType(DType.DECIMAL64_MAX_PRECISION, 9))) { cv =>
val dt = new HostColumnVector.BasicType(false,
GpuColumnVector.getNonNestedRapidsType(cv.dataType()))
val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount)
withResource(cv.copyToHost()) { hostCV =>
HostColumnarToGpu.columnarCopy(hostCV, builder, false, cv.getRowCount.toInt)
val actual = builder.build()
val expected = hostCV.getBase
assertResult(expected.getType)(actual.getType)
assertResult(expected.getRowCount)(actual.getRowCount)
(0 until actual.getRowCount.toInt).foreach { i =>
assertResult(expected.getBigDecimal(i))(actual.getBigDecimal(i))
withResource(new HostColumnVector.ColumnBuilder(dt, cv.getRowCount)) { builder =>
withResource(cv.copyToHost()) { hostCV =>
HostColumnarToGpu.columnarCopy(hostCV, builder, false, cv.getRowCount.toInt)
withResource(builder.build()) { actual =>
val expected = hostCV.getBase
assertResult(expected.getType)(actual.getType)
assertResult(expected.getRowCount)(actual.getRowCount)
(0 until actual.getRowCount.toInt).foreach { i =>
assertResult(expected.getBigDecimal(i))(actual.getBigDecimal(i))
}
}
}
}
}
Expand All @@ -217,18 +225,20 @@ class DecimalUnitTest extends GpuUnitTests {
DecimalType(DType.DECIMAL64_MAX_PRECISION, 9))) { cv =>
val dt = new HostColumnVector.BasicType(true,
GpuColumnVector.getNonNestedRapidsType(cv.dataType()))
val builder = new HostColumnVector.ColumnBuilder(dt, cv.getRowCount)
withResource(cv.copyToHost()) { hostCV =>
HostColumnarToGpu.columnarCopy(hostCV, builder, true, cv.getRowCount.toInt)
val actual = builder.build()
val expected = hostCV.getBase
assertResult(DType.create(DType.DTypeEnum.DECIMAL64, expected.getType.getScale)
)(actual.getType)
assertResult(expected.getRowCount)(actual.getRowCount)
(0 until actual.getRowCount.toInt).foreach { i =>
assertResult(expected.isNull(i))(actual.isNull(i))
if (!actual.isNull(i)) {
assertResult(expected.getBigDecimal(i))(actual.getBigDecimal(i))
withResource(new HostColumnVector.ColumnBuilder(dt, cv.getRowCount)) { builder =>
withResource(cv.copyToHost()) { hostCV =>
HostColumnarToGpu.columnarCopy(hostCV, builder, true, cv.getRowCount.toInt)
withResource(builder.build()) { actual =>
val expected = hostCV.getBase
assertResult(DType.create(DType.DTypeEnum.DECIMAL64, expected.getType.getScale)
)(actual.getType)
assertResult(expected.getRowCount)(actual.getRowCount)
(0 until actual.getRowCount.toInt).foreach { i =>
assertResult(expected.isNull(i))(actual.isNull(i))
if (!actual.isNull(i)) {
assertResult(expected.getBigDecimal(i))(actual.getBigDecimal(i))
}
}
}
}
}
Expand Down

0 comments on commit ecc1017

Please sign in to comment.