Adds support for the TimestampNTZ type in Delta
Previously this type was not supported in Delta; Spark 3.3 added support for it in Spark.

To prevent (gate) older writers/readers from reading or writing this column, we need a protocol (feature) bump that does the gating.

* This PR creates a new TableFeature, TimestampNTZTableFeature, which is a reader-writer feature
* This is how the feature is automatically enabled (see the sketch after the table):

Scenario | Previously | With this change
-- | -- | --
User creates a new table with a TIMESTAMP_NTZ column | AnalysisException saying the type is not supported | Protocol is upgraded to a table-features protocol, the TimestampNTZ feature is automatically enabled, and the DDL succeeds
User adds a new column of type TIMESTAMP_NTZ on a legacy protocol version | AnalysisException saying the type is not supported | User DDL completes successfully (the protocol is also upgraded automatically)
User adds a new column of type TIMESTAMP_NTZ to a table with the TimestampNTZ feature enabled | AnalysisException saying the type is not supported | User DDL completes successfully
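
As an illustration of the first scenario, a minimal sketch (assuming a SparkSession with Delta Lake configured on Spark 3.3+; the table name ntz_demo is illustrative):

    // With this change, the DDL succeeds and the protocol upgrade happens
    // automatically as part of the same transaction.
    spark.sql("CREATE TABLE ntz_demo(c1 STRING, c2 TIMESTAMP_NTZ) USING delta")

    // The upgraded protocol versions are visible via DESCRIBE DETAIL.
    spark.sql("DESCRIBE DETAIL ntz_demo")
      .select("minReaderVersion", "minWriterVersion")
      .show()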

Closes #1626

GitOrigin-RevId: d92b62895cf1cdc3dfaed9e97d2ef6e9378f98a3
rahulsmahadev authored and scottsand-db committed Mar 15, 2023
1 parent 380425b commit 6532535
Showing 7 changed files with 199 additions and 27 deletions.
15 changes: 15 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -2021,6 +2021,21 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : {
"message" : [
"Your table schema <schema> contains a column of type TimestampNTZ.",
"TimestampNTZ type is not supported by your table's protocol.",
"",
"Required Delta protocol version and features for TimestampNTZ:",
"<requiredVersion>",
"Your table's current Delta protocol version and enabled features:",
"<currentVersion>",
"",
"Run the following command to add TimestampNTZ support to your table.",
"ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')"
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
"message" : [
"Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
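
Sketched end to end, the remediation the message above prescribes (the table name tbl is illustrative; the failing ALTER mirrors the test added later in this commit):

    // On a table whose protocol predates the feature, adding a TIMESTAMP_NTZ
    // column raises DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ ...
    spark.sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")  // throws

    // ... until the feature is marked supported, exactly as the message says:
    spark.sql("ALTER TABLE tbl SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')")
    spark.sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")  // succeeds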
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1720,6 +1720,20 @@ trait DeltaErrorsBase
)
}

def schemaContainsTimestampNTZType(
schema: StructType,
requiredProtocol: Protocol,
currentProtocol: Protocol): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ",
messageParameters = Array(
s"${formatSchema(schema)}",
s"$requiredProtocol",
s"$currentProtocol"
)
)
}

def tableAlreadyExists(table: CatalogTable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_TABLE_ALREADY_EXISTS",
@@ -413,6 +413,26 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp,
isCreatingNewTable)

// Check for existence of TimestampNTZ in the schema and throw an error if the feature
// is not enabled.
if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) &&
SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) {
// The timestampNTZ feature is enabled if there is a table prop in this transaction,
// or if this is a new table
val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils
.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
.contains(TimestampNTZTableFeature)

if (!isEnabled) {
throw DeltaErrors.schemaContainsTimestampNTZType(
newMetadataTmp.schema,
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature),
snapshot.protocol
)
}
}

if (newMetadataTmp.schemaString != null) {
// Replace CHAR and VARCHAR with StringType
var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema)
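
Condensed, the gate above reduces to the following decision (a simplified sketch of the same logic, not the actual Delta API):

    // A commit may carry TIMESTAMP_NTZ columns when any of these holds:
    def timestampNtzAllowed(
        featureAlreadySupported: Boolean, // protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature)
        isCreatingNewTable: Boolean,      // new tables pick up the feature automatically
        enabledByTableProperty: Boolean   // 'delta.feature.timestampNtz' = 'supported' set in this transaction
    ): Boolean =
      featureAlreadySupported || isCreatingNewTable || enabledByTableProperty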
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -20,9 +20,12 @@ import java.util.Locale

import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.TimestampNTZType

/* --------------------------------------- *
| Table features base class definitions |
@@ -197,6 +200,7 @@ object TableFeature {
GeneratedColumnsTableFeature,
InvariantsTableFeature,
ColumnMappingTableFeature,
TimestampNTZTableFeature,
DeletionVectorsTableFeature)
if (DeltaUtils.isTesting) {
features ++= Set(
@@ -296,6 +300,15 @@ object IdentityColumnsTableFeature
}
}

object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
with FeatureAutomaticallyEnabledByMetadata {

override def metadataRequiresFeatureToBeEnabled(
metadata: Metadata, spark: SparkSession): Boolean = {
SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
}
}

object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
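
Per the test assertions later in this commit, the automatic upgrade produces the feature's minimum protocol with the feature explicitly listed (a sketch; the concrete version pair is what Delta used for table-features protocols at the time of this change):

    // What the tests below assert the upgraded protocol to be:
    val expected = TimestampNTZTableFeature.minProtocolVersion
      .withFeature(TimestampNTZTableFeature)
    // For a reader-writer feature this is a table-features protocol
    // (minReaderVersion = 3, minWriterVersion = 7) with timestampNtz
    // recorded in the protocol's feature sets.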
@@ -21,7 +21,7 @@ import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping}
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaColumnMappingMode, DeltaErrors, DeltaLog, GeneratedColumn, NoMapping, TimestampNTZTableFeature}
import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.metering.DeltaLogging
@@ -1026,6 +1026,13 @@ object SchemaUtils extends DeltaLogging {
unsupportedDataTypes.toSeq
}

/**
* Find TimestampNTZ columns in the table schema.
*/
def checkForTimestampNTZColumnsRecursively(schema: StructType): Boolean = {
SchemaUtils.typeExistsRecursively(schema)(_.isInstanceOf[TimestampNTZType])
}

/**
* Find the unsupported data types in a `DataType` recursively. Add the unsupported data types to
* the provided `unsupportedDataTypes` buffer.
@@ -1058,8 +1065,6 @@
case DateType =>
case TimestampType =>
case TimestampNTZType =>
assert(columnPath.nonEmpty, "'columnPath' must not be empty")
unsupportedDataTypes += UnsupportedDataTypeInfo(prettyFieldName(columnPath), TimestampNTZType)
case BinaryType =>
case _: DecimalType =>
case a: ArrayType =>
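
Because checkForTimestampNTZColumnsRecursively delegates to typeExistsRecursively, a TIMESTAMP_NTZ buried inside nested types still triggers the feature requirement. A small sketch (the schema is illustrative):

    import org.apache.spark.sql.delta.schema.SchemaUtils
    import org.apache.spark.sql.types._

    // A TIMESTAMP_NTZ nested inside a struct within an array element:
    val schema = new StructType()
      .add("id", StringType)
      .add("events", ArrayType(new StructType()
        .add("eventTime", TimestampNTZType)))

    // Returns true: the recursive walk descends into the array's element struct.
    SchemaUtils.checkForTimestampNTZColumnsRecursively(schema)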
@@ -0,0 +1,129 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.sql.Timestamp
import java.time.LocalDateTime

import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType

class DeltaTimestampNTZSuite extends QueryTest
with SharedSparkSession with DeltaSQLCommandTest {

private def getProtocolForTable(table: String): Protocol = {
val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
deltaLog.unsafeVolatileSnapshot.protocol
}

test("create a new table with TIMESTAMP_NTZ, higher protocol and feature should be picked.") {
withTable("tbl") {
sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
sql(
"""INSERT INTO tbl VALUES
|('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
assert(spark.table("tbl").head == Row(
"foo",
new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000),
LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)))
assert(getProtocolForTable("tbl") ==
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
)
}
}

test("creating a table without TIMESTAMP_NTZ should use the usual minimum protocol") {
withTable("tbl") {
sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP) USING DELTA")
assert(getProtocolForTable("tbl") == Protocol(1, 2))

val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
assert(
!deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(TimestampNTZTableFeature),
s"Table tbl contains TimestampNTZFeature descriptor when its not supposed to"
)
}
}

test("add a new column using TIMESTAMP_NTZ should upgrade to the correct protocol versions") {
withTable("tbl") {
sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP) USING delta")
assert(getProtocolForTable("tbl") == Protocol(1, 2))

// Should throw error
val e = intercept[SparkThrowable] {
sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")
}

// add table feature
sql(s"ALTER TABLE tbl " +
s"SET TBLPROPERTIES('delta.feature.timestampNtz' = 'supported')")

sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")


sql(
"""INSERT INTO tbl VALUES
|('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
assert(spark.table("tbl").head == Row(
"foo",
new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000),
LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)))

assert(getProtocolForTable("tbl") ==
TimestampNTZTableFeature.minProtocolVersion
.withFeature(TimestampNTZTableFeature)
.withFeature(InvariantsTableFeature)
.withFeature(AppendOnlyTableFeature)
)
}
}

test("use TIMESTAMP_NTZ in a partition column") {
withTable("delta_test") {
sql(
"""CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ)
|USING delta
|PARTITIONED BY (c3)""".stripMargin)
sql(
"""INSERT INTO delta_test VALUES
|('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
assert(spark.table("delta_test").head == Row(
"foo",
new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000),
LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)))
assert(getProtocolForTable("delta_test") ==
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
)
}
}

test("min/max stats collection should not apply on TIMESTAMP_NTZ") {
withTable("delta_test") {
sql("CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING delta")
val statsSchema = DeltaLog.forTable(spark, TableIdentifier("delta_test")).snapshot.statsSchema
assert(statsSchema("minValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
assert(statsSchema("maxValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
}
}
}
@@ -1576,9 +1576,6 @@ class SchemaUtilsSuite extends QueryTest
assertUnsupportedDataType(StringType, Nil)
assertUnsupportedDataType(DateType, Nil)
assertUnsupportedDataType(TimestampType, Nil)
assertUnsupportedDataType(
TimestampNTZType,
Seq(UnsupportedDataTypeInfo("col", TimestampNTZType)))
assertUnsupportedDataType(
CalendarIntervalType,
Seq(UnsupportedDataTypeInfo("col", CalendarIntervalType)))
@@ -1637,27 +1634,6 @@
Seq(UnsupportedDataTypeInfo("col", UnsupportedDataType)))
}

test("unsupported data type check") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath

def createTableUsingTimestampNTZType(): Unit = {
DeltaTable.create().addColumn("t", TimestampNTZType, true).location(path).execute()
}

val e = intercept[AnalysisException] {
createTableUsingTimestampNTZType()
}
assert(
e.getMessage.contains("Found columns using unsupported data types: [t: TimestampNTZType]"))
assert(e.getMessage.contains(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key -> "false") {
createTableUsingTimestampNTZType()
}
}
}

test("findUndefinedTypes: basic types") {
val schema = StructType(Seq(
StructField("c1", NullType),