From d6be1081bc8fe7fe66eddde8fb5c648892bc0d89 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 1 Feb 2021 10:36:06 -0600 Subject: [PATCH] RAPIDS accelerated Spark Scala UDF support (#1636) Signed-off-by: Jason Lowe --- docs/FAQ.md | 6 +- docs/additional-functionality/rapids-udfs.md | 59 +++++++----- docs/configs.md | 5 +- docs/supported_ops.md | 94 ++++++++++++++++++- integration_tests/README.md | 8 +- integration_tests/pom.xml | 2 +- .../src/main/python/rapids_udf_test.py | 6 +- jenkins/databricks/build.sh | 4 +- jenkins/spark-tests.sh | 6 +- .../main/java/com/nvidia/spark/RapidsUDF.java | 4 +- .../nvidia/spark/rapids/GpuOverrides.scala | 6 +- .../spark/rapids/GpuUserDefinedFunction.scala | 82 ++++++++++++++++ .../sql/hive/rapids/GpuHiveOverrides.scala | 9 +- .../spark/sql/hive/rapids/hiveUDFs.scala | 56 ++--------- .../apache/spark/sql/rapids/GpuScalaUDF.scala | 68 ++++++++++++++ .../sql/rapids/execution/TrampolineUtil.scala | 7 +- tests/pom.xml | 6 ++ .../nvidia/spark/rapids/ScalaUDFSuite.scala | 37 ++++++++ udf-examples/README.md | 8 +- udf-examples/pom.xml | 16 +++- .../udf/{ => hive}/StringWordCount.java | 2 +- .../rapids/udf/{ => hive}/URLDecode.java | 6 +- .../rapids/udf/{ => hive}/URLEncode.java | 9 +- .../spark/rapids/udf/scala/URLDecode.scala | 69 ++++++++++++++ .../spark/rapids/udf/scala/URLEncode.scala | 50 ++++++++++ 25 files changed, 507 insertions(+), 118 deletions(-) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala create mode 100644 sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala create mode 100644 tests/src/test/scala/com/nvidia/spark/rapids/ScalaUDFSuite.scala rename udf-examples/src/main/java/com/nvidia/spark/rapids/udf/{ => hive}/StringWordCount.java (98%) rename udf-examples/src/main/java/com/nvidia/spark/rapids/udf/{ => hive}/URLDecode.java (94%) rename udf-examples/src/main/java/com/nvidia/spark/rapids/udf/{ => hive}/URLEncode.java (94%) create mode 100644 
udf-examples/src/main/scala/com/nvidia/spark/rapids/udf/scala/URLDecode.scala create mode 100644 udf-examples/src/main/scala/com/nvidia/spark/rapids/udf/scala/URLEncode.scala diff --git a/docs/FAQ.md b/docs/FAQ.md index 761731e2432..3ca0d3f5194 100644 --- a/docs/FAQ.md +++ b/docs/FAQ.md @@ -252,10 +252,10 @@ can throw at it. The RAPIDS Accelerator provides the following solutions for running user-defined functions on the GPU: -#### RAPIDS-Accelerated UDFs +#### RAPIDS Accelerated UDFs -UDFs can provide a RAPIDS-accelerated implementation which allows the RAPIDS Accelerator to perform -the operation on the GPU. See the [RAPIDS-accelerated UDF documentation](additional-functionality/rapids-udfs.md) +UDFs can provide a RAPIDS accelerated implementation which allows the RAPIDS Accelerator to perform +the operation on the GPU. See the [RAPIDS accelerated UDF documentation](additional-functionality/rapids-udfs.md) for details. #### Automatic Translation of Scala UDFs to Apache Spark Operations diff --git a/docs/additional-functionality/rapids-udfs.md b/docs/additional-functionality/rapids-udfs.md index 031a9404571..6e99f5f2be8 100644 --- a/docs/additional-functionality/rapids-udfs.md +++ b/docs/additional-functionality/rapids-udfs.md @@ -1,12 +1,12 @@ --- layout: page -title: RAPIDS-Accelerated User-Defined Functions +title: RAPIDS Accelerated User-Defined Functions parent: Additional Functionality nav_order: 3 --- -# RAPIDS-Accelerated User-Defined Functions +# RAPIDS Accelerated User-Defined Functions -This document describes how UDFs can provide a RAPIDS-accelerated +This document describes how UDFs can provide a RAPIDS accelerated implementation alongside the CPU implementation, enabling the RAPIDS Accelerator to perform the user-defined operation on the GPU. @@ -28,26 +28,26 @@ with the RAPIDS Accelerator. This implementation can then be invoked by the RAPIDS Accelerator when a corresponding query step using the UDF executes on the GPU. 
-## Limitations of RAPIDS-Accelerated UDFs +## Limitations of RAPIDS Accelerated UDFs -The RAPIDS Accelerator only supports RAPIDS-accelerated forms of regular -Hive UDFs. Other forms of Spark UDFs are not supported, such as: +The RAPIDS Accelerator only supports RAPIDS accelerated forms of the +following UDF types: +- Scala UDFs implementing a `Function` interface and registered via `SparkSession.udf.register` +- [Simple](https://github.com/apache/hive/blob/cb213d88304034393d68cc31a95be24f5aac62b6/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java) + or + [Generic](https://github.com/apache/hive/blob/cb213d88304034393d68cc31a95be24f5aac62b6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java) + Hive UDFs + +Other forms of Spark UDFs are not supported, such as: +- Spark Java UDFs (i.e.: derived from `org.apache.spark.sql.api.java.UDF`* interfaces) - Hive Aggregate Function (UDAF) - Hive Tabular Function (UDTF) -- Lambda functions and others registered via `SparkSession.udf` -- Functions created with `org.apache.spark.sql.functions.udf` - -## Adding GPU Implementations to Hive UDFs +- Lambda functions -As mentioned in the [Limitations](#limitations-of-rapids-accelerated-udfs) -section, the RAPIDS Accelerator only detects GPU implementations for Hive -regular UDFs. The Hive UDF can be either -[simple](https://github.com/apache/hive/blob/cb213d88304034393d68cc31a95be24f5aac62b6/ql/src/java/org/apache/hadoop/hive/ql/exec/UDF.java) -or -[generic](https://github.com/apache/hive/blob/cb213d88304034393d68cc31a95be24f5aac62b6/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDF.java). +## Adding GPU Implementations to UDFs -The RAPIDS Accelerator will detect a GPU implementation if the UDF class -implements the +For supported UDFs, the RAPIDS Accelerator will detect a GPU implementation +if the UDF class implements the [RapidsUDF](../sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java) interface. 
This interface requires implementing the following method: @@ -71,7 +71,7 @@ must not make any assumptions on the number of input rows. #### Scalar Inputs -Passing scalar inputs to a RAPIDS-accelerated UDF is supported with +Passing scalar inputs to a RAPIDS accelerated UDF is supported with limitations. The scalar value will be replicated into a full column before being passed to `evaluateColumnar`. Therefore the UDF implementation cannot easily detect the difference between a scalar input and a columnar input. @@ -92,19 +92,30 @@ cudf type to match the result type of the original UDF. For example, if the CPU UDF returns a `double` then `evaluateColumnar` must return a column of type `FLOAT64`. -### RAPIDS-Accelerated Hive UDF Examples +## RAPIDS Accelerated UDF Examples -Source code for examples of RAPIDS-accelerated Hive UDFs is provided +Source code for examples of RAPIDS accelerated Hive UDFs is provided in the [udf-examples](../udf-examples) project. -- [URLDecode](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLDecode.java) +### Spark Scala UDF Examples + +- [URLDecode](../udf-examples/src/main/scala/com/nvidia/spark/rapids/udf/scala/URLDecode.scala) +decodes URL-encoded strings using the +[Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable) +- [URLEncode](../udf-examples/src/main/scala/com/nvidia/spark/rapids/udf/scala/URLEncode.scala) +URL-encodes strings using the +[Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable) + +### Hive UDF Examples + +- [URLDecode](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLDecode.java) implements a Hive simple UDF using the [Java APIs of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable) to decode URL-encoded strings -- [URLEncode](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLEncode.java) +- [URLEncode](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLEncode.java) implements a Hive generic UDF using the [Java APIs 
of RAPIDS cudf](https://docs.rapids.ai/api/cudf-java/stable) to URL-encode strings -- [StringWordCount](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/StringWordCount.java) +- [StringWordCount](../udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/StringWordCount.java) implements a Hive simple UDF using [native code](../udf-examples/src/main/cpp/src) to count words in strings diff --git a/docs/configs.md b/docs/configs.md index 501e5075ff0..3e3c85ade51 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -204,6 +204,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Rint|`rint`|Rounds up a double value to the nearest double equal to an integer|true|None| spark.rapids.sql.expression.Round|`round`|Round an expression to d decimal places using HALF_UP rounding mode|true|None| spark.rapids.sql.expression.RowNumber|`row_number`|Window function that returns the index for the row within the aggregation window|true|None| +spark.rapids.sql.expression.ScalaUDF| |User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface|true|None| spark.rapids.sql.expression.Second|`second`|Returns the second component of the string/timestamp|true|None| spark.rapids.sql.expression.ShiftLeft|`shiftleft`|Bitwise shift left (<<)|true|None| spark.rapids.sql.expression.ShiftRight|`shiftright`|Bitwise shift right (>>)|true|None| @@ -254,8 +255,8 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.Min|`min`|Min aggregate operator|true|None| spark.rapids.sql.expression.Sum|`sum`|Sum aggregate operator|true|None| spark.rapids.sql.expression.NormalizeNaNAndZero| |Normalize NaN and zero|true|None| -spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, support requires the UDF to implement a RAPIDS-accelerated interface|true|None| -spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, support requires the UDF to implement a RAPIDS-accelerated 
interface|true|None| +spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None| +spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None| ### Execution diff --git a/docs/supported_ops.md b/docs/supported_ops.md index eda2868183e..9f94938b593 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -10973,6 +10973,96 @@ Accelerator support is described below. +ScalaUDF + +User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface +None +project +param +S +S +S +S +S +S +S +S +S* +S +S* +S +S +S +PS* (missing nested UDT) +PS* (missing nested UDT) +PS* (missing nested UDT) +NS + + +result +S +S +S +S +S +S +S +S +S* +S +S* +S +S +S +PS* (missing nested UDT) +PS* (missing nested UDT) +PS* (missing nested UDT) +NS + + +lambda +param +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS + + +result +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS +NS + + Second `second` Returns the second component of the string/timestamp @@ -16778,7 +16868,7 @@ Accelerator support is described below. HiveGenericUDF -Hive Generic UDF, support requires the UDF to implement a RAPIDS-accelerated interface +Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface None project param @@ -16868,7 +16958,7 @@ Accelerator support is described below. 
HiveSimpleUDF -Hive UDF, support requires the UDF to implement a RAPIDS-accelerated interface +Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface None project param diff --git a/integration_tests/README.md b/integration_tests/README.md index 62dbcdabc2a..9ebeb507d3c 100644 --- a/integration_tests/README.md +++ b/integration_tests/README.md @@ -107,7 +107,7 @@ individually, so you don't risk running unit tests along with the integration te http://www.scalatest.org/user_guide/using_the_scalatest_shell ```shell -spark-shell --jars rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT-tests.jar,rapids-4-spark-udf-examples-0.4.0-SNAPSHOT,rapids-4-spark-integration-tests_2.12-0.4.0-SNAPSHOT-tests.jar,scalatest_2.12-3.0.5.jar,scalactic_2.12-3.0.5.jar +spark-shell --jars rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT-tests.jar,rapids-4-spark-udf-examples_2.12-0.4.0-SNAPSHOT,rapids-4-spark-integration-tests_2.12-0.4.0-SNAPSHOT-tests.jar,scalatest_2.12-3.0.5.jar,scalactic_2.12-3.0.5.jar ``` First you import the `scalatest_shell` and tell the tests where they can find the test files you @@ -131,7 +131,7 @@ If you just want to verify the SQL replacement is working you will need to add t example assumes CUDA 10.1 is being used. ``` -$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar" ./runtests.py +$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples_2.12-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar" ./runtests.py ``` You don't have to enable the plugin for this to work, the test framework will do that for you. 
@@ -183,7 +183,7 @@ The TPCxBB, TPCH, TPCDS, and Mortgage tests in this framework can be enabled by As an example, here is the `spark-submit` command with the TPCxBB parameters on CUDA 10.1: ``` -$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar,rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT.jar" ./runtests.py --tpcxbb_format="csv" --tpcxbb_path="/path/to/tpcxbb/csv" +$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples_2.12-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar,rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT.jar" ./runtests.py --tpcxbb_format="csv" --tpcxbb_path="/path/to/tpcxbb/csv" ``` Be aware that running these tests with read data requires at least an entire GPU, and preferable several GPUs/executors @@ -212,7 +212,7 @@ To run cudf_udf tests, need following configuration changes: As an example, here is the `spark-submit` command with the cudf_udf parameter on CUDA 10.1: ``` -$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar,rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT.jar" --conf spark.rapids.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.concurrentPythonWorkers=2 --py-files "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar" --conf spark.executorEnv.PYTHONPATH="rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar" ./runtests.py --cudf_udf +$SPARK_HOME/bin/spark-submit --jars "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar,rapids-4-spark-udf-examples_2.12-0.4.0-SNAPSHOT.jar,cudf-0.18-SNAPSHOT-cuda10-1.jar,rapids-4-spark-tests_2.12-0.4.0-SNAPSHOT.jar" --conf spark.rapids.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.memory.gpu.allocFraction=0.3 --conf spark.rapids.python.concurrentPythonWorkers=2 --py-files "rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar" --conf 
spark.executorEnv.PYTHONPATH="rapids-4-spark_2.12-0.4.0-SNAPSHOT.jar" ./runtests.py --cudf_udf ``` ## Writing tests diff --git a/integration_tests/pom.xml b/integration_tests/pom.xml index c068a2f8b64..ff1c4a8a6e1 100644 --- a/integration_tests/pom.xml +++ b/integration_tests/pom.xml @@ -107,7 +107,7 @@ com.nvidia - rapids-4-spark-udf-examples + rapids-4-spark-udf-examples_${scala.binary.version} ${project.version} test diff --git a/integration_tests/src/main/python/rapids_udf_test.py b/integration_tests/src/main/python/rapids_udf_test.py index 9134db89680..d96ec286495 100644 --- a/integration_tests/src/main/python/rapids_udf_test.py +++ b/integration_tests/src/main/python/rapids_udf_test.py @@ -36,7 +36,7 @@ def test_hive_simple_udf(): with_spark_session(skip_if_no_hive) data_gens = [["i", int_gen], ["s", StringGen('([^%]{0,1}(%[0-9A-F][0-9A-F]){0,1}){0,30}')]] def evalfn(spark): - load_udf_or_skip_test(spark, "urldecode", "com.nvidia.spark.rapids.udf.URLDecode") + load_udf_or_skip_test(spark, "urldecode", "com.nvidia.spark.rapids.udf.hive.URLDecode") return gen_df(spark, data_gens) assert_gpu_and_cpu_are_equal_sql( evalfn, @@ -47,7 +47,7 @@ def test_hive_generic_udf(): with_spark_session(skip_if_no_hive) data_gens = [["s", StringGen('.{0,30}')]] def evalfn(spark): - load_udf_or_skip_test(spark, "urlencode", "com.nvidia.spark.rapids.udf.URLEncode") + load_udf_or_skip_test(spark, "urlencode", "com.nvidia.spark.rapids.udf.hive.URLEncode") return gen_df(spark, data_gens) assert_gpu_and_cpu_are_equal_sql( evalfn, @@ -59,7 +59,7 @@ def test_hive_simple_udf_native(enable_rapids_udf_example_native): with_spark_session(skip_if_no_hive) data_gens = [["s", StringGen('.{0,30}')]] def evalfn(spark): - load_udf_or_skip_test(spark, "wordcount", "com.nvidia.spark.rapids.udf.StringWordCount") + load_udf_or_skip_test(spark, "wordcount", "com.nvidia.spark.rapids.udf.hive.StringWordCount") return gen_df(spark, data_gens) assert_gpu_and_cpu_are_equal_sql( evalfn, diff --git 
a/jenkins/databricks/build.sh b/jenkins/databricks/build.sh index a41bf94563e..f285e243122 100755 --- a/jenkins/databricks/build.sh +++ b/jenkins/databricks/build.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2020-2021, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -46,7 +46,7 @@ CUDA_VERSION=`mvn help:evaluate -q -pl dist -Dexpression=cuda.version -DforceStd # the version of spark used when we install the databricks jars in .m2 SPARK_VERSION_TO_INSTALL_DATABRICKS_JARS=$BASE_SPARK_VERSION-databricks RAPIDS_BUILT_JAR=rapids-4-spark_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar -RAPIDS_UDF_JAR=rapids-4-spark-udf-examples-$SPARK_PLUGIN_JAR_VERSION.jar +RAPIDS_UDF_JAR=rapids-4-spark-udf-examples_$SCALA_VERSION-$SPARK_PLUGIN_JAR_VERSION.jar echo "Scala version is: $SCALA_VERSION" mvn -B -P${BUILD_PROFILES} clean package -DskipTests || true diff --git a/jenkins/spark-tests.sh b/jenkins/spark-tests.sh index 0537559a6d7..077f30af77b 100755 --- a/jenkins/spark-tests.sh +++ b/jenkins/spark-tests.sh @@ -1,6 +1,6 @@ #!/bin/bash # -# Copyright (c) 2019-2020, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2019-2021, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -33,7 +33,7 @@ $MVN_GET_CMD -DremoteRepositories=$CUDF_REPO \ $MVN_GET_CMD -DremoteRepositories=$PROJECT_REPO \ -DgroupId=com.nvidia -DartifactId=rapids-4-spark_$SCALA_BINARY_VER -Dversion=$PROJECT_VER $MVN_GET_CMD -DremoteRepositories=$PROJECT_TEST_REPO \ - -DgroupId=com.nvidia -DartifactId=rapids-4-spark-udf-examples -Dversion=$PROJECT_TEST_VER + -DgroupId=com.nvidia -DartifactId=rapids-4-spark-udf-examples_$SCALA_BINARY_VER -Dversion=$PROJECT_TEST_VER $MVN_GET_CMD -DremoteRepositories=$PROJECT_TEST_REPO \ -DgroupId=com.nvidia -DartifactId=rapids-4-spark-integration-tests_$SCALA_BINARY_VER -Dversion=$PROJECT_TEST_VER if [ "$CUDA_CLASSIFIER"x == x ];then @@ -42,7 +42,7 @@ else CUDF_JAR="$ARTF_ROOT/cudf-$CUDF_VER-$CUDA_CLASSIFIER.jar" fi RAPIDS_PLUGIN_JAR="$ARTF_ROOT/rapids-4-spark_${SCALA_BINARY_VER}-$PROJECT_VER.jar" -RAPIDS_UDF_JAR="$ARTF_ROOT/rapids-4-spark-udf-examples-$PROJECT_TEST_VER.jar" +RAPIDS_UDF_JAR="$ARTF_ROOT/rapids-4-spark-udf-examples_${SCALA_BINARY_VER}-$PROJECT_TEST_VER.jar" RAPIDS_TEST_JAR="$ARTF_ROOT/rapids-4-spark-integration-tests_${SCALA_BINARY_VER}-$PROJECT_TEST_VER.jar" $MVN_GET_CMD -DremoteRepositories=$PROJECT_TEST_REPO \ diff --git a/sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java b/sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java index af4354c11e1..6c2a871da6a 100644 --- a/sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java +++ b/sql-plugin/src/main/java/com/nvidia/spark/RapidsUDF.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,7 +18,7 @@ import ai.rapids.cudf.ColumnVector; -/** A RAPIDS-accelerated version of a user-defined function (UDF). */ +/** A RAPIDS accelerated version of a user-defined function (UDF). 
*/ public interface RapidsUDF { /** * Evaluate a user-defined function with RAPIDS cuDF columnar inputs diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index ee20c10f9b1..e35e276fed4 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -2192,11 +2192,13 @@ object GpuOverrides { (a, conf, p, r) => new UnaryExprMeta[MakeDecimal](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuMakeDecimal(child, a.precision, a.scale, a.nullOnOverflow) - }) + }), + GpuScalaUDF.exprMeta ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap + // Shim expressions should be last to allow overrides with shim-specific versions val expressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = - commonExpressions ++ ShimLoader.getSparkShims.getExprs ++ GpuHiveOverrides.exprs + commonExpressions ++ GpuHiveOverrides.exprs ++ ShimLoader.getSparkShims.getExprs def wrapScan[INPUT <: Scan]( diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala new file mode 100644 index 00000000000..178fd655331 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuUserDefinedFunction.scala @@ -0,0 +1,82 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.{NvtxColor, NvtxRange} +import com.nvidia.spark.RapidsUDF +import com.nvidia.spark.rapids.RapidsPluginImplicits._ + +import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.expressions.{Expression, UserDefinedExpression} +import org.apache.spark.sql.rapids.execution.TrampolineUtil +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** Common implementation across all RAPIDS accelerated UDF types */ +trait GpuUserDefinedFunction extends GpuExpression with UserDefinedExpression with Serializable { + /** name of the UDF function */ + val name: String + + /** User's UDF instance */ + val function: RapidsUDF + + /** True if the UDF is deterministic */ + val udfDeterministic: Boolean + + override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) + + private[this] val nvtxRangeName = s"UDF: $name" + private[this] lazy val funcCls = TrampolineUtil.getSimpleName(function.getClass) + private[this] lazy val inputTypesString = children.map(_.dataType.catalogString).mkString(", ") + private[this] lazy val outputType = dataType.catalogString + + private[this] def evalExpr(expr: Expression, batch: ColumnarBatch): GpuColumnVector = { + expr.columnarEval(batch) match { + case v: GpuColumnVector => v + case other => + withResource(GpuScalar.from(other, expr.dataType)) { s => + GpuColumnVector.from(s, batch.numRows(), expr.dataType) + } + } + } + + override def columnarEval(batch: ColumnarBatch): Any = { + withResource(children.safeMap(evalExpr(_, batch))) { exprResults => + val funcInputs = exprResults.map(_.getBase()).toArray + withResource(new NvtxRange(nvtxRangeName, NvtxColor.PURPLE)) { _ => + try { + closeOnExcept(function.evaluateColumnar(funcInputs: _*)) { resultColumn => + if (batch.numRows() != resultColumn.getRowCount) { + throw new 
IllegalStateException("UDF returned a different row count than the " + + s"input, expected ${batch.numRows} found ${resultColumn.getRowCount}") + } + GpuColumnVector.fromChecked(resultColumn, dataType) + } + } catch { + case e: Exception => + throw new SparkException("Failed to execute user defined function " + + s"($funcCls: ($inputTypesString) => $outputType)", e) + } + } + } + } +} + +object GpuUserDefinedFunction { + // UDFs can support all types except UDT which does not have a clear columnar representation. + val udfTypeSig: TypeSig = (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + + TypeSig.BINARY + TypeSig.CALENDAR + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested() +} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala index 2821f350cf5..c259936464d 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala @@ -18,15 +18,12 @@ package org.apache.spark.sql.hive.rapids import com.nvidia.spark.RapidsUDF import com.nvidia.spark.rapids.{ExprChecks, ExprMeta, ExprRule, GpuExpression, GpuOverrides, RepeatingParamCheck, TypeSig} +import com.nvidia.spark.rapids.GpuUserDefinedFunction.udfTypeSig import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.hive.{HiveGenericUDF, HiveSimpleUDF} object GpuHiveOverrides { - // UDFs can support all types except UDT which does not have a clear columnar representation. 
- private val udfTypeSig = (TypeSig.commonCudfTypes + TypeSig.DECIMAL + TypeSig.NULL + - TypeSig.BINARY + TypeSig.CALENDAR + TypeSig.ARRAY + TypeSig.MAP + TypeSig.STRUCT).nested() - def isSparkHiveAvailable: Boolean = { // Using the same approach as SparkSession.hiveClassesArePresent val loader = Thread.currentThread().getContextClassLoader @@ -50,7 +47,7 @@ object GpuHiveOverrides { Seq( GpuOverrides.expr[HiveSimpleUDF]( - "Hive UDF, support requires the UDF to implement a RAPIDS-accelerated interface", + "Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface", ExprChecks.projectNotLambda( udfTypeSig, TypeSig.all, @@ -78,7 +75,7 @@ object GpuHiveOverrides { }), GpuOverrides.expr[HiveGenericUDF]( "Hive Generic UDF, support requires the UDF to implement a " + - "RAPIDS-accelerated interface", + "RAPIDS accelerated interface", ExprChecks.projectNotLambda( udfTypeSig, TypeSig.all, diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala index 6fd6d646adb..fe941f30bb2 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/hiveUDFs.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -16,66 +16,28 @@ package org.apache.spark.sql.hive.rapids -import ai.rapids.cudf.{NvtxColor, NvtxRange} import com.nvidia.spark.RapidsUDF -import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar} -import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.GpuUserDefinedFunction import org.apache.hadoop.hive.ql.exec.UDF import org.apache.hadoop.hive.ql.udf.generic.GenericUDF -import org.apache.spark.sql.catalyst.expressions.{Expression, UserDefinedExpression} +import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper import org.apache.spark.sql.types.DataType -import org.apache.spark.sql.vectorized.ColumnarBatch -abstract class GpuHiveUDFBase( - name: String, - funcWrapper: HiveFunctionWrapper, - children: Seq[Expression], - dataType: DataType, - udfDeterministic: Boolean) extends GpuExpression with UserDefinedExpression with Serializable { - - private[this] val nvtxRangeName = "UDF: " + name - - override lazy val deterministic: Boolean = udfDeterministic && children.forall(_.deterministic) +/** Common implementation across Hive UDFs */ +trait GpuHiveUDFBase extends GpuUserDefinedFunction { + val funcWrapper: HiveFunctionWrapper override def nullable: Boolean = true override def foldable: Boolean = udfDeterministic && children.forall(_.foldable) - @transient - val function: RapidsUDF - override def toString: String = { s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" } override def prettyName: String = name - - private[this] def evalExpr(expr: Expression, batch: ColumnarBatch): GpuColumnVector = { - expr.columnarEval(batch) match { - case v: GpuColumnVector => v - case other => - withResource(GpuScalar.from(other, expr.dataType)) { s => - GpuColumnVector.from(s, batch.numRows(), expr.dataType) - } - } - } - - override def columnarEval(batch: ColumnarBatch): Any = { - withResource(children.safeMap(evalExpr(_, batch))) { exprResults 
=> - val funcInputs = exprResults.map(_.getBase()).toArray - withResource(new NvtxRange(nvtxRangeName, NvtxColor.PURPLE)) { _ => - closeOnExcept(function.evaluateColumnar(funcInputs: _*)) { resultColumn => - if (batch.numRows() != resultColumn.getRowCount) { - throw new IllegalStateException("UDF returned a different row count than the input, " + - s"expected ${batch.numRows} found ${resultColumn.getRowCount}") - } - GpuColumnVector.fromChecked(resultColumn, dataType) - } - } - } - } } /** GPU-accelerated version of Spark's `HiveSimpleUDF` */ @@ -84,8 +46,7 @@ case class GpuHiveSimpleUDF( funcWrapper: HiveFunctionWrapper, children: Seq[Expression], dataType: DataType, - udfDeterministic: Boolean) - extends GpuHiveUDFBase(name, funcWrapper, children, dataType, udfDeterministic) { + udfDeterministic: Boolean) extends GpuHiveUDFBase { @transient override lazy val function: RapidsUDF = funcWrapper.createFunction[UDF]().asInstanceOf[RapidsUDF] @@ -99,8 +60,7 @@ case class GpuHiveGenericUDF( children: Seq[Expression], dataType: DataType, udfDeterministic: Boolean, - override val foldable: Boolean) - extends GpuHiveUDFBase(name, funcWrapper, children, dataType, udfDeterministic) { + override val foldable: Boolean) extends GpuHiveUDFBase { @transient override lazy val function: RapidsUDF = funcWrapper.createFunction[GenericUDF]() .asInstanceOf[RapidsUDF] diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala new file mode 100644 index 00000000000..10e0ac80240 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/GpuScalaUDF.scala @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * GPU version of Spark's `ScalaUDF` expression. The wrapped [[RapidsUDF]]
 * supplies the columnar implementation that is invoked on the GPU.
 *
 * @param function the user's UDF object, which must implement [[RapidsUDF]]
 * @param dataType result type declared when the UDF was registered
 * @param children input expressions evaluated to produce the UDF arguments
 * @param udfName optional name the UDF was registered under
 * @param nullable whether the UDF result may be null
 * @param udfDeterministic whether the UDF was declared deterministic
 */
case class GpuScalaUDF(
    function: RapidsUDF,
    dataType: DataType,
    children: Seq[Expression],
    udfName: Option[String],
    nullable: Boolean,
    udfDeterministic: Boolean) extends GpuUserDefinedFunction {
  override def toString: String = s"${udfName.getOrElse("UDF")}(${children.mkString(", ")})"

  /** name of the UDF function */
  override val name: String = udfName.getOrElse("???")
}

object GpuScalaUDF {
  /**
   * Replacement rule that lets the plugin override `ScalaUDF` on the GPU.
   * The override only applies when the user's function object implements
   * the [[RapidsUDF]] interface; otherwise the expression is tagged as
   * unsupported and stays on the CPU.
   */
  def exprMeta: ExprRule[ScalaUDF] = GpuOverrides.expr[ScalaUDF](
    "User Defined Function, support requires the UDF to implement a RAPIDS accelerated interface",
    ExprChecks.projectNotLambda(
      udfTypeSig,
      TypeSig.all,
      repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))),
    (a, conf, p, r) => new ExprMeta[ScalaUDF](a, conf, p, r) {
      override def tagExprForGpu(): Unit = {
        // Only functions that opt in via RapidsUDF have a columnar implementation.
        a.function match {
          case _: RapidsUDF =>
          case _ =>
            val udfName = a.udfName.getOrElse("UDF")
            val udfClass = a.function.getClass
            willNotWorkOnGpu(s"$udfName implemented by $udfClass does not provide " +
                "a GPU implementation")
        }
      }

      override def convertToGpu(): GpuExpression = {
        // Cast is safe here: tagExprForGpu rejected non-RapidsUDF functions.
        GpuScalaUDF(
          a.function.asInstanceOf[RapidsUDF],
          a.dataType,
          childExprs.map(_.convertToGpu()),
          a.udfName,
          a.nullable,
          a.udfDeterministic)
      }
    })
}
/** Smoke tests for RAPIDS accelerated Scala UDFs running through the plugin. */
class ScalaUDFSuite extends SparkQueryCompareTestSuite {
  testSparkResultsAreEqual("Scala urldecode", nullableStringsFromCsv) { frame =>
    // Basic smoke-test of the Scala UDF framework rather than an exhaustive
    // test of the URLDecode implementation itself.
    val session = frame.sparkSession
    val decodeUdf = session.udf.register("urldecode", new URLDecode())
    frame.select(decodeUdf(col("strings")))
  }

  testSparkResultsAreEqual("Scala urlencode", nullableStringsFromCsv) { frame =>
    // Basic smoke-test of the Scala UDF framework rather than an exhaustive
    // test of the URLEncode implementation itself.
    val session = frame.sparkSession
    val encodeUdf = session.udf.register("urlencode", new URLEncode())
    frame.select(encodeUdf(col("strings")))
  }
}
+[RAPIDS accelerated UDF documentation](../docs/additional-functionality/rapids-udfs.md) for details +on how RAPIDS accelerated UDFs work and guidelines for creating them. ## Building the Native Code Examples diff --git a/udf-examples/pom.xml b/udf-examples/pom.xml index 700cd7b4549..2ae790c4756 100644 --- a/udf-examples/pom.xml +++ b/udf-examples/pom.xml @@ -25,9 +25,9 @@ 0.4.0-SNAPSHOT com.nvidia - rapids-4-spark-udf-examples + rapids-4-spark-udf-examples_2.12 RAPIDS Accelerator for Apache Spark UDF Examples - Sample implementations of RAPIDS-accelerated + Sample implementations of RAPIDS accelerated user defined functions for use with the RAPIDS Accelerator for Apache Spark 0.4.0-SNAPSHOT @@ -76,6 +76,18 @@ + + net.alchim31.maven + scala-maven-plugin + + + org.scalastyle + scalastyle-maven-plugin + + + org.apache.rat + apache-rat-plugin + diff --git a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/StringWordCount.java b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/StringWordCount.java similarity index 98% rename from udf-examples/src/main/java/com/nvidia/spark/rapids/udf/StringWordCount.java rename to udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/StringWordCount.java index fa7d64912f9..f5bf236c563 100644 --- a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/StringWordCount.java +++ b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/StringWordCount.java @@ -14,7 +14,7 @@ * limitations under the License. 
*/ -package com.nvidia.spark.rapids.udf; +package com.nvidia.spark.rapids.udf.hive; import ai.rapids.cudf.ColumnVector; import ai.rapids.cudf.DType; diff --git a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLDecode.java b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLDecode.java similarity index 94% rename from udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLDecode.java rename to udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLDecode.java index 90277315440..4aa1dae0af7 100644 --- a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLDecode.java +++ b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLDecode.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.nvidia.spark.rapids.udf; +package com.nvidia.spark.rapids.udf.hive; import ai.rapids.cudf.ColumnVector; import ai.rapids.cudf.DType; @@ -26,7 +26,7 @@ import java.net.URLDecoder; /** - * A user-defined function (UDF) that decodes URL-encoded strings. + * A Hive user-defined function (UDF) that decodes URL-encoded strings. * This class demonstrates how to implement a simple Hive UDF that also * provides a RAPIDS implementation that can run on the GPU when the query * is executed with the RAPIDS Accelerator for Apache Spark. 
diff --git a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLEncode.java b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLEncode.java similarity index 94% rename from udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLEncode.java rename to udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLEncode.java index 0fb6bef0511..61b05d83309 100644 --- a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/URLEncode.java +++ b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/URLEncode.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020, NVIDIA CORPORATION. + * Copyright (c) 2020-2021, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -14,11 +14,10 @@ * limitations under the License. */ -package com.nvidia.spark.rapids.udf; +package com.nvidia.spark.rapids.udf.hive; import ai.rapids.cudf.ColumnVector; import ai.rapids.cudf.DType; -import ai.rapids.cudf.Scalar; import com.nvidia.spark.RapidsUDF; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; @@ -33,7 +32,7 @@ import java.net.URLEncoder; /** - * A user-defined function (UDF) that URL-encodes strings. + * A Hive user-defined function (UDF) that URL-encodes strings. * This class demonstrates how to implement a Hive GenericUDF that also * provides a RAPIDS implementation that can run on the GPU when the query * is executed with the RAPIDS Accelerator for Apache Spark. 
/**
 * A Scala user-defined function (UDF) that decodes URL-encoded strings.
 * This class demonstrates how to implement a Scala UDF that also
 * provides a RAPIDS implementation that can run on the GPU when the query
 * is executed with the RAPIDS Accelerator for Apache Spark.
 */
class URLDecode extends Function[String, String] with RapidsUDF with Serializable {
  /** Row-by-row implementation that executes on the CPU */
  override def apply(s: String): String =
    if (s == null) {
      null
    } else {
      try {
        URLDecoder.decode(s, "utf-8")
      } catch {
        // Malformed escape sequences pass the input through unchanged
        // rather than failing the row.
        case _: IllegalArgumentException => s
      }
    }

  /** Columnar implementation that runs on the GPU */
  override def evaluateColumnar(args: ColumnVector*): ColumnVector = {
    // The CPU implementation takes a single string argument, so similarly
    // there should only be one column argument of type STRING.
    require(args.length == 1, s"Unexpected argument count: ${args.length}")
    val input = args.head
    require(input.getType == DType.STRING, s"Argument type is not a string: ${input.getType}")

    // The cudf urlDecode does not convert '+' to a space, so do that as a pre-pass first.
    // Every intermediate result is closed on all paths to avoid leaking GPU resources.
    closing(Scalar.fromString("+")) { plusScalar =>
      closing(Scalar.fromString(" ")) { spaceScalar =>
        closing(input.stringReplace(plusScalar, spaceScalar)) { replaced =>
          replaced.urlDecode()
        }
      }
    }
  }

  /** Run `f` on resource `r`, closing `r` whether or not `f` throws. */
  private def closing[R <: AutoCloseable, T](r: R)(f: R => T): T = {
    try {
      f(r)
    } finally {
      r.close()
    }
  }
}
/**
 * A Scala user-defined function (UDF) that URL-encodes strings.
 * This class demonstrates how to implement a Scala UDF that also
 * provides a RAPIDS implementation that can run on the GPU when the query
 * is executed with the RAPIDS Accelerator for Apache Spark.
 */
class URLEncode extends Function[String, String] with RapidsUDF with Serializable {
  /** Row-by-row implementation that executes on the CPU */
  override def apply(s: String): String =
    if (s == null) {
      null
    } else {
      // Post-process java.net.URLEncoder output: encode spaces as %20
      // instead of '+', escape '*', and leave '~' unescaped.
      // NOTE(review): presumably this matches the cudf urlEncode output
      // used on the GPU path — confirm against the cudf implementation.
      val encoded = URLEncoder.encode(s, "utf-8")
      encoded
        .replace("+", "%20")
        .replace("*", "%2A")
        .replace("%7E", "~")
    }

  /** Columnar implementation that runs on the GPU */
  override def evaluateColumnar(args: ColumnVector*): ColumnVector = {
    // The CPU implementation takes a single string argument, so similarly
    // there should only be one column argument of type STRING.
    require(args.length == 1, s"Unexpected argument count: ${args.length}")
    val input = args.head
    require(input.getType == DType.STRING, s"Argument type is not a string: ${input.getType}")
    input.urlEncode()
  }
}