Dump Parquet Meta as SparkMetrics (#29)
Signed-off-by: sperlingxx <lovedreamf@gmail.com>
sperlingxx authored Jan 19, 2024
1 parent 5fd19a6 commit 71515b2
Showing 6 changed files with 128 additions and 20 deletions.
@@ -36,7 +36,19 @@ trait GpuBatchScanExecMetrics extends GpuExec {
lazy val fileCacheMetrics: Map[String, GpuMetric] = {
// File cache only supported on Parquet files for now.
scan match {
case _: GpuParquetScan | _: GpuOrcScan => createFileCacheMetrics()
case _: GpuParquetScan | _: GpuOrcScan =>
createFileCacheMetrics() ++
// For debugging ByteDance workloads
Map(
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ => Map.empty
}
}
@@ -49,6 +49,9 @@ object MetricsLevel {
object GpuMetric extends Logging {
// Metric names.
val BUFFER_TIME = "bufferTime"
val BUFFER_DATA_TIME = "bufferDataTime"
val BUFFER_META_TIME = "bufferMetaTime"
val BUFFER_RESIZE_TIME = "bufferResizeTime"
val COPY_BUFFER_TIME = "copyBufferTime"
val GPU_DECODE_TIME = "gpuDecodeTime"
val NUM_INPUT_ROWS = "numInputRows"
@@ -664,6 +664,10 @@ abstract class MultiFileCloudPartitionReaderBase(
_ += (blockedTime * fileBufsAndMeta.getBufferTimePct).toLong
}

// Dump each block's Parquet metadata into the task metrics
fileBufsAndMeta.memBuffersAndSizes.foreach { hmbMeta =>
hmbMeta.blockMeta.foreach(_.updateMetrics(metrics))
}
TrampolineUtil.incBytesRead(inputMetrics, fileBufsAndMeta.bytesRead)
// if we replaced the path with Alluxio, set it to the original filesystem file
// since Alluxio replacement is supposed to be transparent to the user
@@ -756,6 +760,41 @@ trait DataBlockBase {
def getReadDataSize: Long
// the block size to be used to slice the whole HostMemoryBuffer
def getBlockSize: Long

case class ColumnStats(compressedSize: Long,
    uncompressedSize: Long,
    nullCount: Long,
    minFieldSize: Int,
    maxFieldSize: Int)

protected def getColumnStatistics: Seq[ColumnStats] = Seq()

def updateMetrics(metrics: Map[String, GpuMetric]): Unit = {
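// Fold this block's per-column statistics into the debug metrics: sizes and
// the null count accumulate directly, while compression ratios and field
// sizes are recorded as scaled integers (see the comments below).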
getColumnStatistics match {
case stats if stats.isEmpty =>
case stats =>
var minR: Double = 1
var maxR: Double = 0
var maxFieldSize: Int = 0
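// Track the best and worst compressed/uncompressed ratios and the largest
// single-field size across the columns of this block.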
stats.foreach { s =>
metrics("compPageSize") += s.compressedSize
if (s.uncompressedSize > 0) {
metrics("unCompPageSize") += s.uncompressedSize
val r: Double = s.compressedSize.toDouble / s.uncompressedSize.toDouble
maxR = maxR max r
minR = minR min r
}
maxFieldSize = maxFieldSize max s.maxFieldSize
}
metrics("nullCount") += stats.head.nullCount
metrics("maxCPR").set(metrics("maxCPR").value max ((1.0 - minR) * 100000L).toLong)
if (metrics("minCPR").value == 0L) {
metrics("minCPR").set(100000L)
}
metrics("minCPR").set(metrics("minCPR").value min ((1.0 - maxR) * 100000L).toLong)
metrics("maxFieldSize").set(metrics("maxFieldSize").value max (maxFieldSize * 10L))
}
}
}

/**
@@ -1058,12 +1097,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
if (currentChunkMeta.currentChunk.isEmpty) {
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
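// Dump the Parquet metadata of every block in this chunk into the metrics.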
currentChunkMeta.currentChunk.foreach(_._2.updateMetrics(metrics))

val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
currentChunkMeta.clippedSchema)

if (dataSize == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {

startNewBufferRetry
RmmRapidsRetryIterator.withRetry(dataBuffer, chunkedSplit(_)) { _ =>
// We don't want to actually close the host buffer until we know that we don't
@@ -1110,7 +1153,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
val batchContext = createBatchContext(filesAndBlocks, clippedSchema)
// First, estimate the output file size for the initial allocating.
// the estimated size should be >= size of HEAD + Blocks + FOOTER
val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)
val initTotalSize =
withResource(new NvtxWithMetrics("Buffer size eval", NvtxColor.ORANGE,
metrics("bufferMetaTime"))) { _ =>
calculateEstimatedBlocksOutputSize(batchContext)
}
val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
// Second, write header
@@ -1129,15 +1176,21 @@
offset += fileBlockSize
}

for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
withResource(new NvtxWithMetrics("Buffer read data", NvtxColor.PURPLE,
metrics("bufferDataTime"))) { _ =>
for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
}
}

// Fourth, calculate the final buffer size
val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
val finalBufferSize = withResource(new NvtxWithMetrics("Buffer size eval",
NvtxColor.RED, metrics("bufferMetaTime"))) { _ =>
calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
}

(hmb, finalBufferSize, offset, allOutputBlocks.toSeq)
}
@@ -1154,14 +1207,17 @@ abstract class MultiFileCoalescingPartitionReaderBase(
s"reallocating and copying data to bigger buffer size: $bufferSize")
}
// Copy the old buffer to a new allocated bigger buffer and close the old buffer
buf = withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
buf = withResource(new NvtxWithMetrics("Buffer resize time",
NvtxColor.RED, metrics("bufferResizeTime"))) { _ =>
withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
}
newhmb
}
newhmb
}
}
}
@@ -1175,9 +1231,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
// Closing the original buf and returning a new allocated buffer is allowed, but there is no
// reason to do that.
// If you have to do this, please think about to add other abstract methods first.
val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)

val (finalBuffer, finalBufferSize) = withResource(new NvtxWithMetrics("Buffer write footer",
NvtxColor.WHITE, metrics("bufferMetaTime"))) { _ =>
writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)
}
closeOnExcept(finalBuffer) { _ =>
// triple check we didn't go over memory
if (finalBufferSize > totalBufferSize) {
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.column.statistics.{BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntStatistics, LongStatistics}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
@@ -1598,6 +1599,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
coalescedRanges.foreach { blockCopy =>

totalBytesCopied += copyDataRange(blockCopy, in, out, copyBuffer)
}
}
@@ -1821,6 +1823,29 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockBase {
override def getRowCount: Long = dataBlock.getRowCount
override def getReadDataSize: Long = dataBlock.getTotalByteSize
override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum

override protected def getColumnStatistics: Seq[ColumnStats] = {
dataBlock.getColumns.asScala.map { c =>
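// Estimate field sizes from the column statistics: fixed-width types use
// their encoded width, binary columns the lengths of the min/max values.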
val (nullCnt, minField, maxField) = c.getStatistics match {
case s: BinaryStatistics =>
(s.getNumNulls, s.genericGetMin.length, s.genericGetMax.length)
case s: BooleanStatistics =>
(s.getNumNulls, 1, 1)
case s: IntStatistics =>
(s.getNumNulls, 4, 4)
case s: LongStatistics =>
(s.getNumNulls, 8, 8)
case s: FloatStatistics =>
(s.getNumNulls, 4, 4)
case s: DoubleStatistics =>
(s.getNumNulls, 8, 8)
case s =>
throw new Exception(s"Invalid value $s")
}
ColumnStats(c.getTotalSize, c.getTotalUncompressedSize,
nullCnt, minField, maxField)
}.toSeq
}
}

/** Parquet extra information containing rebase modes and whether there is int96 timestamp */
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -429,7 +429,17 @@ case class GpuFileSourceScanExec(
relation.fileFormat match {
case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>
Map(READ_FS_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_READ_FS_TIME),
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME))
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME),
// For debugging ByteDance workloads
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ =>
Map.empty[String, GpuMetric]
}