Add shims for AnsiCast [databricks] (#5749)
* Remove AnsiCast for 340

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Revert "Disable 340 for premerge and nightly (#5689)"

This reverts commit 4769926.

* Disable the failing tests

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Fix a new build error about parquet column

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* Add comment for the disabled tests

Signed-off-by: Firestarman <firestarmanllc@gmail.com>

* address comments

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored Jun 13, 2022
1 parent 07e7e15 commit 808b819
Showing 24 changed files with 270 additions and 120 deletions.
8 changes: 2 additions & 6 deletions dist/pom.xml
@@ -51,12 +51,8 @@
     <snapshot.buildvers>
         314,
         322,
-        330
-        <!--
-        Remove 340 now due to https://github.com/NVIDIA/spark-rapids/issues/5688.
-        Need to add it back when #5688 is fixed
-        -->
-        <!-- 340 -->
+        330,
+        340
     </snapshot.buildvers>
     <databricks.buildvers>
         312db,
3 changes: 1 addition & 2 deletions jenkins/spark-premerge-build.sh
@@ -50,8 +50,7 @@ mvn_verify() {
     env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=321 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
     [[ $BUILD_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=322 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
     env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=330 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
-    # Skip 340 now due to https://github.com/NVIDIA/spark-rapids/issues/5688. Need to enable it when #5688 is fixed
-    # [[ $BUILD_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=340 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
+    [[ $BUILD_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=340 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
 
     # Here run Python integration tests tagged with 'premerge_ci_1' only, that would help balance test duration and memory
     # consumption from two k8s pods running in parallel, which executes 'mvn_verify()' and 'ci_2()' respectively.
5 changes: 2 additions & 3 deletions jenkins/version-def.sh
@@ -48,9 +48,8 @@ SPARK_REPO=${SPARK_REPO:-"$URM_URL"}
 echo "CUDF_VER: $CUDF_VER, CUDA_CLASSIFIER: $CUDA_CLASSIFIER, PROJECT_VER: $PROJECT_VER \
     SPARK_VER: $SPARK_VER, SCALA_BINARY_VER: $SCALA_BINARY_VER"
 
-# Remove 340 now due to https://github.com/NVIDIA/spark-rapids/issues/5688.
-# Need to add it back when #5688 is fixed
-SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"311 321cdh 312 313 314 320 321 322 330"}
+
+SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"311 321cdh 312 313 314 320 321 322 330 340"}
 
 IFS=" " <<< $SPARK_SHIM_VERSIONS_STR read -r -a SPARK_SHIM_VERSIONS
 
18 changes: 12 additions & 6 deletions pom.xml
@@ -409,14 +409,15 @@
             <source>${project.basedir}/src/main/311+-nondb/scala</source>
             <source>${project.basedir}/src/main/311until330-all/scala</source>
             <source>${project.basedir}/src/main/311until330-nondb/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/320/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-nondb/scala</source>
             <source>${project.basedir}/src/main/320+-noncdh/scala</source>
             <source>${project.basedir}/src/main/320until330-all/scala</source>
             <source>${project.basedir}/src/main/320until330-nondb/scala</source>
             <source>${project.basedir}/src/main/320until330-noncdh/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
@@ -476,15 +477,16 @@
             <source>${project.basedir}/src/main/311+-nondb/scala</source>
             <source>${project.basedir}/src/main/311until330-all/scala</source>
             <source>${project.basedir}/src/main/311until330-nondb/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-nondb/scala</source>
             <source>${project.basedir}/src/main/320+-noncdh/scala</source>
             <source>${project.basedir}/src/main/320until330-all/scala</source>
             <source>${project.basedir}/src/main/320until330-nondb/scala</source>
             <source>${project.basedir}/src/main/320until330-noncdh/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/321+/scala</source>
             <source>${project.basedir}/src/main/321until330-all/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
@@ -544,13 +546,14 @@
             <source>${project.basedir}/src/main/311+-nondb/scala</source>
             <source>${project.basedir}/src/main/311until330-all/scala</source>
             <source>${project.basedir}/src/main/311until330-nondb/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-nondb/scala</source>
             <source>${project.basedir}/src/main/320until330-all/scala</source>
             <source>${project.basedir}/src/main/320until330-nondb/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/321+/scala</source>
             <source>${project.basedir}/src/main/321until330-all/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
@@ -616,15 +619,16 @@
             <source>${project.basedir}/src/main/311+-nondb/scala</source>
             <source>${project.basedir}/src/main/311until330-all/scala</source>
             <source>${project.basedir}/src/main/311until330-nondb/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-nondb/scala</source>
             <source>${project.basedir}/src/main/320+-noncdh/scala</source>
             <source>${project.basedir}/src/main/320until330-all/scala</source>
             <source>${project.basedir}/src/main/320until330-nondb/scala</source>
             <source>${project.basedir}/src/main/320until330-noncdh/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/321+/scala</source>
             <source>${project.basedir}/src/main/321until330-all/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
@@ -695,14 +699,15 @@
           <sources>
             <source>${project.basedir}/src/main/321db/scala</source>
             <source>${project.basedir}/src/main/311until330-all/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/311+-db/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-noncdh/scala</source>
             <source>${project.basedir}/src/main/320until330-all/scala</source>
             <source>${project.basedir}/src/main/320until330-noncdh/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/321+/scala</source>
             <source>${project.basedir}/src/main/321until330-all/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
@@ -749,12 +754,13 @@
           <sources>
             <source>${project.basedir}/src/main/330/scala</source>
             <source>${project.basedir}/src/main/311+-nondb/scala</source>
+            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/320+/scala</source>
             <source>${project.basedir}/src/main/320+-nondb/scala</source>
             <source>${project.basedir}/src/main/320+-noncdh/scala</source>
+            <source>${project.basedir}/src/main/320until340-all/scala</source>
             <source>${project.basedir}/src/main/321+/scala</source>
             <source>${project.basedir}/src/main/330+/scala</source>
-            <source>${project.basedir}/src/main/311until340-all/scala</source>
             <source>${project.basedir}/src/main/post320-treenode/scala</source>
           </sources>
         </configuration>
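A note on the source-set names above: a directory such as 320until340-all holds code compiled for Spark builds from 3.2.0 up to, but excluding, 3.4.0, across all distributions. A minimal Scala sketch of that convention follows; the inRange helper is hypothetical, for illustration only, and is not part of the build.

// Hypothetical illustration of the "<lo>until<hi>" directory convention;
// not actual build code. The upper bound is exclusive.
object ShimRanges {
  def inRange(buildver: Int, lo: Int, hi: Int): Boolean =
    buildver >= lo && buildver < hi

  def main(args: Array[String]): Unit = {
    assert(inRange(330, 320, 340))  // 3.3.0 compiles 320until340-all sources
    assert(!inRange(340, 320, 340)) // 3.4.0 skips them, hence the new directories
  }
}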
@@ -190,13 +190,7 @@ abstract class Spark31XShims extends SparkShims with Spark31Xuntil33XShims with
     case _ => false
   }
 
-  override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
-    GpuOverrides.expr[Cast](
-      "Convert a column of one type of data into another type",
-      new CastChecks(),
-      (cast, conf, p, r) => new CastExprMeta[Cast](cast,
-        SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r,
-        doFloatToIntCheck = true, stringToAnsiDate = false)),
+  override def ansiCastRule: ExprRule[_ <: Expression] = {
     GpuOverrides.expr[AnsiCast](
       "Convert a column of one type of data into another type",
       new CastChecks {
@@ -248,7 +242,16 @@
         override val sparkUdtSig: TypeSig = UDT
       },
       (cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, ansiEnabled = true, conf = conf,
-        parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = false)),
+        parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = false))
+  }
+
+  override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
+    GpuOverrides.expr[Cast](
+      "Convert a column of one type of data into another type",
+      new CastChecks(),
+      (cast, conf, p, r) => new CastExprMeta[Cast](cast,
+        SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r,
+        doFloatToIntCheck = true, stringToAnsiDate = false)),
     GpuOverrides.expr[Average](
       "Average aggregate operator",
      ExprChecks.fullAgg(
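The two hunks above are the heart of the change: the inline AnsiCast rule moves out of getExprs into an overridable ansiCastRule, so shims for Spark versions where AnsiCast still exists can supply it, while a 3.4.0 shim (where the class was removed) can omit it. Below is a self-contained sketch of that pattern; ExprRule here is a stand-in for the plugin's real rule type, and Option replaces the actual API, so treat it as an illustration rather than the repository's code.

// Self-contained sketch of the shim pattern; all types are stand-ins.
final case class ExprRule(description: String)

trait ShimSketch {
  // Rules common to every supported Spark version.
  def commonExprs: Map[String, ExprRule] =
    Map("Cast" -> ExprRule("Convert a column of one type of data into another type"))
  // Overridden only where the Spark version still defines AnsiCast.
  def ansiCastRule: Option[ExprRule] = None
  // The merged view the plugin would consume.
  final def getExprs: Map[String, ExprRule] =
    commonExprs ++ ansiCastRule.map("AnsiCast" -> _)
}

object Spark31xSketch extends ShimSketch {
  override def ansiCastRule: Option[ExprRule] =
    Some(ExprRule("Convert a column of one type of data into another type"))
}

object Spark340Sketch extends ShimSketch // AnsiCast is gone in 3.4.0: keeps None

object Demo {
  def main(args: Array[String]): Unit = {
    println(Spark31xSketch.getExprs.keySet) // Set(Cast, AnsiCast)
    println(Spark340Sketch.getExprs.keySet) // Set(Cast)
  }
}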
@@ -55,16 +55,7 @@ abstract class Spark31XdbShims extends Spark31XdbShimsBase with Logging {
   override def isWindowFunctionExec(plan: SparkPlan): Boolean =
     plan.isInstanceOf[WindowExecBase] || plan.isInstanceOf[RunningWindowFunctionExec]
 
-  override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
-    GpuOverrides.expr[Cast](
-      "Convert a column of one type of data into another type",
-      new CastChecks(),
-      // 312db supports Ansi mode when casting string to date, this means that an exception
-      // will be thrown when casting an invalid value to date in Ansi mode.
-      // Set `stringToAnsiDate` = true
-      (cast, conf, p, r) => new CastExprMeta[Cast](cast,
-        SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r,
-        doFloatToIntCheck = true, stringToAnsiDate = true)),
+  override def ansiCastRule: ExprRule[_ <: Expression] = {
     GpuOverrides.expr[AnsiCast](
       "Convert a column of one type of data into another type",
       new CastChecks {
@@ -119,7 +110,19 @@
       // will be thrown when casting an invalid value to date in Ansi mode.
       // Set `stringToAnsiDate` = true
       (cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, ansiEnabled = true, conf = conf,
-        parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = true)),
+        parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = true))
+  }
+
+  override def getExprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Seq(
+    GpuOverrides.expr[Cast](
+      "Convert a column of one type of data into another type",
+      new CastChecks(),
+      // 312db supports Ansi mode when casting string to date, this means that an exception
+      // will be thrown when casting an invalid value to date in Ansi mode.
+      // Set `stringToAnsiDate` = true
+      (cast, conf, p, r) => new CastExprMeta[Cast](cast,
+        SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r,
+        doFloatToIntCheck = true, stringToAnsiDate = true)),
     GpuOverrides.expr[Average](
       "Average aggregate operator",
       ExprChecks.fullAgg(
@@ -129,65 +129,6 @@ trait Spark320PlusShims extends SparkShims with RebaseShims with Logging {
       (cast, conf, p, r) => new CastExprMeta[Cast](cast,
         SparkSession.active.sessionState.conf.ansiEnabled, conf, p, r,
         doFloatToIntCheck = true, stringToAnsiDate = true)),
-    GpuOverrides.expr[AnsiCast](
-      "Convert a column of one type of data into another type",
-      new CastChecks {
-
-        import TypeSig._
-        // nullChecks are the same
-
-        override val booleanChecks: TypeSig = integral + fp + BOOLEAN + STRING
-        override val sparkBooleanSig: TypeSig = cpuNumeric + BOOLEAN + STRING
-
-        override val integralChecks: TypeSig = gpuNumeric + BOOLEAN + STRING
-        override val sparkIntegralSig: TypeSig = cpuNumeric + BOOLEAN + STRING
-
-        override val fpChecks: TypeSig = (gpuNumeric + BOOLEAN + STRING)
-          .withPsNote(TypeEnum.STRING, fpToStringPsNote)
-        override val sparkFpSig: TypeSig = cpuNumeric + BOOLEAN + STRING
-
-        override val dateChecks: TypeSig = TIMESTAMP + DATE + STRING
-        override val sparkDateSig: TypeSig = TIMESTAMP + DATE + STRING
-
-        override val timestampChecks: TypeSig = TIMESTAMP + DATE + STRING
-        override val sparkTimestampSig: TypeSig = TIMESTAMP + DATE + STRING
-
-        // stringChecks are the same, but adding in PS note
-        private val fourDigitYearMsg: String = "Only 4 digit year parsing is available. To " +
-          s"enable parsing anyways set ${RapidsConf.HAS_EXTENDED_YEAR_VALUES} to false."
-        override val stringChecks: TypeSig = gpuNumeric + BOOLEAN + STRING + BINARY +
-          TypeSig.psNote(TypeEnum.DATE, fourDigitYearMsg) +
-          TypeSig.psNote(TypeEnum.TIMESTAMP, fourDigitYearMsg)
-
-        // binaryChecks are the same
-        override val decimalChecks: TypeSig = gpuNumeric + STRING
-        override val sparkDecimalSig: TypeSig = cpuNumeric + BOOLEAN + STRING
-
-        // calendarChecks are the same
-
-        override val arrayChecks: TypeSig =
-          ARRAY.nested(commonCudfTypes + DECIMAL_128 + NULL + ARRAY + BINARY + STRUCT) +
-            psNote(TypeEnum.ARRAY, "The array's child type must also support being cast to " +
-              "the desired child type")
-        override val sparkArraySig: TypeSig = ARRAY.nested(all)
-
-        override val mapChecks: TypeSig =
-          MAP.nested(commonCudfTypes + DECIMAL_128 + NULL + ARRAY + BINARY + STRUCT + MAP) +
-            psNote(TypeEnum.MAP, "the map's key and value must also support being cast to the " +
-              "desired child types")
-        override val sparkMapSig: TypeSig = MAP.nested(all)
-
-        override val structChecks: TypeSig =
-          STRUCT.nested(commonCudfTypes + DECIMAL_128 + NULL + ARRAY + BINARY + STRUCT) +
-            psNote(TypeEnum.STRUCT, "the struct's children must also support being cast to the " +
-              "desired child type(s)")
-        override val sparkStructSig: TypeSig = STRUCT.nested(all)
-
-        override val udtChecks: TypeSig = none
-        override val sparkUdtSig: TypeSig = UDT
-      },
-      (cast, conf, p, r) => new CastExprMeta[AnsiCast](cast, ansiEnabled = true, conf = conf,
-        parent = p, rule = r, doFloatToIntCheck = true, stringToAnsiDate = true)),
     GpuOverrides.expr[Average](
       "Average aggregate operator",
       ExprChecks.fullAgg(
@@ -29,7 +29,8 @@ import org.apache.spark.sql.types.StructType
 
 object SparkShimImpl extends Spark320PlusShims
   with Spark320PlusNonDBShims
-  with Spark31Xuntil33XShims {
+  with Spark31Xuntil33XShims
+  with Spark320until340Shims {
   override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion
 
   override def getFileScanRDD(
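The new Spark320until340Shims mix-in shown above is how version-ranged behavior, such as the AnsiCast rule removed from Spark320PlusShims, reaches concrete shims without duplication. Below is a sketch of the composition idea; only the pattern and the shape of the trait names mirror the diff, while the bodies and the Sketch-suffixed identifiers are illustrative assumptions.

// Illustrative bodies; only the naming pattern mirrors the actual shims.
trait Spark320PlusShimsSketch {
  def rules: List[String] = List("rules common to Spark 3.2.0+")
}

trait Spark320until340ShimsSketch extends Spark320PlusShimsSketch {
  // Behavior valid for [3.2.0, 3.4.0), e.g. the AnsiCast rule.
  override def rules: List[String] = super.rules :+ "AnsiCast rule for [3.2.0, 3.4.0)"
}

// A 3.3.0-style shim stacks the ranged trait on top of the open-ended one...
object Shim330Sketch extends Spark320PlusShimsSketch with Spark320until340ShimsSketch
// ...while a 3.4.0-style shim simply leaves it out.
object Shim340Sketch extends Spark320PlusShimsSketch

object MixinDemo {
  def main(args: Array[String]): Unit = {
    println(Shim330Sketch.rules) // includes the AnsiCast rule
    println(Shim340Sketch.rules) // common rules only
  }
}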