From bc0c7d4882edc4800fd4bb6ff30b45e97b66def5 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Fri, 18 Mar 2022 17:57:02 -0700 Subject: [PATCH 1/4] honor the conf to use compression Signed-off-by: Raza Jafri --- .../shims/ParquetCachedBatchSerializer.scala | 28 ++++++++++++------- .../shims/Spark310ParquetWriterSuite.scala | 3 +- 2 files changed, 20 insertions(+), 11 deletions(-) diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala index db8a28ef3e8..ddd9a316154 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala @@ -327,6 +327,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi conf: SQLConf): RDD[CachedBatch] = { val rapidsConf = new RapidsConf(conf) + val useCompression = conf.useCompression val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf) val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema) val structSchema = schemaWithUnambiguousNames.toStructType @@ -349,7 +350,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi } else { withResource(putOnGpuIfNeeded(batch)) { gpuCB => compressColumnarBatchWithParquet(gpuCB, structSchema, schema.toStructType, - bytesAllowedPerBatch) + bytesAllowedPerBatch, useCompression) } } }) @@ -367,7 +368,8 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi oldGpuCB: ColumnarBatch, schema: StructType, origSchema: StructType, - bytesAllowedPerBatch: Long): List[ParquetCachedBatch] = { + bytesAllowedPerBatch: Long, + useCompression: Boolean): List[ParquetCachedBatch] = { val estimatedRowSize = scala.Range(0, oldGpuCB.numCols()).map { idx => oldGpuCB.column(idx).asInstanceOf[GpuColumnVector] .getBase.getDeviceMemorySize / oldGpuCB.numRows() @@ -418,7 +420,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi for (i <- splitVectors.head.indices) { withResource(makeTableForIndex(i)) { table => - val buffer = writeTableToCachedBatch(table, schema) + val buffer = writeTableToCachedBatch(table, schema, useCompression) buffers += ParquetCachedBatch(buffer) } } @@ -427,7 +429,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi } } else { withResource(GpuColumnVector.from(gpuCB)) { table => - val buffer = writeTableToCachedBatch(table, schema) + val buffer = writeTableToCachedBatch(table, schema, useCompression) buffers += ParquetCachedBatch(buffer) } } @@ -437,12 +439,17 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi private def writeTableToCachedBatch( table: Table, - schema: StructType): ParquetBufferConsumer = { + schema: StructType, + useCompression: Boolean): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - val opts = SchemaUtils - .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) - .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build() - withResource(Table.writeParquetChunked(opts, buffer)) { writer => + val optsBuilder = SchemaUtils + .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) + .withCompressionType(CompressionType.NONE) + .withStatisticsFrequency(StatisticsFrequency.ROWGROUP) + if 
(useCompression) { + optsBuilder.withCompressionType(CompressionType.SNAPPY) + } + withResource(Table.writeParquetChunked(optsBuilder.build(), buffer)) { writer => writer.write(table) } buffer @@ -1454,6 +1461,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi conf: SQLConf): RDD[CachedBatch] = { val rapidsConf = new RapidsConf(conf) + val useCompression = conf.useCompression val bytesAllowedPerBatch = getBytesAllowedPerBatch(conf) val (schemaWithUnambiguousNames, _) = getSupportedSchemaFromUnsupported(schema) if (rapidsConf.isSqlEnabled && rapidsConf.isSqlExecuteOnGPU && @@ -1465,7 +1473,7 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi }) columnarBatchRdd.flatMap(cb => { withResource(cb)(cb => compressColumnarBatchWithParquet(cb, structSchema, - schema.toStructType, bytesAllowedPerBatch)) + schema.toStructType, bytesAllowedPerBatch, useCompression)) }) } else { val broadcastedConf = SparkSession.active.sparkContext.broadcast(conf.getAllConfs) diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala index 636d4e0086c..dbd9d7d96f7 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala +++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala @@ -149,7 +149,8 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { Array(StructField("empty", ByteType, false), StructField("empty", ByteType, false), StructField("empty", ByteType, false))) - ser.compressColumnarBatchWithParquet(cb, dummySchema, dummySchema, BYTES_ALLOWED_PER_BATCH) + ser.compressColumnarBatchWithParquet(cb, dummySchema, dummySchema, + BYTES_ALLOWED_PER_BATCH, false) theTableMock.close() } } From 04f1c6e25090d00a73caadaedf09704b13d95ce9 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 22 Mar 2022 08:13:42 -0700 Subject: [PATCH 2/4] added test Signed-off-by: Raza Jafri --- .../shims/ParquetCachedBatchSerializer.scala | 16 +++++++----- .../shims/Spark310ParquetWriterSuite.scala | 26 +++++++++++++++++++ 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala index ddd9a316154..e9b7b2b5788 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala @@ -442,16 +442,18 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi schema: StructType, useCompression: Boolean): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - val optsBuilder = SchemaUtils + val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE + val opts = SchemaUtils .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) - .withCompressionType(CompressionType.NONE) - .withStatisticsFrequency(StatisticsFrequency.ROWGROUP) - if (useCompression) { - optsBuilder.withCompressionType(CompressionType.SNAPPY) - } - withResource(Table.writeParquetChunked(optsBuilder.build(), buffer)) { writer => + .withCompressionType(compressionType) + 
.withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build() + withResource(Table.writeParquetChunked(opts, buffer)) { writer => writer.write(table) } + withResource(Table.writeParquetChunked(opts, + new java.io.File(s"/tmp/cache${System.currentTimeMillis()}-${compressionType.name()}"))) { + writer => writer.write(table) + } buffer } diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala index dbd9d7d96f7..b7fc3c5b30f 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala +++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala @@ -20,7 +20,11 @@ import scala.collection.mutable import ai.rapids.cudf.{ColumnVector, DType, Table, TableWriter} import com.nvidia.spark.rapids._ +import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} +import org.apache.parquet.HadoopReadOptions +import org.apache.parquet.format.CompressionCodec +import org.apache.parquet.hadoop.ParquetFileReader import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -82,6 +86,28 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { testColumnarBatchToCachedBatchIterator(cb, schema) } + test("test useCompression conf is honored") { + val ser = new ParquetCachedBatchSerializer() + val readOptions = HadoopReadOptions.builder(new Configuration).build() + val schema = new StructType().add("value", "int") + withResource(ColumnVector.fromInts(1, 2, 3, 4)) { cudfcv => + val vec = GpuColumnVector.from(cudfcv, IntegerType) + val batch = new ColumnarBatch(Array[org.apache.spark.sql.vectorized.ColumnVector](vec), 4) + List(false, true).foreach { comp => + val cbList = ser.compressColumnarBatchWithParquet(batch, schema, schema, 100, comp) + cbList.foreach { cb => + val inputFile = new ByteArrayInputFile(cb.buffer) + withResource(ParquetFileReader.open(inputFile, readOptions)) { parquetFileReader => + assert( + (if (comp) CompressionCodec.SNAPPY.name() + else CompressionCodec.UNCOMPRESSED.name()) == + parquetFileReader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.name()) + } + } + } + } + } + val ROWS = 3 * 1024 * 1024 private def getCudfAndGpuVectors(onHost: Boolean = false): (ColumnVector, GpuColumnVector)= { From e3b1bf2b2ea1ac6093798128d5276ee1dd3cc6fe Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 22 Mar 2022 08:29:31 -0700 Subject: [PATCH 3/4] removed the file being written to disk for debugging Signed-off-by: Raza Jafri --- .../spark/rapids/shims/ParquetCachedBatchSerializer.scala | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala index e9b7b2b5788..82c8dd9efa4 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala @@ -450,10 +450,6 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi withResource(Table.writeParquetChunked(opts, buffer)) { writer => writer.write(table) } - withResource(Table.writeParquetChunked(opts, - new 
java.io.File(s"/tmp/cache${System.currentTimeMillis()}-${compressionType.name()}"))) { - writer => writer.write(table) - } buffer } From 9d275fef2c07e62791ee0f6271e1811abd97eb80 Mon Sep 17 00:00:00 2001 From: Raza Jafri Date: Tue, 22 Mar 2022 14:42:45 -0700 Subject: [PATCH 4/4] refactored PCBS to make it easier to test Signed-off-by: Raza Jafri --- .../shims/ParquetCachedBatchSerializer.scala | 16 +++++++--- .../shims/Spark310ParquetWriterSuite.scala | 32 ++++++------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala index 82c8dd9efa4..1f3d51e030a 100644 --- a/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala +++ b/sql-plugin/src/main/311+-all/scala/com/nvidia/spark/rapids/shims/ParquetCachedBatchSerializer.scala @@ -437,16 +437,22 @@ protected class ParquetCachedBatchSerializer extends GpuCachedBatchSerializer wi } } + def getParquetWriterOptions( + useCompression: Boolean, + schema: StructType): ParquetWriterOptions = { + val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE + SchemaUtils + .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) + .withCompressionType(compressionType) + .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build() + } + private def writeTableToCachedBatch( table: Table, schema: StructType, useCompression: Boolean): ParquetBufferConsumer = { val buffer = new ParquetBufferConsumer(table.getRowCount.toInt) - val compressionType = if (useCompression) CompressionType.SNAPPY else CompressionType.NONE - val opts = SchemaUtils - .writerOptionsFromSchema(ParquetWriterOptions.builder(), schema, writeInt96 = false) - .withCompressionType(compressionType) - .withStatisticsFrequency(StatisticsFrequency.ROWGROUP).build() + val opts = getParquetWriterOptions(useCompression, schema) withResource(Table.writeParquetChunked(opts, buffer)) { writer => writer.write(table) } diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala index b7fc3c5b30f..a875263c482 100644 --- a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala +++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/shims/Spark310ParquetWriterSuite.scala @@ -18,13 +18,9 @@ package com.nvidia.spark.rapids.shims import scala.collection.mutable -import ai.rapids.cudf.{ColumnVector, DType, Table, TableWriter} +import ai.rapids.cudf.{ColumnVector, CompressionType, DType, Table, TableWriter} import com.nvidia.spark.rapids._ -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{RecordWriter, TaskAttemptContext} -import org.apache.parquet.HadoopReadOptions -import org.apache.parquet.format.CompressionCodec -import org.apache.parquet.hadoop.ParquetFileReader import org.mockito.ArgumentMatchers._ import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock @@ -88,23 +84,15 @@ class Spark310ParquetWriterSuite extends SparkQueryCompareTestSuite { test("test useCompression conf is honored") { val ser = new ParquetCachedBatchSerializer() - val readOptions = HadoopReadOptions.builder(new Configuration).build() - val schema = new StructType().add("value", "int") - 
withResource(ColumnVector.fromInts(1, 2, 3, 4)) { cudfcv => - val vec = GpuColumnVector.from(cudfcv, IntegerType) - val batch = new ColumnarBatch(Array[org.apache.spark.sql.vectorized.ColumnVector](vec), 4) - List(false, true).foreach { comp => - val cbList = ser.compressColumnarBatchWithParquet(batch, schema, schema, 100, comp) - cbList.foreach { cb => - val inputFile = new ByteArrayInputFile(cb.buffer) - withResource(ParquetFileReader.open(inputFile, readOptions)) { parquetFileReader => - assert( - (if (comp) CompressionCodec.SNAPPY.name() - else CompressionCodec.UNCOMPRESSED.name()) == - parquetFileReader.getFooter.getBlocks.get(0).getColumns.get(0).getCodec.name()) - } - } - } + val schema = new StructType().add("value", "string") + List(false, true).foreach { comp => + val opts = ser.getParquetWriterOptions(comp, schema) + assert( + (if (comp) { + CompressionType.SNAPPY + } else { + CompressionType.NONE + }) == opts.getCompressionType) } }
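
A minimal end-to-end sketch for reviewers (an assumption, not part of these patches; the serializer class name and confs follow the plugin's documented setup at the time). SQLConf.useCompression is backed by spark.sql.inMemoryColumnarStorage.compressed, so toggling that conf before caching decides whether PCBS writes the cached Parquet batches with SNAPPY or no compression:

    // Sketch only: assumes the RAPIDS plugin jars are on the classpath and that the
    // cache serializer is registered under its documented public name.
    import org.apache.spark.sql.SparkSession

    object CachedCompressionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("pcbs-compression-sketch")
          // Static conf (Spark 3.1+) selecting the cached-batch serializer.
          .config("spark.sql.cache.serializer",
            "com.nvidia.spark.ParquetCachedBatchSerializer")
          // SQLConf.useCompression reads this conf; true maps to CompressionType.SNAPPY,
          // false to CompressionType.NONE in writeTableToCachedBatch above.
          .config("spark.sql.inMemoryColumnarStorage.compressed", "false")
          .getOrCreate()

        val df = spark.range(0, 1 << 20).toDF("value")
        df.cache()   // cached batches are serialized as Parquet by PCBS
        df.count()   // materializes the cache with the compression chosen above
        spark.stop()
      }
    }

Exposing getParquetWriterOptions in patch 4 also lets the suite assert on the chosen CompressionType directly, rather than re-reading the written Parquet footer as the patch 2 version of the test did.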