Merge pull request NVIDIA#1301 from NVIDIA/branch-0.3
[auto-merge] branch-0.3 to branch-0.4 [skip ci] [bot]
nvauto authored Dec 7, 2020
2 parents 97ab6ed + 07aa2cd commit aa5dbd0
Showing 4 changed files with 251 additions and 52 deletions.
3 changes: 2 additions & 1 deletion pom.xml
@@ -171,6 +171,7 @@
<spark301.version>3.0.1</spark301.version>
<spark302.version>3.0.2-SNAPSHOT</spark302.version>
<spark310.version>3.1.0-SNAPSHOT</spark310.version>
<mockito.version>3.6.0</mockito.version>
</properties>

<dependencyManagement>
@@ -319,7 +320,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.28.2</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -190,8 +190,8 @@ private class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer

private def writeBuffers(): Unit = {
val toProcess = offHeapBuffers.dequeueAll(_ => true)
// this could be problematic if the buffers are big as their cumulative length could be more
// than Int.MAX_SIZE. We could just have a list of buffers in that case and iterate over them
// We make sure the input is smaller than 2GB, so the written parquet should never exceed
// Int.MaxValue bytes.
val bytes = toProcess.map(_._2).sum

// for now assert bytes are less than Int.MaxValue
@@ -253,6 +253,16 @@ private case class CloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]) e
 * This class assumes the data is columnar and the plugin is on
*/
class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {

private[rapids] def getProducer(
cbIter: Iterator[ColumnarBatch],
schema: Seq[Attribute],
conf: Configuration,
sqlConf: SQLConf) : CachedBatchIteratorProducer[ColumnarBatch] = {
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema, conf, sqlConf)

}

override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true

override def supportsColumnarOutput(schema: StructType): Boolean = schema.fields.forall { f =>
@@ -313,9 +323,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}

input.map(batch => {
input.flatMap(batch => {
if (batch.numCols() == 0) {
ParquetCachedBatch(batch.numRows(), new Array[Byte](0))
List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
} else {
withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
compressColumnarBatchWithParquet(gpuCB)
@@ -330,19 +340,58 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, cachedSchema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
.getColumnarBatchToCachedBatchIterator
.getColumnarBatchToCachedBatchIterator()
}
}
}

private def compressColumnarBatchWithParquet(gpuCB: ColumnarBatch): ParquetCachedBatch = {
val buffer = new ParquetBufferConsumer(gpuCB.numRows())
withResource(GpuColumnVector.from(gpuCB)) { table =>
withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
writer.write(table)
val _2GB: Long = 2L * 1024 * 1024 * 1024
val APPROX_PAR_META_DATA: Int = 10 * 1024 * 1024 // we are estimating 10MB
val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA

private[rapids] def compressColumnarBatchWithParquet(
gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
// NOTE: this doesn't take nulls into account so we could be over-estimating the actual size
// but that's still better than under-estimating
val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { index =>
gpuCB.column(index).dataType().defaultSize
}.sum
val rowsInBatch = (BYTES_ALLOWED_PER_BATCH / estimatedRowSize).toInt
val splitIndices = scala.Range(rowsInBatch, gpuCB.numRows(), rowsInBatch)
val cudfTables = if (splitIndices.nonEmpty) {
val splutVectors = scala.Range(0, gpuCB.numCols()).map { index =>
gpuCB.column(index).asInstanceOf[GpuColumnVector].getBase.split(splitIndices: _*)
}
try {
// Splitting the table
// e.g. T0 = {col1, col2,...,coln} => split columns into 'm' cols =>
// T00= {splitCol1(0), splitCol2(0),...,splitColn(0)}
// T01= {splitCol1(1), splitCol2(1),...,splitColn(1)}
// ...
// T0m= {splitCol1(m), splitCol2(m),...,splitColn(m)}
for (i <- splutVectors(0).indices) yield
new Table({
for (j <- splutVectors.indices) yield splutVectors(j)(i)
}: _*)
} finally {
for (i <- splutVectors(0).indices)
for (j <- splutVectors.indices) splutVectors(j)(i).close()
}
} else {
Seq(GpuColumnVector.from(gpuCB))
}
ParquetCachedBatch(buffer)

val buffers = new ListBuffer[ParquetCachedBatch]
cudfTables.foreach { table =>
withResource(table) { table =>
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
writer.write(table)
}
buffers += ParquetCachedBatch(buffer)
}
}
buffers.toList
}
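
For context on the new batching logic above: the method estimates a per-row size from the column types, derives split indices so that every slice stays under the roughly 2GB budget (BYTES_ALLOWED_PER_BATCH), and regroups the i-th slice of each column into the i-th sub-table before each sub-table is written to parquet. The standalone sketch below is added for illustration only and mirrors that arithmetic with plain Scala collections; the SplitSketch object and the example values are hypothetical and not part of this commit.

// Illustrative sketch, not part of the diff: how the split indices and the
// column-slice regrouping above work, using plain Scala collections in place
// of cudf ColumnVectors and Tables.
object SplitSketch {
  val TwoGB: Long = 2L * 1024 * 1024 * 1024
  val ApproxParquetMetadata: Long = 10L * 1024 * 1024   // mirrors APPROX_PAR_META_DATA
  val BytesAllowedPerBatch: Long = TwoGB - ApproxParquetMetadata

  /** Row indices at which to split `numRows` rows so that each slice stays under
    * the byte budget, given `estimatedRowSize` bytes per row. An empty result means
    * the whole batch already fits. */
  def splitIndices(numRows: Int, estimatedRowSize: Int): Seq[Int] = {
    val rowsInBatch = (BytesAllowedPerBatch / estimatedRowSize).toInt
    Range(rowsInBatch, numRows, rowsInBatch)
  }

  /** Regroup per-column slices into per-slice "tables":
    * pieces(col)(slice) => tables(slice)(col), the same transpose done with cudf above. */
  def regroup[A](pieces: Seq[Seq[A]]): Seq[Seq[A]] =
    pieces.head.indices.map(slice => pieces.map(col => col(slice)))

  def main(args: Array[String]): Unit = {
    // 100M rows of ~40 bytes each => one split index at roughly 53.4M rows
    println(splitIndices(numRows = 100000000, estimatedRowSize = 40))
    // the 0th slice of every column forms the first sub-table, and so on
    println(regroup(Seq(Seq("c1s0", "c1s1"), Seq("c2s0", "c2s1"))))
  }
}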

/**
@@ -870,26 +919,29 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* @param sharedConf - SQL conf
* @tparam T - Strictly either InternalRow or ColumnarBatch
*/
private class CachedBatchIteratorProducer[T](
private[rapids] class CachedBatchIteratorProducer[T](
iter: Iterator[T],
cachedAttributes: Seq[Attribute],
sharedHadoopConf: Configuration,
sharedConf: SQLConf) {

def getInternalRowToCachedBatchIterator: Iterator[CachedBatch] = {
new InternalRowToCachedBatchIterator
def getInternalRowToCachedBatchIterator(
p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
new InternalRowToCachedBatchIterator(p)
}

def getColumnarBatchToCachedBatchIterator: Iterator[CachedBatch] = {
new ColumnarBatchToCachedBatchIterator
def getColumnarBatchToCachedBatchIterator(
p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
new ColumnarBatchToCachedBatchIterator(p)
}

/**
* This class produces an Iterator[CachedBatch] from Iterator[InternalRow]. This is a n-1
* relationship. Each partition represents a single parquet file, so we encode it
* and return the CachedBatch when next is called.
*/
private class InternalRowToCachedBatchIterator extends Iterator[CachedBatch]() {
private class InternalRowToCachedBatchIterator(
parquetOutputFileFormat: ParquetOutputFileFormat) extends Iterator[CachedBatch]() {
// is there a type that spark doesn't support by default in the schema?
val hasUnsupportedType: Boolean = cachedAttributes.exists { attribute =>
!isTypeSupportedByParquet(attribute.dataType)
@@ -992,34 +1044,67 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}

private val parquetOutputFormat = new ParquetOutputFileFormat()
override def hasNext: Boolean = queue.nonEmpty || iter.hasNext

private val queue = new mutable.Queue[CachedBatch]()

override def hasNext: Boolean = iter.hasNext
// estimate the size of a row
val estimatedSize: Int = newCachedAttributes.map { attr =>
attr.dataType.defaultSize
}.sum

override def next(): CachedBatch = {
// Each partition will be a single parquet file
var rows = 0
// at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)
sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)
val recordWriter = SQLConf.withExistingConf(sharedConf) {
parquetOutputFormat.getRecordWriter(outputFile, sharedHadoopConf)
}
val rowIterator = getIterator
while (rowIterator.hasNext) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
if (queue.isEmpty) {
// stores a row that we have already read but that did not fit in the current parquet file;
// it will be put into the next CachedBatch
var leftOverRow: Option[InternalRow] = None
val rowIterator = getIterator
while (rowIterator.hasNext) {
// Each partition will be a single parquet file
var rows = 0
// at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)
sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)
val recordWriter = SQLConf.withExistingConf(sharedConf) {
parquetOutputFileFormat.getRecordWriter(outputFile, sharedHadoopConf)
}
var totalSize = 0
while (rowIterator.hasNext && totalSize < BYTES_ALLOWED_PER_BATCH) {

val row = if (leftOverRow.nonEmpty) {
val a = leftOverRow.get
leftOverRow = None // reset value
a
} else {
rowIterator.next()
}
totalSize += {
row match {
case r: UnsafeRow =>
r.getSizeInBytes
case _ =>
estimatedSize
}
}
if (totalSize <= BYTES_ALLOWED_PER_BATCH) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
}
recordWriter.write(null, row)
} else {
leftOverRow = Some(row)
}
}
// passing null as context isn't used in this method
recordWriter.close(null)
queue += ParquetCachedBatch(rows, stream.toByteArray)
}
val row = rowIterator.next()
recordWriter.write(null, row)
}
// passing null as context isn't used in this method
recordWriter.close(null)
ParquetCachedBatch(rows, stream.toByteArray)
queue.dequeue()
}
}
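
For context on the reworked next() above: rows are written into a parquet file until the accumulated size (UnsafeRow.getSizeInBytes where available, otherwise the schema-based estimate) would exceed BYTES_ALLOWED_PER_BATCH; a row that does not fit is carried over to the next CachedBatch, and finished batches are handed out through a queue. The sketch below is added for illustration only and reduces that carry-over chunking to plain collections; ChunkingSketch, chunkRows, sizeOf and byteBudget are hypothetical stand-ins and not part of this commit.

// Minimal sketch, not part of the diff: the carry-over chunking used by next() above,
// with a size function and byte budget standing in for the Parquet RecordWriter and
// the UnsafeRow/estimated sizing logic.
import scala.collection.mutable

object ChunkingSketch {
  def chunkRows[T](rows: Iterator[T], sizeOf: T => Long, byteBudget: Long): List[Vector[T]] = {
    val chunks = mutable.ListBuffer[Vector[T]]()
    var leftOver: Option[T] = None            // a row that was read but did not fit
    while (rows.hasNext || leftOver.nonEmpty) {
      val current = mutable.ArrayBuffer[T]()  // rows going into the current "file"
      var totalSize = 0L
      var full = false
      while (!full && (leftOver.nonEmpty || rows.hasNext)) {
        val row = leftOver.getOrElse(rows.next())
        leftOver = None
        totalSize += sizeOf(row)
        if (totalSize <= byteBudget || current.isEmpty) {
          current += row                      // the row fits (oversized rows go alone)
          if (totalSize > byteBudget) full = true
        } else {
          leftOver = Some(row)                // no room: carry it over to the next chunk
          full = true
        }
      }
      chunks += current.toVector              // mirrors queue += ParquetCachedBatch(...)
    }
    chunks.toList
  }

  def main(args: Array[String]): Unit = {
    // ten ~400-byte rows with a 1000-byte budget => chunks of two rows each
    val chunks = chunkRows(Iterator.tabulate(10)(i => s"row$i"), (_: String) => 400L, 1000L)
    println(chunks.map(_.size))               // List(2, 2, 2, 2, 2)
  }
}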

@@ -1028,7 +1113,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* relationship. Each ColumnarBatch is converted to a single ParquetCachedBatch when next()
* is called on this iterator
*/
private class ColumnarBatchToCachedBatchIterator extends InternalRowToCachedBatchIterator {
private class ColumnarBatchToCachedBatchIterator(
p: ParquetOutputFileFormat) extends InternalRowToCachedBatchIterator(p) {
override def getIterator: Iterator[InternalRow] = {
iter.asInstanceOf[Iterator[ColumnarBatch]].next.rowIterator().asScala
}
@@ -1162,11 +1248,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch
}
columnarBatchRdd.flatMap(cb => {
withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
@@ -1176,7 +1259,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[InternalRow](cbIter, schema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
.getInternalRowToCachedBatchIterator
.getInternalRowToCachedBatchIterator()
}
}
}
@@ -1199,7 +1282,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
/**
* Similar to ParquetFileFormat
*/
private class ParquetOutputFileFormat {
private[rapids] class ParquetOutputFileFormat {

def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
import ParquetOutputFormat._
@@ -1247,4 +1330,3 @@ private object ParquetOutputFileFormat {
memoryManager
}
}

3 changes: 2 additions & 1 deletion tests/pom.xml
@@ -104,7 +104,8 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>