
Write InternalRow to CachedBatch (NVIDIA#914)
* This PR writes InternalRows to a CachedBatch using the Parquet RecordWriter. The Parquet file is built entirely in memory and stored as buffers inside the CachedBatch (a usage sketch follows the changed-file summary below).
razajafri authored Oct 21, 2020
1 parent d99086e commit c630ee6
Showing 2 changed files with 169 additions and 18 deletions.
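For context, a minimal usage sketch (not part of this commit; it assumes Spark 3.1's spark.sql.cache.serializer static conf and the serializer class defined in the Scala changes below). With that conf set, caching a row-based plan sends its rows through convertInternalRowToCachedBatch, which this commit teaches to write an in-memory Parquet file when the GPU path cannot be used:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.cache.serializer",
    "com.nvidia.spark.rapids.shims.spark310.ParquetCachedBatchSerializer")
  .getOrCreate()

// each cached partition is now serialized to Parquet bytes held in memory
spark.range(1000).selectExpr("id", "string(id) as s").cache().count()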
5 changes: 3 additions & 2 deletions integration_tests/src/main/python/cache_test.py
@@ -202,11 +202,12 @@ def op_df(spark, length=2048, seed=0):

assert_gpu_and_cpu_are_equal_collect(op_df, conf = enableVectorizedConf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
@allow_non_gpu('CollectLimitExec')
def test_cache_partial_load(enableVectorizedConf):
def test_cache_partial_load(data_gen, enableVectorizedConf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, IntegerGen(), string_gen)
lambda spark : two_col_df(spark, data_gen, string_gen)
.select(f.col("a"), f.col("b"))
.cache()
.limit(50).select(f.col("b")), enableVectorizedConf
@@ -16,9 +16,8 @@

package com.nvidia.spark.rapids.shims.spark310

import java.io.{File, InputStream}
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.file.Files

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -28,10 +27,17 @@ import ai.rapids.cudf._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.RecordWriter
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.io.{DelegatingSeekableInputStream, InputFile, SeekableInputStream}
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader, ParquetFileWriter, ParquetInputFormat, ParquetOutputFormat, ParquetRecordWriter, ParquetWriter}
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.ParquetOutputFormat.{MEMORY_POOL_RATIO, MIN_MEMORY_ALLOCATION}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream}

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
@@ -121,6 +127,41 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile {
}
}

object ByteArrayOutputFile {
val BLOCK_SIZE = 32 * 1024 * 1024 // 32M
}

class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile {
override def create(blockSizeHint: Long): PositionOutputStream = {
new DelegatingPositionOutputStream(stream) {
var pos: Long = 0
override def getPos: Long = pos

override def write(b: Int): Unit = {
super.write(b)
pos += 1 // write(b: Int) emits a single byte, so advance the position by one
}

override def write(b: Array[Byte]): Unit = {
super.write(b)
pos += b.length
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
super.write(b, off, len)
pos += len
}
}
}

override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
throw new UnsupportedOperationException("Don't need to overwrite")

override def supportsBlockSize(): Boolean = true

override def defaultBlockSize(): Long = ByteArrayOutputFile.BLOCK_SIZE
}
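A note on the position tracking above: ParquetFileWriter records block and column-chunk offsets by calling getPos on the PositionOutputStream, so pos must advance by exactly the number of bytes written. A tiny check sketch (hypothetical, not part of this commit):

val stream = new ByteArrayOutputStream()
val out = new ByteArrayOutputFile(stream).create(blockSizeHint = 0)
out.write(Array[Byte](1, 2, 3))
// the reported position matches the bytes actually buffered
assert(out.getPos == 3 && stream.size() == 3)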

class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer with AutoCloseable {
@transient private[this] val offHeapBuffers = mutable.Queue[(HostMemoryBuffer, Long)]()
private var buffer: Array[Byte] = null
@@ -466,6 +507,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
hadoopConf.set(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)

// store timestamps as INT64 microseconds and use the standard (non-legacy)
// Parquet layout for the cached data
hadoopConf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)

hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, false)

ParquetWriteSupport.setSchema(requestedSchema, hadoopConf)

hadoopConf
@@ -485,22 +531,126 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] = {
val s = schema.toStructType
val converters = new GpuRowToColumnConverter(s)
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, s, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch

val parquetSchema = schema.zip(getColumnIndices(schema, schema)).map {
case (attr, index) => attr.withName("_col" + index)
}

val rapidsConf = new RapidsConf(conf)
val sparkSession = SparkSession.active

val hadoopConf = getHadoopConf(parquetSchema.toStructType, conf)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

SQLConf.get.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)

val broadcastedConf = sparkSession.sparkContext.broadcast(conf)

if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) {
val structSchema = schema.toStructType
val converters = new GpuRowToColumnConverter(structSchema)
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch
}
})
} else {
// fallback to the CPU
input.mapPartitions {
internalRowIter =>
new Iterator[CachedBatch]() {
private val parquetOutputFormat = new ParquetOutputFileFormat()

override def hasNext: Boolean = internalRowIter.hasNext

override def next(): CachedBatch = {
// Each partition will be a single parquet file
var rows = 0
// pre-size the stream to at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)

val recordWriter = SQLConf.withExistingConf(broadcastedConf.value) {
parquetOutputFormat.getRecordWriter(outputFile, broadcastedHadoopConf.value.value)
}

while (internalRowIter.hasNext) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
}
val row = internalRowIter.next()
recordWriter.write(null, row)
}
// pass null for the TaskAttemptContext; close() does not use it
recordWriter.close(null)
ParquetCachedBatch(rows, stream.toByteArray)
}
}
}
})
}
}
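The buffer stored in each ParquetCachedBatch is a complete Parquet file, so it can be read back through the ByteArrayInputFile defined earlier in this file. A small sanity-check sketch (a hypothetical helper, not part of this commit):

def rowCount(parquetBytes: Array[Byte]): Long = {
  // open the in-memory file and read the record count from the footer metadata
  val reader = ParquetFileReader.open(new ByteArrayInputFile(parquetBytes))
  try {
    reader.getRecordCount
  } finally {
    reader.close()
  }
}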

override def buildFilter(predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
// essentially a noop
(partId: Int, b: Iterator[CachedBatch]) => b
}
}

/**
 * Similar to ParquetFileFormat, but the RecordWriter is built on top of an OutputFile
 * so the Parquet data can stay in memory.
 */
class ParquetOutputFileFormat() {

def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
import ParquetOutputFormat._

val blockSize = getLongBlockSize(conf)
val maxPaddingSize =
conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT)
val validating = getValidation(conf)

val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]]
val init = writeSupport.init(conf)
val writer = new ParquetFileWriter(output, init.getSchema,
Mode.CREATE, blockSize, maxPaddingSize)
writer.start()

val writerVersion =
ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.WriterVersion.PARQUET_1_0.toString))

val codecFactory = new CodecFactory(conf, getPageSize(conf))

new ParquetRecordWriter[InternalRow](writer, writeSupport, init.getSchema,
init.getExtraMetaData, blockSize, getPageSize(conf),
codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf),
getEnableDictionary(conf), validating, writerVersion,
ParquetOutputFileFormat.getMemoryManager(conf))
}
}

object ParquetOutputFileFormat {
var memoryManager: MemoryManager = null
val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f
val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB

// mirror ParquetOutputFormat: one MemoryManager shared by every record writer in the JVM
def getMemoryManager(conf: Configuration): MemoryManager = {
synchronized {
if (memoryManager == null) {
val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO)
val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION)
memoryManager = new MemoryManager(maxLoad, minAllocation)
}
}
memoryManager
}
}
