diff --git a/pom.xml b/pom.xml
index 30516e6d078..5e18b8fa219 100644
--- a/pom.xml
+++ b/pom.xml
@@ -171,6 +171,7 @@
3.0.1
3.0.2-SNAPSHOT
3.1.0-SNAPSHOT
+ 3.6.0
@@ -319,7 +320,7 @@
org.mockito
mockito-core
- 2.28.2
+ ${mockito.version}
test
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
index 0dbfa8912e0..77b7d3951d6 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
@@ -44,7 +44,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow, SpecializedGetters, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData, MapData}
import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
@@ -190,8 +190,8 @@ private class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer
private def writeBuffers(): Unit = {
val toProcess = offHeapBuffers.dequeueAll(_ => true)
- // this could be problematic if the buffers are big as their cumulative length could be more
- // than Int.MAX_SIZE. We could just have a list of buffers in that case and iterate over them
+ // We make sure the input is smaller than 2GB, so the written parquet data should never exceed
+ // Int.MaxValue bytes.
val bytes = toProcess.map(_._2).sum
// for now assert bytes are less than Int.MaxValue
@@ -253,6 +253,16 @@ private case class CloseableColumnBatchIterator(iter: Iterator[ColumnarBatch]) e
* This class assumes, the data is Columnar and the plugin is on
*/
class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
+
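+ // exposed for tests so they can construct the iterator producer directly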
+ private[rapids] def getProducer(
+ cbIter: Iterator[ColumnarBatch],
+ schema: Seq[Attribute],
+ conf: Configuration,
+ sqlConf: SQLConf): CachedBatchIteratorProducer[ColumnarBatch] = {
+ new CachedBatchIteratorProducer[ColumnarBatch](cbIter, schema, conf, sqlConf)
+ }
+
override def supportsColumnarInput(schema: Seq[Attribute]): Boolean = true
override def supportsColumnarOutput(schema: StructType): Boolean = schema.fields.forall { f =>
@@ -313,9 +323,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}
- input.map(batch => {
+ input.flatMap(batch => {
if (batch.numCols() == 0) {
- ParquetCachedBatch(batch.numRows(), new Array[Byte](0))
+ List(ParquetCachedBatch(batch.numRows(), new Array[Byte](0)))
} else {
withResource(putOnGpuIfNeeded(batch)) { gpuCB =>
compressColumnarBatchWithParquet(gpuCB)
@@ -330,19 +340,58 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[ColumnarBatch](cbIter, cachedSchema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
- .getColumnarBatchToCachedBatchIterator
+ .getColumnarBatchToCachedBatchIterator()
}
}
}
- private def compressColumnarBatchWithParquet(gpuCB: ColumnarBatch): ParquetCachedBatch = {
- val buffer = new ParquetBufferConsumer(gpuCB.numRows())
- withResource(GpuColumnVector.from(gpuCB)) { table =>
- withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
- writer.write(table)
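+ // cap each written parquet buffer so its size stays under Int.MaxValue bytes, leaving headroom
+ // for parquet metadata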
+ val _2GB: Long = 2L * 1024 * 1024 * 1024
+ val APPROX_PAR_META_DATA: Int = 10 * 1024 * 1024 // approximate parquet metadata overhead (~10MB)
+ val BYTES_ALLOWED_PER_BATCH: Long = _2GB - APPROX_PAR_META_DATA
+
+ private[rapids] def compressColumnarBatchWithParquet(
+ gpuCB: ColumnarBatch): List[ParquetCachedBatch] = {
+ // NOTE: this doesn't take nulls into account so we could be over-estimating the actual size
+ // but that's still better than under-estimating
+ val estimatedRowSize = scala.Range(0, gpuCB.numCols()).map { index =>
+ gpuCB.column(index).dataType().defaultSize
+ }.sum
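+ // number of rows that fit under the per-batch byte limit, and the row indices at which to split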
+ val rowsInBatch = (BYTES_ALLOWED_PER_BATCH / estimatedRowSize).toInt
+ val splitIndices = scala.Range(rowsInBatch, gpuCB.numRows(), rowsInBatch)
+ val cudfTables = if (splitIndices.nonEmpty) {
+ val splitVectors = scala.Range(0, gpuCB.numCols()).map { index =>
+ gpuCB.column(index).asInstanceOf[GpuColumnVector].getBase.split(splitIndices: _*)
+ }
+ try {
+ // Splitting the table
+ // e.g. T0 = {col1, col2,...,coln} => split columns into 'm' cols =>
+ // T00= {splitCol1(0), splitCol2(0),...,splitColn(0)}
+ // T01= {splitCol1(1), splitCol2(1),...,splitColn(1)}
+ // ...
+ // T0m= {splitCol1(m), splitCol2(m),...,splitColn(m)}
+ for (i <- splitVectors(0).indices) yield
+ new Table({
+ for (j <- splitVectors.indices) yield splitVectors(j)(i)
+ }: _*)
+ } finally {
+ for (i <- splitVectors(0).indices)
+ for (j <- splitVectors.indices) splitVectors(j)(i).close()
}
+ } else {
+ Seq(GpuColumnVector.from(gpuCB))
}
- ParquetCachedBatch(buffer)
+
+ val buffers = new ListBuffer[ParquetCachedBatch]
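+ // write each (sub-)table to its own in-memory parquet buffer and wrap it as a ParquetCachedBatch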
+ cudfTables.foreach { table =>
+ withResource(table) { table =>
+ val buffer = new ParquetBufferConsumer(table.getRowCount.toInt)
+ withResource(Table.writeParquetChunked(ParquetWriterOptions.DEFAULT, buffer)) { writer =>
+ writer.write(table)
+ }
+ buffers += ParquetCachedBatch(buffer)
+ }
+ }
+ buffers.toList
}
/**
@@ -870,18 +919,20 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* @param sharedConf - SQL conf
* @tparam T - Strictly either InternalRow or ColumnarBatch
*/
- private class CachedBatchIteratorProducer[T](
+ private[rapids] class CachedBatchIteratorProducer[T](
iter: Iterator[T],
cachedAttributes: Seq[Attribute],
sharedHadoopConf: Configuration,
sharedConf: SQLConf) {
- def getInternalRowToCachedBatchIterator: Iterator[CachedBatch] = {
- new InternalRowToCachedBatchIterator
+ def getInternalRowToCachedBatchIterator(
+ p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
+ new InternalRowToCachedBatchIterator(p)
}
- def getColumnarBatchToCachedBatchIterator: Iterator[CachedBatch] = {
- new ColumnarBatchToCachedBatchIterator
+ def getColumnarBatchToCachedBatchIterator(
+ p: ParquetOutputFileFormat = new ParquetOutputFileFormat()): Iterator[CachedBatch] = {
+ new ColumnarBatchToCachedBatchIterator(p)
}
/**
@@ -889,7 +940,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* relationship. Each partition represents a single parquet file, so we encode it
* and return the CachedBatch when next is called.
*/
- private class InternalRowToCachedBatchIterator extends Iterator[CachedBatch]() {
+ private class InternalRowToCachedBatchIterator(
+ parquetOutputFileFormat: ParquetOutputFileFormat) extends Iterator[CachedBatch]() {
// is there a type that spark doesn't support by default in the schema?
val hasUnsupportedType: Boolean = cachedAttributes.exists { attribute =>
!isTypeSupportedByParquet(attribute.dataType)
@@ -992,34 +1044,67 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
}
}
- private val parquetOutputFormat = new ParquetOutputFileFormat()
+ override def hasNext: Boolean = queue.nonEmpty || iter.hasNext
+
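+ // a single input partition can now produce multiple CachedBatches; they are buffered in this queue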
+ private val queue = new mutable.Queue[CachedBatch]()
- override def hasNext: Boolean = iter.hasNext
+ // estimate the size of a row
+ val estimatedSize: Int = newCachedAttributes.map { attr =>
+ attr.dataType.defaultSize
+ }.sum
override def next(): CachedBatch = {
- // Each partition will be a single parquet file
- var rows = 0
- // at least a single block
- val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
- val outputFile: OutputFile = new ByteArrayOutputFile(stream)
- sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
- LegacyBehaviorPolicy.CORRECTED.toString)
- val recordWriter = SQLConf.withExistingConf(sharedConf) {
- parquetOutputFormat.getRecordWriter(outputFile, sharedHadoopConf)
- }
- val rowIterator = getIterator
- while (rowIterator.hasNext) {
- rows += 1
- if (rows < 0) {
- throw new IllegalStateException("CachedBatch doesn't support rows larger " +
- "than Int.MaxValue")
+ if (queue.isEmpty) {
+ // holds a row that was read but didn't fit in the current parquet file;
+ // it will be written to the next CachedBatch
+ var leftOverRow: Option[InternalRow] = None
+ val rowIterator = getIterator
+ while (rowIterator.hasNext) {
+ // Each partition will be a single parquet file
+ var rows = 0
+ // at least a single block
+ val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
+ val outputFile: OutputFile = new ByteArrayOutputFile(stream)
+ sharedConf.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
+ LegacyBehaviorPolicy.CORRECTED.toString)
+ val recordWriter = SQLConf.withExistingConf(sharedConf) {
+ parquetOutputFileFormat.getRecordWriter(outputFile, sharedHadoopConf)
+ }
+ var totalSize = 0
+ while (rowIterator.hasNext && totalSize < BYTES_ALLOWED_PER_BATCH) {
+ val row = if (leftOverRow.nonEmpty) {
+ val a = leftOverRow.get
+ leftOverRow = None // reset value
+ a
+ } else {
+ rowIterator.next()
+ }
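+ // UnsafeRows know their exact size; for other rows fall back to the schema-based estimate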
+ totalSize += {
+ row match {
+ case r: UnsafeRow =>
+ r.getSizeInBytes
+ case _ =>
+ estimatedSize
+ }
+ }
+ if (totalSize <= BYTES_ALLOWED_PER_BATCH) {
+ rows += 1
+ if (rows < 0) {
+ throw new IllegalStateException("CachedBatch doesn't support rows larger " +
+ "than Int.MaxValue")
+ }
+ recordWriter.write(null, row)
+ } else {
+ leftOverRow = Some(row)
+ }
+ }
+ // passing null because the context isn't used in this method
+ recordWriter.close(null)
+ queue += ParquetCachedBatch(rows, stream.toByteArray)
}
- val row = rowIterator.next()
- recordWriter.write(null, row)
}
- // passing null as context isn't used in this method
- recordWriter.close(null)
- ParquetCachedBatch(rows, stream.toByteArray)
+ queue.dequeue()
}
}
@@ -1028,7 +1113,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
* relationship. Each ColumnarBatch is converted to a single ParquetCachedBatch when next()
* is called on this iterator
*/
- private class ColumnarBatchToCachedBatchIterator extends InternalRowToCachedBatchIterator {
+ private class ColumnarBatchToCachedBatchIterator(
+ p: ParquetOutputFileFormat) extends InternalRowToCachedBatchIterator(p) {
override def getIterator: Iterator[InternalRow] = {
iter.asInstanceOf[Iterator[ColumnarBatch]].next.rowIterator().asScala
}
@@ -1162,11 +1248,8 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
- columnarBatchRdd.map(cb => {
- withResource(cb) { columnarBatch =>
- val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
- cachedBatch
- }
+ columnarBatchRdd.flatMap(cb => {
+ withResource(cb)(cb => compressColumnarBatchWithParquet(cb))
})
} else {
val broadcastedHadoopConf = getBroadcastedHadoopConf(conf, parquetSchema)
@@ -1176,7 +1259,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
cbIter =>
new CachedBatchIteratorProducer[InternalRow](cbIter, schema,
broadcastedHadoopConf.value.value, broadcastedConf.value)
- .getInternalRowToCachedBatchIterator
+ .getInternalRowToCachedBatchIterator()
}
}
}
@@ -1199,7 +1282,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
/**
* Similar to ParquetFileFormat
*/
-private class ParquetOutputFileFormat {
+private[rapids] class ParquetOutputFileFormat {
def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
import ParquetOutputFormat._
@@ -1247,4 +1330,3 @@ private object ParquetOutputFileFormat {
memoryManager
}
}
-
diff --git a/tests/pom.xml b/tests/pom.xml
index bdcccc456b9..3e84266aaee 100644
--- a/tests/pom.xml
+++ b/tests/pom.xml
@@ -104,7 +104,8 @@
org.mockito
- mockito-core
+ mockito-inline
+ ${mockito.version}
test
diff --git a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
index f7c7feb2d2d..4788574f989 100644
--- a/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
+++ b/tests/src/test/scala/com/nvidia/spark/rapids/ParquetWriterSuite.scala
@@ -17,12 +17,19 @@
package com.nvidia.spark.rapids
import java.io.File
+import java.lang.reflect.Method
import java.nio.charset.StandardCharsets
+import ai.rapids.cudf.{ColumnVector, DType, Table, TableWriter}
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mockito.invocation.InvocationOnMock
import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.sql.types.{ByteType, DataType}
+import org.apache.spark.sql.vectorized.ColumnarBatch
/**
* Tests for writing Parquet files with the GPU.
@@ -118,4 +125,112 @@ class ParquetWriterSuite extends SparkQueryCompareTestSuite {
frame
}
}
+
+ test("convert large columnar batch to cachedbatch on single col table") {
+ if (!withCpuSparkSession(s => s.version < "3.1.0")) {
+ val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors()
+ testCompressColBatch(Array(spyCol0), Array(spyGpuCol0))
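+ // with 1024-byte rows the ~2GB limit fits 2086912 rows per batch, so a single split is expected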
+ verify(spyCol0).split(2086912)
+ }
+ }
+
+ test("convert large columnar batch to cachedbatch on multi-col table") {
+ if (!withCpuSparkSession(s => s.version < "3.1.0")) {
+ val (spyCol0, spyGpuCol0) = getCudfAndGpuVectors()
+ val (spyCol1, spyGpuCol1) = getCudfAndGpuVectors()
+ val (spyCol2, spyGpuCol2) = getCudfAndGpuVectors()
+ testCompressColBatch(Array(spyCol0, spyCol1, spyCol2),
+ Array(spyGpuCol0, spyGpuCol1, spyGpuCol2))
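+ // with 3 columns each row is 3 * 1024 bytes, so the ~2GB limit fits 695637 rows per batch,
+ // which yields four split points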
+ val splitAt = Seq(695637, 1391274, 2086911, 2782548)
+ verify(spyCol0).split(splitAt: _*)
+ verify(spyCol1).split(splitAt: _*)
+ verify(spyCol2).split(splitAt: _*)
+ }
+ }
+
+ val ROWS = 3 * 1024 * 1024
+
+ private def getCudfAndGpuVectors(onHost: Boolean = false): (ColumnVector, GpuColumnVector) = {
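+ // spy on a tiny real vector but report 3M rows of 1KB each (~3GB per column) so the batch
+ // exceeds the ~2GB per-batch limit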
+ val spyCol = spy(ColumnVector.fromBytes(1))
+ when(spyCol.getRowCount).thenReturn(ROWS)
+ val mockDtype = mock(classOf[DType])
+ when(mockDtype.getSizeInBytes).thenReturn(1024)
+ val mockDataType = mock(classOf[DataType])
+ val spyGpuCol = spy(GpuColumnVector.from(spyCol, ByteType))
+ when(spyCol.getType()).thenReturn(mockDtype)
+ when(spyGpuCol.dataType()).thenReturn(mockDataType)
+ when(mockDataType.defaultSize).thenReturn(1024)
+
+ (spyCol, spyGpuCol)
+ }
+
+ val _2GB = 2L * 1024 * 1024 * 1024
+ val APPROX_PAR_META_DATA = 10 * 1024 * 1024 // approximate parquet metadata overhead (~10MB)
+ val BYTES_ALLOWED_PER_BATCH = _2GB - APPROX_PAR_META_DATA
+
+ private def whenSplitCalled(cb: ColumnarBatch): Unit = {
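+ // stub split() on each column to return one small vector per expected slice, with row counts
+ // matching what the real split would produce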
+ val rows = cb.numRows()
+ val eachRowSize = cb.numCols() * 1024
+ val rowsAllowedInABatch = BYTES_ALLOWED_PER_BATCH / eachRowSize
+ val spillOver = cb.numRows() % rowsAllowedInABatch
+ val splitRange = scala.Range(rowsAllowedInABatch.toInt, rows, rowsAllowedInABatch.toInt)
+ scala.Range(0, cb.numCols()).foreach { i =>
+ val spyCol = cb.column(i).asInstanceOf[GpuColumnVector].getBase
+ val splitCols0 = scala.Range(0, splitRange.length).map { _ =>
+ val spySplitCol = spy(ColumnVector.fromBytes(4, 5, 6))
+ when(spySplitCol.getRowCount()).thenReturn(rowsAllowedInABatch)
+ spySplitCol
+ }
+ val splitCols = if (spillOver > 0) {
+ val splitCol = spy(ColumnVector.fromBytes(3))
+ when(splitCol.getRowCount()).thenReturn(spillOver)
+ splitCols0 :+ splitCol
+ } else {
+ splitCols0
+ }
+ when(spyCol.split(any())).thenReturn(splitCols.toArray)
+ }
+ }
+
+ private var compressWithParquetMethod: Option[Method] = None
+ private var parquetSerializerInstance: Option[Any] = None
+
+ private def testCompressColBatch(
+ cudfCols: Array[ColumnVector],
+ gpuCols: Array[org.apache.spark.sql.vectorized.ColumnVector]): Unit = {
+ // mock Table's static methods so writeParquetChunked returns a TableWriter that fails the test
+ // if any written table is larger than Int.MaxValue bytes
+ val theTableMock = mockStatic(classOf[Table], (_: InvocationOnMock) =>
+ new TableWriter {
+ override def write(table: Table): Unit = {
+ val tableSize = table.getColumn(0).getType.getSizeInBytes * table.getRowCount
+ if (tableSize > Int.MaxValue) {
+ fail(s"Parquet file went over the allowed limit of $BYTES_ALLOWED_PER_BATCH")
+ }
+ }
+
+ override def close(): Unit = {
+ // noop
+ }
+ })
+
+ withResource(cudfCols) { _ =>
+ val cb = new ColumnarBatch(gpuCols, ROWS)
+ whenSplitCalled(cb)
+ try {
+ val method = compressWithParquetMethod.getOrElse {
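+ // compressColumnarBatchWithParquet lives in the spark310 shim, so load the class and
+ // invoke the method reflectively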
+ val classOfSerializer = Class.forName(
+ "com.nvidia.spark.rapids.shims.spark310.ParquetCachedBatchSerializer")
+ parquetSerializerInstance = Some(classOfSerializer.getDeclaredConstructor().newInstance())
+ val compressWithParquet =
+ classOfSerializer.getMethod("compressColumnarBatchWithParquet",
+ classOf[ColumnarBatch])
+ compressWithParquetMethod = Some(compressWithParquet)
+ compressWithParquet
+ }
+ method.invoke(parquetSerializerInstance.get, cb)
+ } finally {
+ theTableMock.close()
+ }
+ }
+ }
}