From e05ca8370cd7c7139becd702d3bccf8cd26e4803 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 29 Sep 2020 23:05:18 -0700 Subject: [PATCH 1/6] Write InternalRow to CachedBatch --- .../src/main/python/cache_test.py | 5 +- .../ParquetCachedBatchSerializer.scala | 121 +++++++++++++++--- .../hadoop/ParquetOutputFileFormat.scala | 74 +++++++++++ 3 files changed, 183 insertions(+), 17 deletions(-) create mode 100644 shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala diff --git a/integration_tests/src/main/python/cache_test.py b/integration_tests/src/main/python/cache_test.py index 9f339d699b9..14070615235 100644 --- a/integration_tests/src/main/python/cache_test.py +++ b/integration_tests/src/main/python/cache_test.py @@ -201,11 +201,12 @@ def op_df(spark, length=2048, seed=0): assert_gpu_and_cpu_are_equal_collect(op_df, conf = enableVectorizedConf) +@pytest.mark.parametrize('data_gen', all_gen, ids=idfn) @pytest.mark.parametrize('enableVectorizedConf', enableVectorizedConf, ids=idfn) @allow_non_gpu('CollectLimitExec') -def test_cache_partial_load(enableVectorizedConf): +def test_cache_partial_load(data_gen, enableVectorizedConf): assert_gpu_and_cpu_are_equal_collect( - lambda spark : two_col_df(spark, IntegerGen(), string_gen) + lambda spark : two_col_df(spark, data_gen, string_gen) .select(f.col("a"), f.col("b")) .cache() .limit(50).select(f.col("b")), enableVectorizedConf diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index 103bdf291ca..367bb95fba4 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -16,7 +16,7 @@ package com.nvidia.spark.rapids.shims.spark310 -import java.io.{File, InputStream} +import java.io.{ByteArrayOutputStream, File, InputStream} import java.nio.ByteBuffer import java.nio.file.Files @@ -30,8 +30,8 @@ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.RapidsPluginImplicits._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.HadoopReadOptions -import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} -import org.apache.parquet.io.{DelegatingSeekableInputStream, InputFile, SeekableInputStream} +import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetOutputFileFormat} +import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream} import org.apache.spark.TaskContext import org.apache.spark.rdd.RDD @@ -121,6 +121,41 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile { } } +class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile { + // default to 32 MB: large enough to minimize the impact of seeks + var blockSize = 32L * 1024 * 1024 + + override def create(blockSizeHint: Long): PositionOutputStream = { + blockSize = blockSizeHint + new DelegatingPositionOutputStream(stream) { + var pos = 0 + override def getPos: Long = pos + + override def write(b: Int): Unit = { + super.write(b) + pos += Integer.BYTES + } + + override def write(b: Array[Byte]): Unit = { + super.write(b) + pos += b.length + } + + override def write(b: Array[Byte], off: Int, len: Int): 
Unit = { + super.write(b, off, len) + pos += len + } + } + } + + override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream = + throw new UnsupportedOperationException("Don't need to overwrite") + + override def supportsBlockSize(): Boolean = true + + override def defaultBlockSize(): Long = blockSize +} + class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer with AutoCloseable { @transient private[this] val offHeapBuffers = mutable.Queue[(HostMemoryBuffer, Long)]() private var buffer: Array[Byte] = null @@ -466,6 +501,11 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { hadoopConf.set( SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, LegacyBehaviorPolicy.CORRECTED.toString) + hadoopConf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, + SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) + + hadoopConf.setBoolean(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key, false) + ParquetWriteSupport.setSchema(requestedSchema, hadoopConf) hadoopConf @@ -485,22 +525,73 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { schema: Seq[Attribute], storageLevel: StorageLevel, conf: SQLConf): RDD[CachedBatch] = { - val s = schema.toStructType - val converters = new GpuRowToColumnConverter(s) - val columnarBatchRdd = input.mapPartitions(iter => { - new RowToColumnarIterator(iter, s, RequireSingleBatch, converters) - }) - columnarBatchRdd.map(cb => { - withResource(cb) { columnarBatch => - val cachedBatch = compressColumnarBatchWithParquet(columnarBatch) - cachedBatch + + val parquetSchema = schema.zip(getColumnIndices(schema, schema)).map { + case (attr, index) => attr.withName("_col" + index) + } + + val rapidsConf = new RapidsConf(conf) + val sparkSession = SparkSession.active + + val hadoopConf = getHadoopConf(parquetSchema.toStructType, conf) + val broadcastedHadoopConf = + sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf)) + + SQLConf.get.setConfString(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key, + LegacyBehaviorPolicy.CORRECTED.toString) + + val broadcastedConf = sparkSession.sparkContext.broadcast(conf) + + if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) { + val s = schema.toStructType + val converters = new GpuRowToColumnConverter(s) + val columnarBatchRdd = input.mapPartitions(iter => { + new RowToColumnarIterator(iter, s, RequireSingleBatch, converters) + }) + columnarBatchRdd.map(cb => { + withResource(cb) { columnarBatch => + val cachedBatch = compressColumnarBatchWithParquet(columnarBatch) + cachedBatch + } + }) + } else { + // fallback to the CPU + input.mapPartitions { + cbIter => + new Iterator[CachedBatch]() { + private val parquetOutputFormat = + SQLConf.withExistingConf(broadcastedConf.value) { + new ParquetOutputFileFormat() + } + + override def hasNext: Boolean = cbIter.hasNext + + override def next(): CachedBatch = { + // Each partition will be a single parquet file + var rows = 0 + val stream = new ByteArrayOutputStream() + val outputFile: OutputFile = new ByteArrayOutputFile(stream) + + val recordWriter = parquetOutputFormat.getRecordWriter(outputFile, + broadcastedHadoopConf.value.value) + + while (cbIter.hasNext) { + rows += 1 + val row = cbIter.next() + recordWriter.write(null, row) + } + // passing null as context isn't used in this method + recordWriter.close(null) + ParquetCachedBatch(rows, stream.toByteArray) + } + } } - }) + } } override def buildFilter(predicates: Seq[Expression], cachedAttributes: Seq[Attribute]): (Int, Iterator[CachedBatch]) => 
Iterator[CachedBatch] = { - //essentially a noop - (partId: Int, b: Iterator[CachedBatch]) => b + //essentially a noop + (partId: Int, b: Iterator[CachedBatch]) => b } } diff --git a/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala b/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala new file mode 100644 index 00000000000..2540e0c6f26 --- /dev/null +++ b/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.parquet.hadoop + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.RecordWriter +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.ParquetFileWriter.Mode +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.io.OutputFile + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport + +class ParquetOutputFileFormat { + + private var memoryManager: MemoryManager = null + val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]] + val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f + val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB + + def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = { + import ParquetOutputFormat._ + + val blockSize = getLongBlockSize(conf) + val maxPaddingSize = + conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT) + val validating = getValidation(conf) + + val init = writeSupport.init(conf) + val w = new ParquetFileWriter(output, init.getSchema, + Mode.CREATE, blockSize, maxPaddingSize) + w.start() + + val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) + val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) + + classOf[ParquetOutputFormat[InternalRow]].synchronized { + memoryManager = new MemoryManager(maxLoad, minAllocation) + } + + val writerVersion = + ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_1_0.toString)) + + val codecFactory = new CodecFactory(conf, getPageSize(conf)) + + // we should be using this constructor but its package-private. 
I am getting a RTE thrown even + // though this class is in the same package as ParquetRecordWriter.scala + // new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, + // init.getExtraMetaData, blockSize, CompressionCodecName.SNAPPY, validating, + // props, memoryManager, conf) + + new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, init.getExtraMetaData, + blockSize, getPageSize(conf), codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), + getDictionaryPageSize(conf), getEnableDictionary(conf), validating, writerVersion, + memoryManager) + } +} From ef7e02f6a53c5c0e46493feee368ba6d9dbcb1a6 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Wed, 7 Oct 2020 13:56:19 -0700 Subject: [PATCH 2/6] Sign off empty-commit Signed-off-by: Raza Jafri From 054bdfa861877a9c9d37a3b2a96b39bdf5976e99 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 16 Oct 2020 17:52:26 -0700 Subject: [PATCH 3/6] addressed review comments --- .../ParquetCachedBatchSerializer.scala | 76 +++++++++++++++---- .../hadoop/ParquetOutputFileFormat.scala | 74 ------------------ 2 files changed, 63 insertions(+), 87 deletions(-) delete mode 100644 shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index 367bb95fba4..215f329a713 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -16,9 +16,8 @@ package com.nvidia.spark.rapids.shims.spark310 -import java.io.{ByteArrayOutputStream, File, InputStream} +import java.io.InputStream import java.nio.ByteBuffer -import java.nio.file.Files import scala.collection.JavaConverters._ import scala.collection.mutable @@ -28,9 +27,15 @@ import ai.rapids.cudf._ import com.nvidia.spark.rapids._ import com.nvidia.spark.rapids.GpuColumnVector.GpuColumnarBatchBuilder import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import org.apache.commons.io.output.ByteArrayOutputStream import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.mapreduce.RecordWriter import org.apache.parquet.HadoopReadOptions -import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat, ParquetOutputFileFormat} +import org.apache.parquet.column.ParquetProperties +import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader, ParquetFileWriter, ParquetInputFormat, ParquetOutputFormat, ParquetRecordWriter, ParquetWriter} +import org.apache.parquet.hadoop.ParquetFileWriter.Mode +import org.apache.parquet.hadoop.api.WriteSupport +import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream} import org.apache.spark.TaskContext @@ -121,12 +126,12 @@ class ByteArrayInputFile(buff: Array[Byte]) extends InputFile { } } -class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile { - // default to 32 MB: large enough to minimize the impact of seeks - var blockSize = 32L * 1024 * 1024 +object ByteArrayOutputFile { + val BLOCK_SIZE = 32 * 1024 * 1024 // 32M +} +class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile { override def 
create(blockSizeHint: Long): PositionOutputStream = { - blockSize = blockSizeHint new DelegatingPositionOutputStream(stream) { var pos = 0 override def getPos: Long = pos @@ -153,7 +158,7 @@ class ByteArrayOutputFile(stream: ByteArrayOutputStream) extends OutputFile { override def supportsBlockSize(): Boolean = true - override def defaultBlockSize(): Long = blockSize + override def defaultBlockSize(): Long = ByteArrayOutputFile.BLOCK_SIZE } class ParquetBufferConsumer(val numRows: Int) extends HostBufferConsumer with AutoCloseable { @@ -557,27 +562,32 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { } else { // fallback to the CPU input.mapPartitions { - cbIter => + internalRowIter => new Iterator[CachedBatch]() { private val parquetOutputFormat = SQLConf.withExistingConf(broadcastedConf.value) { new ParquetOutputFileFormat() } - override def hasNext: Boolean = cbIter.hasNext + override def hasNext: Boolean = internalRowIter.hasNext override def next(): CachedBatch = { // Each partition will be a single parquet file var rows = 0 - val stream = new ByteArrayOutputStream() + // at least a single block + val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE) val outputFile: OutputFile = new ByteArrayOutputFile(stream) val recordWriter = parquetOutputFormat.getRecordWriter(outputFile, broadcastedHadoopConf.value.value) - while (cbIter.hasNext) { + while (internalRowIter.hasNext) { rows += 1 - val row = cbIter.next() + if (rows < 0) { + throw new IllegalStateException("CachedBatch doesn't support rows larger " + + "than Int.MaxValue") + } + val row = internalRowIter.next() recordWriter.write(null, row) } // passing null as context isn't used in this method @@ -595,3 +605,43 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { (partId: Int, b: Iterator[CachedBatch]) => b } } + +/** + * Similar to ParquetFileFormat + */ +class ParquetOutputFileFormat { + + val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]] + val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f + val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB + + def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = { + import ParquetOutputFormat._ + + val blockSize = getLongBlockSize(conf) + val maxPaddingSize = + conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT) + val validating = getValidation(conf) + + val init = writeSupport.init(conf) + val w = new ParquetFileWriter(output, init.getSchema, + Mode.CREATE, blockSize, maxPaddingSize) + w.start() + + val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) + val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) + + lazy val memoryManager = new MemoryManager(maxLoad, minAllocation) + + val writerVersion = + ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION, + ParquetProperties.WriterVersion.PARQUET_1_0.toString)) + + val codecFactory = new CodecFactory(conf, getPageSize(conf)) + + new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, init.getExtraMetaData, + blockSize, getPageSize(conf), codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), + getDictionaryPageSize(conf), getEnableDictionary(conf), validating, writerVersion, + memoryManager) + } +} \ No newline at end of file diff --git a/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala 
b/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala deleted file mode 100644 index 2540e0c6f26..00000000000 --- a/shims/spark310/src/main/scala/org/apache/parquet/hadoop/ParquetOutputFileFormat.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright (c) 2020, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.parquet.hadoop - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.mapreduce.RecordWriter -import org.apache.parquet.column.ParquetProperties -import org.apache.parquet.hadoop.ParquetFileWriter.Mode -import org.apache.parquet.hadoop.api.WriteSupport -import org.apache.parquet.hadoop.metadata.CompressionCodecName -import org.apache.parquet.io.OutputFile - -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport - -class ParquetOutputFileFormat { - - private var memoryManager: MemoryManager = null - val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]] - val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f - val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB - - def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = { - import ParquetOutputFormat._ - - val blockSize = getLongBlockSize(conf) - val maxPaddingSize = - conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT) - val validating = getValidation(conf) - - val init = writeSupport.init(conf) - val w = new ParquetFileWriter(output, init.getSchema, - Mode.CREATE, blockSize, maxPaddingSize) - w.start() - - val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) - val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) - - classOf[ParquetOutputFormat[InternalRow]].synchronized { - memoryManager = new MemoryManager(maxLoad, minAllocation) - } - - val writerVersion = - ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION, - ParquetProperties.WriterVersion.PARQUET_1_0.toString)) - - val codecFactory = new CodecFactory(conf, getPageSize(conf)) - - // we should be using this constructor but its package-private. 
I am getting a RTE thrown even - // though this class is in the same package as ParquetRecordWriter.scala - // new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, - // init.getExtraMetaData, blockSize, CompressionCodecName.SNAPPY, validating, - // props, memoryManager, conf) - - new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, init.getExtraMetaData, - blockSize, getPageSize(conf), codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), - getDictionaryPageSize(conf), getEnableDictionary(conf), validating, writerVersion, - memoryManager) - } -} From affacc24e3d54987c9684fda3d7270e8f1738ee9 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 16 Oct 2020 18:15:54 -0700 Subject: [PATCH 4/6] cleanup --- .../ParquetCachedBatchSerializer.scala | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index 215f329a713..dbc6332d472 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -548,10 +548,10 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { val broadcastedConf = sparkSession.sparkContext.broadcast(conf) if (rapidsConf.isSqlEnabled && isSupportedByCudf(schema)) { - val s = schema.toStructType - val converters = new GpuRowToColumnConverter(s) + val structSchema = schema.toStructType + val converters = new GpuRowToColumnConverter(structSchema) val columnarBatchRdd = input.mapPartitions(iter => { - new RowToColumnarIterator(iter, s, RequireSingleBatch, converters) + new RowToColumnarIterator(iter, structSchema, RequireSingleBatch, converters) }) columnarBatchRdd.map(cb => { withResource(cb) { columnarBatch => @@ -624,9 +624,9 @@ class ParquetOutputFileFormat { val validating = getValidation(conf) val init = writeSupport.init(conf) - val w = new ParquetFileWriter(output, init.getSchema, + val writer = new ParquetFileWriter(output, init.getSchema, Mode.CREATE, blockSize, maxPaddingSize) - w.start() + writer.start() val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) @@ -639,9 +639,9 @@ class ParquetOutputFileFormat { val codecFactory = new CodecFactory(conf, getPageSize(conf)) - new ParquetRecordWriter[InternalRow](w, writeSupport, init.getSchema, init.getExtraMetaData, - blockSize, getPageSize(conf), codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), - getDictionaryPageSize(conf), getEnableDictionary(conf), validating, writerVersion, - memoryManager) + new ParquetRecordWriter[InternalRow](writer, writeSupport, init.getSchema, + init.getExtraMetaData, blockSize, getPageSize(conf), + codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf), + getEnableDictionary(conf), validating, writerVersion, memoryManager) } } \ No newline at end of file From 2ef80805c41c573be9ba5a268e298bca1fc2a8fd Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 16 Oct 2020 18:18:31 -0700 Subject: [PATCH 5/6] add a new line --- .../rapids/shims/spark310/ParquetCachedBatchSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index dbc6332d472..610fe0bf2e5 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -644,4 +644,4 @@ class ParquetOutputFileFormat { codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf), getEnableDictionary(conf), validating, writerVersion, memoryManager) } -} \ No newline at end of file +} From 65251d909f055cef6391a37d00fd67248ef8e1c8 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Mon, 19 Oct 2020 21:52:00 -0700 Subject: [PATCH 6/6] more changes --- .../ParquetCachedBatchSerializer.scala | 43 +++++++++++-------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala index 610fe0bf2e5..84d15534eb6 100644 --- a/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala +++ b/shims/spark310/src/main/scala/com/nvidia/spark/rapids/shims/spark310/ParquetCachedBatchSerializer.scala @@ -34,6 +34,7 @@ import org.apache.parquet.HadoopReadOptions import org.apache.parquet.column.ParquetProperties import org.apache.parquet.hadoop.{CodecFactory, MemoryManager, ParquetFileReader, ParquetFileWriter, ParquetInputFormat, ParquetOutputFormat, ParquetRecordWriter, ParquetWriter} import org.apache.parquet.hadoop.ParquetFileWriter.Mode +import org.apache.parquet.hadoop.ParquetOutputFormat.{MEMORY_POOL_RATIO, MIN_MEMORY_ALLOCATION} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.metadata.CompressionCodecName import org.apache.parquet.io.{DelegatingPositionOutputStream, DelegatingSeekableInputStream, InputFile, OutputFile, PositionOutputStream, SeekableInputStream} @@ -564,10 +565,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { input.mapPartitions { internalRowIter => new Iterator[CachedBatch]() { - private val parquetOutputFormat = - SQLConf.withExistingConf(broadcastedConf.value) { - new ParquetOutputFileFormat() - } + private val parquetOutputFormat = new ParquetOutputFileFormat() override def hasNext: Boolean = internalRowIter.hasNext @@ -578,8 +576,9 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { val stream = new ByteArrayOutputStream(ByteArrayOutputFile.BLOCK_SIZE) val outputFile: OutputFile = new ByteArrayOutputFile(stream) - val recordWriter = parquetOutputFormat.getRecordWriter(outputFile, - broadcastedHadoopConf.value.value) + val recordWriter = SQLConf.withExistingConf(broadcastedConf.value) { + parquetOutputFormat.getRecordWriter(outputFile, broadcastedHadoopConf.value.value) + } while (internalRowIter.hasNext) { rows += 1 @@ -609,11 +608,7 @@ class ParquetCachedBatchSerializer extends CachedBatchSerializer with Arm { /** * Similar to ParquetFileFormat */ -class ParquetOutputFileFormat { - - val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]] - val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f - val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB +class ParquetOutputFileFormat() { 
def getRecordWriter(output: OutputFile, conf: Configuration): RecordWriter[Void, InternalRow] = { import ParquetOutputFormat._ @@ -623,16 +618,12 @@ class ParquetOutputFileFormat { conf.getInt(MAX_PADDING_BYTES, ParquetWriter.MAX_PADDING_SIZE_DEFAULT) val validating = getValidation(conf) + val writeSupport = new ParquetWriteSupport().asInstanceOf[WriteSupport[InternalRow]] val init = writeSupport.init(conf) val writer = new ParquetFileWriter(output, init.getSchema, Mode.CREATE, blockSize, maxPaddingSize) writer.start() - val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) - val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) - - lazy val memoryManager = new MemoryManager(maxLoad, minAllocation) - val writerVersion = ParquetProperties.WriterVersion.fromString(conf.get(ParquetOutputFormat.WRITER_VERSION, ParquetProperties.WriterVersion.PARQUET_1_0.toString)) @@ -642,6 +633,24 @@ class ParquetOutputFileFormat { new ParquetRecordWriter[InternalRow](writer, writeSupport, init.getSchema, init.getExtraMetaData, blockSize, getPageSize(conf), codecFactory.getCompressor(CompressionCodecName.UNCOMPRESSED), getDictionaryPageSize(conf), - getEnableDictionary(conf), validating, writerVersion, memoryManager) + getEnableDictionary(conf), validating, writerVersion, + ParquetOutputFileFormat.getMemoryManager(conf)) + } +} + +object ParquetOutputFileFormat { + var memoryManager: MemoryManager = null + val DEFAULT_MEMORY_POOL_RATIO: Float = 0.95f + val DEFAULT_MIN_MEMORY_ALLOCATION: Long = 1 * 1024 * 1024 // 1MB + + def getMemoryManager(conf: Configuration): MemoryManager = { + synchronized { + if (memoryManager == null) { + val maxLoad = conf.getFloat(MEMORY_POOL_RATIO, DEFAULT_MEMORY_POOL_RATIO) + val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DEFAULT_MIN_MEMORY_ALLOCATION) + memoryManager = new MemoryManager(maxLoad, minAllocation) + } + } + memoryManager } }
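
For reference, the CPU fallback in convertInternalRowToCachedBatch hinges on handing Parquet an OutputFile that writes into memory instead of to a filesystem path. The sketch below is a minimal, self-contained version of that wrapper, using illustrative names (InMemoryOutputFile, BlockSize) rather than the patch's ByteArrayOutputFile; it tracks the stream position itself because Parquet reads getPos back when it records column-chunk and footer offsets.

    import java.io.ByteArrayOutputStream

    import org.apache.parquet.io.{DelegatingPositionOutputStream, OutputFile, PositionOutputStream}

    object InMemoryOutputFile {
      // One row group per cached partition; 32 MiB mirrors ByteArrayOutputFile.BLOCK_SIZE.
      val BlockSize: Long = 32L * 1024 * 1024
    }

    class InMemoryOutputFile(stream: ByteArrayOutputStream) extends OutputFile {

      override def create(blockSizeHint: Long): PositionOutputStream =
        new DelegatingPositionOutputStream(stream) {
          private var pos: Long = 0L

          // Parquet uses this position to compute column-chunk and footer offsets.
          override def getPos: Long = pos

          override def write(b: Int): Unit = {
            super.write(b)
            pos += 1 // the single-argument write emits exactly one byte
          }

          override def write(b: Array[Byte]): Unit = {
            super.write(b)
            pos += b.length
          }

          override def write(b: Array[Byte], off: Int, len: Int): Unit = {
            super.write(b, off, len)
            pos += len
          }
        }

      // The cache always starts from a fresh buffer, so overwrite semantics are never needed.
      override def createOrOverwrite(blockSizeHint: Long): PositionOutputStream =
        throw new UnsupportedOperationException("in-memory buffers are never overwritten")

      override def supportsBlockSize(): Boolean = true

      override def defaultBlockSize(): Long = InMemoryOutputFile.BlockSize
    }

Each task wraps a ByteArrayOutputStream in such an OutputFile, asks ParquetOutputFileFormat.getRecordWriter for a RecordWriter[Void, InternalRow] configured from the broadcast Hadoop configuration, writes every row of the partition, closes the writer, and stores stream.toByteArray as the ParquetCachedBatch payload.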
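
PATCH 6 also turns the Parquet MemoryManager, which balances row-group buffer memory across every open writer in the JVM, into a single lazily created instance shared by all record writers rather than one per writer. Below is a hedged sketch of that accessor with an illustrative object name, assuming, as the patch itself does, that MemoryManager and its two-argument constructor are accessible from outside org.apache.parquet.hadoop.

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.hadoop.MemoryManager
    import org.apache.parquet.hadoop.ParquetOutputFormat.{MEMORY_POOL_RATIO, MIN_MEMORY_ALLOCATION}

    object SharedMemoryManager {
      // Same defaults the patch carries over from ParquetOutputFormat.
      private val DefaultPoolRatio: Float = 0.95f
      private val DefaultMinAllocation: Long = 1L * 1024 * 1024 // 1 MiB

      private var memoryManager: MemoryManager = _

      // Lazily build one shared manager; synchronized so concurrent tasks initialize it safely.
      def get(conf: Configuration): MemoryManager = synchronized {
        if (memoryManager == null) {
          val poolRatio = conf.getFloat(MEMORY_POOL_RATIO, DefaultPoolRatio)
          val minAllocation = conf.getLong(MIN_MEMORY_ALLOCATION, DefaultMinAllocation)
          memoryManager = new MemoryManager(poolRatio, minAllocation)
        }
        memoryManager
      }
    }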