Write InternalRow to CachedBatch #914

Merged · 9 commits · Oct 21, 2020
Changes from 7 commits
5 changes: 3 additions & 2 deletions integration_tests/src/main/python/cache_test.py
@@ -201,11 +201,12 @@ def op_df(spark, length=2048, seed=0):

assert_gpu_and_cpu_are_equal_collect(op_df, conf = enableVectorizedConf)

@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
@pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
@allow_non_gpu('CollectLimitExec')
def test_cache_partial_load(enableVectorizedConf):
def test_cache_partial_load(data_gen, enableVectorizedConf):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : two_col_df(spark, IntegerGen(), string_gen)
lambda spark : two_col_df(spark, data_gen, string_gen)
.select(f.col("a"), f.col("b"))
.cache()
.limit(50).select(f.col("b")), enableVectorizedConf

@@ -16,9 +16,8 @@

package com.nvidia.spark.rapids.shims.spark310

import java.io.{File, InputStream}
import java.io.InputStream
import java.nio.ByteBuffer
import java.nio.file.Files

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -28,10 +27,17 @@ import ai.rapids.cudf._
import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.RecordWriter
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.io.{DelegatingSeekableInputStream, InputFile, SeekableInputStream}
import org.apache.parquet.column.ParquetProperties
import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader, ParquetFileWriter, ParquetInputFormat, ParquetOutputFormat, ParquetRecordWriter, ParquetWriter}
import org.apache.parquet.hadoop.ParquetFileWriter.Mode
import org.apache.parquet.hadoop.ParquetOutputFormat.{MEMORY_POOL_RATIO, MIN_MEMORY_ALLOCATION}
import org.apache.parquet.hadoop.api.WriteSupport
import org.apache.parquet.hadoop.metadata.CompressionCodecName
import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream}

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
@@ -121,6 +127,41 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile {
}
}

object ByteArrayOutputFile {
val BLOCK_SIZE = 32 * 1024 * 1024 // 32M
}

class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile {
override def create(blockSizeHint: Long): PositionOutputStream = {
new DelegatingPositionOutputStream(stream) {
// track the current position; OutputStream.write(Int) emits a single byte,
// so each override advances pos by the number of bytes actually written
var pos: Long = 0
override def getPos: Long = pos

override def write(b: Int): Unit = {
super.write(b)
pos += 1
}

override def write(b: Array[Byte]): Unit = {
super.write(b)
pos += b.length
}

override def write(b: Array[Byte], off: Int, len: Int): Unit = {
super.write(b, off, len)
pos += len
}
}
}

override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
throw new UnsupportedOperationException("Don't need to overwrite")

override def supportsBlockSize(): Boolean = true

override def defaultBlockSize(): Long = ByteArrayOutputFile.BLOCK_SIZE
}
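A minimal usage sketch of the contract this class has to satisfy, assuming parquet-hadoop and commons-io on the classpath as in this module: ParquetFileWriter relies on getPos reporting exactly how many bytes have been written so far, so each write override advances pos by the bytes it actually emitted.

    // hypothetical sketch: getPos must mirror the bytes written to the backing stream
    val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
    val out = new ByteArrayOutputFile(stream).create(0L)
    out.write(Array[Byte](1, 2, 3))     // bulk write: position advances by 3
    out.write(42)                       // single-byte write: position advances by 1
    assert(out.getPos == stream.size()) // both report 4 bytes
    out.close()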

class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer with AutoCloseable {
@transient private[this] val offHeapBuffers = mutable.Queue[(HostMemoryBuffer, Long)]()
private var buffer: Array[Byte] = null
@@ -466,6 +507,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
hadoopConf.set(
SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)

hadoopConf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)

hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, false)

ParquetWriteSupport.setSchema(requestedSchema, hadoopConf)

hadoopConf
@@ -485,22 +531,126 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
schema: Seq[Attribute],
storageLevel: StorageLevel,
conf: SQLConf): RDD[CachedBatch] = {
val s = schema.toStructType
val converters = new GpuRowToColumnConverter(s)
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, s, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch

val parquetSchema = schema.zip(getColumnIndices(schema, schema)).map {
case (attr, index) => attr.withName("_col" + index)
}

val rapidsConf = new RapidsConf(conf)
val sparkSession = SparkSession.active

val hadoopConf = getHadoopConf(parquetSchema.toStructType, conf)
val broadcastedHadoopConf =
sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

SQLConf.get.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
LegacyBehaviorPolicy.CORRECTED.toString)

val broadcastedConf = sparkSession.sparkContext.broadcast(conf)

if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) {
val structSchema = schema.toStructType
val converters = new GpuRowToColumnConverter(structSchema)
val columnarBatchRdd = input.mapPartitions(iter => {
new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
})
columnarBatchRdd.map(cb => {
withResource(cb) { columnarBatch =>
val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
cachedBatch
}
})
} else {
// fallback to the CPU
input.mapPartitions {
internalRowIter =>
new Iterator[CachedBatch]() {
private val parquetOutputFormat = new ParquetOutputFileFormat()

override def hasNext: Boolean = internalRowIter.hasNext

override def next(): CachedBatch = {
// Each partition will be a single parquet file
var rows = 0
// pre-size the in-memory buffer to at least a single block
val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
val outputFile: OutputFile = new ByteArrayOutputFile(stream)

val recordWriter = SQLConf.withExistingConf(broadcastedConf.value) {
parquetOutputFormat.getRecordWriter(outputFile, broadcastedHadoopConf.value.value)
}

while (internalRowIter.hasNext) {
rows += 1
if (rows < 0) {
throw new IllegalStateException("CachedBatch doesn't support rows larger " +
"than Int.MaxValue")
}
val row = internalRowIter.next()
recordWriter.write(null, row)
}
// the TaskAttemptContext argument is not used by close(), so passing null is safe
recordWriter.close(null)
ParquetCachedBatch(rows, stream.toByteArray)
}
}
}
})
}
}

override def buildFilter(predicates: Seq[Expression],
cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
// essentially a noop
(partId: Int, b: Iterator[CachedBatch]) => b
}
}
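The byte arrays produced by the CPU fallback path can be read back with parquet-hadoop's standard reader through the ByteArrayInputFile defined earlier in this file. A small hypothetical helper, for instance to check the row count of a cached partition:

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.HadoopReadOptions
    import org.apache.parquet.hadoop.ParquetFileReader

    // hypothetical helper: count the rows stored in one in-memory parquet "file"
    def parquetRowCount(bytes: Array[Byte]): Long = {
      val options = HadoopReadOptions.builder(new Configuration()).build()
      val reader = ParquetFileReader.open(new ByteArrayInputFile(bytes), options)
      try reader.getRecordCount finally reader.close()
    }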

/**
* Similar to ParquetFileFormat
*/
class ParquetOutputFileFormat() {

def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
import ParquetOutputFormat._

val blockSize = getLongBlockSize(conf)
val maxPaddingSize =
conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT)
val validating = getValidation(conf)

val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]]
val init = writeSupport.init(conf)
val writer = new ParquetFileWriter(output, init.getSchema,
Mode.CREATE, blockSize, maxPaddingSize)
writer.start()

val writerVersion =
ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION,
ParquetProperties.WriterVersion.PARQUET_1_0.toString))

val codecFactory = new CodecFactory(conf, getPageSize(conf))

new ParquetRecordWriter[InternalRow](writer, writeSupport, init.getSchema,
init.getExtraMetaData, blockSize, getPageSize(conf),
codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf),
getEnableDictionary(conf), validating, writerVersion,
ParquetOutputFileFormat.getMemoryManager(conf))
}
}

object ParquetOutputFileFormat {
var memoryManager: MemoryManager = null
val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f
val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB

def getMemoryManager(conf: Configuration): MemoryManager = {
synchronized {
if (memoryManager == null) {
val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO)
val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION)
memoryManager = new MemoryManager(maxLoad, minAllocation)
}
}
memoryManager
}
}
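A condensed, hypothetical sketch of how the pieces above fit together on the CPU fallback path, assuming hadoopConf was prepared by getHadoopConf so the write support already knows the schema: one partition's rows become a single in-memory Parquet file.

    import org.apache.commons.io.output.ByteArrayOutputStream
    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.catalyst.InternalRow

    // hypothetical helper mirroring the CPU fallback branch above
    def writePartitionToParquet(rows: Iterator[InternalRow], hadoopConf: Configuration): Array[Byte] = {
      val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
      val writer = new ParquetOutputFileFormat()
        .getRecordWriter(new ByteArrayOutputFile(stream), hadoopConf)
      rows.foreach(row => writer.write(null, row))
      writer.close(null) // the TaskAttemptContext is not used by close()
      stream.toByteArray
    }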