From 81c7a58e5bfa3aff191cd60f0459bb894822713d Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Wed, 19 Apr 2023 01:20:36 -0700
Subject: [PATCH] Remove Spark's internal metadata stored unintentionally in Delta

[SPARK-43123](https://github.com/apache/spark/pull/40776) fixed an issue where Spark
might leak internal metadata, which caused Delta to store Spark's internal metadata
in its table schema. Spark's internal metadata may trigger special behaviors. For
example, if a column's metadata contains `__metadata_col`, the column cannot be
selected by a star. Hence, if `__metadata_col` leaks into any column of a Delta
table, that column can no longer be queried with `SELECT *`.

Although [SPARK-43123](https://github.com/apache/spark/pull/40776) fixes the issue in
new Spark versions, we might have already persisted internal metadata in some Delta
tables. To make these Delta tables readable again, this PR adds an extra step that
cleans up internal metadata before returning the table schema to Spark.
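For illustration only (this snippet is not part of the patch; it assumes an active
`SparkSession` named `spark`), the new cleanup step behaves roughly as follows:

    import org.apache.spark.sql.types._

    // A column whose metadata leaked Spark's internal `__metadata_col` key.
    val leaked = new MetadataBuilder().putBoolean("__metadata_col", true).build()
    val schema = StructType(Seq(StructField("id", LongType, metadata = leaked)))

    // The new helper strips the leaked key, so the column is visible to
    // `SELECT *` again. Only the keys listed in SPARK_INTERNAL_METADATA_KEYS
    // are removed; other field metadata is preserved.
    val cleaned = DeltaTableUtils.removeInternalMetadata(spark, schema)
    assert(!cleaned.head.metadata.contains("__metadata_col"))
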
GitOrigin-RevId: 60eb4046d55e955379c98e409993b33e753c5256
---
 .../org/apache/spark/sql/delta/DeltaLog.scala      |  7 +--
 .../apache/spark/sql/delta/DeltaTable.scala        | 51 ++++++++++++++++++-
 .../sql/delta/catalog/DeltaTableV2.scala           |  2 +-
 .../sql/delta/sources/DeltaDataSource.scala        |  2 +-
 .../sql/delta/sources/DeltaSQLConf.scala           | 10 ++++
 .../spark/sql/delta/sources/DeltaSource.scala      |  3 +-
 .../sql/delta/DeltaTableUtilsSuite.scala           | 29 +++++++++++
 7 files changed, 96 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index fe04c046216..cc3908e80cd 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -489,12 +489,13 @@ class DeltaLog private(
     val relation = HadoopFsRelation(
       fileIndex,
-      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(partitionSchema),
+      partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
+        DeltaTableUtils.removeInternalMetadata(spark, partitionSchema)),
       // We pass all table columns as `dataSchema` so that Spark will preserve the partition column
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`.
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(metadata.schema)),
+        DeltaTableUtils.removeInternalMetadata(spark, metadata.schema)),
       bucketSpec = None,
       fileFormat(snapshot.protocol, metadata),
       hadoopOptions)(spark)
@@ -542,7 +543,7 @@ class DeltaLog private(
       // locations. Otherwise, for any partition columns not in `dataSchema`, Spark would just
       // append them to the end of `dataSchema`
       dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
-        ColumnWithDefaultExprUtils.removeDefaultExpressions(
+        DeltaTableUtils.removeInternalMetadata(spark,
           SchemaUtils.dropNullTypeColumns(snapshotToUse.metadata.schema))),
       bucketSpec = bucketSpec,
       fileFormat(snapshotToUse.protocol, snapshotToUse.metadata),
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
index fcf6937b682..5eb19562908 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -21,7 +21,7 @@ import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
-import org.apache.spark.sql.delta.sources.DeltaSourceUtils
+import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.sql.SparkSession
@@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
 import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 /**
  * Extractor Object for pulling out the table scan of a Delta table. It could be a full scan
@@ -418,4 +419,52 @@ object DeltaTableUtils extends PredicateHelper
       new Path(basePath, relativeChildPath)
     }
   }
+
+  /**
+   * A list of Spark internal metadata keys that we may save in a Delta table schema
+   * unintentionally due to SPARK-43123. We need to remove them before handing over the schema to
+   * Spark to avoid Spark interpreting table columns incorrectly.
+   *
+   * Hard-coded strings are used intentionally, as we want to capture possible keys used before
+   * SPARK-43123, regardless of the Spark version. For example, if Spark changes any key string
+   * after SPARK-43123, the new string won't be leaked, but we still want to clean up the old key.
+   */
+  val SPARK_INTERNAL_METADATA_KEYS = Seq(
+    "__autoGeneratedAlias",
+    "__metadata_col",
+    "__supports_qualified_star", // A key used by an old version. Doesn't exist in the latest code
+    "__qualified_access_only",
+    "__file_source_metadata_col",
+    "__file_source_constant_metadata_col",
+    "__file_source_generated_metadata_col"
+  )
+
+  /**
+   * Remove leaked metadata keys from the persisted table schema. Old versions might leak metadata
+   * unintentionally. This method removes all possible metadata keys to avoid Spark interpreting
+   * table columns incorrectly.
+   */
+  def removeInternalMetadata(spark: SparkSession, persistedSchema: StructType): StructType = {
+    val schema = ColumnWithDefaultExprUtils.removeDefaultExpressions(persistedSchema)
+    if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA)) {
+      var updated = false
+      val updatedSchema = schema.map { field =>
+        if (SPARK_INTERNAL_METADATA_KEYS.exists(field.metadata.contains)) {
+          updated = true
+          val newMetadata = new MetadataBuilder().withMetadata(field.metadata)
+          SPARK_INTERNAL_METADATA_KEYS.foreach(newMetadata.remove)
+          field.copy(metadata = newMetadata.build())
+        } else {
+          field
+        }
+      }
+      if (updated) {
+        StructType(updatedSchema)
+      } else {
+        schema
+      }
+    } else {
+      schema
+    }
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
index a780259a9ef..8b187150d94 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaTableV2.scala
@@ -128,7 +128,7 @@ case class DeltaTableV2(
 
   private lazy val tableSchema: StructType =
     DeltaColumnMapping.dropColumnMappingMetadata(
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshot.schema))
+      DeltaTableUtils.removeInternalMetadata(spark, snapshot.schema))
 
   override def schema(): StructType = tableSchema
 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
index 1a1d687c013..425cec3573e 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala
@@ -95,7 +95,7 @@ class DeltaDataSource
         .getOrElse(snapshot.schema)
     }
 
-    val schemaToUse = ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchema)
+    val schemaToUse = DeltaTableUtils.removeInternalMetadata(sqlContext.sparkSession, readSchema)
     if (schemaToUse.isEmpty) {
       throw DeltaErrors.schemaNotSetException
     }
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
index 9e704fd54d1..6e6c8b3125c 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -336,6 +336,16 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)
 
+  val DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA =
+    buildConf("schema.removeSparkInternalMetadata")
+      .doc(
+        """Whether to remove leaked Spark internal metadata from the table schema before returning
+          |it to Spark. Such metadata might have been stored unintentionally in tables created by
+          |old Spark versions.""".stripMargin)
+      .internal()
+      .booleanConf
+      .createWithDefault(true)
+
   val DELTA_ASSUMES_DROP_CONSTRAINT_IF_EXISTS =
     buildConf("constraints.assumesDropIfExists.enabled")
       .doc("""If true, DROP CONSTRAINT quietly drops nonexistent constraints even without
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 5587d8f6f52..d673e10aee5 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -166,8 +166,7 @@ trait DeltaSourceBase extends Source
   protected var hasCheckedReadIncompatibleSchemaChangesOnStreamStart: Boolean = false
 
   override val schema: StructType = {
-    val schemaWithoutCDC =
-      ColumnWithDefaultExprUtils.removeDefaultExpressions(readSchemaAtSourceInit)
+    val schemaWithoutCDC = DeltaTableUtils.removeInternalMetadata(spark, readSchemaAtSourceInit)
     if (options.readChangeFeed) {
       CDCReader.cdcReadSchema(schemaWithoutCDC)
     } else {
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
index 3d8de5a3517..3532f9208ad 100644
--- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTableUtilsSuite.scala
@@ -18,11 +18,14 @@ package org.apache.spark.sql.delta
 
 import java.net.URI
 
+import org.apache.spark.sql.delta.DeltaTestUtils.BOOLEAN_DOMAIN
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
 
 class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {
 
@@ -49,6 +52,32 @@ class DeltaTableUtilsSuite extends SharedSparkSession with DeltaSQLCommandTest {
     assert(DeltaTableUtils.safeConcatPaths(basePathEmpty, "_delta_log") ==
       new Path("s3://my-bucket/_delta_log"))
   }
+
+  test("removeInternalMetadata") {
+    for (flag <- BOOLEAN_DOMAIN) {
+      withSQLConf(DeltaSQLConf.DELTA_SCHEMA_REMOVE_SPARK_INTERNAL_METADATA.key -> flag.toString) {
+        for (internalMetadataKey <- DeltaTableUtils.SPARK_INTERNAL_METADATA_KEYS) {
+          val metadata = new MetadataBuilder()
+            .putString(internalMetadataKey, "foo")
+            .putString("other", "bar")
+            .build()
+          val schema = StructType(Seq(StructField("foo", StringType, metadata = metadata)))
+          val newSchema = DeltaTableUtils.removeInternalMetadata(spark, schema)
+          newSchema.foreach { f =>
+            if (flag) {
+              // Flag on: should remove internal metadata
+              assert(!f.metadata.contains(internalMetadataKey))
+              // Should preserve non-internal metadata
+              assert(f.metadata.contains("other"))
+            } else {
+              // Flag off: no-op
+              assert(f.metadata == metadata)
+            }
+          }
+        }
+      }
+    }
+  }
 }
 
 private class MockS3FileSystem extends RawLocalFileSystem {
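
If the cleanup ever needs to be bypassed, the conf added above can be turned off per
session. A minimal sketch, assuming the standard `spark.databricks.delta.` prefix
that `DeltaSQLConf.buildConf` prepends to its keys:

    // Hypothetical session-level opt-out; the cleanup is enabled by default.
    spark.conf.set("spark.databricks.delta.schema.removeSparkInternalMetadata", "false")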