Fix GpuFileFormatDataWriter failing to stat file after commit #5107

Merged

@@ -61,7 +61,7 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
* must provide a zero-argument constructor. This is the columnar version of
* `org.apache.spark.sql.execution.datasources.OutputWriter`.
*/
abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType, rangeName: String) extends HostBufferConsumer with Arm {

val tableWriter: TableWriter
@@ -165,6 +165,11 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
writeBufferedData()
outputStream.close()
}

/**
* The file path to write. Invoked on the executor side.
*/
def path(): String
}

object ColumnarOutputWriter {
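
Note on the change above: the path constructor parameter of ColumnarOutputWriter is removed and replaced by the abstract path() member, so each concrete writer now declares its own output path. A minimal hypothetical subclass under the new contract might look like the sketch below. The class name, format label, and the TableWriter import are assumptions for illustration and are not part of this PR; only the constructor/override shape mirrors the GpuParquetWriter and GpuOrcWriter changes further down.

import ai.rapids.cudf.TableWriter
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.sql.types.StructType

// Hypothetical writer, shown only to illustrate the new contract: the path is
// supplied by overriding path() rather than being passed to ColumnarOutputWriter.
class ExampleColumnarWriter(override val path: String,
    dataSchema: StructType,
    context: TaskAttemptContext)
  extends ColumnarOutputWriter(context, dataSchema, "Example") {

  // Format-specific cudf writer; deliberately left unimplemented in this sketch.
  override val tableWriter: TableWriter = ???
}
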
@@ -259,13 +259,13 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
}

class GpuParquetWriter(
path: String,
override val path: String,
dataSchema: StructType,
compressionType: CompressionType,
dateRebaseException: Boolean,
timestampRebaseException: Boolean,
context: TaskAttemptContext)
extends ColumnarOutputWriter(path, context, dataSchema, "Parquet") {
extends ColumnarOutputWriter(context, dataSchema, "Parquet") {

val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,9 +17,12 @@
package org.apache.spark.sql.rapids

import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
@@ -53,11 +56,11 @@ class BasicColumnarWriteTaskStatsTracker(
extends ColumnarWriteTaskStatsTracker with Logging {
private[this] var numPartitions: Int = 0
private[this] var numFiles: Int = 0
private[this] var submittedFiles: Int = 0
private[this] var numSubmittedFiles: Int = 0
private[this] var numBytes: Long = 0L
private[this] var numRows: Long = 0L

private[this] var curFile: Option[String] = None
private[this] val submittedFiles = mutable.HashSet[String]()

/**
* Get the size of the file expected to have been written by a worker.
@@ -67,37 +70,86 @@
private def getFileSize(filePath: String): Option[Long] = {
val path = new Path(filePath)
val fs = path.getFileSystem(hadoopConf)
getFileSize(fs, path)
}

/**
* Get the size of the file expected to have been written by a worker.
* This supports the XAttr in HADOOP-17414 when the "magic committer" adds
* a custom HTTP header to a zero-byte marker.
* If the length of the output file as returned by getFileStatus is > 0, then
* that length is returned. For zero-byte files, the (optional) Hadoop FS API
* getXAttr() is invoked. If a parseable, non-negative length can be retrieved,
* it is returned instead of the zero length.
* @return the file size or None if the file was not found.
*/
private def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
// the normal file status probe.
try {
Some(fs.getFileStatus(path).getLen())
val len = fs.getFileStatus(path).getLen
if (len > 0) {
return Some(len)
}
} catch {
case e: FileNotFoundException =>
// may arise against eventually consistent object stores
// may arise against eventually consistent object stores.
logDebug(s"File $path is not yet visible", e)
None
return None
}

// Output File Size is 0. Look to see if it has an attribute
// declaring a future-file-length.
// Failure of API call, parsing, invalid value all return the
// 0 byte length.

var len = 0L
try {
val attr = fs.getXAttr(path, BasicColumnarWriteJobStatsTracker.FILE_LENGTH_XATTR)
if (attr != null && attr.nonEmpty) {
val str = new String(attr, StandardCharsets.UTF_8)
logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
// a non-empty header was found. parse to a long via the java class
val l = java.lang.Long.parseLong(str)
if (l > 0) {
len = l
} else {
logDebug("Ignoring negative value in XAttr file length")
}
}
} catch {
case e: NumberFormatException =>
// warn but don't dump the whole stack
logInfo(s"Failed to parse" +
s" ${BasicColumnarWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
s" bytes written may be under-reported");
case e: UnsupportedOperationException =>
// this is not unusual; ignore
logDebug(s"XAttr not supported on path $path", e);
case e: Exception =>
// Something else. Log at debug and continue.
logDebug(s"XAttr processing failure on $path", e);
}
Some(len)
}

override def newPartition(/*partitionValues: InternalRow*/): Unit = {
numPartitions += 1
}

override def newBucket(bucketId: Int): Unit = {
// currently unhandled
override def newFile(filePath: String): Unit = {
submittedFiles += filePath
numSubmittedFiles += 1
}

override def newFile(filePath: String): Unit = {
statCurrentFile()
curFile = Some(filePath)
submittedFiles += 1
override def closeFile(filePath: String): Unit = {
updateFileStats(filePath)
submittedFiles.remove(filePath)
}

private def statCurrentFile(): Unit = {
curFile.foreach { path =>
getFileSize(path).foreach { len =>
numBytes += len
numFiles += 1
}
curFile = None
private def updateFileStats(filePath: String): Unit = {
getFileSize(filePath).foreach { len =>
numBytes += len
numFiles += 1
}
}

@@ -106,16 +158,17 @@ class BasicColumnarWriteTaskStatsTracker(
}

override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
statCurrentFile()
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()

// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numRows)
}

if (submittedFiles != numFiles) {
logWarning(s"Expected $submittedFiles files, but only saw $numFiles. " +
if (numSubmittedFiles != numFiles) {
logWarning(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
@@ -181,6 +234,8 @@ object BasicColumnarWriteJobStatsTracker {
private val NUM_PARTS_KEY = "numParts"
val TASK_COMMIT_TIME = "taskCommitTime"
val JOB_COMMIT_TIME = "jobCommitTime"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

def metrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
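
For context on the getFileSize changes above: with the S3A magic committer, the path a task sees may hold only a zero-byte marker until commit, and HADOOP-17414 exposes the real data length as an XAttr on that marker. The tracker therefore probes the XAttr whenever getFileStatus reports zero bytes, and it now does so when each file is closed rather than only at the end of the task. Below is a minimal standalone sketch of the same probe using plain Hadoop FileSystem calls; the object and method names are made up, and error handling is trimmed compared to the tracker code.

import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

object XAttrLengthProbeSketch {
  // XAttr written by the S3A magic committer (HADOOP-17414).
  private val MagicLengthXAttr = "header.x-hadoop-s3a-magic-data-length"

  def probe(pathStr: String, conf: Configuration): Long = {
    val path = new Path(pathStr)
    val fs = path.getFileSystem(conf)
    val statLen = fs.getFileStatus(path).getLen
    if (statLen > 0) {
      // Normal case: the file status already reflects the written bytes.
      statLen
    } else {
      // Zero-byte marker: the real length may be carried in the XAttr.
      // (The tracker above additionally handles FileNotFoundException,
      // UnsupportedOperationException and parse failures.)
      val attr = fs.getXAttr(path, MagicLengthXAttr)
      if (attr == null || attr.isEmpty) 0L
      else java.lang.Long.parseLong(new String(attr, StandardCharsets.UTF_8))
    }
  }
}
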
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,20 +24,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* processed by a single write task in [[GpuFileFormatDataWriter]] - i.e. there should be one
* instance per executor.
*
* This trait is coupled with the way [[GpuFileFormatWriter]] works, in the sense that its methods
* will be called according to how column batches are being written out to disk, namely in
* sorted order according to partitionValue(s), then bucketId.
*
* As such, a typical call scenario is:
*
* newPartition -> newBucket -> newFile -> newRow -.
* ^ |______^___________^ ^ ^____|
* | | |______________|
* | |____________________________|
* |____________________________________________|
*
* newPartition and newBucket events are only triggered if the relation to be written out is
* partitioned and/or bucketed, respectively.
* newPartition event is only triggered if the relation to be written out is partitioned.
*/
trait ColumnarWriteTaskStatsTracker {

@@ -50,19 +37,18 @@ trait ColumnarWriteTaskStatsTracker {
*/
def newPartition(/*partitionValues: InternalRow*/): Unit

/**
* Process the fact that a new bucket is about to written.
* Only triggered when the relation is bucketed by a (non-empty) sequence of columns.
* @param bucketId The bucket number.
*/
def newBucket(bucketId: Int): Unit

/**
* Process the fact that a new file is about to be written.
* @param filePath Path of the file into which future rows will be written.
*/
def newFile(filePath: String): Unit

/**
* Process the fact that a file has finished being written and is closed.
* @param filePath Path of the file.
*/
def closeFile(filePath: String): Unit

/**
* Process a new column batch to update the tracked statistics accordingly.
* The batch will be written to the most recently witnessed file (via `newFile`).
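
With newBucket removed and the old call-sequence diagram dropped, the per-task protocol reduces to: newPartition (only for partitioned output), then newFile, per-batch updates, and closeFile once per output file, and finally getFinalStats after task commit. A rough sketch of driving one file's worth of callbacks follows; the tracker, path, and batches are placeholders, and the per-batch callback is assumed to be newBatch(batch) based on the truncated comment above.

import org.apache.spark.sql.vectorized.ColumnarBatch

object TrackerFlowSketch {
  // Sketch only: one output file's worth of callbacks against a tracker.
  def trackOneFile(tracker: ColumnarWriteTaskStatsTracker,
      filePath: String,
      batches: Seq[ColumnarBatch]): Unit = {
    tracker.newFile(filePath)                 // file is about to be written
    batches.foreach(b => tracker.newBatch(b)) // one call per columnar batch written
    tracker.closeFile(filePath)               // file closed; safe to collect its size now
  }
}
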
@@ -57,16 +57,24 @@ abstract class GpuFileFormatDataWriter(
protected val statsTrackers: Seq[ColumnarWriteTaskStatsTracker] =
description.statsTrackers.map(_.newTaskInstance())

protected def releaseResources(): Unit = {
/** Release resources of `currentWriter`. */
protected def releaseCurrentWriter(): Unit = {
if (currentWriter != null) {
try {
currentWriter.close()
statsTrackers.foreach(_.closeFile(currentWriter.path()))
} finally {
currentWriter = null
}
}
}

/** Release all resources. */
protected def releaseResources(): Unit = {
// Call `releaseCurrentWriter()` by default, as this is the only resource to be released.
releaseCurrentWriter()
}

/** Writes a columnar batch of records */
def write(batch: ColumnarBatch): Unit

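
The key ordering change here is that closeFile is now fired from releaseCurrentWriter, while the task still owns the file, rather than only when getFinalStats runs after the task commit has potentially moved or finalized it (the "failing to stat file after commit" from the PR title). A stripped-down sketch of that close-then-stat ordering, not the real class, using only close(), path(), and closeFile() from the diffs above and assuming the same packages:

object CloseThenStatSketch {
  // Mirrors releaseCurrentWriter above: close the writer first, then report its
  // path to every tracker while the file is still at its task-side location.
  def closeAndRecord(writer: ColumnarOutputWriter,
      statsTrackers: Seq[ColumnarWriteTaskStatsTracker]): Unit = {
    writer.close()                                    // flush buffered data, close the stream
    statsTrackers.foreach(_.closeFile(writer.path())) // stat while the file is still visible
    // The caller then drops its reference to the writer, as releaseCurrentWriter does.
  }
}
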
@@ -156,10 +156,10 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
}
}

class GpuOrcWriter(path: String,
class GpuOrcWriter(override val path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {
extends ColumnarOutputWriter(context, dataSchema, "ORC") {

override val tableWriter: TableWriter = {
val builder = SchemaUtils