diff --git a/integration_tests/src/main/python/parquet_test.py b/integration_tests/src/main/python/parquet_test.py
index 72a02ea21fe..6db33d7e677 100644
--- a/integration_tests/src/main/python/parquet_test.py
+++ b/integration_tests/src/main/python/parquet_test.py
@@ -127,7 +127,6 @@ def test_simple_partitioned_read(spark_tmp_path):
     assert_gpu_and_cpu_are_equal_collect(
             lambda spark : spark.read.parquet(data_path))
 
-@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/60')
 def test_read_merge_schema(spark_tmp_path):
     # Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
     # we should go with a more standard set of generators
diff --git a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
index 75d19922dd1..aca935e3b6c 100644
--- a/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
@@ -19,16 +19,17 @@ package ai.rapids.spark
 import java.io.OutputStream
 import java.net.URI
 import java.nio.charset.StandardCharsets
-import java.util.Collections
+import java.util.{Collections, Locale}
 
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.math.max
 
-import ai.rapids.cudf.{DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table}
+import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table}
 import ai.rapids.spark.GpuMetricNames._
 import ai.rapids.spark.ParquetPartitionReader.CopyRange
+import ai.rapids.spark.RapidsPluginImplicits._
 import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
@@ -39,7 +40,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.format.converter.ParquetMetadataConverter
 import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
 import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath, FileMetaData, ParquetMetadata}
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.{GroupType, MessageType, Types}
 
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
@@ -166,6 +167,30 @@ case class GpuParquetPartitionReaderFactory(
     ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema)
   }
 
+  private def filterClippedSchema(clippedSchema: MessageType,
+      fileSchema: MessageType, isCaseSensitive: Boolean): MessageType = {
+    val fs = fileSchema.asGroupType()
+    val types = if (isCaseSensitive) {
+      val inFile = fs.getFields.asScala.map(_.getName).toSet
+      clippedSchema.asGroupType()
+        .getFields.asScala.filter(f => inFile.contains(f.getName))
+    } else {
+      val inFile = fs.getFields.asScala
+        .map(_.getName.toLowerCase(Locale.ROOT)).toSet
+      clippedSchema.asGroupType()
+        .getFields.asScala
+        .filter(f => inFile.contains(f.getName.toLowerCase(Locale.ROOT)))
+    }
+    if (types.isEmpty) {
+      Types.buildMessage().named("spark_schema")
+    } else {
+      Types
+        .buildMessage()
+        .addFields(types: _*)
+        .named("spark_schema")
+    }
+  }
+
   private def buildBaseColumnarParquetReader(
       file: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
@@ -197,12 +222,17 @@ case class GpuParquetPartitionReaderFactory(
         footer.getBlocks
       }
 
-    val clippedSchema = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
+    val clippedSchemaTmp = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
       isCaseSensitive)
+    // ParquetReadSupport.clipParquetSchema does most of what we want, but it includes
+    // everything in readDataSchema, even if it is not in fileSchema. We want to remove those
+    // fields for our own purposes.
+    val clippedSchema = filterClippedSchema(clippedSchemaTmp, fileSchema, isCaseSensitive)
     val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x:_*))
     val clippedBlocks = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala)
     new ParquetPartitionReader(conf, file, filePath, clippedBlocks, clippedSchema,
-      readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics)
+      isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows,
+      maxReadBatchSizeBytes, metrics)
   }
 }
 
@@ -229,12 +259,13 @@ class ParquetPartitionReader(
     filePath: Path,
     clippedBlocks: Seq[BlockMetaData],
     clippedParquetSchema: MessageType,
+    isSchemaCaseSensitive: Boolean,
     readDataSchema: StructType,
     debugDumpPrefix: String,
     maxReadBatchSizeRows: Integer,
     maxReadBatchSizeBytes: Long,
     execMetrics: Map[String, SQLMetric]) extends PartitionReader[ColumnarBatch] with Logging
-  with ScanWithMetrics {
+  with ScanWithMetrics with Arm {
   private var isExhausted: Boolean = false
   private var maxDeviceMemory: Long = 0
   private var batch: Option[ColumnarBatch] = None
@@ -272,7 +303,7 @@ class ParquetPartitionReader(
     isExhausted = true
   }
 
-  private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long, Long) = {
+  private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long) = {
     val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
       metrics("bufferTime"))
     try {
@@ -289,7 +320,7 @@ class ParquetPartitionReader(
           BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt)
           out.write(ParquetPartitionReader.PARQUET_MAGIC)
           succeeded = true
-          (hmb, out.getPos, estimateRowCount(blocks))
+          (hmb, out.getPos)
         } finally {
           if (!succeeded) {
             hmb.close()
@@ -303,9 +334,6 @@ class ParquetPartitionReader(
           }
         }
       }
   }
 
-  private def estimateRowCount(currentChunkedBlocks: Seq[BlockMetaData]): Long =
-    currentChunkedBlocks.map(_.getRowCount).sum
-
   private def calculateParquetOutputSize(currentChunkedBlocks: Seq[BlockMetaData]): Long = {
     // start with the size of Parquet magic (at start+end) and footer length values
     var size: Long = 4 + 4 + 4
@@ -448,12 +476,59 @@ class ParquetPartitionReader(
     }
   }
 
+  private def areNamesEquiv(groups: GroupType, index: Int, otherName: String,
+      isCaseSensitive: Boolean): Boolean = {
+    if (groups.getFieldCount > index) {
+      if (isCaseSensitive) {
+        groups.getFieldName(index) == otherName
+      } else {
+        groups.getFieldName(index).toLowerCase(Locale.ROOT) == otherName.toLowerCase(Locale.ROOT)
+      }
+    } else {
+      false
+    }
+  }
+
+  private def evolveSchemaIfNeededAndClose(inputTable: Table): Table = {
+    if (readDataSchema.length > inputTable.getNumberOfColumns) {
+      // Spark+Parquet schema evolution is relatively simple, with only adding/removing columns.
+      // No type casting or anything like that.
+      val clippedGroups = clippedParquetSchema.asGroupType()
+      val newColumns = new Array[ColumnVector](readDataSchema.length)
+      try {
+        withResource(inputTable) { table =>
+          var readAt = 0
+          (0 until readDataSchema.length).foreach(writeAt => {
+            val readField = readDataSchema(writeAt)
+            if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) {
+              newColumns(writeAt) = table.getColumn(readAt).incRefCount()
+              readAt += 1
+            } else {
+              withResource(GpuScalar.from(null, readField.dataType)) { n =>
+                newColumns(writeAt) = ColumnVector.fromScalar(n, table.getRowCount.toInt)
+              }
+            }
+          })
+          if (readAt != table.getNumberOfColumns) {
+            throw new QueryExecutionException(s"Could not find the expected columns " +
+              s"$readAt out of ${table.getNumberOfColumns} from $filePath")
+          }
+        }
+        new Table(newColumns: _*)
+      } finally {
+        newColumns.safeClose()
+      }
+    } else {
+      inputTable
+    }
+  }
+
   private def readToTable(currentChunkedBlocks: Seq[BlockMetaData]): Option[Table] = {
     if (currentChunkedBlocks.isEmpty) {
       return None
     }
-    val (dataBuffer, dataSize, rowCount) = readPartFile(currentChunkedBlocks)
+    val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks)
     try {
       if (dataSize == 0) {
         None
       } else {
@@ -461,7 +536,6 @@ class ParquetPartitionReader(
         if (debugDumpPrefix != null) {
           dumpParquetData(dataBuffer, dataSize)
         }
-        val cudfSchema = GpuColumnVector.from(readDataSchema)
         val parseOpts = ParquetOptions.builder()
           .withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
           .includeColumn(readDataSchema.fieldNames:_*).build()
@@ -471,14 +545,13 @@ class ParquetPartitionReader(
         val table = Table.readParquet(parseOpts, dataBuffer, 0, dataSize)
         maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)
 
-        val numColumns = table.getNumberOfColumns
-        if (readDataSchema.length != numColumns) {
+        if (readDataSchema.length < table.getNumberOfColumns) {
           table.close()
           throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
-            s"but read $numColumns from $filePath")
+            s"but read ${table.getNumberOfColumns} from $filePath")
         }
         metrics(NUM_OUTPUT_BATCHES) += 1
-        Some(table)
+        Some(evolveSchemaIfNeededAndClose(table))
       }
     } finally {
       dataBuffer.close()
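
Note (illustration only, not part of the patch): the heart of this change is the positional null-fill merge in evolveSchemaIfNeededAndClose. When a file is missing columns that the merged read schema expects, file columns are consumed in order wherever their names line up with the clipped file schema, and each missing field is materialized as an all-null column of the requested type. The sketch below walks through the same matching logic using plain Scala collections instead of cuDF tables; SimpleColumn and mergeBySchema are names invented for this example and do not exist in the plugin or in cuDF.

object SchemaEvolutionSketch {
  // A stand-in for a cuDF column: just a name and some optional values.
  case class SimpleColumn(name: String, values: Seq[Option[Int]])

  private def namesEquiv(a: String, b: String, caseSensitive: Boolean): Boolean =
    if (caseSensitive) a == b else a.equalsIgnoreCase(b)

  // Walk the requested schema in order; consume the next file column whenever its
  // name lines up, otherwise emit an all-null column for the requested field.
  // This mirrors the readAt/writeAt walk in evolveSchemaIfNeededAndClose.
  def mergeBySchema(
      requestedNames: Seq[String],
      fileColumns: Seq[SimpleColumn],
      rowCount: Int,
      caseSensitive: Boolean): Seq[SimpleColumn] = {
    var readAt = 0
    val merged = requestedNames.map { wanted =>
      if (readAt < fileColumns.length &&
          namesEquiv(fileColumns(readAt).name, wanted, caseSensitive)) {
        val col = fileColumns(readAt)
        readAt += 1
        col
      } else {
        // The file does not contain this column: fill it with nulls, the way the
        // GPU reader builds a column from a null scalar.
        SimpleColumn(wanted, Seq.fill(rowCount)(None))
      }
    }
    // Same sanity check as the patch: every column read from the file must be used.
    require(readAt == fileColumns.length,
      s"Only matched $readAt of ${fileColumns.length} columns read from the file")
    merged
  }

  def main(args: Array[String]): Unit = {
    // The merged (evolved) schema asks for "a" and "b", but this file only has "a".
    val fileColumns = Seq(SimpleColumn("a", Seq(Some(1), Some(2))))
    val merged = mergeBySchema(Seq("a", "b"), fileColumns, rowCount = 2, caseSensitive = false)
    merged.foreach(println)
    // SimpleColumn(a,List(Some(1), Some(2)))
    // SimpleColumn(b,List(None, None))
  }
}

The purely positional walk is enough here because, as the patch arranges, cuDF only ever returns the columns that are actually present in the file and in clipped-schema order, so a name mismatch at the current read position can only mean the requested field is absent from that file.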