From 2c648aa35cccb6c456f65f654a4573663de160a6 Mon Sep 17 00:00:00 2001
From: Rahul Mahadev
Date: Wed, 1 Mar 2023 09:27:25 -0800
Subject: [PATCH] Support TimestampNTZ data type in Delta Lake

---
 .../resources/error/delta-error-classes.json |  15 ++
 .../apache/spark/sql/delta/DeltaErrors.scala  |  14 ++
 .../sql/delta/OptimisticTransaction.scala     |  20 +++
 .../apache/spark/sql/delta/TableFeature.scala |  13 ++
 .../sql/delta/DeltaTimestampNTZSuite.scala    | 129 ++++++++++++++++++
 .../sql/delta/schema/SchemaUtilsSuite.scala   |  24 ----
 6 files changed, 191 insertions(+), 24 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala

diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json
index 41b45ad7922..8ee3bf4d64b 100644
--- a/core/src/main/resources/error/delta-error-classes.json
+++ b/core/src/main/resources/error/delta-error-classes.json
@@ -2021,6 +2021,21 @@
     ],
     "sqlState" : "0AKDC"
   },
+  "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : {
+    "message" : [
+      "Your table schema <schemaString> contains a column of type TimestampNTZ.",
+      "TimestampNTZ type is not supported by your table's protocol.",
+      "",
+      "Required Delta protocol version and features for TimestampNTZ:",
+      "<requiredVersion>",
+      "Your table's current Delta protocol version and enabled features:",
+      "<currentVersion>",
+      "",
+      "Run the following command to add TimestampNTZ support to your table.",
+      "ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')"
+    ],
+    "sqlState" : "0A000"
+  },
   "DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
     "message" : [
       "Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
index b064fef9086..3261f7f57f6 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -1720,6 +1720,20 @@ trait DeltaErrorsBase
     )
   }
 
