
Commit

Merge branch 'read_schema_fix_parquet' into 'branch-0.1'
Allow for simple schema evolution in parquet

See merge request nvspark/rapids-plugin-4-spark!649
revans2 committed Jun 11, 2020
2 parents af636fa + 957825e commit 7294b1a
Showing 2 changed files with 90 additions and 18 deletions.
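
For context, here is a minimal sketch (not part of this commit) of the kind of read the change targets: the schema Spark asks for contains a column that the Parquet file on disk does not have, and with this fix the GPU reader fills that column with nulls instead of failing. The session setup, object name, and paths are assumptions for illustration only.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructField, StructType}

object ReadSchemaEvolutionExample {
  def main(args: Array[String]): Unit = {
    // Assumes a Spark session with the RAPIDS plugin on the classpath and enabled.
    val spark = SparkSession.builder().appName("read-schema-evolution").getOrCreate()
    import spark.implicits._

    val path = "/tmp/read_schema_evolution"  // hypothetical path
    // The file on disk only has columns a and b.
    Seq((1, "x"), (2, "y")).toDF("a", "b").write.mode("overwrite").parquet(path)

    // The read schema asks for an extra column c that the file does not contain.
    val readSchema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", StringType),
      StructField("c", DoubleType)))

    // With this change the GPU reader returns c as all nulls rather than erroring
    // out because the column counts do not match.
    spark.read.schema(readSchema).parquet(path).show()
    spark.stop()
  }
}
```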
1 change: 0 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -127,7 +127,6 @@ def test_simple_partitioned_read(spark_tmp_path):
assert_gpu_and_cpu_are_equal_collect(
lambda spark : spark.read.parquet(data_path))

@pytest.mark.xfail(reason='https://github.com/NVIDIA/spark-rapids/issues/60')
def test_read_merge_schema(spark_tmp_path):
# Once https://github.com/NVIDIA/spark-rapids/issues/133 and https://github.com/NVIDIA/spark-rapids/issues/132 are fixed
# we should go with a more standard set of generators
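
The xfail marker removed above re-enables test_read_merge_schema on the GPU. A hedged Scala equivalent of the mergeSchema scenario that test exercises (the paths, data, and object name here are illustrative, not taken from the test):

```scala
import org.apache.spark.sql.SparkSession

object MergeSchemaExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("merge-schema-example").getOrCreate()
    import spark.implicits._

    val base = "/tmp/merge_schema_example"  // hypothetical path
    // Two partition directories written with different schemas.
    Seq((1, "x")).toDF("a", "b").write.mode("overwrite").parquet(s"$base/key=0")
    Seq((2, "y", 3.5)).toDF("a", "b", "c").write.mode("overwrite").parquet(s"$base/key=1")

    // mergeSchema builds the union schema (a, b, c, key); when each individual file
    // is read, the columns it lacks must come back as nulls, which is what this
    // commit enables in the GPU Parquet reader.
    spark.read.option("mergeSchema", "true").parquet(base).show()
    spark.stop()
  }
}
```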
107 changes: 90 additions & 17 deletions sql-plugin/src/main/scala/ai/rapids/spark/GpuParquetScan.scala
@@ -19,16 +19,17 @@ package ai.rapids.spark
import java.io.OutputStream
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.Collections
import java.util.{Collections, Locale}

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.math.max

import ai.rapids.cudf.{DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table}
import ai.rapids.cudf.{ColumnVector, DType, HostMemoryBuffer, NvtxColor, ParquetOptions, Table}
import ai.rapids.spark.GpuMetricNames._
import ai.rapids.spark.ParquetPartitionReader.CopyRange
import ai.rapids.spark.RapidsPluginImplicits._
import org.apache.commons.io.IOUtils
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import org.apache.hadoop.conf.Configuration
@@ -39,7 +40,7 @@ import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
import org.apache.parquet.hadoop.metadata.{BlockMetaData, ColumnChunkMetaData, ColumnPath, FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType
import org.apache.parquet.schema.{GroupType, MessageType, Types}

import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
@@ -166,6 +167,30 @@ case class GpuParquetPartitionReaderFactory(
ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema)
}

private def filterClippedSchema(clippedSchema: MessageType,
fileSchema: MessageType, isCaseSensitive: Boolean): MessageType = {
val fs = fileSchema.asGroupType()
val types = if (isCaseSensitive) {
val inFile = fs.getFields.asScala.map(_.getName).toSet
clippedSchema.asGroupType()
.getFields.asScala.filter(f => inFile.contains(f.getName))
} else {
val inFile = fs.getFields.asScala
.map(_.getName.toLowerCase(Locale.ROOT)).toSet
clippedSchema.asGroupType()
.getFields.asScala
.filter(f => inFile.contains(f.getName.toLowerCase(Locale.ROOT)))
}
if (types.isEmpty) {
Types.buildMessage().named("spark_schema")
} else {
Types
.buildMessage()
.addFields(types: _*)
.named("spark_schema")
}
}

private def buildBaseColumnarParquetReader(
file: PartitionedFile): PartitionReader[ColumnarBatch] = {
val conf = broadcastedConf.value.value
@@ -197,12 +222,17 @@
footer.getBlocks
}

val clippedSchema = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
val clippedSchemaTmp = ParquetReadSupport.clipParquetSchema(fileSchema, readDataSchema,
isCaseSensitive)
// ParquetReadSupport.clipParquetSchema does most of what we want, but it includes
// everything in readDataSchema, even columns that are not in fileSchema. We want to
// remove those for our own purposes.
val clippedSchema = filterClippedSchema(clippedSchemaTmp, fileSchema, isCaseSensitive)
val columnPaths = clippedSchema.getPaths.asScala.map(x => ColumnPath.get(x:_*))
val clippedBlocks = ParquetPartitionReader.clipBlocks(columnPaths, blocks.asScala)
new ParquetPartitionReader(conf, file, filePath, clippedBlocks, clippedSchema,
readDataSchema, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics)
isCaseSensitive, readDataSchema, debugDumpPrefix, maxReadBatchSizeRows,
maxReadBatchSizeBytes, metrics)
}
}

@@ -229,12 +259,13 @@ class ParquetPartitionReader(
filePath: Path,
clippedBlocks: Seq[BlockMetaData],
clippedParquetSchema: MessageType,
isSchemaCaseSensitive: Boolean,
readDataSchema: StructType,
debugDumpPrefix: String,
maxReadBatchSizeRows: Integer,
maxReadBatchSizeBytes: Long,
execMetrics: Map[String, SQLMetric]) extends PartitionReader[ColumnarBatch] with Logging
with ScanWithMetrics {
with ScanWithMetrics with Arm {
private var isExhausted: Boolean = false
private var maxDeviceMemory: Long = 0
private var batch: Option[ColumnarBatch] = None
@@ -272,7 +303,7 @@ class ParquetPartitionReader(
isExhausted = true
}

private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long, Long) = {
private def readPartFile(blocks: Seq[BlockMetaData]): (HostMemoryBuffer, Long) = {
val nvtxRange = new NvtxWithMetrics("Buffer file split", NvtxColor.YELLOW,
metrics("bufferTime"))
try {
@@ -289,7 +320,7 @@
BytesUtils.writeIntLittleEndian(out, (out.getPos - footerPos).toInt)
out.write(ParquetPartitionReader.PARQUET_MAGIC)
succeeded = true
(hmb, out.getPos, estimateRowCount(blocks))
(hmb, out.getPos)
} finally {
if (!succeeded) {
hmb.close()
@@ -303,9 +334,6 @@
}
}

private def estimateRowCount(currentChunkedBlocks: Seq[BlockMetaData]): Long =
currentChunkedBlocks.map(_.getRowCount).sum

private def calculateParquetOutputSize(currentChunkedBlocks: Seq[BlockMetaData]): Long = {
// start with the size of Parquet magic (at start+end) and footer length values
var size: Long = 4 + 4 + 4
@@ -448,20 +476,66 @@
}
}

private def areNamesEquiv(groups: GroupType, index: Int, otherName: String,
isCaseSensitive: Boolean): Boolean = {
if (groups.getFieldCount > index) {
if (isCaseSensitive) {
groups.getFieldName(index) == otherName
} else {
groups.getFieldName(index).toLowerCase(Locale.ROOT) == otherName.toLowerCase(Locale.ROOT)
}
} else {
false
}
}

private def evolveSchemaIfNeededAndClose(inputTable: Table): Table = {
if (readDataSchema.length > inputTable.getNumberOfColumns) {
// Spark+Parquet schema evolution is relatively simple, with columns only being
// added or removed. No type casting or anything like that.
val clippedGroups = clippedParquetSchema.asGroupType()
val newColumns = new Array[ColumnVector](readDataSchema.length)
try {
withResource(inputTable) { table =>
var readAt = 0
(0 until readDataSchema.length).foreach(writeAt => {
val readField = readDataSchema(writeAt)
if (areNamesEquiv(clippedGroups, readAt, readField.name, isSchemaCaseSensitive)) {
newColumns(writeAt) = table.getColumn(readAt).incRefCount()
readAt += 1
} else {
withResource(GpuScalar.from(null, readField.dataType)) { n =>
newColumns(writeAt) = ColumnVector.fromScalar(n, table.getRowCount.toInt)
}
}
})
if (readAt != table.getNumberOfColumns) {
throw new QueryExecutionException(s"Could not find the expected columns " +
s"$readAt out of ${table.getNumberOfColumns} from $filePath")
}
}
new Table(newColumns: _*)
} finally {
newColumns.safeClose()
}
} else {
inputTable
}
}

private def readToTable(currentChunkedBlocks: Seq[BlockMetaData]): Option[Table] = {
if (currentChunkedBlocks.isEmpty) {
return None
}

val (dataBuffer, dataSize, rowCount) = readPartFile(currentChunkedBlocks)
val (dataBuffer, dataSize) = readPartFile(currentChunkedBlocks)
try {
if (dataSize == 0) {
None
} else {
if (debugDumpPrefix != null) {
dumpParquetData(dataBuffer, dataSize)
}
val cudfSchema = GpuColumnVector.from(readDataSchema)
val parseOpts = ParquetOptions.builder()
.withTimeUnit(DType.TIMESTAMP_MICROSECONDS)
.includeColumn(readDataSchema.fieldNames:_*).build()
@@ -471,14 +545,13 @@

val table = Table.readParquet(parseOpts, dataBuffer, 0, dataSize)
maxDeviceMemory = max(GpuColumnVector.getTotalDeviceMemoryUsed(table), maxDeviceMemory)
val numColumns = table.getNumberOfColumns
if (readDataSchema.length != numColumns) {
if (readDataSchema.length < table.getNumberOfColumns) {
table.close()
throw new QueryExecutionException(s"Expected ${readDataSchema.length} columns " +
s"but read $numColumns from $filePath")
s"but read ${table.getNumberOfColumns} from $filePath")
}
metrics(NUM_OUTPUT_BATCHES) += 1
Some(table)
Some(evolveSchemaIfNeededAndClose(table))
}
} finally {
dataBuffer.close()
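
To summarize the new GPU-side behavior, below is a simplified, self-contained sketch of the column alignment that evolveSchemaIfNeededAndClose performs: walk the requested read schema, take the next file column when the names are equivalent, and synthesize an all-null column otherwise. It uses plain Scala collections instead of cudf Tables and ColumnVectors, so the names Column and evolve are illustrative only, not part of the plugin.

```scala
import java.util.Locale

object SchemaEvolutionSketch {
  // Stand-in for a cudf ColumnVector: just a name and its values.
  final case class Column(name: String, values: Seq[Any])

  def evolve(
      readSchema: Seq[String],   // column names Spark asked for, in order
      fileColumns: Seq[Column],  // columns that were actually read from the file
      rowCount: Int,
      caseSensitive: Boolean): Seq[Column] = {
    def norm(s: String): String = if (caseSensitive) s else s.toLowerCase(Locale.ROOT)

    var readAt = 0
    val result = readSchema.map { wanted =>
      if (readAt < fileColumns.length && norm(fileColumns(readAt).name) == norm(wanted)) {
        val col = fileColumns(readAt)            // present in the file: keep it
        readAt += 1
        col
      } else {
        Column(wanted, Seq.fill(rowCount)(null)) // missing from the file: null-fill
      }
    }
    // Mirrors the sanity check in the real code: every file column must be consumed.
    require(readAt == fileColumns.length, s"only matched $readAt of ${fileColumns.length} columns")
    result
  }

  def main(args: Array[String]): Unit = {
    val fromFile = Seq(Column("a", Seq(1, 2)), Column("c", Seq(5.0, 6.0)))
    // The requested schema has a column "b" the file never contained.
    evolve(Seq("a", "b", "c"), fromFile, rowCount = 2, caseSensitive = false)
      .foreach(c => println(s"${c.name}: ${c.values.mkString(", ")}"))
    // prints a: 1, 2 / b: null, null / c: 5.0, 6.0
  }
}
```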
