Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DELTA] Support TimestampNTZ DataType #1626

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2021,6 +2021,21 @@
],
"sqlState" : "0AKDC"
},
"DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ" : {
"message" : [
"Your table schema <schema> contains a column of type TimestampNTZ.",
"TimestampNTZ type is not supported by your table's protocol.",
"",
"Required Delta protocol version and features for TimestampNTZ:",
"<requiredVersion>",
"Your table's current Delta protocol version and enabled features:",
"<currentVersion>",
"",
"Run the following command to add TimestampNTZ support to your table.",
"ALTER TABLE table_name SET TBLPROPERTIES ('delta.feature.timestampNtz' = 'supported')"
],
"sqlState" : "0A000"
},
"DELTA_UNSUPPORTED_VACUUM_SPECIFIC_PARTITION" : {
"message" : [
"Please provide the base path (<baseDeltaPath>) when Vacuuming Delta tables. Vacuuming specific partitions is currently not supported."
Expand Down
14 changes: 14 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1720,6 +1720,20 @@ trait DeltaErrorsBase
)
}

/**
 * Error thrown when a table schema contains a TimestampNTZ column but the table's
 * protocol does not support the `timestampNtz` table feature.
 *
 * @param schema the offending schema (formatted into the error message)
 * @param requiredProtocol minimum protocol (with the TimestampNTZ feature) the table needs
 * @param currentProtocol the table's current protocol, shown for comparison
 */
def schemaContainsTimestampNTZType(
    schema: StructType,
    requiredProtocol: Protocol,
    currentProtocol: Protocol): Throwable = {
  new DeltaAnalysisException(
    errorClass = "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ",
    messageParameters = Array(
      // The interpolators the original wrapped these in were redundant:
      // formatSchema already returns a String, and s"$x" is just x.toString.
      formatSchema(schema),
      requiredProtocol.toString,
      currentProtocol.toString
    )
  )
}

def tableAlreadyExists(table: CatalogTable): Throwable = {
new DeltaAnalysisException(
errorClass = "DELTA_TABLE_ALREADY_EXISTS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,26 @@ trait OptimisticTransactionImpl extends TransactionalWrite
newMetadataTmp,
isCreatingNewTable)

// Reject commits that introduce TimestampNTZ columns into the schema unless the
// table's protocol (as of the start of this transaction) already supports the
// `timestampNtz` table feature, or the feature is being enabled right now.
if (!protocolBeforeUpdate.isFeatureSupported(TimestampNTZTableFeature) &&
SchemaUtils.checkForTimestampNTZColumnsRecursively(newMetadataTmp.schema)) {
// The feature counts as enabled when this transaction creates a brand-new table,
// or when the transaction's metadata carries a `delta.feature.timestampNtz`
// table property (FEATURE_PROP_PREFIX + feature name) marking it as supported.
val isEnabled = isCreatingNewTable || TableFeatureProtocolUtils
.getSupportedFeaturesFromConfigs(
newMetadataTmp.configuration, TableFeatureProtocolUtils.FEATURE_PROP_PREFIX)
.contains(TimestampNTZTableFeature)

if (!isEnabled) {
// Error message surfaces both the protocol the table would need (minimum
// version plus the TimestampNTZ feature) and the table's current protocol.
throw DeltaErrors.schemaContainsTimestampNTZType(
newMetadataTmp.schema,
TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature),
snapshot.protocol
)
}
}

