diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
index 0d64c91b7a6..12ec8fc3830 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuBatchScanExec.scala
@@ -71,7 +71,7 @@ case class GpuBatchScanExec(
   override lazy val readerFactory: PartitionReaderFactory = batch.createReaderFactory()

   override lazy val inputRDD: RDD[InternalRow] = {
-    new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
+    new GpuDataSourceRDD(sparkContext, partitions, readerFactory)
   }

   override def doCanonicalize(): GpuBatchScanExec = {
@@ -309,8 +309,8 @@ case class GpuCSVPartitionReaderFactory(

   override def buildColumnarReader(partFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
-    val reader = new CSVPartitionReader(conf, partFile, dataSchema, readDataSchema, parsedOptions,
-      maxReaderBatchSizeRows, maxReaderBatchSizeBytes, metrics)
+    val reader = new PartitionReaderWithBytesRead(new CSVPartitionReader(conf, partFile, dataSchema,
+      readDataSchema, parsedOptions, maxReaderBatchSizeRows, maxReaderBatchSizeBytes, metrics))
     ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema)
   }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala
new file mode 100644
index 00000000000..34df89a0d2d
--- /dev/null
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDataSourceRDD.scala
@@ -0,0 +1,100 @@
+/*
+ * Copyright (c) 2020, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.apache.spark.{InterruptibleIterator, Partition, SparkContext, SparkException, TaskContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDD, DataSourceRDDPartition}
+import org.apache.spark.sql.rapids.execution.TrampolineUtil
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * A replacement for DataSourceRDD that does NOT compute the bytes read input metric.
+ * DataSourceRDD assumes all reads occur on the task thread, and some GPU input sources
+ * use multithreaded readers that cannot generate proper metrics with DataSourceRDD.
+ * @note It is the responsibility of users of this RDD to generate the bytes read input
+ *       metric explicitly!
+ */
+class GpuDataSourceRDD(
+    sc: SparkContext,
+    @transient private val inputPartitions: Seq[InputPartition],
+    partitionReaderFactory: PartitionReaderFactory)
+  extends DataSourceRDD(sc, inputPartitions, partitionReaderFactory, columnarReads = true) {
+
+  private def castPartition(split: Partition): DataSourceRDDPartition = split match {
+    case p: DataSourceRDDPartition => p
+    case _ => throw new SparkException(s"[BUG] Not a DataSourceRDDPartition: $split")
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
+    val inputPartition = castPartition(split).inputPartition
+    val batchReader = partitionReaderFactory.createColumnarReader(inputPartition)
+    val iter = new MetricsBatchIterator(new PartitionIterator[ColumnarBatch](batchReader))
+    context.addTaskCompletionListener[Unit](_ => batchReader.close())
+    // TODO: SPARK-25083 remove the type erasure hack in data source scan
+    new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
+  }
+}
+
+private class PartitionIterator[T](reader: PartitionReader[T]) extends Iterator[T] {
+  private[this] var valuePrepared = false
+
+  override def hasNext: Boolean = {
+    if (!valuePrepared) {
+      valuePrepared = reader.next()
+    }
+    valuePrepared
+  }
+
+  override def next(): T = {
+    if (!hasNext) {
+      throw new java.util.NoSuchElementException("End of stream")
+    }
+    valuePrepared = false
+    reader.get()
+  }
+}
+
+private class MetricsBatchIterator(iter: Iterator[ColumnarBatch]) extends Iterator[ColumnarBatch] {
+  private[this] val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
+
+  override def hasNext: Boolean = iter.hasNext
+
+  override def next(): ColumnarBatch = {
+    val batch = iter.next()
+    TrampolineUtil.incInputRecordsRows(inputMetrics, batch.numRows())
+    batch
+  }
+}
+
+/** Wraps a columnar PartitionReader to update bytes read metric based on filesystem statistics. */
+class PartitionReaderWithBytesRead(reader: PartitionReader[ColumnarBatch])
+    extends PartitionReader[ColumnarBatch] {
+  private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
+  private[this] val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback()
+
+  override def next(): Boolean = {
+    val result = reader.next()
+    TrampolineUtil.incBytesRead(inputMetrics, getBytesRead())
+    result
+  }
+
+  override def get(): ColumnarBatch = reader.get()
+
+  override def close(): Unit = reader.close()
+}
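The factories in this patch pair GpuDataSourceRDD with PartitionReaderWithBytesRead: the RDD deliberately skips the bytes-read accounting that DataSourceRDD performs, so each reader factory wires the metric up itself. Below is a minimal sketch of that wiring for a hypothetical wrapper factory (MyGpuReaderFactory is illustrative and not part of this patch); the CSV, ORC, and Parquet factories changed in this patch do the equivalent inline.

// Illustrative sketch only -- MyGpuReaderFactory is hypothetical, not part of this patch.
import com.nvidia.spark.rapids.PartitionReaderWithBytesRead

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.sql.vectorized.ColumnarBatch

class MyGpuReaderFactory(underlying: PartitionReaderFactory) extends PartitionReaderFactory {

  override def supportColumnarReads(partition: InputPartition): Boolean = true

  override def createReader(partition: InputPartition): PartitionReader[InternalRow] =
    throw new IllegalStateException("row-based reads are not supported")

  override def createColumnarReader(partition: InputPartition): PartitionReader[ColumnarBatch] = {
    // GpuDataSourceRDD never touches the bytes read metric, so the reader itself samples
    // the Hadoop filesystem statistics and updates the task's input metrics as batches are read.
    new PartitionReaderWithBytesRead(underlying.createColumnarReader(partition))
  }
}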
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
index a8c1a10c02e..d9dc35fcc8b 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -147,9 +147,9 @@ case class GpuOrcPartitionReaderFactory(
     OrcConf.IS_SCHEMA_EVOLUTION_CASE_SENSITIVE.setBoolean(conf, isCaseSensitive)

     val fullSchema = StructType(dataSchema ++ partitionSchema)
-    val reader = new GpuOrcPartitionReader(conf, partFile, dataSchema, readDataSchema,
-      fullSchema, pushedFilters, debugDumpPrefix, maxReadBatchSizeRows, maxReadBatchSizeBytes,
-      metrics)
+    val reader = new PartitionReaderWithBytesRead(new GpuOrcPartitionReader(conf, partFile,
+      dataSchema, readDataSchema, fullSchema, pushedFilters, debugDumpPrefix, maxReadBatchSizeRows,
+      maxReadBatchSizeBytes, metrics))
     ColumnarPartitionReaderWithPartitionValues.newReader(partFile, reader, partitionSchema)
   }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
index a8d282135dd..f4a077ae0e7 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
@@ -37,7 +37,7 @@ import com.nvidia.spark.rapids.RapidsPluginImplicits._
 import org.apache.commons.io.IOUtils
 import org.apache.commons.io.output.{CountingOutputStream, NullOutputStream}
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FSDataInputStream, Path}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path}
 import org.apache.parquet.bytes.BytesUtils
 import org.apache.parquet.column.ColumnDescriptor
 import org.apache.parquet.filter2.predicate.FilterApi
@@ -401,7 +401,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
     } else {
       logInfo("Using the coalesce multi-file parquet reader, files: " +
         s"${filePaths.mkString(",")} task attemptid: ${TaskContext.get.taskAttemptId()}")
-      buildBaseColumnarParquetReader(files, conf)
+      buildBaseColumnarParquetReader(files)
     }
   }

@@ -415,8 +415,7 @@ case class GpuParquetMultiFilePartitionReaderFactory(
   }

   private def buildBaseColumnarParquetReader(
-      files: Array[PartitionedFile],
-      conf: Configuration): PartitionReader[ColumnarBatch] = {
+      files: Array[PartitionedFile]): PartitionReader[ColumnarBatch] = {
     val conf = broadcastedConf.value.value
     val clippedBlocks = ArrayBuffer[ParquetFileInfoWithSingleBlockMeta]()
     files.map { file =>
@@ -457,7 +456,7 @@ case class GpuParquetPartitionReaderFactory(

   override def buildColumnarReader(
       partitionedFile: PartitionedFile): PartitionReader[ColumnarBatch] = {
-    val reader = buildBaseColumnarParquetReader(partitionedFile)
+    val reader = new PartitionReaderWithBytesRead(buildBaseColumnarParquetReader(partitionedFile))
     ColumnarPartitionReaderWithPartitionValues.newReader(partitionedFile, reader,
       partitionSchema)
   }
@@ -786,6 +785,10 @@ abstract class FileParquetPartitionReaderBase(
       batch
     }
   }
+
+  protected def fileSystemBytesRead(): Long = {
+    FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum
+  }
 }

 // Singleton threadpool that is used across all the tasks.
@@ -861,21 +864,24 @@ class MultiFileParquetPartitionReader(

   private val blockIterator: BufferedIterator[ParquetFileInfoWithSingleBlockMeta] =
     clippedBlocks.iterator.buffered

+  private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics
+
   class ParquetCopyBlocksRunner(
       file: Path,
       outhmb: HostMemoryBuffer,
       blocks: ArrayBuffer[BlockMetaData],
-      offset: Long,
-      startTs: Long)
-    extends Callable[Seq[BlockMetaData]] {
+      offset: Long)
+    extends Callable[(Seq[BlockMetaData], Long)] {

-    override def call(): Seq[BlockMetaData] = {
-      var out = new HostMemoryOutputStream(outhmb)
+    override def call(): (Seq[BlockMetaData], Long) = {
+      val startBytesRead = fileSystemBytesRead()
+      val out = new HostMemoryOutputStream(outhmb)
       val res = withResource(file.getFileSystem(conf).open(file)) { in =>
         copyBlocksData(in, out, blocks, offset)
       }
       outhmb.close()
-      res
+      val bytesRead = fileSystemBytesRead() - startBytesRead
+      (res, bytesRead)
     }
   }

@@ -918,7 +924,7 @@ class MultiFileParquetPartitionReader(
     blocks.foreach { case (path, block) =>
       filesAndBlocks.getOrElseUpdate(path, new ArrayBuffer[BlockMetaData]) += block
     }
-    val tasks = new java.util.ArrayList[Future[Seq[BlockMetaData]]]()
+    val tasks = new java.util.ArrayList[Future[(Seq[BlockMetaData], Long)]]()

     val allBlocks = blocks.map(_._2)
     val initTotalSize = calculateParquetOutputSize(allBlocks, clippedSchema, true)
@@ -934,14 +940,15 @@ class MultiFileParquetPartitionReader(
       val outLocal = hmb.slice(offset, fileBlockSize)
       // copy the blocks for each file in parallel using background threads
       tasks.add(MultiFileThreadPoolFactory.submitToThreadPool(
-        new ParquetCopyBlocksRunner(file, outLocal, blocks, offset, System.nanoTime()),
+        new ParquetCopyBlocksRunner(file, outLocal, blocks, offset),
         numThreads))
       offset += fileBlockSize
     }

     for (future <- tasks.asScala) {
-      val result = future.get()
-      allOutputBlocks ++= result
+      val (blocks, bytesRead) = future.get()
+      allOutputBlocks ++= blocks
+      TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
     }

     // The footer size can change vs the initial estimated because we are combining more blocks
@@ -1181,15 +1188,22 @@ class MultiFileCloudParquetPartitionReader(
   extends FileParquetPartitionReaderBase(conf, isSchemaCaseSensitive, readDataSchema,
     debugDumpPrefix, execMetrics) {

-  case class HostMemoryBuffersWithMetaData(isCorrectRebaseMode: Boolean, clippedSchema: MessageType,
-    partValues: InternalRow, memBuffersAndSizes: Array[(HostMemoryBuffer, Long)],
-    fileName: String, fileStart: Long, fileLength: Long)
+  case class HostMemoryBuffersWithMetaData(
+      isCorrectRebaseMode: Boolean,
+      clippedSchema: MessageType,
+      partValues: InternalRow,
+      memBuffersAndSizes: Array[(HostMemoryBuffer, Long)],
+      fileName: String,
+      fileStart: Long,
+      fileLength: Long,
+      bytesRead: Long)

   private var filesToRead = 0
   private var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaData] = None
   private var isInitted = false
   private val tasks = new ConcurrentLinkedQueue[Future[HostMemoryBuffersWithMetaData]]()
   private val tasksToRun = new Queue[ReadBatchRunner]()
+  private[this] val inputMetrics = TaskContext.get.taskMetrics().inputMetrics

   private class ReadBatchRunner(filterHandler: GpuParquetFileFilterHandler,
       file: PartitionedFile,
@@ -1207,48 +1221,52 @@ class MultiFileCloudParquetPartitionReader(
      * Note that the TaskContext is not set in these threads and should not be used.
      */
     override def call(): HostMemoryBuffersWithMetaData = {
+      val startingBytesRead = fileSystemBytesRead()
       val hostBuffers = new ArrayBuffer[(HostMemoryBuffer, Long)]
       try {
         val fileBlockMeta = filterHandler.filterBlocks(file, conf, filters, readDataSchema)
         if (fileBlockMeta.blocks.length == 0) {
+          val bytesRead = fileSystemBytesRead() - startingBytesRead
           // no blocks so return null buffer and size 0
           return HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
             fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)),
-            file.filePath, file.start, file.length)
+            file.filePath, file.start, file.length, bytesRead)
         }
         blockChunkIter = fileBlockMeta.blocks.iterator.buffered
         if (isDone) {
+          val bytesRead = fileSystemBytesRead() - startingBytesRead
           // got close before finishing
           HostMemoryBuffersWithMetaData(
             fileBlockMeta.isCorrectedRebaseMode, fileBlockMeta.schema,
             fileBlockMeta.partValues, Array((null, 0)),
-            file.filePath, file.start, file.length)
+            file.filePath, file.start, file.length, bytesRead)
         } else {
           if (readDataSchema.isEmpty) {
+            val bytesRead = fileSystemBytesRead() - startingBytesRead
             val numRows = fileBlockMeta.blocks.map(_.getRowCount).sum.toInt
             // overload size to be number of rows with null buffer
             HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
               fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, numRows)),
-              file.filePath, file.start, file.length)
+              file.filePath, file.start, file.length, bytesRead)
           } else {
             val filePath = new Path(new URI(file.filePath))
             while (blockChunkIter.hasNext) {
               val blocksToRead = populateCurrentBlockChunk(blockChunkIter,
                 maxReadBatchSizeRows, maxReadBatchSizeBytes)
-              val blockTotalSize = blocksToRead.map(_.getTotalByteSize).sum
               hostBuffers += readPartFile(blocksToRead, fileBlockMeta.schema, filePath)
             }
+            val bytesRead = fileSystemBytesRead() - startingBytesRead
             if (isDone) {
               // got close before finishing
               hostBuffers.foreach(_._1.safeClose())
               HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
                 fileBlockMeta.schema, fileBlockMeta.partValues, Array((null, 0)),
-                file.filePath, file.start, file.length)
+                file.filePath, file.start, file.length, bytesRead)
             } else {
               HostMemoryBuffersWithMetaData(fileBlockMeta.isCorrectedRebaseMode,
                 fileBlockMeta.schema, fileBlockMeta.partValues, hostBuffers.toArray,
-                file.filePath, file.start, file.length)
+                file.filePath, file.start, file.length, bytesRead)
             }
           }
         }
@@ -1326,6 +1344,7 @@ class MultiFileCloudParquetPartitionReader(
     if (filesToRead > 0 && !isDone) {
       val fileBufsAndMeta = tasks.poll.get()
       filesToRead -= 1
+      TrampolineUtil.incBytesRead(inputMetrics, fileBufsAndMeta.bytesRead)
       InputFileUtils.setInputFileBlock(fileBufsAndMeta.fileName, fileBufsAndMeta.fileStart,
         fileBufsAndMeta.fileLength)

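All of the multi-file reader changes above follow one pattern: snapshot the Hadoop FileSystem statistics on the thread that performs the read, compute the delta on that same pool thread once the read finishes, and hand the number back to the task thread, where it is added to the task's InputMetrics. The statistics are tracked per thread, which is why the delta cannot be taken on the task thread. A stripped-down sketch of the pattern follows (the object and method names are illustrative, not the plugin's):

// Illustrative sketch of the per-thread bytes-read delta pattern; not plugin code.
import java.util.concurrent.{Callable, ExecutorService}

import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileSystem

object BytesReadDelta {
  // Bytes read by the *current* thread, summed across all Hadoop filesystems.
  private def fileSystemBytesRead(): Long =
    FileSystem.getAllStatistics.asScala.map(_.getThreadStatistics.getBytesRead).sum

  /** Runs `readWork` on `pool` and returns its result together with the bytes it read. */
  def readWithBytesRead[T](pool: ExecutorService, readWork: () => T): (T, Long) = {
    val future = pool.submit(new Callable[(T, Long)] {
      override def call(): (T, Long) = {
        val start = fileSystemBytesRead()        // snapshot taken on the reading thread
        val result = readWork()
        (result, fileSystemBytesRead() - start)  // delta observed by this thread only
      }
    })
    // Back on the task thread, the caller would fold the returned delta into the task's
    // input metrics, e.g. via TrampolineUtil.incBytesRead(inputMetrics, bytesRead).
    future.get()
  }
}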
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
index 39bde29c3af..ae283186836 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuFileSourceScanExec.scala
@@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS

 import scala.collection.mutable.HashMap

-import com.nvidia.spark.rapids.{GpuExec, GpuMetricNames, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, ShimLoader, SparkPlanMeta}
+import com.nvidia.spark.rapids.{GpuDataSourceRDD, GpuExec, GpuMetricNames, GpuParquetMultiFilePartitionReaderFactory, GpuReadCSVFileFormat, GpuReadFileFormatWithMetrics, GpuReadOrcFileFormat, GpuReadParquetFileFormat, RapidsConf, ShimLoader, SparkPlanMeta}
 import org.apache.hadoop.fs.Path

 import org.apache.spark.rdd.RDD
@@ -34,7 +34,6 @@ import org.apache.spark.sql.execution.datasources.{BucketingUtils, DataSourceStr
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.DataSourceRDD
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
@@ -469,8 +468,7 @@ case class GpuFileSourceScanExec(
         queryUsesInputFile)

       // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code
-      new DataSourceRDD(relation.sparkSession.sparkContext, filePartitions,
-        factory, supportsColumnar)
+      new GpuDataSourceRDD(relation.sparkSession.sparkContext, filePartitions, factory)
     }
   }

@@ -522,7 +520,7 @@ case class GpuFileSourceScanExec(
       queryUsesInputFile)

     // note we use the v2 DataSourceRDD instead of FileScanRDD so we don't have to copy more code
-    new DataSourceRDD(relation.sparkSession.sparkContext, partitions, factory, supportsColumnar)
+    new GpuDataSourceRDD(relation.sparkSession.sparkContext, partitions, factory)
   }
 }
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
index d0c6bdd096a..abbcec5ec8b 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/TrampolineUtil.scala
@@ -20,6 +20,7 @@ import org.json4s.JsonAST

 import org.apache.spark.{SparkContext, SparkEnv, SparkUpgradeException, TaskContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.InputMetrics
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, IdentityBroadcastMode}
@@ -93,4 +94,18 @@ object TrampolineUtil {
   def incTaskMetricsDiskBytesSpilled(amountSpilled: Long): Unit = {
     Option(TaskContext.get).foreach(_.taskMetrics().incDiskBytesSpilled(amountSpilled))
   }
+
+  /**
+   * Returns a function that can be called to find Hadoop FileSystem bytes read. If
+   * getFSBytesReadOnThreadCallback is called from thread r at time t, the returned callback will
+   * return the bytes read on r since t.
+   */
+  def getFSBytesReadOnThreadCallback(): () => Long = {
+    SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
+  }
+
+  /** Set the bytes read task input metric */
+  def incBytesRead(inputMetrics: InputMetrics, bytesRead: Long): Unit = {
+    inputMetrics.incBytesRead(bytesRead)
+  }
 }
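The TrampolineUtil additions exist because SparkHadoopUtil and InputMetrics.incBytesRead are not accessible from the plugin's own packages, so they are exposed through this org.apache.spark-side helper. A hypothetical usage sketch on the task thread (withBytesReadMetric is illustrative, not part of the patch):

// Hypothetical helper showing how the two new TrampolineUtil methods combine; not plugin code.
import org.apache.spark.TaskContext
import org.apache.spark.sql.rapids.execution.TrampolineUtil

object InputMetricsExample {
  def withBytesReadMetric[T](body: => T): T = {
    val inputMetrics = TaskContext.get().taskMetrics().inputMetrics
    // The callback reports bytes read on this thread since the callback was created.
    val getBytesRead = TrampolineUtil.getFSBytesReadOnThreadCallback()
    try {
      body
    } finally {
      // Fold whatever this thread read while running `body` into the task's input metrics.
      TrampolineUtil.incBytesRead(inputMetrics, getBytesRead())
    }
  }
}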