[SPARK-43123][SQL] Internal field metadata should not be leaked to catalogs #40776

Closed · wants to merge 2 commits
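For context, a rough sketch of the symptom this patch addresses (the catalog and table names below are illustrative, not taken from the PR): the schema of a CTAS query can carry Spark-internal field metadata, such as __autoGeneratedAlias on auto-aliased expressions or __metadata_col on metadata columns, and before this change that metadata was handed to the target catalog unmodified.

// Hedged repro sketch, assuming a DataSource V2 catalog registered as "testcat"
// and a source table that exposes "index" as a metadata column.
spark.sql("CREATE TABLE testcat.target AS SELECT index FROM testcat.source")
// Before the fix, loading "target" back through the TableCatalog API could report internal
// keys (e.g. __metadata_col) via Column.metadataInJSON(); after the fix the schema is
// cleaned before the table is created and metadataInJSON() returns null, as the new test asserts.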
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.catalyst.trees.AlwaysProcess
 import org.apache.spark.sql.catalyst.trees.CurrentOrigin.withOrigin
 import org.apache.spark.sql.catalyst.trees.TreePattern._
-import org.apache.spark.sql.catalyst.util.{toPrettySQL, CharVarcharUtils, StringUtils}
+import org.apache.spark.sql.catalyst.util.{toPrettySQL, AUTO_GENERATED_ALIAS, CharVarcharUtils, StringUtils}
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{View => _, _}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
@@ -492,7 +492,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case l: Literal => Alias(l, toPrettySQL(l))()
       case e =>
         val metaForAutoGeneratedAlias = new MetadataBuilder()
-          .putString("__autoGeneratedAlias", "true")
+          .putString(AUTO_GENERATED_ALIAS, "true")
           .build()
         Alias(e, toPrettySQL(e))(explicitMetadata = Some(metaForAutoGeneratedAlias))
     }
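To show where this key originates (a small sketch, not part of the patch; the generated column name is only approximate): the rule above fires for any non-literal SELECT expression without an explicit alias, and the generated Alias carries AUTO_GENERATED_ALIAS ("__autoGeneratedAlias") in its metadata, which then surfaces in the query's output schema.

// Hedged sketch: an unaliased expression gets an auto-generated alias such as "(id + 1)",
// whose StructField metadata carries {"__autoGeneratedAlias": "true"}.
val df = spark.range(1).selectExpr("id + 1")
// df.schema.head.metadata would contain the __autoGeneratedAlias key, and this is
// exactly the kind of field metadata that used to leak into catalogs on CTAS.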
@@ -27,7 +27,7 @@ import com.google.common.io.ByteStreams
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType}
+import org.apache.spark.sql.types.{MetadataBuilder, NumericType, StringType, StructType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.Utils
 
@@ -191,12 +191,13 @@ package object util extends Logging {
 
   val METADATA_COL_ATTR_KEY = "__metadata_col"
 
+  /**
+   * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
+   * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
+   */
+  val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
+
   implicit class MetadataColumnHelper(attr: Attribute) {
-    /**
-     * If set, this metadata column can only be accessed with qualifiers, e.g. `qualifiers.col` or
-     * `qualifiers.*`. If not set, metadata columns cannot be accessed via star.
-     */
-    val QUALIFIED_ACCESS_ONLY = "__qualified_access_only"
 
     def isMetadataCol: Boolean = MetadataAttribute.isValid(attr.metadata)
 
@@ -225,4 +226,25 @@
       }
     }
   }
+
+  val AUTO_GENERATED_ALIAS = "__autoGeneratedAlias"
+
+  val INTERNAL_METADATA_KEYS = Seq(
+    AUTO_GENERATED_ALIAS,
+    METADATA_COL_ATTR_KEY,
+    QUALIFIED_ACCESS_ONLY,
+    FileSourceMetadataAttribute.FILE_SOURCE_METADATA_COL_ATTR_KEY,
+    FileSourceConstantMetadataStructField.FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY,
+    FileSourceGeneratedMetadataStructField.FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY
+  )
+
+  def removeInternalMetadata(schema: StructType): StructType = {
+    StructType(schema.map { field =>
+      var builder = new MetadataBuilder().withMetadata(field.metadata)
+      INTERNAL_METADATA_KEYS.foreach { key =>
+        builder = builder.remove(key)
+      }
+      field.copy(metadata = builder.build())
+    })
+  }
 }

Inline review comment on the new INTERNAL_METADATA_KEYS list: "Are these metadata keys only in the top level columns?"
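A minimal sketch of how the new helper behaves (the field name and metadata value are made up; removeInternalMetadata and INTERNAL_METADATA_KEYS are the definitions added above):

import org.apache.spark.sql.catalyst.util.removeInternalMetadata
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}

// A schema whose only field is tagged the way the analyzer tags auto-generated aliases.
val tagged = StructField("c", IntegerType, nullable = true,
  metadata = new MetadataBuilder().putString("__autoGeneratedAlias", "true").build())
// removeInternalMetadata strips every key listed in INTERNAL_METADATA_KEYS from each
// top-level field, so the resulting field carries empty metadata here.
val cleaned = removeInternalMetadata(StructType(Seq(tagged)))

Note that, as written, the helper maps over the schema's top-level fields only; metadata on nested struct fields is left as is, which is what the review question above is probing.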
@@ -22,7 +22,7 @@ import java.net.URI
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.CommandExecutionMode
 import org.apache.spark.sql.execution.datasources._
@@ -181,7 +181,8 @@ case class CreateDataSourceTableAsSelectCommand(
     }
     val result = saveDataIntoTable(
       sparkSession, table, tableLocation, SaveMode.Overwrite, tableExists = false)
-    val tableSchema = CharVarcharUtils.getRawSchema(result.schema, sessionState.conf)
+    val tableSchema = CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(result.schema), sessionState.conf)
     val newTable = table.copy(
       storage = table.storage.copy(locationUri = tableLocation),
       // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -26,15 +26,16 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, LogicalPlan, TableSpec, UnaryNode}
-import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, WriteDeltaProjections}
+import org.apache.spark.sql.catalyst.util.{removeInternalMetadata, CharVarcharUtils, WriteDeltaProjections}
 import org.apache.spark.sql.catalyst.util.RowDeltaUtils.{DELETE_OPERATION, INSERT_OPERATION, UPDATE_OPERATION}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, Identifier, StagedTable, StagingTableCatalog, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.connector.metric.CustomMetric
 import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, PhysicalWriteInfoImpl, Write, WriterCommitMessage}
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{LongAccumulator, Utils}
 
 /**
@@ -69,7 +70,7 @@ case class CreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -78,14 +79,10 @@ case class CreateTableAsSelectExec(
       if (ifNotExists) {
        return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
-    val table = catalog.createTable(ident, columns,
-      partitioning.toArray, properties.asJava)
+    val table = catalog.createTable(
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -106,7 +103,7 @@ case class AtomicCreateTableAsSelectExec(
     query: LogicalPlan,
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
-    ifNotExists: Boolean) extends TableWriteExecHelper {
+    ifNotExists: Boolean) extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -115,13 +112,10 @@ case class AtomicCreateTableAsSelectExec(
       if (ifNotExists) {
        return Nil
       }
-
       throw QueryCompilationErrors.tableAlreadyExistsError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val stagedTable = catalog.stageCreate(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, stagedTable, writeOptions, ident, query)
   }
 }
@@ -144,7 +138,8 @@ case class ReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
@@ -164,10 +159,8 @@ case class ReplaceTableAsSelectExec(
     } else if (!orCreate) {
       throw QueryCompilationErrors.cannotReplaceMissingTableError(ident)
     }
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
     val table = catalog.createTable(
-      ident, columns, partitioning.toArray, properties.asJava)
+      ident, getV2Columns(query.schema), partitioning.toArray, properties.asJava)
     writeToTable(catalog, table, writeOptions, ident, query)
   }
 }
@@ -192,13 +185,13 @@ case class AtomicReplaceTableAsSelectExec(
     tableSpec: TableSpec,
     writeOptions: Map[String, String],
     orCreate: Boolean,
-    invalidateCache: (TableCatalog, Table, Identifier) => Unit) extends TableWriteExecHelper {
+    invalidateCache: (TableCatalog, Table, Identifier) => Unit)
+  extends V2CreateTableAsSelectBaseExec {
 
   val properties = CatalogV2Util.convertTableProperties(tableSpec)
 
   override protected def run(): Seq[InternalRow] = {
-    val columns = CatalogV2Util.structTypeToV2Columns(
-      CharVarcharUtils.getRawSchema(query.schema, conf).asNullable)
+    val columns = getV2Columns(query.schema)
     if (catalog.tableExists(ident)) {
       val table = catalog.loadTable(ident)
       invalidateCache(catalog, table, ident)
@@ -559,9 +552,14 @@ case class DeltaWithMetadataWritingSparkTask(
   }
 }
 
-private[v2] trait TableWriteExecHelper extends LeafV2CommandExec {
+private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec {
   override def output: Seq[Attribute] = Nil
 
+  protected def getV2Columns(schema: StructType): Array[Column] = {
+    CatalogV2Util.structTypeToV2Columns(CharVarcharUtils.getRawSchema(
+      removeInternalMetadata(schema), conf).asNullable)
+  }
+
   protected def writeToTable(
       catalog: TableCatalog,
       table: Table,
@@ -18,8 +18,11 @@
 package org.apache.spark.sql.connector
 
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.functions.{col, struct}
+import org.apache.spark.sql.types.IntegerType
 
 class MetadataColumnSuite extends DatasourceV2SQLBase {
   import testImplicits._
@@ -340,4 +343,17 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       assert(relations(0).output != relations(1).output)
     }
   }
+
+  test("SPARK-43123: Metadata column related field metadata should not be leaked to catalogs") {
+    withTable(tbl, "testcat.target") {
+      prepareTable()
+      sql(s"CREATE TABLE testcat.target AS SELECT index FROM $tbl")
+      val cols = catalog("testcat").asTableCatalog.loadTable(
+        Identifier.of(Array.empty, "target")).columns()
+      assert(cols.length == 1)
+      assert(cols.head.name() == "index")
+      assert(cols.head.dataType() == IntegerType)
+      assert(cols.head.metadataInJSON() == null)
+    }
+  }
 }