[SPARK-38314][SQL] Fix of failing to read parquet files after writing the hidden file metadata in

### What changes were proposed in this pull request?
Selecting the hidden file metadata column `_metadata` and then writing the DataFrame to a file format such as `parquet` or `delta` still keeps the internal `Attribute` metadata information on the output column. When those `parquet` or `delta` files are read again, the query breaks because Spark wrongly treats the user data column `_metadata` as a hidden file source metadata column.

```
// prepare a file source df (sourcePath is illustrative; any file-source table works)
val df = spark.read.format("parquet").load(sourcePath)

// select the hidden _metadata column and write it out together with the data
df.select("*", "_metadata").write.format("parquet").save(path)

// reading the written files back fails before this fix
spark.read.format("parquet").load(path).select("*").show()
```
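
For illustration, the leak can be observed on the DataFrame that is about to be written: the `_metadata` field of its schema still carries the internal column metadata. This is a hedged sketch, not part of the patch; the printed key names are internal and may differ by version:

```
// Sketch only: inspect the column metadata that gets persisted into the
// parquet schema. Before this fix, the `_metadata` field still carries the
// internal markers (METADATA_COL_ATTR_KEY / FILE_SOURCE_METADATA_COL_ATTR_KEY),
// which later makes the reader treat the user column as a hidden one.
val toWrite = df.select("*", "_metadata")
println(toWrite.schema("_metadata").metadata.json)
```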
This PR fixes the issue by cleaning up any remaining internal metadata information from the output columns before they are written out.

### Why are the changes needed?
Bugfix

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
A new unit test in `FileMetadataStructSuite`.

Closes #35650 from Yaohua628/spark-38314.

Authored-by: yaohua <yaohua.zhao@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Yaohua628 authored and cloud-fan committed Feb 28, 2022
1 parent 89799b8 commit 50520fe
Showing 3 changed files with 60 additions and 5 deletions.
@@ -475,4 +475,19 @@ object FileSourceMetadataAttribute {
&& attr.metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY) => Some(attr)
case _ => None
}

/**
* Cleanup the internal metadata information of an attribute if it is
* a [[FileSourceMetadataAttribute]]; it removes both [[METADATA_COL_ATTR_KEY]] and
* [[FILE_SOURCE_METADATA_COL_ATTR_KEY]] from the attribute's [[Metadata]]
*/
def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute = attr match {
case FileSourceMetadataAttribute(attr) => attr.withMetadata(
new MetadataBuilder().withMetadata(attr.metadata)
.remove(METADATA_COL_ATTR_KEY)
.remove(FILE_SOURCE_METADATA_COL_ATTR_KEY)
.build()
)
case attr => attr
}
}
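
For context, here is a minimal usage sketch of the new helper. The import paths and the literal key values below are assumptions for illustration, not taken from this diff:

```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, FileSourceMetadataAttribute}
import org.apache.spark.sql.types.{LongType, MetadataBuilder}

// Tag an attribute the way the hidden _metadata column is tagged internally.
// The string literals stand in for METADATA_COL_ATTR_KEY and
// FILE_SOURCE_METADATA_COL_ATTR_KEY; their exact values are assumed here.
val tagged = AttributeReference(
  "_metadata",
  LongType,
  nullable = false,
  new MetadataBuilder()
    .putBoolean("__metadata_col", true)
    .putBoolean("__file_source_metadata_col", true)
    .build())()

// The helper returns an equivalent attribute with the internal flags removed,
// so a column written to parquet no longer advertises itself as hidden.
val cleaned = FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation(tagged)
println(cleaned.metadata.json)  // no longer contains the two internal keys
```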
@@ -111,7 +111,11 @@ object FileFormatWriter extends Logging {
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

val partitionSet = AttributeSet(partitionColumns)
val dataColumns = outputSpec.outputColumns.filterNot(partitionSet.contains)
// cleanup the internal metadata information of
// the file source metadata attribute if any before write out
val finalOutputSpec = outputSpec.copy(outputColumns = outputSpec.outputColumns
.map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation))
val dataColumns = finalOutputSpec.outputColumns.filterNot(partitionSet.contains)

var needConvert = false
val projectList: Seq[NamedExpression] = plan.output.map {
@@ -167,12 +171,12 @@
uuid = UUID.randomUUID.toString,
serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
outputWriterFactory = outputWriterFactory,
allColumns = outputSpec.outputColumns,
allColumns = finalOutputSpec.outputColumns,
dataColumns = dataColumns,
partitionColumns = partitionColumns,
bucketSpec = writerBucketSpec,
path = outputSpec.outputPath,
customPartitionLocations = outputSpec.customPartitionLocations,
path = finalOutputSpec.outputPath,
customPartitionLocations = finalOutputSpec.customPartitionLocations,
maxRecordsPerFile = caseInsensitiveOptions.get("maxRecordsPerFile").map(_.toLong)
.getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile),
timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION)
@@ -212,7 +216,7 @@
// the physical plan may have different attribute ids due to optimizer removing some
// aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
val orderingExpr = bindReferences(
requiredOrdering.map(SortOrder(_, Ascending)), outputSpec.outputColumns)
requiredOrdering.map(SortOrder(_, Ascending)), finalOutputSpec.outputColumns)
val sortPlan = SortExec(
orderingExpr,
global = false,
@@ -474,4 +474,40 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
Seq(Row("jack", 24, 12345L, f0(METADATA_FILE_SIZE)))
)
}

metadataColumnsTest("write _metadata in parquet and read back", schema) { (df, f0, f1) =>
// SPARK-38314: Selecting and then writing df containing hidden file
// metadata column `_metadata` into parquet files will still keep the internal `Attribute`
// metadata information of the column. It will then fail when read again.
withTempDir { dir =>
df.select("*", "_metadata")
.write.format("parquet").save(dir.getCanonicalPath + "/new-data")

val newDF = spark.read.format("parquet").load(dir.getCanonicalPath + "/new-data")

// SELECT * will have: name, age, info, _metadata of f0 and f1
checkAnswer(
newDF.select("*"),
Seq(
Row("jack", 24, Row(12345L, "uom"),
Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
Row("lily", 31, Row(54321L, "ucb"),
Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
)
)

// SELECT _metadata won't override the existing user data (_metadata of f0 and f1)
checkAnswer(
newDF.select("_metadata"),
Seq(
Row(Row(f0(METADATA_FILE_PATH), f0(METADATA_FILE_NAME),
f0(METADATA_FILE_SIZE), f0(METADATA_FILE_MODIFICATION_TIME))),
Row(Row(f1(METADATA_FILE_PATH), f1(METADATA_FILE_NAME),
f1(METADATA_FILE_SIZE), f1(METADATA_FILE_MODIFICATION_TIME)))
)
)
}
}
}
