# [SPARK-46949][SQL] Support CHAR/VARCHAR through ResolveDefaultColumns
### What changes were proposed in this pull request?

We have issues resolving column definitions with default values, e.g., `c CHAR(5) DEFAULT 'foo'` or `v VARCHAR(10) DEFAULT 'bar'`. The reason is that CHAR/VARCHAR types in schemas are not replaced with STRING type to align with the value expression.

This PR fixes these issues in `ResolveDefaultColumns`, which appears to cover only v1 tables. When I applied some related tests to v2 tables, they showed the same issues, but fixing them requires additional up-front work, so I'd like to address v2 in a separate PR from this v1 one.
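
For context, the substitution this fix relies on already exists as `CharVarcharUtils.replaceCharVarcharWithString`. A minimal sketch of what it does (catalyst internals, shown here only for illustration):

```scala
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.types.{CharType, StringType, VarcharType}

// CHAR/VARCHAR are stored as STRING internally; substituting STRING up front
// lets a default-value expression (typed STRING) align with the column type.
assert(CharVarcharUtils.replaceCharVarcharWithString(CharType(5)) == StringType)
assert(CharVarcharUtils.replaceCharVarcharWithString(VarcharType(10)) == StringType)
```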

### Why are the changes needed?

bugfix

```
spark-sql (default)> CREATE TABLE t( c CHAR(5) DEFAULT 'spark') USING parquet;
[INVALID_DEFAULT_VALUE.DATA_TYPE] Failed to execute CREATE TABLE command because the destination table column `c` has a DEFAULT value 'spark', which requires "CHAR(5)" type, but the statement provided a value of incompatible "STRING" type.
```
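
After the fix, the same kind of DDL resolves, and CHAR/VARCHAR semantics (length check plus padding) still apply to the default value. A sketch of the expected behavior in a Spark session, mirroring the new tests below:

```scala
// Assumes an existing `spark` session; behavior per the new tests in this commit.
spark.sql("CREATE TABLE t(c CHAR(6) DEFAULT 'spark') USING parquet")  // now succeeds
spark.sql("INSERT INTO t VALUES (DEFAULT)")
// CHAR(6) pads the stored default on read: "spark " with length 6.
spark.sql("SELECT c, length(c) FROM t").show()
```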
### Does this PR introduce _any_ user-facing change?

no, bugfix
### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?
no

Closes #44991 from yaooqinn/SPARK-46949.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
yaooqinn committed on Feb 2, 2024 · 1 parent a343242 · commit 362a4d4
Showing 2 changed files with 58 additions and 9 deletions.
**ResolveDefaultColumns.scala**

```diff
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkThrowable, SparkUnsupportedOperationException}
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -42,7 +42,9 @@ import org.apache.spark.util.ArrayImplicits._
 /**
  * This object contains fields to help process DEFAULT columns.
  */
-object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsUtils {
+object ResolveDefaultColumns extends QueryErrorsBase
+  with ResolveDefaultColumnsUtils
+  with SQLConfHelper {
   // Name of attributes representing explicit references to the value stored in the above
   // CURRENT_DEFAULT_COLUMN_METADATA.
   val CURRENT_DEFAULT_COLUMN_NAME = "DEFAULT"
@@ -307,25 +309,26 @@ object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsUtils {
       statementType: String,
       colName: String,
       defaultSQL: String): Expression = {
+    val supplanted = CharVarcharUtils.replaceCharVarcharWithString(dataType)
     // Perform implicit coercion from the provided expression type to the required column type.
-    if (dataType == analyzed.dataType) {
+    val ret = if (supplanted == analyzed.dataType) {
       analyzed
-    } else if (Cast.canUpCast(analyzed.dataType, dataType)) {
-      Cast(analyzed, dataType)
+    } else if (Cast.canUpCast(analyzed.dataType, supplanted)) {
+      Cast(analyzed, supplanted)
     } else {
       // If the provided default value is a literal of a wider type than the target column, but the
       // literal value fits within the narrower type, just coerce it for convenience. Exclude
       // boolean/array/struct/map types from consideration for this type coercion to avoid
       // surprising behavior like interpreting "false" as integer zero.
       val result = if (analyzed.isInstanceOf[Literal] &&
-        !Seq(dataType, analyzed.dataType).exists(_ match {
+        !Seq(supplanted, analyzed.dataType).exists(_ match {
           case _: BooleanType | _: ArrayType | _: StructType | _: MapType => true
           case _ => false
         })) {
         try {
-          val casted = Cast(analyzed, dataType, evalMode = EvalMode.TRY).eval()
+          val casted = Cast(analyzed, supplanted, evalMode = EvalMode.TRY).eval()
           if (casted != null) {
-            Some(Literal(casted, dataType))
+            Some(Literal(casted, supplanted))
           } else {
             None
           }
@@ -339,6 +342,10 @@ object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsUtils {
           statementType, colName, defaultSQL, dataType, analyzed.dataType)
       }
     }
+    if (!conf.charVarcharAsString && CharVarcharUtils.hasCharVarchar(dataType)) {
+      CharVarcharUtils.stringLengthCheck(ret, dataType).eval(EmptyRow)
+    }
+    ret
   }
 
   /**
@@ -454,7 +461,8 @@ object ResolveDefaultColumns extends QueryErrorsBase with ResolveDefaultColumnsUtils {
       throw QueryCompilationErrors.defaultValuesMayNotContainSubQueryExpressions(
         statement, colName, default.originalSQL)
     } else if (default.resolved) {
-      if (!Cast.canUpCast(default.child.dataType, targetType)) {
+      val dataType = CharVarcharUtils.replaceCharVarcharWithString(targetType)
+      if (!Cast.canUpCast(default.child.dataType, dataType)) {
         throw QueryCompilationErrors.defaultValuesDataTypeError(
           statement, colName, default.originalSQL, targetType, default.child.dataType)
       }
```
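
Two notes on the logic above. The fallback branch uses a TRY-mode cast, which yields null instead of throwing when a wider literal does not fit; and the new `stringLengthCheck(...).eval(EmptyRow)` call evaluates the check eagerly, so oversized defaults fail at DDL time rather than at read time. A self-contained sketch of the TRY-cast building block (catalyst internals, for illustration only):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, EvalMode, Literal}
import org.apache.spark.sql.types.IntegerType

// TRY-mode cast returns null rather than throwing when the value does not fit,
// which is how coerceDefaultValue decides whether a wider literal is usable.
val fits = Cast(Literal(42L), IntegerType, evalMode = EvalMode.TRY).eval()
assert(fits == 42)
val overflow = Cast(Literal(Long.MaxValue), IntegerType, evalMode = EvalMode.TRY).eval()
assert(overflow == null)
```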
**ResolveDefaultColumnsSuite.scala**

```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.test.SharedSparkSession
 
 class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
@@ -215,4 +216,44 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-46949: DDL with valid default char/varchar values") {
+    withTable("t") {
+      val ddl =
+        s"""
+           |CREATE TABLE t(
+           | key int,
+           | v VARCHAR(6) DEFAULT 'apache',
+           | c CHAR(5) DEFAULT 'spark')
+           |USING parquet""".stripMargin
+      sql(ddl)
+      sql("INSERT INTO t (key) VALUES(1)")
+      checkAnswer(sql("select * from t"), Row(1, "apache", "spark"))
+    }
+  }
+
+  test("SPARK-46949: DDL with invalid default char/varchar values") {
+    Seq("CHAR", "VARCHAR").foreach { typeName =>
+      checkError(
+        exception = intercept[SparkRuntimeException](
+          sql(s"CREATE TABLE t(c $typeName(3) DEFAULT 'spark') USING parquet")),
+        errorClass = "EXCEED_LIMIT_LENGTH",
+        parameters = Map("limit" -> "3"))
+    }
+  }
+
+  test("SPARK-46949: DDL with default char/varchar values need padding") {
+    withTable("t") {
+      val ddl =
+        s"""
+           |CREATE TABLE t(
+           | key int,
+           | v VARCHAR(6) DEFAULT 'apache',
+           | c CHAR(6) DEFAULT 'spark')
+           |USING parquet""".stripMargin
+      sql(ddl)
+      sql("INSERT INTO t (key) VALUES(1)")
+      checkAnswer(sql("select * from t"), Row(1, "apache", "spark "))
+    }
+  }
 }
```
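
To verify locally, the usual Spark workflow should work (assuming the suite lives in the `sql/core` module, whose sbt project name is `sql`):

```
build/sbt "sql/testOnly org.apache.spark.sql.ResolveDefaultColumnsSuite"
```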