if (newMetadataTmp.schemaString != null) {
// Replace CHAR and VARCHAR with StringType
var schema = CharVarcharUtils.replaceCharVarcharWithStringInSchema(newMetadataTmp.schema)
Expand Down
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/sql/delta/TableFeature.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import java.util.Locale

import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.constraints.{Constraints, Invariants}
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.{Utils => DeltaUtils}

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.TimestampNTZType

/* --------------------------------------- *
| Table features base class definitions |
Expand Down Expand Up @@ -197,6 +200,7 @@ object TableFeature {
GeneratedColumnsTableFeature,
InvariantsTableFeature,
ColumnMappingTableFeature,
TimestampNTZTableFeature,
DeletionVectorsTableFeature)
if (DeltaUtils.isTesting) {
features ++= Set(
Expand Down Expand Up @@ -296,6 +300,15 @@ object IdentityColumnsTableFeature
}
}

/**
 * Reader-writer table feature gating use of the TimestampNTZ data type.
 * Automatically enabled by metadata: the feature becomes required as soon as
 * any column in the table schema (searched recursively) uses TimestampNTZType.
 */
object TimestampNTZTableFeature extends ReaderWriterFeature(name = "timestampNtz")
  with FeatureAutomaticallyEnabledByMetadata {

  override def metadataRequiresFeatureToBeEnabled(
      metadata: Metadata,
      spark: SparkSession): Boolean =
    SchemaUtils.checkForTimestampNTZColumnsRecursively(metadata.schema)
}

object DeletionVectorsTableFeature
extends ReaderWriterFeature(name = "deletionVectors")
with FeatureAutomaticallyEnabledByMetadata {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import java.sql.Timestamp
import java.time.LocalDateTime

import org.apache.spark.sql.delta.actions.Protocol
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

import org.apache.spark.SparkThrowable
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType

/**
 * End-to-end tests for TimestampNTZ support in Delta: protocol/feature upgrades,
 * partitioning, and stats-collection behavior for TIMESTAMP_NTZ columns.
 */
class DeltaTimestampNTZSuite extends QueryTest
  with SharedSparkSession with DeltaSQLCommandTest {

  // Expected values for the shared literal '2022-01-02 03:04:05.123456'.
  // Timestamp.valueOf replaces the deprecated new Timestamp(y - 1900, ...) constructor
  // and yields the same local-time-zone value.
  private val expectedTimestamp = Timestamp.valueOf("2022-01-02 03:04:05.123456")
  private val expectedTimestampNtz = LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000)

  /** Returns the given table's protocol from the latest locally known snapshot. */
  private def getProtocolForTable(table: String): Protocol = {
    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
    deltaLog.unsafeVolatileSnapshot.protocol
  }

  test("create a new table with TIMESTAMP_NTZ, higher protocol and feature should be picked.") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
      sql(
        """INSERT INTO tbl VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("tbl").head ==
        Row("foo", expectedTimestamp, expectedTimestampNtz))
      // Creating a table with a TIMESTAMP_NTZ column must auto-enable the feature.
      assert(getProtocolForTable("tbl") ==
        TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
      )
    }
  }

  test("creating a table without TIMESTAMP_NTZ should use the usual minimum protocol") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP) USING DELTA")
      assert(getProtocolForTable("tbl") == Protocol(1, 2))

      val deltaLog = DeltaLog.forTable(spark, TableIdentifier("tbl"))
      assert(
        !deltaLog.unsafeVolatileSnapshot.protocol.isFeatureSupported(TimestampNTZTableFeature),
        s"Table tbl contains TimestampNTZFeature descriptor when it's not supposed to"
      )
    }
  }

  test("add a new column using TIMESTAMP_NTZ should upgrade to the correct protocol versions") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP) USING delta")
      assert(getProtocolForTable("tbl") == Protocol(1, 2))

      // Without the feature enabled, adding a TIMESTAMP_NTZ column must fail with
      // the dedicated error class (original test intercepted but never asserted).
      val e = intercept[SparkThrowable] {
        sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")
      }
      assert(e.getErrorClass == "DELTA_UNSUPPORTED_TYPE_TIMESTAMP_NTZ")

      // Enable the table feature explicitly, then the same ALTER must succeed.
      sql("ALTER TABLE tbl SET TBLPROPERTIES('delta.feature.timestampNtz' = 'supported')")
      sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")

      sql(
        """INSERT INTO tbl VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("tbl").head ==
        Row("foo", expectedTimestamp, expectedTimestampNtz))

      assert(getProtocolForTable("tbl") ==
        TimestampNTZTableFeature.minProtocolVersion
          .withFeature(TimestampNTZTableFeature)
          .withFeature(InvariantsTableFeature)
          .withFeature(AppendOnlyTableFeature)
      )
    }
  }

  test("use TIMESTAMP_NTZ in a partition column") {
    withTable("delta_test") {
      sql(
        """CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ)
          |USING delta
          |PARTITIONED BY (c3)""".stripMargin)
      sql(
        """INSERT INTO delta_test VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("delta_test").head ==
        Row("foo", expectedTimestamp, expectedTimestampNtz))
      assert(getProtocolForTable("delta_test") ==
        TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
      )
    }
  }

  test("min/max stats collection should not apply on TIMESTAMP_NTZ") {
    withTable("delta_test") {
      sql("CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING delta")
      // Use unsafeVolatileSnapshot for consistency with the other tests in this suite.
      val statsSchema = DeltaLog.forTable(spark, TableIdentifier("delta_test"))
        .unsafeVolatileSnapshot.statsSchema
      assert(statsSchema("minValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
      assert(statsSchema("maxValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -1576,9 +1576,6 @@ class SchemaUtilsSuite extends QueryTest
assertUnsupportedDataType(StringType, Nil)
assertUnsupportedDataType(DateType, Nil)
assertUnsupportedDataType(TimestampType, Nil)
assertUnsupportedDataType(
TimestampNTZType,
Seq(UnsupportedDataTypeInfo("col", TimestampNTZType)))
assertUnsupportedDataType(
CalendarIntervalType,
Seq(UnsupportedDataTypeInfo("col", CalendarIntervalType)))
Expand Down Expand Up @@ -1637,27 +1634,6 @@ class SchemaUtilsSuite extends QueryTest
Seq(UnsupportedDataTypeInfo("col", UnsupportedDataType)))
}

test("unsupported data type check") {
withTempDir { tempDir =>
val path = tempDir.getCanonicalPath

def createTableUsingTimestampNTZType(): Unit = {
DeltaTable.create().addColumn("t", TimestampNTZType, true).location(path).execute()
}

val e = intercept[AnalysisException] {
createTableUsingTimestampNTZType()
}
assert(
e.getMessage.contains("Found columns using unsupported data types: [t: TimestampNTZType]"))
assert(e.getMessage.contains(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key))

withSQLConf(DeltaSQLConf.DELTA_SCHEMA_TYPE_CHECK.key -> "false") {
createTableUsingTimestampNTZType()
}
}
}

test("findUndefinedTypes: basic types") {
val schema = StructType(Seq(
StructField("c1", NullType),
Expand Down