+  def schemaContainsTimestampNTZType(
+      schema: StructType,
+      requiredProtocol: Protocol,
+      currentProtocol: Protocol): Throwable = {
+    new DeltaAnalysisException(
+      errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ",
+      messageParameters = Array(
+        s"${formatSchema(schema)}",
+        s"$requiredProtocol",
+        s"$currentProtocol"
+      )
+    )
+  }
+
   def tableAlreadyExists(table: CatalogTable): Throwable = {
     new DeltaAnalysisException(
       errorClass = "DELTA_TABLE_ALREADY_EXISTS",
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index 40e794ce087..19c94e8b244 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -413,6 +413,26 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       newMetadataTmp,
       isCreatingNewTable)
 
+    // Check for existence of TimestampNTZ in the schema and throw an error if the feature
+    // is not enabled.
+    if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) &&
+        SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) {
+      // The timestampNTZ feature is enabled if there is a table prop in this transaction,
+      // or if this is a new table
+      val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils
+        .getSupportedFeaturesFromConfigs(
+          newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
+        .contains(TimestampNTZTableFeature)
+
+      if (!isEnabled) {
+        throw DeltaErrors.schemaContainsTimestampNTZType(
+          newMetadataTmp.schema,
+          TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature),
+          snapshot.protocol
+        )
+      }
+    }
+
     if (newMetadataTmp.schemaString != null) {
       // Replace CHAR and VARCHAR with StringType
       var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema)
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
index 4f0fa72c3e3..4705e24b485 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
@@ -20,9 +20,12 @@ import java.util.Locale
 
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
+import org.apache.spark.sql.delta.schema.SchemaUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.TimestampNTZType
 
 /* --------------------------------------- *
  |  Table features base class definitions  |
@@ -197,6 +200,7 @@ object TableFeature {
       GeneratedColumnsTableFeature,
       InvariantsTableFeature,
       ColumnMappingTableFeature,
+      TimestampNTZTableFeature,
       DeletionVectorsTableFeature)
     if (DeltaUtils.isTesting) {
       features ++= Set(
@@ -296,6 +300,15 @@ object IdentityColumnsTableFeature
   }
 }
 
+object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
+  with FeatureAutomaticallyEnabledByMetadata {
+
+  override def metadataRequiresFeatureToBeEnabled(
+      metadata: Metadata, spark: SparkSession): Boolean = {
+    SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
+  }
+}
+
 object DeletionVectorsTableFeature extends ReaderWriterFeature(name = "deletionVectors")
   with FeatureAutomaticallyEnabledByMetadata {
 
diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala
new file mode 100644
index 00000000000..029163f8a0a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala
@@ -0,0 +1,129 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.delta + +import java.sql.Timestamp +import java.time.LocalDateTime + +import org.apache.spark.sql.delta.actions.Protocol +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest + +import org.apache.spark.SparkThrowable +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.StructType + +class DeltaTimestampNTZSuite extends QueryTest + with SharedSparkSession with DeltaSQLCommandTest { + + private def getProtocolForTable(table: String): Protocol = { + val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table)) + deltaLog.unsafeVolatileSnapshot.protocol + } + + test("create a new table with TIMESTAMP_NTZ, higher protocol and feature should be picked.") { + withTable("tbl") { + sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA") + sql( + """INSERT INTO tbl VALUES + |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin) + assert(spark.table("tbl").head == Row( + "foo", + new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000), + LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) + assert(getProtocolForTable("tbl") == + TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature) + ) + } + } + + test("creating a table without TIMESTAMP_NTZ should use the usual minimum protocol") { + withTable("tbl") { + sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP) USING DELTA") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl")) + assert( + !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(TimestampNTZTableFeature), + s"Table tbl contains TimestampNTZFeature descriptor when its not supposed to" + ) + } + } + + test("add a new column using TIMESTAMP_NTZ should upgrade to the correct protocol versions") { + withTable("tbl") { + sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP) USING delta") + assert(getProtocolForTable("tbl") == Protocol(1, 2)) + + // Should throw error + val e = intercept[SparkThrowable] { + sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ") + } + + // add table feature + sql(s"ALTER TABLE tbl " + + s"SET TBLPROPERTIES('delta.feature.timestampNtz' = 'supported')") + + sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ") + + + sql( + """INSERT INTO tbl VALUES + |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin) + assert(spark.table("tbl").head == Row( + "foo", + new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000), + LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) + + assert(getProtocolForTable("tbl") == + TimestampNTZTableFeature.minProtocolVersion + .withFeature(TimestampNTZTableFeature) + .withFeature(InvariantsTableFeature) + .withFeature(AppendOnlyTableFeature) + ) + } + } + + test("use TIMESTAMP_NTZ in a partition column") { + withTable("delta_test") { + sql( + """CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) + |USING delta + |PARTITIONED BY (c3)""".stripMargin) + sql( + """INSERT INTO delta_test VALUES + |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin) + assert(spark.table("delta_test").head == Row( + "foo", + new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000), + LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))) + assert(getProtocolForTable("delta_test") == + TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature) + ) + } + } + + 
test("min/max stats collection should not apply on TIMESTAMP_NTZ") { + withTable("delta_test") { + sql("CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING delta") + val statsSchema = DeltaLog.forTable(spark, TableIdentifier("delta_test")).snapshot.statsSchema + assert(statsSchema("minValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP")) + assert(statsSchema("maxValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP")) + } + } +} diff --git a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala index 1e3c205deaa..818a5a02ca4 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/schema/SchemaUtilsSuite.scala @@ -1576,9 +1576,6 @@ class SchemaUtilsSuite extends QueryTest assertUnsupportedDataType(StringType, Nil) assertUnsupportedDataType(DateType, Nil) assertUnsupportedDataType(TimestampType, Nil) - assertUnsupportedDataType( - TimestampNTZType, - Seq(UnsupportedDataTypeInfo("col", TimestampNTZType))) assertUnsupportedDataType( CalendarIntervalType, Seq(UnsupportedDataTypeInfo("col", CalendarIntervalType))) @@ -1637,27 +1634,6 @@ class SchemaUtilsSuite extends QueryTest Seq(UnsupportedDataTypeInfo("col", UnsupportedDataType))) } - test("unsupported data type check") { - withTempDir { tempDir => - val path = tempDir.getCanonicalPath - - def createTableUsingTimestampNTZType(): Unit = { - DeltaTable.create().addColumn("t", TimestampNTZType, true).location(path).execute() - } - - val e = intercept[AnalysisException] { - createTableUsingTimestampNTZType() - } - assert( - e.getMessage.contains("Found columns using unsupported data types: [t: TimestampNTZType]")) - assert(e.getMessage.contains(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key)) - - withSQLConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key -> "false") { - createTableUsingTimestampNTZType() - } - } - } - test("findUndefinedTypes: basic types") { val schema = StructType(Seq( StructField("c1", NullType),