From cca5955da35233c4d35f0b40363c990258549def Mon Sep 17 00:00:00 2001 From: Bobby Wang Date: Fri, 2 Feb 2024 14:15:51 +0800 Subject: [PATCH] Support barrier mode for mapInPandas/mapInArrow (#10364) Signed-off-by: Bobby Wang --- integration_tests/src/main/python/udf_test.py | 39 ++++++++++++++ .../nvidia/spark/rapids/GpuOverrides.scala | 2 +- .../execution/python/GpuMapInBatchExec.scala | 13 ++++- .../execution/python/GpuMapInPandasExec.scala | 16 +++--- .../rapids/shims/GpuMapInPandasExecMeta.scala | 52 +++++++++++++++++++ .../shims/PythonMapInArrowExecShims.scala | 2 +- .../shims/GpuPythonMapInArrowExec.scala | 19 ++++--- .../shims/GpuPythonMapInArrowExecMeta.scala | 45 ++++++++++++++++ .../shims/PythonMapInArrowExecShims.scala | 2 +- .../rapids/shims/GpuMapInPandasExecMeta.scala | 43 +++++++++++++++ .../shims/GpuPythonMapInArrowExecMeta.scala | 42 +++++++++++++++ 11 files changed, 255 insertions(+), 20 deletions(-) create mode 100644 sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala create mode 100644 sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala create mode 100644 sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala create mode 100644 sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala diff --git a/integration_tests/src/main/python/udf_test.py b/integration_tests/src/main/python/udf_test.py index dbccee4a374..4060166ebff 100644 --- a/integration_tests/src/main/python/udf_test.py +++ b/integration_tests/src/main/python/udf_test.py @@ -13,6 +13,7 @@ # limitations under the License. import pytest +from pyspark import BarrierTaskContext, TaskContext from conftest import is_at_least_precommit_run from spark_session import is_databricks_runtime, is_before_spark_330, is_before_spark_350, is_spark_341 @@ -425,3 +426,41 @@ def test_func(spark): lambda data: [pd.DataFrame([len(list(data))])], schema="ret:integer") assert_gpu_and_cpu_are_equal_collect(test_func, conf=arrow_udf_conf) + + +@pytest.mark.skipif(is_before_spark_350(), + reason='mapInPandas with barrier mode is introduced by Pyspark 3.5.0') +@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn) +def test_map_in_pandas_with_barrier_mode(is_barrier): + def func(iterator): + tc = TaskContext.get() + assert tc is not None + if is_barrier: + assert isinstance(tc, BarrierTaskContext) + else: + assert not isinstance(tc, BarrierTaskContext) + + for batch in iterator: + yield batch + + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.range(0, 10, 1, 1).mapInPandas(func, "id long", is_barrier)) + + +@pytest.mark.skipif(is_before_spark_350(), + reason='mapInArrow with barrier mode is introduced by Pyspark 3.5.0') +@pytest.mark.parametrize('is_barrier', [True, False], ids=idfn) +def test_map_in_arrow_with_barrier_mode(is_barrier): + def func(iterator): + tc = TaskContext.get() + assert tc is not None + if is_barrier: + assert isinstance(tc, BarrierTaskContext) + else: + assert not isinstance(tc, BarrierTaskContext) + + for batch in iterator: + yield batch + + assert_gpu_and_cpu_are_equal_collect( + lambda spark: spark.range(0, 10, 1, 1).mapInArrow(func, "id long", is_barrier)) diff --git a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala index 050eae46a07..64ac9808c61 100644 --- 
a/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala +++ b/sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOverrides.scala @@ -67,7 +67,7 @@ import org.apache.spark.sql.rapids.catalyst.expressions.GpuRand import org.apache.spark.sql.rapids.execution._ import org.apache.spark.sql.rapids.execution.python._ import org.apache.spark.sql.rapids.execution.python.GpuFlatMapGroupsInPandasExecMeta -import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuTimeAdd} +import org.apache.spark.sql.rapids.shims.{GpuAscii, GpuMapInPandasExecMeta, GpuTimeAdd} import org.apache.spark.sql.rapids.zorder.ZOrderRules import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String} diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala index 0199c52250f..35d166dbc5f 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInBatchExec.scala @@ -44,6 +44,8 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { protected val func: Expression protected val pythonEvalType: Int + protected val isBarrier: Boolean + private val pandasFunction = func.asInstanceOf[GpuPythonUDF].func override def producedAttributes: AttributeSet = AttributeSet(output) @@ -65,7 +67,7 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { val localEvalType = pythonEvalType // Start process - child.executeColumnar().mapPartitionsInternal { inputIter => + val func = (inputIter: Iterator[ColumnarBatch]) => { val context = TaskContext.get() // Single function with one struct. @@ -109,7 +111,14 @@ trait GpuMapInBatchExec extends ShimUnaryExecNode with GpuPythonExecBase { numOutputRows += cb.numRows cb } - } // end of mapPartitionsInternal + } // end of func + + if (isBarrier) { + child.executeColumnar().barrier().mapPartitions(func) + } else { + child.executeColumnar().mapPartitionsInternal(func) + } + } // end of internalDoExecuteColumnar } diff --git a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala index aaca88c38a0..a678f1336f0 100644 --- a/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala +++ b/sql-plugin/src/main/scala/org/apache/spark/sql/rapids/execution/python/GpuMapInPandasExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -21,9 +21,9 @@ import com.nvidia.spark.rapids._ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.python.MapInPandasExec -class GpuMapInPandasExecMeta( +class GpuMapInPandasExecMetaBase( mapPandas: MapInPandasExec, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], @@ -34,9 +34,9 @@ class GpuMapInPandasExecMeta( override def noReplacementPossibleMessage(reasons: String): String = s"cannot run even partially on the GPU because $reasons" - private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( + protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( mapPandas.func.asInstanceOf[PythonUDF], conf, Some(this)) - private val resultAttrs: Seq[BaseExprMeta[Attribute]] = + protected val resultAttrs: Seq[BaseExprMeta[Attribute]] = mapPandas.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf @@ -45,7 +45,8 @@ class GpuMapInPandasExecMeta( GpuMapInPandasExec( udf.convertToGpu(), resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - childPlans.head.convertIfNeeded() + childPlans.head.convertIfNeeded(), + isBarrier = false, ) } @@ -60,7 +61,8 @@ class GpuMapInPandasExecMeta( case class GpuMapInPandasExec( func: Expression, output: Seq[Attribute], - child: SparkPlan) extends GpuMapInBatchExec { + child: SparkPlan, + override val isBarrier: Boolean) extends GpuMapInBatchExec { override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_PANDAS_ITER_UDF } diff --git a/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala new file mode 100644 index 00000000000..a0fb7353581 --- /dev/null +++ b/sql-plugin/src/main/spark311/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.execution.python.MapInPandasExec +import org.apache.spark.sql.rapids.execution.python.GpuMapInPandasExecMetaBase + +/*** spark-rapids-shim-json-lines +{"spark": "311"} +{"spark": "312"} +{"spark": "313"} +{"spark": "320"} +{"spark": "321"} +{"spark": "321cdh"} +{"spark": "322"} +{"spark": "323"} +{"spark": "324"} +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "342"} +spark-rapids-shim-json-lines ***/ +class GpuMapInPandasExecMeta( + mapPandas: MapInPandasExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) { + +} diff --git a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala index 958f9fc89fd..79425c47e00 100644 --- a/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala +++ b/sql-plugin/src/main/spark330/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala @@ -35,7 +35,7 @@ import com.nvidia.spark.rapids._ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.PythonMapInArrowExec -import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta +import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta object PythonMapInArrowExecShims { diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala index 6d06a97db06..5eca8b18294 100644 --- a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExec.scala @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
@@ -31,16 +31,17 @@ {"spark": "350"} {"spark": "351"} spark-rapids-shim-json-lines ***/ -package org.apache.spark.sql.rapids.execution.python +package org.apache.spark.sql.rapids.shims import com.nvidia.spark.rapids._ import org.apache.spark.api.python.PythonEvalType import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, PythonUDF} import org.apache.spark.sql.execution.SparkPlan -import org.apache.spark.sql.execution.python._ +import org.apache.spark.sql.execution.python.PythonMapInArrowExec +import org.apache.spark.sql.rapids.execution.python.GpuMapInBatchExec -class GpuPythonMapInArrowExecMeta( +class GpuPythonMapInArrowExecMetaBase( mapArrow: PythonMapInArrowExec, conf: RapidsConf, parent: Option[RapidsMeta[_, _, _]], @@ -51,9 +52,9 @@ class GpuPythonMapInArrowExecMeta( override def noReplacementPossibleMessage(reasons: String): String = s"cannot run even partially on the GPU because $reasons" - private val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( + protected val udf: BaseExprMeta[PythonUDF] = GpuOverrides.wrapExpr( mapArrow.func.asInstanceOf[PythonUDF], conf, Some(this)) - private val resultAttrs: Seq[BaseExprMeta[Attribute]] = + protected val resultAttrs: Seq[BaseExprMeta[Attribute]] = mapArrow.output.map(GpuOverrides.wrapExpr(_, conf, Some(this))) override val childExprs: Seq[BaseExprMeta[_]] = resultAttrs :+ udf @@ -62,7 +63,8 @@ class GpuPythonMapInArrowExecMeta( GpuPythonMapInArrowExec( udf.convertToGpu(), resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], - childPlans.head.convertIfNeeded() + childPlans.head.convertIfNeeded(), + isBarrier = false, ) } @@ -77,7 +79,8 @@ class GpuPythonMapInArrowExecMeta( case class GpuPythonMapInArrowExec( func: Expression, output: Seq[Attribute], - child: SparkPlan) extends GpuMapInBatchExec { + child: SparkPlan, + override val isBarrier: Boolean) extends GpuMapInBatchExec { override protected val pythonEvalType: Int = PythonEvalType.SQL_MAP_ARROW_ITER_UDF } diff --git a/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala new file mode 100644 index 00000000000..e9d711315a9 --- /dev/null +++ b/sql-plugin/src/main/spark330/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +/*** spark-rapids-shim-json-lines +{"spark": "330"} +{"spark": "330cdh"} +{"spark": "330db"} +{"spark": "331"} +{"spark": "332"} +{"spark": "332cdh"} +{"spark": "332db"} +{"spark": "333"} +{"spark": "334"} +{"spark": "340"} +{"spark": "341"} +{"spark": "341db"} +{"spark": "342"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.execution.python._ + +class GpuPythonMapInArrowExecMeta( + mapArrow: PythonMapInArrowExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) { + +} diff --git a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala index 98ccc540613..98826aa324b 100644 --- a/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala +++ b/sql-plugin/src/main/spark350/scala/com/nvidia/spark/rapids/shims/PythonMapInArrowExecShims.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.python.PythonMapInArrowExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.rapids.execution.TrampolineUtil -import org.apache.spark.sql.rapids.execution.python.GpuPythonMapInArrowExecMeta +import org.apache.spark.sql.rapids.shims.GpuPythonMapInArrowExecMeta import org.apache.spark.sql.types.{BinaryType, StringType} object PythonMapInArrowExecShims { diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala new file mode 100644 index 00000000000..d8377f9c349 --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuMapInPandasExecMeta.scala @@ -0,0 +1,43 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions.{Attribute} +import org.apache.spark.sql.execution.python.MapInPandasExec +import org.apache.spark.sql.rapids.execution.python.{GpuMapInPandasExec, GpuMapInPandasExecMetaBase} + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +class GpuMapInPandasExecMeta( + mapPandas: MapInPandasExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuMapInPandasExecMetaBase(mapPandas, conf, parent, rule) { + + override def convertToGpu(): GpuExec = + GpuMapInPandasExec( + udf.convertToGpu(), + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded(), + isBarrier = mapPandas.isBarrier, + ) +} diff --git a/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala new file mode 100644 index 00000000000..c27f4824c4a --- /dev/null +++ b/sql-plugin/src/main/spark350/scala/org/apache/spark/sql/rapids/shims/GpuPythonMapInArrowExecMeta.scala @@ -0,0 +1,42 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/*** spark-rapids-shim-json-lines +{"spark": "350"} +{"spark": "351"} +spark-rapids-shim-json-lines ***/ +package org.apache.spark.sql.rapids.shims + +import com.nvidia.spark.rapids._ + +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.python.PythonMapInArrowExec + +class GpuPythonMapInArrowExecMeta( + mapArrow: PythonMapInArrowExec, + conf: RapidsConf, + parent: Option[RapidsMeta[_, _, _]], + rule: DataFromReplacementRule) + extends GpuPythonMapInArrowExecMetaBase(mapArrow, conf, parent, rule) { + + override def convertToGpu(): GpuExec = + GpuPythonMapInArrowExec( + udf.convertToGpu(), + resultAttrs.map(_.convertToGpu()).asInstanceOf[Seq[Attribute]], + childPlans.head.convertIfNeeded(), + isBarrier = mapArrow.isBarrier, + ) +}
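
The core of this change is the dispatch added to GpuMapInBatchExec.internalDoExecuteColumnar: the same per-partition function is routed through RDD.barrier().mapPartitions when isBarrier is set, and through the regular mapPartitionsInternal path otherwise. The flag only becomes true through the Spark 3.5.0+ shims, since MapInPandasExec and PythonMapInArrowExec expose isBarrier starting with that release; the pre-3.5.0 shims keep isBarrier = false, which is also why the new integration tests are skipped before Spark 3.5.0.

Below is a minimal, self-contained Scala sketch of that dispatch pattern on a plain RDD, outside the plugin. It is illustrative only and not part of the patch: the object and method names (BarrierDispatchSketch, runPartitions) are invented, it uses the public mapPartitions rather than Spark's internal mapPartitionsInternal, and it assumes a local master with at least as many task slots as partitions, which barrier scheduling requires.

import scala.reflect.ClassTag

import org.apache.spark.{BarrierTaskContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object BarrierDispatchSketch {
  // Run `func` over every partition of `rdd`, either as a barrier stage or as
  // a normal stage, mirroring the isBarrier dispatch in GpuMapInBatchExec.
  def runPartitions[T: ClassTag](rdd: RDD[T], isBarrier: Boolean)(
      func: Iterator[T] => Iterator[T]): RDD[T] = {
    if (isBarrier) {
      // All tasks of the stage are launched together and may coordinate
      // through BarrierTaskContext (barrier(), allGather(), ...).
      rdd.barrier().mapPartitions(func)
    } else {
      // Regular scheduling; the public counterpart of mapPartitionsInternal.
      rdd.mapPartitions(func)
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]") // 2 slots >= 2 partitions, so the barrier stage can be scheduled
      .appName("barrier-dispatch-sketch")
      .getOrCreate()
    val rdd = spark.sparkContext.parallelize(0 until 10, numSlices = 2)

    val passThrough = (it: Iterator[Int]) => {
      // Same observation the new integration tests make: the task context is a
      // BarrierTaskContext only when the stage runs in barrier mode.
      val barrierCtx = TaskContext.get().isInstanceOf[BarrierTaskContext]
      println(s"partition sees BarrierTaskContext = $barrierCtx")
      it
    }

    println(runPartitions(rdd, isBarrier = true)(passThrough).collect().mkString(","))
    println(runPartitions(rdd, isBarrier = false)(passThrough).collect().mkString(","))
    spark.stop()
  }
}

From the Python side the user-facing API is unchanged: on Spark 3.5.0+, df.mapInPandas(func, schema, barrier=True) and df.mapInArrow(func, schema, barrier=True) now run on the GPU plugin as barrier stages, as exercised by the tests added to udf_test.py above.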