diff --git a/integration_tests/src/main/python/higher_order_functions_test.py b/integration_tests/src/main/python/higher_order_functions_test.py new file mode 100644 index 00000000000..23d61793b46 --- /dev/null +++ b/integration_tests/src/main/python/higher_order_functions_test.py @@ -0,0 +1,31 @@ +# Copyright (c) 2023, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from asserts import assert_gpu_and_cpu_are_equal_collect +from marks import ignore_order + + +@ignore_order(local=True) +def test_tiered_project_with_complex_transform(): + confs = {"spark.rapids.sql.tiered.project.enabled": "true"} + def do_project(spark): + df = spark.createDataFrame( + [ + (1, "a", [(0, "z"), (1, "y")]), + (2, "b", [(2, "x")]) + ], + "a int, b string, c array>").repartition(2) + return df.selectExpr( + "transform(c, (v, i) -> named_struct('x', c[i].x, 'y', c[i].y)) AS t") + assert_gpu_and_cpu_are_equal_collect(do_project, conf=confs) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala index 6b25a09e5a8..dfb4272f23f 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/higherOrderFunctions.scala @@ -75,6 +75,8 @@ case class GpuLambdaFunction( override def dataType: DataType = function.dataType override def nullable: Boolean = function.nullable + override def disableTieredProjectCombine: Boolean = true + override def columnarEval(batch: ColumnarBatch): GpuColumnVector = function.asInstanceOf[GpuExpression].columnarEval(batch) } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala index bc1fac5afad..257c82eadd1 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/catalyst/expressions/GpuEquivalentExpressions.scala @@ -27,7 +27,6 @@ import com.nvidia.spark.rapids.{GpuAlias, GpuCaseWhen, GpuCoalesce, GpuExpressio import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSeq, AttributeSet, CaseWhen, Coalesce, Expression, If, LeafExpression, PlanExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.expressions.objects.LambdaVariable /** * This class is used to compute equality of (sub)expression trees. Expressions can be added @@ -195,9 +194,6 @@ class GpuEquivalentExpressions { expr.isInstanceOf[GpuUnevaluable] || (expr.isInstanceOf[GpuExpression] && expr.asInstanceOf[GpuExpression].disableTieredProjectCombine) || - // `LambdaVariable` is usually used as a loop variable, which can't be evaluated ahead of the - // loop. So we can't evaluate sub-expressions containing `LambdaVariable` at the beginning. - expr.find(_.isInstanceOf[LambdaVariable]).isDefined || // `PlanExpression` wraps query plan. To compare query plans of `PlanExpression` on executor, // can cause error like NPE. (expr.find(_.isInstanceOf[PlanExpression[_]]).isDefined && TaskContext.get != null)