Fix remaining broken v1 tests.

rdblue committed Apr 1, 2020
1 parent c0229a3 commit f3db7fc

Showing 4 changed files with 114 additions and 123 deletions.
@@ -1613,7 +1613,7 @@ class DataSourceV2SQLSuite
       """
         |CREATE TABLE testcat.t (id int, `a.b` string) USING foo
         |CLUSTERED BY (`a.b`) INTO 4 BUCKETS
-        |OPTIONS ('allow-unsupported-transforms'=true)
+        |TBLPROPERTIES ('allow-unsupported-transforms'=true)
       """.stripMargin)

     val testCatalog = catalog("testcat").asTableCatalog.asInstanceOf[InMemoryTableCatalog]
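Note on the change above: the test exercises the in-memory test catalog's 'allow-unsupported-transforms' escape hatch, which is evidently read from the table's properties; passing it through OPTIONS no longer reaches that lookup on this code path, hence the switch to TBLPROPERTIES. A minimal, self-contained sketch of the catalog-side pattern, with illustrative names rather than the actual InMemoryTableCatalog code:

    object TransformGateDemo extends App {
      // Illustrative stand-in for a catalog-side check: reject partition
      // transforms the catalog cannot satisfy unless the table's properties
      // explicitly opt in. Not the actual InMemoryTableCatalog code.
      val AllowUnsupported = "allow-unsupported-transforms"

      def validate(transforms: Seq[String], properties: Map[String, String]): Unit = {
        val allowed = properties.get(AllowUnsupported).exists(_.toBoolean)
        val unsupported = transforms.filterNot(_ == "identity")
        if (unsupported.nonEmpty && !allowed) {
          throw new IllegalArgumentException(
            s"Unsupported partition transforms: ${unsupported.mkString(", ")}")
        }
      }

      validate(Seq("bucket"), Map(AllowUnsupported -> "true")) // opted in: accepted
      // validate(Seq("bucket"), Map.empty) would throw IllegalArgumentException
    }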
@@ -39,23 +39,8 @@ class SparkSqlParserSuite extends AnalysisTest {
   val newConf = new SQLConf
   private lazy val parser = new SparkSqlParser(newConf)

-  /**
-   * Normalizes plans:
-   * - CreateTable the createTime in tableDesc will replaced by -1L.
-   */
-  override def normalizePlan(plan: LogicalPlan): LogicalPlan = {
-    plan match {
-      case CreateTable(tableDesc, mode, query) =>
-        val newTableDesc = tableDesc.copy(createTime = -1L)
-        CreateTable(newTableDesc, mode, query)
-      case _ => plan // Don't transform
-    }
-  }
-
   private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = {
-    val normalized1 = normalizePlan(parser.parsePlan(sqlCommand))
-    val normalized2 = normalizePlan(plan)
-    comparePlans(normalized1, normalized2)
+    comparePlans(parser.parsePlan(sqlCommand), plan)
   }

   private def intercept(sqlCommand: String, messages: String*): Unit =
@@ -80,110 +65,6 @@ class SparkSqlParserSuite extends AnalysisTest {
     intercept("REFRESH", "Resource paths cannot be empty in REFRESH statements")
   }

-  private def createTableUsing(
-      table: String,
-      database: Option[String] = None,
-      tableType: CatalogTableType = CatalogTableType.MANAGED,
-      storage: CatalogStorageFormat = CatalogStorageFormat.empty,
-      schema: StructType = new StructType,
-      provider: Option[String] = Some("parquet"),
-      partitionColumnNames: Seq[String] = Seq.empty,
-      bucketSpec: Option[BucketSpec] = None,
-      mode: SaveMode = SaveMode.ErrorIfExists,
-      query: Option[LogicalPlan] = None): CreateTable = {
-    CreateTable(
-      CatalogTable(
-        identifier = TableIdentifier(table, database),
-        tableType = tableType,
-        storage = storage,
-        schema = schema,
-        provider = provider,
-        partitionColumnNames = partitionColumnNames,
-        bucketSpec = bucketSpec
-      ), mode, query
-    )
-  }
-
-  private def createTable(
-      table: String,
-      database: Option[String] = None,
-      tableType: CatalogTableType = CatalogTableType.MANAGED,
-      storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
-        inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
-        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
-        serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
-      schema: StructType = new StructType,
-      provider: Option[String] = Some("hive"),
-      partitionColumnNames: Seq[String] = Seq.empty,
-      comment: Option[String] = None,
-      mode: SaveMode = SaveMode.ErrorIfExists,
-      query: Option[LogicalPlan] = None): CreateTable = {
-    CreateTable(
-      CatalogTable(
-        identifier = TableIdentifier(table, database),
-        tableType = tableType,
-        storage = storage,
-        schema = schema,
-        provider = provider,
-        partitionColumnNames = partitionColumnNames,
-        comment = comment
-      ), mode, query
-    )
-  }
-
-  test("create table - schema") {
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile",
-      createTable(
-        table = "my_tab",
-        schema = (new StructType)
-          .add("a", IntegerType, nullable = true, "test")
-          .add("b", StringType)
-      )
-    )
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
-      "PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
-      createTable(
-        table = "my_tab",
-        schema = (new StructType)
-          .add("a", IntegerType, nullable = true, "test")
-          .add("b", StringType)
-          .add("c", IntegerType)
-          .add("d", StringType, nullable = true, "test2"),
-        partitionColumnNames = Seq("c", "d")
-      )
-    )
-    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>) " +
-      "STORED AS textfile",
-      createTable(
-        table = "my_tab",
-        schema = (new StructType)
-          .add("id", LongType)
-          .add("nested", (new StructType)
-            .add("col1", StringType)
-            .add("col2", IntegerType)
-          )
-      )
-    )
-    // Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
-    // rule in `AnalyzeCreateTable`.
-    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
-      "PARTITIONED BY (nested STRUCT<col1: STRING,col2: INT>)",
-      createTable(
-        table = "my_tab",
-        schema = (new StructType)
-          .add("a", IntegerType, nullable = true, "test")
-          .add("b", StringType)
-          .add("nested", (new StructType)
-            .add("col1", StringType)
-            .add("col2", IntegerType)
-          ),
-        partitionColumnNames = Seq("nested")
-      )
-    )
-    intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING)",
-      "no viable alternative at input")
-  }
-
   test("describe query") {
     val query = "SELECT * FROM t"
     assertEqual("DESCRIBE QUERY " + query, DescribeQueryCommand(query, parser.parsePlan(query)))
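Note on the change above: the deleted normalizePlan override existed only to pin CatalogTable.createTime, which is stamped with the current wall-clock time, and the helpers that built CatalogTable expectations are deleted along with the schema test they served (it reappears in PlanResolutionSuite below). With no time-dependent fixtures left, assertEqual can compare the parsed plan directly. A self-contained sketch of the trimmed-down round trip, using stand-in types rather than the Catalyst classes:

    object DirectCompareDemo extends App {
      // Stand-ins for LogicalPlan and the parser; the real suite compares
      // Catalyst plans via PlanTest-style helpers.
      case class Plan(clauses: Vector[String])

      object MiniParser {
        // Toy "parser": splits a statement into clauses for structural comparison.
        def parsePlan(sql: String): Plan = Plan(sql.trim.split("\\s+").toVector)
      }

      def comparePlans(actual: Plan, expected: Plan): Unit =
        assert(actual == expected, s"expected $expected, got $actual")

      // The trimmed-down assertion: no normalization pass on either side.
      def assertEqual(sqlCommand: String, plan: Plan): Unit =
        comparePlans(MiniParser.parsePlan(sqlCommand), plan)

      assertEqual("DESCRIBE QUERY SELECT 1",
        Plan(Vector("DESCRIBE", "QUERY", "SELECT", "1")))
    }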
@@ -1568,6 +1568,116 @@ class PlanResolutionSuite extends AnalysisTest
     checkFailure("testcat.tab", "foo")
   }

+  private def compareNormalized(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
+    /**
+     * Normalizes plans:
+     * - CreateTable the createTime in tableDesc will replaced by -1L.
+     */
+    def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+      plan match {
+        case CreateTable(tableDesc, mode, query) =>
+          val newTableDesc = tableDesc.copy(createTime = -1L)
+          CreateTable(newTableDesc, mode, query)
+        case _ => plan // Don't transform
+      }
+    }
+    comparePlans(normalizePlan(plan1), normalizePlan(plan2))
+  }
+
+  test("create table - schema") {
+    def createTable(
+        table: String,
+        database: Option[String] = None,
+        tableType: CatalogTableType = CatalogTableType.MANAGED,
+        storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
+          inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
+          outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat,
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),
+        schema: StructType = new StructType,
+        provider: Option[String] = Some("hive"),
+        partitionColumnNames: Seq[String] = Seq.empty,
+        comment: Option[String] = None,
+        mode: SaveMode = SaveMode.ErrorIfExists,
+        query: Option[LogicalPlan] = None): CreateTable = {
+      CreateTable(
+        CatalogTable(
+          identifier = TableIdentifier(table, database),
+          tableType = tableType,
+          storage = storage,
+          schema = schema,
+          provider = provider,
+          partitionColumnNames = partitionColumnNames,
+          comment = comment
+        ), mode, query
+      )
+    }
+
+    def compare(sql: String, plan: LogicalPlan): Unit = {
+      compareNormalized(parseAndResolve(sql), plan)
+    }
+
+    compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) STORED AS textfile",
+      createTable(
+        table = "my_tab",
+        database = Some("default"),
+        schema = (new StructType)
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType)
+      )
+    )
+    withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "true") {
+      compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+        "PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
+        createTable(
+          table = "my_tab",
+          database = Some("default"),
+          schema = (new StructType)
+            .add("a", IntegerType, nullable = true, "test")
+            .add("b", StringType)
+            .add("c", IntegerType)
+            .add("d", StringType, nullable = true, "test2"),
+          partitionColumnNames = Seq("c", "d")
+        )
+      )
+    }
+    compare("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>) " +
+      "STORED AS textfile",
+      createTable(
+        table = "my_tab",
+        database = Some("default"),
+        schema = (new StructType)
+          .add("id", LongType)
+          .add("nested", (new StructType)
+            .add("col1", StringType)
+            .add("col2", IntegerType)
+          )
+      )
+    )
+    // Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
+    // rule in `AnalyzeCreateTable`.
+    withSQLConf(SQLConf.LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED.key -> "true") {
+      compare("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+        "PARTITIONED BY (nested STRUCT<col1: STRING,col2: INT>)",
+        createTable(
+          table = "my_tab",
+          database = Some("default"),
+          schema = (new StructType)
+            .add("a", IntegerType, nullable = true, "test")
+            .add("b", StringType)
+            .add("nested", (new StructType)
+              .add("col1", StringType)
+              .add("col2", IntegerType)
+            ),
+          partitionColumnNames = Seq("nested")
+        )
+      )
+    }
+
+    interceptParseException(parsePlan)(
+      "CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING)",
+      "extraneous input ':'")
+  }
+
   test("create hive table - table file format") {
     val allSources = Seq("parquet", "parquetfile", "orc", "orcfile", "avro", "avrofile",
       "sequencefile", "rcfile", "textfile")
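Note on the change above: the normalization helper travels with the test for the reason its comment gives: CatalogTable records the construction-time createTime, so two otherwise-identical CreateTable plans built at different moments never compare equal until the timestamp is pinned to -1L. The PARTITIONED BY variants also now run under LEGACY_CREATE_HIVE_TABLE_BY_DEFAULT_ENABLED, keeping CREATE TABLE without USING on the Hive path these expectations assume. A self-contained sketch of the failure mode, with stand-ins for CatalogTable and CreateTable:

    object CreateTimeDemo extends App {
      // Stand-ins for CatalogTable and CreateTable, showing why createTime
      // must be normalized before plans are compared for equality.
      case class TableDesc(name: String, createTime: Long = System.currentTimeMillis())
      case class CreatePlan(desc: TableDesc)

      def normalize(plan: CreatePlan): CreatePlan =
        plan.copy(desc = plan.desc.copy(createTime = -1L))

      val expected = CreatePlan(TableDesc("my_tab"))
      Thread.sleep(5) // in a real run, parse/resolve happens at a later instant
      val actual = CreatePlan(TableDesc("my_tab"))

      assert(expected != actual)                       // differ only in createTime
      assert(normalize(expected) == normalize(actual)) // equal once pinned to -1L
    }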
@@ -166,13 +166,13 @@ class CreateTableAsSelectSuite extends DataSourceTest with SharedSparkSession {
         )
       }.getMessage
       assert(error.contains("Operation not allowed") &&
-        error.contains("CREATE TEMPORARY TABLE ... USING ... AS query"))
+        error.contains("CREATE TEMPORARY TABLE"))
     }
   }

   test("disallows CREATE EXTERNAL TABLE ... USING ... AS query") {
     withTable("t") {
-      val error = intercept[ParseException] {
+      val error = intercept[AnalysisException] {
        sql(
          s"""
            |CREATE EXTERNAL TABLE t USING PARQUET
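Note on the changes above: both assertions loosen in the same direction. The statements now get past the parser and are rejected during analysis, so the expected exception becomes AnalysisException and the message check keeps only the stable 'CREATE TEMPORARY TABLE' prefix. A sketch of the updated interception pattern, assuming the suite's sql helper and ScalaTest's intercept; the statement text after USING PARQUET and the asserted message (borrowed from the temporary-table case above) are illustrative, since the diff is truncated here:

    import org.apache.spark.sql.AnalysisException

    // The statement parses now, so the failure surfaces during analysis.
    val error = intercept[AnalysisException] {
      sql(
        """
          |CREATE EXTERNAL TABLE t USING PARQUET
          |AS SELECT 1 AS i
        """.stripMargin)  // continuation after USING PARQUET is illustrative
    }
    assert(error.getMessage.contains("Operation not allowed"))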
