Add ignoreCorruptFiles feature for Parquet reader [databricks] #4742

Merged (6 commits) on Feb 16, 2022
25 changes: 24 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -20,6 +20,8 @@
from pyspark.sql.types import *
from pyspark.sql.functions import *
from spark_session import with_cpu_session, with_gpu_session, is_before_spark_330
from conftest import is_databricks_runtime


def read_parquet_df(data_path):
return lambda spark : spark.read.parquet(data_path)
@@ -749,4 +751,25 @@ def do_parquet_scan(spark):
assert_cpu_and_gpu_are_equal_collect_with_capture(
do_parquet_scan,
exist_classes= "FileSourceScanExec",
non_exist_classes= "GpuBatchScanExec")


@ignore_order
@pytest.mark.parametrize('v1_enabled_list', ["", "parquet"])
@pytest.mark.parametrize('reader_confs', reader_opt_confs, ids=idfn)
@pytest.mark.skipif(is_databricks_runtime(), reason="Databricks does not support ignoreCorruptFiles")
def test_parquet_read_with_corrupt_files(spark_tmp_path, reader_confs, v1_enabled_list):
    first_data_path = spark_tmp_path + '/PARQUET_DATA/first'
    with_cpu_session(lambda spark : spark.range(1).toDF("a").write.parquet(first_data_path))
    second_data_path = spark_tmp_path + '/PARQUET_DATA/second'
    with_cpu_session(lambda spark : spark.range(1, 2).toDF("a").write.parquet(second_data_path))
    third_data_path = spark_tmp_path + '/PARQUET_DATA/third'
    with_cpu_session(lambda spark : spark.range(2, 3).toDF("a").write.json(third_data_path))

    all_confs = copy_and_update(reader_confs,
                                {'spark.sql.files.ignoreCorruptFiles': "true",
                                 'spark.sql.sources.useV1SourceList': v1_enabled_list})

    assert_gpu_and_cpu_are_equal_collect(
            lambda spark : spark.read.parquet(first_data_path, second_data_path, third_data_path),
            conf=all_confs)
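
For reference, the scenario this test constructs can be reproduced outside the test harness roughly as in the Scala sketch below (the paths and session setup are illustrative, not part of the PR): writing JSON into the third directory gives the Parquet reader files it cannot decode, and with spark.sql.files.ignoreCorruptFiles enabled those files are logged and skipped instead of failing the read. The Scala changes that wire the flag through the multi-file readers follow.

import org.apache.spark.sql.SparkSession

// Rough Scala counterpart of the test above, spark-shell style (hypothetical paths, assumes
// the RAPIDS plugin is already configured on the session).
val spark = SparkSession.builder().appName("ignore-corrupt-parquet").getOrCreate()

spark.range(1).toDF("a").write.parquet("/tmp/PARQUET_DATA/first")
spark.range(1, 2).toDF("a").write.parquet("/tmp/PARQUET_DATA/second")
// Writing JSON here produces files the Parquet reader cannot decode.
spark.range(2, 3).toDF("a").write.json("/tmp/PARQUET_DATA/third")

spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// Only the rows from the two valid Parquet directories (a = 0 and a = 1) are expected back.
spark.read
  .parquet("/tmp/PARQUET_DATA/first", "/tmp/PARQUET_DATA/second", "/tmp/PARQUET_DATA/third")
  .show()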
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import java.io.File
import java.io.{File, IOException}
import java.net.{URI, URISyntaxException}
import java.util.concurrent.{Callable, ConcurrentLinkedQueue, Future, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

@@ -296,14 +296,16 @@ abstract class FilePartitionReaderBase(conf: Configuration, execMetrics: Map[Str
* submitted to threadpool
* @param filters push down filters
* @param execMetrics the metrics
* @param ignoreCorruptFiles Whether to ignore corrupt files when the GPU fails to decode them
*/
abstract class MultiFileCloudPartitionReaderBase(
conf: Configuration,
files: Array[PartitionedFile],
numThreads: Int,
maxNumFileProcessed: Int,
filters: Array[Filter],
execMetrics: Map[String, GpuMetric]) extends FilePartitionReaderBase(conf, execMetrics) {
execMetrics: Map[String, GpuMetric],
ignoreCorruptFiles: Boolean = false) extends FilePartitionReaderBase(conf, execMetrics) {

private var filesToRead = 0
protected var currentFileHostBuffers: Option[HostMemoryBuffersWithMetaDataBase] = None
@@ -389,7 +391,15 @@ abstract class MultiFileCloudPartitionReaderBase(
closeCurrentFileHostBuffers()
next()
} else {
batch = readBatch(currentFileHostBuffers.get)
val file = currentFileHostBuffers.get.partitionedFile.filePath
batch = try {
readBatch(currentFileHostBuffers.get)
} catch {
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the corrupted file: ${file}", e)
None
}
}
} else {
if (filesToRead > 0 && !isDone) {
@@ -408,7 +418,16 @@
addNextTaskIfNeeded()
next()
} else {
batch = readBatch(fileBufsAndMeta)
val file = fileBufsAndMeta.partitionedFile.filePath
batch = try {
readBatch(fileBufsAndMeta)
} catch {
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the corrupted file: ${file}", e)
None
}

// the data is copied to GPU so submit another task if we were limited
addNextTaskIfNeeded()
}
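
The base cloud reader above now wraps readBatch in a guarded catch: RuntimeException and IOException are swallowed only when ignoreCorruptFiles is set, the file is logged, and no batch is produced for it. Below is a minimal standalone sketch of that pattern (a hypothetical helper, not part of the PR); it also illustrates why the Parquet-specific code further down gives FileNotFoundException its own cases, since FileNotFoundException is a subclass of IOException and would otherwise be absorbed whenever ignoreCorruptFiles is on, even with ignoreMissingFiles off. The hunks that follow are from the Parquet reader factory and readers themselves.

import java.io.{FileNotFoundException, IOException}

// Hypothetical helper sketching the guard pattern used in this PR. The FileNotFoundException
// cases must come before the generic RuntimeException/IOException case, otherwise a missing
// file would be treated as a corrupt one whenever ignoreCorruptFiles is enabled.
object SkipOnError {
  def readOrSkip[T](path: String, ignoreMissingFiles: Boolean, ignoreCorruptFiles: Boolean)
      (body: => Option[T]): Option[T] = {
    try {
      body
    } catch {
      case e: FileNotFoundException if ignoreMissingFiles =>
        Console.err.println(s"Skipped missing file: $path ($e)")
        None
      // Missing files still fail the query when only ignoreCorruptFiles is set.
      case e: FileNotFoundException => throw e
      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
        Console.err.println(s"Skipped the corrupted file: $path ($e)")
        None
    }
  }
}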
@@ -16,7 +16,7 @@

package com.nvidia.spark.rapids

import java.io.OutputStream
import java.io.{FileNotFoundException, IOException, OutputStream}
import java.net.URI
import java.nio.charset.StandardCharsets
import java.util.{Collections, Locale}
@@ -408,14 +408,15 @@ case class GpuParquetMultiFilePartitionReaderFactory(
private val debugDumpPrefix = rapidsConf.parquetDebugDumpPrefix
private val numThreads = rapidsConf.parquetMultiThreadReadNumThreads
private val maxNumFileProcessed = rapidsConf.maxNumParquetFilesParallel

private val ignoreMissingFiles = sqlConf.ignoreMissingFiles
private val ignoreCorruptFiles = sqlConf.ignoreCorruptFiles
private val filterHandler = GpuParquetFileFilterHandler(sqlConf)

// We can't use the coalescing files reader when InputFileName, InputFileBlockStart,
// or InputFileBlockLength is used, because we are combining all the files into a single
// buffer and we don't know which file is associated with each row.
override val canUseCoalesceFilesReader: Boolean =
rapidsConf.isParquetCoalesceFileReadEnabled && !queryUsesInputFile
rapidsConf.isParquetCoalesceFileReadEnabled && !(queryUsesInputFile || ignoreCorruptFiles)

override val canUseMultiThreadReader: Boolean = rapidsConf.isParquetMultiThreadReadEnabled
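
With this change the factory also rules out the coalescing reader whenever ignoreCorruptFiles is enabled, in addition to the existing input-file-metadata restriction, so those multi-file reads presumably go through the cloud (multi-threaded) reader instead. A condensed sketch of the selection rule, with illustrative parameter names in place of the factory's fields:

// Condensed, hypothetical form of the rule above: coalescing is only used when it is enabled
// and neither per-row input-file metadata nor corrupt-file skipping is required.
def canUseCoalesceFilesReader(
    coalesceReadEnabled: Boolean,
    queryUsesInputFile: Boolean,
    ignoreCorruptFiles: Boolean): Boolean =
  coalesceReadEnabled && !(queryUsesInputFile || ignoreCorruptFiles)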

@@ -432,7 +433,8 @@
new MultiFileCloudParquetPartitionReader(conf, files,
isCaseSensitive, readDataSchema, debugDumpPrefix,
maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics, partitionSchema,
numThreads, maxNumFileProcessed, filterHandler, filters)
numThreads, maxNumFileProcessed, filterHandler, filters,
ignoreMissingFiles, ignoreCorruptFiles)
}

/**
@@ -447,7 +449,23 @@
conf: Configuration): PartitionReader[ColumnarBatch] = {
val clippedBlocks = ArrayBuffer[ParquetSingleDataBlockMeta]()
files.map { file =>
val singleFileInfo = filterHandler.filterBlocks(file, conf, filters, readDataSchema)
val singleFileInfo = try {
filterHandler.filterBlocks(file, conf, filters, readDataSchema)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${file.filePath}", e)
ParquetFileInfoWithBlockMeta(new Path(new URI(file.filePath)), Seq.empty,
file.partitionValues, null, false, false, false)
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
// If ignoreMissingFiles=true, the branch above is never reached, but it is
// harmless to keep it.
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
ParquetFileInfoWithBlockMeta(new Path(new URI(file.filePath)), Seq.empty,
file.partitionValues, null, false, false, false)
}
clippedBlocks ++= singleFileInfo.blocks.map(block =>
ParquetSingleDataBlockMeta(
singleFileInfo.filePath,
@@ -460,7 +478,7 @@
new MultiFileParquetPartitionReader(conf, files, clippedBlocks,
isCaseSensitive, readDataSchema, debugDumpPrefix,
maxReadBatchSizeRows, maxReadBatchSizeBytes, metrics,
partitionSchema, numThreads)
partitionSchema, numThreads, ignoreMissingFiles, ignoreCorruptFiles)
}

/**
@@ -991,6 +1009,8 @@ private case class ParquetSingleDataBlockMeta(
* @param execMetrics metrics
* @param partitionSchema Schema of partitions.
* @param numThreads the size of the threadpool
* @param ignoreMissingFiles Whether to ignore missing files
* @param ignoreCorruptFiles Whether to ignore corrupt files
*/
class MultiFileParquetPartitionReader(
override val conf: Configuration,
@@ -1003,7 +1023,9 @@
maxReadBatchSizeBytes: Long,
override val execMetrics: Map[String, GpuMetric],
partitionSchema: StructType,
numThreads: Int)
numThreads: Int,
ignoreMissingFiles: Boolean,
ignoreCorruptFiles: Boolean)
extends MultiFileCoalescingPartitionReaderBase(conf, clippedBlocks, readDataSchema,
partitionSchema, maxReadBatchSizeRows, maxReadBatchSizeBytes, numThreads, execMetrics)
with ParquetPartitionReaderBase {
@@ -1046,6 +1068,18 @@
}
val bytesRead = fileSystemBytesRead() - startBytesRead
(res, bytesRead)
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${file.toString}", e)
(Seq.empty, 0)
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file: ${file.toString}", e)
// Skipping the corrupted file leaves an empty hole in the re-composed parquet file,
// but that should be OK since there is no metadata pointing to that "hole".
(Seq.empty, 0)
} finally {
TrampolineUtil.unsetTaskContext()
}
@@ -1190,6 +1224,8 @@ class MultiFileParquetPartitionReader(
* processed on the GPU. This affects the amount of host memory used.
* @param filterHandler GpuParquetFileFilterHandler used to filter the parquet blocks
* @param filters filters passed into the filterHandler
* @param ignoreMissingFiles Whether to ignore missing files
* @param ignoreCorruptFiles Whether to ignore corrupt files
*/
class MultiFileCloudParquetPartitionReader(
override val conf: Configuration,
@@ -1204,9 +1240,11 @@
numThreads: Int,
maxNumFileProcessed: Int,
filterHandler: GpuParquetFileFilterHandler,
filters: Array[Filter])
filters: Array[Filter],
ignoreMissingFiles: Boolean,
ignoreCorruptFiles: Boolean)
extends MultiFileCloudPartitionReaderBase(conf, files, numThreads, maxNumFileProcessed, filters,
execMetrics) with ParquetPartitionReaderBase {
execMetrics, ignoreCorruptFiles) with ParquetPartitionReaderBase {

case class HostMemoryBuffersWithMetaData(
override val partitionedFile: PartitionedFile,
@@ -1238,6 +1276,16 @@
TrampolineUtil.setTaskContext(taskContext)
try {
doRead()
} catch {
case e: FileNotFoundException if ignoreMissingFiles =>
logWarning(s"Skipped missing file: ${file.filePath}", e)
HostMemoryBuffersWithMetaData(file, Array((null, 0)), 0, false, false, false, null)
// Throw FileNotFoundException even if `ignoreCorruptFiles` is true
case e: FileNotFoundException if !ignoreMissingFiles => throw e
case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
logWarning(
s"Skipped the rest of the content in the corrupted file: ${file.filePath}", e)
HostMemoryBuffersWithMetaData(file, Array((null, 0)), 0, false, false, false, null)
} finally {
TrampolineUtil.unsetTaskContext()
}
@@ -1361,7 +1409,6 @@
}
}


private def readBufferToTable(
isCorrectRebaseMode: Boolean,
isCorrectInt96RebaseMode: Boolean,
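
Finally, note how the cloud reader's background task reports a skipped file: rather than letting the exception escape the thread pool (where it would presumably resurface when the future's result is collected), doRead is wrapped so the task logs the problem and hands back a HostMemoryBuffersWithMetaData whose buffer array is effectively empty (Array((null, 0)) with zero bytes read), so the file contributes no rows to the scan. A simplified sketch of that shape with hypothetical stand-in types, not the plugin's actual classes:

import java.io.IOException
import java.util.concurrent.Callable

// Hypothetical stand-ins for the reader's result type and background task.
final case class ReadResult(path: String, buffers: Array[(Array[Byte], Long)], bytesRead: Long) {
  // Mirrors the Array((null, 0)) sentinel used above for skipped files.
  def isSkipped: Boolean = buffers.forall(_._1 == null)
}

final class ReadFileTask(
    path: String,
    ignoreCorruptFiles: Boolean,
    doRead: String => ReadResult) extends Callable[ReadResult] {

  override def call(): ReadResult = {
    try {
      doRead(path)
    } catch {
      // FileNotFoundException handling is analogous to the guard sketch earlier;
      // only the corrupt-file case is shown here.
      case e @ (_: RuntimeException | _: IOException) if ignoreCorruptFiles =>
        Console.err.println(s"Skipped the corrupted file: $path ($e)")
        ReadResult(path, Array[(Array[Byte], Long)]((null, 0L)), bytesRead = 0L)
    }
  }
}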