[SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile #44852
Conversation
I will perform the manual tests for this PR and will update it once they are done.
Force-pushed from 7a14dc3 to 6f6591c.
I was trying to add unit tests to check if the ResourceProfile is correctly applied to the underlying RDD generated in MapInPandasExec. Here is my testing code:

```
df1 = df.mapInPandas(lambda iter: iter, "id long")
assert df1.rdd.getResourceProfile() is None

treqs = TaskResourceRequests().cpus(2)
expected_rp = ResourceProfileBuilder().require(treqs).build
df2 = df.mapInPandas(lambda iter: iter, "id long", False, expected_rp)
assert df2.rdd.getResourceProfile() is not None
```

But the ResourceProfile got from `df2.rdd.getResourceProfile()` is not the one applied to the underlying RDD, so I also tried to use the JVM RDD to get the correct parent RDD with the code below:

```
df2.rdd._jrdd.firstParent()
# or
df2.rdd._jrdd.parent(0)
```

But neither of them worked, with the error messages below:

```
py4j.protocol.Py4JError: An error occurred while calling o45.parent. Trace:
py4j.Py4JException: Method parent([class java.lang.Integer]) does not exist

py4j.protocol.Py4JError: An error occurred while calling o45.firstParent. Trace:
py4j.Py4JException: Method firstParent([]) does not exist
```

I don't know how to add unit tests for this PR, but I will perform the manual tests.
Manual tests

The manual tests were conducted on a Spark Standalone cluster with only 1 worker.

With dynamic allocation disabled

```
pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=false
```

The above command requires 1 executor with 4 CPU cores, and the default task CPUs is 1, so the default ResourceProfile allows up to 4 tasks to run concurrently on the executor.
Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(1)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```

When the required task CPUs is 1, the entire Spark application consists of a single Spark job that is divided into two stages. The first shuffle stage comprises 6 tasks: the first 4 tasks are executed simultaneously, then the last 2 tasks. The second ResultStage comprises 3 tasks, all of which are executed simultaneously, since the required 1 CPU per task allows up to 4 concurrent tasks on the 4-core executor.
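Not part of the original tests, but a rough sketch of the slot arithmetic behind the observations above and below, using the values from the pyspark command and the task requests:

```
# Illustration only: how many tasks fit on one executor for a given task-CPU request.
executor_cores = 4          # from --conf spark.executor.cores=4

def max_concurrent_tasks(task_cpus: int) -> int:
    if task_cpus > executor_cores:
        # Spark itself rejects this with a SparkException (see the cpus(5) case below).
        raise ValueError("task CPUs cannot exceed executor cores")
    return executor_cores // task_cpus

print(max_concurrent_tasks(1))  # 4 -> all 3 ResultStage tasks run together
print(max_concurrent_tasks(2))  # 2 -> 2 tasks first, then the last one
print(max_concurrent_tasks(3))  # 1 -> the 3 tasks run serially
```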
Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```

When the required task CPUs is 2, the first shuffle stage behaves the same as in the previous test. The second ResultStage comprises 3 tasks, so the first 2 tasks run at a time, and then the last task is executed.
Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```

When the required task CPUs is 3, the first shuffle stage behaves the same as in the previous tests. The second ResultStage comprises 3 tasks, all of which run serially.
Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```

When the required task CPUs is 5, which exceeds the 4 executor cores, an exception happened:

```
py4j.protocol.Py4JJavaError: An error occurred while calling o160.collectToPython.
: org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:413)
at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:193)
at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:163)
at scala.Option.getOrElse(Option.scala:201)
at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:162)
at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:142)
at org.apache.spark.rdd.RDD.withResources(RDD.scala:1844)
at org.apache.spark.sql.execution.python.MapInBatchExec.$anonfun$doExecute$3(MapInBatchExec.scala:86)
at scala.Option.map(Option.scala:242)
at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute(MapInBatchExec.scala:86)
at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute$(MapInBatchExec.scala:50)
at org.apache.spark.sql.execution.python.MapInArrowExec.doExecute(MapInArrowExec.scala:29)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:560)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:150)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:116)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:918)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:72)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:196)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:840)
```
With dynamic allocation enabled

```
pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=1
```

The above command enables dynamic allocation and caps the maximum number of executors at 1 for testing purposes.

TaskResourceProfile without any specific executor request information

Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```

The rp here is a TaskResourceProfile without any specific executor request information, so the executor information will use the default values from the default ResourceProfile (executor.cores=4). The above code will require an extra executor with the same executor resources as the default ResourceProfile.
Different executor request information

Test code:

```
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)
ereqs = ExecutorResourceRequests().cores(6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).require(ereqs).build
df.repartition(3).mapInArrow(lambda iter: iter, df.schema, False, rp).collect()
```
Hi @WeichenXu123 @zhengruifeng @tgravescs, could you please help review this PR?
BTW, I will perform similar manual tests for Spark Connect.
It would be nice to have more description here about what the challenge is (why it just didn't work) and then how you went about solving/fixing it.
Is this really true? Doesn't this now allow users to call these together? Does it affect the Spark Connect API, since you mention that? Please update the description; it should be clear to a reviewer, without looking at the code, what the proposed changes are.
This is definitely a user-facing API change.
Hi @tgravescs, thanks for your comment. I just updated the description and submitted a new commit which adds unit tests for the resource profile in both the SQL DataFrame and Connect DataFrame APIs. @tgravescs @WeichenXu123 @zhengruifeng, could you help review this PR? Thanks.
Hi @WeichenXu123, @tgravescs, @zhengruifeng, @Ngone51, I've separated the big PR into 2 PRs, one for SQL and the other for Connect. This PR is only for SQL; could you help review it?
Co-authored-by: Ruifeng Zheng <ruifengz@foxmail.com>
@zhengruifeng Could you help review it again? Thanks.
Hi @HyukjinKwon, could you help review this PR? Thanks.
LGTM! :)
I don't know much about this, so I defer to @zhengruifeng and @WeichenXu123 who have more context.
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Hi @HyukjinKwon, thanks for your review and comments. The CI on the newest commit has passed; could you help review it again? Thanks very much.
I will defer to @WeichenXu123
LGTM
Hi @HyukjinKwon, could you help merge?
Hi @WeichenXu123, @HyukjinKwon, @zhengruifeng, could you help merge it? Thanks very much.
Merged to master.
…ResourceProfile

### What changes were proposed in this pull request?
Support stage-level scheduling for PySpark Connect DataFrame APIs (mapInPandas and mapInArrow).

### Why are the changes needed?
#44852 has supported ResourceProfile in mapInPandas/mapInArrow for SQL, so it's the right time to enable it for Connect.

### Does this PR introduce _any_ user-facing change?
Yes, users can pass a ResourceProfile to mapInPandas/mapInArrow through the Connect PySpark client.

### How was this patch tested?
Pass the CIs and manual tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45232 from wbo4958/connect-rp.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…f pandas is not available

### What changes were proposed in this pull request?
This is a follow-up of the following to skip `pandas`-related tests if pandas is not available.
- #44852
- #45232

### Why are the changes needed?
`pandas` is an optional dependency. We had better skip it without causing failures. To recover the PyPy 3.8 CI,
- https://github.com/apache/spark/actions/runs/8541011879/job/23421483071

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual test.

```
$ python/run-tests --modules=pyspark-resource --parallelism=1 --python-executables=python3.10
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python modules: ['pyspark-resource']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.13
Starting test(python3.10): pyspark.resource.profile (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/021bc7bb-242f-4cb4-8584-11ed6e711f78/python3.10__pyspark.resource.profile__jn89f1hh.log)
Finished test(python3.10): pyspark.resource.profile (1s)
Starting test(python3.10): pyspark.resource.tests.test_connect_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/244d6c6f-8799-4a2a-b7a7-20d7c50d643d/python3.10__pyspark.resource.tests.test_connect_resources__5ta1tf6e.log)
Finished test(python3.10): pyspark.resource.tests.test_connect_resources (0s) ... 1 tests were skipped
Starting test(python3.10): pyspark.resource.tests.test_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/671e7afa-e764-443f-bc40-7e940d7342ea/python3.10__pyspark.resource.tests.test_resources__lhbp6y5f.log)
Finished test(python3.10): pyspark.resource.tests.test_resources (2s) ... 1 tests were skipped
Tests passed in 4 seconds

Skipped tests in pyspark.resource.tests.test_connect_resources with python3.10:
    test_profile_before_sc_for_connect (pyspark.resource.tests.test_connect_resources.ResourceProfileTests) ... skip (0.005s)

Skipped tests in pyspark.resource.tests.test_resources with python3.10:
    test_profile_before_sc_for_sql (pyspark.resource.tests.test_resources.ResourceProfileTests) ... skip (0.001s)
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45869 from dongjoon-hyun/SPARK-46812.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
What changes were proposed in this pull request?
Support stage-level scheduling for some PySpark DataFrame APIs (mapInPandas and mapInArrow).
Why are the changes needed?
The introduction of barrier mode in Spark, as seen in #40520, allows for the implementation of Spark ML cases (pure Python algorithms) using DataFrame APIs such as mapInPandas and mapInArrow, so it's necessary to enable stage-level scheduling for DataFrame APIs.
Does this PR introduce any user-facing change?
Yes. This PR adds a new argument "profile" to mapInPandas and mapInArrow.
How to use it? Take mapInPandas as an example:
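Here is a minimal usage sketch based on the manual-test code earlier in this thread (it assumes an existing `spark` session; the third positional argument is the barrier flag and the fourth is the new ResourceProfile added by this PR):

```
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def func(iterator):
    # Identity transform: yield each pandas DataFrame batch unchanged.
    for pdf in iterator:
        yield pdf

# Require 2 CPUs per task for the mapInPandas stage.
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build

df = spark.range(0, 100, 1, 6)
df.mapInPandas(func, df.schema, False, rp).collect()
```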
How was this patch tested?
The newly added tests pass, and some manual tests are needed with dynamic allocation on and off.
Was this patch authored or co-authored using generative AI tooling?
No