From 705ca38e328ee7fbd52aa046b8a67da66353c186 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 10 Aug 2023 09:01:53 -0500 Subject: [PATCH 1/7] GPU support for DynamicPruningExpression Signed-off-by: Jason Lowe --- .../rapids/GpuDynamicPruningExpression.scala | 29 +++++++++++++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 8 +++++ 2 files changed, 37 insertions(+) create mode 100644 sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala new file mode 100644 index 00000000000..34ee4d6e5f2 --- /dev/null +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids + +import ai.rapids.cudf.ColumnVector + +import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, Expression} + +case class GpuDynamicPruningExpression(child: Expression) + extends GpuUnaryExpression with DynamicPruning { + + override protected def doColumnar(input: GpuColumnVector): ColumnVector = { + input.getBase.incRefCount() + } +} diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 86ddade8298..02b381cf00a 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3531,6 +3531,14 @@ object GpuOverrides extends Logging { (a, conf, p, r) => new UnaryExprMeta[RaiseError](a, conf, p, r) { override def convertToGpu(child: Expression): GpuExpression = GpuRaiseError(child) }), + expr[DynamicPruningExpression]( + "Dynamic pruning expression marker", + ExprChecks.unaryProject(TypeSig.all, TypeSig.all, TypeSig.BOOLEAN, TypeSig.BOOLEAN), + (a, conf, p, r) => new UnaryExprMeta[DynamicPruningExpression](a, conf, p, r) { + override def convertToGpu(child: Expression): GpuExpression = { + GpuDynamicPruningExpression(child) + } + }), SparkShimImpl.ansiCastRule ).collect { case r if r != null => (r.getClassFor.asSubclass(classOf[Expression]), r)}.toMap From 5719c32d5628736b5bb22025a59278dbe6780135 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Mon, 14 Aug 2023 09:05:47 -0500 Subject: [PATCH 2/7] Add support for InSubqueryExec Signed-off-by: Jason Lowe --- .../advanced_configs.md | 1 + docs/supported_ops.md | 463 ++++++++++-------- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../spark/rapids/shims/InSubqueryShims.scala | 38 ++ .../spark/rapids/GpuInSubqueryExec.scala | 113 +++++ .../spark/rapids/shims/InSubqueryShims.scala | 46 ++ .../sql/rapids/GpuInSubqueryExecSuite.scala | 72 +++ tools/generated_files/operatorsScore.csv | 1 + tools/generated_files/supportedExprs.csv | 2 + 9 files changed, 529 insertions(+), 209 deletions(-) create mode 100644 sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala create mode 100644 sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala create mode 100644 tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala diff --git a/docs/additional-functionality/advanced_configs.md b/docs/additional-functionality/advanced_configs.md index d2593788b67..e533cdac6fc 100644 --- a/docs/additional-functionality/advanced_configs.md +++ b/docs/additional-functionality/advanced_configs.md @@ -225,6 +225,7 @@ Name | SQL Function(s) | Description | Default Value | Notes spark.rapids.sql.expression.DayOfYear|`dayofyear`|Returns the day of the year from a date or timestamp|true|None| spark.rapids.sql.expression.DenseRank|`dense_rank`|Window function that returns the dense rank value within the aggregation window|true|None| spark.rapids.sql.expression.Divide|`/`|Division|true|None| +spark.rapids.sql.expression.DynamicPruningExpression| |Dynamic pruning expression marker|true|None| spark.rapids.sql.expression.ElementAt|`element_at`|Returns element of array at given(1-based) index in value if column is array. Returns value for the given key in value if column is map.|true|None| spark.rapids.sql.expression.EndsWith| |Ends with|true|None| spark.rapids.sql.expression.EqualNullSafe|`<=>`|Check if the values are equal including nulls <=>|true|None| diff --git a/docs/supported_ops.md b/docs/supported_ops.md index 1ebf37e95a1..7169e2ae361 100644 --- a/docs/supported_ops.md +++ b/docs/supported_ops.md @@ -5616,6 +5616,79 @@ are limited. +DynamicPruningExpression + +Dynamic pruning expression marker +None +project +input +S + + + + + + + + + + + + + + + + + + + +result +S +S +S +S +S +S +S +S +PS
UTC is only supported TZ for TIMESTAMP
+S +S +S +S +S +PS
UTC is only supported TZ for child TIMESTAMP
+PS
UTC is only supported TZ for child TIMESTAMP
+PS
UTC is only supported TZ for child TIMESTAMP
+S + + +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + ElementAt `element_at` Returns element of array at given(1-based) index in value if column is array. Returns value for the given key in value if column is map. @@ -5684,32 +5757,6 @@ are limited. NS -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - EndsWith Ends with @@ -5978,6 +6025,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Exp `exp` Euler's number e raised to a power @@ -6068,32 +6141,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Explode `explode`, `explode_outer` Given an input array produces a sequence of rows for each value in the array @@ -6393,6 +6440,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + FromUnixTime `from_unixtime` Get the string from a unix timestamp @@ -6461,32 +6534,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - GetArrayItem Gets the field at `ordinal` in the Array @@ -6785,6 +6832,32 @@ are limited. NS +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + GetTimestamp Gets timestamps from strings using given pattern. @@ -6853,32 +6926,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - GreaterThan `>` > operator @@ -7143,6 +7190,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Greatest `greatest` Returns the greatest value of all parameters, skipping null values @@ -7237,32 +7310,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Hypot `hypot` Pythagorean addition (Hypotenuse) of real numbers @@ -7535,6 +7582,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + InitCap `initcap` Returns str with the first letter of each word in uppercase. All other letters are in lowercase @@ -7608,32 +7681,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - InputFileBlockStart `input_file_block_start` Returns the start offset of the block being read, or -1 if not available @@ -7942,6 +7989,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + JsonTuple `json_tuple` Returns a tuple like the function get_json_object, but it takes multiple names. All the input parameters and output column types are string. @@ -8010,32 +8083,6 @@ are limited. -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - KnownFloatingPointNormalized Tag to prevent redundant normalization @@ -8334,6 +8381,32 @@ are limited. +Expression +SQL Functions(s) +Description +Notes +Context +Param/Output +BOOLEAN +BYTE +SHORT +INT +LONG +FLOAT +DOUBLE +DATE +TIMESTAMP +STRING +DECIMAL +NULL +BINARY +CALENDAR +ARRAY +MAP +STRUCT +UDT + + Lead `lead` Window function that returns N entries ahead of this one @@ -8423,32 +8496,6 @@ are limited. NS -Expression -SQL Functions(s) -Description -Notes -Context -Param/Output -BOOLEAN -BYTE -SHORT -INT -LONG -FLOAT -DOUBLE -DATE -TIMESTAMP -STRING -DECIMAL -NULL -BINARY -CALENDAR -ARRAY -MAP -STRUCT -UDT - - Least `least` Returns the least value of all parameters, skipping null values diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 02b381cf00a..568b39625dc 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -3546,7 +3546,7 @@ object GpuOverrides extends Logging { val expressions: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = commonExpressions ++ TimeStamp.getExprs ++ GpuHiveOverrides.exprs ++ ZOrderRules.exprs ++ DecimalArithmeticOverrides.exprs ++ - BloomFilterShims.exprs ++ SparkShimImpl.getExprs + BloomFilterShims.exprs ++ InSubqueryShims.exprs ++ SparkShimImpl.getExprs def wrapScan[INPUT <: Scan]( scan: INPUT, diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala new file mode 100644 index 00000000000..5cbdce8afde --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "321db" } +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids.ExprRule + +import org.apache.spark.sql.catalyst.expressions.Expression + + +object InSubqueryShims { + val exprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = Map.empty +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala new file mode 100644 index 00000000000..e9d668e947b --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala @@ -0,0 +1,113 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids + +import com.nvidia.spark.rapids.shims.ShimExpression + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.sql.catalyst.expressions.{Expression, ExprId, Predicate} +import org.apache.spark.sql.execution.{BaseSubqueryExec, ExecSubqueryExpression, InSubqueryExec} +import org.apache.spark.sql.vectorized.ColumnarBatch + +/** + * GPU version of InSubqueryExec. + * Unlike Spark CPU version, result parameter is *not* transient. It only works on the CPU because + * the CPU generates code on the driver containing the result values, and that is what the executor + * uses to evaluate the expression. The GPU does not use code generation, so we must have the + * result value transferred to the executor in order to build the GpuInSet to perform the eval. + */ +case class GpuInSubqueryExec( + child: Expression, + plan: BaseSubqueryExec, + exprId: ExprId, + shouldBroadcast: Boolean, + private var resultBroadcast: Broadcast[Array[Any]], + private var result: Array[Any]) + extends ExecSubqueryExpression with Predicate with ShimExpression with GpuExpression { + + override def children: Seq[Expression] = Seq(child) + + @transient private lazy val inSet = GpuInSet(child, result) + + override def nullable: Boolean = child.nullable + override def toString: String = s"child IN ${plan.name}" + override def withNewPlan(plan: BaseSubqueryExec): GpuInSubqueryExec = copy(plan = plan) + + def updateResult(): Unit = { + val rows = plan.executeCollect() + result = if (plan.output.length > 1) { + rows.asInstanceOf[Array[Any]] + } else { + rows.map(_.get(0, child.dataType)) + } + if (shouldBroadcast) { + resultBroadcast = plan.session.sparkContext.broadcast(result) + // Set the result to null, since we should only be serializing data for either + // result or resultBroadcast to the executor, not both. + result = null + } + } + + private def prepareResult(): Unit = { + require(result != null || resultBroadcast != null, s"$this has not finished") + if (result == null && resultBroadcast != null) { + result = resultBroadcast.value + } + } + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + prepareResult() + inSet.columnarEval(batch) + } + + override lazy val canonicalized: GpuInSubqueryExec = { + copy( + child = child.canonicalized, + plan = plan.canonicalized.asInstanceOf[BaseSubqueryExec], + exprId = ExprId(0), + resultBroadcast = null, + result = null) + } +} + +class InSubqueryExecMeta( + expr: InSubqueryExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends ExprMeta(expr, conf, parent, rule) { + + override def convertToGpu(): GpuExpression = { + expr match { + case InSubqueryExec(_, plan, exprId, shouldBroadcast, resultBroadcast, result) => + val gpuChild = childExprs.head.convertToGpu() + GpuInSubqueryExec(gpuChild, plan, exprId, shouldBroadcast, resultBroadcast, result) + case e => throw new IllegalStateException(s"Unexpected CPU expression $e") + } + } +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala new file mode 100644 index 00000000000..247f51456e5 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala @@ -0,0 +1,46 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package com.nvidia.spark.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.execution.InSubqueryExec + + +object InSubqueryShims { + val exprs: Map[Class[_ <: Expression], ExprRule[_ <: Expression]] = { + Seq( + GpuOverrides.expr[InSubqueryExec]( + "Evaluates to true if values are in a subquery's result set", + ExprChecks.unaryProject(TypeSig.BOOLEAN, TypeSig.BOOLEAN, + TypeSig.commonCudfTypes + TypeSig.NULL + TypeSig.DECIMAL_128, TypeSig.comparable), + (a, conf, p, r) => new InSubqueryExecMeta(a, conf, p, r)) + ).map(r => (r.getClassFor.asSubclass(classOf[Expression]), r)).toMap + } +} diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala new file mode 100644 index 00000000000..f5669298690 --- /dev/null +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "340"} +{"spark": "341"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids + +import com.nvidia.spark.rapids.{GpuOverrides, GpuTransitionOverrides, SparkQueryCompareTestSuite} + +import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, ExprId} +import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LocalTableScanExec, ProjectExec, RowToColumnarExec, SparkPlan, SubqueryExec} + +class GpuInSubqueryExecSuite extends SparkQueryCompareTestSuite { + private def readToPhysicalPlan(df: DataFrame): SparkPlan = { + df.queryExecution.executedPlan.transformUp { + case s: LocalTableScanExec => RowToColumnarExec(s) + } + } + + private def subqueryTable(spark: SparkSession): DataFrame = { + import spark.sqlContext.implicits._ + Seq("400.0", "123.4").toDF("strings") + } + + private def buildCpuInSubqueryPlan(spark: SparkSession): SparkPlan = { + val df1ReadExec = readToPhysicalPlan(nullableStringsIntsDf(spark)) + val df2ReadExec = readToPhysicalPlan(subqueryTable(spark)) + val inSubquery = InSubqueryExec( + df1ReadExec.output.head, + SubqueryExec("sbe", + ProjectExec(Seq(df2ReadExec.output.head), df2ReadExec)), + ExprId(7)) + FilterExec(DynamicPruningExpression(inSubquery), df1ReadExec) + } + + test("InSubqueryExec") { + val gpuResults = withGpuSparkSession({ spark => + val overrides = new GpuOverrides() + val transitionOverrides = new GpuTransitionOverrides() + val gpuPlan = transitionOverrides(overrides(buildCpuInSubqueryPlan(spark))) + gpuPlan.execute().collect() + }) + assertResult(1)(gpuResults.length) + val row = gpuResults.head + assertResult(2)(row.numFields) + assertResult("400.0")(row.getString(0)) + assert(row.isNullAt(1)) + } +} diff --git a/tools/generated_files/operatorsScore.csv b/tools/generated_files/operatorsScore.csv index 235c33cca81..dbc46e26ed8 100644 --- a/tools/generated_files/operatorsScore.csv +++ b/tools/generated_files/operatorsScore.csv @@ -98,6 +98,7 @@ DayOfWeek,4 DayOfYear,4 DenseRank,4 Divide,4 +DynamicPruningExpression,4 ElementAt,4 EndsWith,4 EqualNullSafe,4 diff --git a/tools/generated_files/supportedExprs.csv b/tools/generated_files/supportedExprs.csv index 960ef3d4486..43801b67688 100644 --- a/tools/generated_files/supportedExprs.csv +++ b/tools/generated_files/supportedExprs.csv @@ -173,6 +173,8 @@ DenseRank,S,`dense_rank`,None,window,result,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA,NA,N Divide,S,`/`,None,project,lhs,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA Divide,S,`/`,None,project,rhs,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA Divide,S,`/`,None,project,result,NA,NA,NA,NA,NA,NA,S,NA,NA,NA,S,NA,NA,NA,NA,NA,NA,NA +DynamicPruningExpression,S, ,None,project,input,S,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA +DynamicPruningExpression,S, ,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,S,PS,PS,PS,S ElementAt,S,`element_at`,None,project,array/map,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,NA,PS,PS,NA,NA ElementAt,S,`element_at`,None,project,index/key,PS,PS,PS,S,PS,PS,PS,PS,PS,PS,PS,NS,NS,NS,NS,NS,NS,NS ElementAt,S,`element_at`,None,project,result,S,S,S,S,S,S,S,S,PS,S,S,S,S,NS,PS,PS,PS,NS From 0776b3ea8bae2cda6cb9252e4127b00f85ccea62 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 22 Aug 2023 11:54:30 -0500 Subject: [PATCH 3/7] Avoid replacing InSubqueryExec on Databricks platforms --- .../scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala | 2 ++ .../scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala | 2 -- .../org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala | 2 -- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala index 5cbdce8afde..bcf1a3ab34f 100644 --- a/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala +++ b/sql-plugin/src/main/spark311/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala @@ -25,6 +25,8 @@ {"spark": "322"} {"spark": "323"} {"spark": "324"} +{"spark": "330db"} +{"spark": "332db"} spark-rapids-shim-json-lines ***/ package com.nvidia.spark.rapids.shims diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala index 247f51456e5..33e2e4f1525 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/InSubqueryShims.scala @@ -17,10 +17,8 @@ /*** spark-rapids-shim-json-lines {"spark": "330"} {"spark": "330cdh"} -{"spark": "330db"} {"spark": "331"} {"spark": "332"} -{"spark": "332db"} {"spark": "333"} {"spark": "340"} {"spark": "341"} diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala index f5669298690..ca3f3e98bc0 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala @@ -17,10 +17,8 @@ /*** spark-rapids-shim-json-lines {"spark": "330"} {"spark": "330cdh"} -{"spark": "330db"} {"spark": "331"} {"spark": "332"} -{"spark": "332db"} {"spark": "333"} {"spark": "340"} {"spark": "341"} From d45ffd5d30b4cc100f6172287f75c3ddde7209eb Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 22 Aug 2023 13:55:50 -0500 Subject: [PATCH 4/7] Databricks build fix --- .../scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala index e9d668e947b..cf60249843a 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/GpuInSubqueryExec.scala @@ -17,10 +17,8 @@ /*** spark-rapids-shim-json-lines {"spark": "330"} {"spark": "330cdh"} -{"spark": "330db"} {"spark": "331"} {"spark": "332"} -{"spark": "332db"} {"spark": "333"} {"spark": "340"} {"spark": "341"} From 964967d80ee20f9e04b1065c5530100aaf37949f Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Tue, 22 Aug 2023 14:52:42 -0500 Subject: [PATCH 5/7] Add comments, update GpuDynamicPruningExpression to handle scalars --- .../spark/rapids/GpuDynamicPruningExpression.scala | 14 ++++++++++---- .../spark/sql/rapids/GpuInSubqueryExecSuite.scala | 13 +++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala index 34ee4d6e5f2..5ebe3db1cd9 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala @@ -16,14 +16,20 @@ package com.nvidia.spark.rapids -import ai.rapids.cudf.ColumnVector +import com.nvidia.spark.rapids.RapidsPluginImplicits._ +import com.nvidia.spark.rapids.shims.ShimUnaryExpression import org.apache.spark.sql.catalyst.expressions.{DynamicPruning, Expression} +import org.apache.spark.sql.vectorized.ColumnarBatch case class GpuDynamicPruningExpression(child: Expression) - extends GpuUnaryExpression with DynamicPruning { + extends ShimUnaryExpression with GpuExpression with DynamicPruning { - override protected def doColumnar(input: GpuColumnVector): ColumnVector = { - input.getBase.incRefCount() + override def columnarEvalAny(batch: ColumnarBatch): Unit = { + child.columnarEvalAny(batch) + } + + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = { + child.columnarEval(batch) } } diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala index ca3f3e98bc0..e1dc355c3fb 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala @@ -31,8 +31,21 @@ import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.catalyst.expressions.{DynamicPruningExpression, ExprId} import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LocalTableScanExec, ProjectExec, RowToColumnarExec, SparkPlan, SubqueryExec} +/** + * Testing for GpuInSubqeryExec. It is difficult to build a series of DataFrame + * operations, even at the logical plan level, that will reliably translate + * to an InSubqueryExec in the final physical CPU plan. Most Spark implementations + * turn this into some sort of join instead. Therefore these tests build low-level + * physical plans with InSubqueryExec and manually invoke the GPU optimization rules + * to make sure we're exercising GpuInSubqueryExec. + */ class GpuInSubqueryExecSuite extends SparkQueryCompareTestSuite { private def readToPhysicalPlan(df: DataFrame): SparkPlan = { + // Since we're building up the low-level plan manually, Spark won't + // automatically inject the columnar transitions for us. This adds + // a columnar transition to the local table scan which will + // remain on the CPU. When the GPU optimization rules later run, + // this will be turned into a GPU columnar transition. df.queryExecution.executedPlan.transformUp { case s: LocalTableScanExec => RowToColumnarExec(s) } From e8bcd42aed4ac0a3ad4bbf255cf6baf023e982ab Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 23 Aug 2023 09:07:16 -0500 Subject: [PATCH 6/7] Add test for broadcast mode --- .../sql/rapids/GpuInSubqueryExecSuite.scala | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala index e1dc355c3fb..aa6dbc5caa5 100644 --- a/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala +++ b/tests/src/test/spark330/scala/org/apache/spark/sql/rapids/GpuInSubqueryExecSuite.scala @@ -56,28 +56,34 @@ class GpuInSubqueryExecSuite extends SparkQueryCompareTestSuite { Seq("400.0", "123.4").toDF("strings") } - private def buildCpuInSubqueryPlan(spark: SparkSession): SparkPlan = { + private def buildCpuInSubqueryPlan( + spark: SparkSession, + shouldBroadcast: Boolean): SparkPlan = { val df1ReadExec = readToPhysicalPlan(nullableStringsIntsDf(spark)) val df2ReadExec = readToPhysicalPlan(subqueryTable(spark)) val inSubquery = InSubqueryExec( df1ReadExec.output.head, SubqueryExec("sbe", ProjectExec(Seq(df2ReadExec.output.head), df2ReadExec)), - ExprId(7)) + ExprId(7), + shouldBroadcast=shouldBroadcast) FilterExec(DynamicPruningExpression(inSubquery), df1ReadExec) } - test("InSubqueryExec") { - val gpuResults = withGpuSparkSession({ spark => - val overrides = new GpuOverrides() - val transitionOverrides = new GpuTransitionOverrides() - val gpuPlan = transitionOverrides(overrides(buildCpuInSubqueryPlan(spark))) - gpuPlan.execute().collect() - }) - assertResult(1)(gpuResults.length) - val row = gpuResults.head - assertResult(2)(row.numFields) - assertResult("400.0")(row.getString(0)) - assert(row.isNullAt(1)) + for (shouldBroadcast <- Seq(false, true)) { + test(s"InSubqueryExec shouldBroadcast=$shouldBroadcast") { + val gpuResults = withGpuSparkSession({ spark => + val overrides = new GpuOverrides() + val transitionOverrides = new GpuTransitionOverrides() + val cpuPlan = buildCpuInSubqueryPlan(spark, shouldBroadcast) + val gpuPlan = transitionOverrides(overrides(cpuPlan)) + gpuPlan.execute().collect() + }) + assertResult(1)(gpuResults.length) + val row = gpuResults.head + assertResult(2)(row.numFields) + assertResult("400.0")(row.getString(0)) + assert(row.isNullAt(1)) + } } } From 10e22aa104689a966d89594b5def5853aba6c2b5 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 23 Aug 2023 13:14:47 -0500 Subject: [PATCH 7/7] Fix typo in override --- .../com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala index 5ebe3db1cd9..12979e3ecb5 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuDynamicPruningExpression.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch case class GpuDynamicPruningExpression(child: Expression) extends ShimUnaryExpression with GpuExpression with DynamicPruning { - override def columnarEvalAny(batch: ColumnarBatch): Unit = { + override def columnarEvalAny(batch: ColumnarBatch): Any = { child.columnarEvalAny(batch) }