From e06c22664f7300cbee8894407590fc22f0250cae Mon Sep 17 00:00:00 2001
From: Gera Shegalov
Date: Tue, 2 Mar 2021 11:02:03 -0800
Subject: [PATCH] Fix NullPointerException on null partition insert (#1744)

Port https://github.com/apache/spark/pull/31320 to close #1735

Signed-off-by: Gera Shegalov
---
 ...GpuInsertIntoHadoopFsRelationCommand.scala | 11 ++----
 .../rapids/InsertPartition311Suite.scala      | 39 +++++++++++++++++++
 2 files changed, 42 insertions(+), 8 deletions(-)
 create mode 100644 tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala

diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala
index 7cd815179e9..39d6cfb6442 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuInsertIntoHadoopFsRelationCommand.scala
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2019, NVIDIA CORPORATION.
+ * Copyright (c) 2019-2021, NVIDIA CORPORATION.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,7 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
+import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionPathString
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
@@ -204,12 +204,7 @@ case class GpuInsertIntoHadoopFsRelationCommand(
       committer: FileCommitProtocol): Unit = {
     val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
-        staticPartitions.get(p.name) match {
-          case Some(value) =>
-            Some(escapePathName(p.name) + "=" + escapePathName(value))
-          case None =>
-            None
-        }
+        staticPartitions.get(p.name).map(getPartitionPathString(p.name, _))
       }.mkString("/")
     } else {
       ""
diff --git a/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala
new file mode 100644
index 00000000000..4865c9aa910
--- /dev/null
+++ b/tests-spark310+/src/test/scala/com/nvidia/spark/rapids/InsertPartition311Suite.scala
@@ -0,0 +1,39 @@
+/*
+ * Copyright (c) 2021, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids
+
+import org.scalatest.BeforeAndAfterEach
+
+class InsertPartition311Suite extends SparkQueryCompareTestSuite with BeforeAndAfterEach {
+  var tableNr = 0
+
+  override def afterEach(): Unit = {
+    List(1, 2).foreach { tnr =>
+      SparkSessionHolder.sparkSession.sql(s"DROP TABLE IF EXISTS t$tnr")
+    }
+  }
+
+  testSparkResultsAreEqual(
+    testName ="Insert null-value partition ",
+    spark => {
+      tableNr += 1
+      spark.sql(s"""CREATE TABLE t${tableNr}(i STRING, c STRING)
+                   |USING PARQUET PARTITIONED BY (c)""".stripMargin)
+      spark.sql(s"""INSERT OVERWRITE t${tableNr} PARTITION (c=null)
+                   |VALUES ('1')""".stripMargin)})(
+    _.sparkSession.sql(s"SELECT * FROM t$tableNr"))
+}
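
For context on why the one-line change in GpuInsertIntoHadoopFsRelationCommand is enough: escapePathName iterates over its argument, so a null static partition value such as PARTITION (c=null) triggers the NullPointerException reported in #1735, while getPartitionPathString is expected to substitute Hive's default partition directory for null or empty values before any escaping happens. Below is a minimal, standalone Scala sketch of that behavior; the two functions and the DefaultPartitionName constant are simplified stand-ins for the Spark helpers, not the actual implementations.

object NullPartitionPathSketch {
  // Assumed marker directory name Hive uses for null/empty partition values.
  val DefaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Simplified stand-in: iterates over the value, so a null partition value
  // fails here with a NullPointerException, as in the old code path.
  def escapePathName(path: String): String =
    path.flatMap {
      case c if c == '/' || c == '=' || c == '%' => f"%%${c.toInt}%02X"
      case c => c.toString
    }

  // Simplified stand-in: routes null/empty values to the default partition
  // directory first, so escapePathName never sees a null value.
  def getPartitionPathString(col: String, value: String): String = {
    val partValue =
      if (value == null || value.isEmpty) DefaultPartitionName else escapePathName(value)
    escapePathName(col) + "=" + partValue
  }

  def main(args: Array[String]): Unit = {
    // New code path: a null static partition value maps to c=__HIVE_DEFAULT_PARTITION__.
    println(getPartitionPathString("c", null))

    // Old code path: escaping the null value directly throws.
    try println(escapePathName("c") + "=" + escapePathName(null))
    catch { case _: NullPointerException => println("NPE, matching the symptom in #1735") }
  }
}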