diff --git a/integration_tests/src/test/scala/com/nvidia/spark/rapids/UnaryOperatorsSuite.scala b/integration_tests/src/test/scala/com/nvidia/spark/rapids/UnaryOperatorsSuite.scala
index af45fb3a447..a352fa91297 100644
--- a/integration_tests/src/test/scala/com/nvidia/spark/rapids/UnaryOperatorsSuite.scala
+++ b/integration_tests/src/test/scala/com/nvidia/spark/rapids/UnaryOperatorsSuite.scala
@@ -35,4 +35,8 @@ class UnaryOperatorsSuite extends SparkQueryCompareTestSuite {
     frame => frame.selectExpr("md5(strings)", "md5(cast(ints as string))",
       "md5(cast(longs as binary))")
   }
+
+  testSparkResultsAreEqual("Test murmur3", mixedDfWithNulls) {
+    frame => frame.selectExpr("hash(longs, doubles, 1, null, 'stock string', ints, strings)")
+  }
 }
diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
index 70258460a15..3840c0a5be0 100644
--- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
+++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala
@@ -2003,6 +2003,19 @@ object GpuOverrides {
       (a, conf, p, r) => new ComplexTypeMergingExprMeta[Concat](a, conf, p, r) {
         override def convertToGpu(child: Seq[Expression]): GpuExpression = GpuConcat(child)
       }),
+    expr[Murmur3Hash] (
+      "Murmur3 hash operator",
+      ExprChecks.projectNotLambda(TypeSig.INT, TypeSig.INT,
+        repeatingParamCheck = Some(RepeatingParamCheck("input",
+          TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG +
+            TypeSig.FLOAT + TypeSig.DOUBLE + TypeSig.STRING + TypeSig.NULL,
+          TypeSig.BOOLEAN + TypeSig.BYTE + TypeSig.SHORT + TypeSig.INT + TypeSig.LONG +
+            TypeSig.FLOAT + TypeSig.DOUBLE + TypeSig.STRING + TypeSig.NULL))),
+      (a, conf, p, r) => new ExprMeta[Murmur3Hash](a, conf, p, r) {
+        override val childExprs: Seq[BaseExprMeta[_]] = a.children
+          .map(GpuOverrides.wrapExpr(_, conf, Some(this)))
+        def convertToGpu(): GpuExpression = GpuMurmur3Hash(childExprs.map(_.convertToGpu()))
+      }),
     expr[Contains](
       "Contains",
       ExprChecks.binaryProjectNotLambda(TypeSig.BOOLEAN, TypeSig.BOOLEAN,
diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala
index ec4b449975f..c99bc8681fe 100644
--- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala
+++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/HashFunctions.scala
@@ -16,11 +16,16 @@
 
 package org.apache.spark.sql.rapids
 
-import ai.rapids.cudf.{BinaryOp, ColumnVector, DType}
-import com.nvidia.spark.rapids.{GpuColumnVector, GpuUnaryExpression}
+import scala.collection.mutable.ArrayBuffer
+
+import ai.rapids.cudf.{BinaryOp, ColumnVector, ColumnView, Scalar}
+import com.nvidia.spark.rapids.{GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression}
+import com.nvidia.spark.rapids.RapidsPluginImplicits._
 
 import org.apache.spark.sql.catalyst.expressions.{Expression, ImplicitCastInputTypes, NullIntolerant}
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.unsafe.types.UTF8String
 
 case class GpuMd5(child: Expression)
   extends GpuUnaryExpression with ImplicitCastInputTypes with NullIntolerant {
@@ -34,3 +39,32 @@
     }
   }
 }
+
+case class GpuMurmur3Hash(child: Seq[Expression]) extends GpuExpression {
+  override def dataType: DataType = IntegerType
+
+  override def toString: String = s"hash($child)"
+  def nullable: Boolean = children.exists(_.nullable)
+  def children: Seq[Expression] = child
+
+  def columnarEval(batch: ColumnarBatch): Any = {
+    val rows = batch.numRows()
+    val columns: ArrayBuffer[ColumnVector] = new ArrayBuffer[ColumnVector]()
+    try {
+      children.foreach { child => child.columnarEval(batch) match {
+        case vector: GpuColumnVector =>
+          columns += vector.getBase
+        case col => if (col != null) {
+          withResource(GpuScalar.from(col)) { scalarValue =>
+            columns += ai.rapids.cudf.ColumnVector.fromScalar(scalarValue, rows)
+          }
+        }
+      }
+      }
+      GpuColumnVector.from(
+        ColumnVector.spark32BitMurmurHash3(42, columns.toArray[ColumnView]), dataType)
+    } finally {
+      columns.safeClose()
+    }
+  }
+}
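
For reviewers who want to exercise the new path end to end, here is a minimal, self-contained sketch (not part of the patch; the app name, session config, and sample data are illustrative, and it assumes the plugin jars are on the classpath). With the RAPIDS plugin enabled, the `hash()` call below is the expression that the `Murmur3Hash` override replaces with `GpuMurmur3Hash`.

```scala
import org.apache.spark.sql.SparkSession

object Murmur3HashDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("murmur3-gpu-demo") // hypothetical app name
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin") // enable the RAPIDS plugin
      .getOrCreate()
    import spark.implicits._

    // Small frame mixing nullable and non-nullable inputs, loosely mirroring
    // the mixedDfWithNulls test data.
    val df = Seq((1L, Some(1.5), "a"), (2L, None, "b"))
      .toDF("longs", "doubles", "strings")

    // Spark's hash() is Murmur3 with the default seed 42, the same constant
    // passed to ColumnVector.spark32BitMurmurHash3 in GpuMurmur3Hash above.
    // Literals like 1 and 'stock string' come back from columnarEval as
    // scalars and are broadcast to full-length columns via GpuScalar.from
    // and fromScalar before hashing.
    df.selectExpr("hash(longs, doubles, 1, null, 'stock string', strings)").show()

    spark.stop()
  }
}
```

Comparing this output against the same query with `spark.rapids.sql.enabled=false` is essentially what `testSparkResultsAreEqual` automates in the integration test.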