Add Spark 3.0 EMR Shim layer #827

Merged
merged 10 commits on Sep 22, 2020
Changes from 7 commits
5 changes: 5 additions & 0 deletions integration_tests/README.md
@@ -58,6 +58,11 @@ All of the tests will run in a single application. They just enable and disable

You do need to have access to a compatible GPU with the needed CUDA drivers. The exact details of how to set this up are beyond the scope of this document, but the Spark feature for scheduling GPUs does make this very simple if you have it configured.

### Runtime Environment

`--runtime_env` is used to specify the environment you are running the tests in. Valid values are `databricks` and `emr`. This is generally used when certain environments have different behavior.
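For example, when running against EMR you would pass `--runtime_env=emr` to the test runner (e.g. appended to the `spark-submit ./runtests.py` invocation, if that is how you launch the tests).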

### timezone

The RAPIDS plugin currently only supports the UTC time zone. Spark uses the default system time zone unless explicitly set otherwise.
3 changes: 3 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -59,6 +59,9 @@ def is_apache_runtime():
def is_databricks_runtime():
return runtime_env() == "databricks"

def is_emr_runtime():
return runtime_env() == "emr"

_limit = -1

def get_limit():
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/csv_test.py
@@ -175,7 +175,8 @@ def test_csv_fallback(spark_tmp_path, read_func, disable_conf):
assert_gpu_fallback_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
'FileSourceScanExec',
conf={disable_conf: 'false'})
conf={disable_conf: 'false',
"spark.sql.sources.useV1SourceList": "csv"})

csv_supported_date_formats = ['yyyy-MM-dd', 'yyyy/MM/dd', 'yyyy-MM', 'yyyy/MM',
'MM-yyyy', 'MM/yyyy', 'MM-dd-yyyy', 'MM/dd/yyyy']
4 changes: 3 additions & 1 deletion integration_tests/src/main/python/join_test.py
@@ -15,7 +15,7 @@
import pytest
from pyspark.sql.functions import broadcast
from asserts import assert_gpu_and_cpu_are_equal_collect
from conftest import is_databricks_runtime
from conftest import is_databricks_runtime, is_emr_runtime
from data_gen import *
from marks import ignore_order, allow_non_gpu, incompat
from spark_session import with_spark_session, is_before_spark_310
@@ -165,6 +165,8 @@ def do_join(spark):

@ignore_order
@allow_non_gpu('DataWritingCommandExec')
@pytest.mark.xfail(condition=is_emr_runtime(),
reason='https://github.com/NVIDIA/spark-rapids/issues/821')
@pytest.mark.parametrize('repartition', ["true", "false"], ids=idfn)
def test_join_bucketed_table(repartition):
def do_join(spark):
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/orc_test.py
@@ -57,7 +57,8 @@ def test_orc_fallback(spark_tmp_path, read_func, disable_conf):
assert_gpu_fallback_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
'FileSourceScanExec',
conf={disable_conf: 'false'})
conf={disable_conf: 'false',
"spark.sql.sources.useV1SourceList": "orc"})

@pytest.mark.parametrize('orc_gens', orc_gens_list, ids=idfn)
@pytest.mark.parametrize('read_func', [read_orc_df, read_orc_sql])
3 changes: 2 additions & 1 deletion integration_tests/src/main/python/parquet_test.py
@@ -62,7 +62,8 @@ def test_parquet_fallback(spark_tmp_path, read_func, disable_conf):
assert_gpu_fallback_collect(
lambda spark : reader(spark).select(f.col('*'), f.col('_c2') + f.col('_c3')),
'FileSourceScanExec',
conf={disable_conf: 'false'})
conf={disable_conf: 'false',
"spark.sql.sources.useV1SourceList": "parquet"})

parquet_compress_options = ['none', 'uncompressed', 'snappy', 'gzip']
# The following need extra jars 'lzo', 'lz4', 'brotli', 'zstd'
2 changes: 1 addition & 1 deletion integration_tests/src/main/python/spark_session.py
@@ -91,7 +91,7 @@ def with_gpu_session(func, conf={}):
return with_spark_session(func, conf=copy)

def is_spark_300():
return spark_version() == "3.0.0"
return (spark_version() == "3.0.0" or spark_version().startswith('3.0.0-amzn'))

def is_before_spark_310():
return spark_version() < "3.1.0"
1 change: 1 addition & 0 deletions pom.xml
@@ -168,6 +168,7 @@
<rat.consoleOutput>false</rat.consoleOutput>
<slf4j.version>1.7.30</slf4j.version>
<spark300.version>3.0.0</spark300.version>
<spark300emr.version>3.0.0-amzn</spark300emr.version>
<!--
If you update a dependendy version so it is no longer a SNAPSHOT
please update the snapshot-shims profile as well so it is accurate -->
6 changes: 6 additions & 0 deletions shims/aggregator/pom.xml
@@ -85,6 +85,12 @@
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark300emr_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark301_${scala.binary.version}</artifactId>
1 change: 1 addition & 0 deletions shims/pom.xml
@@ -53,6 +53,7 @@

<modules>
<module>spark300</module>
<module>spark300emr</module>
<module>spark301</module>
<module>aggregator</module>
</modules>
87 changes: 87 additions & 0 deletions shims/spark300emr/pom.xml
@@ -0,0 +1,87 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020, NVIDIA CORPORATION.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims_2.12</artifactId>
<version>0.3.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark300emr_2.12</artifactId>
<name>RAPIDS Accelerator for Apache Spark SQL Plugin Spark 3.0.0 EMR Shim</name>
<description>The RAPIDS SQL plugin for Apache Spark 3.0.0 EMR Shim</description>
<version>0.3.0-SNAPSHOT</version>

<!-- Set 'spark.version' for the shims layer -->
<!-- Create a separate file 'SPARK_VER.properties' in the jar to save cudf & spark version info -->
<build>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>dependency</id>
<phase>generate-resources</phase>
<configuration>
<target>
<mkdir dir="${project.build.directory}/extra-resources"/>
<exec executable="bash" output="${project.build.directory}/extra-resources/spark-${spark300emr.version}-info.properties">
<arg value="${user.dir}/build/dependency-info.sh"/>
<arg value="${cudf.version}"/>
<arg value="${cuda.version}"/>
<arg value="${spark300emr.version}"/>
</exec>
</target>
</configuration>
<goals>
<goal>run</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>

<resources>
<resource>
<!-- Include the properties file to provide the build information. -->
<directory>${project.build.directory}/extra-resources</directory>
</resource>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
</build>

<dependencies>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-shims-spark300_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark300.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
@@ -0,0 +1 @@
com.nvidia.spark.rapids.shims.spark300emr.SparkShimServiceProvider
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark300emr

import com.nvidia.spark.rapids._
import com.nvidia.spark.rapids.shims.spark300.Spark300Shims
import com.nvidia.spark.rapids.spark300emr.RapidsShuffleManager

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile}

class Spark300EMRShims extends Spark300Shims {

override def getSparkShimVersion: ShimVersion = SparkShimServiceProvider.VERSION

override def getRapidsShuffleManagerClass: String = {
classOf[RapidsShuffleManager].getCanonicalName
}

// use reflection here so we don't have to compile against their jars
override def getFileScanRDD(
sparkSession: SparkSession,
readFunction: (PartitionedFile) => Iterator[InternalRow],
filePartitions: Seq[FilePartition]): RDD[InternalRow] = {

val tclass = classOf[org.apache.spark.sql.execution.datasources.FileScanRDD]
val constructors = tclass.getConstructors()
if (constructors.size > 1) {
throw new IllegalStateException(s"Only expected 1 constructor for FileScanRDD")
}
val constructor = constructors(0)
val instance = if (constructor.getParameterCount() == 4) {
constructor.newInstance(sparkSession, readFunction, filePartitions, None)
} else if (constructor.getParameterCount() == 3) {
constructor.newInstance(sparkSession, readFunction, filePartitions)
} else {
throw new IllegalStateException("Could not find appropriate constructor for FileScan RDD")
}
instance.asInstanceOf[FileScanRDD]
}
}
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.shims.spark300emr

import com.nvidia.spark.rapids.{EMRShimVersion, SparkShims, SparkShimVersion}

object SparkShimServiceProvider {
val VERSION = EMRShimVersion(3, 0, 0)
}

class SparkShimServiceProvider extends com.nvidia.spark.rapids.SparkShimServiceProvider {

def matchesVersion(version: String): Boolean = {
// EMR version looks like 3.0.0-amzn-0
val amznVersion = (SparkShimServiceProvider.VERSION.toString + raw"(-\d+)").r
version match {
case amznVersion(_*) => true
case _ => false
}
}

def buildShim: SparkShims = {
new Spark300EMRShims()
}
}
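To make the version matching above concrete, here is a minimal standalone sketch (illustrative only, not part of this diff: the object name is made up and the regex is inlined rather than built from `SparkShimServiceProvider.VERSION`) showing which `spark.version` strings the pattern accepts:

object EmrVersionMatchSketch {
  // Same shape as the provider above: the base version string "3.0.0-amzn"
  // followed by a "-<build>" suffix, e.g. EMR reports "3.0.0-amzn-0".
  private val amznVersion = raw"3.0.0-amzn(-\d+)".r

  def matchesVersion(version: String): Boolean = version match {
    // A Regex extractor used in a pattern match must cover the whole string.
    case amznVersion(_*) => true
    case _ => false
  }

  def main(args: Array[String]): Unit = {
    assert(matchesVersion("3.0.0-amzn-0"))  // an EMR build of Spark 3.0.0
    assert(!matchesVersion("3.0.0"))        // vanilla Apache Spark 3.0.0
    assert(!matchesVersion("3.0.1-amzn-0")) // a different base version
  }
}

Because the whole string must match and the `-\d+` suffix is required, the EMR shim only claims EMR-built versions and never shadows the plain Apache Spark 3.0.0 shim.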
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.spark300emr

import org.apache.spark.SparkConf
import org.apache.spark.sql.rapids.shims.spark300.RapidsShuffleInternalManager

/** A shuffle manager optimized for the RAPIDS Plugin for Apache Spark. */
sealed class RapidsShuffleManager(
conf: SparkConf,
isDriver: Boolean) extends RapidsShuffleInternalManager(conf, isDriver) {
}
@@ -54,6 +54,10 @@ case class DatabricksShimVersion(major: Int, minor: Int, patch: Int) extends Shi
override def toString(): String = s"$major.$minor.$patch-databricks"
}

case class EMRShimVersion(major: Int, minor: Int, patch: Int) extends ShimVersion {
override def toString(): String = s"$major.$minor.$patch-amzn"
}

trait SparkShims {
def getSparkShimVersion: ShimVersion
def isGpuHashJoin(plan: SparkPlan): Boolean