
[SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile #44852

Closed
wants to merge 7 commits into from

Conversation

wbo4958
Contributor

@wbo4958 wbo4958 commented Jan 23, 2024

What changes were proposed in this pull request?

Support stage-level scheduling for some PySpark DataFrame APIs (mapInPandas and mapInArrow).

Why are the changes needed?

The introduction of barrier mode in #40520 makes it possible to implement Spark ML cases (pure Python algorithms) with DataFrame APIs such as mapInPandas and mapInArrow, so stage-level scheduling also needs to be enabled for these DataFrame APIs.

Does this PR introduce any user-facing change?

Yes. This PR adds a new argument `profile` to mapInPandas and mapInArrow.

def mapInPandas(
    self,
    func: "PandasMapIterFunction",
    schema: Union[StructType, str],
    barrier: bool = False,
    profile: Optional[ResourceProfile] = None,
) -> "DataFrame":

def mapInArrow(
    self,
    func: "ArrowMapIterFunction",
    schema: Union[StructType, str],
    barrier: bool = False,
    profile: Optional[ResourceProfile] = None,
) -> "DataFrame":

How to use it? Take mapInPandas as an example:

from pyspark import TaskContext
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def func(iterator):
    tc = TaskContext.get()
    # Each task of this stage gets the 3 CPUs requested by the profile below.
    assert tc.cpus() == 3
    for batch in iterator:
        yield batch

df = spark.range(10)

# Build a ResourceProfile requiring 3 CPUs per task.
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build

df.mapInPandas(func, "id long", False, rp).collect()
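
Since both `barrier` and `profile` have defaults in the new signature, the same call can also be written with a keyword argument, without passing `barrier` positionally (a usage note based on the signature above, not part of the PR description):

```python
df.mapInPandas(func, "id long", profile=rp).collect()
```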

How was this patch tested?

The newly added tests pass, and some manual tests are still needed with dynamic allocation turned on and off.

Was this patch authored or co-authored using generative AI tooling?

No

@wbo4958
Copy link
Contributor Author

wbo4958 commented Jan 23, 2024

I will perform the manual tests for this PR and will update it after they are done.

@wbo4958 wbo4958 changed the title from "Make mapInPandas / mapInArrow support ResourceProfile" to "[SPARK-46812][SQL][CONNECT][PYSPARKMake mapInPandas / mapInArrow support ResourceProfile" Jan 23, 2024
@wbo4958 wbo4958 changed the title from "[SPARK-46812][SQL][CONNECT][PYSPARKMake mapInPandas / mapInArrow support ResourceProfile" to "[SPARK-46812][SQL][CONNECT][PYSPARK] Make mapInPandas / mapInArrow support ResourceProfile" Jan 23, 2024
@wbo4958 wbo4958 changed the title from "[SPARK-46812][SQL][CONNECT][PYSPARK] Make mapInPandas / mapInArrow support ResourceProfile" to "[SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile" Jan 23, 2024
@wbo4958 wbo4958 force-pushed the df-rp branch 2 times, most recently from 7a14dc3 to 6f6591c on January 24, 2024 02:36
@wbo4958
Contributor Author

wbo4958 commented Jan 24, 2024

I was trying to add unit tests to check whether the ResourceProfile is correctly applied to the underlying RDD generated in MapInPandasExec. Here is my testing code:

        df1 = df.mapInPandas(lambda iter: iter, "id long")
        assert df1.rdd.getResourceProfile() is None

        treqs = TaskResourceRequests().cpus(2)
        expected_rp = ResourceProfileBuilder().require(treqs).build

        df2 = df.mapInPandas(lambda iter: iter, "id long", False, expected_rp)
        assert df2.rdd.getResourceProfile() is not None

But the ResourceProfile obtained from df2.rdd.getResourceProfile() is None; the reason is that df2.rdd wraps the result in extra MapPartitionsRDDs that don't have the ResourceProfile attached.

I also tried to use the JVM RDD to get the correct parent RDD with the code below,

df2.rdd._jrdd.firstParent()

or

df2.rdd._jrdd.parent(0)

But neither of them worked, with the error messages below:

py4j.protocol.Py4JError: An error occurred while calling o45.parent. Trace:
py4j.Py4JException: Method parent([class java.lang.Integer]) does not exist

py4j.protocol.Py4JError: An error occurred while calling o45.firstParent. Trace:
py4j.Py4JException: Method firstParent([]) does not exist

I don't know how to add unit tests for this PR, but I will perform the manual tests.
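
One possible workaround, sketched here but untested, is to walk the JVM RDD lineage through the public `dependencies` API instead of the `parent`/`firstParent` accessors (which appear to be protected on the Scala side, hence the py4j errors above):

```python
# Untested sketch: follow the JVM RDD lineage until an RDD with a
# ResourceProfile attached is found (or the lineage ends).
jrdd = df2.rdd._jrdd.rdd()            # underlying Scala RDD of the JavaRDD
rp_found = jrdd.getResourceProfile()  # py4j maps Scala null to Python None
while rp_found is None and jrdd.dependencies().size() > 0:
    jrdd = jrdd.dependencies().head().rdd()
    rp_found = jrdd.getResourceProfile()
print(rp_found)
```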

@wbo4958
Contributor Author

wbo4958 commented Jan 24, 2024

Manual tests

The manual tests were conducted on a Spark standalone cluster with only 1 worker.

With dynamic allocation disabled.

pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
   --conf spark.dynamicAllocation.enabled=false

The above command requests 1 executor with 4 CPU cores, and the default task.cpus = 1, so the default task parallelism is 4 tasks at a time.
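
The expected parallelism in the tests below follows from floor(executor.cores / task.cpus), assuming CPU is the limiting resource; a plain-Python illustration of the arithmetic (not Spark API code):

```python
executor_cores = 4
for task_cpus in (1, 2, 3):
    # concurrent tasks one executor can run for a profile requesting task_cpus
    print(task_cpus, executor_cores // task_cpus)
# task.cpus=1 -> 4 concurrent tasks, task.cpus=2 -> 2, task.cpus=3 -> 1
```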

1. task.cpus=1

Test code:

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(1)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

When the required task.cpus=1 and executor.cores=4 (no executor resources specified, so the defaults are used), there will be 4 tasks running for rp at the same time.

The entire Spark application consists of a single Spark job that is divided into two stages. The first shuffle stage comprises 6 tasks: the first 4 tasks are executed simultaneously, followed by the last 2.

(Spark UI screenshot)

The second ResultStage comprises 3 tasks, all of which will be executed simultaneously since the required task.cpus is 1.

(Spark UI screenshot)

2. task.cpus=2

Test code,

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(2)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

When the required task.cpus=2 and executor.cores=4 (no executor resources specified, so the defaults are used), there will be 2 tasks running for rp at a time.

The first shuffle stage behaves the same as in the first test.

The second ResultStage comprises 3 tasks, so the first 2 tasks run at the same time and then the last task executes.

(Spark UI screenshot)

3. task.cpus=3

Test code,

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

When the required task.cpus=3 and executor.cores=4 (no executor resources specified, so the defaults are used), there will be only 1 task running for rp at a time.

The first shuffle stage behaves the same as in the first test.

The second ResultStage comprises 3 tasks, all of which run serially.

(Spark UI screenshot)

4. task.cpus=5

Test code,
from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

An exception occurred:

py4j.protocol.Py4JJavaError: An error occurred while calling o160.collectToPython.
: org.apache.spark.SparkException: The number of cores per executor (=4) has to be >= the number of cpus per task = 5.
	at org.apache.spark.resource.ResourceUtils$.validateTaskCpusLargeEnough(ResourceUtils.scala:413)
	at org.apache.spark.resource.ResourceProfile.calculateTasksAndLimitingResource(ResourceProfile.scala:193)
	at org.apache.spark.resource.ResourceProfile.$anonfun$limitingResource$1(ResourceProfile.scala:163)
	at scala.Option.getOrElse(Option.scala:201)
	at org.apache.spark.resource.ResourceProfile.limitingResource(ResourceProfile.scala:162)
	at org.apache.spark.resource.ResourceProfileManager.addResourceProfile(ResourceProfileManager.scala:142)
	at org.apache.spark.rdd.RDD.withResources(RDD.scala:1844)
	at org.apache.spark.sql.execution.python.MapInBatchExec.$anonfun$doExecute$3(MapInBatchExec.scala:86)
	at scala.Option.map(Option.scala:242)
	at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute(MapInBatchExec.scala:86)
	at org.apache.spark.sql.execution.python.MapInBatchExec.doExecute$(MapInBatchExec.scala:50)
	at org.apache.spark.sql.execution.python.MapInArrowExec.doExecute(MapInArrowExec.scala:29)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:195)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:246)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:243)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:191)
	at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:364)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:445)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:418)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:390)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4265)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4439)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:560)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4437)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$6(SQLExecution.scala:150)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId0$1(SQLExecution.scala:116)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:918)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId0(SQLExecution.scala:72)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:196)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4437)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4262)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:840)

@wbo4958
Contributor Author

wbo4958 commented Jan 24, 2024

With dynamic allocation enabled.

pyspark --master spark://192.168.141.19:7077 --conf spark.executor.cores=4 --conf spark.task.cpus=1 \
  --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.maxExecutors=1

The above command enables dynamic allocation and sets the maximum number of executors to 1 for testing purposes.

TaskResourceProfile without any specific executor request information

Test code,

from pyspark.resource import TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)

treqs = TaskResourceRequests().cpus(3)
rp = ResourceProfileBuilder().require(treqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

The rp here is a TaskResourceProfile without any specific executor request information, so the executor settings fall back to the default values from the default ResourceProfile (executor.cores=4).

The above code requests an extra executor with the same executor.cores/memory as the default ResourceProfile.
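
For comparison, a minimal sketch of the two kinds of profiles exercised in these tests: one with task requests only (the executor side falls back to the default profile) and one that also carries explicit executor requests (the same values used in the next test below):

```python
from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

# Task-only profile: the executor side reuses the default ResourceProfile
# (executor.cores=4 in this setup).
task_only_rp = ResourceProfileBuilder().require(TaskResourceRequests().cpus(3)).build

# Profile with explicit executor requests: dynamic allocation asks for a new
# kind of executor matching these requests.
full_rp = (
    ResourceProfileBuilder()
    .require(ExecutorResourceRequests().cores(6))
    .require(TaskResourceRequests().cpus(5))
    .build
)
```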

(Spark UI screenshots)

Different executor request information

from pyspark.resource import ExecutorResourceRequests, TaskResourceRequests, ResourceProfileBuilder

def filter_func(iterator):
    for pdf in iterator:
        yield pdf

df = spark.range(0, 100, 1, 4)

ereqs = ExecutorResourceRequests().cores(6)
treqs = TaskResourceRequests().cpus(5)
rp = ResourceProfileBuilder().require(treqs).require(ereqs).build
df.repartition(3).mapInArrow(filter_func, df.schema, False, rp).collect()

(Spark UI screenshots)

@wbo4958 wbo4958 marked this pull request as ready for review January 24, 2024 13:29
@wbo4958
Contributor Author

wbo4958 commented Jan 24, 2024

Hi @WeichenXu123 @zhengruifeng @tgravescs, could you please help review this PR?

@wbo4958
Contributor Author

wbo4958 commented Jan 24, 2024

BTW, I will perform similar manual tests for Spark Connect.

@tgravescs
Contributor

It would be nice to have more description here about what the challenge is (why it just didn't work) and then how you went about solving/fixing it.

Does this PR introduce any user-facing change?
No

Is this really true? Doesn't this now allow users to call these together? Does it affect the Spark Connect API, since you mention that?

Please update the description; it should be clear to a reviewer what the proposed changes are without looking at the code.

@tgravescs
Contributor

def mapInArrow(
    self,
    func: "ArrowMapIterFunction",
    schema: Union[StructType, str],
    barrier: bool = False,
    profile: Optional[ResourceProfile] = None,

This is definitely a user-facing API change.

@wbo4958 wbo4958 marked this pull request as draft January 26, 2024 08:26
@wbo4958 wbo4958 marked this pull request as ready for review January 26, 2024 08:58
@wbo4958
Contributor Author

wbo4958 commented Jan 26, 2024

Hi @tgravescs, thx for your comment.

I just updated the description and submitted a new commit that adds unit tests for the resource profile on both the SQL DataFrame and Connect DataFrame APIs.

@tgravescs @WeichenXu123 @zhengruifeng Could you help review this PR? Thx.

@github-actions github-actions bot added the BUILD label Jan 29, 2024
@wbo4958 wbo4958 marked this pull request as draft January 29, 2024 08:11
@wbo4958 wbo4958 marked this pull request as ready for review January 30, 2024 00:26
@github-actions github-actions bot removed the BUILD label Feb 1, 2024
@wbo4958 wbo4958 changed the title from "[SPARK-46812][SQL][CONNECT][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile" to "[SPARK-46812][SQL][PYTHON] Make mapInPandas / mapInArrow support ResourceProfile" Feb 1, 2024
@wbo4958
Contributor Author

wbo4958 commented Feb 1, 2024

Hi @WeichenXu123, @tgravescs, @zhengruifeng, @Ngone51, I've split the big PR into 2 PRs: one for SQL and the other for Connect. This PR is only for SQL; could you help review it?

wbo4958 and others added 4 commits February 2, 2024 13:00
@wbo4958
Contributor Author

wbo4958 commented Feb 5, 2024

@zhengruifeng Could you help review it again? Thx

@wbo4958
Contributor Author

wbo4958 commented Feb 5, 2024

Hi @HyukjinKwon, Could you help to review this PR. Thx

Contributor

@WeichenXu123 WeichenXu123 left a comment


LGTM! :)

Member

@HyukjinKwon HyukjinKwon left a comment


I don't know much about this, so I defer to @zhengruifeng and @WeichenXu123 who have more context.

wbo4958 and others added 2 commits February 7, 2024 10:39
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
Co-authored-by: Hyukjin Kwon <gurwls223@gmail.com>
@wbo4958
Contributor Author

wbo4958 commented Feb 8, 2024

Hi @HyukjinKwon, thx for your review and comments. The CI on the newest commit passed; could you help review it again? Thx very much.

Member

@HyukjinKwon HyukjinKwon left a comment


I will defer to @WeichenXu123

Contributor

@WeichenXu123 WeichenXu123 left a comment


LGTM

@wbo4958
Contributor Author

wbo4958 commented Feb 10, 2024

Hi @HyukjinKwon could you help merge?

@wbo4958
Contributor Author

wbo4958 commented Feb 18, 2024

Hi @WeichenXu123, @HyukjinKwon, @zhengruifeng, Could you help merge it? Thx very much.

@WeichenXu123
Contributor

Merged to master.

@wbo4958 wbo4958 deleted the df-rp branch February 20, 2024 00:37
HyukjinKwon pushed a commit that referenced this pull request Apr 2, 2024
…ResourceProfile

### What changes were proposed in this pull request?

Support stage-level scheduling for PySpark connect DataFrame APIs (mapInPandas and mapInArrow).

### Why are the changes needed?

#44852 added ResourceProfile support in mapInPandas/mapInArrow for SQL, so it's the right time to enable it for Connect.

### Does this PR introduce _any_ user-facing change?

Yes. Users can pass a ResourceProfile to mapInPandas/mapInArrow through the Connect PySpark client.

### How was this patch tested?

Pass the CIs and manual tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #45232 from wbo4958/connect-rp.

Authored-by: Bobby Wang <wbo4958@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
dongjoon-hyun added a commit that referenced this pull request Apr 4, 2024
…f pandas is not available

### What changes were proposed in this pull request?

This is a follow-up of the following PRs to skip `pandas`-related tests if pandas is not available.
- #44852
- #45232

### Why are the changes needed?

`pandas` is an optional dependency, so we had better skip these tests without causing failures.
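
A minimal sketch of the kind of skip guard this follow-up applies (assuming a local `have_pandas` check rather than PySpark's actual test helpers; the real test lives in `pyspark.resource.tests.test_resources`):

```python
import unittest

try:
    import pandas  # noqa: F401
    have_pandas = True
except ImportError:
    have_pandas = False

class ResourceProfileTests(unittest.TestCase):
    @unittest.skipIf(not have_pandas, "pandas is not installed")
    def test_profile_before_sc_for_sql(self):
        ...  # test body elided in this sketch
```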

To recover the PyPy 3.8 CI,
- https://github.com/apache/spark/actions/runs/8541011879/job/23421483071

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manually tested.
```
$ python/run-tests --modules=pyspark-resource --parallelism=1 --python-executables=python3.10
Running PySpark tests. Output is in /Users/dongjoon/APACHE/spark-merge/python/unit-tests.log
Will test against the following Python executables: ['python3.10']
Will test the following Python modules: ['pyspark-resource']
python3.10 python_implementation is CPython
python3.10 version is: Python 3.10.13
Starting test(python3.10): pyspark.resource.profile (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/021bc7bb-242f-4cb4-8584-11ed6e711f78/python3.10__pyspark.resource.profile__jn89f1hh.log)
Finished test(python3.10): pyspark.resource.profile (1s)
Starting test(python3.10): pyspark.resource.tests.test_connect_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/244d6c6f-8799-4a2a-b7a7-20d7c50d643d/python3.10__pyspark.resource.tests.test_connect_resources__5ta1tf6e.log)
Finished test(python3.10): pyspark.resource.tests.test_connect_resources (0s) ... 1 tests were skipped
Starting test(python3.10): pyspark.resource.tests.test_resources (temp output: /Users/dongjoon/APACHE/spark-merge/python/target/671e7afa-e764-443f-bc40-7e940d7342ea/python3.10__pyspark.resource.tests.test_resources__lhbp6y5f.log)
Finished test(python3.10): pyspark.resource.tests.test_resources (2s) ... 1 tests were skipped
Tests passed in 4 seconds

Skipped tests in pyspark.resource.tests.test_connect_resources with python3.10:
      test_profile_before_sc_for_connect (pyspark.resource.tests.test_connect_resources.ResourceProfileTests) ... skip (0.005s)

Skipped tests in pyspark.resource.tests.test_resources with python3.10:
      test_profile_before_sc_for_sql (pyspark.resource.tests.test_resources.ResourceProfileTests) ... skip (0.001s)
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45869 from dongjoon-hyun/SPARK-46812.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>