Allow restricting table features to auto-update protocol versions #1660

Closed · wants to merge 10 commits
28 changes: 13 additions & 15 deletions core/src/main/resources/error/delta-error-classes.json
@@ -657,6 +657,19 @@
],
"sqlState" : "0AKDE"
},
"DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT" : {
"message" : [
"Your table schema requires the following table feature(s) that require manual enablement: <features>.",
"",
"To do this, run the following command for each of the features listed above:",
" ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.feature_name' = 'supported')",
Contributor: Can the table name be passed in to this error message?

Author: Not trivial, because we only have the DeltaLog instance, which is based on a path. The name is already lost at this stage.
"Replace \"table_name\" and \"feature_name\" with real values.",
"",
"Required Delta protocol: <requiredVersion>",
"Current Delta protocol: <currentVersion>"
],
Contributor: Q: Does that mean we now allow users to specify support for all table features? I thought they were not meant to be user-facing.

Contributor: Never mind, they are allowed to be user-facing. So a user can specify support for DVs like this, or add CDC support and change the metadata at the same time, if they wish 🤔

Author: A feature can also be controlled exclusively by table features, such as TimestampNtz: #1626. In this case the config must be user-facing.
"sqlState" : "0AKDE"
},
"DELTA_FEATURE_REQUIRES_HIGHER_READER_VERSION" : {
"message" : [
"Unable to enable table feature <feature> because it requires a higher reader protocol version (current <current>). Consider upgrading the table's reader protocol version to <required>, or to a version which supports reader table features. Refer to <docLink> for more information on table protocol versions."
@@ -2077,21 +2090,6 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : {
"message" : [
"Your table schema <schema> contains a column of type TimestampNTZ.",
"TimestampNTZ type is not supported by your table's protocol.",
"",
"Required Delta protocol version and features for TimestampNTZ:",
"<requiredVersion>",
"Your table's current Delta protocol version and enabled features:",
"<currentVersion>",
"",
"Run the following command to add TimestampNTZ support to your table.",
"ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')"
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
"message" : [
"Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
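The new error message asks the user to run one `ALTER TABLE … SET TBLPROPERTIES` statement per missing feature. As a rough sketch, a hypothetical helper (not part of this PR; the name `enablementCommands` is made up) could build those statements from the feature names reported in the error:

```scala
// Hypothetical helper, not part of this PR: build the ALTER TABLE statement
// the error message asks the user to run, one per feature to enable.
def enablementCommands(tableName: String, features: Seq[String]): Seq[String] =
  features.map { feature =>
    s"ALTER TABLE $tableName SET TBLPROPERTIES ('delta.feature.$feature' = 'supported')"
  }
```

Each resulting statement marks one feature as `supported` in the table's protocol, which is exactly the manual enablement step the error class describes.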
26 changes: 12 additions & 14 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1720,20 +1720,6 @@ trait DeltaErrorsBase
)
}

def schemaContainsTimestampNTZType(
schema: StructType,
requiredProtocol: Protocol,
currentProtocol: Protocol): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ",
messageParameters = Array(
s"${formatSchema(schema)}",
s"$requiredProtocol",
s"$currentProtocol"
)
)
}

def tableAlreadyExists(table: CatalogTable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_TABLE_ALREADY_EXISTS",
@@ -2175,6 +2161,18 @@ trait DeltaErrorsBase
messageParameters = Array(features.mkString(", ")))
}

def tableFeaturesRequireManualEnablementException(
features: Iterable[String],
requiredProtocol: Protocol,
currentProtocol: Protocol): Throwable = {
new DeltaTableFeatureException(
errorClass = "DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT",
messageParameters = Array(
features.mkString(", "),
requiredProtocol.toString,
currentProtocol.toString))
}

def concurrentAppendException(
conflictingCommit: Option[CommitInfo],
partition: String,
@@ -403,26 +403,6 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp,
isCreatingNewTable)

// Check for existence of TimestampNTZ in the schema and throw an error if the feature
// is not enabled.
if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) &&
SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) {
// The timestampNTZ feature is enabled if there is a table prop in this transaction,
// or if this is a new table
val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils
.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
.contains(TimestampNTZTableFeature)

if (!isEnabled) {
throw DeltaErrors.schemaContainsTimestampNTZType(
newMetadataTmp.schema,
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature),
snapshot.protocol
)
}
}

if (newMetadataTmp.schemaString != null) {
// Replace CHAR and VARCHAR with StringType
var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema)
@@ -450,14 +430,12 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp.configuration.contains(Protocol.MIN_WRITER_VERSION_PROP)) {
// Collect new reader and writer versions from table properties, which could be provided by
// the user in `ALTER TABLE TBLPROPERTIES` or copied over from session defaults.
val readerVersionInNewMetadataTmp = newMetadataTmp.configuration
.get(Protocol.MIN_READER_VERSION_PROP)
.map(Protocol.getVersion(Protocol.MIN_READER_VERSION_PROP, _))
.getOrElse(protocolBeforeUpdate.minReaderVersion)
val writerVersionInNewMetadataTmp = newMetadataTmp.configuration
.get(Protocol.MIN_WRITER_VERSION_PROP)
.map(Protocol.getVersion(Protocol.MIN_WRITER_VERSION_PROP, _))
.getOrElse(protocolBeforeUpdate.minWriterVersion)
val (readerVersionInNewMetadataTmp, writerVersionInNewMetadataTmp) = {
val (readerVersionInNewMetadataTmpOpt, writerVersionInNewMetadataTmpOpt) =
Protocol.getProtocolVersionsFromTableConf(newMetadataTmp.configuration)
(readerVersionInNewMetadataTmpOpt.getOrElse(protocolBeforeUpdate.minReaderVersion),
writerVersionInNewMetadataTmpOpt.getOrElse(protocolBeforeUpdate.minWriterVersion))
}

// If the collected reader and writer versions are provided by the user, we must use them,
// and throw ProtocolDowngradeException when they are lower than what the table have before
@@ -506,7 +484,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
GeneratedColumn.satisfyGeneratedColumnProtocol(protocolBeforeUpdate)
val keepIdentityColumns =
ColumnWithDefaultExprUtils.satisfiesIdentityColumnProtocol(protocolBeforeUpdate)
if (keepGeneratedColumns && keepIdentityColumns) {
val fixedNewMetadataTmp = if (keepGeneratedColumns && keepIdentityColumns) {
// If a protocol satisfies both requirements, we do nothing here.
newMetadataTmp
} else {
@@ -523,6 +501,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp
}
}

// After the metadata is fixed, assert that all table features that are either already
// supported by this table, or not yet supported but required by the fixed metadata,
// can be automatically supported by the protocol.
// See the doc of [[assertTableFeaturesAutomaticallySupported]] for more info.
assertTableFeaturesAutomaticallySupported(protocolBeforeUpdate, fixedNewMetadataTmp)
fixedNewMetadataTmp
}

// Enabling table features Part 1: add manually-supported features in table properties start
Expand All @@ -534,9 +519,7 @@ trait OptimisticTransactionImpl extends TransactionalWrite
// any reader-writer feature.
val newProtocolBeforeAddingFeatures = newProtocol.getOrElse(protocolBeforeUpdate)
val newFeaturesFromTableConf =
TableFeatureProtocolUtils.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration,
TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
TableFeatureProtocolUtils.getSupportedFeaturesFromConfigs(newMetadataTmp.configuration)
val readerVersionForNewProtocol =
if (newFeaturesFromTableConf.exists(_.isReaderWriterFeature)) {
TableFeatureProtocolUtils.TABLE_FEATURES_MIN_READER_VERSION
@@ -638,6 +621,60 @@ trait OptimisticTransactionImpl extends TransactionalWrite
}
}

/**
 * Ensure all native table features required by `currentMetadata` are supported by
 * `currentProtocol`. This means these features must be either auto-update capable
 * ([[FeatureAutomaticallyEnabledByMetadata.automaticallyUpdateProtocolOfExistingTables]] is set
 * to `true`), or be supported in the same transaction via a table property.
 */
private def assertTableFeaturesAutomaticallySupported(
Contributor: Based on the description, rename to assertTableFeaturesSupportedByProtocol? It doesn't necessarily check only automatic support.

Author: How about assertRequiredFeaturesCanBeSupportedByProtocol 😄?

currentProtocol: Protocol,
currentMetadata: Metadata): Unit = {
val (readerVersionFromTableConfOpt, writerVersionFromTableConfOpt) =
Protocol.getProtocolVersionsFromTableConf(currentMetadata.configuration)
val txnUpdatedProtocol = currentProtocol.merge(
Protocol(
readerVersionFromTableConfOpt.getOrElse(currentProtocol.minReaderVersion),
writerVersionFromTableConfOpt.getOrElse(currentProtocol.minWriterVersion)))
// Collect features that (1) are already supported by the current protocol or by a new
// protocol that the table will be updated to; (2) will be added to the protocol by this txn
// via `delta.feature.xxx` configs.
val supportedFeaturesInTxn =
TableFeatureProtocolUtils.getSupportedFeaturesFromConfigs(currentMetadata.configuration) ++
txnUpdatedProtocol.implicitlySupportedFeatures ++
txnUpdatedProtocol.readerAndWriterFeatureNames.flatMap(TableFeature.featureNameToFeature)
// Get the min set of features required by the metadata.
val (_, _, metadataEnabledFeaturesInTxn) =
Protocol.minProtocolComponentsFromAutomaticallyEnabledFeatures(spark, currentMetadata)
// Collect native features that are required by the metadata but can't be automatically
// activated.
val unsupportedNativeFeatures =
metadataEnabledFeaturesInTxn.diff(supportedFeaturesInTxn).filter {
case f: FeatureAutomaticallyEnabledByMetadata =>
!f.automaticallyUpdateProtocolOfExistingTables
case f =>
throw new RuntimeException(
f"Feature ${f.name} should not be here because " +
f"it is not activatable via metadata.")
}
if (unsupportedNativeFeatures.nonEmpty) {
// The required protocol we give the user is the current protocol with all features that are
// already supported or will be supported, plus which required by the metadata.
// `TableFeature.minProtocolVersion` does not contain feature names, so we must add them
// using `withFeatures`.
val requiredProtocol = currentProtocol
.merge(supportedFeaturesInTxn.map(_.minProtocolVersion).toSeq: _*)
.merge(metadataEnabledFeaturesInTxn.map(_.minProtocolVersion).toSeq: _*)
// `TableFeature.minProtocolVersion` does not contain feature names, we must add them.
.withFeatures(supportedFeaturesInTxn)
.withFeatures(metadataEnabledFeaturesInTxn)
throw DeltaErrors.tableFeaturesRequireManualEnablementException(
unsupportedNativeFeatures.map(_.name),
requiredProtocol,
currentProtocol)
}
}
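The core of the check above can be modeled without any Delta types. In this self-contained sketch (the `Feature` case class and function name are illustrative, not Delta's real API), a commit must fail exactly when some feature is required by the metadata, not already supported in the transaction, and not auto-update capable:

```scala
// Illustrative model of the new assertion: features required by the metadata
// but not yet supported must be auto-update capable, otherwise the commit
// is rejected with DELTA_FEATURES_REQUIRE_MANUAL_ENABLEMENT.
final case class Feature(name: String, autoUpdate: Boolean)

def featuresNeedingManualEnablement(
    requiredByMetadata: Set[Feature],
    supportedInTxn: Set[Feature]): Set[Feature] =
  requiredByMetadata.diff(supportedInTxn).filterNot(_.autoUpdate)
```

An empty result means the protocol can be upgraded silently; a non-empty result corresponds to the case where the real code throws `tableFeaturesRequireManualEnablementException`.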

/**
* Must make sure that deletion vectors are never added to a table where that isn't allowed.
* Note, statistics recomputation is still allowed even though DVs might be currently disabled.
62 changes: 52 additions & 10 deletions core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -111,18 +111,32 @@ sealed trait LegacyFeatureType
* A trait indicating this feature can be automatically enabled via a change in a table's
* metadata, e.g., through setting particular values of certain feature-specific table properties.
*
* When the feature's metadata requirements are satisfied during table creation
* ([[actions.Protocol.forNewTable]]) or commit ([[OptimisticTransaction.updateMetadata]]), the
* When the feature's metadata requirements are satisfied for <b>new tables</b>, or for
* <b>existing tables when [[automaticallyUpdateProtocolOfExistingTables]] set to `true`</b>, the
Comment on lines +114 to +115:

Contributor: Do we allow HTML annotations in our doc comments? I thought it was just markdown?

Author: I believe so... someone can correct me if I'm wrong.
* client will silently add the feature to the protocol's `readerFeatures` and/or
* `writerFeatures`.
* `writerFeatures`. Otherwise, a proper protocol version bump must be present in the same
* transaction.
*/
sealed trait FeatureAutomaticallyEnabledByMetadata {
sealed trait FeatureAutomaticallyEnabledByMetadata { this: TableFeature =>

/**
* Whether the feature can automatically update the protocol of an existing table when the
* metadata requirements are satisfied. As a rule of thumb, a table feature that requires
* explicit operations (e.g., turning on a table property) should set this flag to `true`, while
* features that are used implicitly (e.g., when using a new data type) should set this flag to
* `false`.
*/
def automaticallyUpdateProtocolOfExistingTables: Boolean = this.isLegacyFeature

/**
* Determine whether the feature must be supported and enabled because its metadata requirements
* are satisfied.
*/
def metadataRequiresFeatureToBeEnabled(metadata: Metadata, spark: SparkSession): Boolean

require(
!this.isLegacyFeature || automaticallyUpdateProtocolOfExistingTables,
"Legacy feature must be auto-update capable.")
}
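The trait's contract can be illustrated with a simplified stand-in (the names `MetadataEnabledFeature` and `DvLikeFeature` are made up for this sketch; the real trait is `FeatureAutomaticallyEnabledByMetadata`): the auto-update flag defaults to off, and a feature such as deletion vectors opts in because it is enabled by an explicit table property rather than implicitly by the schema:

```scala
// Simplified model of the trait contract, with invented names.
trait MetadataEnabledFeature {
  def name: String
  // Default: never silently upgrade the protocol of an existing table.
  def automaticallyUpdateProtocolOfExistingTables: Boolean = false
  def metadataRequiresFeatureToBeEnabled(configuration: Map[String, String]): Boolean
}

// A DV-like feature: triggered by an explicit property, so it opts in
// to auto-updating existing tables.
object DvLikeFeature extends MetadataEnabledFeature {
  val name = "dvLike"
  override val automaticallyUpdateProtocolOfExistingTables = true
  def metadataRequiresFeatureToBeEnabled(configuration: Map[String, String]): Boolean =
    configuration.get("delta.enableDvLike").exists(_.toBoolean)
}
```

A schema-driven feature like `timestampNtz`, by contrast, would keep the default `false` and therefore require manual enablement on existing tables.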

/**
@@ -205,10 +219,12 @@ object TableFeature {
if (DeltaUtils.isTesting) {
features ++= Set(
TestLegacyWriterFeature,
TestWriterFeature,
TestLegacyReaderWriterFeature,
TestWriterFeature,
TestWriterMetadataNoAutoUpdateFeature,
TestReaderWriterFeature,
TestReaderWriterMetadataFeature)
TestReaderWriterMetadataAutoUpdateFeature,
TestReaderWriterMetadataNoAutoUpdateFeature)
}
val featureMap = features.map(f => f.name.toLowerCase(Locale.ROOT) -> f).toMap
require(features.size == featureMap.size, "Lowercase feature names must not duplicate.")
@@ -302,7 +318,6 @@ object IdentityColumnsTableFeature

object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
with FeatureAutomaticallyEnabledByMetadata {

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata, spark: SparkSession): Boolean = {
SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
@@ -312,6 +327,8 @@ object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz
object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
@@ -330,6 +347,17 @@ object TestLegacyWriterFeature

object TestWriterFeature extends WriterFeature(name = "testWriter")

object TestWriterMetadataNoAutoUpdateFeature
extends WriterFeature(name = "testWriterMetadataNoAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testWriterMetadataNoAutoUpdate321_"
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean)
}
}

object TestLegacyReaderWriterFeature
extends LegacyReaderWriterFeature(
name = "testLegacyReaderWriter",
@@ -338,10 +366,24 @@ object TestLegacyReaderWriterFeature

object TestReaderWriterFeature extends ReaderWriterFeature(name = "testReaderWriter")

object TestReaderWriterMetadataFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadata")
object TestReaderWriterMetadataNoAutoUpdateFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadataNoAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testReaderWriterMetadata321_"
val TABLE_PROP_KEY = "_123testReaderWriterMetadataNoAutoUpdate321_"
override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
metadata.configuration.get(TABLE_PROP_KEY).exists(_.toBoolean)
}
}

object TestReaderWriterMetadataAutoUpdateFeature
extends ReaderWriterFeature(name = "testReaderWriterMetadataAutoUpdate")
with FeatureAutomaticallyEnabledByMetadata {
val TABLE_PROP_KEY = "_123testReaderWriterMetadataAutoUpdate321_"

override def automaticallyUpdateProtocolOfExistingTables: Boolean = true

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata,
spark: SparkSession): Boolean = {
@@ -309,18 +309,15 @@ object TableFeatureProtocolUtils {
}

/**
* Get a set of [[TableFeature]]s representing supported features set in a `config` map (table
* properties or Spark session configs).
* Get a set of [[TableFeature]]s representing supported features set in a table properties map.
*/
def getSupportedFeaturesFromConfigs(
configs: Map[String, String],
propertyPrefix: String): Set[TableFeature] = {
val featureConfigs = configs.filterKeys(_.startsWith(propertyPrefix))
def getSupportedFeaturesFromConfigs(configs: Map[String, String]): Set[TableFeature] = {
val featureConfigs = configs.filterKeys(_.startsWith(FEATURE_PROP_PREFIX))
val unsupportedFeatureConfigs = mutable.Set.empty[String]
val collectedFeatures = featureConfigs.flatMap { case (key, value) =>
// Feature name is lower cased in table properties but not in Spark session configs.
// Feature status is not lower cased in any case.
val name = key.stripPrefix(propertyPrefix).toLowerCase(Locale.ROOT)
val name = key.stripPrefix(FEATURE_PROP_PREFIX).toLowerCase(Locale.ROOT)
val status = value.toLowerCase(Locale.ROOT)
if (status != FEATURE_PROP_SUPPORTED && status != FEATURE_PROP_ENABLED) {
throw DeltaErrors.unsupportedTableFeatureStatusException(name, status)
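The simplified signature of `getSupportedFeaturesFromConfigs` can be sketched in isolation (constants and the function name below are illustrative, not the real Delta identifiers): only keys prefixed with `delta.feature.` are read, feature names are lower-cased, and any status other than `supported` or `enabled` is rejected:

```scala
// Simplified sketch of the post-change behavior: the prefix is now fixed to
// the table-property prefix instead of being passed in by the caller.
val FeaturePropPrefix = "delta.feature."
val AllowedStatuses = Set("supported", "enabled")

def supportedFeatureNames(configs: Map[String, String]): Set[String] =
  configs.collect {
    case (key, status) if key.startsWith(FeaturePropPrefix) =>
      val name = key.stripPrefix(FeaturePropPrefix).toLowerCase
      require(AllowedStatuses.contains(status.toLowerCase),
        s"Unsupported status '$status' for feature '$name'")
      name
  }.toSet
```

Keys without the prefix are silently ignored, matching how unrelated table properties pass through the real implementation untouched.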