From 0dd862bab54f3be019bd5c232acf5b9b5817b860 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 31 Jan 2024 09:58:23 +0800 Subject: [PATCH 1/3] Support barrier mode for mapInPandas/mapInArrow Signed-off-by: Bobby Wang --- integration_tests/src/main/python/udf_test.py | 43 ++++++++++++++- .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../execution/python/GpuMapInBatchExec.scala | 13 ++++- .../execution/python/GpuMapInPandasExec.scala | 39 +++++++++++--- .../rapids/shims/GpuMapInPandasExecMeta.scala | 52 +++++++++++++++++++ .../shims/GpuPythonMapInArrowExec.scala | 19 ++++--- .../shims/GpuPythonMapInArrowExecMeta.scala | 45 ++++++++++++++++ .../shims/PythonMapInArrowExecShims.scala | 2 +- .../rapids/shims/GpuMapInPandasExecMeta.scala | 43 +++++++++++++++ .../shims/GpuPythonMapInArrowExecMeta.scala | 42 +++++++++++++++ 10 files changed, 280 insertions(+), 20 deletions(-) create mode 100644 sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala create mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala create mode 100644 sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala create mode 100644 sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index dbccee4a374..fd8000b7e2f 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -13,9 +13,11 @@ # limitations under the License. import pytest +from pyspark import BarrierTaskContext, TaskContext from conftest import is_at_least_precommit_run -from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341 +from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341, \ + with_gpu_session from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version @@ -425,3 +427,42 @@ def test_func(spark): lambda data: [pd.DataFrame([len(list(data))])], schema="ret:integer") assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf) + + +@pytest.mark.skipif(is_before_spark_350(), reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0') +def test_map_in_pandas_with_barrier_mode(): + def func(iterator): + tc = TaskContext.get() + assert tc is not None + assert not isinstance(tc, BarrierTaskContext) + for batch in iterator: + yield batch + with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInPandas(func, "id long", False).collect()) + + def func1(iterator): + tc = TaskContext.get() + assert tc is not None + assert isinstance(tc, BarrierTaskContext) + for batch in iterator: + yield batch + with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInPandas(func1, "id long", True).collect()) + + +@pytest.mark.skipif(is_before_spark_350(), reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0') +def test_map_in_arrow_with_barrier_mode(): + def func(iterator): + from pyspark import BarrierTaskContext, TaskContext + tc = TaskContext.get() + assert tc is not None + assert not isinstance(tc, BarrierTaskContext) + for batch in iterator: + yield batch + with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInArrow(func, "id long", False).collect()) + + def func1(iterator): + tc = TaskContext.get() + assert tc is not None + assert isinstance(tc, BarrierTaskContext) + for batch in iterator: + yield batch + with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInArrow(func1, "id long", True).collect()) \ No newline at end of file diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 050eae46a07..64ac9808c61 100644 --- a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -67,7 +67,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand import org.apache.spark.sql.rapids.execution._ import org.apache.spark.sql.rapids.execution.python._ import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta -import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd} +import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuMapInPandasExecMeta, GpuTimeAdd} import org.apache.spark.sql.rapids.zorder.ZOrderRules import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala index 0199c52250f..35d166dbc5f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala @@ -44,6 +44,8 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { protected val func: Expression protected val pythonEvalType: Int + protected val isBarrier: Boolean + private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func override def producedAttributes: AttributeSet = AttributeSet(output) @@ -65,7 +67,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { val localEvalType = pythonEvalType // Start process - child.executeColumnar().mapPartitionsInternal { inputIter => + val func = (inputIter: Iterator[ColumnarBatch]) => { val context = TaskContext.get() // Single function with one struct. @@ -109,7 +111,14 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { numOutputRows += cb.numRows cb } - } // end of mapPartitionsInternal + } // end of func + + if (isBarrier) { + child.executeColumnar().barrier().mapPartitions(func) + } else { + child.executeColumnar().mapPartitionsInternal(func) + } + } // end of internalDoExecuteColumnar } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala index aaca88c38a0..452d7e03fd8 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,32 @@ import com.nvidia.spark.rapids._ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.python.MapInPandasExec -class GpuMapInPandasExecMeta( +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "342"} +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +class GpuMapInPandasExecMetaBase( mapPandas: MapInPandasExec, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], @@ -34,9 +57,9 @@ class GpuMapInPandasExecMeta( override def noReplacementPossibleMessage(reasons: String): String = s"cannot run even partially on the GPU because $reasons" - private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( + protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( mapPandas.func.asInstanceOf[PythonUDF], conf, Some(this)) - private val resultAttrs: Seq[BaseExprMeta[Attribute]] = + protected val resultAttrs: Seq[BaseExprMeta[Attribute]] = mapPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf @@ -45,7 +68,8 @@ class GpuMapInPandasExecMeta( GpuMapInPandasExec( udf.convertToGpu(), resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - childPlans.head.convertIfNeeded() + childPlans.head.convertIfNeeded(), + isBarrier = false, ) } @@ -60,7 +84,8 @@ class GpuMapInPandasExecMeta( case class GpuMapInPandasExec( func: Expression, output: Seq[Attribute], - child: SparkPlan) extends GpuMapInBatchExec { + child: SparkPlan, + override val isBarrier: Boolean) extends GpuMapInBatchExec { override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala new file mode 100644 index 00000000000..a0fb7353581 --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.execution.python.MapInPandasExec +import org.apache.spark.sql.rapids.execution.python.GpuMapInPandasExecMetaBase + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "342"} +spark-rapids-shim-json-lines ***/ +class GpuMapInPandasExecMeta( + mapPandas: MapInPandasExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) { + +} diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala index 6d06a97db06..5eca8b18294 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,16 +31,17 @@ {"spark": "350"} {"spark": "351"} spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.execution.python +package org.apache.spark.sql.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.python.PythonMapInArrowExec +import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec -class GpuPythonMapInArrowExecMeta( +class GpuPythonMapInArrowExecMetaBase( mapArrow: PythonMapInArrowExec, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], @@ -51,9 +52,9 @@ class GpuPythonMapInArrowExecMeta( override def noReplacementPossibleMessage(reasons: String): String = s"cannot run even partially on the GPU because $reasons" - private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( + protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this)) - private val resultAttrs: Seq[BaseExprMeta[Attribute]] = + protected val resultAttrs: Seq[BaseExprMeta[Attribute]] = mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf @@ -62,7 +63,8 @@ class GpuPythonMapInArrowExecMeta( GpuPythonMapInArrowExec( udf.convertToGpu(), resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - childPlans.head.convertIfNeeded() + childPlans.head.convertIfNeeded(), + isBarrier = false, ) } @@ -77,7 +79,8 @@ class GpuPythonMapInArrowExecMeta( case class GpuPythonMapInArrowExec( func: Expression, output: Seq[Attribute], - child: SparkPlan) extends GpuMapInBatchExec { + child: SparkPlan, + override val isBarrier: Boolean) extends GpuMapInBatchExec { override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF } diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala new file mode 100644 index 00000000000..e9d711315a9 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "341db"} +{"spark": "342"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.execution.python._ + +class GpuPythonMapInArrowExecMeta( + mapArrow: PythonMapInArrowExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) { + +} diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala index 98ccc540613..98826aa324b 100644 --- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.PythonMapInArrowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta +import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta import org.apache.spark.sql.types.{BinaryType, StringType} object PythonMapInArrowExecShims { diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala new file mode 100644 index 00000000000..d8377f9c349 --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions.{Attribute} +import org.apache.spark.sql.execution.python.MapInPandasExec +import org.apache.spark.sql.rapids.execution.python.{GpuMapInPandasExec, GpuMapInPandasExecMetaBase} + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +class GpuMapInPandasExecMeta( + mapPandas: MapInPandasExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) { + + override def convertToGpu(): GpuExec = + GpuMapInPandasExec( + udf.convertToGpu(), + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded(), + isBarrier = mapPandas.isBarrier, + ) +} diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala new file mode 100644 index 00000000000..c27f4824c4a --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.python.PythonMapInArrowExec + +class GpuPythonMapInArrowExecMeta( + mapArrow: PythonMapInArrowExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) { + + override def convertToGpu(): GpuExec = + GpuPythonMapInArrowExec( + udf.convertToGpu(), + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded(), + isBarrier = mapArrow.isBarrier, + ) +} From 98b0efdd12522b064e86e1d03b372f049c6caf52 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Wed, 31 Jan 2024 15:35:52 +0800 Subject: [PATCH 2/3] fix --- integration_tests/src/main/python/udf_test.py | 43 ++++++++----------- .../execution/python/GpuMapInPandasExec.scala | 23 ---------- .../shims/PythonMapInArrowExecShims.scala | 2 +- 3 files changed, 19 insertions(+), 49 deletions(-) diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index fd8000b7e2f..16e7774faef 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -35,7 +35,7 @@ raise AssertionError("incorrect pyarrow version during required testing " + str(e)) pytestmark = pytest.mark.skip(reason=str(e)) -from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect +from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_fallback_collect, assert_equal from data_gen import * from marks import approximate_float, allow_non_gpu, ignore_order from pyspark.sql import Window @@ -429,40 +429,33 @@ def test_func(spark): assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf) -@pytest.mark.skipif(is_before_spark_350(), reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0') -def test_map_in_pandas_with_barrier_mode(): +@pytest.mark.skipif(is_before_spark_350(), + reason='mapInPandas/mapInArrow with barrier mode is introduced by Pyspark 3.5.0') +@pytest.mark.parametrize('is_map_in_pandas', [True, False], ids=idfn) +def test_map_in_pandas_in_arrow_with_barrier_mode(is_map_in_pandas): def func(iterator): tc = TaskContext.get() assert tc is not None assert not isinstance(tc, BarrierTaskContext) for batch in iterator: yield batch - with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInPandas(func, "id long", False).collect()) - def func1(iterator): + def func_barrier_mode(iterator): tc = TaskContext.get() assert tc is not None assert isinstance(tc, BarrierTaskContext) for batch in iterator: yield batch - with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInPandas(func1, "id long", True).collect()) - -@pytest.mark.skipif(is_before_spark_350(), reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0') -def test_map_in_arrow_with_barrier_mode(): - def func(iterator): - from pyspark import BarrierTaskContext, TaskContext - tc = TaskContext.get() - assert tc is not None - assert not isinstance(tc, BarrierTaskContext) - for batch in iterator: - yield batch - with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInArrow(func, "id long", False).collect()) - - def func1(iterator): - tc = TaskContext.get() - assert tc is not None - assert isinstance(tc, BarrierTaskContext) - for batch in iterator: - yield batch - with_gpu_session(lambda spark: spark.range(0, 1, 1, 1).mapInArrow(func1, "id long", True).collect()) \ No newline at end of file + def run(spark): + df = spark.range(0, 10, 1, 1) + if is_map_in_pandas: + ret1 = df.mapInPandas(func, "id long", False).collect() + ret2 = df.mapInPandas(func_barrier_mode, "id long", True).collect() + else: + ret1 = df.mapInArrow(func, "id long", False).collect() + ret2 = df.mapInArrow(func_barrier_mode, "id long", True).collect() + return ret1, ret2 + + non_barrier_result, barrier_result = with_gpu_session(lambda spark: run(spark)) + assert_equal(non_barrier_result, barrier_result) diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala index 452d7e03fd8..a678f1336f0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala @@ -23,29 +23,6 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonU import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.MapInPandasExec -/*** spark-rapids-shim-json-lines -{"spark": "311"} -{"spark": "312"} -{"spark": "313"} -{"spark": "320"} -{"spark": "321"} -{"spark": "321cdh"} -{"spark": "322"} -{"spark": "323"} -{"spark": "324"} -{"spark": "330"} -{"spark": "330cdh"} -{"spark": "331"} -{"spark": "332"} -{"spark": "332cdh"} -{"spark": "333"} -{"spark": "334"} -{"spark": "340"} -{"spark": "341"} -{"spark": "342"} -{"spark": "350"} -{"spark": "351"} -spark-rapids-shim-json-lines ***/ class GpuMapInPandasExecMetaBase( mapPandas: MapInPandasExec, conf: RapidsConf, diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala index 958f9fc89fd..79425c47e00 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala @@ -35,7 +35,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.PythonMapInArrowExec -import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta +import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta object PythonMapInArrowExecShims { From b1e166bf6952e6dd849b1cab74586d05b84f2c82 Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Thu, 1 Feb 2024 10:02:34 +0800 Subject: [PATCH 3/3] fix --- integration_tests/src/main/python/udf_test.py | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index 16e7774faef..c5717a676a6 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -16,8 +16,7 @@ from pyspark import BarrierTaskContext, TaskContext from conftest import is_at_least_precommit_run -from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341, \ - with_gpu_session +from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341 from pyspark.sql.pandas.utils import require_minimum_pyarrow_version, require_minimum_pandas_version @@ -430,32 +429,38 @@ def test_func(spark): @pytest.mark.skipif(is_before_spark_350(), - reason='mapInPandas/mapInArrow with barrier mode is introduced by Pyspark 3.5.0') -@pytest.mark.parametrize('is_map_in_pandas', [True, False], ids=idfn) -def test_map_in_pandas_in_arrow_with_barrier_mode(is_map_in_pandas): + reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0') +@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn) +def test_map_in_pandas_with_barrier_mode(is_barrier): def func(iterator): tc = TaskContext.get() assert tc is not None - assert not isinstance(tc, BarrierTaskContext) + if is_barrier: + assert isinstance(tc, BarrierTaskContext) + else: + assert not isinstance(tc, BarrierTaskContext) + for batch in iterator: yield batch - def func_barrier_mode(iterator): + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.range(0, 10, 1, 1).mapInPandas(func, "id long", is_barrier)) + + +@pytest.mark.skipif(is_before_spark_350(), + reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0') +@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn) +def test_map_in_arrow_with_barrier_mode(is_barrier): + def func(iterator): tc = TaskContext.get() assert tc is not None - assert isinstance(tc, BarrierTaskContext) + if is_barrier: + assert isinstance(tc, BarrierTaskContext) + else: + assert not isinstance(tc, BarrierTaskContext) + for batch in iterator: yield batch - def run(spark): - df = spark.range(0, 10, 1, 1) - if is_map_in_pandas: - ret1 = df.mapInPandas(func, "id long", False).collect() - ret2 = df.mapInPandas(func_barrier_mode, "id long", True).collect() - else: - ret1 = df.mapInArrow(func, "id long", False).collect() - ret2 = df.mapInArrow(func_barrier_mode, "id long", True).collect() - return ret1, ret2 - - non_barrier_result, barrier_result = with_gpu_session(lambda spark: run(spark)) - assert_equal(non_barrier_result, barrier_result) + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.range(0, 10, 1, 1).mapInArrow(func, "id long", is_barrier))