Add Apache Spark 3.3.1-SNAPSHOT Shims (#5877)
* Add shims for Spark 3.3.1-SNAPSHOT

Signed-off-by: Anthony Chang <antchang@nvidia.com>

* Add Spark 3.3.1 shim test suite

Signed-off-by: Anthony Chang <antchang@nvidia.com>

* Fix some version definitions

Signed-off-by: Anthony Chang <antchang@nvidia.com>

* Factor out common 330 and 331 shims to 330until340
anthony-chang authored Jun 22, 2022
1 parent 84bf4ba commit 9802304
Showing 12 changed files with 255 additions and 2 deletions.
1 change: 1 addition & 0 deletions build/buildall
@@ -159,6 +159,7 @@ case $DIST_PROFILE in
320
321
322
331
)
;;

3 changes: 2 additions & 1 deletion dist/pom.xml
@@ -51,7 +51,8 @@
</noSnapshot.buildvers>
<snapshot.buildvers>
314,
322
322,
331
</snapshot.buildvers>
<databricks.buildvers>
312db,
1 change: 1 addition & 0 deletions jenkins/spark-premerge-build.sh
@@ -50,6 +50,7 @@ mvn_verify() {
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=321 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
[[ $BUILD_MAINTENANCE_VERSION_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=322 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=330 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
[[ $BUILD_MAINTENANCE_VERSION_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=331 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am
[[ $BUILD_FEATURE_VERSION_SNAPSHOTS == "true" ]] && env -u SPARK_HOME mvn -U -B $MVN_URM_MIRROR -Dbuildver=340 clean install -Drat.skip=true -DskipTests -Dmaven.javadoc.skip=true -Dskip -Dmaven.scalastyle.skip=true -Dcuda.version=$CUDA_CLASSIFIER -pl aggregator -am

# Here run Python integration tests tagged with 'premerge_ci_1' only, that would help balance test duration and memory
2 changes: 1 addition & 1 deletion jenkins/version-def.sh
@@ -49,7 +49,7 @@ echo "CUDF_VER: $CUDF_VER, CUDA_CLASSIFIER: $CUDA_CLASSIFIER, PROJECT_VER: $PROJ
SPARK_VER: $SPARK_VER, SCALA_BINARY_VER: $SCALA_BINARY_VER"


SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"311 321cdh 312 313 314 320 321 322 330"}
SPARK_SHIM_VERSIONS_STR=${SPARK_SHIM_VERSIONS_STR:-"311 321cdh 312 313 314 320 321 322 330 331"}

IFS=" " <<< $SPARK_SHIM_VERSIONS_STR read -r -a SPARK_SHIM_VERSIONS

67 changes: 67 additions & 0 deletions pom.xml
@@ -760,6 +760,7 @@
<source>${project.basedir}/src/main/320+-noncdh/scala</source>
<source>${project.basedir}/src/main/320until340-all/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/330until340/scala</source>
<source>${project.basedir}/src/main/330+/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
@@ -791,6 +792,71 @@
<module>aggregator</module>
</modules>
</profile>
<profile>
<id>release331</id>
<activation>
<property>
<name>buildver</name>
<value>331</value>
</property>
</activation>
<properties>
<spark.version>${spark331.version}</spark.version>
<spark.test.version>${spark331.version}</spark.test.version>
<parquet.hadoop.version>1.12.2</parquet.hadoop.version>
</properties>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<executions>
<execution>
<id>add-profile-src-31+</id>
<goals><goal>add-source</goal></goals>
<phase>none</phase> <!--Disable it for all submodules, only sql-plugin needs to enable it -->
<configuration>
<sources>
<source>${project.basedir}/src/main/331/scala</source>
<source>${project.basedir}/src/main/311+-nondb/scala</source>
<source>${project.basedir}/src/main/311until340-all/scala</source>
<source>${project.basedir}/src/main/320+/scala</source>
<source>${project.basedir}/src/main/320+-nondb/scala</source>
<source>${project.basedir}/src/main/320+-noncdh/scala</source>
<source>${project.basedir}/src/main/320until340-all/scala</source>
<source>${project.basedir}/src/main/321+/scala</source>
<source>${project.basedir}/src/main/330until340/scala</source>
<source>${project.basedir}/src/main/330+/scala</source>
<source>${project.basedir}/src/main/post320-treenode/scala</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-profile-src-331</id>
<goals><goal>add-test-source</goal></goals>
<phase>generate-test-sources</phase>
<configuration>
<sources>
<source>${project.basedir}/src/test/331/scala</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<modules>
<module>common</module>
<module>dist</module>
<module>integration_tests</module>
<module>shuffle-plugin</module>
<module>sql-plugin</module>
<module>tests</module>
<module>udf-compiler</module>
<module>tools</module>
<module>aggregator</module>
</modules>
</profile>
<profile>
<id>release340</id>
<activation>
@@ -944,6 +1010,7 @@
<spark321db.version>3.2.1-databricks</spark321db.version>
<spark322.version>3.2.2-SNAPSHOT</spark322.version>
<spark330.version>3.3.0</spark330.version>
<spark331.version>3.3.1-SNAPSHOT</spark331.version>
<spark340.version>3.4.0-SNAPSHOT</spark340.version>
<mockito.version>3.6.0</mockito.version>
<scala.plugin.version>4.3.0</scala.plugin.version>
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims

import com.nvidia.spark.rapids._

object SparkShimImpl extends Spark330PlusShims with Spark320until340Shims {
override def getSparkShimVersion: ShimVersion = ShimLoader.getShimVersion
}
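
The shim object above is intentionally thin: 3.3.1-specific behavior comes from Spark330PlusShims plus the shims shared across 3.2.0 through 3.3.x, and the concrete version is resolved at runtime by ShimLoader. A minimal REPL-style sketch of what it exposes (the expected value mirrors the assertion in the new Spark331ShimsSuite further down):

import com.nvidia.spark.rapids.shims.SparkShimImpl

// On a Spark 3.3.1 runtime, ShimLoader should select the 331 shim, so this
// is expected to print SparkShimVersion(3, 3, 1).
println(SparkShimImpl.getSparkShimVersion)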
@@ -0,0 +1,33 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark331

import com.nvidia.spark.rapids.SparkShimVersion

object SparkShimServiceProvider {
val VERSION = SparkShimVersion(3, 3, 1)
val VERSIONNAMES = Seq(s"$VERSION", s"$VERSION-SNAPSHOT")
}

class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {

override def getShimVersion: SparkShimVersion = SparkShimServiceProvider.VERSION

def matchesVersion(version: String): Boolean = {
SparkShimServiceProvider.VERSIONNAMES.contains(version)
}
}
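
Version matching covers both the release string and its snapshot variant. A quick illustrative check of the behavior added above (in production the provider is discovered through the shim service loader, not constructed by hand):

val provider = new com.nvidia.spark.rapids.shims.spark331.SparkShimServiceProvider()
provider.matchesVersion("3.3.1")          // true: release build
provider.matchesVersion("3.3.1-SNAPSHOT") // true: snapshot build
provider.matchesVersion("3.3.0")          // false: handled by the 330 shim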
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.spark331

import org.apache.spark.SparkConf
import org.apache.spark.sql.rapids.shims.spark331.ProxyRapidsShuffleInternalManager

/** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */
sealed class RapidsShuffleManager(
conf: SparkConf,
isDriver: Boolean) extends ProxyRapidsShuffleInternalManager(conf, isDriver) {
}
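
RapidsShuffleManager is the class users name in their Spark configuration; it simply forwards to the proxy defined in the next file. A hedged configuration sketch (spark.plugins follows the plugin's documented setup; any further shuffle tuning options are deployment-specific and omitted here):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .set("spark.shuffle.manager",
    "com.nvidia.spark.rapids.spark331.RapidsShuffleManager")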
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.rapids.shims.spark331

import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.shuffle._
import org.apache.spark.sql.rapids.{ProxyRapidsShuffleInternalManagerBase, RapidsShuffleInternalManagerBase}

/**
* A shuffle manager optimized for the RAPIDS Plugin For Apache Spark.
* @note This is an internal class to obtain access to the private
* `ShuffleManager` and `SortShuffleManager` classes.
*/
class RapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends RapidsShuffleInternalManagerBase(conf, isDriver) {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
getReaderInternal(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}


class ProxyRapidsShuffleInternalManager(conf: SparkConf, isDriver: Boolean)
extends ProxyRapidsShuffleInternalManagerBase(conf, isDriver) with ShuffleManager {

def getReader[K, C](
handle: ShuffleHandle,
startMapIndex: Int,
endMapIndex: Int,
startPartition: Int,
endPartition: Int,
context: TaskContext,
metrics: ShuffleReadMetricsReporter
): ShuffleReader[K, C] = {
self.getReader(handle, startMapIndex, endMapIndex, startPartition, endPartition, context,
metrics)
}
}
@@ -0,0 +1,41 @@
/*
* Copyright (c) 2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark331

import com.nvidia.spark.rapids.{ShimLoader, SparkShimVersion, TypeSig}
import com.nvidia.spark.rapids.shims.SparkShimImpl
import org.scalatest.FunSuite

import org.apache.spark.sql.types.{DayTimeIntervalType, YearMonthIntervalType}

class Spark331ShimsSuite extends FunSuite {
test("spark shims version") {
assert(SparkShimImpl.getSparkShimVersion === SparkShimVersion(3, 3, 1))
}

test("shuffle manager class") {
assert(ShimLoader.getRapidsShuffleManagerClass ===
classOf[com.nvidia.spark.rapids.spark331.RapidsShuffleManager].getCanonicalName)
}

test("TypeSig331") {
val check = TypeSig.DAYTIME + TypeSig.YEARMONTH
assert(check.isSupportedByPlugin(DayTimeIntervalType()))
assert(check.isSupportedByPlugin(YearMonthIntervalType()))
}

}
