diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index feb05d3b6926b..099ac6172c9e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -81,7 +81,7 @@ object ResolvePartitionSpec extends Rule[LogicalPlan] {
     resolvedPartitionSpec
   }
 
-  private def convertToPartIdent(
+  private[sql] def convertToPartIdent(
       partitionSpec: TablePartitionSpec,
       schema: Seq[StructField]): InternalRow = {
     val partValues = schema.map { part =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 947154eae12c8..e194e7112b1d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2042,33 +2042,6 @@ class DDLParserSuite extends AnalysisTest {
       AlterTableRecoverPartitionsStatement(Seq("a", "b", "c")))
   }
 
-  test("alter table: add partition") {
-    val sql1 =
-      """
-        |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
-        |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
-        |(dt='2009-09-09', country='uk')
-      """.stripMargin
-    val sql2 = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
-
-    val parsed1 = parsePlan(sql1)
-    val parsed2 = parsePlan(sql2)
-
-    val expected1 = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(
-        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
-        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
-      ifNotExists = true)
-    val expected2 = AlterTableAddPartition(
-      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
-      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
-      ifNotExists = false)
-
-    comparePlans(parsed1, expected1)
-    comparePlans(parsed2, expected2)
-  }
-
   test("alter view: add partition (not supported)") {
     assertUnsupported(
       """
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
index 45d47c6d8681c..570976965ec7c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTablePartitionV2SQLSuite.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.connector
 
-import java.time.{LocalDate, LocalDateTime}
-
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, PartitionsAlreadyExistException}
-import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionsException
 import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.unsafe.types.UTF8String
 
 class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
 
@@ -45,66 +41,6 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("ALTER TABLE ADD PARTITION") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'")
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-
-      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
-      assert(partMetadata.containsKey("location"))
-      assert(partMetadata.get("location") == "loc")
-    }
-  }
-
-  test("ALTER TABLE ADD PARTITIONS") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(
-        s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc' PARTITION (id=2) LOCATION 'loc1'")
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
-
-      val partMetadata = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(1)))
-      assert(partMetadata.containsKey("location"))
-      assert(partMetadata.get("location") == "loc")
-
-      val partMetadata1 = partTable.loadPartitionMetadata(InternalRow.fromSeq(Seq(2)))
-      assert(partMetadata1.containsKey("location"))
-      assert(partMetadata1.get("location") == "loc1")
-    }
-  }
-
-  test("ALTER TABLE ADD PARTITIONS: partition already exists") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
-      spark.sql(
-        s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
-
-      assertThrows[PartitionsAlreadyExistException](
-        spark.sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
-          " PARTITION (id=2) LOCATION 'loc1'"))
-
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl")).asInstanceOf[InMemoryPartitionTable]
-      assert(!partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-
-      spark.sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
-        " PARTITION (id=2) LOCATION 'loc1'")
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(1))))
-      assert(partTable.partitionExists(InternalRow.fromSeq(Seq(2))))
-    }
-  }
-
   test("ALTER TABLE RENAME PARTITION") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
@@ -173,7 +109,7 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING foo PARTITIONED BY (id)")
       withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
         val errMsg = intercept[AnalysisException] {
-          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+          spark.sql(s"ALTER TABLE $t DROP PARTITION (ID=1)")
         }.getMessage
         assert(errMsg.contains(s"ID is not a valid partition column in table $t"))
       }
@@ -192,73 +128,14 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
     }
   }
 
-  test("SPARK-33521: universal type conversions of partition values") {
-    val t = "testpart.ns1.ns2.tbl"
-    withTable(t) {
-      sql(s"""
-        |CREATE TABLE $t (
-        |  part0 tinyint,
-        |  part1 smallint,
-        |  part2 int,
-        |  part3 bigint,
-        |  part4 float,
-        |  part5 double,
-        |  part6 string,
-        |  part7 boolean,
-        |  part8 date,
-        |  part9 timestamp
-        |) USING foo
-        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
-        |""".stripMargin)
-      val partTable = catalog("testpart").asTableCatalog
-        .loadTable(Identifier.of(Array("ns1", "ns2"), "tbl"))
-        .asPartitionable
-      val expectedPartition = InternalRow.fromSeq(Seq[Any](
-        -1,    // tinyint
-        0,     // smallint
-        1,     // int
-        2,     // bigint
-        3.14F, // float
-        3.14D, // double
-        UTF8String.fromString("abc"), // string
-        true,  // boolean
-        LocalDate.parse("2020-11-23").toEpochDay,
-        DateTimeUtils.instantToMicros(
-          LocalDateTime.parse("2020-11-23T22:13:10.123456").atZone(DateTimeTestUtils.LA).toInstant)
-      ))
-      assert(!partTable.partitionExists(expectedPartition))
-      val partSpec = """
-        |  part0 = -1,
-        |  part1 = 0,
-        |  part2 = 1,
-        |  part3 = 2,
-        |  part4 = 3.14,
-        |  part5 = 3.14,
-        |  part6 = 'abc',
-        |  part7 = true,
-        |  part8 = '2020-11-23',
-        |  part9 = '2020-11-23T22:13:10.123456'
-        |""".stripMargin
-      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
-      assert(partTable.partitionExists(expectedPartition))
-      sql(s" ALTER TABLE $t DROP PARTITION ($partSpec)")
-      assert(!partTable.partitionExists(expectedPartition))
-    }
-  }
-
-  test("SPARK-33650: add/drop partition into a table which doesn't support partition management") {
+  test("SPARK-33650: drop partition into a table which doesn't support partition management") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {
       spark.sql(s"CREATE TABLE $t (id bigint, data string) USING _")
-      Seq(
-        s"ALTER TABLE $t ADD PARTITION (id=1)",
-        s"ALTER TABLE $t DROP PARTITION (id=1)"
-      ).foreach { alterTable =>
-        val errMsg = intercept[AnalysisException] {
-          spark.sql(alterTable)
-        }.getMessage
-        assert(errMsg.contains(s"Table $t can not alter partitions"))
-      }
+      val errMsg = intercept[AnalysisException] {
+        spark.sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
+      }.getMessage
+      assert(errMsg.contains(s"Table $t can not alter partitions"))
     }
   }
 
@@ -269,16 +146,11 @@ class AlterTablePartitionV2SQLSuite extends DatasourceV2SQLBase {
         |CREATE TABLE $t (id bigint, part0 int, part1 string)
         |USING foo
         |PARTITIONED BY (part0, part1)""".stripMargin)
-      Seq(
-        s"ALTER TABLE $t ADD PARTITION (part0 = 1)",
-        s"ALTER TABLE $t DROP PARTITION (part0 = 1)"
-      ).foreach { alterTable =>
-        val errMsg = intercept[AnalysisException] {
-          sql(alterTable)
-        }.getMessage
-        assert(errMsg.contains("Partition spec is invalid. " +
-          "The spec (part0) must match the partition spec (part0, part1)"))
-      }
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t DROP PARTITION (part0 = 1)")
+      }.getMessage
+      assert(errMsg.contains("Partition spec is invalid. " +
+        "The spec (part0) must match the partition spec (part0, part1)"))
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
new file mode 100644
index 0000000000000..5ebca8f651604
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionParserSuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
+import org.apache.spark.sql.catalyst.plans.logical.AlterTableAddPartition
+import org.apache.spark.sql.test.SharedSparkSession
+
+class AlterTableAddPartitionParserSuite extends AnalysisTest with SharedSparkSession {
+  test("add partition if not exists") {
+    val sql = """
+      |ALTER TABLE a.b.c ADD IF NOT EXISTS PARTITION
+      |(dt='2008-08-08', country='us') LOCATION 'location1' PARTITION
+      |(dt='2009-09-09', country='uk')""".stripMargin
+    val parsed = parsePlan(sql)
+    val expected = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(
+        UnresolvedPartitionSpec(Map("dt" -> "2008-08-08", "country" -> "us"), Some("location1")),
+        UnresolvedPartitionSpec(Map("dt" -> "2009-09-09", "country" -> "uk"), None)),
+      ifNotExists = true)
+    comparePlans(parsed, expected)
+  }
+
+  test("add partition") {
+    val sql = "ALTER TABLE a.b.c ADD PARTITION (dt='2008-08-08') LOCATION 'loc'"
+    val parsed = parsePlan(sql)
+    val expected = AlterTableAddPartition(
+      UnresolvedTable(Seq("a", "b", "c"), "ALTER TABLE ... ADD PARTITION ..."),
+      Seq(UnresolvedPartitionSpec(Map("dt" -> "2008-08-08"), Some("loc"))),
+      ifNotExists = false)
+
+    comparePlans(parsed, expected)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
new file mode 100644
index 0000000000000..0cf0b395f139b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableAddPartitionSuiteBase.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.scalactic.source.Position
+import org.scalatest.Tag
+
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+
+trait AlterTableAddPartitionSuiteBase extends QueryTest with SQLTestUtils {
+  protected def version: String
+  protected def catalog: String
+  protected def defaultUsing: String
+
+  override def test(testName: String, testTags: Tag*)(testFun: => Any)
+      (implicit pos: Position): Unit = {
+    super.test(s"ALTER TABLE .. ADD PARTITION $version: " + testName, testTags: _*)(testFun)
+  }
+
+  protected def checkPartitions(t: String, expected: Map[String, String]*): Unit = {
+    val partitions = sql(s"SHOW PARTITIONS $t")
+      .collect()
+      .toSet
+      .map((row: Row) => row.getString(0))
+      .map(PartitioningUtils.parsePathFragment)
+    assert(partitions === expected.toSet)
+  }
+  protected def checkLocation(t: String, spec: TablePartitionSpec, expected: String): Unit
+
+  protected def withNsTable(ns: String, tableName: String, cat: String = catalog)
+      (f: String => Unit): Unit = {
+    val nsCat = s"$cat.$ns"
+    withNamespace(nsCat) {
+      sql(s"CREATE NAMESPACE $nsCat")
+      val t = s"$nsCat.$tableName"
+      withTable(t) {
+        f(t)
+      }
+    }
+  }
+
+  test("one partition") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      Seq("", "IF NOT EXISTS").foreach { exists =>
+        sql(s"ALTER TABLE $t ADD $exists PARTITION (id=1) LOCATION 'loc'")
+
+        checkPartitions(t, Map("id" -> "1"))
+        checkLocation(t, Map("id" -> "1"), "loc")
+      }
+    }
+  }
+
+  test("multiple partitions") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      Seq("", "IF NOT EXISTS").foreach { exists =>
+        sql(s"""
+          |ALTER TABLE $t ADD $exists
+          |PARTITION (id=1) LOCATION 'loc'
+          |PARTITION (id=2) LOCATION 'loc1'""".stripMargin)
+
+        checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+        checkLocation(t, Map("id" -> "1"), "loc")
+        checkLocation(t, Map("id" -> "2"), "loc1")
+      }
+    }
+  }
+
+  test("multi-part partition") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, a int, b string) $defaultUsing PARTITIONED BY (a, b)")
+      Seq("", "IF NOT EXISTS").foreach { exists =>
+        sql(s"ALTER TABLE $t ADD $exists PARTITION (a=2, b='abc')")
+
+        checkPartitions(t, Map("a" -> "2", "b" -> "abc"))
+      }
+    }
+  }
+
+  test("table to alter does not exist") {
+    withNsTable("ns", "does_not_exist") { t =>
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (a='4', b='9')")
+      }.getMessage
+      assert(errMsg.contains("Table not found"))
+    }
+  }
+
+  test("case sensitivity in resolving partition specs") {
+    withNsTable("ns", "tbl") { t =>
+      spark.sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val errMsg = intercept[AnalysisException] {
+          spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+        }.getMessage
+        assert(errMsg.contains("ID is not a valid partition column"))
+      }
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        spark.sql(s"ALTER TABLE $t ADD PARTITION (ID=1) LOCATION 'loc1'")
+        checkPartitions(t, Map("id" -> "1"))
+        checkLocation(t, Map("id" -> "1"), "loc1")
+      }
+    }
+  }
+
+  test("SPARK-33521: universal type conversions of partition values") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"""
+        |CREATE TABLE $t (
+        |  id int,
+        |  part0 tinyint,
+        |  part1 smallint,
+        |  part2 int,
+        |  part3 bigint,
+        |  part4 float,
+        |  part5 double,
+        |  part6 string,
+        |  part7 boolean,
+        |  part8 date,
+        |  part9 timestamp
+        |) $defaultUsing
+        |PARTITIONED BY (part0, part1, part2, part3, part4, part5, part6, part7, part8, part9)
+        |""".stripMargin)
+      val partSpec = """
+        |  part0 = -1,
+        |  part1 = 0,
+        |  part2 = 1,
+        |  part3 = 2,
+        |  part4 = 3.14,
+        |  part5 = 3.14,
+        |  part6 = 'abc',
+        |  part7 = true,
+        |  part8 = '2020-11-23',
+        |  part9 = '2020-11-23 22:13:10.123456'
+        |""".stripMargin
+      sql(s"ALTER TABLE $t ADD PARTITION ($partSpec) LOCATION 'loc1'")
+      val expected = Map(
+        "part0" -> "-1",
+        "part1" -> "0",
+        "part2" -> "1",
+        "part3" -> "2",
+        "part4" -> "3.14",
+        "part5" -> "3.14",
+        "part6" -> "abc",
+        "part7" -> "true",
+        "part8" -> "2020-11-23",
+        "part9" -> "2020-11-23 22:13:10.123456")
+      checkPartitions(t, expected)
+      sql(s"ALTER TABLE $t DROP PARTITION ($partSpec)")
+      checkPartitions(t) // no partitions
+    }
+  }
+
+  test("SPARK-33676: not fully specified partition spec") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"""
+        |CREATE TABLE $t (id bigint, part0 int, part1 string)
+        |$defaultUsing
+        |PARTITIONED BY (part0, part1)""".stripMargin)
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (part0 = 1)")
+      }.getMessage
+      assert(errMsg.contains("Partition spec is invalid. " +
+        "The spec (part0) must match the partition spec (part0, part1)"))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 82d3e2dfe2212..05e0f4f4a538c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -334,10 +334,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     testChangeColumn(isDatasourceTable = true)
   }
 
