Use ErrorClass to Throw AnalysisException [databricks] #10830

Merged: 19 commits, Jun 4, 2024

Changes from all commits
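
This PR routes call sites that previously threw a raw AnalysisException, either directly or via TrampolineUtil.throwAnalysisException, through per-shim RapidsErrorUtils methods, and adds a RapidsAnalysisException class for errors specific to the RAPIDS Accelerator. A minimal sketch of what one such shim helper could look like, assuming it simply delegates to Spark's QueryCompilationErrors; the actual per-Spark-version implementations are not part of this diff and may differ:

    // Hypothetical sketch only; the real RapidsErrorUtils objects live in the
    // version-specific shim modules and are not shown in this diff.
    package org.apache.spark.sql.rapids.shims

    import org.apache.spark.sql.errors.QueryCompilationErrors

    object RapidsErrorUtils {
      // Delegating to Spark keeps whatever error class and message formatting
      // Spark already defines for this failure, instead of a hand-built string.
      // The second parameter mirrors the call sites in this diff and is unused
      // in this sketch; the Spark helper's exact signature varies by version.
      def createTableAsSelectWithNonEmptyDirectoryError(
          tablePath: String,
          allowNonEmptyLocationKey: String): Throwable = {
        QueryCompilationErrors.createTableAsSelectWithNonEmptyDirectoryError(tablePath)
      }
    }
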
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.DataWritingCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

@@ -84,10 +84,9 @@ object GpuDataWritingCommand {
if (fs.exists(filePath) &&
fs.getFileStatus(filePath).isDirectory &&
fs.listStatus(filePath).length != 0) {
TrampolineUtil.throwAnalysisException(
s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
s"${tablePath} . To allow overwriting the existing non-empty directory, " +
s"set '$allowNonEmptyLocationInCTASKey' to true.")
throw RapidsErrorUtils.
createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString,
allowNonEmptyLocationInCTASKey)
}
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.GpuWriteJobStatsTracker
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.SerializableConfiguration

@@ -82,10 +82,9 @@ object GpuRunnableCommand {
if (fs.exists(filePath) &&
fs.getFileStatus(filePath).isDirectory &&
fs.listStatus(filePath).length != 0) {
TrampolineUtil.throwAnalysisException(
s"CREATE-TABLE-AS-SELECT cannot create table with location to a non-empty directory " +
s"${tablePath} . To allow overwriting the existing non-empty directory, " +
s"set '$allowNonEmptyLocationInCTASKey' to true.")
throw RapidsErrorUtils.
createTableAsSelectWithNonEmptyDirectoryError(tablePath.toString,
allowNonEmptyLocationInCTASKey)
}
}
}
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,9 +19,9 @@ package org.apache.spark.sql.hive.rapids
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.types.{DataType, DoubleType, FloatType, StringType}

object RapidsHiveErrors {
@@ -53,8 +53,7 @@ object RapidsHiveErrors {
}

def cannotResolveAttributeError(name: String, outputStr: String): Throwable = {
new AnalysisException(
s"Unable to resolve $name given [$outputStr]")
throw RapidsErrorUtils.cannotResolveAttributeError(name, outputStr)
}

def writePartitionExceedConfigSizeWhenDynamicPartitionError(
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.datasources.v2.orc.OrcDataSourceV2
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.sources.{RateStreamProvider, TextSocketSourceProvider}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.shims.SchemaUtilsShims
import org.apache.spark.sql.rapids.shims.{RapidsErrorUtils, SchemaUtilsShims}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.util.{HadoopFSUtils, ThreadUtils, Utils}
@@ -144,8 +144,8 @@ abstract class GpuDataSourceBase(
}
inferredOpt
}.getOrElse {
throw new AnalysisException(s"Failed to resolve the schema for $format for " +
s"the partition column: $partitionColumn. It must be specified manually.")
throw RapidsErrorUtils.
partitionColumnNotSpecifiedError(format.toString, partitionColumn)
}
}
StructType(partitionFields)
@@ -162,8 +162,7 @@
caseInsensitiveOptions - "path",
SparkShimImpl.filesFromFileIndex(tempFileIndex))
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format. It must be specified manually.")
throw RapidsErrorUtils.dataSchemaNotSpecifiedError(format.toString)
}

// We just print a waring message if the data schema and partition schema have the duplicate
@@ -201,17 +200,13 @@
case (dataSource: RelationProvider, None) =>
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
case (_: SchemaRelationProvider, None) =>
throw new AnalysisException(s"A schema needs to be specified when using $className.")
throw RapidsErrorUtils.schemaNotSpecifiedForSchemaRelationProviderError(className)
case (dataSource: RelationProvider, Some(schema)) =>
val baseRelation =
dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
if (!DataType.equalsIgnoreCompatibleNullability(baseRelation.schema, schema)) {
throw new AnalysisException(
"The user-specified schema doesn't match the actual schema: " +
s"user-specified: ${schema.toDDL}, actual: ${baseRelation.schema.toDDL}. If " +
"you're using DataFrameReader.schema API or creating a table, please do not " +
"specify the schema. Or if you're scanning an existed table, please drop " +
"it and re-create it.")
throw RapidsErrorUtils.userSpecifiedSchemaMismatchActualSchemaError(schema,
baseRelation.schema)
}
baseRelation

@@ -233,9 +228,8 @@
caseInsensitiveOptions - "path",
SparkShimImpl.filesFromFileIndex(fileCatalog))
}.getOrElse {
throw new AnalysisException(
s"Unable to infer schema for $format at ${fileCatalog.allFiles().mkString(",")}. " +
"It must be specified manually")
throw RapidsErrorUtils.
dataSchemaNotSpecifiedError(format.toString, fileCatalog.allFiles().mkString(","))
}

HadoopFsRelation(
@@ -276,8 +270,7 @@
caseInsensitiveOptions)(sparkSession)

case _ =>
throw new AnalysisException(
s"$className is not a valid Spark SQL Data Source.")
throw RapidsErrorUtils.invalidDataSourceError(className)
}

relation match {
@@ -411,22 +404,13 @@ object GpuDataSourceBase extends Logging {
dataSource
case Failure(error) =>
if (provider1.startsWith("org.apache.spark.sql.hive.orc")) {
throw new AnalysisException(
"Hive built-in ORC data source must be used with Hive support enabled. " +
"Please use the native ORC data source by setting 'spark.sql.orc.impl' to " +
"'native'")
throw RapidsErrorUtils.orcNotUsedWithHiveEnabledError()
} else if (provider1.toLowerCase(Locale.ROOT) == "avro" ||
provider1 == "com.databricks.spark.avro" ||
provider1 == "org.apache.spark.sql.avro") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Avro is built-in but external data " +
"source module since Spark 2.4. Please deploy the application as per " +
"the deployment section of \"Apache Avro Data Source Guide\".")
throw RapidsErrorUtils.failedToFindAvroDataSourceError(provider1)
} else if (provider1.toLowerCase(Locale.ROOT) == "kafka") {
throw new AnalysisException(
s"Failed to find data source: $provider1. Please deploy the application as " +
"per the deployment section of " +
"\"Structured Streaming + Kafka Integration Guide\".")
throw RapidsErrorUtils.failedToFindKafkaDataSourceError(provider1)
} else {
throw new ClassNotFoundException(
s"Failed to find data source: $provider1. Please find packages at " +
@@ -459,8 +443,7 @@
s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
internalSources.head.getClass
} else {
throw new AnalysisException(s"Multiple sources found for $provider1 " +
s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
throw RapidsErrorUtils.findMultipleDataSourceError(provider1, sourceNames)
}
}
} catch {
@@ -513,7 +496,7 @@ object GpuDataSourceBase extends Logging {
}

if (checkEmptyGlobPath && globResult.isEmpty) {
throw new AnalysisException(s"Path does not exist: $globPath")
throw RapidsErrorUtils.dataPathNotExistError(globPath.toString)
}

globResult
@@ -527,7 +510,7 @@
ThreadUtils.parmap(nonGlobPaths, "checkPathsExist", numThreads) { path =>
val fs = path.getFileSystem(hadoopConf)
if (!fs.exists(path)) {
throw new AnalysisException(s"Path does not exist: $path")
throw RapidsErrorUtils.dataPathNotExistError(path.toString)
}
}
} catch {
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,7 @@ import com.nvidia.spark.rapids.{ColumnarFileFormat, GpuDataWritingCommand}
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.getPartitionPathString
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, CommandUtils}
import org.apache.spark.sql.execution.datasources.{FileFormatWriter, FileIndex, PartitioningUtils}
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.rapids.shims.SchemaUtilsShims
import org.apache.spark.sql.rapids.shims.{RapidsErrorUtils, SchemaUtilsShims}
import org.apache.spark.sql.vectorized.ColumnarBatch

case class GpuInsertIntoHadoopFsRelationCommand(
@@ -121,7 +121,7 @@ case class GpuInsertIntoHadoopFsRelationCommand(
val pathExists = fs.exists(qualifiedOutputPath)
(mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
throw RapidsErrorUtils.outputPathAlreadyExistsError(qualifiedOutputPath)
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020-2023, NVIDIA CORPORATION.
* Copyright (c) 2020-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -23,8 +23,8 @@ import com.nvidia.spark.rapids.Arm.withResource
import com.nvidia.spark.rapids.shims.ShimUnaryExpression

import org.apache.spark.TaskContext
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, ExpressionWithRandomSeed}
import org.apache.spark.sql.rapids.execution.RapidsAnalysisException
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.Utils
@@ -52,7 +52,7 @@ case class GpuRand(child: Expression) extends ShimUnaryExpression with GpuExpres
@transient protected lazy val seed: Long = child match {
case GpuLiteral(s, IntegerType) => s.asInstanceOf[Int]
case GpuLiteral(s, LongType) => s.asInstanceOf[Long]
case _ => throw new AnalysisException(
case _ => throw new RapidsAnalysisException(
s"Input argument to $prettyName must be an integer, long or null literal.")
}

@@ -157,9 +157,6 @@ object TrampolineUtil {
TaskContext.get.taskMemoryManager()
}

/** Throw a Spark analysis exception */
def throwAnalysisException(msg: String) = throw new AnalysisException(msg)

/** Set the task context for the current thread */
def setTaskContext(tc: TaskContext): Unit = TaskContext.setTaskContext(tc)

@@ -241,4 +238,13 @@
}

def getSparkHadoopUtilConf: Configuration = SparkHadoopUtil.get.conf

}

/**
* This class is to only be used to throw errors specific to the
* RAPIDS Accelerator or errors mirroring Spark where a raw
* AnalysisException is thrown directly rather than via an error
* utility class (this should be rare).
*/
class RapidsAnalysisException(msg: String) extends AnalysisException(msg)
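
A minimal usage sketch for the new class (hypothetical caller, not from this PR), matching the split described in the comment above: errors that mirror Spark go through RapidsErrorUtils, while RAPIDS-specific ones throw RapidsAnalysisException directly, as GpuRand and ParquetSchemaClipShims do elsewhere in this diff:

    // Hypothetical caller, not part of this PR. Plugin-specific validation
    // errors can throw RapidsAnalysisException directly instead of building a
    // raw AnalysisException, keeping the construction in one place.
    import org.apache.spark.sql.rapids.execution.RapidsAnalysisException

    object LiteralSeedCheck {
      def requireLiteralSeed(seed: Any): Long = seed match {
        case i: Int  => i.toLong
        case l: Long => l
        case other =>
          throw new RapidsAnalysisException(
            s"Seed must be an integer or long literal, but got: $other")
      }
    }
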
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022-2023, NVIDIA CORPORATION.
* Copyright (c) 2022-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -26,7 +26,8 @@ import org.apache.parquet.schema.OriginalType._
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.rapids.execution.RapidsAnalysisException
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.types._

object ParquetSchemaClipShims {
@@ -64,13 +65,13 @@ object ParquetSchemaClipShims {
if (originalType == null) s"$typeName" else s"$typeName ($originalType)"

def typeNotSupported() =
TrampolineUtil.throwAnalysisException(s"Parquet type not supported: $typeString")
throw new RapidsAnalysisException(s"Parquet type not supported: $typeString")

def typeNotImplemented() =
TrampolineUtil.throwAnalysisException(s"Parquet type not yet supported: $typeString")
throw RapidsErrorUtils.parquetTypeUnsupportedYetError(typeString)

def illegalType() =
TrampolineUtil.throwAnalysisException(s"Illegal Parquet type: $typeString")
throw RapidsErrorUtils.illegalParquetTypeError(typeString)

// When maxPrecision = -1, we skip precision range check, and always respect the precision
// specified in field.getDecimalMetadata. This is useful when interpreting decimal types stored
@@ -80,8 +81,7 @@
val scale = field.getDecimalMetadata.getScale

if (!(maxPrecision == -1 || 1 <= precision && precision <= maxPrecision)) {
TrampolineUtil.throwAnalysisException(
s"Invalid decimal precision: $typeName " +
throw new RapidsAnalysisException(s"Invalid decimal precision: $typeName " +
s"cannot store $precision digits (max $maxPrecision)")
}

@@ -121,7 +121,7 @@

case INT96 =>
if (!SQLConf.get.isParquetINT96AsTimestamp) {
TrampolineUtil.throwAnalysisException(
throw new RapidsAnalysisException(
"INT96 is not supported unless it's interpreted as timestamp. " +
s"Please try to set ${SQLConf.PARQUET_INT96_AS_TIMESTAMP.key} to true.")
}
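
The Parquet helpers above show the shim boundary this PR relies on. As a rough illustration of what "use ErrorClass to throw AnalysisException" can mean on Spark versions whose AnalysisException exposes an error-class constructor, a shim method might be written as below; both the constructor availability and the error class name are assumptions for illustration, not code from this PR:

    // Assumed sketch only: newer Spark versions provide an
    // AnalysisException(errorClass, messageParameters) constructor, while older
    // shims would fall back to a plain message. The error class name here is a
    // placeholder, not one defined by Spark or the plugin.
    import org.apache.spark.sql.AnalysisException

    object ErrorClassSketch {
      def illegalParquetTypeError(parquetType: String): Throwable =
        new AnalysisException(
          errorClass = "ILLEGAL_PARQUET_TYPE",
          messageParameters = Map("parquetType" -> parquetType))
    }
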
@@ -45,7 +45,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg
import org.apache.hadoop.hive.ql.plan.TableDesc

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalog, ExternalCatalogUtils, ExternalCatalogWithListener}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -58,6 +58,7 @@ import org.apache.spark.sql.hive.client.HiveClientImpl
import org.apache.spark.sql.hive.client.hive._
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable
import org.apache.spark.sql.hive.rapids.{GpuHiveFileFormat, GpuSaveAsHiveFile, RapidsHiveErrors}
import org.apache.spark.sql.rapids.shims.RapidsErrorUtils
import org.apache.spark.sql.vectorized.ColumnarBatch

final class GpuInsertIntoHiveTableMeta(cmd: InsertIntoHiveTable,
@@ -193,7 +194,7 @@ case class GpuInsertIntoHiveTable(
// Report error if any static partition appears after a dynamic partition
val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
throw RapidsErrorUtils.dynamicPartitionParentError
}
}
