Dump Parquet Meta as SparkMetrics #29

Merged
@@ -36,7 +36,19 @@ trait GpuBatchScanExecMetrics extends GpuExec {
lazy val fileCacheMetrics: Map[String, GpuMetric] = {
// File cache only supported on Parquet files for now.
scan match {
case _: GpuParquetScan | _: GpuOrcScan => createFileCacheMetrics()
case _: GpuParquetScan | _: GpuOrcScan =>
createFileCacheMetrics() ++
// For debugging ByteDance workloads
Map(
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ => Map.empty
}
}
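The new entries are all registered at `DEBUG_LEVEL`, and the rest of the change only ever touches them through `+=`, `set`, and `value`. `GpuMetric` itself is not part of this diff; as a reading aid, a minimal stand-in supporting just those operations might look like the sketch below (illustrative only, not the real spark-rapids class, which is backed by a Spark SQL accumulator):

```scala
// Hypothetical stand-in for GpuMetric, only for reasoning about the updates below.
final class LongMetric(private var current: Long = 0L) {
  def value: Long = current
  def +=(delta: Long): Unit = current += delta
  def set(v: Long): Unit = current = v
}

// The debug metrics added above then behave like a map of named counters.
val metrics: Map[String, LongMetric] = Map(
  "compPageSize"   -> new LongMetric(),
  "unCompPageSize" -> new LongMetric(),
  "nullCount"      -> new LongMetric(),
  "maxCPR"         -> new LongMetric(),
  "minCPR"         -> new LongMetric(),
  "maxFieldSize"   -> new LongMetric())
```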
@@ -49,6 +49,9 @@ object MetricsLevel {
object GpuMetric extends Logging {
// Metric names.
val BUFFER_TIME = "bufferTime"
val BUFFER_DATA_TIME = "bufferDataTime"
val BUFFER_META_TIME = "bufferMetaTime"
val BUFFER_RESIZE_TIME = "bufferResizeTime"
val COPY_BUFFER_TIME = "copyBufferTime"
val GPU_DECODE_TIME = "gpuDecodeTime"
val NUM_INPUT_ROWS = "numInputRows"
@@ -664,6 +664,10 @@ abstract class MultiFileCloudPartitionReaderBase(
_ += (blockedTime * fileBufsAndMeta.getBufferTimePct).toLong
}

// Update Parquet Meta as Metrics
fileBufsAndMeta.memBuffersAndSizes.foreach { hmbMeta =>
hmbMeta.blockMeta.foreach(_.updateMetrics(metrics))
}
TrampolineUtil.incBytesRead(inputMetrics, fileBufsAndMeta.bytesRead)
// if we replaced the path with Alluxio, set it to the original filesystem file
// since Alluxio replacement is supposed to be transparent to the user
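For orientation: each completed cloud read carries the block-level metadata of the row groups it buffered, and every block pushes its column statistics into the task's shared metrics via `updateMetrics`. A rough stand-alone sketch of that shape, with illustrative names rather than the real spark-rapids buffer-and-metadata types:

```scala
import scala.collection.mutable

// Illustrative shapes only; the real types carry much more than this.
final case class BlockMetaLike(compressedBytes: Long) {
  def updateMetrics(metrics: mutable.Map[String, Long]): Unit =
    metrics("compPageSize") = metrics.getOrElse("compPageSize", 0L) + compressedBytes
}
final case class HmbAndMetaLike(blockMeta: Seq[BlockMetaLike])
final case class FileReadResultLike(memBuffersAndSizes: Seq[HmbAndMetaLike])

// Mirrors the loop added above: every buffered block contributes its stats once.
def recordParquetMeta(result: FileReadResultLike,
    metrics: mutable.Map[String, Long]): Unit =
  result.memBuffersAndSizes.foreach(_.blockMeta.foreach(_.updateMetrics(metrics)))
```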
@@ -756,6 +760,41 @@ trait DataBlockBase {
def getReadDataSize: Long
// the block size to be used to slice the whole HostMemoryBuffer
def getBlockSize: Long

case class ColumnStats(compressedSize: Long,
uncompressedSize: Long,
nullCount: Long,
minFieldSize: Int,
maxFieldSize: Int)

protected def getColumnStatistics: Seq[ColumnStats] = Seq()

def updateMetrics(metrics: Map[String, GpuMetric]): Unit = {
getColumnStatistics match {
case stats if stats.isEmpty =>
case stats =>
var minR: Double = 1
var maxR: Double = 0
var maxFieldSize: Int = 0
stats.foreach { s =>
metrics("compPageSize") += s.compressedSize
if (s.uncompressedSize > 0) {
metrics("unCompPageSize") += s.uncompressedSize
val r: Double = s.compressedSize.toDouble / s.uncompressedSize.toDouble
maxR = maxR max r
minR = minR min r
}
maxFieldSize = maxFieldSize max s.maxFieldSize
}
metrics("nullCount") += stats.head.nullCount
metrics("maxCPR").set(metrics("maxCPR").value max ((1.0 - minR) * 100000L).toLong)
if (metrics("minCPR").value == 0L) {
metrics("minCPR").set(100000L)
}
metrics("minCPR").set(metrics("minCPR").value min ((1.0 - maxR) * 100000L).toLong)
metrics("maxFieldSize").set(metrics("maxFieldSize").value max (maxFieldSize * 10L))
}
}
}
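Because the metrics only hold long values, the ratio values are stored in fixed point. A short worked example of the encoding used above, assuming `r = compressedSize / uncompressedSize` and a scale factor of 100000 (so the reported value divided by 100000 gives the ratio):

```scala
// Worked example of the fixed-point encoding in updateMetrics above.
val compressedSize   = 2L * 1024 * 1024    // 2 MiB on disk
val uncompressedSize = 10L * 1024 * 1024   // 10 MiB once decoded
val r = compressedSize.toDouble / uncompressedSize.toDouble  // 0.2
val encoded = ((1.0 - r) * 100000L).toLong                   // 80000, i.e. 0.8 scaled

// Note the inversion: the best (smallest) r seen drives maxCPR via (1.0 - minR),
// and the worst (largest) r drives minCPR via (1.0 - maxR).
// maxFieldSize is likewise stored scaled, by a factor of 10.
```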

/**
@@ -1058,12 +1097,16 @@ abstract class MultiFileCoalescingPartitionReaderBase(
if (currentChunkMeta.currentChunk.isEmpty) {
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {
currentChunkMeta.currentChunk.foreach(_._2.updateMetrics(metrics))

val (dataBuffer, dataSize) = readPartFiles(currentChunkMeta.currentChunk,
currentChunkMeta.clippedSchema)

if (dataSize == 0) {
dataBuffer.close()
CachedGpuBatchIterator(EmptyTableReader, colTypes)
} else {

startNewBufferRetry
RmmRapidsRetryIterator.withRetry(dataBuffer, chunkedSplit(_)) { _ =>
// We don't want to actually close the host buffer until we know that we don't
@@ -1110,7 +1153,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
val batchContext = createBatchContext(filesAndBlocks, clippedSchema)
// First, estimate the output file size for the initial allocating.
// the estimated size should be >= size of HEAD + Blocks + FOOTER
val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)
val initTotalSize =
withResource(new NvtxWithMetrics("Buffer size eval", NvtxColor.ORANGE,
metrics("bufferMetaTime"))) { _ =>
calculateEstimatedBlocksOutputSize(batchContext)
}
val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
// Second, write header
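`NvtxWithMetrics` comes from the spark-rapids side of cudf: closing it ends an NVTX range (visible in Nsight Systems) and, as used here, the elapsed time lands in the named metric. A minimal, self-contained sketch of that usage pattern, with illustrative names only:

```scala
// Illustrative only: a closeable timer that reports elapsed nanoseconds on close,
// mimicking how the NvtxWithMetrics wrapper is used in this change.
final class TimedRange(onClose: Long => Unit) extends AutoCloseable {
  private val start = System.nanoTime()
  override def close(): Unit = onClose(System.nanoTime() - start)
}

// The withResource helper used throughout the plugin follows roughly this shape:
// run the block, always close the resource.
def withResource[R <: AutoCloseable, T](resource: R)(block: R => T): T =
  try block(resource) finally resource.close()

// Usage mirroring the hunk above (calculateEstimatedBlocksOutputSize is the
// plugin's own method, shown here only as a placeholder):
// val initTotalSize = withResource(new TimedRange(ns => bufferMetaTimeNs += ns)) { _ =>
//   calculateEstimatedBlocksOutputSize(batchContext)
// }
```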
@@ -1129,15 +1176,21 @@
offset += fileBlockSize
}

for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
withResource(new NvtxWithMetrics("Buffer read data", NvtxColor.PURPLE,
metrics("bufferDataTime"))) { _ =>
for (future <- tasks.asScala) {
val (blocks, bytesRead) = future.get()
allOutputBlocks ++= blocks
TrampolineUtil.incBytesRead(inputMetrics, bytesRead)
}
}

// Fourth, calculate the final buffer size
val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
val finalBufferSize = withResource(new NvtxWithMetrics("Buffer size eval",
NvtxColor.RED, metrics("bufferMetaTime"))) { _ =>
calculateFinalBlocksOutputSize(offset, allOutputBlocks.toSeq,
batchContext)
}

(hmb, finalBufferSize, offset, allOutputBlocks.toSeq)
}
@@ -1154,14 +1207,17 @@ abstract class MultiFileCoalescingPartitionReaderBase(
s"reallocating and copying data to bigger buffer size: $bufferSize")
}
// Copy the old buffer to a new allocated bigger buffer and close the old buffer
buf = withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
buf = withResource(new NvtxWithMetrics("Buffer resize time",
NvtxColor.RED, metrics("bufferResizeTime"))) { _ =>
withResource(buffer) { _ =>
withResource(new HostMemoryInputStream(buffer, footerOffset)) { in =>
// realloc memory and copy
closeOnExcept(HostMemoryBuffer.allocate(bufferSize)) { newhmb =>
withResource(new HostMemoryOutputStream(newhmb)) { out =>
IOUtils.copy(in, out)
}
newhmb
}
newhmb
}
}
}
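The block wrapped by `bufferResizeTime` is a plain grow-and-copy: allocate a larger host buffer, copy the `footerOffset` bytes already written, and release the old buffer. An analogous sketch with heap `ByteBuffer`s instead of cudf's `HostMemoryBuffer` and `IOUtils.copy` (illustrative, not the plugin code):

```scala
import java.nio.ByteBuffer

// Copy the first bytesWritten bytes of `old` into a larger buffer and return it;
// the caller is responsible for letting go of the old buffer.
def growAndCopy(old: ByteBuffer, bytesWritten: Int, newCapacity: Int): ByteBuffer = {
  require(newCapacity >= bytesWritten, "new capacity must cover the bytes already written")
  val grown = ByteBuffer.allocate(newCapacity)
  val view = old.duplicate()
  view.position(0).limit(bytesWritten)
  grown.put(view)   // leaves grown.position() == bytesWritten, like footerOffset above
  grown
}
```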
@@ -1175,9 +1231,11 @@ abstract class MultiFileCoalescingPartitionReaderBase(
// Closing the original buf and returning a new allocated buffer is allowed, but there is no
// reason to do that.
// If you have to do this, please think about to add other abstract methods first.
val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)

val (finalBuffer, finalBufferSize) = withResource(new NvtxWithMetrics("Buffer write footer",
NvtxColor.WHITE, metrics("bufferMetaTime"))) { _ =>
writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, batchContext)
}
closeOnExcept(finalBuffer) { _ =>
// triple check we didn't go over memory
if (finalBufferSize > totalBufferSize) {
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.parquet.bytes.BytesUtils
import org.apache.parquet.bytes.BytesUtils.readIntLittleEndian
import org.apache.parquet.column.ColumnDescriptor
import org.apache.parquet.column.statistics.{BinaryStatistics, BooleanStatistics, DoubleStatistics, FloatStatistics, IntStatistics, LongStatistics}
import org.apache.parquet.filter2.predicate.FilterApi
import org.apache.parquet.format.converter.ParquetMetadataConverter
import org.apache.parquet.hadoop.{ParquetFileReader, ParquetInputFormat}
@@ -1598,6 +1599,7 @@ trait ParquetPartitionReaderBase extends Logging with ScanWithMetrics
val copyBuffer: Array[Byte] = new Array[Byte](copyBufferSize)
withResource(filePath.getFileSystem(fileHadoopConf).open(filePath)) { in =>
coalescedRanges.foreach { blockCopy =>

totalBytesCopied += copyDataRange(blockCopy, in, out, copyBuffer)
}
}
@@ -1821,6 +1823,29 @@ private case class ParquetDataBlock(dataBlock: BlockMetaData) extends DataBlockBase {
override def getRowCount: Long = dataBlock.getRowCount
override def getReadDataSize: Long = dataBlock.getTotalByteSize
override def getBlockSize: Long = dataBlock.getColumns.asScala.map(_.getTotalSize).sum

override protected def getColumnStatistics: Seq[ColumnStats] = {
dataBlock.getColumns.asScala.map { c =>
val (nullCnt, minField, maxField) = c.getStatistics match {
case s: BinaryStatistics =>
(s.getNumNulls, s.genericGetMin.length, s.genericGetMax.length)
case s: BooleanStatistics =>
(s.getNumNulls, 1, 1)
case s: IntStatistics =>
(s.getNumNulls, 4, 4)
case s: LongStatistics =>
(s.getNumNulls, 8, 8)
case s: FloatStatistics =>
(s.getNumNulls, 4, 4)
case s: DoubleStatistics =>
(s.getNumNulls, 8, 8)
case s =>
throw new Exception(s"Invalid value $s")
}
ColumnStats(c.getTotalSize, c.getTotalUncompressedSize,
nullCnt, minField, maxField)
}.toSeq
}
}
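Everything `getColumnStatistics` consumes is already in the Parquet footer, so the same numbers can be inspected outside the plugin with parquet-hadoop alone. A hedged sketch (assumes parquet-hadoop on the classpath and a path readable through the Hadoop `Configuration`):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

// Print per-column-chunk sizes and null counts for every row group,
// roughly the raw material behind compPageSize/unCompPageSize/nullCount.
def dumpFooterStats(path: String): Unit = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(path), new Configuration()))
  try {
    reader.getFooter.getBlocks.asScala.foreach { block =>
      block.getColumns.asScala.foreach { col =>
        println(s"${col.getPath} compressed=${col.getTotalSize} " +
          s"uncompressed=${col.getTotalUncompressedSize} " +
          s"nulls=${col.getStatistics.getNumNulls}")
      }
    }
  } finally {
    reader.close()
  }
}
```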

/** Parquet extra information containing rebase modes and whether there is int96 timestamp */
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -429,7 +429,17 @@ case class GpuFileSourceScanExec(
relation.fileFormat match {
case _: GpuReadParquetFileFormat | _: GpuOrcFileFormat =>
Map(READ_FS_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_READ_FS_TIME),
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME))
WRITE_BUFFER_TIME -> createNanoTimingMetric(DEBUG_LEVEL, DESCRIPTION_WRITE_BUFFER_TIME),
// For debugging ByteDance workloads
"compPageSize" -> createSizeMetric(DEBUG_LEVEL, "compressed page size"),
"unCompPageSize" -> createSizeMetric(DEBUG_LEVEL, "uncompressed page size"),
"nullCount" -> createSizeMetric(DEBUG_LEVEL, "null record count"),
"maxCPR" -> createAverageMetric(DEBUG_LEVEL, "max compression ratio"),
"minCPR" -> createAverageMetric(DEBUG_LEVEL, "min compression ratio"),
"maxFieldSize" -> createAverageMetric(DEBUG_LEVEL, "max size of single field"),
BUFFER_DATA_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_DATA_TIME),
BUFFER_META_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_META_TIME),
BUFFER_RESIZE_TIME -> createNanoTimingMetric(DEBUG_LEVEL, BUFFER_RESIZE_TIME))
case _ =>
Map.empty[String, GpuMetric]
}
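Like the batch-scan case, these entries are created at `DEBUG_LEVEL`, so they should only surface in the SQL UI when the plugin's metric level is raised. If memory serves, that is controlled by `spark.rapids.sql.metrics.level` (worth confirming against the spark-rapids docs for the version in use); a session configured to show them might be built like this:

```scala
import org.apache.spark.sql.SparkSession

// Assumed configuration keys; verify against the spark-rapids documentation
// for the plugin version in use.
val spark = SparkSession.builder()
  .appName("dump-parquet-meta-metrics")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.metrics.level", "DEBUG")
  .getOrCreate()

// Any Parquet read through the plugin should then report the new debug metrics.
spark.read.parquet("/path/to/some.parquet").count()
```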