-  test("alter table: add partition (datasource table)") {
-    testAddPartitions(isDatasourceTable = true)
-  }
-
   test("alter table: drop partition (datasource table)") {
     testDropPartitions(isDatasourceTable = true)
   }
@@ -1621,63 +1617,6 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  protected def testAddPartitions(isDatasourceTable: Boolean): Unit = {
-    if (!isUsingHiveMetastore) {
-      assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
-    }
-    val catalog = spark.sessionState.catalog
-    val tableIdent = TableIdentifier("tab1", Some("dbx"))
-    val part1 = Map("a" -> "1", "b" -> "5")
-    val part2 = Map("a" -> "2", "b" -> "6")
-    val part3 = Map("a" -> "3", "b" -> "7")
-    val part4 = Map("a" -> "4", "b" -> "8")
-    val part5 = Map("a" -> "9", "b" -> "9")
-    createDatabase(catalog, "dbx")
-    createTable(catalog, tableIdent, isDatasourceTable)
-    createTablePartition(catalog, part1, tableIdent)
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
-
-    // basic add partition
-    sql("ALTER TABLE dbx.tab1 ADD IF NOT EXISTS " +
-      "PARTITION (a='2', b='6') LOCATION 'paris' PARTITION (a='3', b='7')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part3))
-    assert(catalog.getPartition(tableIdent, part1).storage.locationUri.isDefined)
-
-    val tableLocation = catalog.getTableMetadata(tableIdent).storage.locationUri
-    assert(tableLocation.isDefined)
-    val partitionLocation = makeQualifiedPath(
-      new Path(tableLocation.get.toString, "paris").toString)
-
-    assert(catalog.getPartition(tableIdent, part2).storage.locationUri == Option(partitionLocation))
-    assert(catalog.getPartition(tableIdent, part3).storage.locationUri.isDefined)
-
-    // add partitions without explicitly specifying database
-    catalog.setCurrentDatabase("dbx")
-    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
-
-    // table to alter does not exist
-    intercept[AnalysisException] {
-      sql("ALTER TABLE does_not_exist ADD IF NOT EXISTS PARTITION (a='4', b='9')")
-    }
-
-    // partition to add already exists
-    intercept[AnalysisException] {
-      sql("ALTER TABLE tab1 ADD PARTITION (a='4', b='8')")
-    }
-
-    // partition to add already exists when using IF NOT EXISTS
-    sql("ALTER TABLE tab1 ADD IF NOT EXISTS PARTITION (a='4', b='8')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
-
-    // partition spec in ADD PARTITION should be case insensitive by default
-    sql("ALTER TABLE tab1 ADD PARTITION (A='9', B='9')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4, part5))
-  }
-
   protected def testDropPartitions(isDatasourceTable: Boolean): Unit = {
     if (!isUsingHiveMetastore) {
       assert(isDatasourceTable, "InMemoryCatalog only supports data source tables")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
new file mode 100644
index 0000000000000..295ce1d3da13f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/AlterTableAddPartitionSuite.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v1
+
+import org.apache.spark.sql.catalyst.analysis.PartitionsAlreadyExistException
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.test.SharedSparkSession
+
+trait AlterTableAddPartitionSuiteBase extends command.AlterTableAddPartitionSuiteBase {
+  override def version: String = "V1"
+  override def catalog: String = CatalogManager.SESSION_CATALOG_NAME
+  override def defaultUsing: String = "USING parquet"
+
+  override protected def checkLocation(
+      t: String,
+      spec: TablePartitionSpec,
+      expected: String): Unit = {
+    val tablePath = t.split('.')
+    val tableName = tablePath.last
+    val ns = tablePath.init.mkString(".")
+    val partSpec = spec.map { case (key, value) => s"$key = $value"}.mkString(", ")
+    val information = sql(s"SHOW TABLE EXTENDED IN $ns LIKE '$tableName' PARTITION($partSpec)")
+      .select("information")
+      .first().getString(0)
+    val location = information.split("\\r?\\n").filter(_.startsWith("Location:")).head
+    assert(location.endsWith(expected))
+  }
+}
+
+class AlterTableAddPartitionSuite extends AlterTableAddPartitionSuiteBase with SharedSparkSession {
+  test("partition already exists") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
new file mode 100644
index 0000000000000..b15235d17671a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.v2
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.analysis.{PartitionsAlreadyExistException, ResolvePartitionSpec}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.connector.{InMemoryPartitionTable, InMemoryPartitionTableCatalog, InMemoryTableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Implicits, Identifier}
+import org.apache.spark.sql.execution.command
+import org.apache.spark.sql.test.SharedSparkSession
+
+class AlterTableAddPartitionSuite
+  extends command.AlterTableAddPartitionSuiteBase
+  with SharedSparkSession {
+
+  import CatalogV2Implicits._
+
+  override def version: String = "V2"
+  override def catalog: String = "test_catalog"
+  override def defaultUsing: String = "USING _"
+
+  override def sparkConf: SparkConf = super.sparkConf
+    .set(s"spark.sql.catalog.$catalog", classOf[InMemoryPartitionTableCatalog].getName)
+    .set(s"spark.sql.catalog.non_part_$catalog", classOf[InMemoryTableCatalog].getName)
+
+  override protected def checkLocation(
+      t: String,
+      spec: TablePartitionSpec,
+      expected: String): Unit = {
+    val tablePath = t.split('.')
+    val catalogName = tablePath.head
+    val namespaceWithTable = tablePath.tail
+    val namespaces = namespaceWithTable.init
+    val tableName = namespaceWithTable.last
+    val catalogPlugin = spark.sessionState.catalogManager.catalog(catalogName)
+    val partTable = catalogPlugin.asTableCatalog
+      .loadTable(Identifier.of(namespaces, tableName))
+      .asInstanceOf[InMemoryPartitionTable]
+    val ident = ResolvePartitionSpec.convertToPartIdent(spec, partTable.partitionSchema.fields)
+    val partMetadata = partTable.loadPartitionMetadata(ident)
+
+    assert(partMetadata.containsKey("location"))
+    assert(partMetadata.get("location") === expected)
+  }
+
+  test("partition already exists") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[PartitionsAlreadyExistException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("The following partitions already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+
+  test("SPARK-33650: add partition into a table which doesn't support partition management") {
+    withNsTable("ns", "tbl", s"non_part_$catalog") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing")
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
+      }.getMessage
+      assert(errMsg.contains(s"Table $t can not alter partitions"))
+    }
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d6a4d76386889..070fdf55deb38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -166,10 +166,6 @@ class HiveCatalogedDDLSuite extends DDLSuite with TestHiveSingleton with BeforeA
     testDropPartitions(isDatasourceTable = false)
   }
 
-  test("alter table: add partition") {
-    testAddPartitions(isDatasourceTable = false)
-  }
-
   test("drop table") {
     testDropTable(isDatasourceTable = false)
   }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
new file mode 100644
index 0000000000000..ef0ec8d9bd69f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/AlterTableAddPartitionSuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution.command
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.execution.command.v1
+import org.apache.spark.sql.hive.test.TestHiveSingleton
+
+class AlterTableAddPartitionSuite
+  extends v1.AlterTableAddPartitionSuiteBase
+  with TestHiveSingleton {
+  override def version: String = "Hive V1"
+  override def defaultUsing: String = "USING HIVE"
+
+  test("partition already exists") {
+    withNsTable("ns", "tbl") { t =>
+      sql(s"CREATE TABLE $t (id bigint, data string) $defaultUsing PARTITIONED BY (id)")
+      sql(s"ALTER TABLE $t ADD PARTITION (id=2) LOCATION 'loc1'")
+
+      val errMsg = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ADD PARTITION (id=1) LOCATION 'loc'" +
+          " PARTITION (id=2) LOCATION 'loc1'")
+      }.getMessage
+      assert(errMsg.contains("already exists"))
+
+      sql(s"ALTER TABLE $t ADD IF NOT EXISTS PARTITION (id=1) LOCATION 'loc'" +
+        " PARTITION (id=2) LOCATION 'loc1'")
+      checkPartitions(t, Map("id" -> "1"), Map("id" -> "2"))
+    }
+  }
+}