From 00c0a6c9e8a2314cfd956280cef132fa295f85e7 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 11 May 2022 12:31:59 -0500 Subject: [PATCH] Use C++ to parse and filter parquet footers. (#5310) Signed-off-by: Robert (Bobby) Evans --- build/buildall | 10 +- docs/configs.md | 1 + .../src/main/python/parquet_test.py | 15 +- .../spark/rapids/AvroDataFileReader.scala | 4 +- .../nvidia/spark/rapids/GpuParquetScan.scala | 305 +++++++++++++++++- .../spark/rapids/HostMemoryStreams.scala | 37 ++- .../com/nvidia/spark/rapids/RapidsConf.scala | 31 ++ 7 files changed, 371 insertions(+), 32 deletions(-) diff --git a/build/buildall b/build/buildall index b152c06c3e6..7ae2296a900 100755 --- a/build/buildall +++ b/build/buildall @@ -45,6 +45,8 @@ function print_usage() { echo " E.g., --module=integration_tests" echo " -P=N, --parallel=N" echo " Build in parallel, N (4 by default) is passed via -P to xargs" + echo " --install" + echo " Intall the resulting jar instead of just building it" } function bloopInstall() { @@ -85,6 +87,8 @@ function bloopInstall() { ) } +FINAL_OP="package" + while [[ "$1" != "" ]] ; do case "$1" in @@ -118,6 +122,10 @@ case "$1" in SKIP_CLEAN="0" ;; +--install) + FINAL_OP="install" + ;; + *) echo >&2 "Unknown arg: $1" print_usage @@ -262,7 +270,7 @@ time ( # a negligible increase of the build time by ~2 seconds. joinShimBuildFrom="aggregator" echo "Resuming from $joinShimBuildFrom build only using $BASE_VER" - mvn package -rf $joinShimBuildFrom $MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \ + mvn $FINAL_OP -rf $joinShimBuildFrom $MODULE_OPT $MVN_PROFILE_OPT $INCLUDED_BUILDVERS_OPT \ -Dbuildver="$BASE_VER" \ -DskipTests -Dskip -Dmaven.javadoc.skip ) diff --git a/docs/configs.md b/docs/configs.md index a69b47d8411..8d49ed4d31c 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -92,6 +92,7 @@ Name | Description | Default Value spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel|A limit on the maximum number of files per task processed in parallel on the CPU side before the file is sent to the GPU. This affects the amount of host memory used when reading the files in parallel. Used with MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type|2147483647 spark.rapids.sql.format.parquet.multiThreadedRead.numThreads|The maximum number of threads, on the executor, to use for reading small parquet files in parallel. This can not be changed at runtime after the executor has started. Used with COALESCING and MULTITHREADED reader, see spark.rapids.sql.format.parquet.reader.type.|20 spark.rapids.sql.format.parquet.read.enabled|When set to false disables parquet input acceleration|true +spark.rapids.sql.format.parquet.reader.footer.type|In some cases reading the footer of the file is very expensive. Typically this happens when there are a large number of columns and relatively few of them are being read on a large number of files. This provides the ability to use a different path to parse and filter the footer. JAVA is the default and should match closely with Apache Spark. NATIVE will parse and filter the footer using C++. In the worst case this can be slower than JAVA, but not by much if anything. This is still a very experimental feature and there are known bugs and limitations. It should work for most cases when reading data that complies with the latest Parquet standard, but can run into issues for older data that does not fully comply with it.|JAVA spark.rapids.sql.format.parquet.reader.type|Sets the parquet reader type. 
We support different types that are optimized for different environments. The original Spark style reader can be selected by setting this to PERFILE which individually reads and copies files to the GPU. Loading many small files individually has high overhead, and using either COALESCING or MULTITHREADED is recommended instead. The COALESCING reader is good when using a local file system where the executors are on the same nodes or close to the nodes the data is being read on. This reader coalesces all the files assigned to a task into a single host buffer before sending it down to the GPU. It copies blocks from a single file into a host buffer in separate threads in parallel, see spark.rapids.sql.format.parquet.multiThreadedRead.numThreads. MULTITHREADED is good for cloud environments where you are reading from a blobstore that is totally separate and likely has a higher I/O read cost. Many times the cloud environments also get better throughput when you have multiple readers in parallel. This reader uses multiple threads to read each file in parallel and each file is sent to the GPU separately. This allows the CPU to keep reading while GPU is also doing work. See spark.rapids.sql.format.parquet.multiThreadedRead.numThreads and spark.rapids.sql.format.parquet.multiThreadedRead.maxNumFilesParallel to control the number of threads and amount of memory used. By default this is set to AUTO so we select the reader we think is best. This will either be the COALESCING or the MULTITHREADED based on whether we think the file is in the cloud. See spark.rapids.cloudSchemes.|AUTO spark.rapids.sql.format.parquet.write.enabled|When set to false disables parquet output acceleration|true spark.rapids.sql.format.parquet.writer.int96.enabled|When set to false, disables accelerated parquet write if the spark.sql.parquet.outputTimestampType is set to INT96|true diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py index e92eacb2f07..0b77941cedc 100644 --- a/integration_tests/src/main/python/parquet_test.py +++ b/integration_tests/src/main/python/parquet_test.py @@ -66,7 +66,20 @@ def read_parquet_sql(data_path): original_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE'} multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED'} coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING'} +native_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'PERFILE', + 'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'} +native_multithreaded_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'MULTITHREADED', + 'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'} +native_coalesce_parquet_file_reader_conf = {'spark.rapids.sql.format.parquet.reader.type': 'COALESCING', + 'spark.rapids.sql.format.parquet.reader.footer.type': 'NATIVE'} + +# For now the native configs are not compatible with spark.sql.parquet.writeLegacyFormat written files +# for nested types reader_opt_confs = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf, + coalesce_parquet_file_reader_conf, native_parquet_file_reader_conf, + native_multithreaded_parquet_file_reader_conf, native_coalesce_parquet_file_reader_conf] + +reader_opt_confs_no_native = [original_parquet_file_reader_conf, multithreaded_parquet_file_reader_conf, coalesce_parquet_file_reader_conf] 
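For readers trying the feature out, the same two reader settings exercised by the new test fixtures above can be set on a regular Spark session. The Scala sketch below is illustrative only: the app name and input path are placeholders, spark.plugins is the usual way the RAPIDS Accelerator is enabled, and only the two spark.rapids.sql.format.parquet.reader.* keys are defined by this change.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch, assuming the RAPIDS Accelerator jar is already on the classpath.
    // The app name and input path are hypothetical; only the two
    // spark.rapids.sql.format.parquet.reader.* keys come from this change.
    val spark = SparkSession.builder()
      .appName("native-parquet-footer-example")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
      .config("spark.rapids.sql.format.parquet.reader.type", "MULTITHREADED")
      .config("spark.rapids.sql.format.parquet.reader.footer.type", "NATIVE")
      .getOrCreate()

    // Footers for this read are parsed and filtered in C++ (NATIVE); setting the
    // footer.type value back to JAVA restores the existing Apache Spark style path.
    val df = spark.read.parquet("/path/to/parquet/data")
    df.show()
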
@pytest.mark.parametrize('parquet_gens', parquet_gens_list, ids=idfn) @@ -222,7 +235,7 @@ def test_ts_read_fails_datetime_legacy(gen, spark_tmp_path, ts_write, ts_rebase, [ArrayGen(decimal_gen_32bit, max_length=10)], [StructGen([['child0', decimal_gen_32bit]])]], ids=idfn) @pytest.mark.parametrize('read_func', [read_parquet_df, read_parquet_sql]) -@pytest.mark.parametrize('reader_confs', reader_opt_confs) +@pytest.mark.parametrize('reader_confs', reader_opt_confs_no_native) @pytest.mark.parametrize('v1_enabled_list', ["", "parquet"]) def test_parquet_decimal_read_legacy(spark_tmp_path, parquet_gens, read_func, reader_confs, v1_enabled_list): gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)] diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala index 88c12744529..f4039dcc09c 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/AvroDataFileReader.scala @@ -27,7 +27,7 @@ import org.apache.avro.file.SeekableInput import org.apache.avro.io.{BinaryData, BinaryDecoder, DecoderFactory} import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} -private class SeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput { +private class AvroSeekableInputStream(in: SeekableInput) extends InputStream with SeekableInput { var oneByte = new Array[Byte](1) override def read(): Int = { @@ -132,7 +132,7 @@ case class BlockInfo(blockStart: Long, blockLength: Long, blockDataSize: Long, c * AvroDataFileReader parses the Avro file to get the header and all block information */ class AvroDataFileReader(si: SeekableInput) extends AutoCloseable { - private val sin = new SeekableInputStream(si) + private val sin = new AvroSeekableInputStream(si) sin.seek(0) // seek to the start of file and get some meta info. 
private var vin: BinaryDecoder = DecoderFactory.get.binaryDecoder(sin, vin); diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala index d5b7a9610c5..07e88792678 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala @@ -16,8 +16,9 @@ package com.nvidia.spark.rapids -import java.io.{FileNotFoundException, IOException, OutputStream} +import java.io.{EOFException, FileNotFoundException, IOException, OutputStream} import java.net.URI +import java.nio.ByteBuffer import java.nio.charset.StandardCharsets import java.util.{Collections, Locale} import java.util.concurrent._ @@ -32,17 +33,23 @@ import ai.rapids.cudf._ import com.nvidia.spark.RebaseHelper import com.nvidia.spark.rapids.GpuMetric._ import com.nvidia.spark.rapids.ParquetPartitionReader.CopyRange +import com.nvidia.spark.rapids.RapidsConf.ParquetFooterReaderType import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.jni.ParquetFooter import com.nvidia.spark.rapids.shims.{ParquetFieldIdShims, SparkShimImpl} +import java.util import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, Path} import org.apache.parquet.bytes.BytesUtils +import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian import org.apache.parquet.column.ColumnDescriptor import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.format.converter.ParquetMetadataConverter import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat} +import org.apache.parquet.hadoop.ParquetFileWriter.MAGIC import org.apache.parquet.hadoop.metadata._ +import org.apache.parquet.io.{InputFile, SeekableInputStream} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, PrimitiveType, Type, Types} import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName @@ -67,7 +74,6 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.SerializableConfiguration - /** * Base GpuParquetScan used for common code across Spark versions. Gpu version of * Spark's 'ParquetScan'. @@ -334,6 +340,123 @@ private case class ParquetFileInfoWithBlockMeta(filePath: Path, blocks: Seq[Bloc partValues: InternalRow, schema: MessageType, isCorrectedInt96RebaseMode: Boolean, isCorrectedRebaseMode: Boolean, hasInt96Timestamps: Boolean) +/** + * A parquet compatible stream that allows reading from a HostMemoryBuffer to Parquet. + * The majority of the code here was copied from Parquet's DelegatingSeekableInputStream with + * minor modifications to have it be make it Scala and call into the + * HostMemoryInputStreamMixIn's state. 
+ */ +class HMBSeekableInputStream( + val hmb: HostMemoryBuffer, + val hmbLength: Long) extends SeekableInputStream + with HostMemoryInputStreamMixIn { + private val temp = new Array[Byte](8192) + + override def seek(offset: Long): Unit = { + pos = offset + } + + @throws[IOException] + override def readFully(buffer: Array[Byte]): Unit = { + val amountRead = read(buffer) + val remaining = buffer.length - amountRead + if (remaining > 0) { + throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read") + } + } + + @throws[IOException] + override def readFully(buffer: Array[Byte], offset: Int, length: Int): Unit = { + val amountRead = read(buffer, offset, length) + val remaining = length - amountRead + if (remaining > 0) { + throw new EOFException("Reached the end of stream with " + remaining + " bytes left to read") + } + } + + @throws[IOException] + override def read(buf: ByteBuffer): Int = + if (buf.hasArray) { + readHeapBuffer(buf) + } else { + readDirectBuffer(buf) + } + + @throws[IOException] + override def readFully(buf: ByteBuffer): Unit = { + if (buf.hasArray) { + readFullyHeapBuffer(buf) + } else { + readFullyDirectBuffer(buf) + } + } + + private def readHeapBuffer(buf: ByteBuffer) = { + val bytesRead = read(buf.array, buf.arrayOffset + buf.position, buf.remaining) + if (bytesRead < 0) { + bytesRead + } else { + buf.position(buf.position + bytesRead) + bytesRead + } + } + + private def readFullyHeapBuffer(buf: ByteBuffer): Unit = { + readFully(buf.array, buf.arrayOffset + buf.position, buf.remaining) + buf.position(buf.limit) + } + + private def readDirectBuffer(buf: ByteBuffer): Int = { + var nextReadLength = Math.min(buf.remaining, temp.length) + var totalBytesRead = 0 + var bytesRead = 0 + totalBytesRead = 0 + bytesRead = read(temp, 0, nextReadLength) + while (bytesRead == temp.length) { + buf.put(temp) + totalBytesRead += bytesRead + + nextReadLength = Math.min(buf.remaining, temp.length) + bytesRead = read(temp, 0, nextReadLength) + } + if (bytesRead < 0) { + if (totalBytesRead == 0) { + -1 + } else { + totalBytesRead + } + } else { + buf.put(temp, 0, bytesRead) + totalBytesRead += bytesRead + totalBytesRead + } + } + + private def readFullyDirectBuffer(buf: ByteBuffer): Unit = { + var nextReadLength = Math.min(buf.remaining, temp.length) + var bytesRead = 0 + bytesRead = 0 + bytesRead = read(temp, 0, nextReadLength) + while (nextReadLength > 0 && bytesRead >= 0) { + buf.put(temp, 0, bytesRead) + + nextReadLength = Math.min(buf.remaining, temp.length) + bytesRead = read(temp, 0, nextReadLength) + } + if (bytesRead < 0 && buf.remaining > 0) { + throw new EOFException("Reached the end of stream with " + + buf.remaining + " bytes left to read") + } + } +} + +class HMBInputFile(buffer: HostMemoryBuffer) extends InputFile { + + override def getLength: Long = buffer.getLength + + override def newStream(): SeekableInputStream = new HMBSeekableInputStream(buffer, getLength) +} + private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) extends Arm { private val isCaseSensitive = sqlConf.caseSensitiveAnalysis private val enableParquetFilterPushDown: Boolean = sqlConf.parquetFilterPushDown @@ -358,21 +481,169 @@ private case class GpuParquetFileFilterHandler(@transient sqlConf: SQLConf) exte } } - @scala.annotation.nowarn( - "msg=constructor ParquetFileReader in class ParquetFileReader is deprecated" - ) + private def addNamesAndCount(names: ArrayBuffer[String], children: ArrayBuffer[Int], + name: String, numChildren: Int): Unit = { + 
names += name + children += numChildren + } + + /** + * Flatten a Spark schema according to the parquet standard. This does not work for older + * parquet files that did not fully follow the standard, or were before some of these + * things were standardized. This will be fixed as a part of + * https://github.com/NVIDIA/spark-rapids-jni/issues/210 + */ + private def depthFirstNamesHelper(schema: DataType, elementName: String, makeLowerCase: Boolean, + names: ArrayBuffer[String], children: ArrayBuffer[Int]): Unit = { + val name = if (makeLowerCase) { + elementName.toLowerCase(Locale.ROOT) + } else { + elementName + } + schema match { + case cst: StructType => + addNamesAndCount(names, children, name, cst.length) + cst.fields.foreach { field => + depthFirstNamesHelper(field.dataType, field.name, makeLowerCase, names, children) + } + case _: NumericType | BinaryType | BooleanType | DateType | TimestampType | StringType => + addNamesAndCount(names, children, name, 0) + case at: ArrayType => + addNamesAndCount(names, children, name, 1) + addNamesAndCount(names, children, "list", 1) + depthFirstNamesHelper(at.elementType, "element", makeLowerCase, names, children) + case mt: MapType => + addNamesAndCount(names, children, name, 1) + addNamesAndCount(names, children, "key_value", 2) + depthFirstNamesHelper(mt.keyType, "key", makeLowerCase, names, children) + depthFirstNamesHelper(mt.valueType, "value", makeLowerCase, names, children) + case other => + throw new UnsupportedOperationException(s"Need some help here $other...") + } + } + + def depthFirstNames(schema: StructType, makeLowerCase: Boolean): (Array[String], Array[Int]) = { + withResource(new NvtxRange("prepare schema", NvtxColor.WHITE)) { _ => + // Initialize them with a quick length for non-nested values + val names = new ArrayBuffer[String](schema.length) + val children = new ArrayBuffer[Int](schema.length) + schema.fields.foreach { field => + depthFirstNamesHelper(field.dataType, field.name, makeLowerCase, names, children) + } + (names.toArray, children.toArray) + } + } + + def readAndFilterFooter( + file: PartitionedFile, + conf : Configuration, + readDataSchema: StructType, + filePath: Path): ParquetFooter = { + val (names, children) = depthFirstNames(readDataSchema, !isCaseSensitive) + val fs = filePath.getFileSystem(conf) + val stat = fs.getFileStatus(filePath) + // Much of this code came from the parquet_mr projects ParquetFileReader, and was modified + // to match our needs + val fileLen = stat.getLen + val FOOTER_LENGTH_SIZE = 4 + // MAGIC + data + footer + footerIndex + MAGIC + if (fileLen < MAGIC.length + FOOTER_LENGTH_SIZE + MAGIC.length) { + throw new RuntimeException(s"$filePath is not a Parquet file (too small length: $fileLen )") + } + val footerLengthIndex = fileLen - FOOTER_LENGTH_SIZE - MAGIC.length + val footerBuffer = withResource(fs.open(filePath)) { inputStream => + withResource(new NvtxRange("ReadFooterBytes", NvtxColor.YELLOW)) { _ => + inputStream.seek(footerLengthIndex) + val footerLength = readIntLittleEndian(inputStream) + val magic = new Array[Byte](MAGIC.length) + inputStream.readFully(magic) + if (!util.Arrays.equals(MAGIC, magic)) { + throw new RuntimeException(s"$filePath is not a Parquet file. 
" + + s"Expected magic number at tail ${util.Arrays.toString(MAGIC)} " + + s"but found ${util.Arrays.toString(magic)}") + } + val footerIndex = footerLengthIndex - footerLength + if (footerIndex < MAGIC.length || footerIndex >= footerLengthIndex) { + throw new RuntimeException(s"corrupted file: the footer index is not within " + + s"the file: $footerIndex") + } + inputStream.seek(footerIndex) + closeOnExcept(HostMemoryBuffer.allocate(footerLength, false)) { outBuffer => + val out = new HostMemoryOutputStream(outBuffer) + val tmpBuffer = new Array[Byte](4096) + var bytesLeft = footerLength + while (bytesLeft > 0) { + val readLength = Math.min(bytesLeft, tmpBuffer.length) + inputStream.readFully(tmpBuffer, 0, readLength) + out.write(tmpBuffer, 0, readLength) + bytesLeft -= readLength + } + outBuffer + } + } + } + withResource(footerBuffer) { footerBuffer => + withResource(new NvtxRange("Parse and filter footer by range", NvtxColor.RED)) { _ => + val len = if (fileLen <= file.length) { + // secret signal to skip filtering + -1 + } else { + file.length + } + ParquetFooter.readAndFilter(footerBuffer, file.start, len, + names, children, readDataSchema.length, !isCaseSensitive) + } + } + } + + def readAndSimpleFilterFooter( + file: PartitionedFile, + conf : Configuration, + filePath: Path): ParquetMetadata = { + //noinspection ScalaDeprecation + withResource(new NvtxRange("readFooter", NvtxColor.YELLOW)) { _ => + ParquetFileReader.readFooter(conf, filePath, + ParquetMetadataConverter.range(file.start, file.start + file.length)) + } + } + + @scala.annotation.nowarn def filterBlocks( + footerReader: ParquetFooterReaderType.Value, file: PartitionedFile, conf : Configuration, filters: Array[Filter], readDataSchema: StructType): ParquetFileInfoWithBlockMeta = { withResource(new NvtxRange("filterBlocks", NvtxColor.PURPLE)) { _ => val filePath = new Path(new URI(file.filePath)) - //noinspection ScalaDeprecation - val footer = withResource(new NvtxRange("readFooter", NvtxColor.YELLOW)) { _ => - ParquetFileReader.readFooter(conf, filePath, - ParquetMetadataConverter.range(file.start, file.start + file.length)) + val footer = footerReader match { + case ParquetFooterReaderType.NATIVE => + val serialized = withResource(readAndFilterFooter(file, conf, readDataSchema, filePath)) { + tableFooter => + if (tableFooter.getNumColumns <= 0) { + // Special case because java parquet reader does not like having 0 columns. + val numRows = tableFooter.getNumRows + val block = new BlockMetaData() + block.setRowCount(numRows) + val schema = new MessageType("root") + return ParquetFileInfoWithBlockMeta(filePath, Seq(block), file.partitionValues, + schema, false, false, false) + } + + tableFooter.serializeThriftFile() + } + withResource(serialized) { serialized => + withResource(new NvtxRange("readFilteredFooter", NvtxColor.YELLOW)) { _ => + val inputFile = new HMBInputFile(serialized) + + // We already filtered the ranges so no need to do more here... 
+ ParquetFileReader.readFooter(inputFile, ParquetMetadataConverter.NO_FILTER) + } + } + case _ => + readAndSimpleFilterFooter(file, conf, filePath) } + val fileSchema = footer.getFileMetaData.getSchema val pushedFilters = if (enableParquetFilterPushDown) { val parquetFilters = SparkShimImpl.getParquetFilters(fileSchema, pushDownDate, @@ -451,6 +722,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val numThreads = rapidsConf.parquetMultiThreadReadNumThreads private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel + private val footerReadType = rapidsConf.parquetReaderFooterType private val ignoreMissingFiles = sqlConf.ignoreMissingFiles private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles private val filterHandler = GpuParquetFileFilterHandler(sqlConf) @@ -473,7 +745,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( override def buildBaseColumnarReaderForCloud( files: Array[PartitionedFile], conf: Configuration): PartitionReader[ColumnarBatch] = { - new MultiFileCloudParquetPartitionReader(conf, files, + new MultiFileCloudParquetPartitionReader(conf, footerReadType, files, isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema, numThreads, maxNumFileProcessed, filterHandler, filters, @@ -493,7 +765,7 @@ case class GpuParquetMultiFilePartitionReaderFactory( val clippedBlocks = ArrayBuffer[ParquetSingleDataBlockMeta]() files.map { file => val singleFileInfo = try { - filterHandler.filterBlocks(file, conf, filters, readDataSchema) + filterHandler.filterBlocks(footerReadType, file, conf, filters, readDataSchema) } catch { case e: FileNotFoundException if ignoreMissingFiles => logWarning(s"Skipped missing file: ${file.filePath}", e) @@ -547,6 +819,7 @@ case class GpuParquetPartitionReaderFactory( private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix private val maxReadBatchSizeRows = rapidsConf.maxReadBatchSizeRows private val maxReadBatchSizeBytes = rapidsConf.maxReadBatchSizeBytes + private val footerReadType = rapidsConf.parquetReaderFooterType private val filterHandler = GpuParquetFileFilterHandler(sqlConf) @@ -565,7 +838,8 @@ case class GpuParquetPartitionReaderFactory( private def buildBaseColumnarParquetReader( file: PartitionedFile): PartitionReader[ColumnarBatch] = { val conf = broadcastedConf.value.value - val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + val singleFileInfo = filterHandler.filterBlocks(footerReadType, file, conf, filters, + readDataSchema) new ParquetPartitionReader(conf, file, singleFileInfo.filePath, singleFileInfo.blocks, singleFileInfo.schema, isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, @@ -1256,6 +1530,7 @@ class MultiFileParquetPartitionReader( */ class MultiFileCloudParquetPartitionReader( override val conf: Configuration, + footerReadType: ParquetFooterReaderType.Value, files: Array[PartitionedFile], override val isSchemaCaseSensitive: Boolean, override val readDataSchema: StructType, @@ -1283,6 +1558,7 @@ class MultiFileCloudParquetPartitionReader( clippedSchema: MessageType) extends HostMemoryBuffersWithMetaDataBase private class ReadBatchRunner( + footerReadType: ParquetFooterReaderType.Value, taskContext: TaskContext, filterHandler: GpuParquetFileFilterHandler, file: PartitionedFile, @@ -1322,7 +1598,8 @@ class MultiFileCloudParquetPartitionReader( val startingBytesRead = fileSystemBytesRead() val hostBuffers 
= new ArrayBuffer[(HostMemoryBuffer, Long)] try { - val fileBlockMeta = filterHandler.filterBlocks(file, conf, filters, readDataSchema) + val fileBlockMeta = filterHandler.filterBlocks(footerReadType, file, conf, filters, + readDataSchema) if (fileBlockMeta.blocks.isEmpty) { val bytesRead = fileSystemBytesRead() - startingBytesRead // no blocks so return null buffer and size 0 @@ -1388,7 +1665,7 @@ class MultiFileCloudParquetPartitionReader( file: PartitionedFile, conf: Configuration, filters: Array[Filter]): Callable[HostMemoryBuffersWithMetaDataBase] = { - new ReadBatchRunner(tc, filterHandler, file, conf, filters) + new ReadBatchRunner(footerReadType, tc, filterHandler, file, conf, filters) } /** diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala index 02a7f8991ed..d071aea0553 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/HostMemoryStreams.scala @@ -48,19 +48,12 @@ class HostMemoryOutputStream(buffer: HostMemoryBuffer) extends OutputStream { def getPos: Long = pos } -/** - * An implementation of InputStream that reads from a HostMemoryBuffer. - * - * NOTE: Closing this input stream does NOT close the buffer! - * - * @param hmb the buffer from which to read data - * @param hmbLength the amount of data available in the buffer - */ -class HostMemoryInputStream( - hmb: HostMemoryBuffer, - hmbLength: Long) extends InputStream { - private var pos = 0L - private var mark = -1L +trait HostMemoryInputStreamMixIn extends InputStream { + protected val hmb: HostMemoryBuffer + protected val hmbLength: Long + + protected var pos = 0L + protected var mark = -1L override def read(): Int = { if (pos >= hmbLength) { @@ -68,7 +61,10 @@ class HostMemoryInputStream( } else { val result = hmb.getByte(pos) pos += 1 - result + // because byte in java is signed, result will be sign extended when it is auto cast to an + // int for the return value. We need to mask off the upper bits to avoid returning a + // negative value which indicated EOF. + result & 0xFF } } @@ -106,3 +102,16 @@ class HostMemoryInputStream( def getPos: Long = pos } + +/** + * An implementation of InputStream that reads from a HostMemoryBuffer. + * + * NOTE: Closing this input stream does NOT close the buffer! + * + * @param hmb the buffer from which to read data + * @param hmbLength the amount of data available in the buffer + */ +class HostMemoryInputStream( + val hmb: HostMemoryBuffer, + val hmbLength: Long) extends HostMemoryInputStreamMixIn { +} \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala index 05afab07bfe..2d86f4ed315 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala @@ -732,6 +732,27 @@ object RapidsConf { .booleanConf .createWithDefault(true) + object ParquetFooterReaderType extends Enumeration { + val JAVA, NATIVE = Value + } + + val PARQUET_READER_FOOTER_TYPE = + conf("spark.rapids.sql.format.parquet.reader.footer.type") + .doc("In some cases reading the footer of the file is very expensive. Typically this " + + "happens when there are a large number of columns and relatively few " + + "of them are being read on a large number of files. 
" + + "This provides the ability to use a different path to parse and filter the footer. " + + "JAVA is the default and should match closely with Apache Spark. NATIVE will parse and " + + "filter the footer using C++. In the worst case this can be slower than JAVA, but " + + "not by much if anything. This is still a very experimental feature and there are " + + "known bugs and limitations. It should work for most cases when reading data that " + + "complies with the latest Parquet standard, but can run into issues for older data " + + "that does not fully comply with it.") + .stringConf + .transform(_.toUpperCase(java.util.Locale.ROOT)) + .checkValues(ParquetFooterReaderType.values.map(_.toString)) + .createWithDefault(ParquetFooterReaderType.JAVA.toString) + // This is an experimental feature now. And eventually, should be enabled or disabled depending // on something that we don't know yet but would try to figure out. val ENABLE_CPU_BASED_UDF = conf("spark.rapids.sql.rowBasedUDF.enabled") @@ -1670,6 +1691,16 @@ class RapidsConf(conf: Map[String, String]) extends Logging { lazy val isParquetInt96WriteEnabled: Boolean = get(ENABLE_PARQUET_INT96_WRITE) + lazy val parquetReaderFooterType: ParquetFooterReaderType.Value = { + get(PARQUET_READER_FOOTER_TYPE) match { + case "NATIVE" => ParquetFooterReaderType.NATIVE + case "JAVA" => ParquetFooterReaderType.JAVA + case other => + throw new IllegalArgumentException(s"Internal Error $other is not supported for " + + s"${PARQUET_READER_FOOTER_TYPE.key}") + } + } + lazy val isParquetPerFileReadEnabled: Boolean = RapidsReaderType.withName(get(PARQUET_READER_TYPE)) == RapidsReaderType.PERFILE