Fix missing input bytes read metric for Parquet (NVIDIA#1205)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored and kuhushukla committed Nov 30, 2020
1 parent 4ea7922 commit 028078a
Showing 6 changed files with 167 additions and 35 deletions.
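The fix, in brief: GPU input sources may perform filesystem reads on threads other than the task thread, where Spark's DataSourceRDD cannot account for them, so the readers now measure Hadoop FileSystem bytes read on the threads that actually do the reading and fold the deltas into the task's input metrics. Below is a minimal, hypothetical Scala sketch of that snapshot/diff pattern; the object and method names are illustrative and not part of this commit.

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileSystem

// Illustrative only: the "snapshot, read, diff" accounting pattern used by the
// readers changed in this commit.
object BytesReadSketch {
  // Bytes read so far on the current thread across all Hadoop FileSystem instances,
  // mirroring the fileSystemBytesRead() helper added further down in this diff.
  private def fileSystemBytesRead(): Long =
    FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

  // Run a piece of read work and return its result together with the bytes it read.
  def measure[T](readWork: => T): (T, Long) = {
    val start = fileSystemBytesRead()
    val result = readWork
    (result, fileSystemBytesRead() - start)
  }
}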
@@ -71,7 +71,7 @@ case class GpuBatchScanExec(
override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
new GpuDataSourceRDD(sparkContext, partitions, readerFactory)
}

override def doCanonicalize(): GpuBatchScanExec = {
@@ -309,8 +309,8 @@ case class GpuCSVPartitionReaderFactory(

override def buildColumnarReader(partFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
val conf = broadcastedConf.value.value
val reader = new CSVPartitionReader(conf, partFile, dataSchema, readDataSchema, parsedOptions,
maxReaderBatchSizeRows, maxReaderBatchSizeBytes, metrics)
val reader = new PartitionReaderWithBytesRead(new CSVPartitionReader(conf, partFile, dataSchema,
readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes, metrics))
ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema)
}
}
@@ -0,0 +1,100 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids

import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* A replacement for DataSourceRDD that does NOT compute the bytes read input metric.
* DataSourceRDD assumes all reads occur on the task thread, and some GPU input sources
* use multithreaded readers that cannot generate proper metrics with DataSourceRDD.
* @note It is the responsibility of users of this RDD to generate the bytes read input
* metric explicitly!
*/
class GpuDataSourceRDD(
sc: SparkContext,
@transient private val inputPartitions: Seq[InputPartition],
partitionReaderFactory: PartitionReaderFactory)
extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true) {

private def castPartition(split: Partition): DataSourceRDDPartition = split match {
case p: DataSourceRDDPartition => p
case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
}

override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val inputPartition = castPartition(split).inputPartition
val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader))
context.addTaskCompletionListener[Unit](_ => batchReader.close())
// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
}
}

private class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] {
private[this] var valuePrepared = false

override def hasNext: Boolean = {
if (!valuePrepared) {
valuePrepared = reader.next()
}
valuePrepared
}

override def next(): T = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
valuePrepared = false
reader.get()
}
}

private class MetricsBatchIterator(iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] {
private[this] val inputMetrics = TaskContext.get().taskMetrics().inputMetrics

override def hasNext: Boolean = iter.hasNext

override def next(): ColumnarBatch = {
val batch = iter.next()
TrampolineUtil.incInputRecordsRows(inputMetrics, batch.numRows())
batch
}
}

/** Wraps a columnar PartitionReader to update bytes read metric based on filesystem statistics. */
class PartitionReaderWithBytesRead(reader: PartitionReader[ColumnarBatch])
extends PartitionReader[ColumnarBatch] {
private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
private[this] val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback()

override def next(): Boolean = {
val result = reader.next()
TrampolineUtil.incBytesRead(inputMetrics, getBytesRead())
result
}

override def get(): ColumnarBatch = reader.get()

override def close(): Unit = reader.close()
}
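The scaladoc on GpuDataSourceRDD above notes that users of this RDD must produce the bytes-read metric themselves. For the multithreaded readers later in this diff, that means each background thread measures its own FileSystem bytes-read delta and hands it back, and the task thread applies it through TrampolineUtil.incBytesRead. A hedged, self-contained sketch of that handoff follows; the names MultithreadedBytesReadSketch, MeasuredRead, and runAll are illustrative, not part of the commit.

import java.util.concurrent.{Callable, Executors}

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.TaskContext
import org.apache.spark.sql.rapids.execution.TrampolineUtil

object MultithreadedBytesReadSketch {
  // Bytes read so far on the current thread, per Hadoop FileSystem statistics.
  private def fileSystemBytesRead(): Long =
    FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

  // Background task: measure the bytes read on its own worker thread and return the
  // delta alongside the result, since the task's InputMetrics must only be updated
  // from the task thread.
  private class MeasuredRead[T](work: () => T) extends Callable[(T, Long)] {
    override def call(): (T, Long) = {
      val start = fileSystemBytesRead()
      val result = work()
      (result, fileSystemBytesRead() - start)
    }
  }

  // Task thread: submit the reads, then fold each worker's delta into the task metric.
  def runAll[T](works: Seq[() => T], numThreads: Int): Seq[T] = {
    val pool = Executors.newFixedThreadPool(numThreads)
    try {
      val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
      works.map(w => pool.submit(new MeasuredRead(w))).map { future =>
        val (result, bytesRead) = future.get()
        TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
        result
      }
    } finally {
      pool.shutdown()
    }
  }
}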
@@ -147,9 +147,9 @@ case class GpuOrcPartitionReaderFactory(
OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

val fullSchema = StructType(dataSchema ++ partitionSchema)
val reader = new GpuOrcPartitionReader(conf, partFile, dataSchema, readDataSchema,
fullSchema, pushedFilters, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes,
metrics)
val reader = new PartitionReaderWithBytesRead(new GpuOrcPartitionReader(conf, partFile,
dataSchema, readDataSchema, fullSchema, pushedFilters, debugDumpPrefix, maxReadBatchSizeRows,
maxReadBatchSizeBytes, metrics))
ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema)
}
}
@@ -37,7 +37,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
import org.apache.commons.io.IOUtils
import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.filter2.predicate.FilterApi
@@ -401,7 +401,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
} else {
logInfo("Using the coalesce multi-file parquet reader, files: " +
s"${filePaths.mkString(",")} task attemptid: ${TaskContext.get.taskAttemptId()}")
buildBaseColumnarParquetReader(files, conf)
buildBaseColumnarParquetReader(files)
}
}

@@ -415,8 +415,7 @@
}

private def buildBaseColumnarParquetReader(
files: Array[PartitionedFile],
conf: Configuration): PartitionReader[ColumnarBatch] = {
files: Array[PartitionedFile]): PartitionReader[ColumnarBatch] = {
val conf = broadcastedConf.value.value
val clippedBlocks = ArrayBuffer[ParquetFileInfoWithSingleBlockMeta]()
files.map { file =>
@@ -457,7 +456,7 @@ case class GpuParquetPartitionReaderFactory(

override def buildColumnarReader(
partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
val reader = buildBaseColumnarParquetReader(partitionedFile)
val reader = new PartitionReaderWithBytesRead(buildBaseColumnarParquetReader(partitionedFile))
ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader, partitionSchema)
}

@@ -786,6 +785,10 @@ abstract class FileParquetPartitionReaderBase(
batch
}
}

protected def fileSystemBytesRead(): Long = {
FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
}
}

// Singleton threadpool that is used across all the tasks.
@@ -861,21 +864,24 @@ class MultiFileParquetPartitionReader(
private val blockIterator: BufferedIterator[ParquetFileInfoWithSingleBlockMeta] =
clippedBlocks.iterator.buffered

private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics

class ParquetCopyBlocksRunner(
file: Path,
outhmb: HostMemoryBuffer,
blocks: ArrayBuffer[BlockMetaData],
offset: Long,
startTs: Long)
extends Callable[Seq[BlockMetaData]] {
offset: Long)
extends Callable[(Seq[BlockMetaData], Long)] {

override def call(): Seq[BlockMetaData] = {
var out = new HostMemoryOutputStream(outhmb)
override def call(): (Seq[BlockMetaData], Long) = {
val startBytesRead = fileSystemBytesRead()
val out = new HostMemoryOutputStream(outhmb)
val res = withResource(file.getFileSystem(conf).open(file)) { in =>
copyBlocksData(in, out, blocks, offset)
}
outhmb.close()
res
val bytesRead = fileSystemBytesRead() - startBytesRead
(res, bytesRead)
}
}

@@ -918,7 +924,7 @@ class MultiFileParquetPartitionReader(
blocks.foreach { case (path, block) =>
filesAndBlocks.getOrElseUpdate(path, new ArrayBuffer[BlockMetaData]) += block
}
val tasks = new java.util.ArrayList[Future[Seq[BlockMetaData]]]()
val tasks = new java.util.ArrayList[Future[(Seq[BlockMetaData], Long)]]()

val allBlocks = blocks.map(_._2)
val initTotalSize = calculateParquetOutputSize(allBlocks, clippedSchema, true)
@@ -934,14 +940,15 @@
val outLocal = hmb.slice(offset, fileBlockSize)
// copy the blocks for each file in parallel using background threads
tasks.add(MultiFileThreadPoolFactory.submitToThreadPool(
new ParquetCopyBlocksRunner(file, outLocal, blocks, offset, System.nanoTime()),
new ParquetCopyBlocksRunner(file, outLocal, blocks, offset),
numThreads))
offset += fileBlockSize
}

for (future <- tasks.asScala) {
val result = future.get()
allOutputBlocks ++= result
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
}

// The footer size can change vs the initial estimated because we are combining more blocks
Expand Down Expand Up @@ -1181,15 +1188,22 @@ class MultiFileCloudParquetPartitionReader(
extends FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema,
debugDumpPrefix, execMetrics) {

case class HostMemoryBuffersWithMetaData(isCorrectRebaseMode: Boolean, clippedSchema: MessageType,
partValues: InternalRow, memBuffersAndSizes: Array[(HostMemoryBuffer, Long)],
fileName: String, fileStart: Long, fileLength: Long)
case class HostMemoryBuffersWithMetaData(
isCorrectRebaseMode: Boolean,
clippedSchema: MessageType,
partValues: InternalRow,
memBuffersAndSizes: Array[(HostMemoryBuffer, Long)],
fileName: String,
fileStart: Long,
fileLength: Long,
bytesRead: Long)

private var filesToRead = 0
private var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaData] = None
private var isInitted = false
private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaData]]()
private val tasksToRun = new Queue[ReadBatchRunner]()
private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics

private class ReadBatchRunner(filterHandler: GpuParquetFileFilterHandler,
file: PartitionedFile,
@@ -1207,48 +1221,52 @@ class MultiFileCloudParquetPartitionReader(
* Note that the TaskContext is not set in these threads and should not be used.
*/
override def call(): HostMemoryBuffersWithMetaData = {
val startingBytesRead = fileSystemBytesRead()
val hostBuffers = new ArrayBuffer[(HostMemoryBuffer, Long)]
try {
val fileBlockMeta = filterHandler.filterBlocks(file, conf, filters, readDataSchema)
if (fileBlockMeta.blocks.length == 0) {
val bytesRead = fileSystemBytesRead() - startingBytesRead
// no blocks so return null buffer and size 0
return HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)),
file.filePath, file.start, file.length)
file.filePath, file.start, file.length, bytesRead)
}
blockChunkIter = fileBlockMeta.blocks.iterator.buffered
if (isDone) {
val bytesRead = fileSystemBytesRead() - startingBytesRead
// got close before finishing
HostMemoryBuffersWithMetaData(
fileBlockMeta.isCorrectedRebaseMode,
fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)),
file.filePath, file.start, file.length)
file.filePath, file.start, file.length, bytesRead)
} else {
if (readDataSchema.isEmpty) {
val bytesRead = fileSystemBytesRead() - startingBytesRead
val numRows = fileBlockMeta.blocks.map(_.getRowCount).sum.toInt
// overload size to be number of rows with null buffer
HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, numRows)),
file.filePath, file.start, file.length)
file.filePath, file.start, file.length, bytesRead)

} else {
val filePath = new Path(new URI(file.filePath))
while (blockChunkIter.hasNext) {
val blocksToRead = populateCurrentBlockChunk(blockChunkIter,
maxReadBatchSizeRows, maxReadBatchSizeBytes)
val blockTotalSize = blocksToRead.map(_.getTotalByteSize).sum
hostBuffers += readPartFile(blocksToRead, fileBlockMeta.schema, filePath)
}
val bytesRead = fileSystemBytesRead() - startingBytesRead
if (isDone) {
// got close before finishing
hostBuffers.foreach(_._1.safeClose())
HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)),
file.filePath, file.start, file.length)
file.filePath, file.start, file.length, bytesRead)
} else {
HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
fileBlockMeta.schema, fileBlockMeta.partValues, hostBuffers.toArray,
file.filePath, file.start, file.length)
file.filePath, file.start, file.length, bytesRead)
}
}
}
@@ -1326,6 +1344,7 @@ class MultiFileCloudParquetPartitionReader(
if (filesToRead > 0 && !isDone) {
val fileBufsAndMeta = tasks.poll.get()
filesToRead -= 1
TrampolineUtil.incBytesRead(inputMetrics, fileBufsAndMeta.bytesRead)
InputFileUtils.setInputFileBlock(fileBufsAndMeta.fileName, fileBufsAndMeta.fileStart,
fileBufsAndMeta.fileLength)

@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

import scala.collection.mutable.HashMap

import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, ShimLoader, SparkPlanMeta}
import com.nvidia.spark.rapids.{GpuDataSourceRDD, GpuExec, GpuMetricNames, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, ShimLoader, SparkPlanMeta}
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStr
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -469,8 +468,7 @@ case class GpuFileSourceScanExec(
queryUsesInputFile)

// note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code
new DataSourceRDD(relation.sparkSession.sparkContext, filePartitions,
factory, supportsColumnar)
new GpuDataSourceRDD(relation.sparkSession.sparkContext, filePartitions, factory)
}
}

@@ -522,7 +520,7 @@ case class GpuFileSourceScanExec(
queryUsesInputFile)

// note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code
new DataSourceRDD(relation.sparkSession.sparkContext, partitions, factory, supportsColumnar)
new GpuDataSourceRDD(relation.sparkSession.sparkContext, partitions, factory)
}
}

@@ -20,6 +20,7 @@ import org.json4s.JsonAST

import org.apache.spark.{SparkContext, SparkEnv, SparkUpgradeException, TaskContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.InputMetrics
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, IdentityBroadcastMode}
@@ -93,4 +94,18 @@ object TrampolineUtil {
def incTaskMetricsDiskBytesSpilled(amountSpilled: Long): Unit = {
Option(TaskContext.get).foreach(_.taskMetrics().incDiskBytesSpilled(amountSpilled))
}

/**
* Returns a function that can be called to find Hadoop FileSystem bytes read. If
* getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
* return the bytes read on r since t.
*/
def getFSBytesReadOnThreadCallback(): () => Long = {
SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
}

/** Set the bytes read task input metric */
def incBytesRead(inputMetrics: InputMetrics, bytesRead: Long): Unit = {
inputMetrics.incBytesRead(bytesRead)
}
}
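A brief usage sketch for the two helpers added above, mirroring PartitionReaderWithBytesRead earlier in this diff: the callback is created before any reads on the current thread, and each later invocation yields the bytes read since that point, which is then folded into the task's input metrics. The object and method names here are illustrative, not part of the commit, and the code assumes it runs inside a Spark task.

import org.apache.spark.TaskContext
import org.apache.spark.sql.rapids.execution.TrampolineUtil

object ReadWithMetricSketch {
  // Perform read work on the current thread and charge its bytes read to the task.
  def readWithMetric[T](read: () => T): T = {
    val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback()
    val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
    val result = read()
    TrampolineUtil.incBytesRead(inputMetrics, getBytesRead())
    result
  }
}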
