[SPARK-33496][SQL] Improve error message of ANSI explicit cast
### What changes were proposed in this pull request?

After apache/spark#30260, some type conversions are disallowed under ANSI mode.
We should tell users what they can do if they have to use one of the disallowed casts.
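
For illustration, a hedged sketch of the behavior in question (spark-shell style, assuming an active `SparkSession` named `spark`; the exact wording of the improved message is defined by this PR and is not quoted here):

```scala
// With ANSI mode on, explicit casts that the SQL standard disallows fail at analysis time.
spark.conf.set("spark.sql.ansi.enabled", "true")

// e.g. TIMESTAMP -> BOOLEAN is one of the conversions disallowed after apache/spark#30260:
// spark.sql("SELECT CAST(TIMESTAMP '2020-11-25 00:00:00' AS BOOLEAN)")
// throws an AnalysisException; the improved message should tell the user what to do
// instead, for example falling back to non-ANSI behavior if the legacy cast is required.

// With ANSI mode off, the legacy cast is accepted again:
spark.conf.set("spark.sql.ansi.enabled", "false")
spark.sql("SELECT CAST(TIMESTAMP '2020-11-25 00:00:00' AS BOOLEAN)").show()
```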

### Why are the changes needed?

Make it more user-friendly.

### Does this PR introduce _any_ user-facing change?

Yes, the error message for a casting failure is improved when ANSI mode is enabled.

### How was this patch tested?

Unit tests.

Closes #30440 from gengliangwang/improveAnsiCastErrorMSG.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
a0x8o committed Nov 25, 2020
1 parent d754440 commit 71b2def
Showing 146 changed files with 5,296 additions and 2,977 deletions.
@@ -169,4 +169,8 @@ object FileCommitProtocol extends Logging {
ctor.newInstance(jobId, outputPath)
}
}

def getStagingDir(path: String, jobId: String): Path = {
new Path(path, ".spark-staging-" + jobId)
}
}
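
A minimal sketch of what the new `getStagingDir` helper shown above computes (the `jobId` value is made up for illustration):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol

// The helper simply appends ".spark-staging-<jobId>" to the job's output path.
val staging: Path = FileCommitProtocol.getStagingDir("/path/to/outputPath", "job-1123")
assert(staging.toString == "/path/to/outputPath/.spark-staging-job-1123")
```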
@@ -41,13 +41,28 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
* dynamically, i.e., we first write files under a staging
* directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
* we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move
* files from staging directory to the corresponding partition
* directories under destination path.
* dynamically. Suppose final path is /path/to/outputPath, output
* path of [[FileOutputCommitter]] is an intermediate path, e.g.
* /path/to/outputPath/.spark-staging-{jobId}, which is a staging
* directory. Task attempts firstly write files under the
* intermediate path, e.g.
* /path/to/outputPath/.spark-staging-{jobId}/_temporary/
* {appAttemptId}/_temporary/{taskAttemptId}/a=1/b=1/xxx.parquet.
*
* 1. When [[FileOutputCommitter]] algorithm version set to 1,
* we firstly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/_temporary/
* {appAttemptId}/{taskId}/a=1/b=1,
* then move them to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
* 2. When [[FileOutputCommitter]] algorithm version set to 2,
* committing tasks directly move task attempt output files to
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1.
*
* At the end of committing job, we move output files from
* intermediate path to final path, e.g., move files from
* /path/to/outputPath/.spark-staging-{jobId}/a=1/b=1
* to /path/to/outputPath/a=1/b=1
*/
class HadoopMapReduceCommitProtocol(
jobId: String,
@@ -89,7 +104,7 @@ class HadoopMapReduceCommitProtocol(
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)
protected def stagingDir = getStagingDir(path, jobId)

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.getConstructor().newInstance()
@@ -106,13 +121,13 @@ class HadoopMapReduceCommitProtocol(
val filename = getFilename(taskContext, ext)

val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
if (dynamicPartitionOverwrite) {
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
}
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}
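To make the commit flow described in the updated scaladoc easier to follow, here is a small sketch of the paths involved when dynamicPartitionOverwrite is true (the jobId and partition values are illustrative only):

```scala
import org.apache.hadoop.fs.Path

val outputPath = new Path("/path/to/outputPath")
val jobId      = "20201125113000-0001"                       // hypothetical job id
val stagingDir = new Path(outputPath, s".spark-staging-$jobId")

// Task attempts write through FileOutputCommitter underneath the staging directory:
//   <stagingDir>/_temporary/<appAttemptId>/_temporary/<taskAttemptId>/a=1/b=1/xxx.parquet
// With committer algorithm v1 the files pass through
//   <stagingDir>/_temporary/<appAttemptId>/<taskId>/a=1/b=1
// before reaching <stagingDir>/a=1/b=1; with algorithm v2 task commit moves them there directly.

// Committing the job finally moves each partition from the staging directory to its destination:
val committedPartition = new Path(stagingDir, "a=1/b=1")     // source
val finalPartition     = new Path(outputPath, "a=1/b=1")     // destination
```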
@@ -169,7 +169,7 @@ private[spark] class StorageStatus(
.getOrElse((0L, 0L))
case _ if !level.useOffHeap =>
(_nonRddStorageInfo.onHeapUsage, _nonRddStorageInfo.diskUsage)
case _ if level.useOffHeap =>
case _ =>
(_nonRddStorageInfo.offHeapUsage, _nonRddStorageInfo.diskUsage)
}
val newMem = math.max(oldMem + changeInMem, 0L)
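The StorageStatus change above replaces a final guarded case with a bare wildcard. A minimal sketch (with made-up types) of why: when the last case of a match still carries a guard, the compiler cannot prove the match exhaustive, and newer Scala versions (such as 2.13) warn about it.

```scala
// Hypothetical stand-ins for the real StorageLevel and usage fields.
final case class Level(useOffHeap: Boolean)

def memoryUsage(level: Level, onHeap: Long, offHeap: Long): Long = level match {
  case _ if !level.useOffHeap => onHeap
  // Previously `case _ if level.useOffHeap =>`: the guard is redundant and leaves the
  // compiler unable to prove exhaustiveness, so the bare wildcard is used instead.
  case _ => offHeap
}
```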
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -757,15 +757,15 @@ private[spark] object JsonProtocol {

def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = taskResourceRequestFromJson(v)
(k, req)
}.toMap
}

def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val req = executorResourceRequestFromJson(v)
(k, req)
}.toMap
@@ -1229,7 +1229,7 @@ private[spark] object JsonProtocol {

def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, v) =>
jsonFields.collect { case JField(k, v) =>
val resourceInfo = ResourceInformation.parseJson(v)
(k, resourceInfo)
}.toMap
@@ -1241,7 +1241,7 @@ private[spark] object JsonProtocol {

def mapFromJson(json: JValue): Map[String, String] = {
val jsonFields = json.asInstanceOf[JObject].obj
jsonFields.map { case JField(k, JString(v)) => (k, v) }.toMap
jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap
}

def propertiesFromJson(json: JValue): Properties = {
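The switch from `.map` to `.collect` in the JsonProtocol helpers above is worth a note: `collect` takes a PartialFunction, so a pattern such as `case JField(k, JString(v))` neither risks a MatchError nor triggers a non-exhaustive-match warning under Scala 2.13 for elements it does not cover. A simplified, self-contained sketch of the difference:

```scala
val fields: List[(String, Option[Int])] =
  List("a" -> Some(1), "b" -> None, "c" -> Some(3))

// fields.map { case (k, Some(v)) => k -> v }   // would throw MatchError on ("b", None)
val kept: Map[String, Int] =
  fields.collect { case (k, Some(v)) => k -> v }.toMap
// kept == Map("a" -> 1, "c" -> 3)
```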
1 change: 1 addition & 0 deletions dev/run-tests-jenkins
@@ -26,6 +26,7 @@ FWDIR="$( cd "$( dirname "$0" )/.." && pwd )"
cd "$FWDIR"

export PATH=/home/anaconda/envs/py36/bin:$PATH
export LANG="en_US.UTF-8"

PYTHON_VERSION_CHECK=$(python3 -c 'import sys; print(sys.version_info < (3, 6, 0))')
if [[ "$PYTHON_VERSION_CHECK" == "True" ]]; then
13 changes: 9 additions & 4 deletions docs/_config.yml
@@ -26,15 +26,20 @@ SCALA_VERSION: "2.12.10"
MESOS_VERSION: 1.0.0
SPARK_ISSUE_TRACKER_URL: https://issues.apache.org/jira/browse/SPARK
SPARK_GITHUB_URL: https://github.com/apache/spark
# Before a new release, we should apply a new `apiKey` for the new Spark documentation
# on https://docsearch.algolia.com/. Otherwise, after release, the search results are always based
# on the latest documentation(https://spark.apache.org/docs/latest/) even when visiting the
# documentation of previous releases.
# Before a new release, we should:
# 1. update the `version` array for the new Spark documentation
# on https://github.com/algolia/docsearch-configs/blob/master/configs/apache_spark.json.
# 2. update the value of `facetFilters.version` in `algoliaOptions` on the new release branch.
# Otherwise, after release, the search results are always based on the latest documentation
# (https://spark.apache.org/docs/latest/) even when visiting the documentation of previous releases.
DOCSEARCH_SCRIPT: |
docsearch({
apiKey: 'b18ca3732c502995563043aa17bc6ecb',
indexName: 'apache_spark',
inputSelector: '#docsearch-input',
enhancedSearchInput: true,
algoliaOptions: {
'facetFilters': ["version:latest"]
},
debug: false // Set debug to true if you want to inspect the dropdown
});
@@ -302,6 +302,8 @@ private[spark] object BLAS extends Serializable {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"spr doesn't support vector type ${v.getClass}.")
}
}

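This hunk, and many of the ML/MLlib hunks that follow, add the same kind of defensive catch-all to matches on Vector (and Matrix) values: an unexpected implementation now fails fast with a descriptive IllegalArgumentException instead of a bare MatchError, and the compiler no longer flags the match as possibly non-exhaustive under Scala 2.13. A small sketch of the pattern, using the `ml.linalg` API (the helper itself is hypothetical):

```scala
import org.apache.spark.ml.linalg.{DenseVector, SparseVector, Vector, Vectors}

// Hypothetical helper illustrating the catch-all pattern added in this commit.
def numNonZeros(v: Vector): Int = v match {
  case DenseVector(values)        => values.count(_ != 0.0)
  case SparseVector(_, _, values) => values.count(_ != 0.0)
  case other =>
    throw new IllegalArgumentException(s"Unknown vector type ${other.getClass}.")
}

// numNonZeros(Vectors.dense(1.0, 0.0, 2.0)) == 2
// numNonZeros(Vectors.sparse(3, Array(2), Array(2.0))) == 1
```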
@@ -286,6 +286,7 @@ private[ml] object RFormulaParser extends RegexParsers {

private val pow: Parser[Term] = term ~ "^" ~ "^[1-9]\\d*".r ^^ {
case base ~ "^" ~ degree => power(base, degree.toInt)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
} | term

private val interaction: Parser[Term] = pow * (":" ^^^ { interact _ })
@@ -298,7 +299,10 @@ private[ml] object RFormulaParser extends RegexParsers {
private val expr = (sum | term)

private val formula: Parser[ParsedRFormula] =
(label ~ "~" ~ expr) ^^ { case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms) }
(label ~ "~" ~ expr) ^^ {
case r ~ "~" ~ t => ParsedRFormula(r, t.asTerms.terms)
case t => throw new IllegalArgumentException(s"Invalid term: $t")
}

def parse(value: String): ParsedRFormula = parseAll(formula, value) match {
case Success(result, _) => result
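The RFormulaParser changes above make the functions passed to `^^` total by adding an explicit failure case. A self-contained sketch of the same idea with a toy grammar rather than the real R-formula parser (assumes the scala-parser-combinators module is on the classpath):

```scala
import scala.util.parsing.combinator.RegexParsers

object ToyPowParser extends RegexParsers {
  private val num: Parser[Int] = "[1-9]\\d*".r ^^ (_.toInt)

  // The catch-all keeps the function total, mirroring the cases added to
  // `pow` and `formula` in RFormulaParser.
  val pow: Parser[Int] = num ~ "^" ~ num ^^ {
    case base ~ "^" ~ degree => math.pow(base, degree).toInt
    case t => throw new IllegalArgumentException(s"Invalid term: $t")
  }
}

// ToyPowParser.parseAll(ToyPowParser.pow, "2^10").get == 1024
```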
@@ -314,6 +314,8 @@ object StandardScalerModel extends MLReadable[StandardScalerModel] {
case SparseVector(size, indices, values) =>
val newValues = transformSparseWithScale(scale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case (false, false) =>
@@ -74,6 +74,8 @@ private[ml] object JsonMatrixConverter {
("values" -> values.toSeq) ~
("isTransposed" -> isTransposed)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}
}
}
@@ -57,6 +57,8 @@ private[ml] object JsonVectorConverter {
case DenseVector(values) =>
val jValue = ("type" -> 1) ~ ("values" -> values.toSeq)
compact(render(jValue))
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}
@@ -45,6 +45,8 @@ private[spark] class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -200,6 +200,9 @@ private[ml] class BlockHingeAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -504,6 +504,9 @@ private[ml] class BlockLogisticAggregator(
case sm: SparseMatrix if !fitIntercept =>
val gradSumVec = new DenseVector(gradientSumArray)
BLAS.gemv(1.0, sm.transpose, vec, 1.0, gradSumVec)

case m =>
throw new IllegalArgumentException(s"Unknown matrix type ${m.getClass}.")
}

if (fitIntercept) gradientSumArray(numFeatures) += vec.values.sum
@@ -192,6 +192,8 @@ private[spark] object Instrumentation {
case Failure(NonFatal(e)) =>
instr.logFailure(e)
throw e
case Failure(e) =>
throw e
case Success(result) =>
instr.logSuccess()
result
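The extra `case Failure(e)` branch added above covers fatal throwables, which the NonFatal extractor deliberately does not match; without it the pattern match is incomplete. A stripped-down sketch of the same structure (not the real Instrumentation API):

```scala
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

def instrumented[T](body: => T)(logFailure: Throwable => Unit): T =
  Try(body) match {
    case Failure(NonFatal(e)) =>
      logFailure(e)   // non-fatal failure: log it, then rethrow
      throw e
    case Failure(e) =>
      throw e         // fatal (e.g. OutOfMemoryError): rethrow without logging
    case Success(result) =>
      result
  }
```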
@@ -167,6 +167,8 @@ class StandardScalerModel @Since("1.3.0") (
val newValues = NewStandardScalerModel
.transformSparseWithScale(localScale, indices, values.clone())
Vectors.sparse(size, indices, newValues)
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}

case _ => vector
2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala
@@ -285,6 +285,8 @@ private[spark] object BLAS extends Serializable with Logging {
j += 1
prevCol = col
}
case _ =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -289,6 +289,8 @@ class VectorUDT extends UserDefinedType[Vector] {
row.setNullAt(2)
row.update(3, UnsafeArrayData.fromPrimitiveArray(values))
row
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}

@@ -145,6 +145,8 @@ class IndexedRowMatrix @Since("1.0.0") (
.map { case (values, blockColumn) =>
((blockRow.toInt, blockColumn), (rowInBlock.toInt, values.zipWithIndex))
}
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}.groupByKey(GridPartitioner(numRowBlocks, numColBlocks, rows.getNumPartitions)).map {
case ((blockRow, blockColumn), itr) =>
@@ -187,6 +189,8 @@ class IndexedRowMatrix @Since("1.0.0") (
Iterator.tabulate(indices.length)(i => MatrixEntry(rowIndex, indices(i), values(i)))
case DenseVector(values) =>
Iterator.tabulate(values.length)(i => MatrixEntry(rowIndex, i, values(i)))
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
new CoordinateMatrix(entries, numRows(), numCols())
@@ -748,6 +748,8 @@ class RowMatrix @Since("1.0.0") (
}
buf
}.flatten
case v =>
throw new IllegalArgumentException(s"Unknown vector type ${v.getClass}.")
}
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
2 changes: 1 addition & 1 deletion pom.xml
@@ -3264,7 +3264,7 @@
<profile>
<id>scala-2.13</id>
<properties>
<scala.version>2.13.3</scala.version>
<scala.version>2.13.4</scala.version>
<scala.binary.version>2.13</scala.binary.version>
</properties>
<dependencyManagement>
3 changes: 2 additions & 1 deletion python/docs/source/reference/pyspark.mllib.rst
@@ -216,6 +216,8 @@ Statistics
ChiSqTestResult
MultivariateGaussian
KernelDensity
ChiSqTestResult
KolmogorovSmirnovTestResult


Tree
@@ -250,4 +252,3 @@ Utilities
Loader
MLUtils
Saveable
