Chunking input before writing a ParquetCachedBatch #1265

Merged (13 commits), Dec 7, 2020
pom.xml: 2 additions & 1 deletion
@@ -171,6 +171,7 @@
<spark301.version>3.0.1</spark301.version>
<spark302.version>3.0.2-SNAPSHOT</spark302.version>
<spark310.version>3.1.0-SNAPSHOT</spark310.version>
<mockito.version>3.6.0</mockito.version>
</properties>

<dependencyManagement>
@@ -319,7 +320,7 @@
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.28.2</version>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
ParquetCachedBatchSerializer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -190,8 +190,8 @@ private class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer

private def writeBuffers(): Unit = {
val toProcess = offHeapBuffers.dequeueAll(_ => true)
// this could be problematic if the buffers are big as their cumulative length could be more
// than Int.MAX_SIZE. We could just have a list of buffers in that case and iterate over them
// We are making sure the input is smaller than 2GB, so the written parquet should never be more
// than Int.MAX_SIZE.
val bytes = toProcess.map(_._2).sum

// for now assert bytes are less than Int.MaxValue
@@ -253,6 +253,16 @@ private case class CloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]) e
* This class assumes the data is columnar and the plugin is on
*/
class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {

Author comment: This change was made to gain access to the inner class; open to suggestions on how to make this better.

private[rapids] def getProducer(
cbIter: Iterator[ColumnarBatch],
schema: Seq[Attribute],
conf: Configuration,
sqlConf: SQLConf) : CachedBatchIteratorProducer[ColumnarBatch] = {
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema, conf, sqlConf)

}
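
For illustration, a minimal sketch of how a test under the com.nvidia.spark.rapids package might call getProducer; the schema, configuration, and cbIter values are assumptions for the example, not code from this PR's tests. (The switch to mockito-inline in tests/pom.xml would additionally let such a test mock final classes if needed.)

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.vectorized.ColumnarBatch

// cbIter would come from a test fixture (real or mocked ColumnarBatches)
def exerciseProducer(cbIter: Iterator[ColumnarBatch]): Unit = {
  val serializer = new ParquetCachedBatchSerializer()
  val schema = Seq(AttributeReference("value", IntegerType, nullable = false)())
  val producer = serializer.getProducer(cbIter, schema, new Configuration(), new SQLConf)
  // lazily encodes each ColumnarBatch into one or more parquet-formatted CachedBatches
  val cachedBatchIter = producer.getColumnarBatchToCachedBatchIterator()
  assert(cachedBatchIter.hasNext == cbIter.hasNext)
}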

override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true

override def supportsColumnarOutput(schema: StructType): Boolean = schema.fields.forall { f =>
@@ -313,9 +323,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}

input.map(batch => {
input.flatMap(batch => {
if (batch.numCols() == 0) {
ParquetCachedBatch(batch.numRows(), new Array[Byte](0))
List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
} else {
withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
compressColumnarBatchWithParquet(gpuCB)
@@ -330,19 +340,58 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, cachedSchema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
.getColumnarBatchToCachedBatchIterator
.getColumnarBatchToCachedBatchIterator()
}
}
}

private def compressColumnarBatchWithParquet(gpuCB: ColumnarBatch): ParquetCachedBatch = {
val buffer = new ParquetBufferConsumer(gpuCB.numRows())
withResource(GpuColumnVector.from(gpuCB)) { table =>
withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
writer.write(table)
val _2GB: Long = 2L * 1024 * 1024 * 1024
val APPROX_PAR_META_DATA: Int = 10 * 1024 * 1024 // we are estimating 10MB
val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA

private[rapids] def compressColumnarBatchWithParquet(
gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
// NOTE: this doesn't take nulls into account so we could be over-estimating the actual size
// but that's still better than under-estimating
val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { index =>
gpuCB.column(index).dataType().defaultSize
}.sum
val rowsInBatch = (BYTES_ALLOWED_PER_BATCH / estimatedRowSize).toInt
val splitIndices = scala.Range(rowsInBatch, gpuCB.numRows(), rowsInBatch)
val cudfTables = if (splitIndices.nonEmpty) {
val splitVectors = scala.Range(0, gpuCB.numCols()).map { index =>
gpuCB.column(index).asInstanceOf[GpuColumnVector].getBase.split(splitIndices: _*)
}
try {
// Splitting the table
// e.g. T0 = {col1, col2,...,coln} => split columns into 'm' cols =>
// T00= {splitCol1(0), splitCol2(0),...,splitColn(0)}
// T01= {splitCol1(1), splitCol2(1),...,splitColn(1)}
// ...
// T0m= {splitCol1(m), splitCol2(m),...,splitColn(m)}
for (i <- splitVectors(0).indices) yield
new Table({
for (j <- splitVectors.indices) yield splitVectors(j)(i)
}: _*)
} finally {
for (i <- splitVectors(0).indices)
for (j <- splitVectors.indices) splitVectors(j)(i).close()
}
} else {
Seq(GpuColumnVector.from(gpuCB))
}
ParquetCachedBatch(buffer)

val buffers = new ListBuffer[ParquetCachedBatch]
cudfTables.foreach { table =>
withResource(table) { table =>
val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
writer.write(table)
}
buffers += ParquetCachedBatch(buffer)
}
}
buffers.toList
}
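
To make the chunk-size arithmetic above concrete, here is a small self-contained sketch with made-up numbers; in the real method estimatedRowSize comes from the batch's column types and the resulting offsets are fed to ColumnVector.split.

// Illustrative only: assumes three columns (Int, Long, Long) and a 500M-row batch.
val bytesAllowedPerBatch: Long = 2L * 1024 * 1024 * 1024 - 10 * 1024 * 1024 // ~2GB minus metadata estimate
val estimatedRowSize = 4 + 8 + 8   // defaultSize of IntegerType plus two LongTypes
val numRows = 500000000

val rowsInBatch = (bytesAllowedPerBatch / estimatedRowSize).toInt // roughly 107M rows per chunk
val splitIndices = scala.Range(rowsInBatch, numRows, rowsInBatch)
// splitIndices has 4 offsets here, so the batch is cut into 5 tables, each written as its
// own parquet-formatted CachedBatch; a batch with numRows <= rowsInBatch produces an empty
// splitIndices and is written as a single table.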

/**
@@ -870,26 +919,29 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* @param sharedConf - SQL conf
* @tparam T - Strictly either InternalRow or ColumnarBatch
*/
private class CachedBatchIteratorProducer[T](
private[rapids] class CachedBatchIteratorProducer[T](
iter: Iterator[T],
cachedAttributes: Seq[Attribute],
sharedHadoopConf: Configuration,
sharedConf: SQLConf) {

def getInternalRowToCachedBatchIterator: Iterator[CachedBatch] = {
new InternalRowToCachedBatchIterator
def getInternalRowToCachedBatchIterator(
p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
new InternalRowToCachedBatchIterator(p)
}

def getColumnarBatchToCachedBatchIterator: Iterator[CachedBatch] = {
new ColumnarBatchToCachedBatchIterator
def getColumnarBatchToCachedBatchIterator(
p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
new ColumnarBatchToCachedBatchIterator(p)
}

/**
* This class produces an Iterator[CachedBatch] from Iterator[InternalRow]. This is an n-1
* relationship. Each partition represents a single parquet file, so we encode it
* and return the CachedBatch when next is called.
*/
private class InternalRowToCachedBatchIterator extends Iterator[CachedBatch]() {
private class InternalRowToCachedBatchIterator(
parquetOutputFileFormat: ParquetOutputFileFormat) extends Iterator[CachedBatch]() {
// is there a type that spark doesn't support by default in the schema?
val hasUnsupportedType: Boolean = cachedAttributes.exists { attribute =>
!isTypeSupportedByParquet(attribute.dataType)
@@ -992,34 +1044,67 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}

private val parquetOutputFormat = new ParquetOutputFileFormat()
override def hasNext: Boolean = queue.nonEmpty || iter.hasNext

private val queue = new mutable.Queue[CachedBatch]()

override def hasNext: Boolean = iter.hasNext
//estimate the size of a row
val estimatedSize: Int = newCachedAttributes.map { attr =>
attr.dataType.defaultSize
}.sum

override def next(): CachedBatch = {
// Each partition will be a single parquet file
var rows = 0
// at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)
sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)
val recordWriter = SQLConf.withExistingConf(sharedConf) {
parquetOutputFormat.getRecordWriter(outputFile, sharedHadoopConf)
}
val rowIterator = getIterator
while (rowIterator.hasNext) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
if (queue.isEmpty) {
// to store a row if we have read it but there is no room in the parquet file to put it
// we will put it in the next CachedBatch
var leftOverRow: Option[InternalRow] = None
val rowIterator = getIterator
while (rowIterator.hasNext) {
// Each partition will be a single parquet file
var rows = 0
// at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)
sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)
val recordWriter = SQLConf.withExistingConf(sharedConf) {
parquetOutputFileFormat.getRecordWriter(outputFile, sharedHadoopConf)
}
var totalSize = 0
while (rowIterator.hasNext && totalSize < BYTES_ALLOWED_PER_BATCH) {

val row = if (leftOverRow.nonEmpty) {
val a = leftOverRow.get
leftOverRow = None // reset value
a
} else {
rowIterator.next()
}
totalSize += {
row match {
case r: UnsafeRow =>
r.getSizeInBytes
case _ =>
estimatedSize
}
}
if (totalSize <= BYTES_ALLOWED_PER_BATCH) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
}
recordWriter.write(null, row)
} else {
leftOverRow = Some(row)
}
}
// passing null as the context since it isn't used in this method
recordWriter.close(null)
queue += ParquetCachedBatch(rows, stream.toByteArray)
}
val row = rowIterator.next()
recordWriter.write(null, row)
}
// passing null as context isn't used in this method
recordWriter.close(null)
ParquetCachedBatch(rows, stream.toByteArray)
queue.dequeue()
}
}
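
The loop above is easier to see as a standalone pattern: pull rows until the byte budget would be exceeded, remember the row that did not fit, and start the next chunk with it. Below is a simplified, generic sketch of that pattern (no Parquet writer or CachedBatch queue; sizeOf stands in for UnsafeRow.getSizeInBytes or the schema-based estimate, and details such as the row-count overflow check differ in the real code).

def chunkBySize[T](it: Iterator[T], budget: Long)(sizeOf: T => Long): Iterator[Vector[T]] =
  new Iterator[Vector[T]] {
    private var leftOver: Option[T] = None

    override def hasNext: Boolean = leftOver.nonEmpty || it.hasNext

    override def next(): Vector[T] = {
      val chunk = Vector.newBuilder[T]
      var count = 0
      var total = 0L
      var full = false
      while (!full && (leftOver.nonEmpty || it.hasNext)) {
        val elem = leftOver.getOrElse(it.next())
        leftOver = None
        total += sizeOf(elem)
        if (total <= budget || count == 0) { // always accept at least one element
          chunk += elem
          count += 1
        } else {
          leftOver = Some(elem) // did not fit; it opens the next chunk
          full = true
        }
      }
      chunk.result()
    }
  }

// e.g. chunkBySize(Iterator.range(0, 10), budget = 3L)(_ => 1L).toList
// => List(Vector(0, 1, 2), Vector(3, 4, 5), Vector(6, 7, 8), Vector(9))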

@@ -1028,7 +1113,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* relationship. Each ColumnarBatch is converted to a single ParquetCachedBatch when next()
* is called on this iterator
*/
private class ColumnarBatchToCachedBatchIterator extends InternalRowToCachedBatchIterator {
private class ColumnarBatchToCachedBatchIterator(
p: ParquetOutputFileFormat) extends InternalRowToCachedBatchIterator(p) {
override def getIterator: Iterator[InternalRow] = {
iter.asInstanceOf[Iterator[ColumnarBatch]].next.rowIterator().asScala
}
@@ -1162,11 +1248,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch
}
columnarBatchRdd.flatMap(cb => {
withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
@@ -1176,7 +1259,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[InternalRow](cbIter, schema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
.getInternalRowToCachedBatchIterator
.getInternalRowToCachedBatchIterator()
}
}
}
@@ -1199,7 +1282,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
/**
* Similar to ParquetFileFormat
*/
private class ParquetOutputFileFormat {
private[rapids] class ParquetOutputFileFormat {

def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
import ParquetOutputFormat._
@@ -1247,4 +1330,3 @@
memoryManager
}
}

tests/pom.xml: 2 additions & 1 deletion
@@ -104,7 +104,8 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<artifactId>mockito-inline</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>