From 8fe81e787d1549f08c581c861620b448b8c37c22 Mon Sep 17 00:00:00 2001
From: Raza Jafri
Date: Tue, 20 Oct 2020 19:55:58 -0700
Subject: [PATCH] Write InternalRow to CachedBatch (#914)

* This PR writes each InternalRow to a CachedBatch using a Parquet
  RecordWriter. The Parquet file is kept entirely in memory and is saved as
  buffers inside the CachedBatch. Two illustrative sketches of this write
  path follow the diff.
---
 .../src/main/python/cache_test.py             |   5 +-
 .../ParquetCachedBatchSerializer.scala        | 182 ++++++++++++++++--
 2 files changed, 169 insertions(+), 18 deletions(-)

diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py
index af10cbe4b3a..4ba1741e1c1 100644
--- a/integration_tests/src/main/python/cache_test.py
+++ b/integration_tests/src/main/python/cache_test.py
@@ -202,11 +202,12 @@ def op_df(spark, length=2048, seed=0):
     assert_gpu_and_cpu_are_equal_collect(op_df, conf = enableVectorizedConf)
 
+@pytest.mark.parametrize('data_gen', all_gen, ids=idfn)
 @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn)
 @allow_non_gpu('CollectLimitExec')
-def test_cache_partial_load(enableVectorizedConf):
+def test_cache_partial_load(data_gen, enableVectorizedConf):
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark : two_col_df(spark, IntegerGen(), string_gen)
+        lambda spark : two_col_df(spark, data_gen, string_gen)
             .select(f.col("a"), f.col("b"))
             .cache()
             .limit(50).select(f.col("b")), enableVectorizedConf
 
diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
index 103bdf291ca..84d15534eb6 100644
--- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
+++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala
@@ -16,9 +16,8 @@
 
 package com.nvidia.spark.rapids.shims.spark310
 
-import java.io.{File, InputStream}
+import java.io.InputStream
 import java.nio.ByteBuffer
-import java.nio.file.Files
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -28,10 +27,17 @@ import ai.rapids.cudf._
 import com.nvidia.spark.rapids._
 import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import org.apache.commons.io.output.ByteArrayOutputStream
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapreduce.RecordWriter
 import org.apache.parquet.HadoopReadOptions
-import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
-import org.apache.parquet.io.{DelegatingSeekableInputStream, InputFile, SeekableInputStream}
+import org.apache.parquet.column.ParquetProperties
+import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader, ParquetFileWriter, ParquetInputFormat, ParquetOutputFormat, ParquetRecordWriter, ParquetWriter}
+import org.apache.parquet.hadoop.ParquetFileWriter.Mode
+import org.apache.parquet.hadoop.ParquetOutputFormat.{MEMORY_POOL_RATIO, MIN_MEMORY_ALLOCATION}
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
@@ -121,6 +127,41 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile {
   }
 }
 
+object ByteArrayOutputFile {
+  val BLOCK_SIZE = 32 * 1024 * 1024 // 32 MiB
+}
+
+class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile {
+  override def create(blockSizeHint: Long): PositionOutputStream = {
+    new DelegatingPositionOutputStream(stream) {
+      var pos: Long = 0L // ParquetFileWriter records offsets from getPos, so count every byte
+      override def getPos: Long = pos
+
+      override def write(b: Int): Unit = {
+        super.write(b)
+        pos += 1 // write(Int) emits a single byte
+      }
+
+      override def write(b: Array[Byte]): Unit = {
+        super.write(b)
+        pos += b.length
+      }
+
+      override def write(b: Array[Byte], off: Int, len: Int): Unit = {
+        super.write(b, off, len)
+        pos += len
+      }
+    }
+  }
+
+  override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
+    throw new UnsupportedOperationException("Don't need to overwrite")
+
+  override def supportsBlockSize(): Boolean = true
+
+  override def defaultBlockSize(): Long = ByteArrayOutputFile.BLOCK_SIZE
+}
+
 class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer with AutoCloseable {
   @transient private[this] val offHeapBuffers = mutable.Queue[(HostMemoryBuffer, Long)]()
   private var buffer: Array[Byte] = null
@@ -466,6 +507,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
     hadoopConf.set(
       SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString)
+    hadoopConf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
+
+    hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, false)
+
     ParquetWriteSupport.setSchema(requestedSchema, hadoopConf)
 
     hadoopConf
   }
@@ -485,22 +531,126 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm {
       schema: Seq[Attribute],
       storageLevel: StorageLevel,
       conf: SQLConf): RDD[CachedBatch] = {
-    val s = schema.toStructType
-    val converters = new GpuRowToColumnConverter(s)
-    val columnarBatchRdd = input.mapPartitions(iter => {
-      new RowToColumnarIterator(iter, s, RequireSingleBatch, converters)
-    })
-    columnarBatchRdd.map(cb => {
-      withResource(cb) { columnarBatch =>
-        val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
-        cachedBatch
+
+    val parquetSchema = schema.zip(getColumnIndices(schema, schema)).map {
+      case (attr, index) => attr.withName("_col" + index)
+    }
+
+    val rapidsConf = new RapidsConf(conf)
+    val sparkSession = SparkSession.active
+
+    val hadoopConf = getHadoopConf(parquetSchema.toStructType, conf)
+    val broadcastedHadoopConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
+
+    SQLConf.get.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key,
+      LegacyBehaviorPolicy.CORRECTED.toString)
+
+    val broadcastedConf = sparkSession.sparkContext.broadcast(conf)
+
+    if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) {
+      val structSchema = schema.toStructType
+      val converters = new GpuRowToColumnConverter(structSchema)
+      val columnarBatchRdd = input.mapPartitions(iter => {
+        new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters)
+      })
+      columnarBatchRdd.map(cb => {
+        withResource(cb) { columnarBatch =>
+          val cachedBatch = compressColumnarBatchWithParquet(columnarBatch)
+          cachedBatch
+        }
+      })
+    } else {
+      // fall back to the CPU and write the rows directly to an in-memory parquet file
+      input.mapPartitions {
+        internalRowIter =>
+          new Iterator[CachedBatch]() {
+            private val parquetOutputFormat = new ParquetOutputFileFormat()
+
+            override def hasNext: Boolean = internalRowIter.hasNext
+
+            override def next(): CachedBatch = {
+              // Each partition will be a single parquet file
+              var rows = 0
+              // at least a single block
+              val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
+              val outputFile: OutputFile = new ByteArrayOutputFile(stream)
+
+              val recordWriter = SQLConf.withExistingConf(broadcastedConf.value) {
+                parquetOutputFormat.getRecordWriter(outputFile, broadcastedHadoopConf.value.value)
+              }
+
+              while (internalRowIter.hasNext) {
+                rows += 1
+                if (rows < 0) {
+                  throw new IllegalStateException("CachedBatch doesn't support rows larger " +
+                    "than Int.MaxValue")
+                }
+                val row = internalRowIter.next()
+                recordWriter.write(null, row)
+              }
+              // passing null is fine here because close() never uses the TaskAttemptContext
+              recordWriter.close(null)
+              ParquetCachedBatch(rows, stream.toByteArray)
+            }
+          }
       }
-    })
+    }
   }
 
   override def buildFilter(predicates: Seq[Expression],
       cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => Iterator[CachedBatch] = {
-    //essentially a noop
-    (partId: Int, b: Iterator[CachedBatch]) => b
+    // essentially a noop
+    (partId: Int, b: Iterator[CachedBatch]) => b
+  }
+}
+
+/**
+ * Similar to ParquetFileFormat
+ */
+class ParquetOutputFileFormat() {
+
+  def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = {
+    import ParquetOutputFormat._
+
+    val blockSize = getLongBlockSize(conf)
+    val maxPaddingSize =
+      conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT)
+    val validating = getValidation(conf)
+
+    val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]]
+    val init = writeSupport.init(conf)
+    val writer = new ParquetFileWriter(output, init.getSchema,
+      Mode.CREATE, blockSize, maxPaddingSize)
+    writer.start()
+
+    val writerVersion =
+      ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION,
+        ParquetProperties.WriterVersion.PARQUET_1_0.toString))
+
+    val codecFactory = new CodecFactory(conf, getPageSize(conf))
+
+    new ParquetRecordWriter[InternalRow](writer, writeSupport, init.getSchema,
+      init.getExtraMetaData, blockSize, getPageSize(conf),
+      codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf),
+      getEnableDictionary(conf), validating, writerVersion,
+      ParquetOutputFileFormat.getMemoryManager(conf))
+  }
+}
+
+object ParquetOutputFileFormat {
+  var memoryManager: MemoryManager = null
+  val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f
+  val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1 MB
+
+  def getMemoryManager(conf: Configuration): MemoryManager = {
+    synchronized {
+      if (memoryManager == null) {
+        val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO)
+        val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION)
+        memoryManager = new MemoryManager(maxLoad, minAllocation)
+      }
+    }
+    memoryManager
   }
 }
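
Two standalone sketches follow; neither is part of the commit. The first checks the position contract behind the new ByteArrayOutputFile: ParquetFileWriter records block and column-chunk offsets from getPos, so the wrapper has to advance its counter by exactly the number of bytes each write() overload emits. The object name PositionContractCheck and the byte values are made up for this example; ByteArrayOutputFile and its blockSizeHint parameter come from the diff above, and commons-io is assumed on the classpath, as it already is for this shim.

import org.apache.commons.io.output.ByteArrayOutputStream

import com.nvidia.spark.rapids.shims.spark310.ByteArrayOutputFile

object PositionContractCheck {
  def main(args: Array[String]): Unit = {
    val stream = new ByteArrayOutputStream()
    val out = new ByteArrayOutputFile(stream).create(blockSizeHint = 0L)
    out.write(1)                             // one byte: position should advance by 1
    out.write(Array[Byte](2, 3, 4))          // three bytes: position should advance by 3
    out.write(Array[Byte](5, 6, 7, 8), 1, 2) // two bytes taken from offset 1
    out.flush()
    // the position reported to the parquet writer must match what actually landed in the buffer
    assert(out.getPos == stream.size())
  }
}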
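
The second sketch drives the new CPU write path end to end: two InternalRows are pushed through the RecordWriter returned by ParquetOutputFileFormat into a ByteArrayOutputFile, and the buffered bytes are then re-opened through the pre-existing ByteArrayInputFile to confirm that a valid footer was written. The object name InMemoryParquetRoundTrip, the single-column schema, and the row values are invented for the example; the configuration keys mirror what getHadoopConf sets in the patch, and Spark 3.1 plus parquet-mr are assumed to be on the classpath.

import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.HadoopReadOptions
import org.apache.parquet.hadoop.ParquetFileReader

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

import com.nvidia.spark.rapids.shims.spark310.{ByteArrayInputFile, ByteArrayOutputFile, ParquetOutputFileFormat}

object InMemoryParquetRoundTrip {
  def main(args: Array[String]): Unit = {
    // single column named the way the serializer renames attributes ("_col" + index)
    val schema = StructType(Seq(StructField("_col0", IntegerType)))

    // mirror the keys getHadoopConf sets above; ParquetWriteSupport.init expects them
    val conf = new Configuration()
    ParquetWriteSupport.setSchema(schema, conf)
    conf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, false)
    conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key,
      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
    conf.set(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, "CORRECTED")

    // write two rows through the RecordWriter into an in-memory "file"
    val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE)
    val recordWriter = new ParquetOutputFileFormat()
      .getRecordWriter(new ByteArrayOutputFile(stream), conf)
    Seq(1, 2).foreach(i => recordWriter.write(null, InternalRow(i)))
    recordWriter.close(null) // close() ignores the TaskAttemptContext

    // re-open the buffered bytes with the existing ByteArrayInputFile and check the footer
    val reader = ParquetFileReader.open(
      new ByteArrayInputFile(stream.toByteArray), HadoopReadOptions.builder(conf).build())
    try {
      assert(reader.getRecordCount == 2)
    } finally {
      reader.close()
    }
  }
}

In the serializer itself these same steps run once per partition inside the mapPartitions fallback above, with the Hadoop configuration and the SQLConf broadcast from the driver and the row count carried into the resulting ParquetCachedBatch.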