Commit 4d22429

Accelerate the coalescing parquet reader when reading files from multiple partitioned folders (NVIDIA#1401)
* Accelerate the coalescing parquet reader when reading files from multiple partitioned folders
Signed-off-by: Thomas Graves <tgraves@nvidia.com>

* Properly close and change to use withResource and closeOnExcept

* remove null check
tgravescs authored Dec 17, 2020
1 parent 5c5b0cc commit 4d22429
Showing 4 changed files with 167 additions and 57 deletions.
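
The scenario this commit targets can be reproduced with plain Spark APIs: one dataset split across several partitioned folders, so the coalescing GPU parquet reader may combine row groups that carry different partition values into a single batch. Below is a minimal, standalone sketch using only standard Spark (not the RAPIDS plugin); the paths and partition keys mirror the integration test that follows, while the application name, row counts, and row contents are illustrative.

import org.apache.spark.sql.SparkSession

object MultiPartitionReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("multi-partition-read").getOrCreate()
    import spark.implicits._

    val base = "/tmp/PARQUET_DATA"
    // Write three partition folders: key=0/key2=20, key=1/key2=21, key=2/key2=22
    Seq((0, 20), (1, 21), (2, 22)).foreach { case (k, k2) =>
      (0 until 100).map(i => (i, s"row$i")).toDF("_c0", "_c1")
        .write.mode("overwrite").parquet(s"$base/key=$k/key2=$k2")
    }

    // Reading the base path discovers key and key2 as partition columns; with the
    // RAPIDS coalescing reader enabled, row groups from these folders can end up
    // in the same batch even though their partition values differ.
    val df = spark.read.parquet(base)
    df.groupBy("key", "key2").count().show()

    spark.stop()
  }
}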
8 changes: 6 additions & 2 deletions integration_tests/src/main/python/parquet_test.py
@@ -223,14 +223,18 @@ def test_simple_partitioned_read(spark_tmp_path, v1_enabled_list, reader_confs):
string_gen, boolean_gen, DateGen(start=date(1590, 1, 1)),
TimestampGen(start=datetime(1900, 1, 1, tzinfo=timezone.utc))]
gen_list = [('_c' + str(i), gen) for i, gen in enumerate(parquet_gens)]
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0'
first_data_path = spark_tmp_path + '/PARQUET_DATA/key=0/key2=20'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(first_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'LEGACY'})
second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1'
second_data_path = spark_tmp_path + '/PARQUET_DATA/key=1/key2=21'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(second_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
third_data_path = spark_tmp_path + '/PARQUET_DATA/key=2/key2=22'
with_cpu_session(
lambda spark : gen_df(spark, gen_list).write.parquet(third_data_path),
conf={'spark.sql.legacy.parquet.datetimeRebaseModeInWrite': 'CORRECTED'})
data_path = spark_tmp_path + '/PARQUET_DATA'
all_confs = reader_confs.copy()
all_confs.update({'spark.sql.sources.useV1SourceList': v1_enabled_list})
11 changes: 11 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/Arm.scala
@@ -83,6 +83,17 @@ trait Arm {
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = {
try {
block(r)
} catch {
case t: Throwable =>
r.safeClose(t)
throw t
}
}

/** Executes the provided code block, closing the resources only if an exception occurs */
def closeOnExcept[T <: AutoCloseable, V](r: ArrayBuffer[T])(block: ArrayBuffer[T] => V): V = {
try {
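
The new Array overload of closeOnExcept follows the same contract as the existing ones: on success the caller keeps ownership of the resources, and they are only closed (via safeClose) when the block throws. The following standalone sketch illustrates that contract; DummyResource and the simplified safeClose are illustrative stand-ins, not the plugin's Arm trait.

object CloseOnExceptSketch {
  class DummyResource(val id: Int) extends AutoCloseable {
    var closed = false
    override def close(): Unit = closed = true
  }

  // Close every resource, attaching any secondary failure to the original error.
  private def safeClose[T <: AutoCloseable](rs: Array[T], cause: Throwable): Unit =
    rs.foreach { r =>
      try r.close()
      catch { case t: Throwable => cause.addSuppressed(t) }
    }

  // Mirrors the overload added above: close the resources only if the block throws.
  def closeOnExcept[T <: AutoCloseable, V](r: Array[T])(block: Array[T] => V): V = {
    try {
      block(r)
    } catch {
      case t: Throwable =>
        safeClose(r, t)
        throw t
    }
  }

  def main(args: Array[String]): Unit = {
    val resources = Array(new DummyResource(1), new DummyResource(2))
    // On success the resources stay open and ownership remains with the caller.
    val total = closeOnExcept(resources)(_.map(_.id).sum)
    println(s"sum=$total, closed=${resources.map(_.closed).mkString(",")}")
  }
}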
@@ -74,25 +74,26 @@ object ColumnarPartitionReaderWithPartitionValues extends Arm {
fileBatch: ColumnarBatch,
partitionValues: Array[Scalar],
sparkTypes: Array[DataType]): ColumnarBatch = {
var partitionColumns: Array[GpuColumnVector] = null
try {
partitionColumns = buildPartitionColumns(fileBatch.numRows, partitionValues, sparkTypes)
val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column)
val resultCols = fileBatchCols ++ partitionColumns
val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows)
fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount())
partitionColumns = null
result
} finally {
if (fileBatch != null) {
fileBatch.close()
}
if (partitionColumns != null) {
partitionColumns.safeClose()
withResource(fileBatch) { _ =>
closeOnExcept(buildPartitionColumns(fileBatch.numRows, partitionValues, sparkTypes)) {
partitionColumns => addGpuColumVectorsToBatch(fileBatch, partitionColumns)
}
}
}

/**
* The caller is responsible for closing the fileBatch passed in.
*/
def addGpuColumVectorsToBatch(
fileBatch: ColumnarBatch,
partitionColumns: Array[GpuColumnVector]): ColumnarBatch = {
val fileBatchCols = (0 until fileBatch.numCols).map(fileBatch.column)
val resultCols = fileBatchCols ++ partitionColumns
val result = new ColumnarBatch(resultCols.toArray, fileBatch.numRows)
fileBatchCols.foreach(_.asInstanceOf[GpuColumnVector].incRefCount())
result
}

private def buildPartitionColumns(
numRows: Int,
partitionValues: Array[Scalar],
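
The refactored path above relies on a simple ownership rule: addGpuColumVectorsToBatch takes an extra reference on each column of the incoming file batch (incRefCount), while the freshly built partition columns are owned by the result directly, so the caller can safely close the file batch afterwards. A standalone sketch of that rule, with an illustrative RefCountedColumn standing in for GpuColumnVector:

object RefCountSketch {
  // Illustrative stand-in for a reference-counted GPU column.
  class RefCountedColumn(val name: String) {
    private var refs = 1
    def incRefCount(): RefCountedColumn = { refs += 1; this }
    def close(): Unit = { refs -= 1; if (refs == 0) println(s"$name freed") }
  }

  def main(args: Array[String]): Unit = {
    val fileCols = Array(new RefCountedColumn("c0"), new RefCountedColumn("c1"))
    val partCols = Array(new RefCountedColumn("key"))

    // Build the combined batch: take an extra reference on the file batch's columns;
    // the partition columns are newly built and owned by the result directly.
    val resultCols = fileCols.map(_.incRefCount()) ++ partCols

    // The caller closes the incoming file batch; its columns stay alive because the
    // result still holds a reference to each of them.
    fileCols.foreach(_.close())

    // Closing the result releases everything.
    resultCols.foreach(_.close())
  }
}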
174 changes: 134 additions & 40 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -990,23 +990,97 @@ class MultiFileParquetPartitionReader(
}
}

private def buildAndConcatPartitionColumns(
rowsPerPartition: Array[Long],
inPartitionValues: Array[InternalRow]): Array[GpuColumnVector] = {
val numCols = partitionSchema.fields.size
val allPartCols = new Array[GpuColumnVector](numCols)
// build the partitions vectors for all partitions within each column
// and concatenate those together then go to the next column
for ((field, colIndex) <- partitionSchema.fields.zipWithIndex) {
val dataType = field.dataType
val partitionColumns = new Array[GpuColumnVector](inPartitionValues.size)
withResource(new Array[GpuColumnVector](inPartitionValues.size)) {
partitionColumns =>
for ((rowsInPart, partIndex) <- rowsPerPartition.zipWithIndex) {
val partInternalRow = inPartitionValues(partIndex)
val partValueForCol = partInternalRow.get(colIndex, dataType)
val partitionScalar = GpuScalar.from(partValueForCol, dataType)
withResource(partitionScalar) { scalar =>
partitionColumns(partIndex) = GpuColumnVector.from(
ai.rapids.cudf.ColumnVector.fromScalar(scalar, rowsInPart.toInt),
dataType)
}
}
val baseOfCols = partitionColumns.map(_.getBase)
allPartCols(colIndex) = GpuColumnVector.from(
ColumnVector.concatenate(baseOfCols: _*), field.dataType)
}
}
allPartCols
}

private def concatAndAddPartitionColsToBatch(
cb: ColumnarBatch,
rowsPerPartition: Array[Long],
inPartitionValues: Array[InternalRow]): ColumnarBatch = {
withResource(cb) { _ =>
closeOnExcept(buildAndConcatPartitionColumns(rowsPerPartition, inPartitionValues)) {
allPartCols =>
ColumnarPartitionReaderWithPartitionValues.addGpuColumVectorsToBatch(cb, allPartCols)
}
}
}

/**
* Add all partition values found to the batch. There could be more than one partition
* value in the batch so we have to build up columns with the correct number of rows
* for each partition value.
*
* @param batch - columnar batch to append partition values to
* @param inPartitionValues - array of partition values
* @param rowsPerPartition - the number of rows that require each partition value
* @param partitionSchema - schema of the partitions
* @return the batch with the partition columns appended, if a batch was provided
*/
protected def addAllPartitionValues(
batch: Option[ColumnarBatch],
inPartitionValues: Array[InternalRow],
rowsPerPartition: Array[Long],
partitionSchema: StructType): Option[ColumnarBatch] = {
assert(rowsPerPartition.size == inPartitionValues.size)
if (partitionSchema.nonEmpty) {
batch.map { cb =>
val numPartitions = inPartitionValues.size
if (numPartitions > 1) {
concatAndAddPartitionColsToBatch(cb, rowsPerPartition, inPartitionValues)
} else {
// single partition, add like other readers
addPartitionValues(Some(cb), inPartitionValues.head, partitionSchema).get
}
}
} else {
batch
}
}

private def readBatch(): Option[ColumnarBatch] = {
withResource(new NvtxRange("Parquet readBatch", NvtxColor.GREEN)) { _ =>
val (isCorrectRebaseMode, clippedSchema, partValues, seqPathsAndBlocks) =
populateCurrentBlockChunk()
val currentChunkMeta = populateCurrentBlockChunk()
if (readDataSchema.isEmpty) {
// not reading any data, so return a degenerate ColumnarBatch with the row count
val numRows = seqPathsAndBlocks.map(_._2.getRowCount).sum.toInt
if (numRows == 0) {
if (currentChunkMeta.numTotalRows == 0) {
None
} else {
// Someone is going to process this data, even if it is just a row count
GpuSemaphore.acquireIfNecessary(TaskContext.get())
val emptyBatch = new ColumnarBatch(Array.empty, numRows.toInt)
addPartitionValues(Some(emptyBatch), partValues, partitionSchema)
val emptyBatch = new ColumnarBatch(Array.empty, currentChunkMeta.numTotalRows.toInt)
addAllPartitionValues(Some(emptyBatch), currentChunkMeta.allPartValues,
currentChunkMeta.rowsPerPartition, partitionSchema)
}
} else {
val table = readToTable(seqPathsAndBlocks, clippedSchema, isCorrectRebaseMode)
val table = readToTable(currentChunkMeta.currentChunk, currentChunkMeta.clippedSchema,
currentChunkMeta.isCorrectRebaseMode)
try {
val colTypes = readDataSchema.fields.map(f => f.dataType)
val maybeBatch = table.map(t => GpuColumnVector.from(t, colTypes))
@@ -1015,7 +1089,8 @@
}
// we have to add partition values here for this batch, we already verified that
// it's not different for all the blocks in this batch
addPartitionValues(maybeBatch, partValues, partitionSchema)
addAllPartitionValues(maybeBatch, currentChunkMeta.allPartValues,
currentChunkMeta.rowsPerPartition, partitionSchema)
} finally {
table.foreach(_.close())
}
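
buildAndConcatPartitionColumns above builds, for each partition column, one vector per coalesced partition (ColumnVector.fromScalar sized to that partition's row count) and concatenates the pieces so the column lines up with the combined data batch. The following standalone sketch shows the resulting layout, with plain Scala collections standing in for cudf vectors; the row counts and partition values are illustrative and echo the test directories.

object ConcatPartitionColsSketch {
  def main(args: Array[String]): Unit = {
    // Three coalesced partitions, e.g. key=0/key2=20, key=1/key2=21, key=2/key2=22
    val rowsPerPartition = Array(2L, 3L, 1L)
    val partValues = Array(Seq(0, 20), Seq(1, 21), Seq(2, 22)) // one row of values per partition

    val numCols = partValues.head.size
    // For each partition column: replicate each partition's value rowsPerPartition(i)
    // times ("fromScalar"), then concatenate the per-partition pieces ("concatenate").
    val allPartCols = (0 until numCols).map { colIndex =>
      rowsPerPartition.zipWithIndex.flatMap { case (rows, partIndex) =>
        Seq.fill(rows.toInt)(partValues(partIndex)(colIndex))
      }
    }

    allPartCols.zipWithIndex.foreach { case (col, i) =>
      println(s"partition column $i: ${col.mkString(", ")}") // e.g. 0, 0, 1, 1, 1, 2
    }
  }
}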
@@ -1071,9 +1146,15 @@
}
}

private def populateCurrentBlockChunk():
(Boolean, MessageType, InternalRow, Seq[(Path, BlockMetaData)]) = {
private case class CurrentChunkMeta(
isCorrectRebaseMode: Boolean,
clippedSchema: MessageType,
currentChunk: Seq[(Path, BlockMetaData)],
numTotalRows: Long,
rowsPerPartition: Array[Long],
allPartValues: Array[InternalRow])

private def populateCurrentBlockChunk(): CurrentChunkMeta = {
val currentChunk = new ArrayBuffer[(Path, BlockMetaData)]
var numRows: Long = 0
var numBytes: Long = 0
Expand All @@ -1082,44 +1163,20 @@ class MultiFileParquetPartitionReader(
var currentPartitionValues: InternalRow = null
var currentClippedSchema: MessageType = null
var currentIsCorrectRebaseMode: Boolean = false
val rowsPerPartition = new ArrayBuffer[Long]()
var lastPartRows: Long = 0
val allPartValues = new ArrayBuffer[InternalRow]()

@tailrec
def readNextBatch(): Unit = {
if (blockIterator.hasNext) {
if (currentFile == null) {
currentFile = blockIterator.head.filePath
currentPartitionValues = blockIterator.head.partValues
allPartValues += currentPartitionValues
currentClippedSchema = blockIterator.head.schema
currentIsCorrectRebaseMode = blockIterator.head.isCorrectedRebaseMode
}
if (currentFile != blockIterator.head.filePath) {
// We need to ensure all files we are going to combine have the same datetime rebase mode.
if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) {
logInfo("datetime rebase mode for the next file " +
s"${blockIterator.head.filePath} is different then current file $currentFile, " +
s"splitting into another batch.")
return
}

// check to see if partitionValues different, then have to split it
if (blockIterator.head.partValues != currentPartitionValues) {
logInfo(s"Partition values for the next file ${blockIterator.head.filePath}" +
s" doesn't match current $currentFile, splitting it into another batch!")
return
}
val schemaNextfile =
blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName)
val schemaCurrentfile =
currentClippedSchema.asGroupType().getFields.asScala.map(_.getName)
if (!schemaNextfile.sameElements(schemaCurrentfile)) {
logInfo(s"File schema for the next file ${blockIterator.head.filePath}" +
s" doesn't match current $currentFile, splitting it into another batch!")
return
}
currentFile = blockIterator.head.filePath
currentPartitionValues = blockIterator.head.partValues
currentClippedSchema = blockIterator.head.schema
}
val peekedRowGroup = blockIterator.head.blockMeta
if (peekedRowGroup.getRowCount > Integer.MAX_VALUE) {
throw new UnsupportedOperationException("Too many rows in split")
@@ -1129,6 +1186,41 @@
val estimatedBytes = GpuBatchUtils.estimateGpuMemory(readDataSchema,
peekedRowGroup.getRowCount)
if (numBytes == 0 || numBytes + estimatedBytes <= maxReadBatchSizeBytes) {
// only care to check if we are actually adding in the next chunk
if (currentFile != blockIterator.head.filePath) {
// We need to ensure all files we are going to combine have the same datetime
// rebase mode.
if (blockIterator.head.isCorrectedRebaseMode != currentIsCorrectRebaseMode) {
logInfo("datetime rebase mode for the next file " +
s"${blockIterator.head.filePath} is different then current file $currentFile, " +
s"splitting into another batch.")
return
}
val schemaNextfile =
blockIterator.head.schema.asGroupType().getFields.asScala.map(_.getName)
val schemaCurrentfile =
currentClippedSchema.asGroupType().getFields.asScala.map(_.getName)
if (!schemaNextfile.sameElements(schemaCurrentfile)) {
logInfo(s"File schema for the next file ${blockIterator.head.filePath}" +
s" doesn't match current $currentFile, splitting it into another batch!")
return
}
// If the partition values are different we have to track the number of rows that get
// each partition value and the partition values themselves so that we can build
// the full columns with different partition values later.
if (blockIterator.head.partValues != currentPartitionValues) {
rowsPerPartition += (numRows - lastPartRows)
lastPartRows = numRows
// we add the actual partition values here, but the number of rows
// in that partition is only added at the end or when the
// partition changes
allPartValues += blockIterator.head.partValues
}
currentFile = blockIterator.head.filePath
currentPartitionValues = blockIterator.head.partValues
currentClippedSchema = blockIterator.head.schema
}

val nextBlock = blockIterator.next()
val nextTuple = (nextBlock.filePath, nextBlock.blockMeta)
currentChunk += nextTuple
@@ -1141,9 +1233,11 @@
}
}
readNextBatch()
rowsPerPartition += (numRows - lastPartRows)
logDebug(s"Loaded $numRows rows from Parquet. Parquet bytes read: $numParquetBytes. " +
s"Estimated GPU bytes: $numBytes")
(currentIsCorrectRebaseMode, currentClippedSchema, currentPartitionValues, currentChunk)
s"Estimated GPU bytes: $numBytes. Number of different partitions: ${allPartValues.size}")
CurrentChunkMeta(currentIsCorrectRebaseMode, currentClippedSchema, currentChunk,
numRows, rowsPerPartition.toArray, allPartValues.toArray)
}
}
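
populateCurrentBlockChunk above records a row count into rowsPerPartition every time the next block's partition values differ from the current ones, and flushes the final count after the loop, so rowsPerPartition and allPartValues stay aligned. A standalone sketch of that bookkeeping, where Block is a simplified stand-in for the reader's block metadata and the partition values are shown as strings:

import scala.collection.mutable.ArrayBuffer

object RowsPerPartitionSketch {
  case class Block(partValues: String, rowCount: Long)

  def main(args: Array[String]): Unit = {
    val blocks = Seq(
      Block("key=0/key2=20", 100), Block("key=0/key2=20", 50),
      Block("key=1/key2=21", 75),
      Block("key=2/key2=22", 25))

    val rowsPerPartition = ArrayBuffer[Long]()
    val allPartValues = ArrayBuffer[String]()
    var numRows = 0L
    var lastPartRows = 0L
    var currentPartValues: String = null

    blocks.foreach { b =>
      if (currentPartValues == null) {
        // first block in the chunk
        currentPartValues = b.partValues
        allPartValues += currentPartValues
      } else if (b.partValues != currentPartValues) {
        // partition changed: record the rows accumulated since the last change
        rowsPerPartition += (numRows - lastPartRows)
        lastPartRows = numRows
        allPartValues += b.partValues
        currentPartValues = b.partValues
      }
      numRows += b.rowCount
    }
    // flush the count for the last partition in the chunk
    rowsPerPartition += (numRows - lastPartRows)

    println(s"rowsPerPartition = ${rowsPerPartition.mkString(", ")}") // 150, 75, 25
    println(s"allPartValues    = ${allPartValues.mkString(", ")}")
  }
}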
