Code cleanup
Addressed the following issues
NVIDIA#1285
NVIDIA#1286
NVIDIA#1287
NVIDIA#1288
NVIDIA#1289

Signed-off-by: Raza Jafri <rjafri@nvidia.com>
razajafri committed Dec 8, 2020
1 parent 6a53452 commit bfa78af
Showing 2 changed files with 26 additions and 37 deletions.
@@ -254,15 +254,6 @@ private case class CloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]) e
  */
 class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
 
-  private[rapids] def getProducer(
-      cbIter: Iterator[ColumnarBatch],
-      schema: Seq[Attribute],
-      conf: Configuration,
-      sqlConf: SQLConf) : CachedBatchIteratorProducer[ColumnarBatch] = {
-    new CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema, conf, sqlConf)
-
-  }
-
   override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true
 
   override def supportsColumnarOutput(schema: StructType): Boolean = schema.fields.forall { f =>
@@ -327,9 +318,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
         if (batch.numCols() == 0) {
           List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
         } else {
-          withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
-            compressColumnarBatchWithParquet(gpuCB)
-          }
+          withResource(putOnGpuIfNeeded(batch))(gpuCB => compressColumnarBatchWithParquet(gpuCB))
         }
       })
     } else {
@@ -353,13 +342,14 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
       gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
     // NOTE: this doesn't take nulls into account so we could be over-estimating the actual size
     // but that's still better than under-estimating
-    val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { index =>
-      gpuCB.column(index).dataType().defaultSize
+    val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { idx =>
+      gpuCB.column(idx).asInstanceOf[GpuColumnVector].getBase.getDeviceMemorySize / gpuCB.numRows()
     }.sum
     val rowsInBatch = (BYTES_ALLOWED_PER_BATCH / estimatedRowSize).toInt
     val splitIndices = scala.Range(rowsInBatch, gpuCB.numRows(), rowsInBatch)
-    val cudfTables = if (splitIndices.nonEmpty) {
-      val splutVectors = scala.Range(0, gpuCB.numCols()).map { index =>
+    val buffers = new ListBuffer[ParquetCachedBatch]
+    if (splitIndices.nonEmpty) {
+      val splitVectors = scala.Range(0, gpuCB.numCols()).map { index =>
         gpuCB.column(index).asInstanceOf[GpuColumnVector].getBase.split(splitIndices: _*)
       }
       try {
@@ -369,31 +359,33 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
         // T01= {splitCol1(1), splitCol2(1),...,splitColn(1)}
         // ...
         // T0m= {splitCol1(m), splitCol2(m),...,splitColn(m)}
-        for (i <- splutVectors(0).indices) yield
-          new Table({
-            for (j <- splutVectors.indices) yield splutVectors(j)(i)
-          }: _*)
+        for (i <- splitVectors(0).indices)
+          withResource(new Table({
+            for (j <- splitVectors.indices) yield splitVectors(j)(i)
+          }: _*)) { table =>
+            buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
+          }
       } finally {
-        for (i <- splutVectors(0).indices)
-          for (j <- splutVectors.indices) splutVectors(j)(i).close()
+        for (i <- splitVectors(0).indices)
+          for (j <- splitVectors.indices) splitVectors(j)(i).close()
       }
     } else {
-      Seq(GpuColumnVector.from(gpuCB))
-    }
-
-    val buffers = new ListBuffer[ParquetCachedBatch]
-    cudfTables.foreach { table =>
-      withResource(table) { table =>
-        val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
-        withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
-          writer.write(table)
-        }
-        buffers += ParquetCachedBatch(buffer)
+      withResource(GpuColumnVector.from(gpuCB)) { table =>
+        buffers += ParquetCachedBatch(writeTableToCachedBatch(table))
       }
     }
 
     buffers.toList
   }
 
+  private def writeTableToCachedBatch(table: Table): ParquetBufferConsumer = {
+    val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
+    withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
+      writer.write(table)
+    }
+    buffer
+  }
+
   /**
    * This method decodes the CachedBatch leaving it on the GPU to avoid the extra copying back to
    * the host
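For readers tracking the sizing change: the reworked compressColumnarBatchWithParquet estimates bytes per row from each column's device footprint (getDeviceMemorySize divided by the row count) instead of the Spark DataType default size, and then derives the row offsets at which the batch is split. The arithmetic is small enough to sketch in self-contained Scala; the 64 MiB budget and column sizes below are illustrative stand-ins, not values taken from the plugin.

object SplitEstimateSketch {
  // Hypothetical stand-in for the serializer's BYTES_ALLOWED_PER_BATCH limit.
  val bytesAllowedPerBatch: Long = 64L * 1024 * 1024

  // columnDeviceSizes plays the role of getDeviceMemorySize per column.
  def splitIndices(columnDeviceSizes: Seq[Long], numRows: Int): Seq[Int] = {
    // Per-row estimate: each column's device bytes divided by the row count,
    // summed across columns (may over-estimate when nulls shrink the data).
    val estimatedRowSize = columnDeviceSizes.map(_ / numRows).sum
    val rowsInBatch = (bytesAllowedPerBatch / estimatedRowSize).toInt
    // Row offsets at which the batch would be cut into sub-batches.
    scala.Range(rowsInBatch, numRows, rowsInBatch)
  }

  def main(args: Array[String]): Unit = {
    // Two 128 MiB columns over 1,000,000 rows -> ~268 bytes per row ->
    // splits at rows 250406, 500812 and 751218 for the 64 MiB budget.
    println(splitIndices(Seq(128L << 20, 128L << 20), 1000000).mkString(", "))
  }
}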
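The T01..T0m comment describes the regrouping that follows the split: every column is sliced at the same offsets, and piece i of each column is gathered into sub-table i. Below is a plain-Scala sketch of that index shuffle, with strings standing in for the sliced cudf ColumnVectors.

object RegroupSketch {
  def main(args: Array[String]): Unit = {
    val numCols = 3
    val numPieces = 2
    // splitVectors(j)(i) = piece i of column j, mirroring the result of
    // gpuCB.column(j).asInstanceOf[GpuColumnVector].getBase.split(splitIndices: _*)
    val splitVectors: Seq[Array[String]] =
      Seq.tabulate(numCols)(j => Array.tabulate(numPieces)(i => s"col$j-piece$i"))
    // For each piece index, collect that piece from every column; these are
    // the per-sub-batch arguments that would feed `new Table(...)`.
    val tables =
      for (i <- splitVectors(0).indices)
        yield for (j <- splitVectors.indices) yield splitVectors(j)(i)
    tables.foreach(t => println(t.mkString("Table(", ", ", ")")))
    // Prints: Table(col0-piece0, col1-piece0, col2-piece0)
    //         Table(col0-piece1, col1-piece1, col2-piece1)
  }
}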
@@ -155,11 +155,8 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
     when(spyCol.getRowCount).thenReturn(ROWS)
     val mockDtype = mock(classOf[DType])
     when(mockDtype.getSizeInBytes).thenReturn(1024)
-    val mockDataType = mock(classOf[DataType])
     val spyGpuCol = spy(GpuColumnVector.from(spyCol, ByteType))
     when(spyCol.getType()).thenReturn(mockDtype)
-    when(spyGpuCol.dataType()).thenReturn(mockDataType)
-    when(mockDataType.defaultSize).thenReturn(1024)
+    when(spyCol.getDeviceMemorySize).thenReturn(1024L * ROWS)
 
     (spyCol, spyGpuCol)
   }
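Because the estimator now reads getDeviceMemorySize from the cudf column rather than a mocked Spark DataType, the test stubs that call directly. A minimal Mockito sketch of the same stubbing, using a hypothetical DeviceColumn trait in place of the real cudf ColumnVector:

import org.mockito.Mockito.{mock, when}

object MockSizeSketch {
  // Hypothetical stand-in for the cudf column interface the test mocks.
  trait DeviceColumn {
    def getRowCount: Long
    def getDeviceMemorySize: Long
  }

  def main(args: Array[String]): Unit = {
    val rows = 3000000L
    val col = mock(classOf[DeviceColumn])
    when(col.getRowCount).thenReturn(rows)
    // 1024 bytes per row, mirroring the stub in the reworked test.
    when(col.getDeviceMemorySize).thenReturn(1024L * rows)
    // The per-row estimate the serializer would derive from this stub: 1024.
    println(col.getDeviceMemorySize / col.getRowCount)
  }
}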
