diff --git a/aggregator/pom.xml b/aggregator/pom.xml index 2ced15b918d..9a429628975 100644 --- a/aggregator/pom.xml +++ b/aggregator/pom.xml @@ -100,9 +100,25 @@ org.apache.hadoop.hive. ${rapids.shade.package}.hadoop.hive. + org.apache.hadoop.hive.conf.HiveConf + org.apache.hadoop.hive.ql.exec.FunctionRegistry org.apache.hadoop.hive.ql.exec.UDF + org.apache.hadoop.hive.ql.exec.UDFMethodResolver + org.apache.hadoop.hive.ql.udf.UDFType org.apache.hadoop.hive.ql.udf.generic.GenericUDF + org.apache.hadoop.hive.ql.udf.generic.GenericUDF$DeferredObject + org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils$ConversionHelper + org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector + org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory + org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory$ObjectInspectorOptions + org.apache.hadoop.hive.serde2.objectinspector.StructField + org.apache.hadoop.hive.serde2.typeinfo.TypeInfo diff --git a/docs/configs.md b/docs/configs.md index c099987fed6..1bcb56a77c0 100644 --- a/docs/configs.md +++ b/docs/configs.md @@ -332,8 +332,8 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.VarianceSamp|`var_samp`, `variance`|Aggregation computing sample variance|true|None| spark.rapids.sql.expression.NormalizeNaNAndZero| |Normalize NaN and zero|true|None| spark.rapids.sql.expression.ScalarSubquery| |Subquery that will return only one row and one column|true|None| -spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None| -spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface|true|None| +spark.rapids.sql.expression.HiveGenericUDF| |Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance|true|None| +spark.rapids.sql.expression.HiveSimpleUDF| |Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance|true|None| ### Execution diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 360e1e3dda7..080ef00e469 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -16149,7 +16149,7 @@ are limited. HiveGenericUDF -Hive Generic UDF, support requires the UDF to implement a RAPIDS accelerated interface +Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance None project param @@ -16196,7 +16196,7 @@ are limited. HiveSimpleUDF -Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface +Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to get better performance None project param diff --git a/integration_tests/src/main/python/row-based_udf_test.py b/integration_tests/src/main/python/row-based_udf_test.py new file mode 100644 index 00000000000..45ae6704746 --- /dev/null +++ b/integration_tests/src/main/python/row-based_udf_test.py @@ -0,0 +1,43 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from asserts import assert_gpu_and_cpu_are_equal_sql +from data_gen import * +from spark_session import with_spark_session +from rapids_udf_test import skip_if_no_hive, load_hive_udf_or_skip_test + +def test_hive_empty_simple_udf(): + with_spark_session(skip_if_no_hive) + data_gens = [["i", int_gen], ["s", string_gen]] + def evalfn(spark): + load_hive_udf_or_skip_test(spark, "emptysimple", "com.nvidia.spark.rapids.udf.hive.EmptyHiveSimpleUDF") + return gen_df(spark, data_gens) + assert_gpu_and_cpu_are_equal_sql( + evalfn, + "hive_simple_udf_test_table", + "SELECT i, emptysimple(s) FROM hive_simple_udf_test_table", + conf={'spark.rapids.sql.rowBasedUDF.enabled': 'true'}) + +def test_hive_empty_generic_udf(): + with_spark_session(skip_if_no_hive) + def evalfn(spark): + load_hive_udf_or_skip_test(spark, "emptygeneric", "com.nvidia.spark.rapids.udf.hive.EmptyHiveGenericUDF") + return gen_df(spark, [["s", string_gen]]) + assert_gpu_and_cpu_are_equal_sql( + evalfn, + "hive_generic_udf_test_table", + "SELECT emptygeneric(s) FROM hive_generic_udf_test_table", + conf={'spark.rapids.sql.rowBasedUDF.enabled': 'true'}) diff --git a/sql-plugin/pom.xml b/sql-plugin/pom.xml index 9111fb89927..67f88916989 100644 --- a/sql-plugin/pom.xml +++ b/sql-plugin/pom.xml @@ -154,6 +154,12 @@ ${spark.version} provided + + org.apache.hive + hive-serde + ${spark.version} + provided + org.apache.spark spark-hive_${scala.binary.version} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala index 6384461e0d1..770fcb028fe 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/GpuHiveOverrides.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.rapids import com.nvidia.spark.RapidsUDF -import com.nvidia.spark.rapids.{ExprChecks, ExprMeta, ExprRule, GpuExpression, GpuOverrides, RepeatingParamCheck, ShimLoader, TypeSig} +import com.nvidia.spark.rapids.{ExprChecks, ExprMeta, ExprRule, GpuExpression, GpuOverrides, RapidsConf, RepeatingParamCheck, ShimLoader, TypeSig} import com.nvidia.spark.rapids.GpuUserDefinedFunction.udfTypeSig import org.apache.spark.sql.catalyst.expressions.Expression @@ -45,59 +45,86 @@ object GpuHiveOverrides { Seq( GpuOverrides.expr[HiveSimpleUDF]( - "Hive UDF, support requires the UDF to implement a RAPIDS accelerated interface", + "Hive UDF, the UDF can choose to implement a RAPIDS accelerated interface to" + + " get better performance", ExprChecks.projectOnly( udfTypeSig, TypeSig.all, repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))), (a, conf, p, r) => new ExprMeta[HiveSimpleUDF](a, conf, p, r) { + private val opRapidsFunc = a.function match { + case rapidsUDF: RapidsUDF => Some(rapidsUDF) + case _ => None + } + override def tagExprForGpu(): Unit = { - a.function match { - case _: RapidsUDF => - case _ => - willNotWorkOnGpu(s"Hive UDF ${a.name} implemented by " + - s"${a.funcWrapper.functionClassName} does not provide a GPU implementation") + if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) { + willNotWorkOnGpu(s"Hive SimpleUDF ${a.name} implemented by " + + s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " + + s"and CPU-based UDFs are not enabled by 
`${RapidsConf.ENABLE_CPU_BASED_UDF.key}`") } } override def convertToGpu(): GpuExpression = { - // To avoid adding a Hive dependency just to check if the UDF function is deterministic, - // we use the original HiveSimpleUDF `deterministic` method as a proxy. - GpuHiveSimpleUDF( - a.name, - a.funcWrapper, - childExprs.map(_.convertToGpu()), - a.dataType, - a.deterministic) + opRapidsFunc.map { _ => + // We use the original HiveGenericUDF `deterministic` method as a proxy + // for simplicity. + GpuHiveSimpleUDF( + a.name, + a.funcWrapper, + childExprs.map(_.convertToGpu()), + a.dataType, + a.deterministic) + }.getOrElse { + // This `require` is just for double check. + require(conf.isCpuBasedUDFEnabled) + GpuRowBasedHiveSimpleUDF( + a.name, + a.funcWrapper, + childExprs.map(_.convertToGpu())) + } } }), GpuOverrides.expr[HiveGenericUDF]( - "Hive Generic UDF, support requires the UDF to implement a " + - "RAPIDS accelerated interface", + "Hive Generic UDF, the UDF can choose to implement a RAPIDS accelerated interface to" + + " get better performance", ExprChecks.projectOnly( udfTypeSig, TypeSig.all, repeatingParamCheck = Some(RepeatingParamCheck("param", udfTypeSig, TypeSig.all))), (a, conf, p, r) => new ExprMeta[HiveGenericUDF](a, conf, p, r) { + private val opRapidsFunc = a.function match { + case rapidsUDF: RapidsUDF => Some(rapidsUDF) + case _ => None + } + override def tagExprForGpu(): Unit = { - a.function match { - case _: RapidsUDF => - case _ => - willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " + - s"${a.funcWrapper.functionClassName} does not provide a GPU implementation") + if (opRapidsFunc.isEmpty && !conf.isCpuBasedUDFEnabled) { + willNotWorkOnGpu(s"Hive GenericUDF ${a.name} implemented by " + + s"${a.funcWrapper.functionClassName} does not provide a GPU implementation " + + s"and CPU-based UDFs are not enabled by `${RapidsConf.ENABLE_CPU_BASED_UDF.key}`") } } override def convertToGpu(): GpuExpression = { - // To avoid adding a Hive dependency just to check if the UDF function is deterministic, - // we use the original HiveGenericUDF `deterministic` method as a proxy. - GpuHiveGenericUDF( - a.name, - a.funcWrapper, - childExprs.map(_.convertToGpu()), - a.dataType, - a.deterministic, - a.foldable) + opRapidsFunc.map { _ => + // We use the original HiveGenericUDF `deterministic` method as a proxy + // for simplicity. + GpuHiveGenericUDF( + a.name, + a.funcWrapper, + childExprs.map(_.convertToGpu()), + a.dataType, + a.deterministic, + a.foldable) + }.getOrElse { + // This `require` is just for double check. + require(conf.isCpuBasedUDFEnabled) + GpuRowBasedHiveGenericUDF( + a.name, + a.funcWrapper, + childExprs.map(_.convertToGpu())) + } } }) ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/rowBasedHiveUDFs.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/rowBasedHiveUDFs.scala new file mode 100644 index 00000000000..cc0ddfcbd86 --- /dev/null +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/hive/rapids/rowBasedHiveUDFs.scala @@ -0,0 +1,155 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.rapids + +import scala.collection.JavaConverters._ + +import com.nvidia.spark.rapids.GpuRowBasedUserDefinedFunction +import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, UDF} +import org.apache.hadoop.hive.ql.udf.{UDFType => HiveUDFType} +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF.DeferredObject +import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils.ConversionHelper +import org.apache.hadoop.hive.serde2.objectinspector.{ConstantObjectInspector, ObjectInspectorFactory} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory.ObjectInspectorOptions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Expression, SpecializedGetters} +import org.apache.spark.sql.hive.{DeferredObjectAdapter, HiveInspectors} +import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper +import org.apache.spark.sql.types.DataType + +/** Common implementation across row-based Hive UDFs */ +trait GpuRowBasedHiveUDFBase extends GpuRowBasedUserDefinedFunction with HiveInspectors { + val funcWrapper: HiveFunctionWrapper + + @transient + val function: AnyRef + + override val udfDeterministic: Boolean = { + val udfType = function.getClass.getAnnotation(classOf[HiveUDFType]) + udfType != null && udfType.deterministic() && !udfType.stateful() + } + + override final val checkNull: Boolean = false + + override def nullable: Boolean = true + + override def toString: String = { + s"$nodeName#${funcWrapper.functionClassName}(${children.mkString(",")})" + } + + override def prettyName: String = name + + @transient + protected lazy val childRowAccessors: Array[SpecializedGetters => Any] = + children.zipWithIndex.map { case (child, i) => + val accessor = InternalRow.getAccessor(child.dataType, child.nullable) + row: SpecializedGetters => accessor(row, i) + }.toArray + + @transient + protected lazy val argumentInspectors = children.map(toInspector) +} + +/** Row-based version of Spark's `HiveSimpleUDF` running in a GPU operation */ +case class GpuRowBasedHiveSimpleUDF( + name: String, + funcWrapper: HiveFunctionWrapper, + children: Seq[Expression]) extends GpuRowBasedHiveUDFBase { + + @scala.annotation.nowarn("msg=class UDF in package exec is deprecated") + @transient + override lazy val function: UDF = funcWrapper.createFunction[UDF]() + + @transient + private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), x.dataType)).toArray + + @transient + private lazy val cached: Array[AnyRef] = new Array[AnyRef](children.length) + + @transient + private lazy val inputDataTypes: Array[DataType] = children.map(_.dataType).toArray + + @transient + private lazy val method = + function.getResolver.getEvalMethod(children.map(_.dataType.toTypeInfo).asJava) + + // Create parameter converters + @transient + private lazy val conversionHelper = new ConversionHelper(method, argumentInspectors.toArray) + + @transient + private lazy val unwrapper = unwrapperFor( + ObjectInspectorFactory.getReflectionObjectInspector( + 
method.getGenericReturnType, ObjectInspectorOptions.JAVA)) + + override protected def evaluateRow(childrenRow: InternalRow): Any = { + val inputs = wrap(childRowAccessors.map(_(childrenRow)), wrappers, cached, inputDataTypes) + val ret = FunctionRegistry.invoke( + method, + function, + conversionHelper.convertIfNecessary(inputs : _*): _*) + unwrapper(ret) + } + + override lazy val dataType: DataType = javaTypeToDataType(method.getGenericReturnType) + + override def foldable: Boolean = udfDeterministic && children.forall(_.foldable) + + override def sql: String = s"$name(${children.map(_.sql).mkString(", ")})" +} + +/** Row-based version of Spark's `HiveGenericUDF` running in a GPU operation */ +case class GpuRowBasedHiveGenericUDF( + name: String, + funcWrapper: HiveFunctionWrapper, + children: Seq[Expression]) extends GpuRowBasedHiveUDFBase { + + @transient + override lazy val function: GenericUDF = funcWrapper.createFunction[GenericUDF]() + + @transient + private lazy val returnInspector = + function.initializeAndFoldConstants(argumentInspectors.toArray) + + @transient + private lazy val deferredObjects = argumentInspectors.zip(children).map { + case (inspect, child) => new DeferredObjectAdapter(inspect, child.dataType) + }.toArray + + @transient + private lazy val unwrapper = unwrapperFor(returnInspector) + + override protected def evaluateRow(childrenRow: InternalRow): Any = { + returnInspector // Make sure initialized. + + var i = 0 + val length = children.length + while (i < length) { + val idx = i + deferredObjects(i).set(() => childRowAccessors(idx)(childrenRow)) + i += 1 + } + unwrapper(function.evaluate(deferredObjects.asInstanceOf[Array[DeferredObject]])) + } + + override lazy val dataType: DataType = inspectorToDataType(returnInspector) + + override def foldable: Boolean = + udfDeterministic && returnInspector.isInstanceOf[ConstantObjectInspector] +} diff --git a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveGenericUDF.java b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveGenericUDF.java new file mode 100644 index 00000000000..b026936eb7e --- /dev/null +++ b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveGenericUDF.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.udf.hive; + +import org.apache.hadoop.hive.ql.exec.UDFArgumentException; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorConverter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.io.Text; + +/** An empty Hive GenericUDF returning the input directly for row-based UDF test only */ +public class EmptyHiveGenericUDF extends GenericUDF { + private transient PrimitiveObjectInspectorConverter.TextConverter converter; + private final Text textResult = new Text(); + + @Override + public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException { + if (arguments.length != 1) { + throw new UDFArgumentException("One argument is supported, but found: " + arguments.length); + } + if (!(arguments[0] instanceof PrimitiveObjectInspector)) { + throw new UDFArgumentException("Unsupported argument type: " + arguments[0].getTypeName()); + } + PrimitiveObjectInspector poi = (PrimitiveObjectInspector) arguments[0]; + converter = new PrimitiveObjectInspectorConverter.TextConverter(poi); + return PrimitiveObjectInspectorFactory.writableStringObjectInspector; + } + + @Override + public Object evaluate(DeferredObject[] deferredObjects) throws HiveException { + Text text = converter.convert(deferredObjects[0].get()); + textResult.set(text == null ? "" : text.toString()); + return textResult; + } + + @Override + public String getDisplayString(String[] strings) { + return getStandardDisplayString("empty", strings); + } +} diff --git a/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveSimpleUDF.java b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveSimpleUDF.java new file mode 100644 index 00000000000..43c494e2ecf --- /dev/null +++ b/udf-examples/src/main/java/com/nvidia/spark/rapids/udf/hive/EmptyHiveSimpleUDF.java @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2021, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.udf.hive; + +import org.apache.hadoop.hive.ql.exec.UDF; + +/** An empty Hive simple UDF returning the input directly for row-based UDF test only. */ +public class EmptyHiveSimpleUDF extends UDF { + public String evaluate(String in) { + return in; + } +}