[SPARK-33558][SQL][TESTS] Unify v1 and v2 ALTER TABLE .. ADD PARTITION tests

### What changes were proposed in this pull request?
1. Move the `ALTER TABLE .. ADD PARTITION` parsing tests to `AlterTableAddPartitionParserSuite`
2. Move the v1 tests for `ALTER TABLE .. ADD PARTITION` from `DDLSuite` and the v2 tests from `AlterTablePartitionV2SQLSuite` into the common trait `AlterTableAddPartitionSuiteBase`, so the same tests run for the V1 In-Memory, Hive V1, and V2 data sources (see the sketch after this list).
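For context, the sketch below shows the shared-trait pattern this unification follows. It is a hedged illustration only: `QueryTest`, `SQLTestUtils`, `withTable`, and `sql` are Spark's standard test helpers, while the trait body is a simplified stand-in for the real `AlterTableAddPartitionSuiteBase` (the `version`/`defaultUsing` hooks are assumptions about its design, not a copy of it):

```scala
package org.apache.spark.sql.execution.command

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.test.SQLTestUtils

// Common ALTER TABLE .. ADD PARTITION tests live in one trait; each concrete
// suite (V1 In-Memory, Hive V1, V2) mixes it in and pins the specifics.
trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
  protected def version: String      // suite label, e.g. "V1", "Hive V1", "V2"
  protected def defaultUsing: String // e.g. "USING parquet" or "USING _"

  test(s"$version: add a single partition") {
    withTable("tbl") {
      sql(s"CREATE TABLE tbl (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
      sql("ALTER TABLE tbl ADD PARTITION (id = 1)")
      // Each concrete suite verifies the partition through its own catalog API.
    }
  }
}
```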

### Why are the changes needed?
- The unification lets the common `ALTER TABLE .. ADD PARTITION` tests run against DSv1, Hive DSv1, and DSv2.
- We can detect missing features and differences between DSv1 and DSv2 implementations.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
By running new test suites:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *AlterTableAddPartitionSuite"
```

Closes #30685 from MaxGekk/unify-alter-table-add-partition-tests.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
MaxGekk authored and cloud-fan committed Dec 10, 2020
1 parent 1c7f5f1 commit af37c7f
Showing 10 changed files with 450 additions and 233 deletions.
ResolvePartitionSpec.scala:

```diff
@@ -81,7 +81,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
     resolvedPartitionSpec
   }

-  private def convertToPartIdent(
+  private[sql] def convertToPartIdent(
       partitionSpec: TablePartitionSpec,
       schema: Seq[StructField]): InternalRow = {
     val partValues = schema.map { part =>
```
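Widening `convertToPartIdent` from `private` to `private[sql]` makes the spec-to-`InternalRow` conversion callable from elsewhere in the `sql` package tree, such as the unified test suites. A minimal sketch of a call site, assuming the caller is compiled under `org.apache.spark.sql` (the schema and spec values here are invented for illustration):

```scala
package org.apache.spark.sql.example // must live under org.apache.spark.sql

import org.apache.spark.sql.catalyst.analysis.ResolvePartitionSpec
import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

object ConvertToPartIdentExample {
  def main(args: Array[String]): Unit = {
    // Convert a raw partition spec (column -> string value) into the
    // InternalRow identifier that v2 partition catalog APIs expect; values
    // are cast to the partition schema's types along the way.
    val partSchema = Seq(
      StructField("id", IntegerType),
      StructField("part", StringType))
    val partIdent = ResolvePartitionSpec.convertToPartIdent(
      Map("id" -> "1", "part" -> "us"), partSchema)
    println(partIdent.numFields) // 2
  }
}
```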

DDLParserSuite.scala:

```diff
@@ -2042,33 +2042,6 @@ class DDLParserSuite extends AnalysisTest {
       AlterTableRecoverPartitionsStatement(Seq("a", "b", "c")))
   }

-  test("alter table: add partition") {
-    val sql1 =
-      """
-        |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
-        |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
-        |(dt='2009-09-09', country='uk')
-      """.stripMargin
-    val sql2 = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
-
-    val parsed1 = parsePlan(sql1)
-    val parsed2 = parsePlan(sql2)
-
-    val expected1 = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(
-        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
-        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)
-    val expected2 = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
-      ifNotExists = false)
-
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """
```

AlterTablePartitionV2SQLSuite.scala:

```diff
@@ -17,16 +17,12 @@

 package org.apache.spark.sql.connector

-import java.time.{LocalDate, LocalDateTime}
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.unsafe.types.UTF8String

 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {

@@ -45,66 +41,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }

-  test("ALTER TABLE ADD PARTITION") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-
-      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
-      assert(partMetadata.containsKey("location"))
-      assert(partMetadata.get("location") == "loc")
-    }
-  }
-
-  test("ALTER TABLE ADD PARTITIONS") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(
-        s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
-
-      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
-      assert(partMetadata.containsKey("location"))
-      assert(partMetadata.get("location") == "loc")
-
-      val partMetadata1 = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(2)))
-      assert(partMetadata1.containsKey("location"))
-      assert(partMetadata1.get("location") == "loc1")
-    }
-  }
-
-  test("ALTER TABLE ADD PARTITIONS: partition already exists") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(
-        s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      assertThrows[PartitionsAlreadyExistException](
-        spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'"))
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-
-      spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
-    }
-  }
-
   test("ALTER TABLE RENAME PARTITION") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {

@@ -173,7 +109,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
         val errMsg = intercept[AnalysisException] {
-          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+          spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)")
         }.getMessage
         assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
       }

@@ -192,73 +128,14 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }

-  test("SPARK-33521: universal type conversions of partition values") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"""
-        |CREATE TABLE $t (
-        | part0 tinyint,
-        | part1 smallint,
-        | part2 int,
-        | part3 bigint,
-        | part4 float,
-        | part5 double,
-        | part6 string,
-        | part7 boolean,
-        | part8 date,
-        | part9 timestamp
-        |) USING foo
-        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
-        |""".stripMargin)
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
-        .asPartitionable
-      val expectedPartition = InternalRow.fromSeq(Seq[Any](
-        -1, // tinyint
-        0, // smallint
-        1, // int
-        2, // bigint
-        3.14F, // float
-        3.14D, // double
-        UTF8String.fromString("abc"), // string
-        true, // boolean
-        LocalDate.parse("2020-11-23").toEpochDay,
-        DateTimeUtils.instantToMicros(
-          LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
-      ))
-      assert(!partTable.partitionExists(expectedPartition))
-      val partSpec = """
-        | part0 = -1,
-        | part1 = 0,
-        | part2 = 1,
-        | part3 = 2,
-        | part4 = 3.14,
-        | part5 = 3.14,
-        | part6 = 'abc',
-        | part7 = true,
-        | part8 = '2020-11-23',
-        | part9 = '2020-11-23T22:13:10.123456'
-        |""".stripMargin
-      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
-      assert(partTable.partitionExists(expectedPartition))
-      sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
-      assert(!partTable.partitionExists(expectedPartition))
-    }
-  }
-
-  test("SPARK-33650: add/drop partition into a table which doesn't support partition management") {
+  test("SPARK-33650: drop partition into a table which doesn't support partition management") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _")
-      Seq(
-        s"ALTER TABLE $t ADD PARTITION (id=1)",
-        s"ALTER TABLE $t DROP PARTITION (id=1)"
-      ).foreach { alterTable =>
-        val errMsg = intercept[AnalysisException] {
-          spark.sql(alterTable)
-        }.getMessage
-        assert(errMsg.contains(s"Table $t can not alter partitions"))
-      }
+      val errMsg = intercept[AnalysisException] {
+        spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
+      }.getMessage
+      assert(errMsg.contains(s"Table $t can not alter partitions"))
     }
   }

@@ -269,16 +146,11 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
          |CREATE TABLE $t (id bigint, part0 int, part1 string)
          |USING foo
          |PARTITIONED BY (part0, part1)""".stripMargin)
-      Seq(
-        s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
-        s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
-      ).foreach { alterTable =>
-        val errMsg = intercept[AnalysisException] {
-          sql(alterTable)
-        }.getMessage
-        assert(errMsg.contains("Partition spec is invalid. " +
-          "The spec (part0) must match the partition spec (part0, part1)"))
-      }
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)")
+      }.getMessage
+      assert(errMsg.contains("Partition spec is invalid. " +
+        "The spec (part0) must match the partition spec (part0, part1)"))
     }
   }
 }
```

AlterTableAddPartitionParserSuite.scala (new file, +51 lines):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
import org.apache.spark.sql.test.SharedSparkSession

class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
  test("add partition if not exists") {
    val sql = """
      |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
      |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
      |(dt='2009-09-09', country='uk')""".stripMargin
    val parsed = parsePlan(sql)
    val expected = AlterTableAddPartition(
      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
      Seq(
        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
      ifNotExists = true)
    comparePlans(parsed, expected)
  }

  test("add partition") {
    val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
    val parsed = parsePlan(sql)
    val expected = AlterTableAddPartition(
      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
      ifNotExists = false)

    comparePlans(parsed, expected)
  }
}
```
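
The moved execution tests (the bulk of the 450 added lines) land in concrete implementations of the common trait, which are not rendered here. As a rough illustration, a v1 suite might bind the hooks from the sketch in the description like this; the class name matches the `*AlterTableAddPartitionSuite` pattern the sbt command above targets, but the body is an assumption, not the PR's actual code:

```scala
package org.apache.spark.sql.execution.command.v1

import org.apache.spark.sql.execution.command.AlterTableAddPartitionSuiteBase
import org.apache.spark.sql.test.SharedSparkSession

// Pin the shared tests to the v1 In-Memory catalog; every test inherited
// from the base trait then exercises the v1 code path. (A v2 counterpart
// would additionally register an in-memory partition catalog.)
class AlterTableAddPartitionSuite
  extends AlterTableAddPartitionSuiteBase
  with SharedSparkSession {

  override protected def version: String = "V1"
  override protected def defaultUsing: String = "USING parquet"
}
```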
(Diffs for the remaining 6 changed files are not rendered in this view.)
