Add a shim provider for Spark 3.2.0 development branch (NVIDIA#1704)
Signed-off-by: Gera Shegalov <gera@apache.org>

Add a shim provider for the Spark 3.2.0 development branch. Closes NVIDIA#1490
- fix overflows in the aggregate buffer for GpuSum by wiring the explicit output column type
- add unit tests for the new shim
- consolidate the Spark version profiles in the parent pom
gerashegalov authored Mar 2, 2021
1 parent db09eb4 commit 6614a88
Showing 22 changed files with 396 additions and 116 deletions.
6 changes: 6 additions & 0 deletions api_validation/pom.xml
@@ -46,6 +46,12 @@
<spark.version>${spark311.version}</spark.version>
</properties>
</profile>
<profile>
<id>spark320</id>
<properties>
<spark.version>${spark320.version}</spark.version>
</properties>
</profile>
</profiles>

<dependencies>
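This mirrors the existing `spark311` profile, so the API-validation module can be pointed at the 3.2.0 snapshot with `mvn -Pspark320 ...`, picking up the `spark320.version` property that the root `pom.xml` change below defines.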
1 change: 1 addition & 0 deletions docs/additional-functionality/rapids-shuffle.md
@@ -258,6 +258,7 @@ In this section, we are using a docker container built using the sample dockerfile
| 3.0.1 EMR | com.nvidia.spark.rapids.spark301emr.RapidsShuffleManager |
| 3.0.2 | com.nvidia.spark.rapids.spark302.RapidsShuffleManager |
| 3.1.1 | com.nvidia.spark.rapids.spark311.RapidsShuffleManager |
| 3.2.0 | com.nvidia.spark.rapids.spark320.RapidsShuffleManager |

2. Recommended settings for UCX 1.9.0+
```shell
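With this row in place, a cluster on the 3.2.0 development snapshot selects the new class the same way as the earlier versions, i.e. `--conf spark.shuffle.manager=com.nvidia.spark.rapids.spark320.RapidsShuffleManager`.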
30 changes: 0 additions & 30 deletions integration_tests/pom.xml
@@ -28,36 +28,6 @@
<artifactId>rapids-4-spark-integration-tests_2.12</artifactId>
<version>0.5.0-SNAPSHOT</version>

<properties>
<spark.test.version>${spark300.version}</spark.test.version>
</properties>
<profiles>
<profile>
<id>spark301dbtests</id>
<properties>
<spark.test.version>${spark301db.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark301tests</id>
<properties>
<spark.test.version>${spark301.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark302tests</id>
<properties>
<spark.test.version>${spark302.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark311tests</id>
<properties>
<spark.test.version>${spark311.version}</spark.test.version>
</properties>
</profile>
</profiles>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
30 changes: 30 additions & 0 deletions pom.xml
@@ -131,11 +131,39 @@
<rat.consoleOutput>true</rat.consoleOutput>
</properties>
</profile>

<profile>
<id>spark301dbtests</id>
<properties>
<spark.test.version>${spark301db.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark301tests</id>
<properties>
<spark.test.version>${spark301.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark302tests</id>
<properties>
<spark.test.version>${spark302.version}</spark.test.version>
</properties>
</profile>
<profile>
<id>spark311tests</id>
<properties>
<spark.test.version>${spark311.version}</spark.test.version>
</properties>
<modules>
<module>tests-spark310+</module>
</modules>
</profile>
<profile>
<id>spark320tests</id>
<properties>
<spark.test.version>${spark320.version}</spark.test.version>
</properties>
<modules>
<module>tests-spark310+</module>
</modules>
@@ -146,6 +174,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<spark.version>${spark300.version}</spark.version>
<spark.test.version>${spark300.version}</spark.test.version>
<cuda.version>cuda10-1</cuda.version>
<cudf.version>0.19-SNAPSHOT</cudf.version>
<scala.binary.version>2.12</scala.binary.version>
@@ -177,6 +206,7 @@
<spark301db.version>3.0.1-databricks</spark301db.version>
<spark302.version>3.0.2</spark302.version>
<spark311.version>3.1.1-SNAPSHOT</spark311.version>
<spark320.version>3.2.0-SNAPSHOT</spark320.version>
<mockito.version>3.6.0</mockito.version>
<scala.plugin.version>4.3.0</scala.plugin.version>
<maven.jar.plugin.version>3.2.0</maven.jar.plugin.version>
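These `spark*tests` profiles replace the per-module copies deleted from `integration_tests/pom.xml` above, and the new `<spark.test.version>` default keeps non-profile builds testing against `${spark300.version}`. A 3.2.0 test run is then selected with `mvn -Pspark320tests ...`; like `spark311tests`, it also enables the `tests-spark310+` module, which by its name carries tests that require Spark 3.1.0 or later.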
6 changes: 6 additions & 0 deletions shims/aggregator/pom.xml
@@ -62,6 +62,12 @@
<activeByDefault>true</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark320_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark311_${scala.binary.version}</artifactId>
10 changes: 10 additions & 0 deletions shims/pom.xml
@@ -46,6 +46,7 @@
</activation>
<modules>
<module>spark311</module>
<module>spark320</module>
</modules>
</profile>
</profiles>
@@ -71,13 +72,22 @@
<classifier>${cuda.version}</classifier>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
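Wiring `scalatest` and the `scalatest-maven-plugin` into the common shims pom is what backs the "unit tests for the new shim" bullet in the commit message. A minimal sketch of such a test, assuming the older `org.scalatest.FunSuite` style (newer scalatest releases move this to `org.scalatest.funsuite.AnyFunSuite`) and using only classes added in this commit:

```scala
import org.scalatest.FunSuite

import com.nvidia.spark.rapids.SparkShimVersion
import com.nvidia.spark.rapids.shims.spark320.SparkShimServiceProvider

// Hypothetical suite name; the commit's actual test sources are not shown above.
class Spark320ShimsSuite extends FunSuite {
  test("spark 3.2.0 shim reports its version") {
    // buildShim should hand back a Spark320Shims instance whose version is 3.2.0
    assert(new SparkShimServiceProvider().buildShim.getSparkShimVersion ===
      SparkShimVersion(3, 2, 0))
  }
}
```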
Spark300Shims.scala
@@ -32,19 +32,21 @@ import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.connector.read.Scan
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.datasources.{FileIndex, FilePartition, FileScanRDD, HadoopFsRelation, InMemoryFileIndex, PartitionDirectory, PartitionedFile}
import org.apache.spark.sql.execution.datasources.rapids.GpuPartitioningUtils
import org.apache.spark.sql.execution.datasources.v2.orc.OrcScan
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ReusedExchangeExec, ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, HashJoin, SortMergeJoinExec}
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.execution.python.WindowInPandasExec
@@ -584,4 +586,18 @@ class Spark300Shims extends SparkShims {
}
recurse(plan, predicate, new ListBuffer[SparkPlan]())
}

override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = {
case ShuffleQueryStageExec(_, e: ReusedExchangeExec) => e
case BroadcastQueryStageExec(_, e: ReusedExchangeExec) => e
}

/** attachTree is still available on this Spark version; it is dropped by SPARK-34234 */
override def attachTreeIfSupported[TreeType <: TreeNode[_], A](
tree: TreeType,
msg: String)(
f: => A
): A = {
attachTree(tree, msg)(f)
}
}
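The new `reusedExchangeExecPfn` hook exists because the shape of `ShuffleQueryStageExec.unapply` differs across Spark versions (compare the 3.2.0 override later in this diff), so version-neutral code should not pattern-match on those case classes directly. A sketch of a call site, assuming `ShimLoader.getSparkShims` as the accessor for the active shim:

```scala
import com.nvidia.spark.rapids.ShimLoader // assumed accessor for the active SparkShims
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

// Collect every reused exchange in a physical plan without caring which
// Spark version's query-stage case-class shape is on the classpath.
def collectReusedExchanges(plan: SparkPlan): Seq[ReusedExchangeExec] =
  plan.collect(ShimLoader.getSparkShims.reusedExchangeExecPfn)
```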
1 change: 0 additions & 1 deletion shims/spark301db/pom.xml
@@ -73,7 +73,6 @@

<properties>
<parquet.version>1.10.1</parquet.version>
<spark301db.version>3.0.1-databricks</spark301db.version>
<arrow.version>0.15.1</arrow.version>
</properties>

92 changes: 92 additions & 0 deletions shims/spark320/pom.xml
@@ -0,0 +1,92 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2021, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims_2.12</artifactId>
<version>0.5.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark320_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.2.0 Shim</name>
<description>The RAPIDS SQL plugin for Apache Spark 3.2.0 Shim</description>
<version>0.5.0-SNAPSHOT</version>

<!-- Set 'spark.version' for the shims layer -->
<!-- Create a separate spark-<version>-info.properties file in the jar to record the cudf and spark version info -->
<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>dependency</id>
<phase>generate-resources</phase>
<configuration>
<target>
<mkdir dir="${project.build.directory}/extra-resources"/>
<exec executable="bash" failonerror="true"
output="${project.build.directory}/extra-resources/spark-${spark320.version}-info.properties">
<arg value="${user.dir}/build/dependency-info.sh"/>
<arg value="${cudf.version}"/>
<arg value="${cuda.version}"/>
<arg value="${spark320.version}"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
</plugin>
</plugins>

<resources>
<resource>
<!-- Include the properties file to provide the build information. -->
<directory>${project.build.directory}/extra-resources</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>

<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark311_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark320.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
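Two details worth noting in this pom: the compile-scope dependency on the `spark311` shim is what allows `Spark320Shims` (below) to extend `Spark311Shims` and override only what changed in 3.2.0, and `spark-sql` is `provided`, so the Spark 3.2.0-SNAPSHOT classes are compiled against but never bundled into the shim jar.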
META-INF/services registration for com.nvidia.spark.rapids.SparkShimServiceProvider (new file)
@@ -0,0 +1 @@
com.nvidia.spark.rapids.shims.spark320.SparkShimServiceProvider
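This one-line file is a standard `java.util.ServiceLoader` registration: placed under `META-INF/services/` with the interface's fully qualified name as the file name, it advertises the new `spark320` provider to the plugin at runtime. A minimal sketch of how such a registration is consumed, with `loadShims` as a hypothetical stand-in for the plugin's actual shim-loading code:

```scala
import java.util.ServiceLoader

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.{SparkShimServiceProvider, SparkShims}

// Hypothetical lookup: ask each registered provider whether it matches the
// running Spark version string and build the first shim that does.
def loadShims(sparkVersion: String): SparkShims =
  ServiceLoader.load(classOf[SparkShimServiceProvider]).asScala
    .find(_.matchesVersion(sparkVersion))
    .map(_.buildShim)
    .getOrElse(throw new IllegalArgumentException(
      s"Could not find a shim provider for Spark $sparkVersion"))
```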
Spark320Shims.scala (new file)
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark320

import com.nvidia.spark.rapids.ShimVersion
import com.nvidia.spark.rapids.shims.spark311.Spark311Shims
import com.nvidia.spark.rapids.spark320.RapidsShuffleManager

import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.adaptive.{BroadcastQueryStageExec, ShuffleQueryStageExec}
import org.apache.spark.sql.execution.exchange.ReusedExchangeExec

class Spark320Shims extends Spark311Shims {

override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION320

override def getRapidsShuffleManagerClass: String = {
classOf[RapidsShuffleManager].getCanonicalName
}

/**
 * The ShuffleQueryStageExec case class holds an additional field, shuffleOrigin,
 * which changes the signature of its generated unapply method
 */
override def reusedExchangeExecPfn: PartialFunction[SparkPlan, ReusedExchangeExec] = {
case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
case BroadcastQueryStageExec(_, e: ReusedExchangeExec, _) => e
}

/** attachTree was dropped by SPARK-34234, so simply evaluate the block */
override def attachTreeIfSupported[TreeType <: TreeNode[_], A](
tree: TreeType,
msg: String)(
f: => A
): A = {
identity(f)
}
}
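On Spark 3.2.0 catalyst's `attachTree` is gone, so the wrapper simply evaluates the block, while the 3.0.0 shim above still delegates to it. An illustrative call site (the helper itself is an assumption, not code from this commit):

```scala
import com.nvidia.spark.rapids.SparkShims
import org.apache.spark.sql.execution.SparkPlan

// Hypothetical helper: annotate failures with the offending plan node on Spark
// versions that still ship catalyst's attachTree, and run the block bare elsewhere.
def executeWithContext[A](shims: SparkShims, plan: SparkPlan)(body: => A): A =
  shims.attachTreeIfSupported(plan, "execute") {
    body
  }
```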
SparkShimServiceProvider.scala (new file)
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2021, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark320

import com.nvidia.spark.rapids.{SparkShims, SparkShimVersion}

object SparkShimServiceProvider {
val VERSION320 = SparkShimVersion(3, 2, 0)
val VERSIONNAMES: Seq[String] = Seq(VERSION320)
.flatMap(v => Seq(s"$v", s"$v-SNAPSHOT"))
}

class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {

def matchesVersion(version: String): Boolean = {
SparkShimServiceProvider.VERSIONNAMES.contains(version)
}

def buildShim: SparkShims = {
new Spark320Shims()
}
}
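Because `matchesVersion` compares plain strings, `VERSIONNAMES` expands to `Seq("3.2.0", "3.2.0-SNAPSHOT")` (assuming `SparkShimVersion` renders as `major.minor.patch`), so the provider is selected both for the development snapshot declared in the parent pom and for an eventual 3.2.0 release.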