Add udf-compiler skeleton (NVIDIA#434)
- This change adds a udf-compiler plugin without detailed compiler
implementations.
- To preserve the previous commit history and make review easier, this change
removes the detailed compiler implementation from previous commits.
- This change keeps the unit test framework but removes the test cases.
- Compiler implementations will come in the next MR.

Co-authored-by: Sean Lee <selee@nvidia.com>
Co-authored-by: Nicholas Edelman <nedelman@nvidia.com>
Co-authored-by: Alessandro Bellina <abellina@nvidia.com>
Signed-off-by: wjxiz1992 <wjxiz1992@gmail.com>

4 people authored Jul 30, 2020
1 parent c46827a commit b183122
Showing 9 changed files with 340 additions and 0 deletions.
12 changes: 12 additions & 0 deletions docs/compatibility.md
@@ -267,3 +267,15 @@ Casting from string to timestamp currently has the following limitations.
milliseconds, with 2 digits each for hours, minutes, and seconds, and 6 digits for milliseconds.
Only timezone 'Z' (UTC) is supported. Casting unsupported formats will result in null values.

## UDF to Catalyst Expressions
To speed up UDF execution, spark-rapids introduces a udf-compiler extension that translates UDFs to Catalyst expressions.

To enable this operation on the GPU, set
[`spark.rapids.sql.udfCompiler.enabled`](configs.md#sql.udfCompiler.enabled) to `true`.

However, Spark may produce different results for a compiled UDF than for the non-compiled one. For example, given a UDF of `x/y` where `y` happens to be `0`, the compiled Catalyst expression returns `NULL`, while the original UDF fails the entire job with `java.lang.ArithmeticException: / by zero`, as sketched below.
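
A minimal sketch of that difference, e.g. in spark-shell with the udf-compiler extension installed (the `div` UDF here is a hypothetical example):

```scala
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical UDF dividing two columns.
val div = udf((x: Int, y: Int) => x / y)
val df = Seq((1, 0)).toDF("x", "y")

// With spark.rapids.sql.udfCompiler.enabled=true, the UDF is expected to be
// replaced by Catalyst's divide expression, which yields NULL for y = 0.
df.select(div(col("x"), col("y"))).show()

// With the compiler disabled, the JVM lambda itself executes and the job
// fails with java.lang.ArithmeticException: / by zero.
```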

When translating UDFs to Catalyst expressions, only a limited set of operand types and operations is supported:

| Operand type | Operation |
| ------------------------------------------------------------------- | ------------------|
1 change: 1 addition & 0 deletions docs/configs.md
@@ -49,6 +49,7 @@ Name | Description | Default Value
<a name="sql.concurrentGpuTasks"></a>spark.rapids.sql.concurrentGpuTasks|Set the number of tasks that can execute concurrently per GPU. Tasks may temporarily block when the number of concurrent tasks in the executor exceeds this amount. Allowing too many concurrent tasks on the same GPU may lead to GPU out of memory errors.|1
<a name="sql.csvTimestamps.enabled"></a>spark.rapids.sql.csvTimestamps.enabled|When set to true, enables the CSV parser to read timestamps. The default output format for Spark includes a timezone at the end. Anything except the UTC timezone is not supported. Timestamps after 2038 and before 1902 are also not supported.|false
<a name="sql.enabled"></a>spark.rapids.sql.enabled|Enable (true) or disable (false) sql operations on the GPU|true
<a name="sql.udfCompiler.enabled"></a>spark.rapids.sql.udfCompiler.enabled|When set to true, all UDFs will be compiled to Catalyst expressions by the Catalyst Analyzer|false
<a name="sql.explain"></a>spark.rapids.sql.explain|Explain why some parts of a query were not placed on a GPU or not. Possible values are ALL: print everything, NONE: print nothing, NOT_ON_GPU: print only parts of a query that did not go on the GPU|NONE
<a name="sql.format.csv.enabled"></a>spark.rapids.sql.format.csv.enabled|When set to false disables all csv input and output acceleration. (only input is currently supported anyways)|true
<a name="sql.format.csv.read.enabled"></a>spark.rapids.sql.format.csv.read.enabled|When set to false disables csv input acceleration|true
7 changes: 7 additions & 0 deletions pom.xml
@@ -78,9 +78,16 @@
<module>integration_tests</module>
<module>shims</module>
<module>api_validation</module>
<module>udf-compiler</module>
</modules>

<profiles>
<profile>
<id>udf-compiler</id>
<modules>
<module>udf-compiler</module>
</modules>
</profile>
<profile>
<id>alpha-features</id>
<properties>
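As a usage note, the `udf-compiler` profile can be activated explicitly with standard Maven profile selection (the exact intended build workflow is an assumption):

```
mvn package -P udf-compiler
```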
8 changes: 8 additions & 0 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/RapidsConf.scala
@@ -350,6 +350,12 @@ object RapidsConf {
.booleanConf
.createWithDefault(true)

val UDF_COMPILER_ENABLED = conf("spark.rapids.sql.udfCompiler.enabled")
.doc("When set to true, all UDFs will be compiled to Catalyst expressions by the " +
"Catalyst Analyzer.")
.booleanConf
.createWithDefault(false)

val INCOMPATIBLE_OPS = conf("spark.rapids.sql.incompatibleOps.enabled")
.doc("For operations that work, but are not 100% compatible with the Spark equivalent " +
"set if they should be enabled by default or disabled by default.")
@@ -752,6 +758,8 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val isSqlEnabled: Boolean = get(SQL_ENABLED)

lazy val isUdfCompilerEnabled: Boolean = get(UDF_COMPILER_ENABLED)

lazy val exportColumnarRdd: Boolean = get(EXPORT_COLUMNAR_RDD)

lazy val isIncompatEnabled: Boolean = get(INCOMPATIBLE_OPS)
25 changes: 25 additions & 0 deletions udf-compiler/README.md
@@ -0,0 +1,25 @@
UDF Compiler
============

How to run tests
----------------

From the `rapids-plugin-4-spark` root directory, use this command to run the `OpcodeSuite`:

```
mvn test -DwildcardSuites=com.nvidia.spark.OpcodeSuite
```

How to run spark shell
----------------------

To run spark-shell, you need a `SPARK_HOME`, the `cudf-0.15-SNAPSHOT-cuda10-1.jar`, and the jars produced by the plugin. The cudf jar is downloaded into the `~/.m2` directory when `mvn test` (or `mvn package`) runs; copy it from there to somewhere accessible. In the example below, the cudf jar is assumed to be in the directory `$JARS`:

```
export SPARK_HOME=[your spark distribution directory]
export JARS=[path to cudf 0.15 jar]
$SPARK_HOME/bin/spark-shell \
--jars $JARS/cudf-0.15-SNAPSHOT-cuda10-1.jar,udf-compiler/target/rapids-4-spark-udf-0.2.0-SNAPSHOT.jar,sql-plugin/target/rapids-4-spark-sql_2.12-0.2.0-SNAPSHOT.jar \
--conf spark.sql.extensions="com.nvidia.spark.SQLPlugin,com.nvidia.spark.udf.Plugin"
```
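
Once the shell is up, a quick smoke test (a hypothetical session; the actual expression translation lands in a follow-up change) is to enable the compiler, define a trivial UDF, and inspect the plan:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Enable the compiler for this session.
spark.conf.set("spark.rapids.sql.udfCompiler.enabled", "true")

// A trivial UDF; once translation is implemented it should be replaced
// by the equivalent Catalyst expression (id + 1).
val plusOne = udf((x: Long) => x + 1)
spark.range(5).select(plusOne(col("id"))).explain(true)
```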
137 changes: 137 additions & 0 deletions udf-compiler/pom.xml
@@ -0,0 +1,137 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright (c) 2020, NVIDIA CORPORATION.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-parent</artifactId>
<version>0.2.0-SNAPSHOT</version>
</parent>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-udf</artifactId>
<version>0.2.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-catalyst_${scala.binary.version}</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-sql_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ai.rapids</groupId>
<artifactId>cudf</artifactId>
<classifier>${cuda.version}</classifier>
</dependency>
</dependencies>

<build>
<resources>
<resource>
<!-- Include the properties file to provide the build information. -->
<directory>${project.build.directory}/extra-resources</directory>
<filtering>true</filtering>
</resource>
<resource>
<directory>${project.basedir}/..</directory>
<targetPath>META-INF</targetPath>
<includes>
<!-- The NOTICE will be taken care of by the antrun task below -->
<include>LICENSE</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>copy-notice</id>
<goals>
<goal>run</goal>
</goals>
<phase>process-resources</phase>
<configuration>
<target>
<!-- copy NOTICE-binary to NOTICE -->
<copy
todir="${project.build.directory}/classes/META-INF/"
verbose="true">
<fileset dir="${project.basedir}/..">
<include name="NOTICE-binary"/>
</fileset>
<mapper type="glob" from="*-binary" to="*"/>
</copy>
</target>
</configuration>
</execution>
</executions>
</plugin>
<!-- disable surefire as we are using scalatest only -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
59 changes: 59 additions & 0 deletions udf-compiler/src/main/scala/com/nvidia/spark/udf/Plugin.scala
@@ -0,0 +1,59 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.udf

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

import com.nvidia.spark.rapids.RapidsConf

class Plugin extends Function1[SparkSessionExtensions, Unit] with Logging {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    logWarning("Installing rapids UDF compiler extensions to Spark. The compiler is disabled" +
      s" by default. To enable it, set `${RapidsConf.UDF_COMPILER_ENABLED}` to true")
    extensions.injectResolutionRule(_ => LogicalPlanRules())
  }
}

case class LogicalPlanRules() extends Rule[LogicalPlan] with Logging {
  def replacePartialFunc(plan: LogicalPlan): PartialFunction[Expression, Expression] = {
    case d: Expression => attemptToReplaceExpression(plan, d)
  }

  // Skeleton only: returns the expression unchanged. The actual UDF-to-Catalyst
  // translation arrives in a follow-up change.
  def attemptToReplaceExpression(plan: LogicalPlan, exp: Expression): Expression = {
    exp
  }

  override def apply(plan: LogicalPlan): LogicalPlan = {
    val conf = new RapidsConf(plan.conf)
    if (conf.isUdfCompilerEnabled) {
      plan match {
        case project: Project =>
          Project(project.projectList.map(e => attemptToReplaceExpression(plan, e))
            .asInstanceOf[Seq[NamedExpression]], project.child)
        case x =>
          x.transformExpressions(replacePartialFunc(plan))
      }
    } else {
      plan
    }
  }
}
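
For reference, a minimal sketch of installing the extension programmatically instead of via spark-shell flags (the session settings are assumptions; the extension and config keys are the ones used above and in `OpcodeSuite`):

```scala
import org.apache.spark.sql.SparkSession

// Register the udf-compiler extension and enable it for this session.
val spark = SparkSession.builder()
  .master("local[1]")
  .appName("udf-compiler-demo")
  .config("spark.sql.extensions", "com.nvidia.spark.udf.Plugin")
  .config("spark.rapids.sql.udfCompiler.enabled", "true")
  .getOrCreate()
```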
28 changes: 28 additions & 0 deletions udf-compiler/src/test/resources/log4j.properties
@@ -0,0 +1,28 @@
#
# Copyright (c) 2019, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootCategory=INFO, file
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=true
log4j.appender.file.file=target/surefire-reports/scala-test-detailed-output.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n

# Just warnings for the console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
63 changes: 63 additions & 0 deletions udf-compiler/src/test/scala/com/nvidia/spark/OpcodeSuite.scala
@@ -0,0 +1,63 @@
/*
* Copyright (c) 2019-2020, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark

import java.nio.charset.Charset

import com.nvidia.spark.rapids.RapidsConf
import org.scalatest.Assertions._
import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions.{log => logalias, udf => makeUdf}

class OpcodeSuite extends FunSuite {

  val conf: SparkConf = new SparkConf()
    .set("spark.sql.extensions", "com.nvidia.spark.udf.Plugin")
    .set(RapidsConf.EXPLAIN.key, "true")

  val spark: SparkSession =
    SparkSession.builder()
      .master("local[1]")
      .appName("OpcodeSuite")
      .config(conf)
      .getOrCreate()

  import spark.implicits._

  // Utility function checking that two Datasets hold equivalent data,
  // comparing them column by column.
  def checkEquiv[T](ds1: Dataset[T], ds2: Dataset[T]): Unit = {
    ds1.explain(true)
    ds2.explain(true)
    val resultdf = ds1.toDF()
    val refdf = ds2.toDF()
    ds1.show
    ds2.show
    val columns = refdf.schema.fields.map(_.name)
    val selectiveDifferences = columns.map(col => refdf.select(col).except(resultdf.select(col)))
    selectiveDifferences.foreach(diff => assert(diff.count == 0))
    ds1.show
    ds2.show
  }
}
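
This change removes the concrete test cases, but a hypothetical case built on `checkEquiv` (the UDF and column names here are illustrative) would sit inside the suite roughly like this:

```scala
test("compiled UDF is equivalent to the corresponding Catalyst expression") {
  // Hypothetical UDF; with the compiler enabled it should translate to value + 1.
  val u = makeUdf((x: Long) => x + 1)
  val dataset = List(1L, 2L, 3L).toDS()
  val result = dataset.withColumn("new", u(col("value")))
  val ref = dataset.withColumn("new", col("value") + 1)
  checkEquiv(result, ref)
}
```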
