Support row-based Hive UDFs [databricks] #4224

Merged · 9 commits · Dec 2, 2021
16 changes: 16 additions & 0 deletions aggregator/pom.xml
@@ -100,9 +100,25 @@
            <pattern>org.apache.hadoop.hive.</pattern>
            <shadedPattern>${rapids.shade.package}.hadoop.hive.</shadedPattern>
            <excludes>
              <!--
                Class exclusions for Hive UDFs, to avoid ClassNotFoundException errors
                such as:
                  E Caused by: java.lang.ClassNotFoundException: com.nvidia.shaded.spark.hadoop.hive.serde2.objectinspector.ObjectInspector
                  E at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
              -->
              <exclude>org.apache.hadoop.hive.conf.HiveConf</exclude>
              <exclude>org.apache.hadoop.hive.ql.exec.FunctionRegistry</exclude>
              <exclude>org.apache.hadoop.hive.ql.exec.UDF</exclude>
              <exclude>org.apache.hadoop.hive.ql.exec.UDFMethodResolver</exclude>
              <exclude>org.apache.hadoop.hive.ql.udf.UDFType</exclude>
              <exclude>org.apache.hadoop.hive.ql.udf.generic.GenericUDF</exclude>
              <exclude>org.apache.hadoop.hive.ql.udf.generic.GenericUDF$DeferredObject</exclude>
              <exclude>org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper</exclude>
              <exclude>org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector</exclude>
              <exclude>org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory</exclude>
              <exclude>org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory$ObjectInspectorOptions</exclude>
              <exclude>org.apache.hadoop.hive.serde2.objectinspector.StructField</exclude>
              <exclude>org.apache.hadoop.hive.serde2.typeinfo.TypeInfo</exclude>
            </excludes>
</relocation>
<relocation>
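For context on why the exclusions above are needed: user UDF jars are compiled against the vanilla Hive API, so any Hive class appearing in a UDF's class hierarchy or method signatures must resolve under its original, unshaded name at runtime. A minimal sketch of such a UDF (the class name `MyUpperUDF` is hypothetical):

```scala
// Hypothetical user UDF compiled against vanilla Hive. If the plugin's
// shading relocated org.apache.hadoop.hive.ql.exec.UDF, loading this class
// from a GPU plan would fail with a ClassNotFoundException like the one
// quoted in the pom comment above.
import org.apache.hadoop.hive.ql.exec.UDF

class MyUpperUDF extends UDF {
  // Hive resolves evaluate() reflectively via UDFMethodResolver.
  def evaluate(s: String): String = if (s == null) null else s.toUpperCase
}
```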
4 changes: 2 additions & 2 deletions docs/configs.md
@@ -332,8 +332,8 @@ Name | SQL Function(s) | Description | Default Value | Notes
<a name="sql.expression.VarianceSamp"></a>spark.rapids.sql.expression.VarianceSamp|`var_samp`, `variance`|Aggregation computing sample variance|true|None|
<a name="sql.expression.NormalizeNaNAndZero"></a>spark.rapids.sql.expression.NormalizeNaNAndZero| |Normalize NaN and zero|true|None|
<a name="sql.expression.ScalarSubquery"></a>spark.rapids.sql.expression.ScalarSubquery| |Subquery that will return only one row and one column|true|None|
<a name="sql.expression.HiveGenericUDF"></a>spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None|
<a name="sql.expression.HiveSimpleUDF"></a>spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None|
<a name="sql.expression.HiveGenericUDF"></a>spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance|true|None|
<a name="sql.expression.HiveSimpleUDF"></a>spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance|true|None|

### Execution

4 changes: 2 additions & 2 deletions docs/supported_ops.md
@@ -16149,7 +16149,7 @@ are limited.
<tr>
<td rowSpan="2">HiveGenericUDF</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface</td>
<td rowSpan="2">Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>param</td>
@@ -16196,7 +16196,7 @@
<tr>
<td rowSpan="2">HiveSimpleUDF</td>
<td rowSpan="2"> </td>
<td rowSpan="2">Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface</td>
<td rowSpan="2">Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance</td>
<td rowSpan="2">None</td>
<td rowSpan="2">project</td>
<td>param</td>
43 changes: 43 additions & 0 deletions integration_tests/src/main/python/row-based_udf_test.py
@@ -0,0 +1,43 @@
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest

from asserts import assert_gpu_and_cpu_are_equal_sql
from data_gen import *
from spark_session import with_spark_session
from rapids_udf_test import skip_if_no_hive, load_hive_udf_or_skip_test

def test_hive_empty_simple_udf():
    with_spark_session(skip_if_no_hive)
    data_gens = [["i", int_gen], ["s", string_gen]]
    def evalfn(spark):
        load_hive_udf_or_skip_test(spark, "emptysimple", "com.nvidia.spark.rapids.udf.hive.EmptyHiveSimpleUDF")
        return gen_df(spark, data_gens)
    assert_gpu_and_cpu_are_equal_sql(
        evalfn,
        "hive_simple_udf_test_table",
        "SELECT i, emptysimple(s) FROM hive_simple_udf_test_table",
        conf={'spark.rapids.sql.rowBasedUDF.enabled': 'true'})

def test_hive_empty_generic_udf():
    with_spark_session(skip_if_no_hive)
    def evalfn(spark):
        load_hive_udf_or_skip_test(spark, "emptygeneric", "com.nvidia.spark.rapids.udf.hive.EmptyHiveGenericUDF")
        return gen_df(spark, [["s", string_gen]])
    assert_gpu_and_cpu_are_equal_sql(
        evalfn,
        "hive_generic_udf_test_table",
        "SELECT emptygeneric(s) FROM hive_generic_udf_test_table",
        conf={'spark.rapids.sql.rowBasedUDF.enabled': 'true'})
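Outside the test harness, the same fallback can be exercised from any Spark session. A sketch (the UDF class and table names are hypothetical):

```scala
// Allow plain, non-RAPIDS Hive UDFs to run row-by-row inside GPU plans
// instead of forcing the whole expression back onto the CPU.
spark.conf.set("spark.rapids.sql.rowBasedUDF.enabled", "true")

// Register an ordinary Hive UDF; no RapidsUDF implementation is needed.
spark.sql("CREATE TEMPORARY FUNCTION myupper AS 'com.example.MyUpperUDF'")
spark.sql("SELECT myupper(s) FROM some_table").show()
```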
6 changes: 6 additions & 0 deletions sql-plugin/pom.xml
@@ -154,6 +154,12 @@
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-serde</artifactId>
      <version>${spark.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_${scala.binary.version}</artifactId>
sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.rapids

import com.nvidia.spark.RapidsUDF
import com.nvidia.spark.rapids.{ExprChecks, ExprMeta, ExprRule, GpuExpression, GpuOverrides, RepeatingParamCheck, ShimLoader, TypeSig}
import com.nvidia.spark.rapids.{ExprChecks, ExprMeta, ExprRule, GpuExpression, GpuOverrides, RapidsConf, RepeatingParamCheck, ShimLoader, TypeSig}
import com.nvidia.spark.rapids.GpuUserDefinedFunction.udfTypeSig

import org.apache.spark.sql.catalyst.expressions.Expression
@@ -45,59 +45,86 @@ object GpuHiveOverrides

    Seq(
      GpuOverrides.expr[HiveSimpleUDF](
        "Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface",
        "Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to" +
          " get better performance",
        ExprChecks.projectOnly(
          udfTypeSig,
          TypeSig.all,
          repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))),
        (a, conf, p, r) => new ExprMeta[HiveSimpleUDF](a, conf, p, r) {
          private val opRapidsFunc = a.function match {
            case rapidsUDF: RapidsUDF => Some(rapidsUDF)
            case _ => None
          }

          override def tagExprForGpu(): Unit = {
            a.function match {
              case _: RapidsUDF =>
              case _ =>
                willNotWorkOnGpu(s"Hive UDF ${a.name} implemented by " +
                  s"${a.funcWrapper.functionClassName} does not provide a GPU implementation")
            if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
              willNotWorkOnGpu(s"Hive SimpleUDF ${a.name} implemented by " +
                s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
                s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
            }
          }

          override def convertToGpu(): GpuExpression = {
            // To avoid adding a Hive dependency just to check if the UDF function is deterministic,
            // we use the original HiveSimpleUDF `deterministic` method as a proxy.
            GpuHiveSimpleUDF(
              a.name,
              a.funcWrapper,
              childExprs.map(_.convertToGpu()),
              a.dataType,
              a.deterministic)
            opRapidsFunc.map { _ =>
              // We use the original HiveSimpleUDF `deterministic` method as a proxy
              // for simplicity.
              GpuHiveSimpleUDF(
                a.name,
                a.funcWrapper,
                childExprs.map(_.convertToGpu()),
                a.dataType,
                a.deterministic)
            }.getOrElse {
              // This `require` is just a double check.
              require(conf.isCpuBasedUDFEnabled)
              GpuRowBasedHiveSimpleUDF(
                a.name,
                a.funcWrapper,
                childExprs.map(_.convertToGpu()))
            }
          }
        }),
      GpuOverrides.expr[HiveGenericUDF](
        "Hive Generic UDF, support requires the UDF to implement a " +
          "RAPIDS accelerated interface",
        "Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to" +
          " get better performance",
        ExprChecks.projectOnly(
          udfTypeSig,
          TypeSig.all,
          repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))),
        (a, conf, p, r) => new ExprMeta[HiveGenericUDF](a, conf, p, r) {
          private val opRapidsFunc = a.function match {
            case rapidsUDF: RapidsUDF => Some(rapidsUDF)
            case _ => None
          }

          override def tagExprForGpu(): Unit = {
            a.function match {
              case _: RapidsUDF =>
              case _ =>
                willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " +
                  s"${a.funcWrapper.functionClassName} does not provide a GPU implementation")
            if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) {
              willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " +
                s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " +
                s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`")
            }
          }

          override def convertToGpu(): GpuExpression = {
            // To avoid adding a Hive dependency just to check if the UDF function is deterministic,
            // we use the original HiveGenericUDF `deterministic` method as a proxy.
            GpuHiveGenericUDF(
              a.name,
              a.funcWrapper,
              childExprs.map(_.convertToGpu()),
              a.dataType,
              a.deterministic,
              a.foldable)
            opRapidsFunc.map { _ =>
              // We use the original HiveGenericUDF `deterministic` method as a proxy
              // for simplicity.
              GpuHiveGenericUDF(
                a.name,
                a.funcWrapper,
                childExprs.map(_.convertToGpu()),
                a.dataType,
                a.deterministic,
                a.foldable)
            }.getOrElse {
              // This `require` is just a double check.
              require(conf.isCpuBasedUDFEnabled)
              GpuRowBasedHiveGenericUDF(
                a.name,
                a.funcWrapper,
                childExprs.map(_.convertToGpu()))
            }
          }
        })
    ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap
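To summarize the rule metadata above: after this change each Hive UDF expression has three possible outcomes. A condensed, hypothetical restatement (not code from the PR):

```scala
// How the plugin decides what to do with a Hive UDF after this change.
def chooseHiveUdfPlan(implementsRapidsUDF: Boolean, cpuBasedUdfEnabled: Boolean): String =
  if (implementsRapidsUDF) {
    "GpuHiveSimpleUDF / GpuHiveGenericUDF: columnar, GPU-accelerated"
  } else if (cpuBasedUdfEnabled) {
    "GpuRowBasedHive*UDF: the CPU UDF runs row-by-row inside the GPU plan"
  } else {
    "willNotWorkOnGpu: the whole expression falls back to the CPU plan"
  }
```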
@@ -0,0 +1,155 @@
/*
 * Copyright (c) 2021, NVIDIA CORPORATION.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.rapids

import scala.collection.JavaConverters._

import com.nvidia.spark.rapids.GpuRowBasedUserDefinedFunction
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF}
import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType}
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper
import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, SpecializedGetters}
import org.apache.spark.sql.hive.{DeferredObjectAdapter, HiveInspectors}
import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
import org.apache.spark.sql.types.DataType

/** Common implementation across row-based Hive UDFs */
trait GpuRowBasedHiveUDFBase extends GpuRowBasedUserDefinedFunction with HiveInspectors {
  val funcWrapper: HiveFunctionWrapper

  @transient
  val function: AnyRef

  override val udfDeterministic: Boolean = {
    val udfType = function.getClass.getAnnotation(classOf[HiveUDFType])
    udfType != null && udfType.deterministic() && !udfType.stateful()
  }

  override final val checkNull: Boolean = false

  override def nullable: Boolean = true

  override def toString: String = {
    s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})"
  }

  override def prettyName: String = name

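  // One accessor per child expression, used to pull each argument value
  // out of the row of evaluated children passed to evaluateRow().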
  @transient
  protected lazy val childRowAccessors: Array[SpecializedGetters => Any] =
    children.zipWithIndex.map { case (child, i) =>
      val accessor = InternalRow.getAccessor(child.dataType, child.nullable)
      row: SpecializedGetters => accessor(row, i)
    }.toArray

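  // Hive ObjectInspectors for each child, derived from the Spark data types
  // via the HiveInspectors mixin.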
  @transient
  protected lazy val argumentInspectors = children.map(toInspector)
}

/** Row-based version of Spark's `HiveSimpleUDF` running in a GPU operation */
case class GpuRowBasedHiveSimpleUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]) extends GpuRowBasedHiveUDFBase {

  @scala.annotation.nowarn("msg=class UDF in package exec is deprecated")
  @transient
  override lazy val function: UDF = funcWrapper.createFunction[UDF]()

  @transient
  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray

  @transient
  private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length)

  @transient
  private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray

  @transient
  private lazy val method =
    function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava)

  // Create parameter converters
  @transient
  private lazy val conversionHelper = new ConversionHelper(method, argumentInspectors.toArray)

  @transient
  private lazy val unwrapper = unwrapperFor(
    ObjectInspectorFactory.getReflectionObjectInspector(
      method.getGenericReturnType, ObjectInspectorOptions.JAVA))

  override protected def evaluateRow(childrenRow: InternalRow): Any = {
    val inputs = wrap(childRowAccessors.map(_(childrenRow)), wrappers, cached, inputDataTypes)
    val ret = FunctionRegistry.invoke(
      method,
      function,
      conversionHelper.convertIfNecessary(inputs : _*): _*)
    unwrapper(ret)
  }

  override lazy val dataType: DataType = javaTypeToDataType(method.getGenericReturnType)

  override def foldable: Boolean = udfDeterministic && children.forall(_.foldable)

  override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})"
}

/** Row-based version of Spark's `HiveGenericUDF` running in a GPU operation */
case class GpuRowBasedHiveGenericUDF(
    name: String,
    funcWrapper: HiveFunctionWrapper,
    children: Seq[Expression]) extends GpuRowBasedHiveUDFBase {

  @transient
  override lazy val function: GenericUDF = funcWrapper.createFunction[GenericUDF]()

  @transient
  private lazy val returnInspector =
    function.initializeAndFoldConstants(argumentInspectors.toArray)

  @transient
  private lazy val deferredObjects = argumentInspectors.zip(children).map {
    case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType)
  }.toArray

  @transient
  private lazy val unwrapper = unwrapperFor(returnInspector)

  override protected def evaluateRow(childrenRow: InternalRow): Any = {
    returnInspector // Make sure initialized.

    var i = 0
    val length = children.length
    while (i < length) {
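      // Capture the loop index in a local so the closure below reads a
      // stable value rather than the mutated loop variable.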
      val idx = i
      deferredObjects(i).set(() => childRowAccessors(idx)(childrenRow))
      i += 1
    }
    unwrapper(function.evaluate(deferredObjects.asInstanceOf[Array[DeferredObject]]))
  }

  override lazy val dataType: DataType = inspectorToDataType(returnInspector)

  override def foldable: Boolean =
    udfDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector]
}